...
 
Commits (3)
......@@ -8,6 +8,7 @@ package clone
import (
"encoding/json"
"fmt"
"os"
"github.com/urfave/cli/v2"
......@@ -17,6 +18,10 @@ import (
"gitlab.com/postgres-ai/database-lab/pkg/observer"
)
const (
errorExitStatus = 1
)
// list runs a request to list clones of an instance.
func list() func(*cli.Context) error {
return func(cliCtx *cli.Context) error {
......@@ -202,23 +207,52 @@ func observe() func(*cli.Context) error {
cloneID := cliCtx.Args().First()
isFollow := cliCtx.Bool("follow")
clone, err := dblabClient.GetClone(cliCtx.Context, cloneID)
if err != nil {
return err
}
clonePassword := cliCtx.String("password")
clone.DB.Password = clonePassword
config := observer.Config{
Follow: cliCtx.Bool("follow"),
IntervalSeconds: cliCtx.Uint64("interval-seconds"),
MaxLockDurationSeconds: cliCtx.Uint64("max-lock-duration-seconds"),
MaxDurationSeconds: cliCtx.Uint64("max-duration-seconds"),
}
obs := observer.NewObserver(config, cliCtx.App.Writer)
clone.DB.Password = cliCtx.String("password")
err = obs.Start(clone)
if err != nil {
return err
}
return nil
}
}
obs := observer.NewObserver(cliCtx.App.Writer)
// observeSummary shows observing summary and check satisfaction of performance requirements.
func observeSummary() func(*cli.Context) error {
return func(cliCtx *cli.Context) error {
obs := observer.NewObserver(observer.Config{}, cliCtx.App.Writer)
err = obs.Start(clone, isFollow)
err := obs.LoadObserverState()
if err != nil {
return err
}
err = obs.PrintSummary()
if err != nil {
return err
}
err = obs.CheckPerformanceRequirements()
if err != nil {
// Exit with error status without printing additional error logs.
os.Exit(errorExitStatus)
}
return nil
}
}
......@@ -118,7 +118,8 @@ func CommandList() []*cli.Command {
Flags: []cli.Flag{
&cli.StringFlag{
Name: "password",
Usage: "database password",
Usage: "clone database password",
EnvVars: []string{"CLONE_PASSWORD"},
Required: true,
},
&cli.BoolFlag{
......@@ -126,8 +127,25 @@ func CommandList() []*cli.Command {
Usage: "follow state monitor output",
Aliases: []string{"f"},
},
&cli.IntFlag{
Name: "interval-seconds",
Usage: "interval of metric gathering and output",
},
&cli.IntFlag{
Name: "max-lock-duration-seconds",
Usage: "maximum allowed duration for locks",
},
&cli.IntFlag{
Name: "max-duration-seconds",
Usage: "maximum allowed duration for operation",
},
},
},
{
Name: "observe-summary",
Usage: "summarize clone monitoring and check results",
Action: observeSummary(),
},
},
}}
}
......
......@@ -6,13 +6,12 @@
package observer
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"time"
_ "github.com/lib/pq" //nolint
"github.com/pkg/errors"
"gitlab.com/postgres-ai/database-lab/pkg/log"
......@@ -20,28 +19,55 @@ import (
"gitlab.com/postgres-ai/database-lab/pkg/util"
)
const defaultIntervalSeconds = 10
const (
defaultIntervalSeconds = 10
defaultMaxLockDurationSeconds = 10
defaultMaxDurationSeconds = 60 * 60 // 1 hour.
stateFilePath = "/tmp/dblab-observe-state.json"
)
// Config defines configuration options for observer.
type Config struct {
Follow bool `json:"follow"`
IntervalSeconds uint64 `json:"intervalSeconds"`
MaxLockDurationSeconds uint64 `json:"maxLockDurationSeconds"`
MaxDurationSeconds uint64 `json:"maxDurationSeconds"`
}
// Observer defines monitoring service.
type Observer struct {
StartedAt time.Time
CounterTotal uint64
CounterWarning uint64
CounterSuccess uint64
IntervalSeconds int64
Writer io.Writer
StartedAt time.Time `json:"startedAt"`
Elapsed time.Duration `json:"elapsed"`
CounterTotal uint64 `json:"counterTotal"`
CounterWarning uint64 `json:"counterWarning"`
CounterSuccess uint64 `json:"counterSuccess"`
Config Config `json:"config"`
writer io.Writer
}
// NewObserver creates Observer instance.
func NewObserver(writer io.Writer) *Observer {
func NewObserver(config Config, writer io.Writer) *Observer {
if config.IntervalSeconds == 0 {
config.IntervalSeconds = defaultIntervalSeconds
}
if config.MaxLockDurationSeconds == 0 {
config.MaxLockDurationSeconds = defaultMaxLockDurationSeconds
}
if config.MaxDurationSeconds == 0 {
config.MaxDurationSeconds = defaultMaxDurationSeconds
}
return &Observer{
IntervalSeconds: defaultIntervalSeconds,
Writer: writer,
Config: config,
writer: writer,
}
}
// Start runs clone monitoring.
func (obs Observer) Start(clone *models.Clone, follow bool) error {
func (obs *Observer) Start(clone *models.Clone) error {
log.Dbg("Start observing...")
db, err := initConnection(clone)
......@@ -53,101 +79,126 @@ func (obs Observer) Start(clone *models.Clone, follow bool) error {
for {
now := time.Now()
elapsed := time.Since(obs.StartedAt)
obs.Elapsed = time.Since(obs.StartedAt)
_, err = fmt.Fprintf(obs.Writer, "%s Elapsed: %s\n", util.FormatTime(now), util.DurationToString(elapsed))
if err != nil {
return errors.Wrap(err, "cannot print")
}
output := fmt.Sprintf("[%s] Database Lab Observer:\n", util.FormatTime(now))
output += fmt.Sprintf(" Elapsed: %s\n", util.DurationToString(obs.Elapsed))
output += " Long-lasting locks:\n"
_, err = fmt.Fprintf(obs.Writer, "Locks:\n")
if err != nil {
return errors.Wrap(err, "cannot print")
}
query := fmt.Sprintf(queryLocks, obs.IntervalSeconds)
output, err := runQuery(db, query)
locksMetricOutput, err := runQuery(db, buildLocksMetricQuery(obs.Config.MaxLockDurationSeconds))
if err != nil {
return errors.Wrap(err, "cannot query metrics")
}
_, err = fmt.Fprintln(obs.Writer, output)
if err != nil {
return errors.Wrap(err, "cannot print")
}
if !follow {
break
}
output += locksMetricOutput
obs.CounterTotal++
if len(output) > 0 {
if len(locksMetricOutput) > 0 {
obs.CounterWarning++
} else {
obs.CounterSuccess++
}
_, err = fmt.Fprintf(obs.Writer, "Intervals:\nSuccessful:%d With warnings:%d\n\n", obs.CounterSuccess, obs.CounterWarning)
output += " Measurement intervals:\n"
output += fmt.Sprintf(" Successful: %d\n", obs.CounterSuccess)
output += fmt.Sprintf(" With warnings: %d\n", obs.CounterWarning)
_, err = fmt.Fprintln(obs.writer, output)
if err != nil {
return errors.Wrap(err, "cannot print")
}
time.Sleep(time.Duration(obs.IntervalSeconds) * time.Second)
err = obs.SaveObserverState()
if err != nil {
return errors.Wrap(err, "cannot save observer state")
}
if !obs.Config.Follow {
break
}
time.Sleep(time.Duration(obs.Config.IntervalSeconds) * time.Second)
}
return nil
}
func initConnection(clone *models.Clone) (*sql.DB, error) {
db, err := sql.Open("postgres", buildConnectionString(clone))
// SaveObserverState saves observer state to the disk.
func (obs *Observer) SaveObserverState() error {
bytes, err := json.MarshalIndent(obs, "", " ")
if err != nil {
return nil, errors.Wrap(err, "cannot init connection")
return err
}
if err := db.PingContext(context.TODO()); err != nil {
return nil, errors.Wrap(err, "cannot init connection")
err = ioutil.WriteFile(stateFilePath, bytes, 0644)
if err != nil {
return err
}
return db, nil
return nil
}
func runQuery(db *sql.DB, query string, args ...interface{}) (string, error) {
var result = ""
rows, err := db.Query(query, args...)
// LoadObserverState loads observer state from the disk.
func (obs *Observer) LoadObserverState() error {
bytes, err := ioutil.ReadFile(stateFilePath)
if err != nil {
log.Err("DB query:", err)
return "", err
return err
}
defer func() {
if err := rows.Close(); err != nil {
log.Err("Error when closing:", err)
}
}()
err = json.Unmarshal(bytes, &obs)
if err != nil {
return err
}
for rows.Next() {
var s string
return nil
}
if err := rows.Scan(&s); err != nil {
log.Err("DB query traversal:", err)
return s, err
}
// PrintSummary prints monitoring summary.
func (obs *Observer) PrintSummary() error {
summary := "Summary:\n"
summary += formatSummaryItem(fmt.Sprintf("Duration: %s", util.DurationToString(obs.Elapsed)))
summary += formatSummaryItem(fmt.Sprintf("Intervals with long-lasting exclusive locks: %d", obs.CounterWarning))
summary += "\nPerformance checklist:\n"
summary += formatChecklistItem(fmt.Sprintf("Duration < %ds", obs.Config.MaxDurationSeconds), obs.CheckDuration())
summary += formatChecklistItem("No long-lasting dangerous locks", obs.CheckLocks())
result += s + "\n"
_, err := fmt.Fprint(obs.writer, summary)
if err != nil {
return errors.Wrap(err, "cannot print")
}
if err := rows.Err(); err != nil {
log.Err("DB query traversal:", err)
return result, err
return nil
}
// CheckPerformanceRequirements checks monitoring data and returns an error if any of performance requires was not satisfied.
func (obs *Observer) CheckPerformanceRequirements() error {
if obs.CheckDuration() || obs.CheckLocks() {
return errors.New("performance requirements not satisfied")
}
return result, nil
return nil
}
func buildConnectionString(clone *models.Clone) string {
db := clone.DB
return fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=postgres sslmode=disable",
db.Host, db.Port, db.Username, db.Password)
// CheckDuration checks duration of the operation.
func (obs *Observer) CheckDuration() bool {
return obs.Elapsed < time.Duration(obs.Config.MaxDurationSeconds)*time.Second
}
// CheckLocks checks long-lasting locks during the operation.
func (obs *Observer) CheckLocks() bool {
return obs.CounterWarning == 0
}
func formatSummaryItem(str string) string {
return " " + str + "\n"
}
func formatChecklistItem(str string, state bool) string {
stateStr := "FAILED"
if state {
stateStr = "PASSED"
}
return " " + str + ": " + stateStr + "\n"
}
......@@ -4,6 +4,10 @@
package observer
import (
"fmt"
)
const queryLocks = `with lock_data as (
select
a.datname,
......@@ -33,3 +37,7 @@ select row_to_json(lock_data)
from lock_data
where query_duration > interval '%d second' -- for N=10, see the comments below
order by query_duration desc;`
func buildLocksMetricQuery(maxLockDurationSeconds uint64) string {
return fmt.Sprintf(queryLocks, maxLockDurationSeconds)
}
/*
2020 © Postgres.ai
*/
package observer
import (
"context"
"database/sql"
"fmt"
_ "github.com/lib/pq" //nolint
"github.com/pkg/errors"
"gitlab.com/postgres-ai/database-lab/pkg/log"
"gitlab.com/postgres-ai/database-lab/pkg/models"
)
func initConnection(clone *models.Clone) (*sql.DB, error) {
db, err := sql.Open("postgres", buildConnectionString(clone))
if err != nil {
return nil, errors.Wrap(err, "cannot init connection")
}
if err := db.PingContext(context.TODO()); err != nil {
return nil, errors.Wrap(err, "cannot init connection")
}
return db, nil
}
func runQuery(db *sql.DB, query string, args ...interface{}) (string, error) {
var result = ""
rows, err := db.Query(query, args...)
if err != nil {
log.Err("DB query:", err)
return "", err
}
defer func() {
if err := rows.Close(); err != nil {
log.Err("Error when closing:", err)
}
}()
for rows.Next() {
var s string
if err := rows.Scan(&s); err != nil {
log.Err("DB query traversal:", err)
return s, err
}
result += s + "\n"
}
if err := rows.Err(); err != nil {
log.Err("DB query traversal:", err)
return result, err
}
return result, nil
}
func buildConnectionString(clone *models.Clone) string {
db := clone.DB
return fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=postgres sslmode=disable",
db.Host, db.Port, db.Username, db.Password)
}