Add opencensus to bb_scheduler and bb_worker

parent cfd42310
......@@ -13,6 +13,7 @@ go_library(
deps = [
"//pkg/blobstore:go_default_library",
"//pkg/builder:go_default_library",
"//pkg/opencensus:go_default_library",
"//pkg/proto/configuration/bb_scheduler:go_default_library",
"//pkg/proto/remoteworker:go_default_library",
"//pkg/util:go_default_library",
......
......@@ -10,6 +10,7 @@ import (
remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
re_blobstore "github.com/buildbarn/bb-remote-execution/pkg/blobstore"
"github.com/buildbarn/bb-remote-execution/pkg/builder"
"github.com/buildbarn/bb-remote-execution/pkg/opencensus"
"github.com/buildbarn/bb-remote-execution/pkg/proto/configuration/bb_scheduler"
"github.com/buildbarn/bb-remote-execution/pkg/proto/remoteworker"
blobstore_configuration "github.com/buildbarn/bb-storage/pkg/blobstore/configuration"
......@@ -32,6 +33,10 @@ func main() {
log.Fatalf("Failed to read configuration from %s: %s", os.Args[1], err)
}
if configuration.Jaeger != nil {
opencensus.Initialize(configuration.Jaeger, "bb_scheduler")
}
browserURL, err := url.Parse(configuration.BrowserUrl)
if err != nil {
log.Fatal("Failed to parse browser URL: ", err)
......
......@@ -11,6 +11,7 @@ go_library(
"//pkg/blobstore:go_default_library",
"//pkg/builder:go_default_library",
"//pkg/cas:go_default_library",
"//pkg/opencensus:go_default_library",
"//pkg/environment:go_default_library",
"//pkg/filesystem:go_default_library",
"//pkg/proto/configuration/bb_worker:go_default_library",
......
......@@ -16,6 +16,7 @@ import (
re_cas "github.com/buildbarn/bb-remote-execution/pkg/cas"
"github.com/buildbarn/bb-remote-execution/pkg/environment"
re_filesystem "github.com/buildbarn/bb-remote-execution/pkg/filesystem"
"github.com/buildbarn/bb-remote-execution/pkg/opencensus"
"github.com/buildbarn/bb-remote-execution/pkg/proto/configuration/bb_worker"
"github.com/buildbarn/bb-remote-execution/pkg/proto/remoteworker"
"github.com/buildbarn/bb-storage/pkg/blobstore"
......@@ -39,6 +40,10 @@ func main() {
log.Fatalf("Failed to read configuration from %s: %s", os.Args[1], err)
}
if configuration.Jaeger != nil {
opencensus.Initialize(configuration.Jaeger, "bb_worker")
}
browserURL, err := url.Parse(configuration.BrowserUrl)
if err != nil {
log.Fatal("Failed to parse browser URL: ", err)
......
......@@ -15,6 +15,7 @@ go_library(
"@go_googleapis//google/rpc:errdetails_go_proto",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)
......
......@@ -8,6 +8,7 @@ import (
"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
"github.com/buildbarn/bb-storage/pkg/util"
"go.opencensus.io/trace"
)
type pendingPutOperation struct {
......@@ -78,6 +79,12 @@ func (ba *batchedStoreBlobAccess) flushLocked(ctx context.Context) {
return
}
ctx, span := trace.StartSpan(ctx, "blobstore.BatchedStoreBlobAccess.upload")
span.AddAttributes(
trace.Int64Attribute("file-count", int64(len(missing))),
)
defer span.End()
// Upload the missing ones.
for _, digest := range missing {
key := digest.GetKey(ba.blobKeyFormat)
......@@ -92,6 +99,9 @@ func (ba *batchedStoreBlobAccess) flushLocked(ctx context.Context) {
}
func (ba *batchedStoreBlobAccess) Put(ctx context.Context, digest *util.Digest, b buffer.Buffer) error {
ctx, span := trace.StartSpan(ctx, "blobstore.BatchedStoreBlobAccess.Put")
defer span.End()
ba.lock.Lock()
defer ba.lock.Unlock()
......
......@@ -7,6 +7,7 @@ import (
"github.com/buildbarn/bb-storage/pkg/blobstore"
"github.com/buildbarn/bb-storage/pkg/blobstore/buffer"
"github.com/buildbarn/bb-storage/pkg/util"
"go.opencensus.io/trace"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
......@@ -28,6 +29,9 @@ func NewExistencePreconditionBlobAccess(blobAccess blobstore.BlobAccess) blobsto
}
func (ba *existencePreconditionBlobAccess) Get(ctx context.Context, digest *util.Digest) buffer.Buffer {
ctx, span := trace.StartSpan(ctx, "blobstore.ExistencePreconditionBlobAccess.Get")
defer span.End()
return buffer.WithErrorHandler(
ba.BlobAccess.Get(ctx, digest),
existencePreconditionErrorHandler{digest: digest})
......
......@@ -47,6 +47,10 @@ go_library(
"@io_bazel_rules_go//proto/wkt:timestamp_go_proto",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@io_opencensus_go//trace/propagation:go_default_library",
],
)
......
......@@ -13,8 +13,13 @@ import (
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"go.opencensus.io/trace"
"go.opencensus.io/trace/propagation"
)
// BuildClient is a client for the Remote Worker protocol. It can send
......@@ -60,7 +65,7 @@ func NewBuildClient(scheduler remoteworker.OperationQueueClient, buildExecutor B
}
}
func (bc *BuildClient) startExecution(executionRequest *remoteworker.DesiredState_Executing) {
func (bc *BuildClient) startExecution(executionRequest *remoteworker.DesiredState_Executing, parent trace.SpanContext) {
bc.stopExecution()
// Spawn the execution of the build action.
......@@ -69,6 +74,14 @@ func (bc *BuildClient) startExecution(executionRequest *remoteworker.DesiredStat
updates := make(chan *remoteworker.CurrentState_Executing, 10)
bc.executionUpdates = updates
go func() {
ctx, span := trace.StartSpanWithRemoteParent(ctx, "builder.BuildClient.Execute", parent)
attrs := []trace.Attribute{trace.StringAttribute("instance-name", bc.request.InstanceName)}
for k, v := range bc.request.WorkerId {
attrs = append(attrs, trace.StringAttribute(k, v))
}
span.AddAttributes(attrs...)
defer span.End()
updates <- &remoteworker.CurrentState_Executing{
ActionDigest: executionRequest.ActionDigest,
ExecutionState: &remoteworker.CurrentState_Executing_Completed{
......@@ -162,11 +175,20 @@ func (bc *BuildClient) Run() error {
// Inform scheduler of current worker state, potentially
// requesting new work.
response, err := bc.scheduler.Synchronize(context.Background(), &bc.request)
var header metadata.MD // variable to store header and trailer
response, err := bc.scheduler.Synchronize(context.Background(), &bc.request, grpc.Header(&header))
if err != nil {
return util.StatusWrap(err, "Failed to synchronize with scheduler")
}
traceContext := header["grpc-trace-bin"]
var parent trace.SpanContext
if len(traceContext) > 0 {
traceContextBinary := []byte(traceContext[0])
parent, _ = propagation.FromBinary(traceContextBinary)
}
// Determine when we should contact the scheduler again in case
// of no activity.
nextSynchronizationAt, err := ptypes.Timestamp(response.NextSynchronizationAt)
......@@ -182,7 +204,7 @@ func (bc *BuildClient) Run() error {
// Scheduler is requesting us to execute the
// next action, maybe forcing us to stop
// execution of the current build action.
bc.startExecution(workerState.Executing)
bc.startExecution(workerState.Executing, parent)
case *remoteworker.DesiredState_Idle:
// Scheduler is forcing us to go back to idle.
bc.stopExecution()
......
......@@ -16,6 +16,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.opencensus.io/trace"
)
type cachingBuildExecutor struct {
......@@ -42,6 +44,9 @@ func NewCachingBuildExecutor(base BuildExecutor, contentAddressableStorage cas.C
}
func (be *cachingBuildExecutor) Execute(ctx context.Context, filePool filesystem.FilePool, instanceName string, request *remoteworker.DesiredState_Executing, executionStateUpdates chan<- *remoteworker.CurrentState_Executing) *remoteexecution.ExecuteResponse {
ctx, span := trace.StartSpan(ctx, "builder.CachingExecutor.Execute")
defer span.End()
response := be.base.Execute(ctx, filePool, instanceName, request, executionStateUpdates)
if actionDigest, err := util.NewDigest(instanceName, request.ActionDigest); err != nil {
attachErrorToExecuteResponse(response, util.StatusWrap(err, "Failed to extract digest for action"))
......
......@@ -24,9 +24,13 @@ import (
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"go.opencensus.io/trace"
"go.opencensus.io/trace/propagation"
"google.golang.org/genproto/googleapis/longrunning"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
......@@ -223,9 +227,19 @@ func (bq *InMemoryBuildQueue) GetCapabilities(ctx context.Context, in *remoteexe
}, nil
}
// executeServerWrapper decorates an Execution_ExecuteServer stream so
// that Context() reports the context stored in the wrapper rather than
// the one of the embedded stream. Every other stream method is
// inherited unchanged from the embedded server.
type executeServerWrapper struct {
	remoteexecution.Execution_ExecuteServer
	ctx context.Context
}

// Context returns the context held by the wrapper.
func (s executeServerWrapper) Context() context.Context {
	return s.ctx
}
// Execute an action by scheduling it in the build queue. This call
// blocks until the action is completed.
func (bq *InMemoryBuildQueue) Execute(in *remoteexecution.ExecuteRequest, out remoteexecution.Execution_ExecuteServer) error {
ctx, span := trace.StartSpan(out.Context(), "buildqueue.InMemory.Execute")
defer span.End()
// Fetch the action and command corresponding to the execute
// request. Ideally, a scheduler would be oblivious of what these
// look like, if it weren't for the fact that Action.DoNotCache
......@@ -234,7 +248,6 @@ func (bq *InMemoryBuildQueue) Execute(in *remoteexecution.ExecuteRequest, out re
// To prevent loading these messages from the Content
// Addressable Storage (CAS) multiple times, the scheduler holds
// on to them and passes them on to the workers.
ctx := out.Context()
actionDigest, err := util.NewDigest(in.InstanceName, in.ActionDigest)
if err != nil {
return util.StatusWrap(err, "Failed to extract digest for action")
......@@ -273,7 +286,7 @@ func (bq *InMemoryBuildQueue) Execute(in *remoteexecution.ExecuteRequest, out re
if argv := command.Arguments; len(argv) > 0 {
argv0 = argv[0]
}
o = &operation{
o = newOperationWithSpan(ctx, &operation{
platformQueue: pq,
desiredState: remoteworker.DesiredState_Executing{
ActionDigest: in.ActionDigest,
......@@ -289,7 +302,7 @@ func (bq *InMemoryBuildQueue) Execute(in *remoteexecution.ExecuteRequest, out re
currentStageStartTime: bq.now,
completionWakeup: make(chan struct{}),
}
})
bq.operationsNameMap[o.name] = o
if !action.DoNotCache {
pq.inFlightDeduplicationMap[inFlightDeduplicationKey] = o
......@@ -305,13 +318,23 @@ func (bq *InMemoryBuildQueue) Execute(in *remoteexecution.ExecuteRequest, out re
pq.wakeupNextWorker()
inMemoryBuildQueueOperationsQueuedTotal.WithLabelValues(platformKey.instance, platformKey.platform).Inc()
}
return o.waitExecution(bq, out)
return o.waitExecution(bq, executeServerWrapper{Execution_ExecuteServer: out, ctx: ctx})
}
// waitExecuteServerWrapper decorates an Execution_WaitExecutionServer
// stream so that Context() reports the context stored in the wrapper
// rather than the one of the embedded stream. Every other stream
// method is inherited unchanged from the embedded server.
type waitExecuteServerWrapper struct {
	remoteexecution.Execution_WaitExecutionServer
	ctx context.Context
}

// Context returns the context held by the wrapper.
func (s waitExecuteServerWrapper) Context() context.Context {
	return s.ctx
}
// WaitExecution attaches to an existing operation that was created by
// Execute(). This call can be used by the client to reattach to an
// operation in case of network failure.
func (bq *InMemoryBuildQueue) WaitExecution(in *remoteexecution.WaitExecutionRequest, out remoteexecution.Execution_WaitExecutionServer) error {
ctx, span := trace.StartSpan(out.Context(), "buildqueue.InMemory.WaitExecution")
defer span.End()
bq.enter(bq.clock.Now())
defer bq.leave()
......@@ -319,7 +342,10 @@ func (bq *InMemoryBuildQueue) WaitExecution(in *remoteexecution.WaitExecutionReq
if !ok {
return status.Errorf(codes.NotFound, "Operation with name %#v not found", in.Name)
}
return o.waitExecution(bq, out)
o.addParentSpan(span)
return o.waitExecution(bq, waitExecuteServerWrapper{Execution_WaitExecutionServer: out, ctx: ctx})
}
// Synchronize the state of a worker with the scheduler. This call is
......@@ -1057,6 +1083,8 @@ type operation struct {
// obtain Prometheus metrics.
currentStageStartTime time.Time
span *trace.Span
// retryCount specifies how many additional times the operation
// was provided to the worker to which it was allocated. This
// counter may be non-zero in case of network flakiness or
......@@ -1068,6 +1096,29 @@ type operation struct {
cleanupKey cleanupKey
}
// newOperationWithSpan starts a "builder.Operation" trace span derived
// from ctx, tags it with the operation's instance name and operation
// name, and stores it in o.span. The same operation pointer is
// returned so the call can wrap a composite literal.
//
// The span is not ended here; ending it is left to the code that
// finishes the operation.
func newOperationWithSpan(ctx context.Context, o *operation) *operation {
	// Only the span is retained. The derived context is deliberately
	// discarded because nothing below needs it.
	_, span := trace.StartSpan(ctx, "builder.Operation")
	span.AddAttributes(
		trace.StringAttribute("instance", o.instanceName),
		trace.StringAttribute("operation-id", o.name),
	)
	o.span = span
	return o
}
// addParentSpan attaches the given span to the operation's own span as
// a link of type trace.LinkTypeParent. It is used when a caller
// reattaches to an existing operation (WaitExecution), so that the
// reattaching request can be correlated with the operation's span.
//
// If the operation carries no span, the call is a no-op.
func (o *operation) addParentSpan(span *trace.Span) {
	if o.span == nil {
		return
	}
	parent := span.SpanContext()
	o.span.AddLink(trace.Link{
		TraceID: parent.TraceID,
		SpanID:  parent.SpanID,
		Type:    trace.LinkTypeParent,
	})
}
func (o *operation) getStage() remoteexecution.ExecutionStage_Value {
if o.executeResponse != nil {
return remoteexecution.ExecutionStage_COMPLETED
......@@ -1196,6 +1247,10 @@ func (o *operation) complete(bq *InMemoryBuildQueue, executeResponse *remoteexec
}
o.registerExecutingStageFinished(bq, result, grpcCode)
}
if o.span != nil {
o.span.End()
}
}
func (o *operation) remove(bq *InMemoryBuildQueue) {
......@@ -1273,9 +1328,15 @@ func (w *worker) getCurrentOperation() *operation {
// startOperation assigns an operation to the worker, returning a
// synchronization response that instructs the worker to start executing
// it.
func (w *worker) startOperation(bq *InMemoryBuildQueue, pq *platformQueue, o *operation) *remoteworker.SynchronizeResponse {
func (w *worker) startOperation(ctx context.Context, bq *InMemoryBuildQueue, pq *platformQueue, o *operation) *remoteworker.SynchronizeResponse {
w.currentOperation = o
o.registerQueuedStageFinished(bq)
// attach the span and trace IDs to the header so the worker knows
// which span to assign this execute response to
// todo(arlyon) should worker queue know about the span?...
grpc.SetHeader(ctx, metadata.Pairs("grpc-trace-bin", string(propagation.Binary(o.span.SpanContext()))))
return &remoteworker.SynchronizeResponse{
NextSynchronizationAt: bq.getNextSynchronizationAtDelay(),
DesiredState: &remoteworker.DesiredState{
......@@ -1293,7 +1354,7 @@ func (w *worker) startOperation(bq *InMemoryBuildQueue, pq *platformQueue, o *op
// instructs the worker to go idle.
func (w *worker) getNextOperation(ctx context.Context, bq *InMemoryBuildQueue, pq *platformQueue, workerID map[string]string) (*remoteworker.SynchronizeResponse, error) {
if o, ok := pq.getNextOperationNonBlocking(bq, workerID); ok {
return w.startOperation(bq, pq, o), nil
return w.startOperation(ctx, bq, pq, o), nil
}
if ctx == nil {
......@@ -1318,7 +1379,7 @@ func (w *worker) getNextOperation(ctx context.Context, bq *InMemoryBuildQueue, p
NextSynchronizationAt: bq.getCurrentTime(),
}, err
}
return w.startOperation(bq, pq, o), nil
return w.startOperation(ctx, bq, pq, o), nil
}
// getCurrentOrNextOperation either returns a synchronization response
......
This diff is collapsed.
......@@ -8,6 +8,8 @@ import (
"github.com/buildbarn/bb-storage/pkg/cas"
"github.com/buildbarn/bb-storage/pkg/filesystem"
"github.com/buildbarn/bb-storage/pkg/util"
"go.opencensus.io/trace"
)
type naiveInputRootPopulator struct {
......@@ -31,6 +33,12 @@ func NewNaiveInputRootPopulator(contentAddressableStorage cas.ContentAddressable
}
func (ex *naiveInputRootPopulator) populateInputDirectory(ctx context.Context, digest *util.Digest, inputDirectory filesystem.Directory, components []string) error {
ctx, span := trace.StartSpan(ctx, "builder.NativeInputRootPopulator.populateInputDirectory")
span.AddAttributes(
trace.StringAttribute("directory", path.Join(components...)),
)
defer span.End()
// Obtain directory.
directory, err := ex.contentAddressableStorage.GetDirectory(ctx, digest)
if err != nil {
......
......@@ -16,5 +16,6 @@ go_library(
"@com_github_buildbarn_bb_storage//pkg/filesystem:go_default_library",
"@com_github_buildbarn_bb_storage//pkg/proto/cas:go_default_library",
"@com_github_buildbarn_bb_storage//pkg/util:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)
......@@ -8,6 +8,7 @@ import (
"github.com/buildbarn/bb-storage/pkg/cas"
"github.com/buildbarn/bb-storage/pkg/eviction"
"github.com/buildbarn/bb-storage/pkg/util"
"go.opencensus.io/trace"
)
type directoryCachingContentAddressableStorage struct {
......@@ -47,6 +48,9 @@ func (cas *directoryCachingContentAddressableStorage) makeSpace() {
}
func (cas *directoryCachingContentAddressableStorage) GetDirectory(ctx context.Context, digest *util.Digest) (*remoteexecution.Directory, error) {
ctx, span := trace.StartSpan(ctx, "cas.DirectoryCaching.GetDirectory")
defer span.End()
key := digest.GetKey(cas.digestKeyFormat)
// Check the cache.
......
......@@ -8,6 +8,7 @@ import (
"github.com/buildbarn/bb-storage/pkg/eviction"
"github.com/buildbarn/bb-storage/pkg/filesystem"
"github.com/buildbarn/bb-storage/pkg/util"
"go.opencensus.io/trace"
)
type hardlinkingContentAddressableStorage struct {
......@@ -64,6 +65,9 @@ func (cas *hardlinkingContentAddressableStorage) makeSpace(size int64) error {
}
func (cas *hardlinkingContentAddressableStorage) GetFile(ctx context.Context, digest *util.Digest, directory filesystem.Directory, name string, isExecutable bool) error {
ctx, span := trace.StartSpan(ctx, "cas.HardLinking.GetFile")
defer span.End()
key := digest.GetKey(cas.digestKeyFormat)
if isExecutable {
key += "+x"
......
......@@ -8,6 +8,7 @@ import (
"github.com/buildbarn/bb-storage/pkg/filesystem"
cas_proto "github.com/buildbarn/bb-storage/pkg/proto/cas"
"github.com/buildbarn/bb-storage/pkg/util"
"go.opencensus.io/trace"
)
type readWriteDecouplingContentAddressableStorage struct {
......@@ -28,41 +29,71 @@ func NewReadWriteDecouplingContentAddressableStorage(reader cas.ContentAddressab
}
// GetAction opens a "cas.RWDecoupling.GetAction" trace span and
// forwards the request to the reader backend using the span's context.
func (cas *readWriteDecouplingContentAddressableStorage) GetAction(ctx context.Context, digest *util.Digest) (*remoteexecution.Action, error) {
	tracedCtx, sp := trace.StartSpan(ctx, "cas.RWDecoupling.GetAction")
	defer sp.End()

	action, err := cas.reader.GetAction(tracedCtx, digest)
	return action, err
}
// GetCommand opens a "cas.RWDecoupling.GetCommand" trace span and
// forwards the request to the reader backend using the span's context.
func (cas *readWriteDecouplingContentAddressableStorage) GetCommand(ctx context.Context, digest *util.Digest) (*remoteexecution.Command, error) {
	tracedCtx, sp := trace.StartSpan(ctx, "cas.RWDecoupling.GetCommand")
	defer sp.End()

	command, err := cas.reader.GetCommand(tracedCtx, digest)
	return command, err
}
// GetDirectory opens a "cas.RWDecoupling.GetDirectory" trace span and
// forwards the request to the reader backend using the span's context.
func (cas *readWriteDecouplingContentAddressableStorage) GetDirectory(ctx context.Context, digest *util.Digest) (*remoteexecution.Directory, error) {
	tracedCtx, sp := trace.StartSpan(ctx, "cas.RWDecoupling.GetDirectory")
	defer sp.End()

	dir, err := cas.reader.GetDirectory(tracedCtx, digest)
	return dir, err
}
// GetFile opens a "cas.RWDecoupling.GetFile" trace span and forwards
// the request to the reader backend using the span's context.
func (cas *readWriteDecouplingContentAddressableStorage) GetFile(ctx context.Context, digest *util.Digest, directory filesystem.Directory, name string, isExecutable bool) error {
	tracedCtx, sp := trace.StartSpan(ctx, "cas.RWDecoupling.GetFile")
	defer sp.End()

	err := cas.reader.GetFile(tracedCtx, digest, directory, name, isExecutable)
	return err
}
// GetTree opens a "cas.RWDecoupling.GetTree" trace span and forwards
// the request to the reader backend using the span's context.
func (cas *readWriteDecouplingContentAddressableStorage) GetTree(ctx context.Context, digest *util.Digest) (*remoteexecution.Tree, error) {
	tracedCtx, sp := trace.StartSpan(ctx, "cas.RWDecoupling.GetTree")
	defer sp.End()

	tree, err := cas.reader.GetTree(tracedCtx, digest)
	return tree, err
}
// GetUncachedActionResult opens a
// "cas.RWDecoupling.GetUncachedActionResult" trace span and forwards
// the request to the reader backend using the span's context.
func (cas *readWriteDecouplingContentAddressableStorage) GetUncachedActionResult(ctx context.Context, digest *util.Digest) (*cas_proto.UncachedActionResult, error) {
	tracedCtx, sp := trace.StartSpan(ctx, "cas.RWDecoupling.GetUncachedActionResult")
	defer sp.End()

	result, err := cas.reader.GetUncachedActionResult(tracedCtx, digest)
	return result, err
}
// PutFile opens a "cas.RWDecoupling.PutFile" trace span and forwards
// the request to the writer backend using the span's context.
func (cas *readWriteDecouplingContentAddressableStorage) PutFile(ctx context.Context, directory filesystem.Directory, name string, parentDigest *util.Digest) (*util.Digest, error) {
	tracedCtx, sp := trace.StartSpan(ctx, "cas.RWDecoupling.PutFile")
	defer sp.End()

	stored, err := cas.writer.PutFile(tracedCtx, directory, name, parentDigest)
	return stored, err
}
// PutLog opens a "cas.RWDecoupling.PutLog" trace span and forwards the
// request to the writer backend using the span's context.
func (cas *readWriteDecouplingContentAddressableStorage) PutLog(ctx context.Context, log []byte, parentDigest *util.Digest) (*util.Digest, error) {
	tracedCtx, sp := trace.StartSpan(ctx, "cas.RWDecoupling.PutLog")
	defer sp.End()

	stored, err := cas.writer.PutLog(tracedCtx, log, parentDigest)
	return stored, err
}
// PutTree opens a "cas.RWDecoupling.PutTree" trace span and forwards
// the request to the writer backend using the span's context.
func (cas *readWriteDecouplingContentAddressableStorage) PutTree(ctx context.Context, tree *remoteexecution.Tree, parentDigest *util.Digest) (*util.Digest, error) {
	tracedCtx, sp := trace.StartSpan(ctx, "cas.RWDecoupling.PutTree")
	defer sp.End()

	stored, err := cas.writer.PutTree(tracedCtx, tree, parentDigest)
	return stored, err
}
// PutUncachedActionResult opens a
// "cas.RWDecoupling.PutUncachedActionResult" trace span and forwards
// the request to the writer backend using the span's context.
func (cas *readWriteDecouplingContentAddressableStorage) PutUncachedActionResult(ctx context.Context, uncachedActionResult *cas_proto.UncachedActionResult, parentDigest *util.Digest) (*util.Digest, error) {
	tracedCtx, sp := trace.StartSpan(ctx, "cas.RWDecoupling.PutUncachedActionResult")
	defer sp.End()

	stored, err := cas.writer.PutUncachedActionResult(tracedCtx, uncachedActionResult, parentDigest)
	return stored, err
}
load("@io_bazel_rules_go//go:def.bzl", "go_library")
# Go library target for the pkg/opencensus package. It is built from
# init.go, is visible to every package, and depends on the OpenCensus
# trace/stats/zpages/ocgrpc libraries, the Jaeger and Prometheus
# exporters, the Prometheus client and the bb_storage configuration
# protos.
go_library(
name = "go_default_library",
srcs = ["init.go"],
importpath = "github.com/buildbarn/bb-remote-execution/pkg/opencensus",
visibility = ["//visibility:public"],
deps = [
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/bb_storage:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@io_opencensus_go//plugin/ocgrpc:go_default_library",
"@io_opencensus_go//stats/view:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@io_opencensus_go//zpages:go_default_library",
"@io_opencensus_go_contrib_exporter_jaeger//:go_default_library",
"@io_opencensus_go_contrib_exporter_prometheus//:go_default_library",
],
)
package opencensus
import (
"log"
"contrib.go.opencensus.io/exporter/jaeger"
prometheus_exporter "contrib.go.opencensus.io/exporter/prometheus"
pb "github.com/buildbarn/bb-storage/pkg/proto/configuration/bb_storage"
"github.com/prometheus/client_golang/prometheus"
"go.opencensus.io/plugin/ocgrpc"
"go.opencensus.io/stats/view"
"go.opencensus.io/trace"
"go.opencensus.io/zpages"
)
// Initialize configures OpenCensus for the calling process. It
// registers a Prometheus stats exporter under the given namespace,
// registers the default ocgrpc server views, installs the zPages debug
// handlers under "/debug" and registers a Jaeger trace exporter built
// from the agent endpoint, collector endpoint and service name in
// configuration. When configuration.AlwaysSample is set, the default
// sampler is switched to trace.AlwaysSample().
//
// A nil configuration turns the call into a no-op. Any failure while
// creating an exporter or registering the views terminates the process
// through log.Fatalf.
func Initialize(configuration *pb.JaegerConfiguration, namespace string) {
	if configuration == nil {
		return
	}

	// The exporter requires a concrete *prometheus.Registry. Check the
	// assertion explicitly so that a replaced default registerer gives a
	// readable fatal error instead of a bare panic.
	registry, ok := prometheus.DefaultRegisterer.(*prometheus.Registry)
	if !ok {
		log.Fatalf("Failed to create the Prometheus stats exporter: default registerer has type %T, not *prometheus.Registry", prometheus.DefaultRegisterer)
	}
	pe, err := prometheus_exporter.NewExporter(prometheus_exporter.Options{
		Registry:  registry,
		Namespace: namespace,
	})
	if err != nil {
		log.Fatalf("Failed to create the Prometheus stats exporter: %v", err)
	}
	view.RegisterExporter(pe)
	if err := view.Register(ocgrpc.DefaultServerViews...); err != nil {
		log.Fatalf("Failed to register ocgrpc server views: %v", err)
	}

	// A nil mux makes zpages register on the default HTTP serve mux.
	zpages.Handle(nil, "/debug")

	je, err := jaeger.NewExporter(jaeger.Options{
		AgentEndpoint:     configuration.AgentEndpoint,
		CollectorEndpoint: configuration.CollectorEndpoint,
		Process: jaeger.Process{
			ServiceName: configuration.ServiceName,
		},
	})
	if err != nil {
		// Use Fatalf: log.Fatal would glue the error directly onto the
		// message without a separating space.
		log.Fatalf("Failed to create the Jaeger exporter: %v", err)
	}
	trace.RegisterExporter(je)

	if configuration.AlwaysSample {
		trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
	}
}
......@@ -10,6 +10,7 @@ proto_library(
deps = [
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/blobstore:blobstore_proto",
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/grpc:grpc_proto",
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/bb_storage:bb_storage_proto",
],
)
......@@ -21,6 +22,7 @@ go_proto_library(
deps = [
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/blobstore:go_default_library",
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/grpc:go_default_library",
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/bb_storage:go_default_library",
],
)
......
......@@ -4,6 +4,7 @@ package buildbarn.configuration.bb_scheduler;
import "pkg/proto/configuration/blobstore/blobstore.proto";
import "pkg/proto/configuration/grpc/grpc.proto";
import "pkg/proto/configuration/bb_storage/bb_storage.proto";
option go_package = "github.com/buildbarn/bb-remote-execution/pkg/proto/configuration/bb_scheduler";
......@@ -26,4 +27,7 @@ message ApplicationConfiguration {
// Configuration for blob storage.
buildbarn.configuration.blobstore.BlobstoreConfiguration blobstore = 6;
// Jaeger configuration for tracing.
buildbarn.configuration.bb_storage.JaegerConfiguration jaeger = 7;
}
......@@ -12,6 +12,7 @@ proto_library(
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/blobstore:blobstore_proto",
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/eviction:eviction_proto",
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/grpc:grpc_proto",
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/bb_storage:bb_storage_proto",
"@com_google_protobuf//:duration_proto",
],
)
......@@ -26,6 +27,7 @@ go_proto_library(
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/blobstore:go_default_library",
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/eviction:go_default_library",
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/grpc:go_default_library",
"@com_github_buildbarn_bb_storage//pkg/proto/configuration/bb_storage:go_default_library",
],
)
......
......@@ -7,6 +7,7 @@ import "google/protobuf/duration.proto";
import "pkg/proto/configuration/blobstore/blobstore.proto";
import "pkg/proto/configuration/eviction/eviction.proto";
import "pkg/proto/configuration/grpc/grpc.proto";
import "pkg/proto/configuration/bb_storage/bb_storage.proto";
option go_package = "github.com/buildbarn/bb-remote-execution/pkg/proto/configuration/bb_worker";
......@@ -46,6 +47,9 @@ message ApplicationConfiguration {
// Directory where temporary files generated by build actions may be
// stored. If left empty, temporary files are stored in memory.
string file_pool_directory_path = 18;
// Jaeger configuration for tracing.
buildbarn.configuration.bb_storage.JaegerConfiguration jaeger = 19;
}
message LocalBuildDirectoryConfiguration {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment