chore: make deps, go mod vendor
Some checks failed
continuous-integration/drone/push Build is failing
Some checks failed
continuous-integration/drone/push Build is failing
This commit is contained in:
132
vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
generated
vendored
132
vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
generated
vendored
@ -8,14 +8,13 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"go.opentelemetry.io/otel/internal/global"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/embedded"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/exemplar"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal/x"
|
||||
@ -38,14 +37,17 @@ type instrumentSync struct {
|
||||
compAgg aggregate.ComputeAggregation
|
||||
}
|
||||
|
||||
func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline {
|
||||
func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFilter exemplar.Filter) *pipeline {
|
||||
if res == nil {
|
||||
res = resource.Empty()
|
||||
}
|
||||
return &pipeline{
|
||||
resource: res,
|
||||
reader: reader,
|
||||
views: views,
|
||||
resource: res,
|
||||
reader: reader,
|
||||
views: views,
|
||||
int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{},
|
||||
float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{},
|
||||
exemplarFilter: exemplarFilter,
|
||||
// aggregations is lazy allocated when needed.
|
||||
}
|
||||
}
|
||||
@ -63,9 +65,26 @@ type pipeline struct {
|
||||
views []View
|
||||
|
||||
sync.Mutex
|
||||
aggregations map[instrumentation.Scope][]instrumentSync
|
||||
callbacks []func(context.Context) error
|
||||
multiCallbacks list.List
|
||||
int64Measures map[observableID[int64]][]aggregate.Measure[int64]
|
||||
float64Measures map[observableID[float64]][]aggregate.Measure[float64]
|
||||
aggregations map[instrumentation.Scope][]instrumentSync
|
||||
callbacks []func(context.Context) error
|
||||
multiCallbacks list.List
|
||||
exemplarFilter exemplar.Filter
|
||||
}
|
||||
|
||||
// addInt64Measure adds a new int64 measure to the pipeline for each observer.
|
||||
func (p *pipeline) addInt64Measure(id observableID[int64], m []aggregate.Measure[int64]) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
p.int64Measures[id] = m
|
||||
}
|
||||
|
||||
// addFloat64Measure adds a new float64 measure to the pipeline for each observer.
|
||||
func (p *pipeline) addFloat64Measure(id observableID[float64], m []aggregate.Measure[float64]) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
p.float64Measures[id] = m
|
||||
}
|
||||
|
||||
// addSync adds the instrumentSync to pipeline p with scope. This method is not
|
||||
@ -105,14 +124,15 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
var errs multierror
|
||||
var err error
|
||||
for _, c := range p.callbacks {
|
||||
// TODO make the callbacks parallel. ( #3034 )
|
||||
if err := c(ctx); err != nil {
|
||||
errs.append(err)
|
||||
if e := c(ctx); e != nil {
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
if err := ctx.Err(); err != nil {
|
||||
rm.Resource = nil
|
||||
clear(rm.ScopeMetrics) // Erase elements to let GC collect objects.
|
||||
rm.ScopeMetrics = rm.ScopeMetrics[:0]
|
||||
return err
|
||||
}
|
||||
@ -120,12 +140,13 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
|
||||
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
|
||||
// TODO make the callbacks parallel. ( #3034 )
|
||||
f := e.Value.(multiCallback)
|
||||
if err := f(ctx); err != nil {
|
||||
errs.append(err)
|
||||
if e := f(ctx); e != nil {
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
if err := ctx.Err(); err != nil {
|
||||
// This means the context expired before we finished running callbacks.
|
||||
rm.Resource = nil
|
||||
clear(rm.ScopeMetrics) // Erase elements to let GC collect objects.
|
||||
rm.ScopeMetrics = rm.ScopeMetrics[:0]
|
||||
return err
|
||||
}
|
||||
@ -157,7 +178,7 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
|
||||
|
||||
rm.ScopeMetrics = rm.ScopeMetrics[:i]
|
||||
|
||||
return errs.errorOrNil()
|
||||
return err
|
||||
}
|
||||
|
||||
// inserter facilitates inserting of new instruments from a single scope into a
|
||||
@ -219,7 +240,7 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
|
||||
measures []aggregate.Measure[N]
|
||||
)
|
||||
|
||||
errs := &multierror{wrapped: errCreatingAggregators}
|
||||
var err error
|
||||
seen := make(map[uint64]struct{})
|
||||
for _, v := range i.pipeline.views {
|
||||
stream, match := v(inst)
|
||||
@ -227,9 +248,9 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
|
||||
continue
|
||||
}
|
||||
matched = true
|
||||
in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
|
||||
if err != nil {
|
||||
errs.append(err)
|
||||
in, id, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
|
||||
if e != nil {
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
if in == nil { // Drop aggregation.
|
||||
continue
|
||||
@ -242,8 +263,12 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
|
||||
measures = append(measures, in)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
err = errors.Join(errCreatingAggregators, err)
|
||||
}
|
||||
|
||||
if matched {
|
||||
return measures, errs.errorOrNil()
|
||||
return measures, err
|
||||
}
|
||||
|
||||
// Apply implicit default view if no explicit matched.
|
||||
@ -252,15 +277,18 @@ func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation)
|
||||
Description: inst.Description,
|
||||
Unit: inst.Unit,
|
||||
}
|
||||
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
|
||||
if err != nil {
|
||||
errs.append(err)
|
||||
in, _, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
|
||||
if e != nil {
|
||||
if err == nil {
|
||||
err = errCreatingAggregators
|
||||
}
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
if in != nil {
|
||||
// Ensured to have not seen given matched was false.
|
||||
measures = append(measures, in)
|
||||
}
|
||||
return measures, errs.errorOrNil()
|
||||
return measures, err
|
||||
}
|
||||
|
||||
// addCallback registers a single instrument callback to be run when
|
||||
@ -329,6 +357,9 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
|
||||
// The view explicitly requested the default aggregation.
|
||||
stream.Aggregation = DefaultAggregationSelector(kind)
|
||||
}
|
||||
if stream.ExemplarReservoirProviderSelector == nil {
|
||||
stream.ExemplarReservoirProviderSelector = DefaultExemplarReservoirProviderSelector
|
||||
}
|
||||
|
||||
if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
|
||||
return nil, 0, fmt.Errorf(
|
||||
@ -349,7 +380,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
|
||||
cv := i.aggregators.Lookup(normID, func() aggVal[N] {
|
||||
b := aggregate.Builder[N]{
|
||||
Temporality: i.pipeline.reader.temporality(kind),
|
||||
ReservoirFunc: reservoirFunc[N](stream.Aggregation),
|
||||
ReservoirFunc: reservoirFunc[N](stream.ExemplarReservoirProviderSelector(stream.Aggregation), i.pipeline.exemplarFilter),
|
||||
}
|
||||
b.Filter = stream.AttributeFilter
|
||||
// A value less than or equal to zero will disable the aggregation
|
||||
@ -552,24 +583,16 @@ func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error {
|
||||
// measurement.
|
||||
type pipelines []*pipeline
|
||||
|
||||
func newPipelines(res *resource.Resource, readers []Reader, views []View) pipelines {
|
||||
func newPipelines(res *resource.Resource, readers []Reader, views []View, exemplarFilter exemplar.Filter) pipelines {
|
||||
pipes := make([]*pipeline, 0, len(readers))
|
||||
for _, r := range readers {
|
||||
p := newPipeline(res, r, views)
|
||||
p := newPipeline(res, r, views, exemplarFilter)
|
||||
r.register(p)
|
||||
pipes = append(pipes, p)
|
||||
}
|
||||
return pipes
|
||||
}
|
||||
|
||||
func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration {
|
||||
unregs := make([]func(), len(p))
|
||||
for i, pipe := range p {
|
||||
unregs[i] = pipe.addMultiCallback(c)
|
||||
}
|
||||
return unregisterFuncs{f: unregs}
|
||||
}
|
||||
|
||||
type unregisterFuncs struct {
|
||||
embedded.Registration
|
||||
f []func()
|
||||
@ -602,15 +625,15 @@ func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) reso
|
||||
func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error) {
|
||||
var measures []aggregate.Measure[N]
|
||||
|
||||
errs := &multierror{}
|
||||
var err error
|
||||
for _, i := range r.inserters {
|
||||
in, err := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
|
||||
if err != nil {
|
||||
errs.append(err)
|
||||
in, e := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
|
||||
if e != nil {
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
measures = append(measures, in...)
|
||||
}
|
||||
return measures, errs.errorOrNil()
|
||||
return measures, err
|
||||
}
|
||||
|
||||
// HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
|
||||
@ -619,37 +642,18 @@ func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error)
|
||||
func (r resolver[N]) HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[N], error) {
|
||||
var measures []aggregate.Measure[N]
|
||||
|
||||
errs := &multierror{}
|
||||
var err error
|
||||
for _, i := range r.inserters {
|
||||
agg := i.readerDefaultAggregation(id.Kind)
|
||||
if histAgg, ok := agg.(AggregationExplicitBucketHistogram); ok && len(boundaries) > 0 {
|
||||
histAgg.Boundaries = boundaries
|
||||
agg = histAgg
|
||||
}
|
||||
in, err := i.Instrument(id, agg)
|
||||
if err != nil {
|
||||
errs.append(err)
|
||||
in, e := i.Instrument(id, agg)
|
||||
if e != nil {
|
||||
err = errors.Join(err, e)
|
||||
}
|
||||
measures = append(measures, in...)
|
||||
}
|
||||
return measures, errs.errorOrNil()
|
||||
}
|
||||
|
||||
type multierror struct {
|
||||
wrapped error
|
||||
errors []string
|
||||
}
|
||||
|
||||
func (m *multierror) errorOrNil() error {
|
||||
if len(m.errors) == 0 {
|
||||
return nil
|
||||
}
|
||||
if m.wrapped == nil {
|
||||
return errors.New(strings.Join(m.errors, "; "))
|
||||
}
|
||||
return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; "))
|
||||
}
|
||||
|
||||
func (m *multierror) append(err error) {
|
||||
m.errors = append(m.errors, err.Error())
|
||||
return measures, err
|
||||
}
|
||||
|
Reference in New Issue
Block a user