From f61acf90dedc3893a2fb32ab89d522385aa6a36a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Fri, 24 Apr 2026 20:39:53 +0200 Subject: [PATCH 1/2] poc for opentelemetry tracing --- common/tracing/tracing.go | 106 +++++++++++++++++++ core/config.go | 2 + core/core.go | 30 ++++++ core/environment/environment.go | 33 +++++- core/environment/transition.go | 3 +- core/environment/transition_configure.go | 21 +++- core/environment/transition_deploy.go | 20 +++- core/environment/transition_goerror.go | 22 +++- core/environment/transition_reset.go | 21 +++- core/environment/transition_startactivity.go | 21 +++- core/environment/transition_stopactivity.go | 21 +++- go.mod | 43 +++++--- go.sum | 54 ++++++++++ 13 files changed, 372 insertions(+), 25 deletions(-) create mode 100644 common/tracing/tracing.go diff --git a/common/tracing/tracing.go b/common/tracing/tracing.go new file mode 100644 index 000000000..f44cb3dba --- /dev/null +++ b/common/tracing/tracing.go @@ -0,0 +1,106 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2025 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction.
+ */
+
+// Package tracing provides OpenTelemetry tracing initialisation for O² Control components.
+package tracing
+
+import (
+	"context"
+	"fmt"
+
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
+	"go.opentelemetry.io/otel/sdk/resource"
+	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+	semconv "go.opentelemetry.io/otel/semconv/v1.40.0"
+	"go.opentelemetry.io/otel/trace"
+	"go.opentelemetry.io/otel/trace/noop"
+)
+
+// Tracer is the tracer NewSpan starts spans with. It is a no-op tracer until Run succeeds.
+var Tracer trace.Tracer = noop.NewTracerProvider().Tracer("")
+
+// Span pairs an OpenTelemetry span with the context that carries it.
+type Span struct {
+	Ctx  context.Context // returned by Tracer.Start; pass it to NewSpan to create child spans
+	span trace.Span
+}
+
+// NewSpan starts a span called name using Tracer; a nil parent is treated as context.Background().
+func NewSpan(parent context.Context, name string) Span {
+	if parent == nil {
+		parent = context.Background()
+	}
+	ctx, span := Tracer.Start(parent, name)
+	return Span{Ctx: ctx, span: span}
+}
+
+// End ends the span. It is a no-op on a zero-value Span.
+func (s *Span) End() {
+	if s.span != nil {
+		s.span.End()
+	}
+}
+
+// Span returns the wrapped OpenTelemetry span, or a no-op span for a zero-value Span.
+func (s *Span) Span() trace.Span {
+	if s.span == nil {
+		return trace.SpanFromContext(context.Background())
+	}
+	return s.span
+}
+
+// Run initialises the global TracerProvider and sets the package-level Tracer.
+// It returns a shutdown function that must be called on process exit.
+//
+// \param ctx parent context
+// \param endpoint OTel collector gRPC endpoint, e.g. "localhost:4317"
+// \param serviceName OTLP service.name attribute, e.g. "aliecs"
+func Run(ctx context.Context, endpoint string, serviceName string) (func(context.Context) error, error) {
+	exp, err := otlptracegrpc.New(ctx, otlptracegrpc.WithEndpoint(endpoint), otlptracegrpc.WithInsecure())
+	if err != nil {
+		return nil, fmt.Errorf("tracing: failed to create OTLP exporter: %w", err)
+	}
+
+	attrs := resource.NewWithAttributes(semconv.SchemaURL, semconv.ServiceName(serviceName))
+	r, err := resource.Merge(resource.Default(), attrs)
+	if err != nil {
+		_ = exp.Shutdown(ctx) // best effort: release the exporter created above; err is what gets reported
+		return nil, fmt.Errorf("tracing: failed to create resource: %w", err)
+	}
+
+	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp), sdktrace.WithResource(r))
+	otel.SetTracerProvider(tp)
+	Tracer = otel.Tracer(serviceName)
+
+	return tp.Shutdown, nil
+}
+
+// Stop is a convenience wrapper — call it when you already hold the shutdown func.
+func Stop(ctx context.Context, shutdown func(context.Context) error) error {
+	if shutdown == nil {
+		return nil
+	}
+	return shutdown(ctx)
+}
diff --git a/core/config.go b/core/config.go
index 0f64cbac9..d948066d0 100644
--- a/core/config.go
+++ b/core/config.go
@@ -92,6 +93,7 @@ func setDefaults() error {
 	viper.SetDefault("metrics.address", getenv("LIBPROCESS_IP", "127.0.0.1"))
 	viper.SetDefault("metrics.port", getenvInt("PORT0", "64009"))
 	viper.SetDefault("metrics.path", getenv("METRICS_API_PATH", "/metrics"))
+	viper.SetDefault("tracingEndpoint", "")
 	viper.SetDefault("reposSshKey", "")
 	viper.SetDefault("summaryMetrics", false)
 	viper.SetDefault("verbose", false)
@@ -165,6 +166,7 @@ func setFlags() error {
 	pflag.String("metrics.address", viper.GetString("metrics.address"), "IP of metrics server")
 	pflag.Int("metrics.port", viper.GetInt("metrics.port"), "Port of metrics server (listens on server.address)")
 	pflag.String("metrics.path", viper.GetString("metrics.path"), "URI path to metrics endpoint")
+	pflag.String("tracingEndpoint", viper.GetString("tracingEndpoint"), "Endpoint of the OpenTelemetry collector gRPC service 
(`host:port`, tracing disabled if empty)")
 	pflag.String("reposSshKey", viper.GetString("reposSshKey"), "Path to a readable private ssh key for repo operations")
 	pflag.Bool("summaryMetrics", viper.GetBool("summaryMetrics"), "Collect summary metrics for tasks launched per-offer-cycle, offer processing time, etc.")
 	pflag.Bool("verbose", viper.GetBool("verbose"), "Verbose logging")
diff --git a/core/core.go b/core/core.go
index f2370f11b..272b92caf 100644
--- a/core/core.go
+++ b/core/core.go
@@ -42,6 +42,7 @@ import (
 	"github.com/AliceO2Group/Control/common/logger/infologger"
 	"github.com/AliceO2Group/Control/common/monitoring"
 	pb "github.com/AliceO2Group/Control/common/protos"
+	"github.com/AliceO2Group/Control/common/tracing"
 	"github.com/AliceO2Group/Control/core/the"
 
 	"github.com/AliceO2Group/Control/common/logger"
@@ -74,6 +75,39 @@ func parseMetricsEndpoint(metricsEndpoint string) (error, uint16, string) {
 	}
 }
 
+// runTracing starts OpenTelemetry trace export to the collector named by the
+// "tracingEndpoint" setting (host:port). It always returns a non-nil function for the
+// caller to defer: it stops tracing via tracing.Stop, or does nothing when tracing is
+// disabled, misconfigured or could not be started (those cases are only logged).
+func runTracing(ctx context.Context) func() {
+	endpoint := viper.GetString("tracingEndpoint")
+	if endpoint == "" {
+		log.WithField(infologger.Level, infologger.IL_Support).Info("Tracing disabled: tracingEndpoint not set")
+		return func() {}
+	}
+
+	host, port, err := net.SplitHostPort(endpoint)
+	if err != nil || host == "" || port == "" {
+		log.WithField("error", err).Errorf("Invalid tracingEndpoint %q: expected host:port format", endpoint)
+		return func() {}
+	}
+
+	shutdown, err := tracing.Run(ctx, endpoint, "aliecs")
+	if err != nil {
+		log.WithField("error", err).Error("Failed to initialize tracing")
+		return func() {}
+	}
+
+	log.WithField(infologger.Level, infologger.IL_Support).Infof("Tracing started, exporting to %s", endpoint)
+	return func() {
+		// ctx may already be cancelled when this runs at exit; shutting down with a
+		// cancelled context would abort the final flush and drop the buffered spans.
+		if err := tracing.Stop(context.WithoutCancel(ctx), shutdown); err != nil {
+			log.WithField("error", err).Error("Failed to shut down tracing")
+		}
+	}
+}
+
 func runMetrics() {
 	metricsEndpoint := viper.GetString("metricsEndpoint")
 	err, port, endpoint := parseMetricsEndpoint(metricsEndpoint)
@@ -150,6 +184,8 @@ func Run() error {
// Plugins need to start after taskman is running, because taskman provides the FID integration.PluginsInstance().InitAll(state.taskman.GetFrameworkID()) + stopTracing := runTracing(ctx) + defer stopTracing() runMetrics() defer golangmetrics.Stop() defer monitoring.Stop() diff --git a/core/environment/environment.go b/core/environment/environment.go index fcdac227e..e69e91bd8 100644 --- a/core/environment/environment.go +++ b/core/environment/environment.go @@ -44,6 +44,9 @@ import ( "github.com/AliceO2Group/Control/common/logger/infologger" "github.com/AliceO2Group/Control/common/monitoring" pb "github.com/AliceO2Group/Control/common/protos" + "github.com/AliceO2Group/Control/common/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "github.com/AliceO2Group/Control/common/runtype" "github.com/AliceO2Group/Control/common/system" "github.com/AliceO2Group/Control/common/utils" @@ -92,6 +95,8 @@ type Environment struct { autoStopTimer *time.Timer autoStopCancelFcn context.CancelFunc + + tracing tracing.Span } func (env *Environment) NotifyEvent(e event.DeviceEvent) { @@ -120,6 +125,11 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, callsPendingAwait: make(map[string]callable.CallsMap), } + env.tracing = tracing.NewSpan(context.Background(), "environment") + env.tracing.Span().SetAttributes( + attribute.String("environment.id", envId.String()), + ) + // Make the KVs accessible to the workflow via ParentAdapter env.wfAdapter = workflow.NewParentAdapter( func() uid.ID { return env.Id() }, @@ -986,6 +996,20 @@ func (env *Environment) runTasksAsHooks(hooksToTrigger task.Tasks) (errorMap map } func (env *Environment) TryTransition(t Transition) (err error) { + span := tracing.NewSpan(env.tracing.Ctx, "TryTransition") + defer func() { + span.Span().SetAttributes( + attribute.String("transition", t.eventName()), + attribute.String("state.src", env.Sm.Current()), + ) + if err != nil { + 
span.Span().RecordError(err) + span.Span().SetStatus(codes.Error, err.Error()) + } else { + span.Span().SetStatus(codes.Ok, "") + } + span.End() + }() if !env.transitionMutex.TryLock() { log.WithField("partition", env.id.String()). Warnf("environment transition '%s' attempt delayed: transition '%s' in progress. waiting for completion or failure", t.eventName(), env.currentTransition) @@ -1022,7 +1046,7 @@ func (env *Environment) TryTransition(t Transition) (err error) { }) return } - err = env.Sm.Event(context.Background(), t.eventName(), t) + err = env.Sm.Event(context.Background(), t.eventName(), t, span.Ctx) if err != nil { the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{ @@ -1075,9 +1099,13 @@ func (env *Environment) handlerFunc() func(e *fsm.Event) { e.Cancel(errors.New("transition wrapping error")) return } + ctx, ok := e.Args[1].(context.Context) + if !ok { + ctx = context.Background() + } if transition.eventName() == e.Event { - transErr := transition.do(env) + transErr := transition.do(ctx, env) if transErr != nil { e.Cancel(transErr) } @@ -1251,6 +1279,7 @@ func (env *Environment) subscribeToWfState(taskman *task.Manager) { } func (env *Environment) unsubscribeFromWfState() { + env.tracing.End() // Use select to unblock in case the above goroutine // exits due to an ERROR state. If that's the case // we close the channel. 
diff --git a/core/environment/transition.go b/core/environment/transition.go index ecb563bf8..ef60202bb 100644 --- a/core/environment/transition.go +++ b/core/environment/transition.go @@ -25,6 +25,7 @@ package environment import ( + "context" "errors" "github.com/AliceO2Group/Control/common/monitoring" @@ -35,7 +36,7 @@ import ( type Transition interface { eventName() string check() error - do(*Environment) error + do(context.Context, *Environment) error } func MakeTransition(taskman *task.Manager, optype pb.ControlEnvironmentRequest_Optype) Transition { diff --git a/core/environment/transition_configure.go b/core/environment/transition_configure.go index 09d93eca9..a4f7215c9 100644 --- a/core/environment/transition_configure.go +++ b/core/environment/transition_configure.go @@ -25,12 +25,16 @@ package environment import ( + "context" "errors" "github.com/AliceO2Group/Control/core/workflow" "github.com/AliceO2Group/Control/common/event" "github.com/AliceO2Group/Control/common/monitoring" + "github.com/AliceO2Group/Control/common/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/taskop" ) @@ -48,7 +52,7 @@ type ConfigureTransition struct { baseTransition } -func (t ConfigureTransition) do(env *Environment) (err error) { +func (t ConfigureTransition) do(ctx context.Context, env *Environment) (err error) { if env == nil { return errors.New("cannot transition in NIL environment") } @@ -56,6 +60,21 @@ func (t ConfigureTransition) do(env *Environment) (err error) { metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + span := tracing.NewSpan(ctx, "ConfigureTransition.do") + defer func() { + span.Span().SetAttributes( + attribute.String("transition", t.name), + attribute.String("envId", env.Id().String()), + ) + if err != nil { + span.Span().RecordError(err) + span.Span().SetStatus(codes.Error, 
err.Error()) + } else { + span.Span().SetStatus(codes.Ok, "") + } + span.End() + }() + wf := env.Workflow() activeTasks := workflow.GetActiveTasks(wf) diff --git a/core/environment/transition_deploy.go b/core/environment/transition_deploy.go index 9c0e49b5b..de32b9b4a 100644 --- a/core/environment/transition_deploy.go +++ b/core/environment/transition_deploy.go @@ -37,6 +37,9 @@ import ( "github.com/AliceO2Group/Control/common/event" "github.com/AliceO2Group/Control/common/logger/infologger" "github.com/AliceO2Group/Control/common/monitoring" + "github.com/AliceO2Group/Control/common/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" "github.com/AliceO2Group/Control/core/task/taskop" @@ -62,7 +65,7 @@ type DeployTransition struct { removeRoles []string } -func (t DeployTransition) do(env *Environment) (err error) { +func (t DeployTransition) do(ctx context.Context, env *Environment) (err error) { if env == nil { return errors.New("cannot transition in NIL environment") } @@ -70,6 +73,21 @@ func (t DeployTransition) do(env *Environment) (err error) { metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + span := tracing.NewSpan(ctx, "DeployTransition.do") + defer func() { + span.Span().SetAttributes( + attribute.String("transition", t.name), + attribute.String("envId", env.Id().String()), + ) + if err != nil { + span.Span().RecordError(err) + span.Span().SetStatus(codes.Error, err.Error()) + } else { + span.Span().SetStatus(codes.Ok, "") + } + span.End() + }() + wf := env.Workflow() // Skip cleanup for anything other than readout-dataflow diff --git a/core/environment/transition_goerror.go b/core/environment/transition_goerror.go index e0844f6d5..dbe66efd7 100644 --- a/core/environment/transition_goerror.go +++ b/core/environment/transition_goerror.go @@ -25,7 +25,12 @@ package environment 
import ( + "context" + "github.com/AliceO2Group/Control/common/monitoring" + "github.com/AliceO2Group/Control/common/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "github.com/AliceO2Group/Control/core/controlcommands" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" @@ -44,10 +49,25 @@ type GoErrorTransition struct { baseTransition } -func (t GoErrorTransition) do(env *Environment) (err error) { +func (t GoErrorTransition) do(ctx context.Context, env *Environment) (err error) { metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + span := tracing.NewSpan(ctx, "GoErrorTransition.do") + defer func() { + span.Span().SetAttributes( + attribute.String("transition", t.name), + attribute.String("envId", env.Id().String()), + ) + if err != nil { + span.Span().RecordError(err) + span.Span().SetStatus(codes.Error, err.Error()) + } else { + span.Span().SetStatus(codes.Ok, "") + } + span.End() + }() + // we stop all tasks which are in RUNNING toStop := env.Workflow().GetTasks().Filtered(func(t *task.Task) bool { t.SetSafeToStop(true) diff --git a/core/environment/transition_reset.go b/core/environment/transition_reset.go index 84bfd44da..72ccc5ea5 100644 --- a/core/environment/transition_reset.go +++ b/core/environment/transition_reset.go @@ -25,10 +25,14 @@ package environment import ( + "context" "errors" "github.com/AliceO2Group/Control/common/event" "github.com/AliceO2Group/Control/common/monitoring" + "github.com/AliceO2Group/Control/common/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" "github.com/AliceO2Group/Control/core/workflow" @@ -47,7 +51,7 @@ type ResetTransition struct { baseTransition } -func (t ResetTransition) do(env *Environment) (err error) { +func (t ResetTransition) do(ctx context.Context, env 
*Environment) (err error) { if env == nil { return errors.New("cannot transition in NIL environment") } @@ -55,6 +59,21 @@ func (t ResetTransition) do(env *Environment) (err error) { metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + span := tracing.NewSpan(ctx, "ResetTransition.do") + defer func() { + span.Span().SetAttributes( + attribute.String("transition", t.name), + attribute.String("envId", env.Id().String()), + ) + if err != nil { + span.Span().RecordError(err) + span.Span().SetStatus(codes.Error, err.Error()) + } else { + span.Span().SetStatus(codes.Ok, "") + } + span.End() + }() + taskmanMessage := task.NewTransitionTaskMessage( workflow.GetActiveTasks(env.Workflow()), sm.CONFIGURED.String(), diff --git a/core/environment/transition_startactivity.go b/core/environment/transition_startactivity.go index 47a547c1b..f16e2ab22 100644 --- a/core/environment/transition_startactivity.go +++ b/core/environment/transition_startactivity.go @@ -25,6 +25,7 @@ package environment import ( + "context" "errors" "strconv" @@ -34,6 +35,9 @@ import ( "github.com/AliceO2Group/Control/common/event" "github.com/AliceO2Group/Control/common/logger/infologger" "github.com/AliceO2Group/Control/common/monitoring" + "github.com/AliceO2Group/Control/common/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "github.com/AliceO2Group/Control/core/controlcommands" "github.com/AliceO2Group/Control/core/task" "github.com/iancoleman/strcase" @@ -68,7 +72,7 @@ type StartActivityTransition struct { baseTransition } -func (t StartActivityTransition) do(env *Environment) (err error) { +func (t StartActivityTransition) do(ctx context.Context, env *Environment) (err error) { if env == nil { return errors.New("cannot transition in NIL environment") } @@ -76,6 +80,21 @@ func (t StartActivityTransition) do(env *Environment) (err error) { metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, 
monitoring.Millisecond)() + span := tracing.NewSpan(ctx, "StartActivityTransition.do") + defer func() { + span.Span().SetAttributes( + attribute.String("transition", t.name), + attribute.String("envId", env.Id().String()), + ) + if err != nil { + span.Span().RecordError(err) + span.Span().SetStatus(codes.Error, err.Error()) + } else { + span.Span().SetStatus(codes.Ok, "") + } + span.End() + }() + runNumber := env.currentRunNumber log.WithField(infologger.Run, runNumber). diff --git a/core/environment/transition_stopactivity.go b/core/environment/transition_stopactivity.go index 7e55fbe62..c67034fe7 100644 --- a/core/environment/transition_stopactivity.go +++ b/core/environment/transition_stopactivity.go @@ -25,11 +25,15 @@ package environment import ( + "context" "errors" "github.com/AliceO2Group/Control/common/event" "github.com/AliceO2Group/Control/common/logger/infologger" "github.com/AliceO2Group/Control/common/monitoring" + "github.com/AliceO2Group/Control/common/tracing" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "github.com/AliceO2Group/Control/core/controlcommands" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" @@ -59,7 +63,7 @@ type StopActivityTransition struct { baseTransition } -func (t StopActivityTransition) do(env *Environment) (err error) { +func (t StopActivityTransition) do(ctx context.Context, env *Environment) (err error) { if env == nil { return errors.New("cannot transition in NIL environment") } @@ -67,6 +71,21 @@ func (t StopActivityTransition) do(env *Environment) (err error) { metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + span := tracing.NewSpan(ctx, "StopActivityTransition.do") + defer func() { + span.Span().SetAttributes( + attribute.String("transition", t.name), + attribute.String("envId", env.Id().String()), + ) + if err != nil { + span.Span().RecordError(err) + span.Span().SetStatus(codes.Error, 
err.Error()) + } else { + span.Span().SetStatus(codes.Ok, "") + } + span.End() + }() + log.WithField(infologger.Run, env.currentRunNumber). WithField("partition", env.Id().String()). WithField(infologger.Level, infologger.IL_Support). diff --git a/go.mod b/go.mod index c47808e51..8d53340f5 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/AliceO2Group/Control -go 1.24.2 +go 1.25.0 // github.com/coreos/bbolt@v1.3.4: parsing go.mod: // module declares its path as: go.etcd.io/bbolt @@ -58,12 +58,12 @@ require ( github.com/teo/logrus-prefixed-formatter v0.5.3-0.20230717095749-669d57324f0a github.com/valyala/fasttemplate v1.2.2 github.com/xlab/treeprint v1.2.0 - golang.org/x/crypto v0.45.0 - golang.org/x/net v0.47.0 - golang.org/x/sys v0.38.0 - google.golang.org/grpc v1.62.1 + golang.org/x/crypto v0.49.0 + golang.org/x/net v0.52.0 + golang.org/x/sys v0.42.0 + google.golang.org/grpc v1.80.0 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 - google.golang.org/protobuf v1.34.1 + google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 ) @@ -87,17 +87,19 @@ require ( github.com/ProtonMail/go-crypto v1.1.6 // indirect github.com/armon/go-metrics v0.5.3 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/cenkalti/backoff/v5 v5.0.3 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cloudflare/circl v1.6.3 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/cyphar/filepath-securejoin v0.4.1 // indirect github.com/emirpasic/gods v1.18.1 // indirect - github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect + github.com/envoyproxy/protoc-gen-validate v1.3.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/gdamore/encoding v1.0.1 // indirect github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect github.com/go-git/go-billy/v5 v5.6.2 // indirect - github.com/go-logr/logr v1.4.1 // indirect + 
github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/spec v0.21.0 // indirect @@ -106,6 +108,7 @@ require ( github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-hclog v1.6.2 // indirect @@ -159,15 +162,23 @@ require ( github.com/urfave/cli/v2 v2.3.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel v1.43.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.43.0 // indirect + go.opentelemetry.io/otel/metric v1.43.0 // indirect + go.opentelemetry.io/otel/sdk v1.43.0 // indirect + go.opentelemetry.io/otel/trace v1.43.0 // indirect + go.opentelemetry.io/proto/otlp v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/mod v0.29.0 // indirect - golang.org/x/sync v0.18.0 // indirect - golang.org/x/term v0.37.0 // indirect - golang.org/x/text v0.31.0 // indirect - golang.org/x/tools v0.38.0 // indirect + golang.org/x/mod v0.33.0 // indirect + golang.org/x/sync v0.20.0 // indirect + golang.org/x/term v0.41.0 // indirect + golang.org/x/text v0.35.0 // indirect + golang.org/x/tools v0.42.0 // indirect google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect - google.golang.org/genproto/googleapis/rpc 
v0.0.0-20240318140521-94a12d6c2237 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 70b2612e7..1ef63111e 100644 --- a/go.sum +++ b/go.sum @@ -33,9 +33,13 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/briandowns/spinner v1.23.0 h1:alDF2guRWqa/FOZZYWjlMIx2L6H0wyewPxo/CH4Pt2A= github.com/briandowns/spinner v1.23.0/go.mod h1:rPG4gmXeN3wQV/TsAY4w8lPdIM6RX3yqeBQJSrbXjuE= +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/cloudflare/circl v1.6.3 h1:9GPOhQGF9MCYUeXyMYlqTR6a5gTrgR/fBLXvUgtVcg8= @@ -61,6 +65,8 @@ github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/protoc-gen-validate v1.0.4 
h1:gVPz/FMfvh57HdSJQyvBtF00j8JU4zdyUgIUNhlgg0A= github.com/envoyproxy/protoc-gen-validate v1.0.4/go.mod h1:qys6tmnRsYrQqIhm2bvKZH4Blx/1gTIZ2UKVY1M+Yew= +github.com/envoyproxy/protoc-gen-validate v1.3.0 h1:TvGH1wof4H33rezVKWSpqKz5NXWg5VPuZ0uONDT6eb4= +github.com/envoyproxy/protoc-gen-validate v1.3.0/go.mod h1:HvYl7zwPa5mffgyeTUHA9zHIH36nmrm7oCbo4YKoSWA= github.com/expr-lang/expr v1.17.7 h1:Q0xY/e/2aCIp8g9s/LGvMDCC5PxYlvHgDZRQ4y16JX8= github.com/expr-lang/expr v1.17.7/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= @@ -96,8 +102,13 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= @@ -139,6 +150,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c= github.com/hashicorp/consul/api v1.28.2 h1:mXfkRHrpHN4YY3RqL09nXU1eHKLNiuAN4kHvDQ16k/8= github.com/hashicorp/consul/api v1.28.2/go.mod h1:KyzqzgMEya+IZPcD65YFoOVAgPpbfERu4I/tzG6/ueE= github.com/hashicorp/consul/sdk v0.16.0 h1:SE9m0W6DEfgIVCJX7xU+iv/hUl4m/nxqMTnCdMxDpJ8= @@ -410,6 +423,7 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/swaggo/files/v2 v2.0.0 h1:hmAt8Dkynw7Ssz46F6pn8ok6YmGZqHSVLZ+HQM7i0kw= @@ -440,6 +454,22 @@ github.com/xlab/treeprint v1.2.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.43.0 
h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= +go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 h1:88Y4s2C8oTui1LGM6bTWkw0ICGcOLCAI5l6zsD1j20k= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0/go.mod h1:Vl1/iaggsuRlrHf/hfPJPvVag77kKyvrLeD10kpMl+A= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.43.0 h1:RAE+JPfvEmvy+0LzyUA25/SGawPwIUbZ6u0Wug54sLc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.43.0/go.mod h1:AGmbycVGEsRx9mXMZ75CsOyhSP6MFIcj/6dnG+vhVjk= +go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM= +go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY= +go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg= +go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg= +go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A= +go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0= +go.opentelemetry.io/proto/otlp v1.10.0 h1:IQRWgT5srOCYfiWnpqUYz9CVmbO8bFmKcwYxpuCSL2g= +go.opentelemetry.io/proto/otlp v1.10.0/go.mod h1:/CV4QoCR/S9yaPj8utp3lvQPoqMtxXdzn7ozvvozVqk= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -452,6 +482,8 @@ golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= +golang.org/x/crypto v0.49.0 
h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4= +golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -460,6 +492,8 @@ golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91 golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= +golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= +golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -476,6 +510,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= +golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= +golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -486,6 +522,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -521,6 +559,8 @@ golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -529,6 +569,8 @@ golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU= golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254= 
+golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU= +golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -541,6 +583,8 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= +golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= +golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -551,6 +595,8 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= +golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= +golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors 
v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -559,14 +605,22 @@ google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 h1:KAeGQVN3M9nD0/b google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80/go.mod h1:cc8bqMqtv9gMOr0zHg2Vzff5ULhhL2IXP4sbcn32Dro= google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 h1:Lj5rbfG876hIAYFjqiJnPHfhXbv+nzTWfm04Fg/XSVU= google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA= +google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 h1:VPWxll4HlMw1Vs/qXtN7BvhZqsS9cdAittCNvVENElA= +google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:7QBABkRtR8z+TEnmXTqIqwJLlzrZKVfAUm7tY3yGv0M= google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc= google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= +google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= +google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 h1:rNBFJjBCOgVr9pWD7rs/knKL4FRTKgpZmsRfV214zcA= google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0/go.mod h1:Dk1tviKTvMCz5tvh7t+fh94dhmQVHuCt2OzJB3CTW9Y= google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= google.golang.org/protobuf 
v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From 260e57f6e75a729071f0e75282d6263b873e4dc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Mon, 27 Apr 2026 15:59:43 +0200 Subject: [PATCH 2/2] more traces; currently not displaying properly in opensearch --- common/tracing/tracing.go | 18 +- core/environment/environment.go | 170 ++++++++++++++----- core/environment/hooks_test.go | 54 ++++-- core/environment/manager.go | 15 +- core/environment/transition_configure.go | 20 +-- core/environment/transition_deploy.go | 20 +-- core/environment/transition_goerror.go | 20 +-- core/environment/transition_reset.go | 20 +-- core/environment/transition_startactivity.go | 20 +-- core/environment/transition_stopactivity.go | 18 +- core/workflow/callable/call.go | 40 ++++- 11 files changed, 268 insertions(+), 147 deletions(-) diff --git a/common/tracing/tracing.go b/common/tracing/tracing.go index f44cb3dba..f7b37762e 100644 --- a/common/tracing/tracing.go +++ b/common/tracing/tracing.go @@ -30,6 +30,7 @@ import ( "fmt" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -45,8 +46,8 @@ type Span struct { span trace.Span } -func NewSpan(parent context.Context, name string) Span { - ctx, span := Tracer.Start(parent, name) +func NewSpan(parent context.Context, name string, opts ...trace.SpanStartOption) 
Span { + ctx, span := Tracer.Start(parent, name, opts...) return Span{Ctx: ctx, span: span} } @@ -56,6 +57,19 @@ func (s *Span) End() { } } +// SetError records err on the span and sets its status. Pass nil to mark the span OK. +func (s *Span) SetError(err error) { + if s.span == nil { + return + } + if err != nil { + s.span.RecordError(err) + s.span.SetStatus(codes.Error, err.Error()) + } else { + s.span.SetStatus(codes.Ok, "") + } +} + func (s *Span) Span() trace.Span { return s.span } diff --git a/core/environment/environment.go b/core/environment/environment.go index e69e91bd8..780221a9e 100644 --- a/core/environment/environment.go +++ b/core/environment/environment.go @@ -44,11 +44,9 @@ import ( "github.com/AliceO2Group/Control/common/logger/infologger" "github.com/AliceO2Group/Control/common/monitoring" pb "github.com/AliceO2Group/Control/common/protos" - "github.com/AliceO2Group/Control/common/tracing" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" "github.com/AliceO2Group/Control/common/runtype" "github.com/AliceO2Group/Control/common/system" + "github.com/AliceO2Group/Control/common/tracing" "github.com/AliceO2Group/Control/common/utils" "github.com/AliceO2Group/Control/common/utils/uid" "github.com/AliceO2Group/Control/core/task" @@ -60,6 +58,8 @@ import ( "github.com/looplab/fsm" "github.com/pborman/uuid" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) var log = logger.New(logrus.StandardLogger(), "env") @@ -125,9 +125,10 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, callsPendingAwait: make(map[string]callable.CallsMap), } - env.tracing = tracing.NewSpan(context.Background(), "environment") - env.tracing.Span().SetAttributes( - attribute.String("environment.id", envId.String()), + env.tracing = tracing.NewSpan(context.Background(), "environment", + trace.WithAttributes( + attribute.String("environment.id", envId.String()), + ), ) // 
Make the KVs accessible to the workflow via ParentAdapter @@ -170,12 +171,23 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, {Name: "RECOVER", Src: []string{"ERROR"}, Dst: "DEPLOYED"}, }, fsm.Callbacks{ - "before_event": func(_ context.Context, e *fsm.Event) { + "before_event": func(ctx context.Context, e *fsm.Event) { env.Mu.Lock() env.currentTransition = e.Event env.Mu.Unlock() trigger := fmt.Sprintf("before_%s", e.Event) + span := tracing.NewSpan(ctx, "before_event", + trace.WithAttributes( + attribute.String("envId", env.Id().String()), + attribute.String("event", e.Event), + attribute.String("dst", e.Dst), + attribute.String("src", e.Src), + attribute.String("trigger", trigger), + ), + ) + ctx = span.Ctx + defer span.End() the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{ EnvironmentId: env.id.String(), @@ -190,7 +202,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, }) // first, we execute hooks which should be executed before an event officially starts - errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger) + errHooks := env.handleHooksWithNegativeWeights(ctx, env.Workflow(), trigger) if errHooks != nil { e.Cancel(errHooks) the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{ @@ -315,7 +327,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, ) } - errHooks = env.handleHooksWithPositiveWeights(env.Workflow(), trigger) + errHooks = env.handleHooksWithPositiveWeights(ctx, env.Workflow(), trigger) if errHooks != nil { e.Cancel(errHooks) } @@ -338,8 +350,19 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, WorkflowTemplateInfo: env.GetWorkflowInfo(), }) }, - "leave_state": func(_ context.Context, e *fsm.Event) { + "leave_state": func(ctx context.Context, e *fsm.Event) { trigger := fmt.Sprintf("leave_%s", e.Src) + span := tracing.NewSpan(ctx, 
"leave_state", + trace.WithAttributes( + attribute.String("envId", env.Id().String()), + attribute.String("event", e.Event), + attribute.String("dst", e.Dst), + attribute.String("src", e.Src), + attribute.String("trigger", trigger), + ), + ) + ctx = span.Ctx + defer span.End() the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{ EnvironmentId: env.id.String(), @@ -353,7 +376,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, WorkflowTemplateInfo: env.GetWorkflowInfo(), }) - errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger) + errHooks := env.handleHooksWithNegativeWeights(ctx, env.Workflow(), trigger) // fixme: in principle we should not need it anymore, since both STOP_ACTIVITY and GO_ERROR set EOR // We might leave RUNNING not only through STOP_ACTIVITY. In such cases we also need a run stop time. if e.Src == "RUNNING" { @@ -382,7 +405,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, return } - errHooks = env.handleHooksWithPositiveWeights(env.Workflow(), trigger) + errHooks = env.handleHooksWithPositiveWeights(ctx, env.Workflow(), trigger) if errHooks != nil { e.Cancel(errHooks) } @@ -421,7 +444,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, WorkflowTemplateInfo: env.GetWorkflowInfo(), }) - env.handlerFunc()(e) + env.handlerFunc()(ctx, e) eventState := e.Dst // we set the destination state here instead of the current for the event write, if the tasks have transitioned transitionStatus := pb.OpStatus_ONGOING @@ -444,9 +467,21 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, WorkflowTemplateInfo: env.GetWorkflowInfo(), }) }, - "enter_state": func(_ context.Context, e *fsm.Event) { + "enter_state": func(ctx context.Context, e *fsm.Event) { trigger := fmt.Sprintf("enter_%s", e.Dst) + span := tracing.NewSpan(ctx, "enter_state", + trace.WithAttributes( + 
attribute.String("envId", env.Id().String()), + attribute.String("event", e.Event), + attribute.String("dst", e.Dst), + attribute.String("src", e.Src), + attribute.String("trigger", trigger), + ), + ) + ctx = span.Ctx + defer span.End() + the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{ EnvironmentId: env.id.String(), State: env.Sm.Current(), @@ -459,12 +494,12 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, WorkflowTemplateInfo: env.GetWorkflowInfo(), }) - errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger) + errHooks := env.handleHooksWithNegativeWeights(ctx, env.Workflow(), trigger) enterStateTimeMs = strconv.FormatInt(time.Now().UnixMilli(), 10) env.workflow.SetRuntimeVar("enter_state_time_ms", enterStateTimeMs) - errHooks = errors.Join(errHooks, env.handleHooksWithPositiveWeights(env.Workflow(), trigger)) + errHooks = errors.Join(errHooks, env.handleHooksWithPositiveWeights(ctx, env.Workflow(), trigger)) if errHooks != nil { // at enter_ it will not cancel the transition but only set the error e.Cancel(errHooks) @@ -501,7 +536,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, "partition": envId, }).Debug("environment.sm entering state") }, - "after_event": func(_ context.Context, e *fsm.Event) { + "after_event": func(ctx context.Context, e *fsm.Event) { defer func() { env.Mu.Lock() env.currentTransition = "" @@ -510,6 +545,18 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, trigger := fmt.Sprintf("after_%s", e.Event) + span := tracing.NewSpan(ctx, "after_event", + trace.WithAttributes( + attribute.String("envId", env.Id().String()), + attribute.String("event", e.Event), + attribute.String("dst", e.Dst), + attribute.String("src", e.Src), + attribute.String("trigger", trigger), + ), + ) + ctx = span.Ctx + defer span.End() + the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{ 
EnvironmentId: env.id.String(), State: env.Sm.Current(), @@ -522,7 +569,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, WorkflowTemplateInfo: env.GetWorkflowInfo(), }) - errHooks := env.handleHooksWithNegativeWeights(env.Workflow(), trigger) + errHooks := env.handleHooksWithNegativeWeights(ctx, env.Workflow(), trigger) if errHooks != nil { // at after_ it will not cancel the transition but only set the error e.Cancel(errHooks) @@ -626,7 +673,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, env.invalidateAutoStopTransition() } - errHooks = errors.Join(errHooks, env.handleHooksWithPositiveWeights(env.Workflow(), trigger)) + errHooks = errors.Join(errHooks, env.handleHooksWithPositiveWeights(ctx, env.Workflow(), trigger)) if errHooks != nil { e.Cancel(errHooks) } @@ -667,7 +714,7 @@ func newEnvironment(userVars map[string]string, newId uid.ID) (env *Environment, return } -func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weightPredicate func(callable.HookWeight) bool) (err error) { +func (env *Environment) handleHooks(ctx context.Context, workflow workflow.Role, trigger string, weightPredicate func(callable.HookWeight) bool) (err error) { // Starting point: get all hooks to be started for the current trigger hooksMapForTrigger := workflow.GetHooksMapForTrigger(trigger) callsMapForAwait := env.callsPendingAwait[trigger] @@ -678,6 +725,16 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weig metric.AddTag("runtype", env.GetRunType().String()) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + span := tracing.NewSpan(ctx, "handleHooks", + trace.WithAttributes( + attribute.String("trigger", trigger), + attribute.String("envId", env.id.String()), + attribute.String("runtype", env.GetRunType().String()), + ), + ) + ctx = span.Ctx + defer span.End() + allWeightsSet := make(callable.HooksMap) for k := range hooksMapForTrigger { 
allWeightsSet[k] = callable.Hooks{} @@ -728,7 +785,7 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weig env.callsPendingAwait[awaitName][awaitWeight] = append( env.callsPendingAwait[awaitName][awaitWeight], call) } - callsToStart.StartAll() // returns immediately (async) + callsToStart.StartAll(ctx) // returns immediately (async) } } @@ -820,23 +877,54 @@ func (env *Environment) handleHooks(workflow workflow.Role, trigger string, weig } } -func (env *Environment) handleAllHooks(workflow workflow.Role, trigger string) (err error) { +func (env *Environment) handleAllHooks(ctx context.Context, workflow workflow.Role, trigger string) (err error) { log.WithField("partition", env.id).Debugf("begin handling hooks for trigger %s", trigger) defer utils.TimeTrack(time.Now(), fmt.Sprintf("finished handling hooks for trigger %s", trigger), log.WithPrefix("env").WithField("partition", env.id)) - return env.handleHooks(workflow, trigger, func(w callable.HookWeight) bool { return true }) + span := tracing.NewSpan(ctx, "handleAllHooks", + trace.WithAttributes( + attribute.String("trigger", trigger), + attribute.String("envId", env.id.String()), + attribute.String("runtype", env.GetRunType().String()), + ), + ) + ctx = span.Ctx + defer span.End() + + return env.handleHooks(ctx, workflow, trigger, func(w callable.HookWeight) bool { return true }) } -func (env *Environment) handleHooksWithNegativeWeights(workflow workflow.Role, trigger string) (err error) { +func (env *Environment) handleHooksWithNegativeWeights(ctx context.Context, workflow workflow.Role, trigger string) (err error) { log.WithField("partition", env.id).Debugf("begin handling hooks with negative weights for trigger %s", trigger) defer utils.TimeTrack(time.Now(), fmt.Sprintf("finished handling hooks with negative weights for trigger %s", trigger), log.WithPrefix("env").WithField("partition", env.id)) - return env.handleHooks(workflow, trigger, func(w callable.HookWeight) bool { return 
w < 0 }) + span := tracing.NewSpan(ctx, "handleHooksWithNegativeWeights", + trace.WithAttributes( + attribute.String("trigger", trigger), + attribute.String("envId", env.id.String()), + attribute.String("runtype", env.GetRunType().String()), + ), + ) + ctx = span.Ctx + defer span.End() + + return env.handleHooks(ctx, workflow, trigger, func(w callable.HookWeight) bool { return w < 0 }) } // "positive" include 0 -func (env *Environment) handleHooksWithPositiveWeights(workflow workflow.Role, trigger string) (err error) { +func (env *Environment) handleHooksWithPositiveWeights(ctx context.Context, workflow workflow.Role, trigger string) (err error) { log.WithField("partition", env.id).Debugf("begin handling hooks with positive weights for trigger %s", trigger) defer utils.TimeTrack(time.Now(), fmt.Sprintf("finished handling hooks with positive weights for trigger %s", trigger), log.WithPrefix("env").WithField("partition", env.id)) - return env.handleHooks(workflow, trigger, func(w callable.HookWeight) bool { return w >= 0 }) + + span := tracing.NewSpan(ctx, "handleHooksWithPositiveWeights", + trace.WithAttributes( + attribute.String("trigger", trigger), + attribute.String("envId", env.id.String()), + attribute.String("runtype", env.GetRunType().String()), + ), + ) + ctx = span.Ctx + defer span.End() + + return env.handleHooks(ctx, workflow, trigger, func(w callable.HookWeight) bool { return w >= 0 }) } // runTasksAsHooks returns a map of failed hook tasks and their respective error values. 
@@ -996,18 +1084,14 @@ func (env *Environment) runTasksAsHooks(hooksToTrigger task.Tasks) (errorMap map } func (env *Environment) TryTransition(t Transition) (err error) { - span := tracing.NewSpan(env.tracing.Ctx, "TryTransition") - defer func() { - span.Span().SetAttributes( + span := tracing.NewSpan(env.tracing.Ctx, fmt.Sprintf("TryTransition-%s", t.eventName()), + trace.WithAttributes( attribute.String("transition", t.eventName()), - attribute.String("state.src", env.Sm.Current()), - ) - if err != nil { - span.Span().RecordError(err) - span.Span().SetStatus(codes.Error, err.Error()) - } else { - span.Span().SetStatus(codes.Ok, "") - } + attribute.String("state", env.Sm.Current()), + ), + ) + defer func() { + span.SetError(err) span.End() }() if !env.transitionMutex.TryLock() { @@ -1046,7 +1130,7 @@ func (env *Environment) TryTransition(t Transition) (err error) { }) return } - err = env.Sm.Event(context.Background(), t.eventName(), t, span.Ctx) + err = env.Sm.Event(span.Ctx, t.eventName(), t) if err != nil { the.EventWriterWithTopic(topic.Environment).WriteEvent(&pb.Ev_EnvironmentEvent{ @@ -1075,11 +1159,11 @@ func (env *Environment) TryTransition(t Transition) (err error) { return } -func (env *Environment) handlerFunc() func(e *fsm.Event) { +func (env *Environment) handlerFunc() func(ctx context.Context, e *fsm.Event) { if env == nil { return nil } - return func(e *fsm.Event) { + return func(ctx context.Context, e *fsm.Event) { if e.Err != nil { // If the event was already cancelled return } @@ -1099,10 +1183,6 @@ func (env *Environment) handlerFunc() func(e *fsm.Event) { e.Cancel(errors.New("transition wrapping error")) return } - ctx, ok := e.Args[1].(context.Context) - if !ok { - ctx = context.Background() - } if transition.eventName() == e.Event { transErr := transition.do(ctx, env) diff --git a/core/environment/hooks_test.go b/core/environment/hooks_test.go index b4d70131e..a49ea800c 100644 --- a/core/environment/hooks_test.go +++ 
b/core/environment/hooks_test.go @@ -3,6 +3,7 @@ package environment import ( "context" "fmt" + "github.com/AliceO2Group/Control/common/utils/uid" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/workflow" @@ -26,7 +27,7 @@ func NewDummyTransition(transition string, fail bool) Transition { } } -func (t DummyTransition) do(env *Environment) (err error) { +func (t DummyTransition) do(ctx context.Context, env *Environment) (err error) { if t.fail { return fmt.Errorf("transition successfully failed") } @@ -50,7 +51,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call", task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: "before_CONFIGURE"}, "testplugin.Test()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("DEPLOYED") @@ -68,7 +70,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call", task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: "before_CONFIGURE"}, "testplugin.Test()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("DEPLOYED") env.workflow.GetUserVars().Set("testplugin_fail", "true") @@ -88,7 +91,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call", task.Traits{Trigger: "leave_DEPLOYED", Timeout: "5s", Critical: true, Await: "leave_DEPLOYED"}, "testplugin.Test()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("DEPLOYED") env.workflow.GetUserVars().Set("testplugin_fail", "true") @@ -109,7 +113,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call", task.Traits{Trigger: "enter_CONFIGURED", Timeout: "5s", Critical: true, Await: "enter_CONFIGURED"}, "testplugin.Test()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("DEPLOYED") env.workflow.GetUserVars().Set("testplugin_fail", "true") @@ -130,7 +135,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call", 
task.Traits{Trigger: "after_CONFIGURE", Timeout: "5s", Critical: true, Await: "after_CONFIGURE"}, "testplugin.Test()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("DEPLOYED") env.workflow.GetUserVars().Set("testplugin_fail", "true") @@ -150,7 +156,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call", task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: false, Await: "before_CONFIGURE"}, "testplugin.Test()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("DEPLOYED") env.workflow.GetUserVars().Set("testplugin_fail", "true") @@ -169,7 +176,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call", task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: "after_CONFIGURE"}, "testplugin.Test()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("DEPLOYED") @@ -187,7 +195,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call", task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: "before_RESET"}, "testplugin.Test()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("DEPLOYED") @@ -207,7 +216,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call", task.Traits{Trigger: "after_CONFIGURE", Timeout: "5s", Critical: true, Await: "after_CONFIGURE"}, "testplugin.Test()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("DEPLOYED") @@ -230,7 +240,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call2", task.Traits{Trigger: "before_START_ACTIVITY", Timeout: "5s", Critical: true, Await: "before_START_ACTIVITY"}, "testplugin.TimestampObserver()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("CONFIGURED") @@ -255,7 +266,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call2", task.Traits{Trigger: "after_START_ACTIVITY", Timeout: "5s", Critical: 
true, Await: "after_START_ACTIVITY"}, "testplugin.TimestampObserver()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("CONFIGURED") @@ -278,7 +290,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call2", task.Traits{Trigger: "before_STOP_ACTIVITY", Timeout: "5s", Critical: true, Await: "before_STOP_ACTIVITY"}, "testplugin.TimestampObserver()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("CONFIGURED") @@ -305,7 +318,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call2", task.Traits{Trigger: "after_STOP_ACTIVITY", Timeout: "5s", Critical: true, Await: "after_STOP_ACTIVITY"}, "testplugin.TimestampObserver()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("CONFIGURED") @@ -325,7 +339,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call", task.Traits{Trigger: "before_START_ACTIVITY", Timeout: "5s", Critical: true, Await: "before_START_ACTIVITY"}, "testplugin.TimestampObserver()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("CONFIGURED") @@ -412,7 +427,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call", task.Traits{Trigger: "leave_RUNNING", Timeout: "5s", Critical: true, Await: "leave_RUNNING"}, "testplugin.TimestampObserver()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("CONFIGURED") @@ -449,7 +465,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call1", task.Traits{Trigger: "before_CONFIGURE-50", Timeout: "5s", Critical: true, Await: "before_CONFIGURE-50"}, "testplugin.CallOrderObserver()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("DEPLOYED") @@ -472,7 +489,8 @@ var _ = Describe("calling hooks on FSM events", func() { "call2", // this call should not return, but should be cancelled later task.Traits{Trigger: "before_CONFIGURE", Timeout: "5s", Critical: true, Await: 
"after_CONFIGURE"}, "testplugin.Test()", - "")}) + ""), + }) workflow.LinkChildrenToParents(env.workflow) env.Sm.SetState("DEPLOYED") diff --git a/core/environment/manager.go b/core/environment/manager.go index 8546bd0c9..8a1ea568d 100644 --- a/core/environment/manager.go +++ b/core/environment/manager.go @@ -40,6 +40,7 @@ import ( "github.com/AliceO2Group/Control/common/logger/infologger" evpb "github.com/AliceO2Group/Control/common/protos" "github.com/AliceO2Group/Control/common/system" + "github.com/AliceO2Group/Control/common/tracing" "github.com/AliceO2Group/Control/common/utils" "github.com/AliceO2Group/Control/common/utils/uid" lhcevent "github.com/AliceO2Group/Control/core/integration/lhc/event" @@ -51,6 +52,8 @@ import ( "github.com/AliceO2Group/Control/core/workflow" pb "github.com/AliceO2Group/Control/executor/protos" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type Manager struct { @@ -634,6 +637,13 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error env, err := envs.environment(environmentId) envs.mu.RUnlock() + span := tracing.NewSpan(env.tracing.Ctx, "TeardownEnvironment", + trace.WithAttributes( + attribute.String("envId", env.Id().String()), + ), + ) + defer span.End() + if err != nil { return err } @@ -681,7 +691,7 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error WorkflowTemplateInfo: env.GetWorkflowInfo(), }) - err = env.handleAllHooks(env.Workflow(), "leave_"+env.CurrentState()) + err = env.handleAllHooks(span.Ctx, env.Workflow(), "leave_"+env.CurrentState()) if err != nil { log.WithFields(logrus.Fields{ "partition": environmentId.String(), @@ -854,7 +864,7 @@ func (envs *Manager) TeardownEnvironment(environmentId uid.ID, force bool) error for _, weight := range allWeights { hooksForWeight, ok := hooksMapForDestroy[weight] if ok { - hooksForWeight.FilterCalls().CallAll() + hooksForWeight.FilterCalls().CallAll(span.Ctx) 
// calls done, we start the task hooks... cleanupTaskHooks := hooksForWeight.FilterTasks() @@ -1071,7 +1081,6 @@ func (envs *Manager) handleIntegratedServiceEvent(evt event.IntegratedServiceEve } func (envs *Manager) handleLhcEvents(evt event.IntegratedServiceEvent) { - lhcEvent, ok := evt.(*lhcevent.LhcStateChangeEvent) if !ok { return diff --git a/core/environment/transition_configure.go b/core/environment/transition_configure.go index a4f7215c9..7c0fb2dca 100644 --- a/core/environment/transition_configure.go +++ b/core/environment/transition_configure.go @@ -33,10 +33,10 @@ import ( "github.com/AliceO2Group/Control/common/event" "github.com/AliceO2Group/Control/common/monitoring" "github.com/AliceO2Group/Control/common/tracing" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/taskop" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) func NewConfigureTransition(taskman *task.Manager) Transition { @@ -60,18 +60,14 @@ func (t ConfigureTransition) do(ctx context.Context, env *Environment) (err erro metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() - span := tracing.NewSpan(ctx, "ConfigureTransition.do") - defer func() { - span.Span().SetAttributes( + span := tracing.NewSpan(ctx, "ConfigureTransition.do", + trace.WithAttributes( attribute.String("transition", t.name), attribute.String("envId", env.Id().String()), - ) - if err != nil { - span.Span().RecordError(err) - span.Span().SetStatus(codes.Error, err.Error()) - } else { - span.Span().SetStatus(codes.Ok, "") - } + ), + ) + defer func() { + span.SetError(err) span.End() }() diff --git a/core/environment/transition_deploy.go b/core/environment/transition_deploy.go index de32b9b4a..43595d57c 100644 --- a/core/environment/transition_deploy.go +++ b/core/environment/transition_deploy.go @@ -38,14 +38,14 @@ import ( 
"github.com/AliceO2Group/Control/common/logger/infologger" "github.com/AliceO2Group/Control/common/monitoring" "github.com/AliceO2Group/Control/common/tracing" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" "github.com/AliceO2Group/Control/core/task/taskop" "github.com/AliceO2Group/Control/core/workflow" "github.com/hashicorp/go-multierror" "github.com/pborman/uuid" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) func NewDeployTransition(taskman *task.Manager, addRoles []string, removeRoles []string) Transition { @@ -73,18 +73,14 @@ func (t DeployTransition) do(ctx context.Context, env *Environment) (err error) metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() - span := tracing.NewSpan(ctx, "DeployTransition.do") - defer func() { - span.Span().SetAttributes( + span := tracing.NewSpan(ctx, "DeployTransition.do", + trace.WithAttributes( attribute.String("transition", t.name), attribute.String("envId", env.Id().String()), - ) - if err != nil { - span.Span().RecordError(err) - span.Span().SetStatus(codes.Error, err.Error()) - } else { - span.Span().SetStatus(codes.Ok, "") - } + ), + ) + defer func() { + span.SetError(err) span.End() }() diff --git a/core/environment/transition_goerror.go b/core/environment/transition_goerror.go index dbe66efd7..e88f93b60 100644 --- a/core/environment/transition_goerror.go +++ b/core/environment/transition_goerror.go @@ -29,11 +29,11 @@ import ( "github.com/AliceO2Group/Control/common/monitoring" "github.com/AliceO2Group/Control/common/tracing" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" "github.com/AliceO2Group/Control/core/controlcommands" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) func 
NewGoErrorTransition(taskman *task.Manager) Transition { @@ -53,18 +53,14 @@ func (t GoErrorTransition) do(ctx context.Context, env *Environment) (err error) metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() - span := tracing.NewSpan(ctx, "GoErrorTransition.do") - defer func() { - span.Span().SetAttributes( + span := tracing.NewSpan(ctx, "GoErrorTransition.do", + trace.WithAttributes( attribute.String("transition", t.name), attribute.String("envId", env.Id().String()), - ) - if err != nil { - span.Span().RecordError(err) - span.Span().SetStatus(codes.Error, err.Error()) - } else { - span.Span().SetStatus(codes.Ok, "") - } + ), + ) + defer func() { + span.SetError(err) span.End() }() diff --git a/core/environment/transition_reset.go b/core/environment/transition_reset.go index 72ccc5ea5..7c7468f0e 100644 --- a/core/environment/transition_reset.go +++ b/core/environment/transition_reset.go @@ -31,11 +31,11 @@ import ( "github.com/AliceO2Group/Control/common/event" "github.com/AliceO2Group/Control/common/monitoring" "github.com/AliceO2Group/Control/common/tracing" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" "github.com/AliceO2Group/Control/core/workflow" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) func NewResetTransition(taskman *task.Manager) Transition { @@ -59,18 +59,14 @@ func (t ResetTransition) do(ctx context.Context, env *Environment) (err error) { metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() - span := tracing.NewSpan(ctx, "ResetTransition.do") - defer func() { - span.Span().SetAttributes( + span := tracing.NewSpan(ctx, "ResetTransition.do", + trace.WithAttributes( attribute.String("transition", t.name), attribute.String("envId", env.Id().String()), - ) - if err != nil { - 
span.Span().RecordError(err) - span.Span().SetStatus(codes.Error, err.Error()) - } else { - span.Span().SetStatus(codes.Ok, "") - } + ), + ) + defer func() { + span.SetError(err) span.End() }() diff --git a/core/environment/transition_startactivity.go b/core/environment/transition_startactivity.go index f16e2ab22..0be802407 100644 --- a/core/environment/transition_startactivity.go +++ b/core/environment/transition_startactivity.go @@ -36,11 +36,11 @@ import ( "github.com/AliceO2Group/Control/common/logger/infologger" "github.com/AliceO2Group/Control/common/monitoring" "github.com/AliceO2Group/Control/common/tracing" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" "github.com/AliceO2Group/Control/core/controlcommands" "github.com/AliceO2Group/Control/core/task" "github.com/iancoleman/strcase" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) var StartActivityParameterKeys = []string{ @@ -80,18 +80,14 @@ func (t StartActivityTransition) do(ctx context.Context, env *Environment) (err metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() - span := tracing.NewSpan(ctx, "StartActivityTransition.do") - defer func() { - span.Span().SetAttributes( + span := tracing.NewSpan(ctx, "StartActivityTransition.do", + trace.WithAttributes( attribute.String("transition", t.name), attribute.String("envId", env.Id().String()), - ) - if err != nil { - span.Span().RecordError(err) - span.Span().SetStatus(codes.Error, err.Error()) - } else { - span.Span().SetStatus(codes.Ok, "") - } + ), + ) + defer func() { + span.SetError(err) span.End() }() diff --git a/core/environment/transition_stopactivity.go b/core/environment/transition_stopactivity.go index c67034fe7..6f968d282 100644 --- a/core/environment/transition_stopactivity.go +++ b/core/environment/transition_stopactivity.go @@ -33,7 +33,7 @@ import ( "github.com/AliceO2Group/Control/common/monitoring" 
"github.com/AliceO2Group/Control/common/tracing" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "github.com/AliceO2Group/Control/core/controlcommands" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/task/sm" @@ -71,18 +71,14 @@ func (t StopActivityTransition) do(ctx context.Context, env *Environment) (err e metric := t.transitionDoMetric(env) defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() - span := tracing.NewSpan(ctx, "StopActivityTransition.do") - defer func() { - span.Span().SetAttributes( + span := tracing.NewSpan(ctx, "StopActivityTransition.do", + trace.WithAttributes( attribute.String("transition", t.name), attribute.String("envId", env.Id().String()), - ) - if err != nil { - span.Span().RecordError(err) - span.Span().SetStatus(codes.Error, err.Error()) - } else { - span.Span().SetStatus(codes.Ok, "") - } + ), + ) + defer func() { + span.SetError(err) span.End() }() diff --git a/core/workflow/callable/call.go b/core/workflow/callable/call.go index 1a83c100f..71afe11e2 100644 --- a/core/workflow/callable/call.go +++ b/core/workflow/callable/call.go @@ -39,12 +39,15 @@ import ( "github.com/AliceO2Group/Control/common/logger/infologger" "github.com/AliceO2Group/Control/common/monitoring" evpb "github.com/AliceO2Group/Control/common/protos" + "github.com/AliceO2Group/Control/common/tracing" "github.com/AliceO2Group/Control/common/utils" "github.com/AliceO2Group/Control/configuration/template" "github.com/AliceO2Group/Control/core/integration" "github.com/AliceO2Group/Control/core/task" "github.com/AliceO2Group/Control/core/the" "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) var ( @@ -75,10 +78,10 @@ func NewCall(funcCall string, returnVar string, parent ParentRole) (call *Call) } } -func (s Calls) CallAll() map[*Call]error { +func (s Calls) CallAll(ctx context.Context) map[*Call]error { errs 
:= make(map[*Call]error) for _, v := range s { - err := v.Call() + err := v.Call(ctx) if err != nil { errs[v] = err } @@ -86,9 +89,9 @@ func (s Calls) CallAll() map[*Call]error { return errs } -func (s Calls) StartAll() { +func (s Calls) StartAll(ctx context.Context) { for _, v := range s { - v.Start() + v.Start(ctx) } } @@ -119,7 +122,7 @@ func (c *Call) callableMetric(name string) monitoring.Metric { return metric } -func (c *Call) Call() error { +func (c *Call) Call(ctx context.Context) error { log.WithField("trigger", c.Traits.Trigger). WithField("await", c.Traits.Await). WithField("partition", c.parentRole.GetEnvironmentId().String()). @@ -129,6 +132,16 @@ func (c *Call) Call() error { metric := c.callableMetric("callablecall") defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + span := tracing.NewSpan(ctx, fmt.Sprintf("Call-%s", c.GetName()), + trace.WithAttributes( + attribute.String("runtype", c.getRunTypeTag()), + attribute.String("name", c.GetName()), + attribute.String("trigger", c.GetTraits().Trigger), + attribute.String("envId", c.parentRole.GetEnvironmentId().String()), + ), + ) + defer span.End() + the.EventWriterWithTopic(topic.Call).WriteEvent(&evpb.Ev_CallEvent{ Path: c.GetParentRolePath(), Func: c.Func, @@ -239,18 +252,29 @@ func (c *Call) Call() error { return nil } -func (c *Call) Start() { +func (c *Call) Start(ctx context.Context) { c.await = make(chan error) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) c.awaitCancel = cancel go func() { metric := c.callableMetric("callablewrapped") defer monitoring.TimerSendSingle(&metric, monitoring.Millisecond)() + span := tracing.NewSpan(ctx, fmt.Sprintf("Start-%s", c.GetName()), + trace.WithAttributes( + attribute.String("runtype", c.getRunTypeTag()), + attribute.String("name", c.GetName()), + attribute.String("trigger", c.GetTraits().Trigger), + attribute.String("envId", c.parentRole.GetEnvironmentId().String()), + ), + ) + ctx = 
span.Ctx + defer span.End() + callId := fmt.Sprintf("hook:%s:%s", c.GetTraits().Trigger, c.GetName()) log.Debugf("%s started", callId) defer utils.TimeTrack(time.Now(), callId, log.WithPrefix("callable")) - err := c.Call() + err := c.Call(ctx) select { case c.await <- err: if err == nil {