Commit 2acddbb8 authored by Étienne BERSAC

Extend documentation

parent 1fcec0d1
docs/changelog.rst
\ No newline at end of file
# Dramatiq-pg Changelog
## Unreleased
This release requires an update of the schema.
- Stores results in the database. This is enabled by default.
- Flush all queues from CLI.
## [0.4.0] (2019-03-13)
- Fixed blocking consumer thread. The `select` syscall is now called every second
by default.
- Removed automatic recovery on startup. It breaks setups with multiple worker
processes on the same queue with long-running tasks. You need to manually
requeue messages after a crash.
- Added delayed task support.
- Added documentation on deployment constraints and limitations.
- Added manual requeue from CLI tool.
- Added URL parameter to PostgresBroker constructor.
- Reuse listening connection to purge message table. This slightly reduces
connection usage.
## [0.3.0] (2019-03-07)
- Added rejecting message (nack).
- Added message replay from table at startup. Missed NOTIFYs are no longer
lost.
- Requeue old consumed messages on startup. Recovers from crashed processes.
- Added CLI tool to manually purge queue and show some stats.
- Added random periodic purge of message table.
- Use BIGSERIAL on message table.
- Added index on message table to speed up purge and stats.
- Added project license, logo and metadata.
## [0.2.0] (2019-02-22)
- First working implementation.
- Added func tests.
......@@ -10,14 +10,14 @@ broker.
- Super simple deployment.
- Uses plain psycopg2. No ORM.
- Stores message payload as native JSONb.
- Stores message payload and results as native JSONb.
- Stores all messages in a single table, in a dedicated schema.
- Uses LISTEN/NOTIFY to keep workers in sync. No polling.
- Replay pending messages on worker startup.
- Requeues failed tasks.
- Requeue of failed tasks.
- Delayed task.
- Reliable thanks to Postgres MVCC.
- Self-healing. Old messages are purged from time to time.
- Utility CLI for maintenance: flush, purge, stats, etc.
Note that dramatiq assumes tasks are idempotent. This broker makes the same
assumptions for recovering after a crash.
......@@ -40,7 +40,7 @@ assumptions for recovering after a crash.
import psycopg2.pool
from dramatiq_pg import PostgresBroker
dramatiq.set_broker(PostgresBroker(url="postgresql:///?minconn=0&maxconn=10"))
dramatiq.set_broker(PostgresBroker(i))
@dramatiq.actor
def myactor():
......@@ -56,35 +56,6 @@ The CLI tool `dramatiq-pg` allows you to requeue messages, purge old messages
and show stats on the queue. See `--help` for details.
## Result storage
Dramatiq-pg implements a [Result
backend](https://dramatiq.io/cookbook.html#results) and **automatically enables
the Results middleware**. This way, the PostgresBackend reuses the same connection
pool. Note that only actors defined with `store_results=True` trigger
result storage. You can disable the `Results` middleware by passing
`results=False` to the broker constructor.
``` python
broker = PostgresBroker(url=conninfo, results=False)
```
## Deployment
Postgres does not replicate notifications to standby instances. Thus the broker
connection pool must point to the master instance. Actors can connect to a hot
standby for their own work.
If you use pgbouncer, you must configure the session pooling method to keep
notifications working.
Each dramatiq process opens one persistent connection per queue and one
connection to ack messages. Thus, to be safe, you should provision a pool size
of `num_processes x num_queues x 2`. When you use `message.get_result()`, a
connection is reserved in the pool. A best practice is to add processes only as
needed and to reduce the number of queues.
## Support
If you encounter a bug or miss a feature, please [open an issue on
......
Dramatiq-pg Changelog
=====================
Unreleased
----------
This release requires an update of the schema.
- Stores results in the database. This is enabled by default.
- Flush all queues from CLI.
Version 0.4.0
-------------
Released 2019-03-13.
- Fixed blocking consumer thread. The ``select`` syscall is now called
  every second by default.
- Removed automatic recovery on startup. It breaks setups with multiple
  worker processes on the same queue with long-running tasks. You need to
  manually requeue messages after a crash.
- Added delayed task support.
- Added documentation on deployment constraints and limitations.
- Added manual requeue from CLI tool.
- Added URL parameter to PostgresBroker constructor.
- Reuse listening connection to purge message table. This slightly reduces
  connection usage.
Version 0.3.0
-------------
Released 2019-03-07.
- Added rejecting message (nack).
- Added message replay from table at startup. Missed NOTIFYs are no
  longer lost.
- Requeue old consumed messages on startup. Recovers from crashed
  processes.
- Added CLI tool to manually purge queue and show some stats.
- Added random periodic purge of message table.
- Use BIGSERIAL on message table.
- Added index on message table to speed up purge and stats.
- Added project license, logo and metadata.
Version 0.2.0
-------------
Released 2019-02-22.
- First working implementation.
- Added func tests.
==================
Deployment Guide
==================
Dramatiq-pg implements the broker logic in the application process. Postgres is
only responsible for storage and inter-client notifications. There is no
additional service to maintain.

However, you have to set up the application properly and keep Postgres healthy
as usual.
Application Setup
=================
Your application is likely to use Postgres for its business data. However,
using Postgres as a broker has some limitations, and you should not configure
Dramatiq-pg like application code. The Dramatiq-pg configuration must conform
to the following limitations:

- The Postgres server must be a primary, not a standby. Both producers and
  consumers write to the message table.
- Postgres emits notifications only to connections on the same server. Postgres
  does not replicate notifications.
- If you use pgbouncer, you must configure the session pooling method to keep
  notifications. Dramatiq-pg already uses a client-side connection pool, so you
  won't benefit from the advanced features of pgbouncer that reduce connections
  to the Postgres server.
Each dramatiq worker process opens one persistent connection per queue and one
connection to ack messages. Thus, to be safe, you should provision the worker
pool size with ``num_processes x num_queues x 2``. A best practice is to keep
the process count low and reduce the number of queues.

The application consumes a non-persistent connection to emit a message. When
the application stores task results, a connection is taken from the Dramatiq-pg
connection pool to wait for and fetch the task result. The application side
holds no persistent connection.
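
The sizing rule above can be turned into a quick calculation. The sketch below
is illustrative only: ``worker_pool_size`` is a hypothetical helper, not part of
Dramatiq-pg. It simply applies the ``num_processes x num_queues x 2`` rule of
thumb.

```python
def worker_pool_size(num_processes: int, num_queues: int) -> int:
    """Apply the rule of thumb: num_processes x num_queues x 2.

    Hypothetical helper for capacity planning, not part of Dramatiq-pg.
    """
    if num_processes < 1 or num_queues < 1:
        raise ValueError("expected at least one process and one queue")
    return num_processes * num_queues * 2


# Example: 4 worker processes consuming 3 queues.
print(worker_pool_size(4, 3))  # prints 24
```

Compare the result with the Postgres ``max_connections`` setting, keeping in
mind that the application's own connections come on top of it.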
Monitoring
==========
Dramatiq has `Prometheus support built-in
<https://dramatiq.io/advanced.html#prometheus-metrics>`_. Dramatiq-pg does
**not** add metrics to your regular Postgres monitoring.

The ``dramatiq-pg`` CLI tool has a ``stats`` command that outputs some metrics.

::

    $ dramatiq-pg stats
    queued: 0
    consumed: 0
    done: 3
    rejected: 0

The ``dramatiq-pg`` CLI tool is configured only through ``PG*`` environment
variables.
Troubleshooting
===============
When a worker process crashes in the middle of a task, the message is not
replayed automatically. If you don't replay it, it will never be processed
completely. Use ``dramatiq-pg recover`` to requeue consumed messages. The
``--minage`` parameter helps you avoid requeueing messages consumed by running
workers. ``--minage`` accepts a Postgres interval value.
::
dramatiq-pg recover --minage 5m
Assuming the crash occurred 5 minutes ago, this command requeues messages
consumed 5 minutes ago or earlier, and leaves alone messages consumed within
the last 5 minutes.
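
The cutoff described above can be pictured as a simple timestamp comparison.
The sketch below is an illustration only: ``is_recoverable`` is a hypothetical
helper, not part of the ``dramatiq-pg`` CLI, and the real command may treat the
exact boundary differently.

```python
from datetime import datetime, timedelta


def is_recoverable(consumed_at: datetime, now: datetime, minage: timedelta) -> bool:
    """Return True when a consumed message is at least `minage` old.

    Hypothetical model of the --minage cutoff, for illustration only.
    """
    return consumed_at <= now - minage


now = datetime(2019, 3, 13, 12, 0)
minage = timedelta(minutes=5)

# Consumed 10 minutes ago, before the crash: would be requeued.
print(is_recoverable(datetime(2019, 3, 13, 11, 50), now, minage))  # prints True
# Consumed 2 minutes ago by a running worker: left alone.
print(is_recoverable(datetime(2019, 3, 13, 11, 58), now, minage))  # prints False
```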
Note that Dramatiq assumes tasks are idempotent. Thus, requeueing a task that
is still being processed should not be an issue.
Flushing
--------
You can flush all queues, including queued and consumed messages, with the
``dramatiq-pg flush`` command. All messages are lost.
Queue Maintenance
=================
Dramatiq-pg tries to be self-healing, even without a dedicated service. Workers
randomly purge messages older than 30 days from the queues. The automatic purge
triggers daily for each worker.

You can manually trigger a purge of old messages by calling ``dramatiq-pg
purge``. This command accepts a ``--maxage`` argument with a Postgres interval
value. All messages marked as ``done`` or ``rejected`` and older than
``maxage`` will be dropped.

You may have some bloat in the queue table. Configure Postgres autovacuum and
monitoring to keep bloat under control.
=============
Get Started
=============
- Install the dramatiq-pg package from PyPI::

    $ pip install dramatiq-pg

- Apply the ``dramatiq_pg/schema.sql`` file to your database::

    $ psql -f dramatiq_pg/schema.sql

- Before importing actors, define the global broker with a connection
  pool::

    import dramatiq
    import psycopg2.pool
    from dramatiq_pg import PostgresBroker

    dramatiq.set_broker(PostgresBroker(url="postgresql:///?minconn=0&maxconn=10"))

    @dramatiq.actor
    def myactor():
        ...

Now declare/import actors and manage workers just like any `dramatiq setup
<https://dramatiq.io/guide.html>`_. An `example script
<https://gitlab.com/dalibo/dramatiq-pg/blob/master/example.py>`_ is available
and tested on CI.

The CLI tool ``dramatiq-pg`` allows you to flush queues, requeue messages, purge
old messages and show stats on the queue. See ``--help`` for details.

See more in the `full documentation
<https://gitlab.com/dalibo/dramatiq-pg/blob/master/docs/index.rst>`_.
=============
Dramatiq-pg
=============
Welcome to the Dramatiq-pg documentation. Dramatiq-pg is a broker
implementation for Dramatiq_ backed by the Postgres_ RDBMS. Dramatiq-pg is
licensed under the `PostgreSQL license`_.
Features
--------
- Super simple deployment.
- Uses plain psycopg2. No ORM.
- Stores message payload and results as native JSONb.
- Standalone result storage.
- Stores all messages in a single table, in a dedicated schema.
- Uses LISTEN/NOTIFY to keep workers in sync. No polling.
- Requeue of failed tasks.
- Delayed tasks.
- Reliable thanks to Postgres MVCC.
- Self-healing. Old messages are purged from time to time.
- Utility CLI for maintenance: flush, purge, stats, etc.
Contents
--------
- `Get Started <get-started.rst>`_
- `User Guide <user-guide.rst>`_
- `Deployment Guide <deployment-guide.rst>`_
- `Changelog <./changelog.rst>`_
Project Info
------------
- `Source Code <https://gitlab.com/dalibo/dramatiq-pg>`_
- `Issue tracker <https://gitlab.com/dalibo/dramatiq-pg/issues>`_
- `PostgreSQL License`_
.. _Dramatiq: https://dramatiq.io/
.. _Postgres: https://postgresql.org/
.. _PostgreSQL license: ../LICENSE
============
User Guide
============
Enabling Postgres Broker
========================
Dramatiq-pg is available on PyPI. Install it with pip::
pip install dramatiq-pg
This package installs a Python package named ``dramatiq_pg`` and a script named
``dramatiq-pg``. To use Postgres as a Dramatiq message broker, use the
``dramatiq_pg.PostgresBroker`` class.
::
from dramatiq import set_broker
from dramatiq_pg import PostgresBroker
set_broker(PostgresBroker())
By default, ``PostgresBroker`` reads ``PG*`` environment variables.
Setting up PostgreSQL
=====================
Postgres is not a native broker. Thus you need to initialize the schema and
table before using it. For now, Dramatiq-pg does not manage the schema for you
and lets you use your own database migration tool. Dramatiq-pg ships a
``schema.sql`` file as a starting point for initializing the database.

::

    psql -f dramatiq_pg/schema.sql

The table and type are contained in a ``dramatiq`` schema.
Connection Configuration
========================
The ``PostgresBroker`` class accepts either a ``pool`` or an ``url`` argument.
The ``pool`` is a psycopg2 connection pool object.

::

    from dramatiq_pg import PostgresBroker
    from psycopg2.pool import ThreadedConnectionPool

    broker = PostgresBroker(pool=ThreadedConnectionPool(0, 8, ""))

The ``url`` argument is a psycopg2-compatible `connection string
<http://initd.org/psycopg/docs/module.html#psycopg2.connect>`_, also called
*dsn*. Internally, ``PostgresBroker`` creates a ``ThreadedConnectionPool``. You
can customize the size of the pool by setting the ``minconn`` and ``maxconn``
query parameters. ``PostgresBroker`` reads ``minconn`` and ``maxconn`` only
from a URL, not from a keyword/value connection string.

::

    from dramatiq_pg import PostgresBroker

    broker = PostgresBroker(url="postgresql://user:password@host/dbname?minconn=0&maxconn=8")

The default value of ``minconn`` is 0 while ``maxconn`` defaults to 16.
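
To make the role of the query parameters concrete, here is a minimal sketch of
how ``minconn`` and ``maxconn`` could be read from such a URL with the standard
library. It is not the actual Dramatiq-pg implementation: ``pool_bounds`` is a
hypothetical helper, and only the defaults (0 and 16) come from the text above.

```python
from typing import Tuple
from urllib.parse import parse_qs, urlsplit


def pool_bounds(url: str, default_min: int = 0, default_max: int = 16) -> Tuple[int, int]:
    """Extract (minconn, maxconn) from the query string of a connection URL.

    Hypothetical helper for illustration, not the library's own parser.
    """
    query = parse_qs(urlsplit(url).query)
    # parse_qs maps each key to a list of values; take the first one.
    minconn = int(query.get("minconn", [default_min])[0])
    maxconn = int(query.get("maxconn", [default_max])[0])
    return minconn, maxconn


print(pool_bounds("postgresql://user:password@host/dbname?minconn=0&maxconn=8"))  # prints (0, 8)
print(pool_bounds("postgresql:///"))  # prints (0, 16)
```

A keyword/value string such as ``"host=localhost dbname=app"`` has no query
part, which is consistent with the pool size being configurable only through
the URL form.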
Result Storage
==============
Dramatiq-pg implements a `Result backend
<https://dramatiq.io/cookbook.html#results>`_ storing results in Postgres.
``PostgresBroker`` **automatically enables the Results middleware** with a
``PostgresBackend`` sharing the same connection pool. Note that only actors
defined with ``store_results=True`` trigger result storage.

When using multiple brokers, you must pass the backend to the
``message.get_result()`` method. This is a limitation of Dramatiq.
``PostgresBroker`` keeps a reference to its auto-created backend.

::

    message = actor.send()
    message.get_result(backend=broker.backend)

Disabling Result Storage
------------------------
You can disable the ``Results`` middleware by passing ``results=False`` to the
broker constructor.
::
broker = PostgresBroker(url=conninfo, results=False)
Using Result Storage Alone
--------------------------
You may want to use Postgres as a result storage while using another message
broker (like RabbitMQ). To do this, use the ``PostgresBackend`` class directly.

::

    from dramatiq.results import Results
    from dramatiq_pg import PostgresBackend

    backend = PostgresBackend(url=conninfo)
    # ``broker`` is your other Dramatiq broker, e.g. a RabbitMQ broker.
    broker.add_middleware(Results(backend=backend))

......@@ -4,6 +4,7 @@ version = "0.4.1"
description = "Postgres Broker for Dramatiq Task Queue"
authors = ["Étienne BERSAC"]
homepage = "https://gitlab.com/dalibo/dramatiq-pg"
documentation = "https://gitlab.com/dalibo/dramatiq-pg/blob/master/docs/index.rst"
license = "PostgreSQL"
readme = "README.md"
keywords = ["postgres", "task queue", "dramatiq"]
......