Using the echostream-node package


EchoStream provides the echostream-node Python package to make developing External Nodes and Managed Nodes in Python significantly simpler, allowing you to concentrate on your high-value business requirements and not low-level EchoStream plumbing.

Processing Models

The package supports the following types of Node processing models:

  • Run-once External Nodes that are executed, send messages, and exit.
  • Daemon External and Managed Nodes that execute until signalled and can send and receive messages.
  • AWS Lambda External Nodes that execute on demand and can send and receive messages.

Concurrency Models

For the Run-once and Daemon processing models, both the threading and asyncio concurrency models are supported. AWS Lambda External Nodes only support the threading concurrency model.

Implementation

Installation

Python

pip install echostream-node

AWS Lambda

You may use the publicly provided layer instead of installing echostream-node directly in your Lambda package. The layer includes echostream-node and all of its Python dependencies except those built into the AWS Lambda environment for Python.

The layer ARN is:

arn:aws:lambda:{region}:226390263822:layer:echostream-node-{version}:1

where {version} is the echostream-node version that you want, with each . replaced by _, and {region} is the AWS region in which your Lambda function will run. Currently, us-east-1, us-east-2, us-west-1, and us-west-2 are supported.

For example, for echostream-node==0.3.7 in the us-east-1 region, the layer ARN would be:

arn:aws:lambda:us-east-1:226390263822:layer:echostream-node-0_3_7:1
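The substitution rule above is easy to get wrong by hand, so it may help to compute the ARN. The following is a minimal sketch, not part of echostream-node: the function name layer_arn and the constants are hypothetical, and the trailing layer version of 1 is simply copied from the pattern shown above.

```python
# Hypothetical helper; none of these names come from echostream-node.
SUPPORTED_REGIONS = ("us-east-1", "us-east-2", "us-west-1", "us-west-2")
LAYER_ACCOUNT_ID = "226390263822"


def layer_arn(version: str, region: str) -> str:
    """Build the layer ARN for an echostream-node version and AWS region."""
    if region not in SUPPORTED_REGIONS:
        raise ValueError(f"Unsupported region: {region}")
    # The layer name uses the package version with "." replaced by "_".
    layer_version = version.replace(".", "_")
    return (
        f"arn:aws:lambda:{region}:{LAYER_ACCOUNT_ID}"
        f":layer:echostream-node-{layer_version}:1"
    )


print(layer_arn("0.3.7", "us-east-1"))
```

For echostream-node==0.3.7 in us-east-1 this prints the same ARN as the example above.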

Use

Configuration

Instantiating a Node requires the following values. Each can be provided either as an environment variable or directly as a parameter when the Node is created:

Parameter         Environment Variable  Description
appsync_endpoint  APPSYNC_ENDPOINT      The URL to the EchoStream API endpoint.
client_id         CLIENT_ID             The Application Client ID for the App's Cognito Client Application.
name              NODE                  The Node's name.
password          PASSWORD              The password for the App User for the Node's App.
tenant            TENANT                The name of the Tenant that the Node is a part of.
username          USER_NAME             The name of the App User for the Node's App.
user_pool_id      USER_POOL_ID          The User Pool Id for the App's Cognito User Pool.
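When relying on environment variables, it can be useful to check that all of them are set before creating the Node. The sketch below uses only the standard library. The names NODE_ENV_VARS and missing_node_env_vars are hypothetical and not part of echostream-node, and the sketch assumes that every variable in the table is required.

```python
import os
from typing import Mapping

# Parameter name -> environment variable, copied from the table above.
NODE_ENV_VARS = {
    "appsync_endpoint": "APPSYNC_ENDPOINT",
    "client_id": "CLIENT_ID",
    "name": "NODE",
    "password": "PASSWORD",
    "tenant": "TENANT",
    "username": "USER_NAME",
    "user_pool_id": "USER_POOL_ID",
}


def missing_node_env_vars(environ: Mapping[str, str] = os.environ) -> list[str]:
    """Return the listed environment variables that are unset or empty."""
    return [var for var in NODE_ENV_VARS.values() if not environ.get(var)]


# Example with a partial, made-up configuration instead of the real environment.
example = {"APPSYNC_ENDPOINT": "https://example.invalid/graphql", "TENANT": "my-tenant"}
print(missing_node_env_vars(example))
```

Calling missing_node_env_vars() with no argument checks the real process environment. An empty result suggests the Node can be created without passing any of these parameters explicitly.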

Threading External Node

The following code instantiates a Node, runs it, sends 100 messages (of type echo.text) and then waits until it is signaled for termination.

from signal import SIGHUP, SIGINT, SIGTERM, signal, strsignal

from echostream_node import Message
from echostream_node.threading import AppNode


class MyExternalNode(AppNode):

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
        
    def signal_handler(self, signum: int, _: object) -> None:
        print(f"{strsignal(signum)} received, shutting down")
        self.stop()

    def start(self) -> None:
        super().start()
        signal(SIGHUP, self.signal_handler)
        signal(SIGINT, self.signal_handler)
        signal(SIGTERM, self.signal_handler)

try:
    my_external_node = MyExternalNode()
    my_external_node.start()
    for i in range(100):
        message = my_external_node.create_message(str(i))
        my_external_node.send_message(message)
        my_external_node.audit_message(message)
    my_external_node.join()
except Exception:
    print("Error running node")

Asyncio External Node

The following code instantiates a Node, runs it, sends 100 messages (of type echo.text) and then waits until it is signaled for termination.

import asyncio

import aiorun
from echostream_node import Message
from echostream_node.asyncio import AppNode

class MyExternalNode(AppNode):

    async def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)


async def main() -> None:
    try:
        node = MyExternalNode()
        await node.start()
        for i in range(100):
            message = node.create_message(str(i))
            node.send_message(message)
            node.audit_message(message)
        await node.join()
    except asyncio.CancelledError:
        pass
    except Exception:
        print("Error running node")


if __name__ == "__main__":
    aiorun.run(main(), stop_on_unhandled_errors=True, use_uvloop=True)

AWS Lambda Threading External Node

The following code defines an AWS Lambda function that receives messages from EchoStream Edge SQS queues that are configured as an event source.

from echostream_node import Message
from echostream_node.threading import LambdaNode

class MyExternalNode(LambdaNode):
    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
        
MY_EXTERNAL_NODE = MyExternalNode()

def lambda_handler(event, context):
    MY_EXTERNAL_NODE.handle_event(event)

AWS Lambda Asyncio External Node

The following code defines an AWS Lambda function that receives messages from EchoStream Edge SQS queues that are configured as an event source.

from echostream_node import Message
from echostream_node.asyncio import LambdaNode

class MyExternalNode(LambdaNode):
    async def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
        
MY_EXTERNAL_NODE = MyExternalNode()

def lambda_handler(event, context):
    MY_EXTERNAL_NODE.handle_event(event)

API

API documentation for the echostream-node package may be found at https://docs.echostream-node.echo.stream.