Verified Commit 1d8784a5 authored by Joakim Olsson's avatar Joakim Olsson
Browse files

fix: remove columns from unique events index

parent 07232a39
Loading
Loading
Loading
Loading
Loading
+20 −3
Original line number Diff line number Diff line
@@ -378,19 +378,36 @@ func createSchema(ctx context.Context, db *sqlx.DB) error {
	if err != nil {
		return err
	}
	row := db.QueryRowxContext(ctx, `SELECT coalesce(max(1), 0) as found FROM pg_class t JOIN pg_index ix ON t.oid = ix.indrelid JOIN pg_class i ON i.oid = ix.indexrelid JOIN pg_attribute a ON a.attrelid = t.oid WHERE a.attnum = ANY (ix.indkey) AND t.relkind = 'r' AND t.relname = 'events' AND a.attname = 'aggregate_name'`)
	var found int
	row := db.QueryRowxContext(ctx, `SELECT coalesce(max(1), 0) as found FROM pg_class t JOIN pg_index ix ON t.oid = ix.indrelid JOIN pg_class i ON i.oid = ix.indexrelid JOIN pg_attribute a ON a.attrelid = t.oid WHERE a.attnum = ANY (ix.indkey) AND t.relkind = 'r' AND t.relname = 'events' AND a.attname = 'name'`)
	err = row.Scan(&found)
	if err != nil {
		return err
	}
	if found == 0 {
	if found == 1 {
		// Re-sequence events
		_, err = db.ExecContext(ctx, `with tmp as (
select id as eid,
       row_number() over (partition by aggregate_id, aggregate_name order by tstamp) rn
from events
where (aggregate_id, aggregate_name) in (select aggregate_id, aggregate_name
                                         from events
                                         group by aggregate_id, aggregate_name, sequence_no
                                         having count(*) > 1)
)
update events
set sequence_no = tmp.rn
from tmp
where id = eid`)
		if err != nil {
			return err
		}
		_, err = db.ExecContext(ctx, `drop index if exists event_ix`)
		if err != nil {
			return err
		}
	}
	_, err = db.ExecContext(ctx, `create unique index if not exists event_ix on events (aggregate_id, aggregate_name, sequence_no, id, name, tstamp)`)
	_, err = db.ExecContext(ctx, `create unique index if not exists event_ix on events (aggregate_id, aggregate_name, sequence_no)`)
	if err != nil {
		return err
	}
+49 −8
Original line number Diff line number Diff line
@@ -264,7 +264,40 @@ func TestStore_CreateSchema_ErrorQueryingIndexColumnExistence(t *testing.T) {
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("alter table events alter column aggregate_name set not null").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectQuery("SELECT coalesce(max(1), 0) as found FROM pg_class t JOIN pg_index ix ON t.oid = ix.indrelid JOIN pg_class i ON i.oid = ix.indexrelid JOIN pg_attribute a ON a.attrelid = t.oid WHERE a.attnum = ANY (ix.indkey) AND t.relkind = 'r' AND t.relname = 'events' AND a.attname = 'aggregate_name'").
	mock.ExpectQuery("SELECT coalesce(max(1), 0) as found FROM pg_class t JOIN pg_index ix ON t.oid = ix.indrelid JOIN pg_class i ON i.oid = ix.indexrelid JOIN pg_attribute a ON a.attrelid = t.oid WHERE a.attnum = ANY (ix.indkey) AND t.relkind = 'r' AND t.relname = 'events' AND a.attname = 'name'").
		WillReturnError(errors.New("error"))

	_, err = New(context.Background(), db, WithEventTypes(&Area{}))
	assert.EqualError(t, err, "error")
}

func TestStore_CreateSchema_ErrorReSequenceEvents(t *testing.T) {
	db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
	assert.NoError(t, err)

	mock.ExpectExec("create table if not exists events ( id bigserial primary key, name text not null, aggregate_id text not null, sequence_no bigint not null, payload text not null, tstamp timestamptz not null )").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("create unique index if not exists event_ix on events (aggregate_id, sequence_no, id, name, tstamp)").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("create table if not exists snapshots ( id bigserial primary key, name text not null, aggregate_id text not null, sequence_no bigint not null, payload text not null, tstamp timestamptz not null )").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("create unique index if not exists snapshot_ix on snapshots (aggregate_id, sequence_no, id, name, tstamp)").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("create table if not exists aggregates (id text, name text, primary key (id, name))").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("create unique index if not exists aggregate_ix on aggregates (id, name)").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("alter table events add column if not exists aggregate_name text").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("update aggregates set name = TRIM(LEADING '*' FROM name) where name like '*%'").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("with cte as ( select a.name, e.id, e.aggregate_name from aggregates a join events e on a.id = e.aggregate_id ) update events set aggregate_name = cte.name from cte where events.id = cte.id and events.aggregate_name is null").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("alter table events alter column aggregate_name set not null").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectQuery("SELECT coalesce(max(1), 0) as found FROM pg_class t JOIN pg_index ix ON t.oid = ix.indrelid JOIN pg_class i ON i.oid = ix.indexrelid JOIN pg_attribute a ON a.attrelid = t.oid WHERE a.attnum = ANY (ix.indkey) AND t.relkind = 'r' AND t.relname = 'events' AND a.attname = 'name'").
		WillReturnRows(sqlmock.NewRows([]string{"found"}).AddRow(1))
	mock.ExpectExec("with tmp as (\nselect id as eid,\n       row_number() over (partition by aggregate_id, aggregate_name order by tstamp) rn\nfrom events\nwhere (aggregate_id, aggregate_name) in (select aggregate_id, aggregate_name\n                                         from events\n                                         group by aggregate_id, aggregate_name, sequence_no\n                                         having count(*) > 1)\n)\nupdate events\nset sequence_no = tmp.rn\nfrom tmp\nwhere id = eid").
		WillReturnError(errors.New("error"))

	_, err = New(context.Background(), db, WithEventTypes(&Area{}))
@@ -295,8 +328,10 @@ func TestStore_CreateSchema_ErrorDropEventsIndex(t *testing.T) {
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("alter table events alter column aggregate_name set not null").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectQuery("SELECT coalesce(max(1), 0) as found FROM pg_class t JOIN pg_index ix ON t.oid = ix.indrelid JOIN pg_class i ON i.oid = ix.indexrelid JOIN pg_attribute a ON a.attrelid = t.oid WHERE a.attnum = ANY (ix.indkey) AND t.relkind = 'r' AND t.relname = 'events' AND a.attname = 'aggregate_name'").
		WillReturnRows(sqlmock.NewRows([]string{"found"}).AddRow(0))
	mock.ExpectQuery("SELECT coalesce(max(1), 0) as found FROM pg_class t JOIN pg_index ix ON t.oid = ix.indrelid JOIN pg_class i ON i.oid = ix.indexrelid JOIN pg_attribute a ON a.attrelid = t.oid WHERE a.attnum = ANY (ix.indkey) AND t.relkind = 'r' AND t.relname = 'events' AND a.attname = 'name'").
		WillReturnRows(sqlmock.NewRows([]string{"found"}).AddRow(1))
	mock.ExpectExec("with tmp as (\nselect id as eid,\n       row_number() over (partition by aggregate_id, aggregate_name order by tstamp) rn\nfrom events\nwhere (aggregate_id, aggregate_name) in (select aggregate_id, aggregate_name\n                                         from events\n                                         group by aggregate_id, aggregate_name, sequence_no\n                                         having count(*) > 1)\n)\nupdate events\nset sequence_no = tmp.rn\nfrom tmp\nwhere id = eid").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("drop index if exists event_ix").
		WillReturnError(errors.New("error"))

@@ -328,11 +363,13 @@ func TestStore_CreateSchema_ErrorReCreateEventsIndex(t *testing.T) {
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("alter table events alter column aggregate_name set not null").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectQuery("SELECT coalesce(max(1), 0) as found FROM pg_class t JOIN pg_index ix ON t.oid = ix.indrelid JOIN pg_class i ON i.oid = ix.indexrelid JOIN pg_attribute a ON a.attrelid = t.oid WHERE a.attnum = ANY (ix.indkey) AND t.relkind = 'r' AND t.relname = 'events' AND a.attname = 'aggregate_name'").
		WillReturnRows(sqlmock.NewRows([]string{"found"}).AddRow(0))
	mock.ExpectQuery("SELECT coalesce(max(1), 0) as found FROM pg_class t JOIN pg_index ix ON t.oid = ix.indrelid JOIN pg_class i ON i.oid = ix.indexrelid JOIN pg_attribute a ON a.attrelid = t.oid WHERE a.attnum = ANY (ix.indkey) AND t.relkind = 'r' AND t.relname = 'events' AND a.attname = 'name'").
		WillReturnRows(sqlmock.NewRows([]string{"found"}).AddRow(1))
	mock.ExpectExec("with tmp as (\nselect id as eid,\n       row_number() over (partition by aggregate_id, aggregate_name order by tstamp) rn\nfrom events\nwhere (aggregate_id, aggregate_name) in (select aggregate_id, aggregate_name\n                                         from events\n                                         group by aggregate_id, aggregate_name, sequence_no\n                                         having count(*) > 1)\n)\nupdate events\nset sequence_no = tmp.rn\nfrom tmp\nwhere id = eid").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("drop index if exists event_ix").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("create unique index if not exists event_ix on events (aggregate_id, aggregate_name, sequence_no, id, name, tstamp)").
	mock.ExpectExec("create unique index if not exists event_ix on events (aggregate_id, aggregate_name, sequence_no)").
		WillReturnError(errors.New("error"))

	_, err = New(context.Background(), db, WithEventTypes(&Area{}))
@@ -1248,9 +1285,13 @@ func databaseSetupMock(mock sqlmock.Sqlmock) {
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("alter table events alter column aggregate_name set not null").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectQuery("SELECT coalesce(max(1), 0) as found FROM pg_class t JOIN pg_index ix ON t.oid = ix.indrelid JOIN pg_class i ON i.oid = ix.indexrelid JOIN pg_attribute a ON a.attrelid = t.oid WHERE a.attnum = ANY (ix.indkey) AND t.relkind = 'r' AND t.relname = 'events' AND a.attname = 'aggregate_name'").
	mock.ExpectQuery("SELECT coalesce(max(1), 0) as found FROM pg_class t JOIN pg_index ix ON t.oid = ix.indrelid JOIN pg_class i ON i.oid = ix.indexrelid JOIN pg_attribute a ON a.attrelid = t.oid WHERE a.attnum = ANY (ix.indkey) AND t.relkind = 'r' AND t.relname = 'events' AND a.attname = 'name'").
		WillReturnRows(sqlmock.NewRows([]string{"found"}).AddRow(1))
	mock.ExpectExec("create unique index if not exists event_ix on events (aggregate_id, aggregate_name, sequence_no, id, name, tstamp)").
	mock.ExpectExec("with tmp as (\nselect id as eid,\n       row_number() over (partition by aggregate_id, aggregate_name order by tstamp) rn\nfrom events\nwhere (aggregate_id, aggregate_name) in (select aggregate_id, aggregate_name\n                                         from events\n                                         group by aggregate_id, aggregate_name, sequence_no\n                                         having count(*) > 1)\n)\nupdate events\nset sequence_no = tmp.rn\nfrom tmp\nwhere id = eid").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("drop index if exists event_ix").
		WillReturnResult(sqlmock.NewResult(0, 0))
	mock.ExpectExec("create unique index if not exists event_ix on events (aggregate_id, aggregate_name, sequence_no)").
		WillReturnResult(sqlmock.NewResult(0, 0))
}