Commit 89cbd7a4 authored by LensPlatform's avatar LensPlatform
Browse files

added common folder to codebase

parent e9ee68df
Pipeline #150357176 failed with stages
in 1 minute and 1 second
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, build with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
# Vendor folder
vendor
\ No newline at end of file
# This file is a template, and might need editing before it works on your project.
image: golang:latest
variables:
# Please edit to your GitLab project
REPO_NAME: gitlab.com/yoanyombapro/CubeMicroservices/common
# The problem is that to be able to use go get, one needs to put
# the repository in the $GOPATH. So for example if your gitlab domain
# is gitlab.com, and that your repository is namespace/project, and
# the default GOPATH being /go, then you'd need to have your
# repository in /go/src/gitlab.com/namespace/project
# Thus, making a symbolic link corrects this.
before_script:
- mkdir -p $GOPATH/src/$(dirname $REPO_NAME)
- ln -svf $CI_PROJECT_DIR $GOPATH/src/$REPO_NAME
- cd $GOPATH/src/$REPO_NAME
stages:
- test
- build
- deploy
format:
stage: test
script:
- go fmt $(go list ./... | grep -v /vendor/)
- go vet $(go list ./... | grep -v /vendor/)
- go test -race $(go list ./... | grep -v /vendor/)
#compile:
# stage: build
# script:
# - go build -race -ldflags "-extldflags '-static'" -o $CI_PROJECT_DIR/mybinary
# artifacts:
# paths:
# - mybinary
MIT License
Copyright (c) 2020 D Yoan L Mekontchou Yomba
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
package circuitbreaker
import (
"context"
"encoding/json"
"io/ioutil"
"log"
"net"
"net/http"
"time"
"fmt"
"github.com/afex/hystrix-go/hystrix"
"github.com/spf13/viper"
"github.com/eapache/go-resiliency/retrier"
"github.com/sirupsen/logrus"
"gitlab.com/yoanyombapro/CubeMicroservices/common/messaging"
"gitlab.com/yoanyombapro/CubeMicroservices/common/tracing"
"gitlab.com/yoanyombapro/CubeMicroservices/common/util"
)
func init() {
log.SetOutput(ioutil.Discard)
}
// Client to do http requests with
var Client *http.Client
// RETRIES is the number of retries to do in the retrier.
var retries = 3
// CallUsingCircuitBreaker performs a HTTP call inside a circuit breaker.
func CallUsingCircuitBreaker(ctx context.Context, breakerName string, url string, method string) ([]byte, error) {
output := make(chan []byte, 1)
errors := hystrix.Go(breakerName, func() error {
req, _ := http.NewRequest(method, url, nil)
tracing.AddTracingToReqFromContext(ctx, req)
err := callWithRetries(req, output)
return err // For hystrix, forward the err from the retrier. It's nil if OK.
}, func(err error) error {
logrus.Errorf("In fallback function for breaker %v, error: %v", breakerName, err.Error())
circuit, _, _ := hystrix.GetCircuit(breakerName)
logrus.Errorf("Circuit state is: %v", circuit.IsOpen())
return err
})
select {
case out := <-output:
logrus.Debugf("Call in breaker %v successful", breakerName)
return out, nil
case err := <-errors:
logrus.Debugf("Got error on channel in breaker %v. Msg: %v", breakerName, err.Error())
return nil, err
}
}
// PerformHTTPRequestCircuitBreaker performs the supplied http.Request within a circuit breaker.
func PerformHTTPRequestCircuitBreaker(ctx context.Context, breakerName string, req *http.Request) ([]byte, error) {
output := make(chan []byte, 1)
errors := hystrix.Go(breakerName, func() error {
tracing.AddTracingToReqFromContext(ctx, req)
err := callWithRetries(req, output)
return err // For hystrix, forward the err from the retrier. It's nil if OK.
}, func(err error) error {
logrus.Errorf("In fallback function for breaker %v, error: %v", breakerName, err.Error())
return err
})
select {
case out := <-output:
logrus.Debugf("Call in breaker %v successful", breakerName)
return out, nil
case err := <-errors:
logrus.Errorf("Got error on channel in breaker %v. Msg: %v", breakerName, err.Error())
return nil, err
}
}
func callWithRetries(req *http.Request, output chan []byte) error {
r := retrier.New(retrier.ConstantBackoff(retries, 100*time.Millisecond), nil)
attempt := 0
err := r.Run(func() error {
attempt++
resp, err := Client.Do(req)
if err == nil && resp.StatusCode < 299 {
responseBody, err := ioutil.ReadAll(resp.Body)
if err == nil {
output <- responseBody
return nil
}
return err
} else if err == nil {
err = fmt.Errorf("Status was %v", resp.StatusCode)
}
logrus.Errorf("Retrier failed, attempt %v", attempt)
return err
})
return err
}
// ConfigureHystrix sets up hystrix circuit breakers.
func ConfigureHystrix(commands []string, amqpClient messaging.IMessagingClient) {
for _, command := range commands {
hystrix.ConfigureCommand(command, hystrix.CommandConfig{
Timeout: resolveProperty(command, "Timeout"),
MaxConcurrentRequests: resolveProperty(command, "MaxConcurrentRequests"),
ErrorPercentThreshold: resolveProperty(command, "ErrorPercentThreshold"),
RequestVolumeThreshold: resolveProperty(command, "RequestVolumeThreshold"),
SleepWindow: resolveProperty(command, "SleepWindow"),
})
logrus.Printf("Circuit %v settings: %v", command, hystrix.GetCircuitSettings()[command])
}
hystrixStreamHandler := hystrix.NewStreamHandler()
hystrixStreamHandler.Start()
go http.ListenAndServe(net.JoinHostPort("", "8181"), hystrixStreamHandler)
logrus.Infoln("Launched hystrixStreamHandler at 8181")
// Publish presence on RabbitMQ
publishDiscoveryToken(amqpClient)
}
// Deregister publishes a Deregister token to Hystrix/Turbine
func Deregister(amqpClient messaging.IMessagingClient) {
ip, err := util.ResolveIPFromHostsFile()
if err != nil {
ip = util.GetIPWithPrefix("10.0.")
}
token := DiscoveryToken{
State: "DOWN",
Address: ip,
}
bytes, _ := json.Marshal(token)
amqpClient.PublishOnQueue(bytes, "discovery")
logrus.Infoln("Sent deregistration token over SpringCloudBus")
}
func publishDiscoveryToken(amqpClient messaging.IMessagingClient) {
ip, err := util.ResolveIPFromHostsFile()
if err != nil {
ip = util.GetIPWithPrefix("10.0.")
}
token := DiscoveryToken{
State: "UP",
Address: ip,
}
bytes, _ := json.Marshal(token)
go func() {
for {
amqpClient.PublishOnQueue(bytes, "discovery")
amqpClient.PublishOnQueue(bytes, "discovery")
time.Sleep(time.Second * 30)
}
}()
}
func resolveProperty(command string, prop string) int {
if viper.IsSet("hystrix.command." + command + "." + prop) {
return viper.GetInt("hystrix.command." + command + "." + prop)
}
return getDefaultHystrixConfigPropertyValue(prop)
}
func getDefaultHystrixConfigPropertyValue(prop string) int {
switch prop {
case "Timeout":
return 1000 //hystrix.DefaultTimeout
case "MaxConcurrentRequests":
return 200 //hystrix.DefaultMaxConcurrent
case "RequestVolumeThreshold":
return hystrix.DefaultVolumeThreshold
case "SleepWindow":
return hystrix.DefaultSleepWindow
case "ErrorPercentThreshold":
return hystrix.DefaultErrorPercentThreshold
}
panic("Got unknown hystrix property: " + prop + ". Panicing!")
}
// DiscoveryToken defines a struct for transmitting the state of a hystrix stream producer.
type DiscoveryToken struct {
State string `json:"state"` // UP, RUNNING, DOWN ??
Address string `json:"address"`
}
package circuitbreaker
/*
import (
"net/http"
"testing"
"github.com/afex/hystrix-go/hystrix"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"gopkg.in/h2non/gock.v1"
"gitlab.com/yoanyombapro/CubeMicroservices/common/testutil"
)
func init() {
logrus.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
TimestampFormat: "2006-01-02T15:04:05.000",
})
logrus.SetLevel(logrus.DebugLevel)
Client = &http.Client{}
}
func TestCallUsingResilienceAllFails(t *testing.T) {
defer gock.Off()
buildGockMatcherTimes(500, 4)
hystrix.Flush()
bytes, err := CallUsingCircuitBreaker(context.TODO(), "TEST", "http://quotes-service", "GET")
if err == nil {
return
}
if bytes == nil {
return
}
}
func TestCallUsingResilienceLastSucceeds(t *testing.T) {
defer gock.Off()
retries = 3
buildGockMatcherTimes(500, 2)
body := []byte("Some response")
buildGockMatcherWithBody(200, string(body))
hystrix.Flush()
bytes, err := CallUsingCircuitBreaker(context.TODO(), "TEST", "http://quotes-service", "GET")
testutil.AssertNil(t, err)
testutil.AssertNotNil(t, bytes)
testutil.AssertEqual(t, string(body), string(bytes))
}
func TestCallHystrixOpensAfterThresholdPassed(t *testing.T) {
defer gock.Off()
for a := 0; a < 6; a++ {
buildGockMatcher(500)
}
hystrix.Flush()
retries = 0
hystrix.ConfigureCommand("TEST", hystrix.CommandConfig{
RequestVolumeThreshold: 5,
})
for a := 0; a < 6; a++ {
CallUsingCircuitBreaker(context.TODO(), "TEST", "http://quotes-service", "GET")
}
cb, _, _ := hystrix.GetCircuit("TEST")
testutil.AssertTrue(t, cb.IsOpen())
}
func buildGockMatcherTimes(status int, times int) {
for a := 0; a < times; a++ {
buildGockMatcher(status)
}
}
func buildGockMatcherWithBody(status int, body string) {
gock.New("http://quotes-service").
Reply(status).BodyString(body)
}
func buildGockMatcher(status int) {
buildGockMatcherWithBody(status, "")
}
*/
package infrastructure
import (
"github.com/jinzhu/gorm"
"github.com/prometheus/client_golang/prometheus"
)
func InitDatabaseCounters(servicename string, db *gorm.DB) *DatabaseCounters {
// https://www.alexedwards.net/blog/configuring-sqldb
numOpenDatabaseConnections := prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: servicename,
Subsystem: "database",
Name: "open_connections",
Help: "Open database connections",
},
func() float64 { return float64(db.DB().Stats().MaxOpenConnections) },
)
numOpenIdleDatabaseConnections := prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: servicename,
Subsystem: "database",
Name: "idle_connections_open",
Help: "Number of idle mysql connections open.",
},
func() float64 { return float64(db.DB().Stats().Idle) },
)
numDatabaseConnectionsInUse := prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: servicename,
Subsystem: "database",
Name: "connections_in_use",
Help: "Number of mysql connections in use.",
},
func() float64 { return float64(db.DB().Stats().InUse) },
)
timeBlockedWaitingForDatabaseConnection := prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: servicename,
Subsystem: "database",
Name: "database_connection_latency",
Help: "Time blocked waiting for a new connection to the database",
},
func() float64 { return float64(db.DB().Stats().WaitDuration) },
)
databaseOperationLatency := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: servicename,
Subsystem: "database",
Name: "database_operation_latency",
Help: "Latency per database operation",
},
[]string{"operation"},
)
return &DatabaseCounters{
NumDatabaseConnections: &numOpenDatabaseConnections,
BlockedWaitingTimeForDatabaseConnections: &timeBlockedWaitingForDatabaseConnection,
DatabaseConnectionsInUse: &numDatabaseConnectionsInUse,
IdleDatabaseConnections: &numOpenIdleDatabaseConnections,
LatencyByOperation: databaseOperationLatency,
}
}
func (rc *DatabaseCounters) RegisterMetrics(){
_ = RegisterGaugeMetric(*rc.NumDatabaseConnections)
_ = RegisterGaugeMetric(*rc.BlockedWaitingTimeForDatabaseConnections)
_ = RegisterGaugeMetric(*rc.DatabaseConnectionsInUse)
_ = RegisterGaugeMetric(*rc.IdleDatabaseConnections)
_ = RegisterSummaryVecMetric(*rc.LatencyByOperation)
}
package infrastructure
import (
"github.com/jinzhu/gorm"
"github.com/prometheus/client_golang/prometheus"
)
// https://github.com/prometheus/client_golang/blob/master/prometheus/examples_test.go
type RuntimeCounters struct {
NumActiveGoRoutines *prometheus.GaugeFunc
}
type DatabaseCounters struct {
NumDatabaseConnections *prometheus.GaugeFunc
IdleDatabaseConnections *prometheus.GaugeFunc
DatabaseConnectionsInUse *prometheus.GaugeFunc
BlockedWaitingTimeForDatabaseConnections *prometheus.GaugeFunc
LatencyByOperation *prometheus.SummaryVec
}
type RequestCounters struct {
RequestLatencySummaryCounters *prometheus.SummaryVec
TotalRequestsCounters *prometheus.CounterVec
}
type WorkerCounters struct {
TotalCompletedTasksCounter *prometheus.Counter
TotalCompletedTasksByWorkerCounter *prometheus.CounterVec
}
type InfrastructureCounters struct {
RuntimeCounters *RuntimeCounters
DatabaseCounters *DatabaseCounters
RequestCounters *RequestCounters
WorkerCounters *WorkerCounters
}
func New(serviceName string, db *gorm.DB) *InfrastructureCounters {
return &InfrastructureCounters{
RuntimeCounters: InitRuntimeCounters(serviceName),
DatabaseCounters: InitDatabaseCounters(serviceName, db),
RequestCounters: InitRequestCounters(serviceName),
WorkerCounters: InitWorkerCounters(serviceName),
}
}
package infrastructure
import (
"github.com/prometheus/client_golang/prometheus"
)
// initialize run time counters
func InitRequestCounters(servicename string) *RequestCounters {
totalHttpRequest := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: servicename,
Subsystem: "requests",
Name: "http_requests_total",
Help: "How many HTTP requests processed, partitioned by status code and HTTP method.",
},
[]string{"code", "method", "path"},
)
httpRequestLatencySummaryCounter := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: servicename,
Subsystem: "requests",
Name: "http_request_latency_summary",
Help: "quantile summary of http request latency",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{"method", "path"},
)
// httpRequestLatencyCounter.WithLabelValues("POST","/user").Observe(30 + math.Floor(120*math.Sin(float64(i)*0.1))/10)
return &RequestCounters{
RequestLatencySummaryCounters: RegisterSummaryVecMetric(*httpRequestLatencySummaryCounter),
TotalRequestsCounters: RegisterCounterVecMetric(*totalHttpRequest),
}
}
func (rc *RequestCounters) RegisterMetrics(){
_ = RegisterSummaryVecMetric(*rc.RequestLatencySummaryCounters)
_ = RegisterCounterVecMetric(*rc.TotalRequestsCounters)
}
package infrastructure
import (
"runtime"
"github.com/prometheus/client_golang/prometheus"
)
// initialize run time counters
func InitRuntimeCounters(servicename string) *RuntimeCounters {
numActiveGoRoutinesCounter := prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: servicename,
Subsystem: "runtime",
Name: "goroutines_count",
Help: "Number of goroutines that currently exist.",
},
func() float64 { return float64(runtime.NumGoroutine()) },
)
return &RuntimeCounters{
NumActiveGoRoutines: &numActiveGoRoutinesCounter,
}
}
func (rc *RuntimeCounters) RegisterMetrics(){
_ = RegisterGaugeMetric(*rc.NumActiveGoRoutines)
}
package infrastructure
import (
"github.com/prometheus/client_golang/prometheus"
)
// RegisterGaugeMetric registers a gauge metrics counter
func RegisterGaugeMetric(gaugeMetric prometheus.GaugeFunc) *prometheus.GaugeFunc {
err := prometheus.Register(gaugeMetric)
if err != nil {
if prometheus.Unregister(gaugeMetric) {
prometheus.MustRegister(gaugeMetric)
} else {
panic(err)
}
}
return &gaugeMetric
}
// RegisterCounterVecMetric registers a countervec metrics counter
func RegisterCounterVecMetric(counterMetric prometheus.CounterVec) *prometheus.CounterVec {
err := prometheus.Register(counterMetric)
if err != nil {
if prometheus.Unregister(counterMetric) {
prometheus.MustRegister(counterMetric)
} else {
panic(err)
}
}
return &counterMetric
}
// RegisterCounterMetric registers a counter
func RegisterCounterMetric(counterMetric prometheus.Counter) *prometheus.Counter {
err := prometheus.Register(counterMetric)
if err != nil {
if prometheus.Unregister(counterMetric) {
prometheus.MustRegister(counterMetric)
} else {
panic(err)
}
}
return &counterMetric
}
// RegisterSummaryVecMetric registers a summary vec counter
func RegisterSummaryVecMetric(summaryMetric prometheus.SummaryVec) *prometheus.SummaryVec {
err := prometheus.Register(summaryMetric)