Broker

A broker produce and fetch task. It rely on a queue and a cache.

Queue available:

  • rabbitmq
  • dummy (offline - not persistant, for tests only)
  • Write your own!

Cache available:

  • redis
  • dummy (offline - not persistant, for tests only)
  • Write your own!

A worker has to be setup to consum tasks. You may encouter errors if you try to create tasks from the broker without having setup a worker first.

With many workers, tasks are load balanced accross workers - it’s a round robin.

Setup a broker

from aio_task import Broker

broker = await Broker.create(
    queue_type="rabbitmq",  # desired type of queue
    queue_conf={"url": ...},  # dict with the queue configuration - depends on the queue type.
    cache_type="redis",  # desired type of cache
    cache_conf={"address": ...},  # dict with the cache configuration - depends on the cache type.
)

This params should match with the worker ones.

Produce a task

task_id = await broker.create_task("my_task", {"arg1": 1, "arg2": 2})
  • If task “my_task” was never registred by a worker before, it will produce an error.
  • If a task “my_task” was previously register by a worker then task is pushed to the queue event if no worker is currently up.

The task_id is a str, generated with uuid.uuid4().

Fetch a task

task = await broker.get_task(task_id)

Tasks are stored in a cache that a have a TTL (default 86400 seconds, i.e. 1 day). You can change this value by setting ttl in cache_conf.

If task is not in Cache, this raise a ValueError.

Otherwise, it returns a Task object.

Shutdown

await broker.close()

That’s it!