PostgreSQL queue throughput benchmark: PgQue vs PgQ vs PGMQ vs River vs Que vs pg-boss (Hetzner CAX31, 3h)
Goal
Side-by-side throughput benchmark of all six queue systems from the PgQue comparison matrix under identical hardware and workload conditions, with full telemetry via pg_ash + pg-flight-recorder.
This is a validation run (3h). If results are clean, a full 24h run follows.
Systems under test
| VM name | System | Version | Notes |
|---|---|---|---|
| nik-pgque-pgque | PgQue | alpha1 (release) | Pure PL/pgSQL |
| nik-pgque-pgq | PgQ | 3.5.1 | PL-only mode — switch_plonly.sql applied post-install (no C extension) |
| nik-pgque-pgmq | PGMQ | 1.11.0 (PL/pgSQL-only) | SQL file install, no Rust |
| nik-pgque-river | River | 0.34.0 | Go worker required |
| nik-pgque-que | Que | 2.4.0 (que-rb/que) | Ruby worker required |
| nik-pgque-pgboss | pg-boss | 12.15.0 | Node.js worker required |
Infrastructure
- Provider: Hetzner Cloud — CAX31 (8 vCPU ARM64, 16 GB RAM, local NVMe SSD)
- Location: nbg1 — all 6 VMs in the same datacenter
- OS: Ubuntu 24.04 LTS
- PostgreSQL: 18 (pinned minor version, identical on all VMs)
- Estimated cost: ~€0.063/hr × 6 VMs × 3h ≈ €1.13
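The cost estimate is simple arithmetic; a quick sanity check:

```python
# Hetzner CAX31 hourly rate (EUR) times VM count times run length
rate_eur_per_hr = 0.063
vms = 6
hours = 3
total = rate_eur_per_hr * vms * hours
print(f"~€{total:.2f}")  # → ~€1.13
```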
Step 1 — Provision VMs
# Set your Hetzner SSH key name first
SSH_KEY="your-hetzner-ssh-key-name"
for SYS in pgque pgq pgmq river que pgboss; do
hcloud server create \
--name nik-pgque-$SYS \
--type cax31 \
--image ubuntu-24.04 \
--location nbg1 \
--ssh-key "$SSH_KEY"
echo "Created nik-pgque-$SYS"
done
# Print IPs (save these)
for SYS in pgque pgq pgmq river que pgboss; do
echo "nik-pgque-$SYS $(hcloud server ip nik-pgque-$SYS)"
done

Add a firewall to block public Postgres access:
hcloud firewall create --name pgque-bench-fw
hcloud firewall add-rule pgque-bench-fw --direction in --port 22 --protocol tcp --source-ips 0.0.0.0/0
# No rule for 5432 — Postgres stays internal only
for SYS in pgque pgq pgmq river que pgboss; do
hcloud firewall apply-to-resource pgque-bench-fw \
--type server --server nik-pgque-$SYS
done

Step 2 — Bootstrap all VMs: PostgreSQL 18 + telemetry
Save as bootstrap.sh and run on every VM:
#!/usr/bin/env bash
set -euo pipefail
export DEBIAN_FRONTEND=noninteractive
# ── PostgreSQL 18 from PGDG ──────────────────────────────────────────────────
apt-get update -qq
apt-get install -y curl gnupg lsb-release build-essential git python3-pip python3-psycopg2 -qq
curl -fsSL https://www.postgresql.org/media/keys/ACCC4CF8.asc \
| gpg --dearmor -o /usr/share/keyrings/postgresql.gpg
echo "deb [signed-by=/usr/share/keyrings/postgresql.gpg] \
https://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" \
> /etc/apt/sources.list.d/postgresql.list
apt-get update -qq
apt-get install -y postgresql-18 postgresql-server-dev-18 postgresql-18-cron -qq
pip3 install psycopg2-binary --break-system-packages
# ── pg_ash — pure SQL extension (no compilation) ─────────────────────────────
# Installed into the bench DB in Step 3 after DB creation
git clone https://github.com/NikolayS/pg_ash /tmp/pg_ash
# ── pg-flight-recorder — pure SQL extension (no compilation) ─────────────────
# Uses absolute paths /record_sql/ and /analyze_sql/ for \i in install.sql
# (Docker convention — we create symlinks to match)
git clone https://github.com/NikolayS/pg-flight-recorder /tmp/pgfr
ln -sfn /tmp/pgfr/_record/sql /record_sql
ln -sfn /tmp/pgfr/_analyze/sql /analyze_sql
mkdir -p /data
chown postgres:postgres /data
echo "=== bootstrap.sh done ==="

Step 3 — PostgreSQL configuration (identical on all VMs)
Append to /etc/postgresql/18/main/postgresql.conf:
# ── Extensions requiring preload ─────────────────────────────────────────────
# pg_cron needed on ALL VMs — pg_ash and pg-flight-recorder use it for scheduling
shared_preload_libraries = 'pg_stat_statements,pg_cron'
cron.database_name = 'bench'
# ── Memory ───────────────────────────────────────────────────────────────────
shared_buffers = 4GB
effective_cache_size = 12GB
# ── WAL / durability ─────────────────────────────────────────────────────────
synchronous_commit = off # critical for throughput; matches #76 baseline
wal_level = minimal
wal_compression = lz4
max_wal_size = 16GB
checkpoint_completion_target = 0.9
# ── bgwriter ─────────────────────────────────────────────────────────────────
bgwriter_delay = 50ms
bgwriter_lru_maxpages = 400
bgwriter_lru_multiplier = 4.0
# ── Storage / query planner ──────────────────────────────────────────────────
random_page_cost = 1.1
effective_io_concurrency = 200
max_connections = 200
# ── Replication (required when wal_level=minimal) ────────────────────────────
max_wal_senders = 0 # required: wal_level=minimal won't start otherwise
# ── Autovacuum — keep bloat predictable during heavy DML ─────────────────────
autovacuum_vacuum_scale_factor = 0.01
autovacuum_analyze_scale_factor = 0.01
autovacuum_vacuum_cost_delay = 2ms
# ── JIT ─────────────────────────────────────────────────────────────────────
jit = off # eliminate JIT overhead from short queue ops

Apply, create benchmark database, and install telemetry:
# Allow password-free localhost connections (needed for River, Que, pg-boss CLI tools)
sed -i '/^host.*127.0.0.1.*scram-sha-256/i host all postgres 127.0.0.1/32 trust' \
/etc/postgresql/18/main/pg_hba.conf
systemctl restart postgresql@18-main
sudo -u postgres psql -c "CREATE DATABASE bench;"
# pg_stat_statements and pg_cron (both are in shared_preload_libraries)
sudo -u postgres psql -d bench -c "
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
CREATE EXTENSION IF NOT EXISTS pg_cron;
"
# pg_ash: pure SQL — install via script, then start 1-second sampling via pg_cron
sudo -u postgres psql -d bench -f /tmp/pg_ash/sql/ash-install.sql
sudo -u postgres psql -d bench -c "SELECT * FROM ash.start();"
# pg-flight-recorder: pure SQL — symlinks were created in bootstrap.sh
# _record, _analyze, _control each have their own install.sql
sudo -u postgres psql -d bench -f /tmp/pgfr/_record/install.sql
sudo -u postgres psql -d bench -f /tmp/pgfr/_analyze/install.sql
sudo -u postgres psql -d bench -f /tmp/pgfr/_control/install.sql

Note: pg_ash and pg-flight-recorder are pure PL/pgSQL — no make or CREATE EXTENSION. They use pg_cron to schedule sampling jobs automatically upon install. That's why pg_cron must be in shared_preload_libraries on all VMs, not just the PgQ VM.
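Since every later step assumes pg_cron is preloaded, an orchestration script can guard on it up front. A minimal sketch (the helper is hypothetical, not part of the tooling above; the value would come from `SHOW shared_preload_libraries;`):

```python
def has_required_preloads(libs_value: str,
                          required=("pg_stat_statements", "pg_cron")) -> bool:
    """Check a shared_preload_libraries value (comma-separated, possibly quoted)."""
    loaded = {name.strip().strip("'\"")
              for name in libs_value.strip().strip("'\"").split(",")}
    return all(r in loaded for r in required)

# Value as configured in Step 3:
print(has_required_preloads("pg_stat_statements,pg_cron"))  # → True
print(has_required_preloads("pg_stat_statements"))          # → False
```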
Step 4 — Install queue systems (per-VM)
nik-pgque-pgque — PgQue (pure PL/pgSQL)
git clone --branch alpha1 --depth 1 https://github.com/NikolayS/pgque /tmp/pgque
# SQL file is in sql/ subdirectory
sudo -u postgres psql -d bench -f /tmp/pgque/sql/pgque.sql
sudo -u postgres psql -d bench -c "SELECT pgque.create_queue('bench_queue');"
# Register consumer and set up ticker (pgque is batch-based like pgq)
sudo -u postgres psql -d bench -c "SELECT pgque.ticker('bench_queue');"
sudo -u postgres psql -d bench -c "SELECT pgque.register_consumer('bench_queue', 'bench_consumer');"
# pg_cron ticker safety net — minimum interval for pg_cron is 1 minute.
# bench.py calls ticker inline on every consumer iteration, so this is backup only:
sudo -u postgres psql -d bench -c "SELECT cron.schedule('pgque-ticker', '* * * * *', \$\$SELECT pgque.ticker()\$\$);"

nik-pgque-pgq — PgQ
# Build pgq from source — installs C extension by default
git clone --branch v3.5.1 --depth 1 https://github.com/pgq/pgq /tmp/pgq
cd /tmp/pgq
make USE_PGXS=1 PG_CONFIG=/usr/lib/postgresql/18/bin/pg_config
make USE_PGXS=1 install
# pg_cron is already in shared_preload_libraries (installed in Step 3)
sudo -u postgres psql -d bench -c "CREATE EXTENSION pgq;"
# IMPORTANT: Switch to PL-only mode — replaces the C insert_event_raw() with
# the PL/pgSQL version. PgQue is pure PL/pgSQL, so PgQ must also run without
# its C extension to make the comparison apples-to-apples.
sudo -u postgres psql -d bench -f /tmp/pgq/sql/switch_plonly.sql
# Verify PL-only is active (prolang should be 'plpgsql', NOT 'c'):
sudo -u postgres psql -d bench -c "
SELECT proname, l.lanname
FROM pg_proc p JOIN pg_language l ON p.prolang = l.oid
WHERE proname = 'insert_event_raw'
AND pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'pgq');"
# Expected: prolang = plpgsql. If it shows 'c', switch_plonly.sql did not work.
sudo -u postgres psql -d bench -c "SELECT pgq.create_queue('bench_queue');"
# Ticker must run at least once before register_consumer works
sudo -u postgres psql -d bench -c "SELECT pgq.ticker('bench_queue');"
sudo -u postgres psql -d bench -c "SELECT pgq.register_consumer('bench_queue', 'bench_consumer');"
# pg_cron ticker safety net — minimum interval for pg_cron is 1 minute.
# bench.py calls ticker inline on every consumer iteration, so this is backup only:
sudo -u postgres psql -d bench -c "SELECT cron.schedule('pgq-ticker', '* * * * *', \$\$SELECT pgq.ticker()\$\$);"

nik-pgque-pgmq — PGMQ 1.11.0 (PL/pgSQL-only)
No Rust extension — install directly from the SQL file:
# Download SQL-only install file for pinned tag
curl -fsSL \
https://raw.githubusercontent.com/tembo-io/pgmq/v1.11.0/pgmq-extension/sql/pgmq.sql \
-o /tmp/pgmq.sql
sudo -u postgres psql -d bench -f /tmp/pgmq.sql
sudo -u postgres psql -d bench -c "SELECT pgmq.create('bench_queue');"

nik-pgque-river — River 0.34.0 (Go worker)
# Use official Go tarball — Ubuntu 24.04's golang-go may be too old for River 0.34
curl -fsSL https://go.dev/dl/go1.22.4.linux-arm64.tar.gz | tar -C /usr/local -xz
export PATH=$PATH:/usr/local/go/bin
echo 'export PATH=$PATH:/usr/local/go/bin' >> /root/.bashrc
go install github.com/riverqueue/river/cmd/river@v0.34.0
export PATH=$PATH:$(go env GOPATH)/bin
echo "export PATH=\$PATH:$(go env GOPATH)/bin" >> /root/.bashrc
sudo -u postgres psql -d bench -c "CREATE EXTENSION IF NOT EXISTS pgcrypto;"
# Use 127.0.0.1 (not localhost) — pg_hba trust rule applies to 127.0.0.1
# Table is river_job (no trailing 's')
river migrate-up --database-url "postgres://postgres@127.0.0.1/bench"

nik-pgque-que — Que 2.4.0 (Ruby)
apt-get install -y ruby ruby-dev libpq-dev
gem install que -v 2.4.0
gem install sequel pg
# Que v2 requires Sequel adapter (not raw PG.connect):
ruby -r sequel -r que -e "
Que.connection = Sequel.connect('postgres://postgres@127.0.0.1/bench')
Que.migrate!(version: Que::Migrations::CURRENT_VERSION)
"

nik-pgque-pgboss — pg-boss 12.15.0 (Node.js)
# Ubuntu 24.04 ships Node 18 — pg-boss 12 needs Node >= 18 but Node 20 is safer:
curl -fsSL https://deb.nodesource.com/setup_20.x | bash -
apt-get install -y nodejs
npm install -g pg-boss@12.15.0
# Create schema + bench_queue up-front
# pg-boss 12.x: PgBoss is a named export; use NODE_PATH for global modules
# pg-boss 12 uses snake_case columns (created_on, start_after, keep_until)
NODE_PATH=/usr/lib/node_modules node -e "
const { PgBoss } = require('pg-boss');
const boss = new PgBoss('postgres://postgres@127.0.0.1/bench');
boss.start().then(async () => {
await boss.createQueue('bench_queue');
console.log('schema ready, bench_queue created');
process.exit(0);
}).catch(e => { console.error(e.message); process.exit(1); });
"

Step 5 — Smoke test (30s per VM)
Save smoke_bench.py on each VM and run: python3 /root/smoke_bench.py --system <name> --duration 30
This script runs a 30-second producer burst, a 30-second consumer drain, and verifies pg_ash + pgfr telemetry:
#!/usr/bin/env python3
"""
smoke_bench.py — 30s producer + consumer smoke test with TPS measurement.
Usage: python3 smoke_bench.py --system <pgque|pgq|pgmq|river|que|pgboss>
"""
import argparse, threading, time, sys
import psycopg2
PAYLOAD = "x" * 2000
DSN = "host=127.0.0.1 dbname=bench user=postgres"
ENQUEUE = {
"pgque": "SELECT pgque.send('bench_queue', json_build_object('data', %s)::jsonb)",
"pgq": "SELECT pgq.insert_event('bench_queue', 'bench', %s)",
"pgmq": "SELECT pgmq.send('bench_queue', json_build_object('data', %s)::jsonb)",
# River 0.34: table is river_job (no 's'); state='available' required (no default)
"river": "INSERT INTO river_job(kind,args,max_attempts,queue,state) VALUES('bench',json_build_object('data',%s)::jsonb,25,'default','available')",
# Que 2.x: job_schema_version=1 required
"que": "INSERT INTO que_jobs(job_schema_version,job_class,args,queue,run_at,priority,error_count) VALUES(1,'BenchJob',json_build_array(%s),'default',now(),100,0)",
# pg-boss 12.x: snake_case columns, bench_queue must be pre-created via boss.createQueue()
"pgboss": "INSERT INTO pgboss.job(name,data,priority) VALUES('bench_queue',json_build_object('data',%s)::jsonb,0)",
}
stop = threading.Event()
def run_producer(system, duration, results):
conn = psycopg2.connect(DSN)
conn.autocommit = True
cur = conn.cursor()
count = 0
t0 = time.monotonic()
while time.monotonic() - t0 < duration and not stop.is_set():
try:
cur.execute(ENQUEUE[system], (PAYLOAD,))
count += 1
except Exception as e:
print(f"[producer] {e}", file=sys.stderr)
time.sleep(0.1)
elapsed = time.monotonic() - t0
results['producer_tps'] = count / elapsed
results['producer_count'] = count
conn.close()
def run_consumer_batch(system, duration, results):
"""Batch-based consumer for pgque and pgq (ticker → next_batch → finish_batch)."""
conn = psycopg2.connect(DSN)
conn.autocommit = False
cur = conn.cursor()
count = 0
t0 = time.monotonic()
ticker_sql = f"SELECT {system}.ticker('bench_queue')"
batch_sql = f"SELECT {system}.next_batch('bench_queue', 'bench_consumer')"
events_sql = f"SELECT count(*) FROM {system}.get_batch_events(%s)"
finish_sql = f"SELECT {system}.finish_batch(%s)"
while time.monotonic() - t0 < duration and not stop.is_set():
cur.execute(ticker_sql)
cur.execute(batch_sql)
row = cur.fetchone()
batch_id = row[0] if row else None
if batch_id is None:
conn.commit()
time.sleep(0.05)
continue
cur.execute(events_sql, (batch_id,))
n = cur.fetchone()[0]
cur.execute(finish_sql, (batch_id,))
conn.commit()
count += n
elapsed = time.monotonic() - t0
results['consumer_tps'] = count / elapsed
results['consumer_count'] = count
conn.close()
def run_consumer_pgmq(duration, results):
conn = psycopg2.connect(DSN)
conn.autocommit = True
cur = conn.cursor()
count = 0
t0 = time.monotonic()
while time.monotonic() - t0 < duration and not stop.is_set():
cur.execute("SELECT msg_id FROM pgmq.read('bench_queue', 30, 50)")
rows = cur.fetchall()
if rows:
cur.execute("SELECT pgmq.delete('bench_queue', %s)", ([r[0] for r in rows],))
count += len(rows)
else:
time.sleep(0.001)
elapsed = time.monotonic() - t0
results['consumer_tps'] = count / elapsed
results['consumer_count'] = count
conn.close()
def prefill_pgmq(n=50000):
conn = psycopg2.connect(DSN)
conn.autocommit = True
cur = conn.cursor()
cur.execute(f"SELECT count(*) FROM generate_series(1,{n}) i, LATERAL (SELECT pgmq.send('bench_queue', json_build_object('data',repeat('x',2000))::jsonb)) s")
conn.close()
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--system", required=True)
ap.add_argument("--duration", type=int, default=30)
args = ap.parse_args()
sys_name, dur = args.system, args.duration
results = {}
print(f"[{sys_name}] Producer phase ({dur}s)...")
t = threading.Thread(target=run_producer, args=(sys_name, dur, results))
t.start(); t.join()
print(f"[{sys_name}] Producer: {results['producer_tps']:.0f} TPS ({results['producer_count']} events)")
if sys_name in ("pgque", "pgq"):
print(f"[{sys_name}] Consumer phase ({dur}s)...")
t = threading.Thread(target=run_consumer_batch, args=(sys_name, dur, results))
t.start(); t.join()
print(f"[{sys_name}] Consumer: {results['consumer_tps']:.0f} TPS ({results['consumer_count']} events)")
elif sys_name == "pgmq":
print(f"[{sys_name}] Pre-filling 50k messages for consumer phase...")
prefill_pgmq()
print(f"[{sys_name}] Consumer phase ({dur}s)...")
t = threading.Thread(target=run_consumer_pgmq, args=(dur, results))
t.start(); t.join()
print(f"[{sys_name}] Consumer: {results['consumer_tps']:.0f} TPS ({results['consumer_count']} events)")
else:
print(f"[{sys_name}] Consumer: native worker (river/que/pgboss — not tested in smoke)")
results['consumer_tps'] = None
# Verify telemetry
conn = psycopg2.connect(DSN)
conn.autocommit = True
cur = conn.cursor()
cur.execute("SELECT count(*) FROM ash.sample")
ash_count = cur.fetchone()[0]
cur.execute("SELECT pgfr_record.snapshot()")
pgfr_ts = cur.fetchone()[0]
conn.close()
ctps = f"{results.get('consumer_tps', 0):.0f}" if results.get('consumer_tps') is not None else "native-worker"
print(f"[{sys_name}] pg_ash samples: {ash_count}")
print(f"[{sys_name}] pgfr snapshot: {pgfr_ts}")
print(f"[{sys_name}] SUMMARY producer={results['producer_tps']:.0f} consumer={ctps} ash={ash_count} pgfr=OK")
if __name__ == "__main__":
    main()

Expected results (Hetzner CAX31, ~30s, 1 thread):
| System | Producer TPS | Consumer TPS |
|---|---|---|
| pgque | ~3000 | ~3000 |
| pgq | ~4000 | ~4000 |
| pgmq | ~3000 | ~3000 |
| river | ~2900 | native-worker |
| que | ~2200 | native-worker |
| pgboss | ~2900 | native-worker |
Iterate until all 6 pass. Fix install/config issues and re-run.
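To avoid eyeballing six SUMMARY lines by hand, a small checker can compare each smoke run against the ballpark table above (the 50% threshold and the parser shape are illustrative assumptions, matching the SUMMARY format printed by smoke_bench.py):

```python
# Expected producer TPS from the table above; "native-worker" consumers are skipped.
EXPECTED_PRODUCER_TPS = {
    "pgque": 3000, "pgq": 4000, "pgmq": 3000,
    "river": 2900, "que": 2200, "pgboss": 2900,
}

def check_summary(line: str, min_fraction: float = 0.5):
    """Parse a smoke_bench.py SUMMARY line and flag low producer throughput.

    Expected shape: '[pgq] SUMMARY producer=4100 consumer=3900 ash=28 pgfr=OK'
    Returns (system, producer_tps, ok).
    """
    system = line.split("]")[0].lstrip("[")
    fields = dict(kv.split("=") for kv in line.split("SUMMARY", 1)[1].split())
    tps = float(fields["producer"])
    ok = tps >= min_fraction * EXPECTED_PRODUCER_TPS[system]
    return system, tps, ok

print(check_summary("[pgq] SUMMARY producer=4100 consumer=3900 ash=28 pgfr=OK"))
# → ('pgq', 4100.0, True)
```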
Step 6 — Benchmark load script
Save as bench.py on each VM. Runs producer and consumer phases for all SQL-addressable systems. Worker-based systems (River, Que, pg-boss) use their native worker for the consumer phase.
#!/usr/bin/env python3
"""
bench.py — Queue throughput load generator

Note: --warmup 60 (default) discards the first 60s of each phase,
so the output CSVs already exclude cold-start. No further trimming needed.

Install: pip install psycopg2-binary
Usage:
    python3 bench.py --system pgque --phase producer --clients 8 --duration 3600 > /data/producer.csv
    python3 bench.py --system pgq --phase consumer --clients 8 --duration 3600 > /data/consumer.csv
    # etc.
Output: CSV ts,system,phase,evps (one row per 10s bucket)
Note: pg-boss producer uses bench_pgboss.js (Node.js) — see below.
"""
import argparse, threading, time, sys
from datetime import datetime, timezone
import psycopg2
PAYLOAD = "x" * 2000 # ~2 KiB text payload (JSON-wrapping systems add ~10 bytes overhead)
# ── Enqueue SQL (parameterized: %s = PAYLOAD) ────────────────────────────────
ENQUEUE = {
# pgque: payload must be jsonb
"pgque": "SELECT pgque.send('bench_queue', json_build_object('data', %s)::jsonb)",
"pgq": "SELECT pgq.insert_event('bench_queue', 'bench', %s)",
"pgmq": "SELECT pgmq.send('bench_queue', json_build_object('data', %s)::jsonb)",
# River 0.34: table is river_job (no trailing 's'); state='available' required (no default)
"river": """INSERT INTO river_job(kind,args,max_attempts,queue,state)
VALUES('bench',json_build_object('data',%s)::jsonb,25,'default','available')""",
# Que 2.x: job_schema_version=1 required (no default), args must be JSON array
"que": """INSERT INTO que_jobs(job_schema_version,job_class,args,queue,run_at,priority,error_count)
VALUES(1,'BenchJob',json_build_array(%s),'default',now(),100,0)""",
# pg-boss 12.x: snake_case columns; bench_queue must be pre-created via boss.createQueue()
"pgboss": """INSERT INTO pgboss.job(name,data,priority)
VALUES('bench_queue',json_build_object('data',%s)::jsonb,0)""",
}
# ── Dequeue: returns list of IDs (fetchall), then acks ───────────────────────
# Returns (receive_sql, ack_sql_template) where ack takes an array of ids
DEQUEUE = {
# pgmq: visibility window 30s, batch 50
"pgmq": (
"SELECT msg_id FROM pgmq.read('bench_queue', 30, 50)",
"SELECT pgmq.delete('bench_queue', %s)",
),
    # pgque: batch-based — handled by batch_consumer() below (same pattern as pgq)
    # pgq: batch-based — handled by batch_consumer() below
# River/Que/pgboss: native workers handle dequeue
}
def batch_consumer(system, conn, stop_event, count_ref, lock):
"""Generic batch consumer for pgque and pgq.
Both use the same pattern: ticker → next_batch → get_batch_events → finish_batch.
bench.py ticks inline; pg_cron ticker is a safety net only."""
cur = conn.cursor()
while not stop_event.is_set():
try:
cur.execute(f"SELECT {system}.ticker('bench_queue')")
cur.execute(f"SELECT {system}.next_batch('bench_queue', 'bench_consumer')")
row = cur.fetchone()
batch_id = row[0] if row else None
if batch_id is None:
time.sleep(0.05)
continue
cur.execute(f"SELECT count(*) FROM {system}.get_batch_events(%s)", (batch_id,))
n = cur.fetchone()[0]
cur.execute(f"SELECT {system}.finish_batch(%s)", (batch_id,))
conn.commit()
if n:
with lock:
count_ref[0] += n
except Exception as e:
print(f"[{system}_consumer] {e}", file=sys.stderr)
time.sleep(0.1)
cur.close()
stop_event = threading.Event()
_print_lock = threading.Lock()
def worker(system, phase, dsn, wid, warmup_until):
conn = psycopg2.connect(dsn)
conn.autocommit = True
cur = conn.cursor()
count = 0
t0 = time.monotonic()
while not stop_event.is_set():
try:
if phase in ("producer", "mixed_producer"):
cur.execute(ENQUEUE[system], (PAYLOAD,))
count += 1
elif phase in ("consumer", "mixed_consumer"):
                if system in ("pgque", "pgq"):
                    time.sleep(0.01)  # dedicated batch_consumer() thread handles these
                    continue
if system not in DEQUEUE:
time.sleep(0.1) # worker-based: native process handles dequeue
continue
recv_sql, ack_sql = DEQUEUE[system]
cur.execute(recv_sql)
rows = cur.fetchall()
if rows:
ids = [r[0] for r in rows]
cur.execute(ack_sql, (ids,))
count += len(ids)
except Exception as e:
print(f"[w{wid}] {e}", file=sys.stderr)
time.sleep(0.1)
continue
now = time.monotonic()
if now - t0 >= 10:
if now >= warmup_until: # skip warmup buckets
evps = count / (now - t0)
ts = datetime.now(timezone.utc).isoformat()
with _print_lock:
print(f"{ts},{system},{phase},{evps:.1f}", flush=True)
count = 0
t0 = now
cur.close()
conn.close()
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--system", required=True, choices=list(ENQUEUE.keys()))
ap.add_argument("--phase", required=True,
choices=["producer", "consumer", "mixed_producer", "mixed_consumer"])
ap.add_argument("--clients", type=int, default=8)
ap.add_argument("--duration", type=int, default=3600)
ap.add_argument("--warmup", type=int, default=60, help="seconds to skip at start")
ap.add_argument("--dsn", default="host=localhost dbname=bench user=postgres")
args = ap.parse_args()
print("ts,system,phase,evps")
warmup_until = time.monotonic() + args.warmup
threads = [
threading.Thread(target=worker,
args=(args.system, args.phase, args.dsn, i, warmup_until),
daemon=True)
for i in range(args.clients)
]
# pgque + pgq consumer: one dedicated batch-consumer thread
batch_count = [0]
if args.system in ("pgque", "pgq") and args.phase in ("consumer", "mixed_consumer"):
bc_conn = psycopg2.connect(args.dsn)
bc_conn.autocommit = False
t = threading.Thread(target=batch_consumer,
args=(args.system, bc_conn, stop_event, batch_count, _print_lock),
daemon=True)
threads.append(t)
for t in threads: t.start()
time.sleep(args.duration)
stop_event.set()
for t in threads: t.join(timeout=10)
if __name__ == "__main__":
    main()

Pre-fill queue for consumer phase
Run before the consumer phase — prefill.sh handles per-system SQL:
#!/usr/bin/env bash
# prefill.sh — insert 2M messages into bench_queue
SYS=${1:?Usage: prefill.sh <system>}
PG="sudo -u postgres psql -d bench"
N=2000000
echo "Pre-filling $N messages for $SYS..."
case $SYS in
pgque)
# pgque.send requires jsonb payload
$PG -c "SELECT count(*) FROM generate_series(1,$N) i, LATERAL (SELECT pgque.send('bench_queue', json_build_object('data',repeat('x',2000))::jsonb)) s;"
# Tick many times to create many manageable batches (~500 events each).
# A single ticker call creates one huge batch — avoid that.
$PG -c "DO \$\$
BEGIN
FOR i IN 1..4500 LOOP
PERFORM pgque.ticker('bench_queue');
END LOOP;
END;\$\$;"
;;
pgq)
$PG -c "SELECT count(*) FROM generate_series(1,$N) i, LATERAL (SELECT pgq.insert_event('bench_queue','bench',repeat('x',2000))) s;"
# Tick many times to create many manageable batches (~500 events each).
# A single ticker call creates one huge batch — avoid that.
# queue_ticker_max_count=500 default: 2M events → ~4000 batches needed;
# loop 4500 ticks (with margin) so the whole backlog is split into batches.
$PG -c "DO \$\$
BEGIN
  FOR i IN 1..4500 LOOP
    PERFORM pgq.ticker('bench_queue');
  END LOOP;
END;\$\$;"
;;
pgmq)
$PG -c "SELECT count(*) FROM generate_series(1,$N) i, LATERAL (SELECT pgmq.send('bench_queue', json_build_object('data',repeat('x',2000))::jsonb)) s;"
;;
river)
# river_job (no 's'); state='available' required (no default in River 0.34 schema)
$PG -c "INSERT INTO river_job(kind,args,max_attempts,queue,state) SELECT 'bench',json_build_object('data',repeat('x',2000))::jsonb,25,'default','available' FROM generate_series(1,$N);"
;;
que)
# job_schema_version=1 required in Que v2
$PG -c "INSERT INTO que_jobs(job_schema_version,job_class,args,queue,run_at,priority,error_count) SELECT 1,'BenchJob',json_build_array(repeat('x',2000)),'default',now(),100,0 FROM generate_series(1,$N);"
;;
pgboss)
# pg-boss 12.x: snake_case, bench_queue must exist
$PG -c "INSERT INTO pgboss.job(name,data,priority) SELECT 'bench_queue',json_build_object('data',repeat('x',2000))::jsonb,0 FROM generate_series(1,$N);"
;;
esac
echo "Pre-fill complete."

Run all 3 phases
#!/usr/bin/env bash
set -euo pipefail
# run_benchmark.sh — run inside tmux to survive network interruptions
# Usage: tmux new-session -s bench "bash /root/run_benchmark.sh <system>"
SYS=${1:?Usage: run_benchmark.sh <system>}
# Use 127.0.0.1 (trust auth) not localhost (socket peer auth).
# URI-form DSN has no spaces, so plain word-splitting of $PY below stays safe.
DSN="postgresql://postgres@127.0.0.1/bench"
PY="python3 /root/bench.py --system $SYS --clients 8 --warmup 60 --dsn $DSN"
mkdir -p /data
# Reset pg_stat_statements so collected data covers only the benchmark run
sudo -u postgres psql -d bench -c "SELECT pg_stat_statements_reset();"
echo "=== Phase 1: producer (1h) ==="
$PY --phase producer --duration 3600 > /data/producer.csv
echo "=== Phase 2: consumer (1h) — pre-filling queue first ==="
bash /root/prefill.sh "$SYS"
$PY --phase consumer --duration 3600 > /data/consumer.csv
echo "=== Phase 3: mixed (1h) ==="
$PY --phase mixed_producer --duration 3600 > /data/mixed_producer.csv &
$PY --phase mixed_consumer --duration 3600 > /data/mixed_consumer.csv &
wait
echo "Run complete. Artifacts in /data/"

Always run inside tmux to prevent the 3h run from being interrupted by network drops. Start with: tmux new-session -s bench "bash /root/run_benchmark.sh <system>". Re-attach with: tmux attach -t bench.
For worker-based systems (River, Que, pg-boss): consumer + mixed consumer phases require starting their native worker alongside the producer. See worker startup below.
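For the consumer phase on worker-based systems there is no bench.py CSV, so you need a way to tell when the pre-filled backlog is drained before stopping the worker. A minimal polling sketch (the helper is an assumption, not part of the scripts above; the example probe uses the river_job table and 'available' state referenced elsewhere in this runbook):

```python
import time

def wait_until_drained(get_depth, poll_s=5.0, timeout_s=3600.0):
    """Poll a queue-depth callback until it reports 0 (drained) or timeout expires.

    get_depth: zero-arg callable returning the current number of pending jobs.
    Returns True if the queue drained, False on timeout.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if get_depth() == 0:
            return True
        time.sleep(poll_s)
    return False

# Example depth probe for River (table/state names as used in this runbook):
#   cur.execute("SELECT count(*) FROM river_job WHERE state = 'available'")
#   depth = cur.fetchone()[0]
```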
River worker (on nik-pgque-river)
# Run inside tmux alongside run_benchmark.sh
export PATH=$PATH:/usr/local/go/bin:/root/go/bin
tmux new-session -s worker "river work --database-url 'postgres://postgres@127.0.0.1/bench' --queues default=8"

Que worker (on nik-pgque-que)
# Que v2.x uses Sequel adapter (not raw PG.connect)
tmux new-session -s worker "ruby -r sequel -r que -e \"
Que.connection = Sequel.connect('postgres://postgres@127.0.0.1/bench')
Que::Worker.new(queues: ['default'], worker_count: 8).tap(&:run)
\""

pg-boss worker (on nik-pgque-pgboss)
# pg-boss 12.x: PgBoss is a named export, not default export
tmux new-session -s worker "NODE_PATH=/usr/lib/node_modules node -e \"
const { PgBoss } = require('pg-boss');
const boss = new PgBoss('postgres://postgres@127.0.0.1/bench');
boss.start().then(() => {
boss.work('bench_queue', {teamSize: 8}, job => Promise.resolve());
});
\""

Step 7 — Collect all artifacts (run locally after all 3 phases complete)
Save as collect.sh:
#!/usr/bin/env bash
set -euo pipefail
mkdir -p results
for SYS in pgque pgq pgmq river que pgboss; do
IP=$(hcloud server ip nik-pgque-$SYS)
echo "=== Collecting: $SYS ($IP) ==="
# Throughput CSVs from bench.py
for PHASE in producer consumer mixed_producer mixed_consumer; do
scp -o StrictHostKeyChecking=no \
root@$IP:/data/${PHASE}.csv \
results/${SYS}_${PHASE}.csv 2>/dev/null || echo " (no ${PHASE}.csv)"
done
# pg_ash: dump wait event summary to CSV
# ash.sample is the raw table; ash.samples() is a view function
ssh root@$IP "sudo -u postgres psql -d bench -c \
\"COPY (
SELECT
sample_ts AS ts_bucket,
COALESCE(we.wait_event_type, 'CPU') AS wait_event_type,
COALESCE(we.wait_event, 'CPU') AS wait_event,
count(*) AS cnt
FROM ash.sample s
LEFT JOIN ash.wait_event we ON s.wait_event_id = we.id
GROUP BY 1, 2, 3
ORDER BY 1
) TO '/tmp/ash.csv' CSV HEADER\""
scp root@$IP:/tmp/ash.csv results/${SYS}_ash.csv
# pg-flight-recorder: export HTML report
# Use pgfr_analyze.performance_report() — pgfr_record.snapshot() takes a new snapshot
ssh root@$IP "sudo -u postgres psql -d bench -c \
\"SELECT pgfr_record.snapshot();\""
ssh root@$IP "sudo -u postgres psql -d bench -t -c \
\"SELECT pgfr_analyze.performance_report();\"" > results/${SYS}_pgfr.html
# pg_stat_statements — cumulative totals (time-series deltas are in pgfr HTML)
ssh root@$IP "sudo -u postgres psql -d bench -c \
  \"COPY (
    SELECT query, calls, total_exec_time, mean_exec_time, rows
    FROM pg_stat_statements
    ORDER BY calls DESC
    LIMIT 50
  ) TO '/tmp/pgss.csv' CSV HEADER\""
scp root@$IP:/tmp/pgss.csv results/${SYS}_pgss.csv
# pg_stat_wal + pg_stat_checkpointer + pg_stat_bgwriter (PG17+ split)
ssh root@$IP "sudo -u postgres psql -d bench -Atc \
\"SELECT row_to_json(t) FROM (
SELECT w.*, c.num_timed, c.num_requested, c.write_time, c.sync_time,
b.buffers_clean, b.maxwritten_clean, b.buffers_alloc
FROM pg_stat_wal w, pg_stat_checkpointer c, pg_stat_bgwriter b
) t;\"" > results/${SYS}_pg_stats.json
echo " Done: $SYS"
done
echo ""
echo "All artifacts in ./results/"
ls -lh results/

Step 8 — Generate charts
Install deps locally: pip install pandas matplotlib
Save as analyze.py:
#!/usr/bin/env python3
"""
analyze.py — Generate benchmark charts.
Reads: results/<sys>_<phase>.csv — ts,system,phase,evps
results/<sys>_ash.csv — ts_bucket,wait_event_type,wait_event,cnt
Output: charts/1_throughput_bar.png
charts/2_throughput_timeline.png
charts/3_wait_events.png
charts/4_summary_table.png
"""
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import matplotlib.patches as mpatches
import numpy as np
from pathlib import Path
SYSTEMS = ["pgque", "pgq", "pgmq", "river", "que", "pgboss"]
PHASES = ["producer", "consumer", "mixed_producer"]
COLORS = dict(zip(SYSTEMS, ["#4C72B0","#DD8452","#55A868","#C44E52","#8172B3","#937860"]))
WE_PAL = {"CPU":"#4CAF50","Client":"#2196F3","IO":"#FF9800",
"Lock":"#F44336","LWLock":"#9C27B0","Other":"#9E9E9E"}
OUT = Path("charts")
OUT.mkdir(exist_ok=True)
# ── Load throughput data ──────────────────────────────────────────────────────
frames = []
for sys in SYSTEMS:
for phase in PHASES:
p = Path(f"results/{sys}_{phase}.csv")
if not p.exists():
continue
df = pd.read_csv(p)
if df.empty:
continue
df.columns = df.columns.str.strip()
df["ts"] = pd.to_datetime(df["ts"], utc=True)
df["system"] = sys
df["phase"] = phase
frames.append(df[["ts","system","phase","evps"]])
if not frames:
raise SystemExit("No throughput CSV files found in results/")
tp = pd.concat(frames, ignore_index=True)
# ── Chart 1: Throughput bar chart ─────────────────────────────────────────────
mean_tp = (tp.groupby(["system","phase"])["evps"]
.mean()
.unstack(fill_value=0)
.reindex(SYSTEMS, fill_value=0))
fig, ax = plt.subplots(figsize=(13, 6))
x = np.arange(len(SYSTEMS))
w = 0.25
for i, phase in enumerate(PHASES):
if phase not in mean_tp.columns:
continue
vals = mean_tp[phase].values
bars = ax.bar(x + i*w, vals, w,
label=phase.replace("_"," "),
color=[COLORS[s] for s in SYSTEMS],
alpha=0.85)
for bar, v in zip(bars, vals):
if v > 500:
ax.text(bar.get_x()+bar.get_width()/2, bar.get_height()+200,
f"{v:,.0f}", ha="center", va="bottom", fontsize=7, rotation=45)
ax.set_xticks(x + w)
ax.set_xticklabels(SYSTEMS, fontsize=11)
ax.set_ylabel("events / second")
ax.set_title("Throughput — all systems, all phases (mean ev/s over full phase duration)")
ax.yaxis.set_major_formatter(mticker.FuncFormatter(lambda v,_: f"{v:,.0f}"))
ax.legend(title="phase")
plt.tight_layout()
plt.savefig(OUT / "1_throughput_bar.png", dpi=150)
plt.close()
print("Saved: 1_throughput_bar.png")
# ── Chart 2: Throughput timeline (10s buckets) ────────────────────────────────
fig, axes = plt.subplots(len(PHASES), 1, figsize=(15, 4*len(PHASES)), sharex=False)
if len(PHASES) == 1:
axes = [axes]
for ax, phase in zip(axes, PHASES):
pdata = tp[tp.phase == phase]
for sys in SYSTEMS:
sdata = pdata[pdata.system == sys].set_index("ts").sort_index()
if sdata.empty:
continue
sdata = sdata["evps"].resample("10s").mean().dropna()
t0 = sdata.index[0]
mins = (sdata.index - t0).total_seconds() / 60
ax.plot(mins, sdata.values, label=sys, color=COLORS[sys], linewidth=1.2)
ax.set_title(f"Phase: {phase.replace('_',' ')}")
ax.set_ylabel("ev/s")
ax.set_xlabel("minutes into phase")
ax.yaxis.set_major_formatter(mticker.FuncFormatter(lambda v,_: f"{v:,.0f}"))
ax.legend(ncol=3, fontsize=9)
ax.grid(alpha=0.3)
plt.tight_layout()
plt.savefig(OUT / "2_throughput_timeline.png", dpi=150)
plt.close()
print("Saved: 2_throughput_timeline.png")
# ── Chart 3: Wait event breakdown (stacked bar) ───────────────────────────────
ash_data = {}
for sys in SYSTEMS:
p = Path(f"results/{sys}_ash.csv")
if not p.exists():
continue
df = pd.read_csv(p)
if df.empty:
continue
df["wait_event_type"] = df["wait_event_type"].fillna("CPU")
total = df["cnt"].sum()
cats = df.groupby("wait_event_type")["cnt"].sum() / total * 100
ash_data[sys] = cats
if ash_data:
all_cats = sorted(set().union(*[d.index for d in ash_data.values()]))
fig, ax = plt.subplots(figsize=(12, 6))
bottoms = np.zeros(len(SYSTEMS))
legend_patches = []
for cat in all_cats:
        vals = np.array([ash_data.get(s, pd.Series(dtype=float)).get(cat, 0.0) for s in SYSTEMS])
color = WE_PAL.get(cat, WE_PAL["Other"])
ax.bar(SYSTEMS, vals, bottom=bottoms, color=color, label=cat)
bottoms += vals
legend_patches.append(mpatches.Patch(color=color, label=cat))
ax.set_title("Wait Event Breakdown — all phases combined")
ax.set_ylabel("% of pg_ash samples")
ax.set_ylim(0, 105)
ax.legend(handles=legend_patches, bbox_to_anchor=(1.01, 1), loc="upper left")
# Annotate CPU % on top of each bar
for i, sys in enumerate(SYSTEMS):
        cpu_pct = ash_data.get(sys, pd.Series(dtype=float)).get("CPU", 0.0)
ax.text(i, cpu_pct/2, f"{cpu_pct:.1f}%", ha="center", va="center",
color="white", fontsize=9, fontweight="bold")
plt.tight_layout()
plt.savefig(OUT / "3_wait_events.png", dpi=150, bbox_inches="tight")
plt.close()
print("Saved: 3_wait_events.png")
# ── Chart 4: Summary table ────────────────────────────────────────────────────
rows = []
for sys in SYSTEMS:
prod_mean = tp[(tp.system==sys)&(tp.phase=="producer")]["evps"].mean()
cons_mean = tp[(tp.system==sys)&(tp.phase=="consumer")]["evps"].mean()
rows.append({
"System": sys,
"Producer ev/s": f"{prod_mean:,.0f}" if not np.isnan(prod_mean) else "—",
"Consumer ev/s": f"{cons_mean:,.0f}" if not np.isnan(cons_mean) else "—*",
"CPU %": f"{ash_data.get(sys, pd.Series()).get('CPU', 0.0):.1f}%" if ash_data else "—",
})
df_tbl = pd.DataFrame(rows)
fig, ax = plt.subplots(figsize=(10, 2.5))
ax.axis("off")
tbl = ax.table(cellText=df_tbl.values, colLabels=df_tbl.columns,
cellLoc="center", loc="center")
tbl.auto_set_font_size(True)
tbl.scale(1, 1.6)
ax.set_title("Summary (mean throughput over full phase duration)\n* worker-based consumer measured via pg_ash only",
fontsize=9, pad=12)
plt.tight_layout()
plt.savefig(OUT / "4_summary_table.png", dpi=150, bbox_inches="tight")
plt.close()
print("Saved: 4_summary_table.png")
# ── Chart 5: pg_stat_statements — top queries by calls per system ─────────────
# Uses results/<sys>_pgss.csv (cumulative totals over full run)
# pg-flight-recorder HTML reports contain the time-series breakdown per snapshot interval
pgss_data = {}
for sys in SYSTEMS:
p = Path(f"results/{sys}_pgss.csv")
if not p.exists():
continue
df = pd.read_csv(p)
if df.empty:
continue
df["query_short"] = df["query"].str.strip().str[:60]
pgss_data[sys] = df.nlargest(10, "calls")[["query_short","calls","mean_exec_time"]]
if pgss_data:
n_sys = len(pgss_data)
fig, axes = plt.subplots(1, n_sys, figsize=(5 * n_sys, 6), sharey=False)
if n_sys == 1:
axes = [axes]
for ax, (sys, df) in zip(axes, pgss_data.items()):
ax.barh(df["query_short"], df["calls"], color=COLORS.get(sys, "#888"))
ax.set_title(sys)
ax.set_xlabel("total calls")
ax.xaxis.set_major_formatter(mticker.FuncFormatter(lambda v,_: f"{v:,.0f}"))
ax.invert_yaxis()
ax.tick_params(axis='y', labelsize=7)
fig.suptitle("pg_stat_statements — top 10 queries by call count (full run)", y=1.02)
plt.tight_layout()
plt.savefig(OUT / "5_pgss_top_queries.png", dpi=150, bbox_inches="tight")
plt.close()
print("Saved: 5_pgss_top_queries.png")
print(f"\nAll charts written to ./{OUT}/")
# Note: pg-flight-recorder HTML reports (results/<sys>_pgfr.html) contain
# pgss time-series deltas per snapshot interval — open them for query-level
# breakdown over time.
```

</details>
---
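Before running the posting script in the next step, a quick pre-flight check helps avoid publishing a half-complete comment. A minimal sketch, with artifact names taken from the steps above:

```python
from pathlib import Path

# Pre-flight check before posting results: confirm every artifact the
# posting script expects actually exists locally. Chart file names match
# the chart script; per-system ash/pgfr names follow the collection step.
SYSTEMS = ["pgque", "pgq", "pgmq", "river", "que", "pgboss"]
charts = ["1_throughput_bar", "2_throughput_timeline", "3_wait_events",
          "4_summary_table", "5_pgss_top_queries"]
expected = [Path(f"charts/{c}.png") for c in charts]
expected += [Path(f"results/{s}_{suffix}") for s in SYSTEMS
             for suffix in ("ash.csv", "pgfr.html")]
missing = [str(p) for p in expected if not p.exists()]
if missing:
    print(f"Missing {len(missing)} artifact(s):", *missing, sep="\n  ")
else:
    print("All artifacts present; safe to post.")
```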
## Step 9 — Post results to this issue
Save as **`post_results.sh`** (run locally after charts are generated):
<details>
```bash
#!/usr/bin/env bash
set -euo pipefail
ISSUE_IID=77
PROJECT="postgres-ai%2Fpostgresql-consulting%2Ftests-and-benchmarks"
REPO="postgres-ai/postgresql-consulting/tests-and-benchmarks"
# ── Upload chart images and build markdown body ───────────────────────────────
BODY="## Benchmark results (3h run)\n\n"
for IMG in charts/1_throughput_bar.png charts/2_throughput_timeline.png \
charts/3_wait_events.png charts/4_summary_table.png \
charts/5_pgss_top_queries.png; do
[ -f "$IMG" ] || { echo "Missing: $IMG"; continue; }
UPLOAD=$(glab api --method POST \
"projects/${PROJECT}/uploads" \
--field "file=@${IMG}")
MD=$(echo "$UPLOAD" | python3 -c "import sys,json; print(json.load(sys.stdin)['markdown'])")
TITLE=$(basename "$IMG" .png | sed 's/_/ /g')
BODY+="### ${TITLE}\n${MD}\n\n"
done
# ── Upload pg-flight-recorder HTML reports ────────────────────────────────────
BODY+="### pg-flight-recorder reports\n\n"
for SYS in pgque pgq pgmq river que pgboss; do
HTML="results/${SYS}_pgfr.html"
[ -f "$HTML" ] || continue
UPLOAD=$(glab api --method POST \
"projects/${PROJECT}/uploads" \
--field "file=@${HTML}")
URL=$(echo "$UPLOAD" | python3 -c "import sys,json; print(json.load(sys.stdin)['full_path'])")
BODY+="- **${SYS}**: https://gitlab.com${URL}\n"
done
# ── Post as comment ───────────────────────────────────────────────────────────
glab issue comment $ISSUE_IID \
--repo "$REPO" \
--body "$(echo -e "$BODY")"
echo "Results posted to issue #${ISSUE_IID}"Step 10 — Teardown
Run only after confirming all artifacts are collected locally.
for SYS in pgque pgq pgmq river que pgboss; do
hcloud server delete nik-pgque-$SYS && echo "Deleted nik-pgque-$SYS"
done
hcloud firewall delete pgque-bench-fwChecklist
### Provisioning

- [ ] All 6 VMs created, SSH accessible, firewall applied
- [ ] PostgreSQL 18 (same minor version) running on all VMs
- [ ] `pg_ash` sampling — `SELECT count(*) FROM ash.sample;` > 0 after 30s
- [ ] `pg-flight-recorder` — `SELECT pgfr_record.snapshot();` returns a timestamp
- [ ] `synchronous_commit = off` confirmed in `SHOW synchronous_commit;`

### Per-system install

- [ ] pgque schema loaded, `bench_queue` created
- [ ] pgq extension + `bench_queue` created, ticker running
- [ ] pgmq extension + `bench_queue` created
- [ ] river schema migrated
- [ ] que `que_jobs` table created
- [ ] pg-boss schema created

### Smoke test (all 6 must pass before proceeding)

- [ ] nik-pgque-pgque
- [ ] nik-pgque-pgq
- [ ] nik-pgque-pgmq
- [ ] nik-pgque-river
- [ ] nik-pgque-que
- [ ] nik-pgque-pgboss

### Benchmark run

- [ ] Queues pre-filled before consumer phase
- [ ] All 6 producer phases started simultaneously
- [ ] Consumer phases started simultaneously (native workers for River/Que/pg-boss)
- [ ] Mixed phases started simultaneously
- [ ] No OOM / disk-full conditions observed during run

### Artifact collection

- [ ] `/data/*.csv` from all VMs
- [ ] `pg_ash` dump from all VMs → `results/<sys>_ash.csv`
- [ ] `pg-flight-recorder` HTML from all VMs → `results/<sys>_pgfr.html`
- [ ] `pg_stat_wal` + `pg_stat_bgwriter` JSON from all VMs
- [ ] `charts/1_throughput_bar.png`
- [ ] `charts/2_throughput_timeline.png`
- [ ] `charts/3_wait_events.png`
- [ ] `charts/4_summary_table.png`
- [ ] `charts/5_pgss_top_queries.png`
- [ ] All charts + reports posted as comment on this issue
- [ ] VMs deleted
## Methodology notes (expert review findings)

### PgQ C vs PL/pgSQL

PgQ 3.5.1 ships a C extension (`pgq_insert_event_raw`) that caches INSERT plans per backend. PgQue is intentionally pure PL/pgSQL. To make the comparison fair, PgQ is run in PL-only mode via `sql/switch_plonly.sql`, which replaces the C `insert_event_raw()` function with the PL/pgSQL equivalent. Verify post-install with:

```sql
SELECT proname, l.lanname
FROM pg_proc p
JOIN pg_language l ON p.prolang = l.oid
WHERE proname = 'insert_event_raw'
  AND pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'pgq');
-- Expected: lanname = plpgsql (NOT c)
```

### Autovacuum asymmetry (architectural difference, not a setup error)
PgQ and PgQue use TRUNCATE rotation on event tables, with autovacuum explicitly disabled by `pgq.tune_storage()` at queue creation time. No dead tuples accumulate on event tables.

PGMQ, River, Que, and pg-boss use a single table with per-message DELETE. Dead tuples accumulate and autovacuum runs continuously during consumer phases. This is a genuine architectural difference (part of what the benchmark measures) but it must be kept in mind when comparing consumer TPS.

Collected artifacts include `n_dead_tup` and autovacuum stats to quantify this effect.
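As a sketch of what that quantification can look like once the stats are in hand (the CSV layout, relation names, and all numbers below are invented for illustration; the real data comes from a `pg_stat_user_tables` dump per VM):

```python
import io
import pandas as pd

# Invented example of a per-system pg_stat_user_tables dump taken after the
# consumer phase; only the columns needed for the comparison are shown.
csv = io.StringIO("""system,relname,n_dead_tup,autovacuum_count
pgque,bench_queue_evt,0,0
pgq,event_bench_queue,0,0
pgmq,q_bench_queue,184203,37
river,river_job,121877,29
""")
df = pd.read_csv(csv)
summary = df.groupby("system")[["n_dead_tup", "autovacuum_count"]].sum()
# TRUNCATE-rotation systems should sit near zero; DELETE-based systems
# accumulate dead tuples between autovacuum passes.
print(summary.sort_values("n_dead_tup", ascending=False))
```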
### wal_level = minimal and PgQ TRUNCATE

Under `wal_level = minimal`, TRUNCATE has near-zero WAL cost. In production (`wal_level = replica`), TRUNCATE generates more WAL. This setting slightly understates the WAL advantage of PgQ's TRUNCATE rotation; results should be read with this in mind.
### Consumer measurement for River / Que / pg-boss

These three systems require native worker processes for consumption, while the `bench.py` producer loop (direct INSERT) measures enqueue throughput directly. Their consumer and mixed-phase TPS is measured indirectly, via `pg_ash` CPU activity of worker connections and via row-count deltas on the job tables over time. Consumer-only and mixed-phase comparisons are therefore not fully apples-to-apples with PgQ/PgQue/PGMQ, which are tested with direct SQL consumers.
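The row-count-delta estimate itself is simple. A sketch with fabricated numbers (in a real run the samples would come from a periodic `SELECT count(*)` against the system's job table):

```python
# Two samples of the job table's row count during the consumer phase,
# as (seconds into phase, rows remaining). Numbers are fabricated.
samples = [(0.0, 1_000_000), (60.0, 760_000)]
(t0, n0), (t1, n1) = samples[0], samples[-1]
consumed_per_s = (n0 - n1) / (t1 - t0)
print(f"estimated consumer throughput: {consumed_per_s:,.0f} jobs/s")
# → estimated consumer throughput: 4,000 jobs/s
```

This undercounts slightly if producers are still active (mixed phase), which is why the `pg_ash` view of worker activity is collected alongside it.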
## Next

If the 3h run is clean → full 24h run with the same setup (same VMs or fresh provision).
## References

- Previous benchmark: work item #76 (PgQ vs PgQue, Apple Silicon, 10 min)
- Comparison matrix: https://github.com/NikolayS/pgque#readme