Loading v2/postgres/README.md 0 → 100644 +145 −0 Original line number Diff line number Diff line # postgres Package `postgres` provides an instrumented PostgreSQL client that implements `app.Component`, plugging into the `app.App` lifecycle for managed startup and graceful shutdown. ## Quick start ```go db, err := postgres.NewWithConfig(&postgres.Config{ DSN: os.Getenv("DATABASE_URL"), Tracer: a.Tracer(), }) if err != nil { log.Fatal(err) // DSN parse errors are caught here, before Start } a.Register(db) // Start creates the pool and pings; Shutdown closes it // After a.Start(ctx): rows, err := db.DB().QueryContext(ctx, "SELECT id FROM users WHERE active = $1", true) ``` ## Two access paths The client exposes two ways to interact with the database after `Start`: ### `DB()` — standard `*sql.DB` ```go db.DB().QueryContext(ctx, "SELECT id, name FROM projects WHERE active = $1", true) db.DB().ExecContext(ctx, "UPDATE users SET active = false WHERE id = $1", id) ``` Compatible with ORMs (sqlx, GORM, Bun), query builders, and migration frameworks (golang-migrate, goose). ### `Pool()` — pgx-native `*pgxpool.Pool` ```go // Bulk load via COPY conn, err := db.Pool().Acquire(ctx) defer conn.Release() _, err = conn.Conn().PgConn().CopyFrom(ctx, pgconn.CopyFromReader(r, sql)) // Batch queries — send many, read all results at once batch := &pgx.Batch{} batch.Queue("INSERT INTO events (name) VALUES ($1)", "signup") batch.Queue("INSERT INTO events (name) VALUES ($1)", "purchase") br := db.Pool().SendBatch(ctx, batch) defer br.Close() // LISTEN / NOTIFY for pub-sub patterns conn, _ := db.Pool().Acquire(ctx) defer conn.Release() _, err = conn.Exec(ctx, "LISTEN channel_name") ``` Both paths share the same connection pool and query tracer — OpenTelemetry spans are recorded regardless of which path is used. ## Capability comparison | Capability | `DB()` (`*sql.DB`) | `Pool()` (`*pgxpool.Pool`) | |---|---|---| | Standard queries, transactions | ✓ | ✓ | | ORM / migration tool compatibility | ✓ | ✗ | | `COPY FROM` / `COPY TO` bulk load | ✗ | ✓ | | Batch queries | ✗ | ✓ | | Named prepared statements | ✗ | ✓ | | `LISTEN` / `NOTIFY` | ✗ | ✓ | | `pgconn` low-level protocol access | ✗ | ✓ | **Use `DB()`** for most CRUD workloads. **Use `Pool()`** for bulk inserts, high-throughput pipelines, or event-driven patterns. ## Configuration ```go db, err := postgres.NewWithConfig(&postgres.Config{ DSN: "postgres://user:pass@localhost:5432/mydb", Name: "primary", // component name in logs (default: "postgres") // Connection pool tuning: MaxConns: 20, // max open connections (0 = pgxpool default) MinConns: 2, // min connections to keep warm (0 = none) ConnMaxLifetime: 5 * time.Minute, // recycle connections after this duration ConnMaxIdleTime: 60 * time.Second, // close idle connections after this duration Tracer: a.Tracer(), // nil disables query tracing }) ``` ### PgBouncer When running behind PgBouncer in transaction-pooling mode, set `ConnMaxLifetime` and `ConnMaxIdleTime` to ensure stale connections are recycled promptly: ```go db, err := postgres.NewWithConfig(&postgres.Config{ DSN: "postgres://user:pass@pgbouncer:6432/mydb", ConnMaxLifetime: 5 * time.Minute, ConnMaxIdleTime: 60 * time.Second, }) ``` ## Query tracing When `Tracer` is set, every query automatically receives an OpenTelemetry span: - **Span name:** SQL verb — `"db SELECT"`, `"db INSERT"`, etc. (low cardinality) - **Attributes:** `db.system = "postgresql"`, `db.statement = <full SQL>`, `db.name = <database name>` - **Errors:** recorded on the span automatically ## Multiple databases Give each client a distinct `Name` so log lines and error messages are unambiguous: ```go primary, _ := postgres.NewWithConfig(&postgres.Config{Name: "primary", DSN: primaryDSN}) replica, _ := postgres.NewWithConfig(&postgres.Config{Name: "replica", DSN: replicaDSN}) a.Register(primary) a.Register(replica) ``` ## Testing Tests that don't need a real database can inspect the configured tracer via `QueryTracer()` without calling `Start`: ```go db, _ := postgres.NewWithConfig(&postgres.Config{ DSN: "postgres://localhost/mydb", Tracer: tracer, }) assert.NotNil(t, db.QueryTracer()) assert.Nil(t, db.DB()) // nil before Start assert.Nil(t, db.Pool()) // nil before Start ``` For integration tests, use a real PostgreSQL instance and call `Start` / `Shutdown` directly. v2/postgres/client.go +72 −33 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ import ( "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/stdlib" "gitlab.com/gitlab-org/labkit/v2/app" "gitlab.com/gitlab-org/labkit/v2/trace" Loading @@ -26,23 +27,33 @@ type Config struct { // Use distinct names when a service connects to multiple databases. Name string // MaxConns is the maximum number of open connections to the database. // Maps to [sql.DB.SetMaxOpenConns]. Zero means unlimited. // MaxConns is the maximum number of connections in the pool. // Maps to pgxpool.Config.MaxConns. Zero uses the pgxpool default // (max(4, runtime.NumCPU())). MaxConns int // MaxIdleConns is the maximum number of idle connections in the pool. // Maps to [sql.DB.SetMaxIdleConns]. Zero uses the default (2). // MinConns is the minimum number of connections to keep open in the pool, // even when idle. Maps to pgxpool.Config.MinConns. Zero means no minimum. // Use this to pre-warm connections at startup and avoid cold-start latency // on the first requests. MinConns int // MaxIdleConns is not used by the pgxpool backend and has no effect. // To keep a minimum number of connections warm, use [Config.MinConns]. // To close connections that have been idle too long, use [Config.ConnMaxIdleTime]. // // Deprecated: retained for source compatibility; has no effect. MaxIdleConns int // ConnMaxLifetime is the maximum duration a connection may be reused. // Maps to [sql.DB.SetConnMaxLifetime]. Zero means connections are not // closed due to age. Set this when using PgBouncer or similar connection // poolers to ensure stale connections are recycled. // Maps to pgxpool.Config.MaxConnLifetime. Zero means no age limit. // Set this when using PgBouncer or similar connection poolers to ensure // stale connections are recycled. ConnMaxLifetime time.Duration // ConnMaxIdleTime is the maximum duration a connection may sit idle // before being closed. Maps to [sql.DB.SetConnMaxIdleTime]. Zero means // connections are not closed due to idle time. // before being closed. Maps to pgxpool.Config.MaxConnIdleTime. // Zero means connections are not closed due to idle time. ConnMaxIdleTime time.Duration // Tracer is used to create spans for queries. When nil, tracing is Loading @@ -50,15 +61,21 @@ type Config struct { Tracer *trace.Tracer } // Client is an instrumented PostgreSQL client backed by [database/sql] that // implements [app.Component]. Call [Start] to open the connection and // [Shutdown] to close it. // Client is an instrumented PostgreSQL client backed by a [pgxpool.Pool] that // implements [app.Component]. Call [Start] to open the pool and [Shutdown] to // close it. // // [Client.DB] returns a standard [*sql.DB] derived from the pool, compatible // with ORMs, query builders, and migration frameworks. [Client.Pool] returns // the underlying [*pgxpool.Pool] for pgx-native operations such as COPY, // batch queries, and LISTEN/NOTIFY. type Client struct { name string connCfg *pgx.ConnConfig db *sql.DB pool *pgxpool.Pool maxConns int maxIdleConns int minConns int connMaxLifetime time.Duration connMaxIdleTime time.Duration } Loading Loading @@ -94,7 +111,7 @@ func NewWithConfig(cfg *Config) (*Client, error) { name: name, connCfg: connCfg, maxConns: cfg.MaxConns, maxIdleConns: cfg.MaxIdleConns, minConns: cfg.MinConns, connMaxLifetime: cfg.ConnMaxLifetime, connMaxIdleTime: cfg.ConnMaxIdleTime, }, nil Loading @@ -105,12 +122,21 @@ func (c *Client) Name() string { return c.name } // DB returns the underlying [*sql.DB]. It returns nil before [Start] has been // called successfully. // DB returns the standard [*sql.DB] derived from the pool. Compatible with // ORMs, query builders, and migration frameworks. Returns nil before [Start] // has been called successfully. func (c *Client) DB() *sql.DB { return c.db } // Pool returns the underlying [*pgxpool.Pool] for pgx-native operations. // Use this for bulk loads ([COPY FROM / COPY TO]), batch queries, named // prepared statements, and LISTEN/NOTIFY. Returns nil before [Start] has been // called successfully. func (c *Client) Pool() *pgxpool.Pool { return c.pool } // QueryTracer returns the [pgx.QueryTracer] configured on this client, or nil // when no [Config.Tracer] was provided. This is primarily useful for testing // the tracing integration without a live database. Loading @@ -118,40 +144,53 @@ func (c *Client) QueryTracer() pgx.QueryTracer { return c.connCfg.Tracer } // Start opens the database connection pool and verifies connectivity with a // ping. It satisfies [app.Component] and should be called via [app.App.Start]. // Start creates the connection pool and verifies connectivity with a ping. // It satisfies [app.Component] and should be called via [app.App.Start]. func (c *Client) Start(ctx context.Context) error { db := stdlib.OpenDB(*c.connCfg) poolCfg := &pgxpool.Config{ ConnConfig: c.connCfg, } if c.maxConns > 0 { db.SetMaxOpenConns(c.maxConns) poolCfg.MaxConns = int32(c.maxConns) //nolint:gosec } if c.maxIdleConns > 0 { db.SetMaxIdleConns(c.maxIdleConns) if c.minConns > 0 { poolCfg.MinConns = int32(c.minConns) //nolint:gosec } if c.connMaxLifetime > 0 { db.SetConnMaxLifetime(c.connMaxLifetime) poolCfg.MaxConnLifetime = c.connMaxLifetime } if c.connMaxIdleTime > 0 { db.SetConnMaxIdleTime(c.connMaxIdleTime) poolCfg.MaxConnIdleTime = c.connMaxIdleTime } if err := db.PingContext(ctx); err != nil { db.Close() pool, err := pgxpool.NewWithConfig(ctx, poolCfg) if err != nil { return fmt.Errorf("%s: creating pool: %w", c.name, err) } if err := pool.Ping(ctx); err != nil { pool.Close() return fmt.Errorf("%s: ping failed: %w", c.name, err) } c.db = db c.pool = pool c.db = stdlib.OpenDBFromPool(pool) return nil } // Shutdown closes the database and releases all connections. It satisfies // [app.Component] and should be called via [app.App.Shutdown]. // Shutdown closes the sql.DB and the underlying pool, releasing all // connections. It satisfies [app.Component] and should be called via // [app.App.Shutdown]. func (c *Client) Shutdown(_ context.Context) error { var dbErr error if c.db != nil { if err := c.db.Close(); err != nil { return fmt.Errorf("%s: %w", c.name, err) dbErr = c.db.Close() } if c.pool != nil { c.pool.Close() } if dbErr != nil { return fmt.Errorf("%s: %w", c.name, dbErr) } return nil } v2/postgres/client_test.go +6 −0 Original line number Diff line number Diff line Loading @@ -151,3 +151,9 @@ func TestQueryTracer_RecordsError(t *testing.T) { assert.True(t, spans[0].HasError) assert.Equal(t, queryErr.Error(), spans[0].StatusMsg) } func TestPool_NilBeforeStart(t *testing.T) { client, err := postgres.New("postgres://user:pass@localhost:5432/mydb") require.NoError(t, err) assert.Nil(t, client.Pool()) } v2/postgres/doc.go +31 −14 Original line number Diff line number Diff line /* Package postgres provides an instrumented PostgreSQL client backed by [database/sql] (using [github.com/jackc/pgx/v5/stdlib] as the driver) that implements [app.Component]. Package postgres provides an instrumented PostgreSQL client backed by a [pgxpool.Pool] that implements [app.Component]. Exposing [*sql.DB] enables compatibility with standard Go database tooling (ORMs, query builders, migration frameworks) and any code that targets the database/sql interface. Two access paths are exposed after [Client.Start]: When a [trace.Tracer] is provided, every query automatically receives an OpenTelemetry span. Spans are named after the SQL verb (e.g. "db SELECT", "db INSERT") and carry the full statement as the db.statement attribute. Errors are recorded on the span. - [Client.DB] returns a standard [*sql.DB] derived from the pool. Use this for standard queries and full compatibility with ORMs (sqlx, GORM, Bun), query builders, and migration frameworks (golang-migrate, goose). - [Client.Pool] returns the underlying [*pgxpool.Pool] for pgx-native operations: COPY FROM/TO for bulk loads, batch queries, named prepared statements, and LISTEN/NOTIFY for pub-sub patterns. Both paths share the same connection pool and query tracer, so OpenTelemetry spans are recorded regardless of which path is used. # Basic usage Loading @@ -22,8 +25,14 @@ Errors are recorded on the span. if err := a.Start(ctx); err != nil { log.Fatal(err) } defer a.Shutdown(ctx) // Standard database/sql path — compatible with ORMs and migrations. rows, err := client.DB().QueryContext(ctx, "SELECT id FROM users WHERE active = $1", true) // pgx-native path — for COPY, batching, LISTEN/NOTIFY. batch := &pgx.Batch{} batch.Queue("INSERT INTO events (name) VALUES ($1)", "signup") results := client.Pool().SendBatch(ctx, batch) # With tracing client, err := postgres.NewWithConfig(&postgres.Config{ Loading @@ -31,11 +40,19 @@ Errors are recorded on the span. Tracer: a.Tracer(), }) # Accessing the database Every query automatically receives an OpenTelemetry span named after the SQL verb (e.g. "db SELECT", "db INSERT"). The full statement is recorded as the db.statement attribute and errors are recorded on the span. [Client.DB] returns the underlying [*sql.DB] for standard database/sql access: # PgBouncer db := client.DB() row := db.QueryRowContext(ctx, "SELECT count(*) FROM events") When running behind PgBouncer in transaction-pooling mode, set ConnMaxLifetime and ConnMaxIdleTime to recycle stale connections: client, err := postgres.NewWithConfig(&postgres.Config{ DSN: "postgres://user:pass@pgbouncer:6432/mydb", ConnMaxLifetime: 5 * time.Minute, ConnMaxIdleTime: 60 * time.Second, }) */ package postgres v2/postgres/example_test.go 0 → 100644 +59 −0 Original line number Diff line number Diff line package postgres_test import ( "context" "time" "gitlab.com/gitlab-org/labkit/v2/postgres" ) // Example shows an instrumented PostgreSQL client registered as an app // component. Start creates the pool and pings the server; Shutdown closes // all connections cleanly. func Example() { ctx := context.Background() db, err := postgres.NewWithConfig(&postgres.Config{ DSN: "postgres://user:pass@localhost:5432/mydb", // Tracer: a.Tracer(), }) if err != nil { // DSN parse errors are caught here before Start is called. panic(err) } // Register with app.App so the lifecycle is managed automatically: // a.Register(db) // a.Run(ctx) // // Or manage manually: if err := db.Start(ctx); err != nil { panic(err) } defer db.Shutdown(ctx) // DB() returns *sql.DB — compatible with ORMs, query builders, migrations. row := db.DB().QueryRowContext(ctx, "SELECT version()") var version string _ = row.Scan(&version) // Pool() returns *pgxpool.Pool — for COPY, batching, LISTEN/NOTIFY. _ = db.Pool() } // ExampleNewWithConfig shows connection pool tuning for services running // behind PgBouncer, where long-lived connections should be recycled. func ExampleNewWithConfig() { db, err := postgres.NewWithConfig(&postgres.Config{ DSN: "postgres://user:pass@pgbouncer:6432/mydb", Name: "primary", MaxConns: 20, MinConns: 2, // keep 2 connections warm ConnMaxLifetime: 5 * time.Minute, ConnMaxIdleTime: 60 * time.Second, }) if err != nil { panic(err) } _ = db } Loading
v2/postgres/README.md 0 → 100644 +145 −0 Original line number Diff line number Diff line # postgres Package `postgres` provides an instrumented PostgreSQL client that implements `app.Component`, plugging into the `app.App` lifecycle for managed startup and graceful shutdown. ## Quick start ```go db, err := postgres.NewWithConfig(&postgres.Config{ DSN: os.Getenv("DATABASE_URL"), Tracer: a.Tracer(), }) if err != nil { log.Fatal(err) // DSN parse errors are caught here, before Start } a.Register(db) // Start creates the pool and pings; Shutdown closes it // After a.Start(ctx): rows, err := db.DB().QueryContext(ctx, "SELECT id FROM users WHERE active = $1", true) ``` ## Two access paths The client exposes two ways to interact with the database after `Start`: ### `DB()` — standard `*sql.DB` ```go db.DB().QueryContext(ctx, "SELECT id, name FROM projects WHERE active = $1", true) db.DB().ExecContext(ctx, "UPDATE users SET active = false WHERE id = $1", id) ``` Compatible with ORMs (sqlx, GORM, Bun), query builders, and migration frameworks (golang-migrate, goose). ### `Pool()` — pgx-native `*pgxpool.Pool` ```go // Bulk load via COPY conn, err := db.Pool().Acquire(ctx) defer conn.Release() _, err = conn.Conn().PgConn().CopyFrom(ctx, pgconn.CopyFromReader(r, sql)) // Batch queries — send many, read all results at once batch := &pgx.Batch{} batch.Queue("INSERT INTO events (name) VALUES ($1)", "signup") batch.Queue("INSERT INTO events (name) VALUES ($1)", "purchase") br := db.Pool().SendBatch(ctx, batch) defer br.Close() // LISTEN / NOTIFY for pub-sub patterns conn, _ := db.Pool().Acquire(ctx) defer conn.Release() _, err = conn.Exec(ctx, "LISTEN channel_name") ``` Both paths share the same connection pool and query tracer — OpenTelemetry spans are recorded regardless of which path is used. ## Capability comparison | Capability | `DB()` (`*sql.DB`) | `Pool()` (`*pgxpool.Pool`) | |---|---|---| | Standard queries, transactions | ✓ | ✓ | | ORM / migration tool compatibility | ✓ | ✗ | | `COPY FROM` / `COPY TO` bulk load | ✗ | ✓ | | Batch queries | ✗ | ✓ | | Named prepared statements | ✗ | ✓ | | `LISTEN` / `NOTIFY` | ✗ | ✓ | | `pgconn` low-level protocol access | ✗ | ✓ | **Use `DB()`** for most CRUD workloads. **Use `Pool()`** for bulk inserts, high-throughput pipelines, or event-driven patterns. ## Configuration ```go db, err := postgres.NewWithConfig(&postgres.Config{ DSN: "postgres://user:pass@localhost:5432/mydb", Name: "primary", // component name in logs (default: "postgres") // Connection pool tuning: MaxConns: 20, // max open connections (0 = pgxpool default) MinConns: 2, // min connections to keep warm (0 = none) ConnMaxLifetime: 5 * time.Minute, // recycle connections after this duration ConnMaxIdleTime: 60 * time.Second, // close idle connections after this duration Tracer: a.Tracer(), // nil disables query tracing }) ``` ### PgBouncer When running behind PgBouncer in transaction-pooling mode, set `ConnMaxLifetime` and `ConnMaxIdleTime` to ensure stale connections are recycled promptly: ```go db, err := postgres.NewWithConfig(&postgres.Config{ DSN: "postgres://user:pass@pgbouncer:6432/mydb", ConnMaxLifetime: 5 * time.Minute, ConnMaxIdleTime: 60 * time.Second, }) ``` ## Query tracing When `Tracer` is set, every query automatically receives an OpenTelemetry span: - **Span name:** SQL verb — `"db SELECT"`, `"db INSERT"`, etc. (low cardinality) - **Attributes:** `db.system = "postgresql"`, `db.statement = <full SQL>`, `db.name = <database name>` - **Errors:** recorded on the span automatically ## Multiple databases Give each client a distinct `Name` so log lines and error messages are unambiguous: ```go primary, _ := postgres.NewWithConfig(&postgres.Config{Name: "primary", DSN: primaryDSN}) replica, _ := postgres.NewWithConfig(&postgres.Config{Name: "replica", DSN: replicaDSN}) a.Register(primary) a.Register(replica) ``` ## Testing Tests that don't need a real database can inspect the configured tracer via `QueryTracer()` without calling `Start`: ```go db, _ := postgres.NewWithConfig(&postgres.Config{ DSN: "postgres://localhost/mydb", Tracer: tracer, }) assert.NotNil(t, db.QueryTracer()) assert.Nil(t, db.DB()) // nil before Start assert.Nil(t, db.Pool()) // nil before Start ``` For integration tests, use a real PostgreSQL instance and call `Start` / `Shutdown` directly.
v2/postgres/client.go +72 −33 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ import ( "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/stdlib" "gitlab.com/gitlab-org/labkit/v2/app" "gitlab.com/gitlab-org/labkit/v2/trace" Loading @@ -26,23 +27,33 @@ type Config struct { // Use distinct names when a service connects to multiple databases. Name string // MaxConns is the maximum number of open connections to the database. // Maps to [sql.DB.SetMaxOpenConns]. Zero means unlimited. // MaxConns is the maximum number of connections in the pool. // Maps to pgxpool.Config.MaxConns. Zero uses the pgxpool default // (max(4, runtime.NumCPU())). MaxConns int // MaxIdleConns is the maximum number of idle connections in the pool. // Maps to [sql.DB.SetMaxIdleConns]. Zero uses the default (2). // MinConns is the minimum number of connections to keep open in the pool, // even when idle. Maps to pgxpool.Config.MinConns. Zero means no minimum. // Use this to pre-warm connections at startup and avoid cold-start latency // on the first requests. MinConns int // MaxIdleConns is not used by the pgxpool backend and has no effect. // To keep a minimum number of connections warm, use [Config.MinConns]. // To close connections that have been idle too long, use [Config.ConnMaxIdleTime]. // // Deprecated: retained for source compatibility; has no effect. MaxIdleConns int // ConnMaxLifetime is the maximum duration a connection may be reused. // Maps to [sql.DB.SetConnMaxLifetime]. Zero means connections are not // closed due to age. Set this when using PgBouncer or similar connection // poolers to ensure stale connections are recycled. // Maps to pgxpool.Config.MaxConnLifetime. Zero means no age limit. // Set this when using PgBouncer or similar connection poolers to ensure // stale connections are recycled. ConnMaxLifetime time.Duration // ConnMaxIdleTime is the maximum duration a connection may sit idle // before being closed. Maps to [sql.DB.SetConnMaxIdleTime]. Zero means // connections are not closed due to idle time. // before being closed. Maps to pgxpool.Config.MaxConnIdleTime. // Zero means connections are not closed due to idle time. ConnMaxIdleTime time.Duration // Tracer is used to create spans for queries. When nil, tracing is Loading @@ -50,15 +61,21 @@ type Config struct { Tracer *trace.Tracer } // Client is an instrumented PostgreSQL client backed by [database/sql] that // implements [app.Component]. Call [Start] to open the connection and // [Shutdown] to close it. // Client is an instrumented PostgreSQL client backed by a [pgxpool.Pool] that // implements [app.Component]. Call [Start] to open the pool and [Shutdown] to // close it. // // [Client.DB] returns a standard [*sql.DB] derived from the pool, compatible // with ORMs, query builders, and migration frameworks. [Client.Pool] returns // the underlying [*pgxpool.Pool] for pgx-native operations such as COPY, // batch queries, and LISTEN/NOTIFY. type Client struct { name string connCfg *pgx.ConnConfig db *sql.DB pool *pgxpool.Pool maxConns int maxIdleConns int minConns int connMaxLifetime time.Duration connMaxIdleTime time.Duration } Loading Loading @@ -94,7 +111,7 @@ func NewWithConfig(cfg *Config) (*Client, error) { name: name, connCfg: connCfg, maxConns: cfg.MaxConns, maxIdleConns: cfg.MaxIdleConns, minConns: cfg.MinConns, connMaxLifetime: cfg.ConnMaxLifetime, connMaxIdleTime: cfg.ConnMaxIdleTime, }, nil Loading @@ -105,12 +122,21 @@ func (c *Client) Name() string { return c.name } // DB returns the underlying [*sql.DB]. It returns nil before [Start] has been // called successfully. // DB returns the standard [*sql.DB] derived from the pool. Compatible with // ORMs, query builders, and migration frameworks. Returns nil before [Start] // has been called successfully. func (c *Client) DB() *sql.DB { return c.db } // Pool returns the underlying [*pgxpool.Pool] for pgx-native operations. // Use this for bulk loads ([COPY FROM / COPY TO]), batch queries, named // prepared statements, and LISTEN/NOTIFY. Returns nil before [Start] has been // called successfully. func (c *Client) Pool() *pgxpool.Pool { return c.pool } // QueryTracer returns the [pgx.QueryTracer] configured on this client, or nil // when no [Config.Tracer] was provided. This is primarily useful for testing // the tracing integration without a live database. Loading @@ -118,40 +144,53 @@ func (c *Client) QueryTracer() pgx.QueryTracer { return c.connCfg.Tracer } // Start opens the database connection pool and verifies connectivity with a // ping. It satisfies [app.Component] and should be called via [app.App.Start]. // Start creates the connection pool and verifies connectivity with a ping. // It satisfies [app.Component] and should be called via [app.App.Start]. func (c *Client) Start(ctx context.Context) error { db := stdlib.OpenDB(*c.connCfg) poolCfg := &pgxpool.Config{ ConnConfig: c.connCfg, } if c.maxConns > 0 { db.SetMaxOpenConns(c.maxConns) poolCfg.MaxConns = int32(c.maxConns) //nolint:gosec } if c.maxIdleConns > 0 { db.SetMaxIdleConns(c.maxIdleConns) if c.minConns > 0 { poolCfg.MinConns = int32(c.minConns) //nolint:gosec } if c.connMaxLifetime > 0 { db.SetConnMaxLifetime(c.connMaxLifetime) poolCfg.MaxConnLifetime = c.connMaxLifetime } if c.connMaxIdleTime > 0 { db.SetConnMaxIdleTime(c.connMaxIdleTime) poolCfg.MaxConnIdleTime = c.connMaxIdleTime } if err := db.PingContext(ctx); err != nil { db.Close() pool, err := pgxpool.NewWithConfig(ctx, poolCfg) if err != nil { return fmt.Errorf("%s: creating pool: %w", c.name, err) } if err := pool.Ping(ctx); err != nil { pool.Close() return fmt.Errorf("%s: ping failed: %w", c.name, err) } c.db = db c.pool = pool c.db = stdlib.OpenDBFromPool(pool) return nil } // Shutdown closes the database and releases all connections. It satisfies // [app.Component] and should be called via [app.App.Shutdown]. // Shutdown closes the sql.DB and the underlying pool, releasing all // connections. It satisfies [app.Component] and should be called via // [app.App.Shutdown]. func (c *Client) Shutdown(_ context.Context) error { var dbErr error if c.db != nil { if err := c.db.Close(); err != nil { return fmt.Errorf("%s: %w", c.name, err) dbErr = c.db.Close() } if c.pool != nil { c.pool.Close() } if dbErr != nil { return fmt.Errorf("%s: %w", c.name, dbErr) } return nil }
v2/postgres/client_test.go +6 −0 Original line number Diff line number Diff line Loading @@ -151,3 +151,9 @@ func TestQueryTracer_RecordsError(t *testing.T) { assert.True(t, spans[0].HasError) assert.Equal(t, queryErr.Error(), spans[0].StatusMsg) } func TestPool_NilBeforeStart(t *testing.T) { client, err := postgres.New("postgres://user:pass@localhost:5432/mydb") require.NoError(t, err) assert.Nil(t, client.Pool()) }
v2/postgres/doc.go +31 −14 Original line number Diff line number Diff line /* Package postgres provides an instrumented PostgreSQL client backed by [database/sql] (using [github.com/jackc/pgx/v5/stdlib] as the driver) that implements [app.Component]. Package postgres provides an instrumented PostgreSQL client backed by a [pgxpool.Pool] that implements [app.Component]. Exposing [*sql.DB] enables compatibility with standard Go database tooling (ORMs, query builders, migration frameworks) and any code that targets the database/sql interface. Two access paths are exposed after [Client.Start]: When a [trace.Tracer] is provided, every query automatically receives an OpenTelemetry span. Spans are named after the SQL verb (e.g. "db SELECT", "db INSERT") and carry the full statement as the db.statement attribute. Errors are recorded on the span. - [Client.DB] returns a standard [*sql.DB] derived from the pool. Use this for standard queries and full compatibility with ORMs (sqlx, GORM, Bun), query builders, and migration frameworks (golang-migrate, goose). - [Client.Pool] returns the underlying [*pgxpool.Pool] for pgx-native operations: COPY FROM/TO for bulk loads, batch queries, named prepared statements, and LISTEN/NOTIFY for pub-sub patterns. Both paths share the same connection pool and query tracer, so OpenTelemetry spans are recorded regardless of which path is used. # Basic usage Loading @@ -22,8 +25,14 @@ Errors are recorded on the span. if err := a.Start(ctx); err != nil { log.Fatal(err) } defer a.Shutdown(ctx) // Standard database/sql path — compatible with ORMs and migrations. rows, err := client.DB().QueryContext(ctx, "SELECT id FROM users WHERE active = $1", true) // pgx-native path — for COPY, batching, LISTEN/NOTIFY. batch := &pgx.Batch{} batch.Queue("INSERT INTO events (name) VALUES ($1)", "signup") results := client.Pool().SendBatch(ctx, batch) # With tracing client, err := postgres.NewWithConfig(&postgres.Config{ Loading @@ -31,11 +40,19 @@ Errors are recorded on the span. Tracer: a.Tracer(), }) # Accessing the database Every query automatically receives an OpenTelemetry span named after the SQL verb (e.g. "db SELECT", "db INSERT"). The full statement is recorded as the db.statement attribute and errors are recorded on the span. [Client.DB] returns the underlying [*sql.DB] for standard database/sql access: # PgBouncer db := client.DB() row := db.QueryRowContext(ctx, "SELECT count(*) FROM events") When running behind PgBouncer in transaction-pooling mode, set ConnMaxLifetime and ConnMaxIdleTime to recycle stale connections: client, err := postgres.NewWithConfig(&postgres.Config{ DSN: "postgres://user:pass@pgbouncer:6432/mydb", ConnMaxLifetime: 5 * time.Minute, ConnMaxIdleTime: 60 * time.Second, }) */ package postgres
v2/postgres/example_test.go 0 → 100644 +59 −0 Original line number Diff line number Diff line package postgres_test import ( "context" "time" "gitlab.com/gitlab-org/labkit/v2/postgres" ) // Example shows an instrumented PostgreSQL client registered as an app // component. Start creates the pool and pings the server; Shutdown closes // all connections cleanly. func Example() { ctx := context.Background() db, err := postgres.NewWithConfig(&postgres.Config{ DSN: "postgres://user:pass@localhost:5432/mydb", // Tracer: a.Tracer(), }) if err != nil { // DSN parse errors are caught here before Start is called. panic(err) } // Register with app.App so the lifecycle is managed automatically: // a.Register(db) // a.Run(ctx) // // Or manage manually: if err := db.Start(ctx); err != nil { panic(err) } defer db.Shutdown(ctx) // DB() returns *sql.DB — compatible with ORMs, query builders, migrations. row := db.DB().QueryRowContext(ctx, "SELECT version()") var version string _ = row.Scan(&version) // Pool() returns *pgxpool.Pool — for COPY, batching, LISTEN/NOTIFY. _ = db.Pool() } // ExampleNewWithConfig shows connection pool tuning for services running // behind PgBouncer, where long-lived connections should be recycled. func ExampleNewWithConfig() { db, err := postgres.NewWithConfig(&postgres.Config{ DSN: "postgres://user:pass@pgbouncer:6432/mydb", Name: "primary", MaxConns: 20, MinConns: 2, // keep 2 connections warm ConnMaxLifetime: 5 * time.Minute, ConnMaxIdleTime: 60 * time.Second, }) if err != nil { panic(err) } _ = db }