Broker¶
A broker produce and fetch task. It rely on a queue and a cache.
Queue available:
- rabbitmq
- dummy (offline - not persistant, for tests only)
- Write your own!
Cache available:
- redis
- dummy (offline - not persistant, for tests only)
- Write your own!
A worker has to be setup to consum tasks. You may encouter errors if you try to create tasks from the broker without having setup a worker first.
With many workers, tasks are load balanced accross workers - it’s a round robin.
Setup a broker¶
from aio_task import Broker
broker = await Broker.create(
queue_type="rabbitmq", # desired type of queue
queue_conf={"url": ...}, # dict with the queue configuration - depends on the queue type.
cache_type="redis", # desired type of cache
cache_conf={"address": ...}, # dict with the cache configuration - depends on the cache type.
)
This params should match with the worker ones.
Produce a task¶
task_id = await broker.create_task("my_task", {"arg1": 1, "arg2": 2})
- If task “my_task” was never registred by a worker before, it will produce an error.
- If a task “my_task” was previously register by a worker then task is pushed to the queue event if no worker is currently up.
The task_id is a str, generated with uuid.uuid4().