Commit 3d5af370 authored by Anatoly Stansler

Merge branch '117-observe' into 'master'

feat: CI observer, PoC ("dblab clone observe", "dblab clone observe-summary")

See merge request !91
parents 2ed0bb8f 236ef63f
Pipeline #132012199 passed with stages in 5 minutes and 54 seconds
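For context, a minimal usage sketch of the two commands introduced by this MR. It assumes dblab is already initialized to talk to a Database Lab instance; the clone ID, password, and timing values are hypothetical, while the command, flag, and environment variable names come from the definitions in this diff.

# Observe a clone while a migration runs against it.
export CLONE_PASSWORD=secret
dblab clone observe --follow \
  --interval-seconds 10 \
  --max-lock-duration-seconds 10 \
  --max-duration-seconds 3600 \
  my_clone_id &

# ... run the DB migration against the clone here ...

kill %1   # stop the background observer; its state is persisted on every interval

# Print the summary and the performance checklist.
dblab clone observe-summary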
......@@ -33,7 +33,7 @@ build-binary-generic:
stage: build-binary
only:
refs:
-      - master
+      - branches
- tags
artifacts:
paths:
......@@ -100,6 +100,19 @@ build-image-feature-client:
before_script:
- cp ./bin/dblab-alpine ./bin/dblab
build-image-feature-client-extended:
<<: *build_image_definition
<<: *only_feature
variables:
REGISTRY_USER: "${CI_REGISTRY_USER}"
REGISTRY_PASSWORD: "${CI_REGISTRY_PASSWORD}"
REGISTRY: "${CI_REGISTRY}"
DOCKER_FILE: "Dockerfile.dblab-extended"
DOCKER_NAME: "registry.gitlab.com/postgres-ai/database-lab/dblab-extended"
TAGS: "${DOCKER_NAME}:${CI_COMMIT_REF_SLUG}"
before_script:
- cp ./bin/dblab-linux-amd64 ./bin/dblab
build-image-master-server:
<<: *build_image_definition
<<: *only_master
......
FROM docker:19
-# Install dependecies.
+# Install dependencies.
RUN apk update && apk add --no-cache bash jq
WORKDIR /home/dblab
......
# Currently (at the PoC stage), the only supported DB migration tool is Sqitch (sqitch.org).
# This "extended" image is expected to include many other tools in the future (Liquibase, Flyway, etc.).
FROM sqitch/sqitch:1.0.0
USER root
# Install dependencies.
RUN apt-get update && apt-get -y install bash jq
WORKDIR /home/dblab
COPY ./bin/dblab ./bin/dblab
RUN mv ./bin/dblab /usr/local/bin/dblab 2> /dev/null
ENTRYPOINT []
CMD dblab
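As an illustration only: the "extended" image built by the new build-image-feature-client-extended job could run a Sqitch migration against a clone while the observer watches it. The registry path comes from the CI job above; the tag, the mounted Sqitch project, and the connection target are hypothetical.

docker run --rm \
  -v "$(pwd)/migrations:/repo" -w /repo \
  registry.gitlab.com/postgres-ai/database-lab/dblab-extended:my-branch \
  sqitch deploy db:pg://user:password@clone-host:6000/postgres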
......@@ -8,12 +8,18 @@ package clone
import (
"encoding/json"
"fmt"
"os"
"github.com/urfave/cli/v2"
"gitlab.com/postgres-ai/database-lab/cmd/cli/commands"
"gitlab.com/postgres-ai/database-lab/pkg/client/dblabapi/types"
"gitlab.com/postgres-ai/database-lab/pkg/models"
"gitlab.com/postgres-ai/database-lab/pkg/observer"
)
const (
errorExitStatus = 1
)
// list runs a request to list clones of an instance.
......@@ -190,3 +196,56 @@ func destroy() func(*cli.Context) error {
return err
}
}
// observe starts observing the specified clone.
func observe() func(*cli.Context) error {
return func(cliCtx *cli.Context) error {
dblabClient, err := commands.ClientByCLIContext(cliCtx)
if err != nil {
return err
}
cloneID := cliCtx.Args().First()
clone, err := dblabClient.GetClone(cliCtx.Context, cloneID)
if err != nil {
return err
}
obsConfig := observer.Config{
Follow: cliCtx.Bool("follow"),
IntervalSeconds: cliCtx.Uint64("interval-seconds"),
MaxLockDurationSeconds: cliCtx.Uint64("max-lock-duration-seconds"),
MaxDurationSeconds: cliCtx.Uint64("max-duration-seconds"),
SSLMode: cliCtx.String("sslmode"),
}
obs := observer.NewObserver(obsConfig, cliCtx.App.Writer)
clone.DB.Password = cliCtx.String("password")
return obs.Start(clone)
}
}
// observeSummary shows the observation summary and checks whether performance requirements are satisfied.
func observeSummary() func(*cli.Context) error {
return func(cliCtx *cli.Context) error {
obs := observer.NewObserver(observer.Config{}, cliCtx.App.Writer)
if err := obs.LoadObserverState(); err != nil {
return err
}
if err := obs.PrintSummary(); err != nil {
return err
}
if err := obs.CheckPerformanceRequirements(); err != nil {
// Exit with error status without printing additional error logs.
os.Exit(errorExitStatus)
}
return nil
}
}
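Because observe-summary calls os.Exit(errorExitStatus) when CheckPerformanceRequirements fails, the command can gate a CI job by its exit code alone; a minimal sketch (the messages are illustrative):

if dblab clone observe-summary; then
  echo "Performance checklist passed"
else
  echo "Performance requirements not satisfied" >&2
  exit 1
fi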
......@@ -109,6 +109,52 @@ func CommandList() []*cli.Command {
},
},
},
{
Name: "observe",
Usage: "[EXPERIMENTAL] monitor clone state",
ArgsUsage: "CLONE_ID",
Before: checkCloneIDBefore,
Action: observe(),
Flags: []cli.Flag{
&cli.StringFlag{
Name: "password",
Usage: "clone database password",
EnvVars: []string{"CLONE_PASSWORD"},
Required: true,
},
&cli.StringFlag{
Name: "sslmode",
Usage: "connection SSL mode",
EnvVars: []string{"SSLMODE"},
Value: "disable",
},
&cli.BoolFlag{
Name: "follow",
Usage: "follow state monitor output",
Aliases: []string{"f"},
},
&cli.IntFlag{
Name: "interval-seconds",
Usage: "interval of metric gathering and output",
EnvVars: []string{"DBLAB_INTERVAL_SECONDS"},
},
&cli.IntFlag{
Name: "max-lock-duration-seconds",
Usage: "maximum allowed duration for locks",
EnvVars: []string{"DBLAB_MAX_LOCK_DURATION_SECONDS"},
},
&cli.IntFlag{
Name: "max-duration-seconds",
Usage: "maximum allowed duration for operation",
EnvVars: []string{"DBLAB_MAX_DURATION_SECONDS"},
},
},
},
{
Name: "observe-summary",
Usage: "[EXPERIMENTAL] summarize clone monitoring and check results",
Action: observeSummary(),
},
},
}}
}
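Every tuning flag above can also be supplied through the environment variables declared in EnvVars, which is often more convenient in CI. An illustrative equivalent of a flag-based invocation (values and clone ID are hypothetical):

export CLONE_PASSWORD=secret
export SSLMODE=disable
export DBLAB_INTERVAL_SECONDS=10
export DBLAB_MAX_LOCK_DURATION_SECONDS=10
export DBLAB_MAX_DURATION_SECONDS=3600
dblab clone observe --follow my_clone_id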
......
/*
2020 © Postgres.ai
*/
// Package observer provides clone monitoring.
package observer
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"strings"
"time"
"github.com/pkg/errors"
"gitlab.com/postgres-ai/database-lab/pkg/log"
"gitlab.com/postgres-ai/database-lab/pkg/models"
"gitlab.com/postgres-ai/database-lab/pkg/util"
)
const (
defaultIntervalSeconds = 10
defaultMaxLockDurationSeconds = 10
defaultMaxDurationSeconds = 60 * 60 // 1 hour.
stateFilePath = "/tmp/dblab-observe-state.json"
)
// Config defines configuration options for observer.
type Config struct {
Follow bool `json:"follow"`
IntervalSeconds uint64 `json:"intervalSeconds"`
MaxLockDurationSeconds uint64 `json:"maxLockDurationSeconds"`
MaxDurationSeconds uint64 `json:"maxDurationSeconds"`
SSLMode string `json:"sslmode"`
}
// Observer defines monitoring service.
type Observer struct {
StartedAt time.Time `json:"startedAt"`
Elapsed time.Duration `json:"elapsed"`
CounterTotal uint64 `json:"counterTotal"`
CounterWarning uint64 `json:"counterWarning"`
CounterSuccess uint64 `json:"counterSuccess"`
Config Config `json:"config"`
writer io.Writer
}
// NewObserver creates a new Observer instance.
func NewObserver(config Config, writer io.Writer) *Observer {
if config.IntervalSeconds == 0 {
config.IntervalSeconds = defaultIntervalSeconds
}
if config.MaxLockDurationSeconds == 0 {
config.MaxLockDurationSeconds = defaultMaxLockDurationSeconds
}
if config.MaxDurationSeconds == 0 {
config.MaxDurationSeconds = defaultMaxDurationSeconds
}
return &Observer{
Config: config,
writer: writer,
}
}
// Start runs clone monitoring.
func (obs *Observer) Start(clone *models.Clone) error {
log.Dbg("Start observing...")
db, err := initConnection(clone, obs.Config.SSLMode)
if err != nil {
return errors.Wrap(err, "cannot connect to database")
}
obs.StartedAt = time.Now()
for {
now := time.Now()
obs.Elapsed = time.Since(obs.StartedAt)
var output strings.Builder
output.WriteString(fmt.Sprintf("[%s] Database Lab Observer:\n", util.FormatTime(now)))
output.WriteString(fmt.Sprintf(" Elapsed: %s\n", util.DurationToString(obs.Elapsed)))
output.WriteString(" Dangerous locks:\n")
dangerousLocks, err := runQuery(db, buildLocksMetricQuery(obs.Config.MaxLockDurationSeconds))
if err != nil {
return errors.Wrap(err, "cannot query metrics")
}
obs.CounterTotal++
if len(dangerousLocks) > 0 {
obs.CounterWarning++
} else {
dangerousLocks = " Not observed\n"
obs.CounterSuccess++
}
output.WriteString(dangerousLocks)
output.WriteString(" Observed intervals:\n")
output.WriteString(fmt.Sprintf(" Successful: %d\n", obs.CounterSuccess))
output.WriteString(fmt.Sprintf(" With dangerous locks: %d\n", obs.CounterWarning))
_, err = fmt.Fprintln(obs.writer, output.String())
if err != nil {
return errors.Wrap(err, "cannot print")
}
err = obs.SaveObserverState()
if err != nil {
return errors.Wrap(err, "cannot save observer state")
}
if !obs.Config.Follow {
break
}
time.Sleep(time.Duration(obs.Config.IntervalSeconds) * time.Second)
}
return nil
}
// SaveObserverState saves observer state to the disk.
func (obs *Observer) SaveObserverState() error {
bytes, err := json.MarshalIndent(obs, "", " ")
if err != nil {
return err
}
err = ioutil.WriteFile(stateFilePath, bytes, 0644)
if err != nil {
return err
}
return nil
}
// LoadObserverState loads observer state from the disk.
func (obs *Observer) LoadObserverState() error {
bytes, err := ioutil.ReadFile(stateFilePath)
if err != nil {
return err
}
err = json.Unmarshal(bytes, &obs)
if err != nil {
return err
}
return nil
}
// PrintSummary prints monitoring summary.
func (obs *Observer) PrintSummary() error {
maxDuration := time.Duration(obs.Config.MaxDurationSeconds) * time.Second
var summary strings.Builder
summary.WriteString("Summary:\n")
summary.WriteString(formatSummaryItem(fmt.Sprintf("Duration: %s", util.DurationToString(obs.Elapsed))))
summary.WriteString(formatSummaryItem(fmt.Sprintf("Intervals with dangerous locks: %d", obs.CounterWarning)))
summary.WriteString(formatSummaryItem(fmt.Sprintf("Total number of observed intervals: %d", obs.CounterTotal)))
summary.WriteString("\nPerformance checklist:\n")
summary.WriteString(formatChecklistItem(fmt.Sprintf("Duration < %s", util.DurationToString(maxDuration)), obs.CheckDuration()))
summary.WriteString(formatChecklistItem("No dangerous locks", obs.CheckLocks()))
_, err := fmt.Fprint(obs.writer, summary.String())
if err != nil {
return errors.Wrap(err, "cannot print")
}
return nil
}
// CheckPerformanceRequirements checks the monitoring data and returns an error if any of the performance requirements was not satisfied.
func (obs *Observer) CheckPerformanceRequirements() error {
if !obs.CheckDuration() || !obs.CheckLocks() {
return errors.New("performance requirements not satisfied")
}
return nil
}
// CheckDuration reports whether the elapsed time is within the configured maximum duration.
func (obs *Observer) CheckDuration() bool {
return obs.Elapsed < time.Duration(obs.Config.MaxDurationSeconds)*time.Second
}
// CheckLocks reports whether no dangerous (long-lasting) locks were observed during the operation.
func (obs *Observer) CheckLocks() bool {
return obs.CounterWarning == 0
}
func formatSummaryItem(str string) string {
return " " + str + "\n"
}
func formatChecklistItem(str string, state bool) string {
stateStr := colorizeRed("FAILED")
if state {
stateStr = colorizeGreen("PASSED")
}
return " " + str + ": " + stateStr + "\n"
}
func colorizeRed(str string) string {
return fmt.Sprintf("\033[1;31m%s\033[0m", str)
}
func colorizeGreen(str string) string {
return fmt.Sprintf("\033[1;32m%s\033[0m", str)
}
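The observer persists its state to /tmp/dblab-observe-state.json on every interval (SaveObserverState), and observe-summary reads it back via LoadObserverState. The JSON keys follow the struct tags above; a quick way to inspect the counters with jq (installed in the CI images in this MR), assuming the file exists:

jq '{startedAt, elapsed, counterTotal, counterWarning, counterSuccess}' \
  /tmp/dblab-observe-state.json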
/*
2020 © Postgres.ai
*/
package observer
import (
"fmt"
)
// queryLocks selects sessions holding exclusive-mode locks whose current query has been running longer than the given threshold, one JSON row per session.
const queryLocks = `with lock_data as (
select
a.datname,
l.relation::regclass,
l.transactionid,
l.mode,
l.locktype,
l.granted,
a.usename,
a.query,
a.query_start,
a.state,
a.wait_event_type,
a.wait_event,
a.xact_start,
clock_timestamp() - a.xact_start as xact_duration,
clock_timestamp() - a.query_start as query_duration,
a.state_change,
clock_timestamp() - a.state_change as state_changed_ago,
a.pid
from pg_stat_activity a
join pg_locks l ON l.pid = a.pid
where l.mode ~* 'exclusive'
)
select row_to_json(lock_data)
from lock_data
where query_duration > interval '%d second'
order by query_duration desc;`
func buildLocksMetricQuery(maxLockDurationSeconds uint64) string {
return fmt.Sprintf(queryLocks, maxLockDurationSeconds)
}
/*
2020 © Postgres.ai
*/
package observer
import (
"context"
"database/sql"
"fmt"
_ "github.com/lib/pq" //nolint
"github.com/pkg/errors"
"gitlab.com/postgres-ai/database-lab/pkg/log"
"gitlab.com/postgres-ai/database-lab/pkg/models"
)
func initConnection(clone *models.Clone, sslMode string) (*sql.DB, error) {
db, err := sql.Open("postgres", buildConnectionString(clone, sslMode))
if err != nil {
return nil, errors.Wrap(err, "cannot init connection")
}
if err := db.PingContext(context.Background()); err != nil {
return nil, errors.Wrap(err, "cannot init connection")
}
return db, nil
}
func runQuery(db *sql.DB, query string, args ...interface{}) (string, error) {
var result = ""
rows, err := db.Query(query, args...)
if err != nil {
log.Err("DB query:", err)
return "", err
}
defer func() {
if err := rows.Close(); err != nil {
log.Err("Error when closing:", err)
}
}()
for rows.Next() {
var s string
if err := rows.Scan(&s); err != nil {
log.Err("DB query traversal:", err)
return s, err
}
result += s + "\n"
}
if err := rows.Err(); err != nil {
log.Err("DB query traversal:", err)
return result, err
}
return result, nil
}
func buildConnectionString(clone *models.Clone, sslMode string) string {
db := clone.DB
return fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=postgres sslmode=%s",
db.Host, db.Port, db.Username, db.Password, sslMode)
}
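buildConnectionString produces a standard libpq keyword/value string, always targeting the postgres database. For reference, an equivalent manual connection with psql, using hypothetical clone credentials:

psql "host=clone-host port=6000 user=dblab_user password=secret dbname=postgres sslmode=disable"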