A router object. Holds routes and references to functions for dispatch.
Create a new router object. All parameters are optional.
Parameters: |
|
---|---|
Exceptions : | None |
Returns: | None |
Route a message to all nodes marked as entry points.
Note
Unlike other points in this API, this function does not accept a single dictionary argument. Expand the dictionary into keyword arguments instead.
Decorate a function to make it a node.
Parameters: |
|
---|
In addition to all of the above, you can define a wrap_node function on a subclass of Router, which will need to receive node and an options dictionary. Any extra options passed to node will be passed down to the options dictionary. See emit.router.CeleryRouter.wrap_node as an example.
Returns: | decorated and wrapped function, or decorator if called directly |
---|
Examples
Multiple fields:
@router.node(['quotient', 'remainder'])
def division_with_remainder(msg):
    # Floor division keeps the quotient an integer on both Python 2 and 3.
    return msg.numer // msg.denom, msg.numer % msg.denom
This function would end up returning a dictionary that looks something like:
{'quotient': 2, 'remainder': 1}
The next node in the graph would receive an emit.message.Message with “quotient” and “remainder” fields.
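The pairing of declared fields with returned values can be pictured with a minimal sketch. It does not import Emit, and `to_fields` is a hypothetical helper written for illustration, not the library's implementation. It assumes that the declared field names are matched to the returned tuple in order.

```python
def to_fields(fields, result):
    """Hypothetical helper: pair declared field names with a returned tuple."""
    # Assumption: names and values are matched positionally, first to first.
    return dict(zip(fields, result))


# Same shape as division_with_remainder for numer=5, denom=2.
packed = to_fields(['quotient', 'remainder'], (5 // 2, 5 % 2))
print(packed)
```

The resulting dictionary has the same shape as the one shown above, which is what the next node would see as message fields.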
Emitting multiple values:
@router.node(['word'])
def parse_document(msg):
    for word in msg.document.clean().split(' '):
        yield word
If the function returns a generator, Emit will gather the values together and make sure the generator exits cleanly before returning (this may change in the future via a flag). The return value will therefore look like this:
({'word': "I've"},
{'word': 'got'},
{'word': 'a'},
{'word': 'lovely'},
{'word': 'bunch'},
{'word': 'of'},
{'word': 'coconuts'})
Each message in the tuple will be passed on individually in the graph.
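The gathering step can be sketched without Emit installed. `gather` below is a hypothetical helper, not the library's code, and `parse_document` here takes a plain string rather than a message. It assumes a single declared field and shows one way a generator could be exhausted into a tuple of dictionaries.

```python
import types


def gather(field, result):
    """Hypothetical helper: collect a node's output into a tuple of dicts."""
    if isinstance(result, types.GeneratorType):
        # tuple() runs the generator to exhaustion, so it has finished
        # before anything is handed to the next node.
        return tuple({field: value} for value in result)
    # A plain return value becomes a one-element tuple.
    return ({field: result},)


def parse_document(document):
    for word in document.split(' '):
        yield word


messages = gather('word', parse_document("I've got a lovely bunch of coconuts"))
for message in messages:
    print(message)
```

Collecting eagerly keeps the sketch simple. A lazy variant would forward each value as it is produced, at the cost of leaving the generator open while downstream nodes run.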
Add an entry point
Parameters: | destination (str) – node to route to initially |
---|
Dispatch a message to a named function
Parameters: |
|
---|
Get message object from a call.
Raises : | TypeError (if the format is not what we expect) |
---|
This is where arguments to nodes are turned into Messages. Arguments are parsed in the following order:
- A single positional argument (a dict)
- No positional arguments and a number of keyword arguments
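The parsing order above can be illustrated with a standalone sketch. This is not Emit's implementation: the function name and error message are made up for the example, and it returns a plain dict where the library would build an emit.message.Message.

```python
def get_message_from_call(*args, **kwargs):
    """Illustrative parser: returns a plain dict instead of a Message."""
    # First form: a single positional argument, which must be a dict.
    if len(args) == 1 and isinstance(args[0], dict):
        return dict(args[0])
    # Second form: no positional arguments and some keyword arguments.
    if not args and kwargs:
        return dict(kwargs)
    # Anything else is not a format we expect.
    raise TypeError('expected a single dict or keyword arguments')


print(get_message_from_call({'numer': 5, 'denom': 2}))
print(get_message_from_call(numer=5, denom=2))
```

Both calls produce the same fields. Any other call shape, such as two positional arguments, raises TypeError in this sketch.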
Get the name to reference a function by
Parameters: | func (callable) – function to get the name of |
---|
Register a named function in the graph
Parameters: |
|
---|
fields, subscribe_to and entry_point are the same as in Router.node().
Add routes to the ignore dictionary
Parameters: |
|
---|
Ignore dictionary takes the following form:
{'node_a': set(['node_b', 'node_c']),
 'node_b': set(['node_d'])}
Add routes to the routing dictionary
Parameters: |
|
---|
Routing dictionary takes the following form:
{'node_a': set(['node_b', 'node_c']),
 'node_b': set(['node_d'])}
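A dictionary of this form can be built and used for dispatch roughly as follows. This is a sketch under stated assumptions, not the library's code: keys are assumed to be origin nodes and values the sets of nodes subscribed to them, and `add_routes`, `route` and `handlers` are hypothetical names.

```python
from collections import defaultdict

# Assumed layout: origin name -> set of subscriber names.
routes = defaultdict(set)


def add_routes(origins, destination):
    """Hypothetical helper: subscribe destination to every origin."""
    for origin in origins:
        routes[origin].add(destination)


def route(origin, message, handlers):
    """Hypothetical dispatcher: call every subscriber of origin."""
    delivered = []
    # .get() avoids creating an empty entry for unknown origins.
    for destination in sorted(routes.get(origin, ())):
        handlers[destination](message)
        delivered.append(destination)
    return delivered


add_routes(['node_a'], 'node_b')
add_routes(['node_a'], 'node_c')
add_routes(['node_b'], 'node_d')

received = []
handlers = {name: received.append for name in ('node_b', 'node_c', 'node_d')}
delivered = route('node_a', {'word': 'coconuts'}, handlers)
print(dict(routes))
print(delivered)
```

Using sets means that registering the same subscription twice has no effect. The sketch sorts destinations only to make the delivery order predictable.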
Using the routing dictionary, dispatch a message to all subscribers
Parameters: |
|
---|
Router specifically for Celery routing
Set up routing for use with Celery
Parameters: | celery_task (A celery task decorator, in any form) – celery task to apply to all nodes (can be overridden in Router.node().) |
---|
Enqueue a message with Celery
Parameters: |
|
---|
Router specifically for RQ routing
Set up routing for use with RQ
Parameters: | redis_connection (redis.Redis) – a redis connection to send to all the tasks (can be overridden in Router.node().) |
---|
Callable object that wraps communication with a node written in another language.
To use this, subclass ShellNode and provide “command”. Decorate it however you like.
Messages will be passed in on lines in msgpack format. This class expects similar output: msgpack messages separated by a newline.