Processor
  • 24 Jul 2024
  • 3 Minutes to read
  • Contributors
  • Dark
    Light
  • PDF

Processor

  • Dark
    Light
  • PDF

Article summary

Processor Nodes are Nodes that allow you to perform almost any processing that you want on a given message.

With a Processor Node you can:

  • Filter messages that you do not wish to continue in your processing network.
  • Transform messages from one format to another.
  • Split messages.
  • Augment messages with data from API's, external databases, or your Tenant's table.
  • Combine messages by storing intermediate messages in your Tenant's table.
  • Create entirely new messages (e.g. - create a message in response to an echo.timer message).
  • And much more...

Processor Function

To effect the message bitmapping, Processor Nodes use a Python function that is provided by you. You may provide this Python function either directly in the Processor Node (an inline function) or by specifying the name of a Processor Function in your Tenant's Function Library.

Processor functions must conform to the following template:

def processor(*, context, message, source, **kwargs):

    # TODO - Perform any processing of the message here.
    # Remember, you MUST return a string, list/tuple/Interable of strings or None.
    # If None is returned, the message will be filtered.

    return message

NOTE - you must not have any Python code outside of the single function def statement!!

Arguments

The arguments for your processor functions are all keyword arguments. an explanation of the arguments are below:

ArgumentTypeDescription
contextobjectThe Context object providing execution environment information and helper objects and methods.
messagestrThe message itself as a string. You will have to decode this string if it actually represents a complex object (e.g. - json.loads(message))
sourcestrThe name of the Node that sent the message to you Bitmap Router Node.
kwargsdictThis is present specifically to future-proof your function. If, in the future, EchoStream Processor Nodes pass additional arguments to your processor function, those additional arguments will not break the call to your function.

Return

Your processor function must return one of the following types; None, str, list[str], tuple[str, ...], or Iterable (where the values of the Iterable are str). Each of these causes the Processor Node to do different things.

  • None: The Processor Node will drop the original message from processing, effectively filtering the message.
  • str: The Processor Node will send the returned str as the message, maintaining the orignal message's trackingId.
  • list[str], tuple[str, ...], Iterable: The Processor Node will send each contained str as a new message. The original message's trackingId will be appended to the new message's prevTrackingIds and a new trackingId created.

Note - if you return any type other than those specified, the Processor Node will throw an exception and cease processing.

Requirements

You can use any package available on PyPI that supports Python3.12 or higher in your Processor function.

Simply add the requirement to the requirements of your Processor Node and it will be included in your Node. Thisis done using pip requirement specifiers.

For example, to include the most popular PostgreSQL database adapter for Python (psycopyg), you would put the following requirement in your Node's requirements:

  • For the latest release:
psycopg2
  • Pinning the release:
psycopg2 == 2.9.3
  • Ensuring a baseline release
psycopg2 >= 2.9.2

Then in your Processor function, you can use psycopg2 by importing it, as follows:

def processor(*, context, message, source, **kwargs):
    
    # Make SURE that you import INSIDE of the function!!
    import psycopg2

    # TODO - Perform any processing of the message here.
    # Remember, you MUST return a string, list/tuple/Interable of strings or None.
    # If None is returned, the message will be filtered.

    return message

NOTE - All imports must occur inside your function definition to be recognized and executed by EchoStream!!


What's Next