# Worker A worker consume tasks. It register coroutines as tasks, under a task_name. It use by default the coroutine name - but be carefull of decorators may change the `__name__` attribut. Like the [broker], worker rely on a [queue] and a [cache]. Check the [broker] page for queues and caches available. It's perferable to setup a worker first (before the broker), so it can setup the queue and adve. ## Setup a worker ```python from aio_task import Worker worker = await Worker.create( queue_type="rabbitmq", # desired type of queue queue_conf={"url": ...}, # dict with the queue configuration - depends on the queue type. cache_type="redis", # desired type of cache cache_conf={"address": ...}, # dict with the cache configuration - depends on the cache type. ) ``` **This params should match with the [broker] ones.** ## Register tasks ```python worker.register_handler( coro=coro, # the coroutine corresponding to the task. task_name="coro" # optional → use coro.__name__ ) ``` When you have register all your tasks, start the worker to consume tasks. ```python await worker.start() ``` Once the worker is started, you can not register other tasks. ## Error management If an exception occur while processing the task, the exception will be logged by the worker and set as task's result. ```python async def coro(): """ A task example. """ raise ValueError("Oops") # ... setup the worker and the broker ... task_id = await broker.create_task("coro") task = await broker.get_task("123-456...") assert task.result == {"exception": {"class": "ValueError", "args": ("Oops",), "str": "Oops"}} ``` The task result has to seriable (with `json.dumps`), otherwise worker will log an error and set the error as task's result. ```python # task result if the result is not serializable. {'exception': {'args': ('Task result is not serializable',), 'class': 'ValueError', 'str': 'Task result is not serializable'}} ``` ## Shutdown ```python await worker.close() ``` That's it! [Task]: [worker]: [queue]: [cache]: