Processor Nodes are Nodes that allow you to perform almost any processing that you want on a given message.
With a Processor Node you can:
- Filter messages that you do not wish to continue in your processing network.
- Transform messages from one format to another.
- Split messages.
- Augment messages with data from APIs, external databases, or your Tenant's table.
- Combine messages by storing intermediate messages in your Tenant's table.
- Create entirely new messages (e.g., create a message in response to an echo.timer message).
- And much more...
Processor Function
To process messages, Processor Nodes use a Python function that you provide. You may provide this function either directly in the Processor Node (an inline function) or by specifying the name of a Processor Function in your Tenant's Function Library.
Processor functions must conform to the following template:
    def processor(*, context, message, source, **kwargs):
        # TODO - Perform any processing of the message here.
        # Remember, you MUST return a string, list/tuple/Iterable of strings or None.
        # If None is returned, the message will be filtered.
        return message
NOTE - you must not have any Python code outside of the single function def statement!!
Arguments
The arguments to your processor function are all keyword arguments. They are explained below:
Argument | Type | Description |
---|---|---|
context | object | The Context object providing execution environment information and helper objects and methods. |
message | str | The message itself as a string. You will have to decode this string if it actually represents a complex object (e.g., json.loads(message)). |
source | str | The name of the Node that sent the message to your Processor Node. |
kwargs | dict | This is present specifically to future-proof your function. If, in the future, EchoStream Processor Nodes pass additional arguments to your processor function, those additional arguments will not break the call to your function. |
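
As a minimal sketch of how these arguments might be used together, the function below decodes a JSON message, records the sending Node's name, and re-encodes the result. It assumes the incoming message is a JSON object; the field name received_from is hypothetical and chosen only for illustration, and context is left unused.

```python
def processor(*, context, message, source, **kwargs):
    # Imports go inside the function.
    import json

    # The message arrives as a string; decode it to work with its fields.
    # Assumption: the message is a JSON object.
    payload = json.loads(message)

    # Record which Node sent the message.
    # "received_from" is a hypothetical field name, not part of EchoStream.
    payload["received_from"] = source

    # Re-encode before returning, since the return value must be a string.
    return json.dumps(payload)
```

Because every argument is keyword-only, a caller would pass them by name, for example processor(context=None, message='{"a": 1}', source="node-a").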
Return
Your processor function must return one of the following types: None, str, list[str], tuple[str, ...], or Iterable (where the values of the Iterable are str). Each of these causes the Processor Node to do different things.
- None: The Processor Node will drop the original message from processing, effectively filtering the message.
- str: The Processor Node will send the returned str as the message, maintaining the original message's trackingId.
- list[str], tuple[str, ...], Iterable: The Processor Node will send each contained str as a new message. The original message's trackingId will be appended to the new message's prevTrackingIds and a new trackingId created.
Note - if you return any type other than those specified, the Processor Node will throw an exception and cease processing.
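
The three kinds of return value can be sketched in a single function. This is only an illustration under stated assumptions: the messages are assumed to be JSON objects, and the fields type, items, and processed are hypothetical names, not part of EchoStream.

```python
def processor(*, context, message, source, **kwargs):
    import json

    # Assumption: every message is a JSON object.
    payload = json.loads(message)

    # Filter: returning None drops the message.
    # "type" and the value "heartbeat" are hypothetical.
    if payload.get("type") == "heartbeat":
        return None

    # Split: returning a list of strings sends each one as a new message.
    # "items" is a hypothetical field holding a list of sub-objects.
    items = payload.get("items")
    if isinstance(items, list):
        return [json.dumps(item) for item in items]

    # Transform: returning a single string sends it as the message.
    payload["processed"] = True
    return json.dumps(payload)
```

Note that each branch returns either None, a str, or a list of str; returning the decoded dict itself would be an unsupported type.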
Requirements
You can use any package available on PyPI that supports Python 3.12 or higher in your Processor function.
Simply add the requirement to the requirements of your Processor Node and it will be included in your Node. This is done using pip requirement specifiers.
For example, to include the most popular PostgreSQL database adapter for Python (psycopg2), you would put the following requirement in your Node's requirements:
- For the latest release:
psycopg2
- Pinning the release:
psycopg2 == 2.9.3
- Ensuring a baseline release:
psycopg2 >= 2.9.2
Then in your Processor function, you can use psycopg2 by importing it, as follows:
    def processor(*, context, message, source, **kwargs):
        # Make SURE that you import INSIDE of the function!!
        import psycopg2
        # TODO - Perform any processing of the message here.
        # Remember, you MUST return a string, list/tuple/Iterable of strings or None.
        # If None is returned, the message will be filtered.
        return message
NOTE - All imports must occur inside your function definition to be recognized and executed by EchoStream!!
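
The same rule applies to standard-library modules. The sketch below imports hashlib inside the function and appends a digest to the message; the digest step is purely illustrative. The final lines are a local smoke test, assumed to run on your own machine only: they must not be included in the Node, since no code may appear outside the function. The argument future_arg is hypothetical and shows how **kwargs absorbs arguments that the function does not name.

```python
def processor(*, context, message, source, **kwargs):
    # Standard-library imports also go inside the function.
    import hashlib

    # Append a SHA-256 hex digest of the message (illustrative only).
    digest = hashlib.sha256(message.encode("utf-8")).hexdigest()
    return f"{message}|{digest}"


# Local smoke test only; do NOT paste these lines into a Processor Node.
# "future_arg" is a hypothetical extra argument absorbed by **kwargs.
result = processor(context=None, message="hello", source="upstream-node", future_arg=1)
```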