# Worker A worker consume tasks. It register coroutines as tasks, under a task_name. It use by default the coroutine name - but be carefull of decorators because of the `__name__` attribute. Like the [broker], worker rely on a [queue] and a [cache]. Check the [broker] page for available queues and caches. It's better to setup a worker first (before the broker), so you're sure the queue is setup. ## Setup a worker ```python from aio_task import Worker worker = await Worker.create( queue_type="rabbitmq", # desired type of queue queue_conf={"url": "amqp://guest:guest@localhost:5672", "routing_key": "tasks_queue"}, # dict with the queue configuration - depends on the queue type. cache_type="redis", # desired type of cache cache_conf={"address": "redis://localhost"}, # dict with the cache configuration - depends on the cache type. ) ``` **These params should match with the [broker] ones.** ## Register tasks ```python async def coro(**kwargs): # task example pass async def coro2(**kwargs): pass worker.register_handler( coro=coro, # the coroutine corresponding to the task. task_name="coro" # optional → use coro.__name__ ) worker.register_handler(coro2) # register another task worker.register_handler(coro2, task_name="task1") # register task under another name ``` When you have register all your tasks, start the worker to consume tasks. ```python await worker.start() ``` Once the worker is started, you cannot register other tasks. ## Error management If an exception occure while processing the task, the exception will be logged by the worker and set as task's result. ```python async def coro(): # example of task giving an error raise ValueError("Oops") # setup and start the worker worker.register_handler(coro) await worker.start() ``` ```python # Then, from the broker perspective task_id = await broker.create_task("coro") await asyncio.sleep(0.1) # time for task to be processed task = await broker.get_task(task_id) # fetch task from the cache assert task.result == {"exception": {"class": "ValueError", "args": ("Oops",), "str": "Oops"}} ``` The task result has to be serializable (with `json.dumps`), otherwise worker will log an error and set the error as task's result. ```python # task result if the result is not serializable. {'exception': {'args': ('Task result is not serializable',), 'class': 'ValueError', 'str': 'Task result is not serializable'}} ``` ## Shutdown ```python await worker.close() ``` That's it! [Task]: [broker]: [queue]: [cache]: