Worker¶
A worker consumes tasks. It registers coroutines as tasks, each under a task_name. By default the task name is the coroutine's name, so be careful with decorators: a decorator that does not preserve the __name__ attribute changes the default task name.
Like the broker, the worker relies on a queue and a cache. Check the broker page for the available queues and caches.
It’s better to set up a worker first (before the broker), so you can be sure the queue is set up.
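The decorator caveat can be seen with plain Python, independent of aio_task. The following is a minimal sketch; the decorators and coroutine names (plain_decorator, preserving_decorator, send_email, resize_image) are hypothetical and exist only for illustration.

```python
import asyncio
import functools


def plain_decorator(coro):
    # Wrapper defined without functools.wraps: the original __name__ is lost.
    async def wrapper(**kwargs):
        return await coro(**kwargs)
    return wrapper


def preserving_decorator(coro):
    # functools.wraps copies __name__ (and other metadata) onto the wrapper.
    @functools.wraps(coro)
    async def wrapper(**kwargs):
        return await coro(**kwargs)
    return wrapper


@plain_decorator
async def send_email(**kwargs):
    return "sent"


@preserving_decorator
async def resize_image(**kwargs):
    return "resized"


print(send_email.__name__)    # wrapper
print(resize_image.__name__)  # resize_image
print(asyncio.run(resize_image()))  # resized
```

A coroutine wrapped like send_email would be registered under the name "wrapper" by default, so either pass task_name explicitly or use functools.wraps in the decorator.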
Setup a worker¶
from aio_task import Worker

worker = await Worker.create(
    queue_type="rabbitmq",  # desired type of queue
    queue_conf={  # dict with the queue configuration; depends on the queue type
        "url": "amqp://guest:guest@localhost:5672",
        "routing_key": "tasks_queue",
    },
    cache_type="redis",  # desired type of cache
    cache_conf={"address": "redis://localhost"},  # dict with the cache configuration; depends on the cache type
)
These parameters should match the ones given to the broker.
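One way to keep the two sides in sync is to define the connection settings once and reuse them. This is only a sketch: the helper names (connection_params, make_worker) are hypothetical, and it assumes the worker and the broker are configured from the same Python module. Only Worker.create, as shown above, is used from aio_task.

```python
# Hypothetical shared settings, imported by both the worker and the broker process.
QUEUE_TYPE = "rabbitmq"
QUEUE_CONF = {
    "url": "amqp://guest:guest@localhost:5672",
    "routing_key": "tasks_queue",
}
CACHE_TYPE = "redis"
CACHE_CONF = {"address": "redis://localhost"}


def connection_params():
    """Return the keyword arguments for Worker.create, as fresh copies."""
    return {
        "queue_type": QUEUE_TYPE,
        "queue_conf": dict(QUEUE_CONF),
        "cache_type": CACHE_TYPE,
        "cache_conf": dict(CACHE_CONF),
    }


async def make_worker():
    # Imported lazily so this settings module loads even without aio_task installed.
    from aio_task import Worker

    return await Worker.create(**connection_params())
```

The broker side would be given the same queue and cache values, following the broker page.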
Register tasks¶
async def coro(**kwargs):  # task example
    pass


async def coro2(**kwargs):
    pass


worker.register_handler(
    coro=coro,  # the coroutine corresponding to the task
    task_name="coro",  # optional; defaults to coro.__name__
)
worker.register_handler(coro2)  # register another task
worker.register_handler(coro2, task_name="task1")  # register the task under another name
When you have registered all your tasks, start the worker so it begins consuming them.
await worker.start()
Once the worker is started, you cannot register other tasks.
Error management¶
If an exception occurs while processing a task, the worker logs the exception and sets it as the task’s result.
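The shape of such a result can be pictured with the sketch below. It is not the library's implementation: exception_to_result and run_task are hypothetical helpers that only mimic the behavior described here (log the exception, then store its class name, args and string form).

```python
import asyncio
import logging

logger = logging.getLogger("worker_sketch")  # hypothetical logger name


def exception_to_result(exc):
    # Build a dict with the exception's class name, args and string form.
    return {
        "exception": {
            "class": type(exc).__name__,
            "args": exc.args,
            "str": str(exc),
        }
    }


async def run_task(coro, **kwargs):
    # Run the task; on failure, log the error and return it as the result.
    try:
        return await coro(**kwargs)
    except Exception as exc:
        logger.exception("task %s failed", coro.__name__)
        return exception_to_result(exc)


async def failing():
    raise ValueError("Oops")


async def succeeding():
    return 42


print(asyncio.run(run_task(failing)))
print(asyncio.run(run_task(succeeding)))  # 42
```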
import asyncio


async def coro():  # example of a task raising an error
    raise ValueError("Oops")


# set up and start the worker
worker.register_handler(coro)
await worker.start()

# Then, from the broker's perspective (broker is an already configured broker)
task_id = await broker.create_task("coro")
await asyncio.sleep(0.1)  # give the task time to be processed
task = await broker.get_task(task_id)  # fetch the task from the cache
assert task.result == {"exception": {"class": "ValueError",
                                     "args": ("Oops",),
                                     "str": "Oops"}}
The task result has to be serializable (with json.dumps); otherwise the worker logs an error and sets that error as the task’s result.
# Task result when the value returned by the task is not serializable.
{'exception': {'args': ('Task result is not serializable',),
               'class': 'ValueError',
               'str': 'Task result is not serializable'}}
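To check ahead of time whether a task's return value would pass, you can try json.dumps on it yourself. The sketch below assumes the worker's check is equivalent to a plain json.dumps call with default options; ensure_serializable is a hypothetical helper, not part of aio_task.

```python
import json

NOT_SERIALIZABLE = {
    "exception": {
        "args": ("Task result is not serializable",),
        "class": "ValueError",
        "str": "Task result is not serializable",
    }
}


def ensure_serializable(result):
    # json.dumps raises TypeError for unsupported types (e.g. set)
    # and ValueError for circular references.
    try:
        json.dumps(result)
    except (TypeError, ValueError):
        return NOT_SERIALIZABLE
    return result


print(ensure_serializable({"count": 3}))  # {'count': 3}
print(ensure_serializable({1, 2, 3}) is NOT_SERIALIZABLE)  # True
```

Values such as sets, bytes, or datetime objects fail this check, so convert them (for example to lists or ISO strings) before returning them from a task.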