
Broken Connection after context cancellation

When cancelling a running QueryRowContext, the driver returns the error interrupted (9).

The same does NOT happen when using the mattn driver (github.com/mattn/go-sqlite3).

I assume there is a race condition somewhere in the driver's connection handling that fails to properly "clean up" an interrupted connection before it is returned to the pool.

EDIT: While experimenting, it seems that a workaround for the interrupted connections MIGHT be to return false from (*sqlite.conn).IsValid() while the connection is still flagged as interrupted:

func (c *conn) IsValid() bool {
	// return c.db != 0 // original code
	return c.db != 0 && sqlite3.Xsqlite3_is_interrupted(c.tls, c.db) == 0
}
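For context on why changing IsValid could help: database/sql consults driver.Validator when a connection is handed back to the pool and discards the connection if IsValid reports false. The sketch below shows that behaviour with a made-up in-memory driver. fakeConnector, fakeConn and reuseAfter are hypothetical stand-ins, not the modernc.org/sqlite types, and the broken flag merely plays the role of the interrupted state.

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"errors"
	"fmt"
	"sync/atomic"
)

// fakeConnector is a hypothetical driver that only hands out numbered connections.
type fakeConnector struct{ opened atomic.Int32 }

func (f *fakeConnector) Connect(context.Context) (driver.Conn, error) {
	return &fakeConn{id: int(f.opened.Add(1))}, nil
}
func (f *fakeConnector) Driver() driver.Driver { return f }
func (f *fakeConnector) Open(string) (driver.Conn, error) {
	return f.Connect(context.Background())
}

// fakeConn implements driver.Conn and driver.Validator.
type fakeConn struct {
	id     int
	broken bool // stands in for "connection is still interrupted"
}

func (c *fakeConn) Prepare(string) (driver.Stmt, error) { return nil, errors.New("not implemented") }
func (c *fakeConn) Close() error                        { return nil }
func (c *fakeConn) Begin() (driver.Tx, error)           { return nil, errors.New("not implemented") }
func (c *fakeConn) IsValid() bool                       { return !c.broken }

// reuseAfter checks out a connection, optionally marks it broken, returns it
// to the pool, and reports the ids of the first and the next connection.
func reuseAfter(markBroken bool) (firstID, secondID int, err error) {
	db := sql.OpenDB(&fakeConnector{})
	defer db.Close()
	ctx := context.Background()

	conn, err := db.Conn(ctx)
	if err != nil {
		return 0, 0, err
	}
	err = conn.Raw(func(dc any) error {
		c := dc.(*fakeConn)
		firstID, c.broken = c.id, markBroken
		return nil
	})
	if err != nil {
		return 0, 0, err
	}
	// Close releases the connection to the pool; the pool calls IsValid here.
	if err = conn.Close(); err != nil {
		return 0, 0, err
	}

	conn2, err := db.Conn(ctx)
	if err != nil {
		return 0, 0, err
	}
	defer conn2.Close()
	err = conn2.Raw(func(dc any) error {
		secondID = dc.(*fakeConn).id
		return nil
	})
	return firstID, secondID, err
}

func main() {
	for _, broken := range []bool{false, true} {
		first, second, err := reuseAfter(broken)
		fmt.Printf("broken=%v first=%d second=%d err=%v\n", broken, first, second, err)
	}
}
```

With a healthy connection the same id comes back from the pool; once IsValid reports false, the pool should open a fresh connection instead. Whether this is the right fix for the driver, as opposed to a way of hiding the stale interrupt, is a separate question.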

EDIT2: Upon reading the SQLite docs, it seems that some pending statement might not be finalized correctly when cancellation occurs:

The sqlite3_interrupt(D) call is in effect until all currently running SQL statements on database connection D complete. Any new SQL statements that are started after the sqlite3_interrupt() call and before the running statement count reaches zero are interrupted as if they had been running prior to the sqlite3_interrupt() call. New SQL statements that are started after the running statement count reaches zero are not effected by the sqlite3_interrupt(). A call to sqlite3_interrupt(D) that occurs when there are no running SQL statements is a no-op and has no effect on SQL statements that are started after the sqlite3_interrupt() call returns.
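The rule quoted above can be restated as a tiny state machine. This is only a toy model of the documented behaviour (connModel and its methods are invented for illustration and are not SQLite's or the driver's implementation). It shows why a statement that is never finished would keep every later statement on the same connection interrupted.

```go
package main

import "fmt"

// connModel is a toy model of one database connection.
type connModel struct {
	running     int  // statements currently running
	interrupted bool // an interrupt is in effect
}

// Interrupt is a no-op when no statement is running.
func (c *connModel) Interrupt() {
	if c.running > 0 {
		c.interrupted = true
	}
}

// Start begins a statement and reports whether it is interrupted right away.
func (c *connModel) Start() bool {
	c.running++
	return c.interrupted
}

// Finish ends a statement; the interrupt clears once the count reaches zero.
func (c *connModel) Finish() {
	if c.running > 0 {
		c.running--
	}
	if c.running == 0 {
		c.interrupted = false
	}
}

func main() {
	c := &connModel{}
	c.Interrupt()          // nothing running: no effect
	fmt.Println(c.Start()) // false
	c.Interrupt()          // one statement running: interrupt takes effect
	fmt.Println(c.Start()) // true, started before the count reached zero
	c.Finish()             // one statement is still pending (think: leaked)
	fmt.Println(c.Start()) // true, the interrupt is still in effect
	c.Finish()
	c.Finish()             // count reaches zero, interrupt clears
	fmt.Println(c.Start()) // false
}
```

If the hypothesis in EDIT2 holds, the driver would be stuck in the third case: a statement left pending after cancellation keeps the running count above zero.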

Original Comments

func (s *stmt) query(ctx context.Context, args []driver.NamedValue) (r driver.Rows, err error) {
// ...
		if pstmt, err = s.c.prepareV2(&psql); err != nil {
			return nil, err // This is interrupted (9)
			// returning ErrBadConn here "drops" the dead connection
			// allowing to retry with a clean one, but still drops a request.
			// return nil, driver.ErrBadConn
		}

Once a connection is in "interrupted" state, it's no longer able to handle any new calls.

Affected version: v1.34.1
Tested OS: Windows 11 amd64

Repro:

  1. Seed the database once via http://127.0.0.1:8082/seed, then send more requests than can be handled concurrently, e.g. by calling http://127.0.0.1:8082/items/2 repeatedly.
  2. Cancel the requests that time out.
  3. At some point a running SQLite operation is "interrupted" and leaves its connection in a broken state.
  4. Any further call that picks up the same pooled connection fails with "interrupted".

Repro example (Server):

package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"runtime"
	"time"

	_ "github.com/mattn/go-sqlite3"
	_ "modernc.org/sqlite"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	absDb, err := filepath.Abs("simple-web.db")
	if err != nil {
		panic(err)
	}
	dbSlash := filepath.ToSlash(absDb)
	connStr := "file:///" + dbSlash + "?_pragma=foreign_keys(1)&_pragma=journal_mode(WAL)&_pragma=synchronous(NORMAL)&_pragma=busy_timeout(10000)"
	// connStr = "file:///" + dbSlash + "?_foreign_keys=1&_journal_mode=WAL&_synchronous=NORMAL&_busy_timeout=10000&_mutex=no"
	fmt.Printf("connecting to %s\n", connStr)
	db, err := sql.Open("sqlite", connStr)
	if err != nil {
		panic(err)
	}
	db.SetMaxOpenConns(2 * runtime.NumCPU())
	db.SetMaxIdleConns(2 * runtime.NumCPU())
	db.SetConnMaxLifetime(5 * time.Minute)
	db.SetConnMaxIdleTime(1 * time.Minute)
	defer db.Close()

	if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS data (
	id INTEGER PRIMARY KEY AUTOINCREMENT,
	name TEXT NOT NULL DEFAULT ''
	)`); err != nil {
		panic(err)
	}

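	http.HandleFunc("/seed", func(w http.ResponseWriter, r *http.Request) {
		// Report seeding failures instead of silently ignoring them.
		if _, err := db.Exec(`INSERT INTO data (Name) VALUES ('A'),('B'),('C'),('D')`); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
	})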

	http.HandleFunc("/items/{id}", func(w http.ResponseWriter, r *http.Request) {
		id := r.PathValue("id")
		type entry struct {
			Id   int64  `sql:"id"`
			Name string `sql:"name"`
		}

		scanEntry := func(ctx context.Context, id string) (entry, error) {
			// this might return ERROR_BUSY even though we have a busy_timeout set and
			// per interface, db.Conn(..) should BLOCK until a connection is obtained
			dbConn, err := db.Conn(ctx)
			if err != nil {
				return entry{}, fmt.Errorf("obtaining db conn: %w", err)
			}
			defer dbConn.Close()

			// modernc sqlite breaks connections when context gets cancelled
			// ctx = context.Background()
			row := dbConn.QueryRowContext(ctx, "SELECT Id,Name FROM data WHERE id = ?", id)
			if err := row.Err(); err != nil {
				return entry{}, fmt.Errorf("retrieving row: %w", err)
			}

			e := entry{}
			if err := row.Scan(&e.Id, &e.Name); err != nil {
				return entry{}, fmt.Errorf("scanning entry: %w", err)
			}
			return e, nil
		}

		const HttpClientClosedRequest = 499

		e, err := scanEntry(r.Context(), id)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				w.WriteHeader(HttpClientClosedRequest)
				return
			}
			w.WriteHeader(http.StatusNotFound)
			fmt.Fprintf(os.Stderr, "%v - failed to retrieve row %v\n", time.Now().Format(time.RFC3339), err)
			return
		}

		w.Header().Set("Content-Type", "text/plain")
		fmt.Fprintf(w, "%#v", e)
	})

	server := http.Server{
		Addr:    ":8082",
		Handler: http.DefaultServeMux,
	}
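	go func() {
		// ErrServerClosed is the expected result of the Shutdown call below.
		if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			fmt.Fprintf(os.Stderr, "server error: %v\n", err)
		}
	}()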
	<-ctx.Done()
	server.Shutdown(context.Background())

	db.Close()
}

Here's some sample code in C# that runs for ~1000 ms before cancelling any pending requests:

async Task Main()
{
    var cts = CancellationTokenSource.CreateLinkedTokenSource(QueryCancelToken);
    var token = cts.Token;
    
    var numRequests = Environment.ProcessorCount;
    numRequests *= 20;

    cts.CancelAfter(1000);
    var tasks = Enumerable.Range(0, numRequests).Select(j => Task.Run(async () =>
    {
        var avg = j;
        using var hc = new HttpClient();
        try
        {
            while (!token.IsCancellationRequested)
            {
                using var after500ms = CancellationTokenSource.CreateLinkedTokenSource(token);
                after500ms.CancelAfter(500);

                try
                {
                    using var resp = await hc.GetAsync("http://127.0.0.1:8082/items/2", after500ms.Token);
                    resp.EnsureSuccessStatusCode();
                }
                catch
                {
                }
            }
        }
        catch (OperationCanceledException ocex) when (ocex.CancellationToken == token)
        {
        }
        catch
        {

        }
    }));

    await Task.WhenAll(tasks);
}
Edited by John Heckendorf