Using the echostream-node package
EchoStream provides the echostream-node Python package to make developing External Nodes and Managed Nodes in Python significantly simpler, allowing you to concentrate on your high-value business requirements and not low-level EchoStream plumbing.
Processing Models
The package supports the following types of Node processing models:
- Run-once External Nodes that are executed, send messages, and exit.
- Daemon External and Managed Nodes that execute until signalled and can send and receive messages.
- AWS Lambda External Nodes that execute on demand and can send and receive messages.
Concurrency Models
For the Run-once and Daemon processing models, both the threading and asyncio concurrency models are supported. AWS Lambda External Nodes only support the threading concurrency model.
Implementation
Installation
Python
pip install echostream-node
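After installing, it can be useful to confirm which version of the package is present, since the AWS Lambda layer name described below depends on the version number. The following is a minimal sketch using only the Python standard library; the helper name installed_version is hypothetical and is not part of echostream-node.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str = "echostream-node") -> Optional[str]:
    """Return the installed version of a package, or None if it is absent.

    Hypothetical helper for illustration; not part of echostream-node.
    """
    try:
        return version(package)
    except PackageNotFoundError:
        return None


# Report the installed version, or a hint if the package is missing.
print(installed_version() or "echostream-node is not installed")
```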
AWS Lambda
You may use the publicly provided layer instead of directly installing echostream-node in your Lambda package. This layer includes echostream-node and all of its Python dependencies except those built into the AWS Lambda environment for Python.
The layer ARN is:
arn:aws:lambda:{region}:226390263822:layer:echostream-node-{version}:1
where {version} is the version of echostream-node that you want, with . replaced with _, and {region} is the AWS region that your Lambda will run in. Currently, us-east-1, us-east-2, us-west-1 and us-west-2 are supported.
For example, for echostream-node==0.3.7 in the us-east-1 region, the layer ARN would be:
arn:aws:lambda:us-east-1:226390263822:layer:echostream-node-0_3_7:1
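The substitution rules above can be sketched as a small helper. The account ID, region list, and ARN template are taken from this page; the function name layer_arn and the constant names are hypothetical and are not part of echostream-node.

```python
# Regions listed on this page as currently supported for the public layer.
SUPPORTED_REGIONS = ("us-east-1", "us-east-2", "us-west-1", "us-west-2")
LAYER_ACCOUNT_ID = "226390263822"


def layer_arn(version: str, region: str) -> str:
    """Build the ARN of the public echostream-node layer.

    Hypothetical helper for illustration; not part of echostream-node.
    """
    if region not in SUPPORTED_REGIONS:
        raise ValueError(f"Unsupported region: {region}")
    # Dots in the package version are replaced with underscores.
    layer_version = version.replace(".", "_")
    return (
        f"arn:aws:lambda:{region}:{LAYER_ACCOUNT_ID}"
        f":layer:echostream-node-{layer_version}:1"
    )


print(layer_arn("0.3.7", "us-east-1"))
# prints arn:aws:lambda:us-east-1:226390263822:layer:echostream-node-0_3_7:1
```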
Use
Configuration
Instantiating a Node requires several configuration values, which can be provided either as environment variables or directly when the Node is created:
Parameter | Environment Variable | Description |
---|---|---|
appsync_endpoint | APPSYNC_ENDPOINT | The URL to the EchoStream API endpoint. |
client_id | CLIENT_ID | The Application Client ID for the App's Cognito Client Application. |
name | NODE | The Node's name. |
password | PASSWORD | The password for the App User for the Node's App. |
tenant | TENANT | The name of the Tenant that the Node is a part of. |
username | USER_NAME | The name of the App User for the Node's App. |
user_pool_id | USER_POOL_ID | The User Pool Id for the App's Cognito User Pool. |
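When relying on environment variables, a quick check before creating the Node can make a missing setting easier to spot. The sketch below uses only the variable names from the table above; the helper missing_node_settings is hypothetical, is not part of echostream-node, and does not validate the values themselves.

```python
import os
from typing import List, Mapping

# Environment variable names taken from the configuration table.
REQUIRED_ENV_VARS = (
    "APPSYNC_ENDPOINT",
    "CLIENT_ID",
    "NODE",
    "PASSWORD",
    "TENANT",
    "USER_NAME",
    "USER_POOL_ID",
)


def missing_node_settings(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names of required variables that are unset or empty.

    Hypothetical helper for illustration; not part of echostream-node.
    """
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]


missing = missing_node_settings()
if missing:
    print("Missing Node settings:", ", ".join(missing))
```

Any value reported as missing here would need to be passed directly on Node creation using the matching parameter name from the table.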
Threading External Node
The following code instantiates a Node, runs it, sends 100 messages (of type echo.text) and then waits until it is signaled for termination.
from signal import SIGHUP, SIGINT, SIGTERM, signal, strsignal

from echostream_node import Message
from echostream_node.threading import AppNode


class MyExternalNode(AppNode):
    def handle_received_message(self, *, message: Message, source: str) -> None:
        # Called for every message the Node receives.
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)

    def signal_handler(self, signum: int, _: object) -> None:
        print(f"{strsignal(signum)} received, shutting down")
        self.stop()

    def start(self) -> None:
        super().start()
        # Stop the Node cleanly when the process is signaled.
        signal(SIGHUP, self.signal_handler)
        signal(SIGINT, self.signal_handler)
        signal(SIGTERM, self.signal_handler)


try:
    my_external_node = MyExternalNode()
    my_external_node.start()
    for i in range(100):
        message = my_external_node.create_message(str(i))
        my_external_node.send_message(message)
        my_external_node.audit_message(message)
    # Block until the Node is stopped by a signal.
    my_external_node.join()
except Exception:
    print("Error running node")
Asyncio External Node
The following code instantiates a Node, runs it, sends 100 messages (of type echo.text) and then waits until it is signaled for termination.
import asyncio

import aiorun
from echostream_node import Message
from echostream_node.asyncio import AppNode


class MyExternalNode(AppNode):
    async def handle_received_message(self, *, message: Message, source: str) -> None:
        # Called for every message the Node receives.
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)


async def main() -> None:
    try:
        node = MyExternalNode()
        await node.start()
        for i in range(100):
            message = node.create_message(str(i))
            node.send_message(message)
            node.audit_message(message)
        # Wait until the Node is stopped.
        await node.join()
    except asyncio.CancelledError:
        pass
    except Exception:
        print("Error running node")


if __name__ == "__main__":
    aiorun.run(main(), stop_on_unhandled_errors=True, use_uvloop=True)
AWS Lambda Threading External Node
The following code defines an AWS Lambda function that receives messages from EchoStream Edge SQS queues that are configured as an event source.
from echostream_node import Message
from echostream_node.threading import LambdaNode


class MyExternalNode(LambdaNode):
    def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)


# Created once at module load so it is reused across warm invocations.
MY_EXTERNAL_NODE = MyExternalNode()


def lambda_handler(event, context):
    MY_EXTERNAL_NODE.handle_event(event)
AWS Lambda Asyncio External Node
The following code defines an AWS Lambda function that receives messages from EchoStream Edge SQS queues that are configured as an event source.
from echostream_node import Message
from echostream_node.asyncio import LambdaNode


class MyExternalNode(LambdaNode):
    async def handle_received_message(self, *, message: Message, source: str) -> None:
        print(f"Got a message:\n{message.body}")
        self.audit_message(message, source=source)


# Created once at module load so it is reused across warm invocations.
MY_EXTERNAL_NODE = MyExternalNode()


def lambda_handler(event, context):
    MY_EXTERNAL_NODE.handle_event(event)
API
API documentation for the echostream-node package may be found at https://docs.echostream-node.echo.stream.