Worker

A worker consume tasks. It register coroutines as tasks, under a task_name. It use by default the coroutine name - but be carefull of decorators may change the __name__ attribut.

Like the [broker], worker rely on a queue and a cache. Check the [broker] page for queues and caches available.

It’s perferable to setup a worker first (before the broker), so it can setup the queue and adve.

Setup a worker

from aio_task import Worker

worker = await Worker.create(
    queue_type="rabbitmq",  # desired type of queue
    queue_conf={"url": ...},  # dict with the queue configuration - depends on the queue type.
    cache_type="redis",  # desired type of cache
    cache_conf={"address": ...},  # dict with the cache configuration - depends on the cache type.
)

This params should match with the [broker] ones.

Register tasks

worker.register_handler(
    coro=coro,  # the coroutine corresponding to the task.
    task_name="coro"  # optional → use coro.__name__
)

When you have register all your tasks, start the worker to consume tasks.

await worker.start()

Once the worker is started, you can not register other tasks.

Error management

If an exception occur while processing the task, the exception will be logged by the worker and set as task’s result.

async def coro():
    """ A task example. """
    raise ValueError("Oops")

# ... setup the worker and the broker ...

task_id = await broker.create_task("coro")
task = await broker.get_task("123-456...")

assert task.result == {"exception": {"class": "ValueError",
                       "args": ("Oops",),
                       "str": "Oops"}}

The task result has to seriable (with json.dumps), otherwise worker will log an error and set the error as task’s result.

# task result if the result is not serializable.
{'exception': {'args': ('Task result is not serializable',),
               'class': 'ValueError',
               'str': 'Task result is not serializable'}}

Shutdown

await worker.close()

That’s it!