Using the echostream-node package
  • 05 Apr 2023

EchoStream provides the echostream-node Python package to make developing External Nodes and Managed Nodes in Python significantly simpler, allowing you to concentrate on your high-value business requirements and not low-level EchoStream plumbing.

Processing Models

The package supports the following types of Node processing models:

  • Run-once External Nodes that are executed, send messages, and exit.
  • Daemon External and Managed Nodes that execute until signalled and can send and receive messages.
  • AWS Lambda External Nodes that execute on demand and can send and receive messages.

Concurrency Models

For Run-once and Daemon processing models both threading and asyncio concurrency models are supported. AWS Lambda External Nodes only support the threading concurrency model.

Implementation

Installation

Python

pip install echostream-node

AWS Lambda

You may use the publicly provided layer instead of installing echostream-node directly in your Lambda package. This layer includes echostream-node and all of its Python dependencies except those built into the AWS Lambda environment for Python.

The layer ARN is:

arn:aws:lambda:{region}:226390263822:layer:echostream-node-{version}:1

where {version} is the echostream-node version that you want, with each . replaced by _, and {region} is the AWS region in which your Lambda function will run. Currently, us-east-1, us-east-2, us-west-1, and us-west-2 are supported.

For example, for echostream-node==0.3.7 in the us-east-1 region, the layer ARN would be:

arn:aws:lambda:us-east-1:226390263822:layer:echostream-node-0_3_7:1
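The substitution rule above can be sketched as a small helper. This is only an illustration: the function name echostream_layer_arn and the constants are hypothetical and are not part of echostream-node. The account ID, the list of regions, and the trailing layer version of 1 are copied from the text above.

```python
# Regions listed as supported in this article.
SUPPORTED_REGIONS = {"us-east-1", "us-east-2", "us-west-1", "us-west-2"}
# AWS account ID that appears in the published layer ARN.
LAYER_ACCOUNT_ID = "226390263822"


def echostream_layer_arn(region: str, version: str) -> str:
    """Build the layer ARN for a region and an echostream-node version."""
    if region not in SUPPORTED_REGIONS:
        raise ValueError(f"Unsupported region: {region}")
    # The layer name uses underscores where the package version uses dots.
    layer_version = version.replace(".", "_")
    return (
        f"arn:aws:lambda:{region}:{LAYER_ACCOUNT_ID}"
        f":layer:echostream-node-{layer_version}:1"
    )
```

Calling echostream_layer_arn("us-east-1", "0.3.7") reproduces the example ARN shown above.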

Use

Configuration

To instantiate a Node, a number of parameters are required. Each can be provided either as an environment variable or directly as a parameter when the Node is created:

Parameter         Environment Variable  Description
appsync_endpoint  APPSYNC_ENDPOINT      The URL to the EchoStream API endpoint.
client_id         CLIENT_ID             The Application Client ID for the App's Cognito Client Application.
name              NODE                  The Node's name.
password          PASSWORD              The password for the App User for the Node's App.
tenant            TENANT                The name of the Tenant that the Node is a part of.
username          USER_NAME             The name of the App User for the Node's App.
user_pool_id      USER_POOL_ID          The User Pool Id for the App's Cognito User Pool.
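When relying on environment variables, it can help to check that they are all set before creating a Node, so that a missing value is reported by name. The following is a minimal sketch using only the standard library. The helper missing_node_settings and the tuple REQUIRED_ENV_VARS are hypothetical names introduced here, not part of echostream-node; the variable names themselves come from the table above.

```python
import os
from typing import Mapping

# Environment variable names taken from the configuration table.
REQUIRED_ENV_VARS = (
    "APPSYNC_ENDPOINT",
    "CLIENT_ID",
    "NODE",
    "PASSWORD",
    "TENANT",
    "USER_NAME",
    "USER_POOL_ID",
)


def missing_node_settings(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required variables that are unset or empty in env."""
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]
```

A startup script could call missing_node_settings() and exit with a message listing the returned names if the list is not empty. This check only applies when no values are passed directly on Node creation.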

Threading External Node

The following code instantiates a Node, runs it, sends 100 messages (of type echo.text) and then waits until it is signaled for termination.

from signal import SIGHUP, SIGINT, SIGTERM, signal, strsignal

from echostream_node import Message
from echostream_node.threading import AppNode


class MyExternalNode(AppNode):

    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
        
    def signal_handler(self, signum: int, _: object) -> None:
        print(f"{strsignal(signum)} received, shutting down")
        self.stop()

    def start(self) -> None:
        super().start()
        signal(SIGHUP, self.signal_handler)
        signal(SIGINT, self.signal_handler)
        signal(SIGTERM, self.signal_handler)

try:
    my_external_node = MyExternalNode()
    my_external_node.start()
    for i in range(100):
        message = my_external_node.create_message(str(i))
        my_external_node.send_message(message)
        my_external_node.audit_message(message)
    my_external_node.join()
except Exception:
    print("Error running node")

Asyncio External Node

The following code instantiates a Node, runs it, sends 100 messages (of type echo.text) and then waits until it is signaled for termination.

import asyncio

import aiorun
from echostream_node import Message
from echostream_node.asyncio import AppNode

class MyExternalNode(AppNode):

    async def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)


async def main() -> None:
    try:
        node = MyExternalNode()
        await node.start()
        for i in range(100):
            message = node.create_message(str(i))
            node.send_message(message)
            node.audit_message(message)
        await node.join()
    except asyncio.CancelledError:
        pass
    except Exception:
        print("Error running node")


if __name__ == "__main__":
    aiorun.run(main(), stop_on_unhandled_errors=True, use_uvloop=True)

AWS Lambda Threading External Node

The following code defines an AWS Lambda function that receives messages from EchoStream Edge SQS queues that are configured as an event source.

from echostream_node import Message
from echostream_node.threading import LambdaNode

class MyExternalNode(LambdaNode):
    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
        
MY_EXTERNAL_NODE = MyExternalNode()

def lambda_handler(event, context):
    MY_EXTERNAL_NODE.handle_event(event)

AWS Lambda Asyncio External Node

The following code defines an AWS Lambda function that receives messages from EchoStream Edge SQS queues that are configured as an event source.

from echostream_node import Message
from echostream_node.asyncio import LambdaNode

class MyExternalNode(LambdaNode):
    async def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)
        
MY_EXTERNAL_NODE = MyExternalNode()

def lambda_handler(event, context):
    MY_EXTERNAL_NODE.handle_event(event)
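Both Lambda examples are invoked with an SQS event, which AWS delivers as a dictionary containing a Records list in which each record carries the raw message text in its body field. The sketch below builds a minimal event of that shape for local experimentation. The helpers make_sqs_event and record_bodies are hypothetical and not part of echostream-node. Real SQS events carry additional fields (for example receiptHandle and attributes), and this article does not describe which fields handle_event reads, so treat this as an illustration of the event shape rather than a valid input for a LambdaNode.

```python
from typing import Any


def make_sqs_event(bodies: list[str]) -> dict[str, Any]:
    """Build a minimal SQS-style Lambda event with one record per body."""
    return {
        "Records": [
            {
                "messageId": f"local-{index}",  # placeholder id for local use
                "eventSource": "aws:sqs",
                "body": body,
            }
            for index, body in enumerate(bodies)
        ]
    }


def record_bodies(event: dict[str, Any]) -> list[str]:
    """Extract the raw body string of every record in an SQS-style event."""
    return [record["body"] for record in event.get("Records", [])]
```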

API

API documentation for the echostream-node package may be found at https://docs.echostream-node.echo.stream.

