Description
Summary
A potential resource leak has been identified in the metrics reporting pipeline: periodic reader goroutines continue running after their associated reporters are evicted from the LRU cache, which prevents garbage collection and causes the OTEL endpoint to receive empty metric exports.
Root Cause
- Per-Service Metric Provider Creation
When the `MetricsReporter` processes spans in the `onSpan` function, it creates a per-service `Metrics` instance through the `ReporterPool`. Each `Metrics` instance contains a `MeterProvider` with a `PeriodicReader` that exports metrics at the configured interval.
opentelemetry-ebpf-instrumentation/pkg/export/otel/metrics.go
Lines 1125 to 1151 in 1807454

    func (mr *MetricsReporter) onSpan(spans []request.Span) {
        for i := range spans {
            s := &spans[i]
            if s.InternalSignal() {
                continue
            }
            if !s.Service.ExportModes.CanExportMetrics() {
                continue
            }
            // If we are ignoring this span because of route patterns, don't do anything
            if request.IgnoreMetrics(s) {
                continue
            }
            reporter, err := mr.reporters.For(&s.Service)
            if err != nil {
                mlog().Error("unexpected error creating OTEL resource. Ignoring metric",
                    "error", err, "service", s.Service)
                continue
            }
            reporter.record(s, mr)
            if mr.commonCfg.Features.AppHost() {
                hostInfo, attrs := mr.hostInfo.ForRecord(s)
                hostInfo.Record(mr.ctx, 1, instrument.WithAttributeSet(attrs))
            }
        }
    }
Each service gets its own `MeterProvider` with a `PeriodicReader`:
opentelemetry-ebpf-instrumentation/pkg/export/otel/metrics.go
Lines 651 to 666 in 1807454

    opts := []metric.Option{
        metric.WithResource(resources),
        metric.WithReader(metric.NewPeriodicReader(mr.exporter, metric.WithInterval(mr.cfg.Interval))),
    }
    opts = append(opts, mr.otelMetricOptions(mlog)...)
    opts = append(opts, mr.spanMetricOptions(mlog)...)
    return Metrics{
        ctx:     mr.ctx,
        service: service,
        provider: metric.NewMeterProvider(
            opts...,
        ),
    }

- Background Goroutine in `PeriodicReader`
The `PeriodicReader` starts a background goroutine that continuously collects and exports metrics on a ticker interval:
opentelemetry-ebpf-instrumentation/pkg/export/otel/metric/periodic_reader.go
Lines 126 to 129 in 1807454

    go func() {
        defer func() { close(r.done) }()
        r.run(ctx, conf.interval)
    }()
opentelemetry-ebpf-instrumentation/pkg/export/otel/metric/periodic_reader.go
Lines 163 to 181 in 1807454

    func (r *PeriodicReader) run(ctx context.Context, interval time.Duration) {
        ticker := newTicker(interval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                err := r.collectAndExport(ctx)
                if err != nil {
                    otel.Handle(err)
                }
            case errCh := <-r.flushCh:
                errCh <- r.collectAndExport(ctx)
                ticker.Reset(interval)
            case <-ctx.Done():
                return
            }
        }
    }

- Reporter Eviction Without Goroutine Cleanup
After the TTL expires and `expireOldReporters` removes a reporter from the cache, the eviction callback only calls `ForceFlush` asynchronously, but does not call `Shutdown`:
opentelemetry-ebpf-instrumentation/pkg/export/otel/otelcfg/common.go
Lines 293 to 306 in 1807454

    func (rp *ReporterPool[K, T]) expireOldReporters() {
        now := rp.clock()
        if now.Sub(rp.lastExpiration) < rp.ttl {
            return
        }
        rp.lastExpiration = now
        for {
            _, v, ok := rp.pool.GetOldest()
            if !ok || now.Sub(v.lastAccess) < rp.ttl {
                return
            }
            rp.pool.RemoveOldest()
        }
    }
opentelemetry-ebpf-instrumentation/pkg/export/otel/metrics.go
Lines 284 to 299 in 1807454

    mr.reporters = otelcfg.NewReporterPool[*svc.Attrs, *Metrics](cfg.ReportersCacheLen, cfg.TTL, timeNow,
        func(id svc.UID, v *Metrics) {
            llog := log.With("service", id)
            llog.Debug("evicting metrics reporter from cache")
            v.cleanupAllMetricsInstances()
            if !mr.pidTracker.ServiceLive(id) {
                mr.deleteTargetMetrics(&id)
            }
            go func() {
                if err := v.provider.ForceFlush(ctx); err != nil {
                    llog.Warn("error flushing evicted metrics provider", "error", err)
                }
            }()
        }, mr.newMetricSet)
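For reference, the leaked goroutine can be reproduced in isolation. The sketch below is only an illustration: it uses the upstream `go.opentelemetry.io/otel/sdk/metric` package and a stdout exporter instead of OBI's in-repo copy under `pkg/export/otel/metric` and the OTLP exporter, and it simply shows that a `PeriodicReader` keeps running when its provider is flushed but never shut down:

    package main

    import (
        "context"
        "fmt"
        "runtime"
        "time"

        "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
        sdkmetric "go.opentelemetry.io/otel/sdk/metric"
    )

    func main() {
        ctx := context.Background()
        exporter, err := stdoutmetric.New()
        if err != nil {
            panic(err)
        }

        before := runtime.NumGoroutine()

        // Mimic one per-service provider: created with a periodic reader,
        // flushed on "eviction", then the only reference is dropped.
        mp := sdkmetric.NewMeterProvider(
            sdkmetric.WithReader(
                sdkmetric.NewPeriodicReader(exporter, sdkmetric.WithInterval(time.Second)),
            ),
        )
        _ = mp.ForceFlush(ctx) // what the eviction callback does today
        mp = nil               // what the LRU cache removal does
        _ = mp

        runtime.GC()
        time.Sleep(2 * time.Second)

        // The reader's run loop is still ticking, so the goroutine count stays
        // elevated. Calling Shutdown instead of ForceFlush would have stopped it.
        fmt.Printf("goroutines before=%d after=%d\n", before, runtime.NumGoroutine())
    }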
Impact
The periodic reader goroutine continues running indefinitely because:
- The goroutine only stops when `ctx.Done()` is signaled (line 177 in periodic_reader.go)
- The context is only canceled in the `Shutdown` method (line 320 in periodic_reader.go)
- The eviction callback only calls `ForceFlush`, not `Shutdown`
This causes:
- Memory leak: The evicted Metrics instance and its MeterProvider cannot be garbage collected
- Unnecessary network traffic: The OTEL endpoint continues receiving metric exports with no actual data
- Resource waste: Background goroutines accumulate over time for services that are no longer active
The Dilemma
Calling `MeterProvider.Shutdown()` in the eviction callback would solve the goroutine leak, but it creates another problem:
opentelemetry-ebpf-instrumentation/pkg/export/otel/metric/provider.go
Lines 129 to 143 in 1807454

    func (mp *MeterProvider) Shutdown(ctx context.Context) error {
        // Even though it may seem like there is a synchronization issue between the
        // call to `Store` and checking `shutdown`, the Go concurrency model ensures
        // that is not the case, as all the atomic operations executed in a program
        // behave as though executed in some sequentially consistent order. This
        // definition provides the same semantics as C++'s sequentially consistent
        // atomics and Java's volatile variables.
        // See https://go.dev/ref/mem#atomic and https://pkg.go.dev/sync/atomic.
        mp.stopped.Store(true)
        if mp.shutdown != nil {
            return mp.shutdown(ctx)
        }
        return nil
    }
`MeterProvider.Shutdown()` calls the unified shutdown function for all readers:
opentelemetry-ebpf-instrumentation/pkg/export/otel/metric/config.go
Lines 25 to 35 in 1807454

    func (c config) readerSignals() (forceFlush, shutdown func(context.Context) error) {
        var fFuncs, sFuncs []func(context.Context) error
        for _, r := range c.readers {
            sFuncs = append(sFuncs, r.Shutdown)
            if f, ok := r.(interface{ ForceFlush(context.Context) error }); ok {
                fFuncs = append(fFuncs, f.ForceFlush)
            }
        }
        return unify(fFuncs), unifyShutdown(sFuncs)
    }
This in turn calls `PeriodicReader.Shutdown()`.

The critical issue: line 338 in periodic_reader.go calls `r.exporter.Shutdown(ctx)`, which would shut down the shared exporter instance used by all services' metric providers, breaking metrics export for all other active services.

opentelemetry-ebpf-instrumentation/pkg/export/otel/metric/periodic_reader.go
Lines 309 to 350 in 1807454

    func (r *PeriodicReader) Shutdown(ctx context.Context) error {
        err := ErrReaderShutdown
        r.shutdownOnce.Do(func() {
            // Prioritize the ctx timeout if it is set.
            if _, ok := ctx.Deadline(); !ok {
                var cancel context.CancelFunc
                ctx, cancel = context.WithTimeout(ctx, r.timeout)
                defer cancel()
            }
            // Stop the run loop.
            r.cancel()
            <-r.done
            // Any future call to Collect will now return ErrReaderShutdown.
            ph := r.sdkProducer.Swap(produceHolder{
                produce: shutdownProducer{}.produce,
            })
            if ph != nil { // Reader was registered.
                // Flush pending telemetry.
                m := r.rmPool.Get().(*sdkmetricdata.ResourceMetrics)
                err = r.collect(ctx, ph, m)
                if err == nil {
                    err = r.export(ctx, m)
                }
                r.rmPool.Put(m)
            }
            sErr := r.exporter.Shutdown(ctx)
            if err == nil || errors.Is(err, ErrReaderShutdown) {
                err = sErr
            }
            r.mu.Lock()
            defer r.mu.Unlock()
            r.isShutdown = true
            // release references to Producer(s)
            r.externalProducers.Store([]Producer{})
        })
        return err
    }
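One conceivable direction, sketched purely for discussion (this is not a proposed patch, and `nonOwningExporter` is a hypothetical name): wrap the shared exporter so that a per-service provider can be fully shut down without tearing down the exporter that other providers still use. The sketch uses the upstream `go.opentelemetry.io/otel/sdk/metric` types; OBI's in-repo metric package would need the equivalent.

    package main

    import (
        "context"
        "time"

        "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
        sdkmetric "go.opentelemetry.io/otel/sdk/metric"
    )

    // nonOwningExporter forwards everything to the shared exporter but turns
    // Shutdown into a flush-only operation, leaving the shared exporter alive
    // for the other per-service MeterProviders.
    type nonOwningExporter struct {
        sdkmetric.Exporter
    }

    func (e nonOwningExporter) Shutdown(ctx context.Context) error {
        return e.Exporter.ForceFlush(ctx)
    }

    func main() {
        ctx := context.Background()
        shared, err := stdoutmetric.New()
        if err != nil {
            panic(err)
        }

        newProvider := func() *sdkmetric.MeterProvider {
            return sdkmetric.NewMeterProvider(
                sdkmetric.WithReader(
                    sdkmetric.NewPeriodicReader(
                        nonOwningExporter{shared},
                        sdkmetric.WithInterval(time.Second),
                    ),
                ),
            )
        }

        evicted, active := newProvider(), newProvider()

        // Evicting one service: a full Shutdown stops its reader goroutine
        // without shutting down the shared exporter...
        _ = evicted.Shutdown(ctx)

        // ...so the other service keeps recording and exporting.
        counter, _ := active.Meter("demo").Int64Counter("requests")
        counter.Add(ctx, 1)
        time.Sleep(2 * time.Second) // let its periodic reader export once

        // Only on process teardown are the remaining provider and the shared
        // exporter shut down for real.
        _ = active.Shutdown(ctx)
        _ = shared.Shutdown(ctx)
    }

With something along these lines, the eviction callback in metrics.go could call `v.provider.Shutdown(ctx)` instead of `ForceFlush`, stopping the leaked goroutine; whether that layering is acceptable is one of the questions for the maintainers below.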
Steps to Reproduce
Environment:
Run opentelemetry-ebpf-instrumentation as in the Docker Compose example from the OpenTelemetry docs.
- opentelemetry-ebpf-instrumentation version: v0.3.0
- Setup: Docker Compose with PostgreSQL database

    version: "3.7"
    services:
      postgres:
        image: postgres:16.10
        ports:
          - 5432:5432
        environment:
          - POSTGRES_HOST_AUTH_METHOD=trust
      obi:
        image: otel/ebpf-instrument:v0.3.0
        environment:
          - OTEL_EBPF_OPEN_PORT=5432
          - OTEL_EBPF_CONFIG_PATH=/etc/obi/config.yaml
        volumes:
          - ./obi-config.yaml:/etc/obi/config.yaml
        privileged: true
        pid: "service:postgres"
        depends_on:
          - postgres

    # obi-config.yaml
    log_level: DEBUG
    otel_metrics_export:
      ttl: 5m
      endpoint: http://otel-collector:4317
      features:
        - network
Steps:
- Deploy the above configuration in the Docker Compose environment
- Connect to the PostgreSQL database and issue a CREATE TABLE statement (a small Go helper for this is sketched after this list)
- Observe that a metrics reporter is instantiated
- After the TTL has passed, connect to the PostgreSQL database again and issue another CREATE TABLE statement to trigger expiration
- Observe that the eviction callback is called
- Note that there are now two metrics reporters sending exports; the old one did not disappear
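For the two database steps above, any PostgreSQL client works (plain `psql`, for example); the hypothetical helper below is just one way to generate the traffic from Go, assuming the compose setup above (trust auth, port 5432 published on localhost) and the `github.com/lib/pq` driver:

    package main

    import (
        "database/sql"
        "fmt"
        "log"
        "time"

        _ "github.com/lib/pq" // registers the "postgres" driver
    )

    func main() {
        // Connection settings match the docker-compose file above:
        // trust auth, default postgres user/database, port 5432 on localhost.
        db, err := sql.Open("postgres",
            "host=localhost port=5432 user=postgres dbname=postgres sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // Use a unique table name so the same program can be re-run after the
        // TTL has expired without a "relation already exists" error.
        table := fmt.Sprintf("obi_repro_%d", time.Now().Unix())
        if _, err := db.Exec("CREATE TABLE " + table + " (id int)"); err != nil {
            log.Fatal(err)
        }
        log.Printf("created table %s", table)
    }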
Expected Behavior
When a metrics reporter is evicted from the cache, the following shutdown sequence should occur:
- Complete `MeterProvider` Shutdown: The `MeterProvider` associated with the evicted reporter must be fully shut down by calling its `Shutdown()` method, not just `ForceFlush()`. This ensures that the `PeriodicReader` properly terminates its background goroutine, preventing resource leaks.
- Exporter Independence: The `PeriodicReader.Shutdown()` implementation calls `r.exporter.Shutdown(ctx)`, but since multiple `MeterProvider` instances share the same exporter instance, the shutdown of one reader must not shut down the shared exporter or affect other active readers.
Questions for Maintainers
- Is the current behavior intentional? Is it expected that periodic reader goroutines continue running after reporter eviction, or is this an oversight?
- What is the intended lifecycle management strategy for per-service `MeterProvider` instances in the context of TTL-based cache eviction?
- Why does `PeriodicReader.Shutdown()` call `exporter.Shutdown()`? Given that multiple readers may share the same exporter instance, shouldn't the exporter lifecycle be managed separately from individual readers?
- What is the recommended approach to properly clean up evicted reporters without affecting other active services?