diff --git a/host/host.go b/host/host.go index 4b6a9d44..7b897bd3 100644 --- a/host/host.go +++ b/host/host.go @@ -53,6 +53,8 @@ type Trace struct { KTime times.KTime PID libpf.PID TID libpf.PID + Origin int + OffTime uint64 // Time a task was off-cpu. APMTraceID libpf.APMTraceID APMTransactionID libpf.APMTransactionID CPU int diff --git a/reporter/iface.go b/reporter/iface.go index 7cf19cf3..40ce4f26 100644 --- a/reporter/iface.go +++ b/reporter/iface.go @@ -37,6 +37,8 @@ type TraceEventMeta struct { APMServiceName string PID, TID libpf.PID CPU int + Origin int + OffTime uint64 } type TraceReporter interface { diff --git a/reporter/otlp_reporter.go b/reporter/otlp_reporter.go index c2fbdf29..51769cd8 100644 --- a/reporter/otlp_reporter.go +++ b/reporter/otlp_reporter.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/ebpf-profiler/libpf" "go.opentelemetry.io/ebpf-profiler/libpf/xsync" + "go.opentelemetry.io/ebpf-profiler/support" ) const ( @@ -78,6 +79,7 @@ type traceEvents struct { mappingEnds []libpf.Address mappingFileOffsets []uint64 timestamps []uint64 // in nanoseconds + offTimes []uint64 // in nanoseconds } // attrKeyValue is a helper to populate Profile.attribute_table. @@ -88,6 +90,8 @@ type attrKeyValue[T string | int64] struct { value T } +type samplesMap map[traceAndMetaKey]*traceEvents + // OTLPReporter receives and transforms information to be OTLP/profiles compliant. type OTLPReporter struct { config *Config @@ -122,8 +126,9 @@ type OTLPReporter struct { // frames maps frame information to its source location. frames *lru.SyncedLRU[libpf.FileID, *xsync.RWMutex[map[libpf.AddressOrLineno]sourceInfo]] - // traceEvents stores reported trace events (trace metadata with frames and counts) - traceEvents xsync.RWMutex[map[traceAndMetaKey]*traceEvents] + // traceEvents stores reported trace events (trace metadata with frames and counts) from + // various origins. 
+ traceEvents xsync.RWMutex[map[int]samplesMap] // pkgGRPCOperationTimeout sets the time limit for GRPC requests. pkgGRPCOperationTimeout time.Duration @@ -177,6 +182,12 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) { return nil, err } + originsMap := make(map[int]samplesMap, 2) + for _, origin := range []int{support.TraceOriginSampling, + support.TraceOriginOffCPU} { + originsMap[origin] = make(samplesMap) + } + return &OTLPReporter{ config: cfg, name: cfg.Name, @@ -195,7 +206,7 @@ func NewOTLP(cfg *Config) (*OTLPReporter, error) { executables: executables, frames: frames, hostmetadata: hostmetadata, - traceEvents: xsync.NewRWMutex(map[traceAndMetaKey]*traceEvents{}), + traceEvents: xsync.NewRWMutex(originsMap), cgroupv2ID: cgroupv2ID, }, nil } @@ -211,8 +222,11 @@ func (r *OTLPReporter) SupportsReportTraceEvent() bool { return true } // ReportTraceEvent enqueues reported trace events for the OTLP reporter. func (r *OTLPReporter) ReportTraceEvent(trace *libpf.Trace, meta *TraceEventMeta) { - traceEventsMap := r.traceEvents.WLock() - defer r.traceEvents.WUnlock(&traceEventsMap) + if meta.Origin != support.TraceOriginSampling && meta.Origin != support.TraceOriginOffCPU { + // At the moment only on-CPU and off-CPU traces are reported. 
+ log.Errorf("Skip reporting trace for unexpected %d origin", meta.Origin) + return + } containerID, err := libpf.LookupCgroupv2(r.cgroupv2ID, meta.PID) if err != nil { @@ -228,13 +242,17 @@ func (r *OTLPReporter) ReportTraceEvent(trace *libpf.Trace, meta *TraceEventMeta pid: int64(meta.PID), } - if events, exists := (*traceEventsMap)[key]; exists { + traceEventsMap := r.traceEvents.WLock() + defer r.traceEvents.WUnlock(&traceEventsMap) + + if events, exists := (*traceEventsMap)[meta.Origin][key]; exists { events.timestamps = append(events.timestamps, uint64(meta.Timestamp)) - (*traceEventsMap)[key] = events + events.offTimes = append(events.offTimes, meta.OffTime) + (*traceEventsMap)[meta.Origin][key] = events return } - (*traceEventsMap)[key] = &traceEvents{ + (*traceEventsMap)[meta.Origin][key] = &traceEvents{ files: trace.Files, linenos: trace.Linenos, frameTypes: trace.FrameTypes, @@ -242,6 +260,7 @@ func (r *OTLPReporter) ReportTraceEvent(trace *libpf.Trace, meta *TraceEventMeta mappingEnds: trace.MappingEnd, mappingFileOffsets: trace.MappingFileOffsets, timestamps: []uint64{uint64(meta.Timestamp)}, + offTimes: []uint64{meta.OffTime}, } } @@ -402,24 +421,33 @@ func (r *OTLPReporter) Start(ctx context.Context) error { // reportOTLPProfile creates and sends out an OTLP profile. func (r *OTLPReporter) reportOTLPProfile(ctx context.Context) error { - profile, startTS, endTS := r.getProfile() + var pc = []*profiles.ProfileContainer{} + + for _, kind := range []int{support.TraceOriginSampling, support.TraceOriginOffCPU} { + profile, startTS, endTS := r.getProfile(kind) + + if len(profile.Sample) == 0 { + log.Debugf("No samples for %d", kind) + continue + } - if len(profile.Sample) == 0 { + pc = append(pc, &profiles.ProfileContainer{ + ProfileId: mkProfileID(), + StartTimeUnixNano: startTS, + EndTimeUnixNano: endTS, + // Attributes - Optional element we do not use. + // DroppedAttributesCount - Optional element we do not use. 
+ // OriginalPayloadFormat - Optional element we do not use. + // OriginalPayload - Optional element we do not use. + Profile: profile, + }) + } + + if len(pc) == 0 { log.Debugf("Skip sending of OTLP profile with no samples") return nil } - pc := []*profiles.ProfileContainer{{ - ProfileId: mkProfileID(), - StartTimeUnixNano: startTS, - EndTimeUnixNano: endTS, - // Attributes - Optional element we do not use. - // DroppedAttributesCount - Optional element we do not use. - // OriginalPayloadFormat - Optional element we do not use. - // OriginalPayload - Optional element we do not use. - Profile: profile, - }} - scopeProfiles := []*profiles.ScopeProfiles{{ Profiles: pc, Scope: &common.InstrumentationScope{ @@ -490,54 +518,40 @@ func (r *OTLPReporter) getResource() *resource.Resource { } } -// getProfile returns an OTLP profile containing all collected samples up to this moment. -func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS uint64) { - traceEvents := r.traceEvents.WLock() - samples := maps.Clone(*traceEvents) - clear(*traceEvents) - r.traceEvents.WUnlock(&traceEvents) - - // stringMap is a temporary helper that will build the StringTable. - // By specification, the first element should be empty. - stringMap := make(map[string]uint32) - stringMap[""] = 0 - - // funcMap is a temporary helper that will build the Function array - // in profile and make sure information is deduplicated. - funcMap := make(map[funcInfo]uint64) - funcMap[funcInfo{name: "", fileName: ""}] = 0 - - // attributeMap is a temporary helper that maps attribute values to - // their respective indices. - // This is to ensure that AttributeTable does not contain duplicates. - attributeMap := make(map[string]uint64) +// getProfile returns an OTLP profile containing all collected traces up to this moment. 
+func (r *OTLPReporter) getProfile(origin int) (profile *profiles.Profile, startTS, endTS uint64) { + samples := r.getEventsFromOrigin(origin) + stringMap, funcMap, attributeMap := getHelperMaps() numSamples := len(samples) - profile = &profiles.Profile{ - // SampleType - Next step: Figure out the correct SampleType. - Sample: make([]*profiles.Sample, 0, numSamples), - SampleType: []*profiles.ValueType{{ - Type: int64(getStringMapIndex(stringMap, "samples")), - Unit: int64(getStringMapIndex(stringMap, "count")), - }}, - PeriodType: &profiles.ValueType{ - Type: int64(getStringMapIndex(stringMap, "cpu")), - Unit: int64(getStringMapIndex(stringMap, "nanoseconds")), - }, - Period: 1e9 / int64(r.samplesPerSecond), - // AttributeUnits - Optional element we do not use. - // LinkTable - Optional element we do not use. - // DropFrames - Optional element we do not use. - // KeepFrames - Optional element we do not use. - // Comment - Optional element we do not use. - // DefaultSampleType - Optional element we do not use. + switch origin { + case support.TraceOriginSampling: + profile = &profiles.Profile{ + Sample: make([]*profiles.Sample, 0, numSamples), + SampleType: []*profiles.ValueType{{ + Type: int64(getStringMapIndex(stringMap, "samples")), + Unit: int64(getStringMapIndex(stringMap, "count")), + }}, + PeriodType: &profiles.ValueType{ + Type: int64(getStringMapIndex(stringMap, "cpu")), + Unit: int64(getStringMapIndex(stringMap, "nanoseconds")), + }, + Period: 1e9 / int64(r.samplesPerSecond), + } + case support.TraceOriginOffCPU: + profile = &profiles.Profile{ + Sample: make([]*profiles.Sample, 0, numSamples), + SampleType: []*profiles.ValueType{{ + Type: int64(getStringMapIndex(stringMap, "events")), + Unit: int64(getStringMapIndex(stringMap, "nanoseconds")), + }}, + } + default: + return &profiles.Profile{}, 0, 0 } locationIndex := uint64(0) - // Temporary lookup to reference existing Mappings. 
- fileIDtoMapping := make(map[libpf.FileID]uint64) - for traceKey, traceInfo := range samples { sample := &profiles.Sample{} sample.LocationsStartIndex = locationIndex @@ -550,110 +564,18 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u endTS = traceInfo.timestamps[len(traceInfo.timestamps)-1] sample.TimestampsUnixNano = traceInfo.timestamps - sample.Value = []int64{1} - - // Walk every frame of the trace. - for i := range traceInfo.frameTypes { - frameAttributes := addProfileAttributes(profile, []attrKeyValue[string]{ - {key: "profile.frame.type", value: traceInfo.frameTypes[i].String()}, - }, attributeMap) - - loc := &profiles.Location{ - // Id - Optional element we do not use. - Address: uint64(traceInfo.linenos[i]), - // IsFolded - Optional element we do not use. - Attributes: frameAttributes, + switch origin { + case support.TraceOriginSampling: + sample.Value = []int64{1} + case support.TraceOriginOffCPU: + sample.Value = make([]int64, len(traceInfo.offTimes)) + for idx, offTime := range traceInfo.offTimes { + sample.Value[idx] = int64(offTime) } - - switch frameKind := traceInfo.frameTypes[i]; frameKind { - case libpf.NativeFrame: - // As native frames are resolved in the backend, we use Mapping to - // report these frames. - - var locationMappingIndex uint64 - if tmpMappingIndex, exists := fileIDtoMapping[traceInfo.files[i]]; exists { - locationMappingIndex = tmpMappingIndex - } else { - idx := uint64(len(fileIDtoMapping)) - fileIDtoMapping[traceInfo.files[i]] = idx - locationMappingIndex = idx - - // Ensure that actively used executables do not expire. - execInfo, exists := r.executables.GetAndRefresh(traceInfo.files[i], - executableCacheLifetime) - - // Next step: Select a proper default value, - // if the name of the executable is not known yet. 
- var fileName = "UNKNOWN" - if exists { - fileName = execInfo.fileName - } - - mappingAttributes := addProfileAttributes(profile, []attrKeyValue[string]{ - // Once SemConv and its Go package is released with the new - // semantic convention for build_id, replace these hard coded - // strings. - {key: "process.executable.build_id.gnu", value: execInfo.gnuBuildID}, - {key: "process.executable.build_id.htlhash", - value: traceInfo.files[i].StringNoQuotes()}, - }, attributeMap) - - profile.Mapping = append(profile.Mapping, &profiles.Mapping{ - // Id - Optional element we do not use. - MemoryStart: uint64(traceInfo.mappingStarts[i]), - MemoryLimit: uint64(traceInfo.mappingEnds[i]), - FileOffset: traceInfo.mappingFileOffsets[i], - Filename: int64(getStringMapIndex(stringMap, fileName)), - Attributes: mappingAttributes, - // HasFunctions - Optional element we do not use. - // HasFilenames - Optional element we do not use. - // HasLineNumbers - Optional element we do not use. - // HasInlinedFrames - Optional element we do not use. - }) - } - loc.MappingIndex = locationMappingIndex - case libpf.AbortFrame: - // Next step: Figure out how the OTLP protocol - // could handle artificial frames, like AbortFrame, - // that are not originated from a native or interpreted - // program. - default: - // Store interpreted frame information as a Line message: - line := &profiles.Line{} - - fileIDInfoLock, exists := r.frames.Get(traceInfo.files[i]) - if !exists { - // At this point, we do not have enough information for the frame. - // Therefore, we report a dummy entry and use the interpreter as filename. 
- line.FunctionIndex = createFunctionEntry(funcMap, - "UNREPORTED", frameKind.String()) - } else { - fileIDInfo := fileIDInfoLock.RLock() - if si, exists := (*fileIDInfo)[traceInfo.linenos[i]]; exists { - line.Line = int64(si.lineNumber) - - line.FunctionIndex = createFunctionEntry(funcMap, - si.functionName, si.filePath) - } else { - // At this point, we do not have enough information for the frame. - // Therefore, we report a dummy entry and use the interpreter as filename. - // To differentiate this case from the case where no information about - // the file ID is available at all, we use a different name for reported - // function. - line.FunctionIndex = createFunctionEntry(funcMap, - "UNRESOLVED", frameKind.String()) - } - fileIDInfoLock.RUnlock(&fileIDInfo) - } - loc.Line = append(loc.Line, line) - - // To be compliant with the protocol, generate a dummy mapping entry. - loc.MappingIndex = getDummyMappingIndex(fileIDtoMapping, - stringMap, attributeMap, profile, traceInfo.files[i]) - } - profile.Location = append(profile.Location, loc) } + r.populateTrace(profile, traceInfo, stringMap, funcMap, attributeMap) + sample.Attributes = append(addProfileAttributes(profile, []attrKeyValue[string]{ {key: string(semconv.ContainerIDKey), value: traceKey.containerID}, {key: string(semconv.ThreadNameKey), value: traceKey.comm}, @@ -668,6 +590,154 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u } log.Debugf("Reporting OTLP profile with %d samples", len(profile.Sample)) + populateMetadata(profile, stringMap, funcMap) + + profile.DurationNanos = int64(endTS - startTS) + profile.TimeNanos = int64(startTS) + + return profile, startTS, endTS +} + +// getEventsFromOrigin returns all reported traces from a specified kind of profiling. 
+func (r *OTLPReporter) getEventsFromOrigin(kind int) samplesMap { + traceEvents := r.traceEvents.WLock() + samples := maps.Clone((*traceEvents)[kind]) + clear((*traceEvents)[kind]) + r.traceEvents.WUnlock(&traceEvents) + return samples +} + +// getHelperMaps returns temporary maps that support constructing a profile. +func getHelperMaps() (stringMap map[string]uint32, funcMap map[funcInfo]uint64, + attributeMap map[string]uint64) { + // stringMap is a temporary helper that will build the StringTable. + // By specification, the first element should be empty. + stringMap = make(map[string]uint32) + stringMap[""] = 0 + + // funcMap is a temporary helper that will build the Function array + // in profile and make sure information is deduplicated. + funcMap = make(map[funcInfo]uint64) + funcMap[funcInfo{name: "", fileName: ""}] = 0 + + // attributeMap is a temporary helper that maps attribute values to + // their respective indices. + // This is to ensure that AttributeTable does not contain duplicates. + attributeMap = make(map[string]uint64) + + return stringMap, funcMap, attributeMap +} + +func (r *OTLPReporter) populateTrace(profile *profiles.Profile, traceInfo *traceEvents, + stringMap map[string]uint32, funcMap map[funcInfo]uint64, attributeMap map[string]uint64) { + // Temporary lookup to reference existing Mappings. + fileIDtoMapping := make(map[libpf.FileID]uint64) + + // Walk every frame of the trace. + for i := range traceInfo.frameTypes { + frameAttributes := addProfileAttributes(profile, []attrKeyValue[string]{ + {key: "profile.frame.type", value: traceInfo.frameTypes[i].String()}, + }, attributeMap) + + loc := &profiles.Location{ + // Id - Optional element we do not use. + Address: uint64(traceInfo.linenos[i]), + // IsFolded - Optional element we do not use. 
+ Attributes: frameAttributes, + } + + switch frameKind := traceInfo.frameTypes[i]; frameKind { + case libpf.NativeFrame: + // As native frames are resolved in the backend, we use Mapping to + // report these frames. + + var locationMappingIndex uint64 + if tmpMappingIndex, exists := fileIDtoMapping[traceInfo.files[i]]; exists { + locationMappingIndex = tmpMappingIndex + } else { + idx := uint64(len(profile.Mapping)) // map is per trace, table is per profile + fileIDtoMapping[traceInfo.files[i]] = idx + locationMappingIndex = idx + + execInfo, exists := r.executables.GetAndRefresh(traceInfo.files[i], + executableCacheLifetime) + + // Next step: Select a proper default value, + // if the name of the executable is not known yet. + var fileName = "UNKNOWN" + if exists { + fileName = execInfo.fileName + } + + mappingAttributes := addProfileAttributes(profile, []attrKeyValue[string]{ + // Once SemConv and its Go package is released with the new + // semantic convention for build_id, replace these hard coded + // strings. + {key: "process.executable.build_id.gnu", value: execInfo.gnuBuildID}, + {key: "process.executable.build_id.htlhash", + value: traceInfo.files[i].StringNoQuotes()}, + }, attributeMap) + + profile.Mapping = append(profile.Mapping, &profiles.Mapping{ + // Id - Optional element we do not use. + MemoryStart: uint64(traceInfo.mappingStarts[i]), + MemoryLimit: uint64(traceInfo.mappingEnds[i]), + FileOffset: traceInfo.mappingFileOffsets[i], + Filename: int64(getStringMapIndex(stringMap, fileName)), + Attributes: mappingAttributes, + // HasFunctions - Optional element we do not use. + // HasFilenames - Optional element we do not use. + // HasLineNumbers - Optional element we do not use. + // HasInlinedFrames - Optional element we do not use. + }) + } + loc.MappingIndex = locationMappingIndex + case libpf.AbortFrame: + // Next step: Figure out how the OTLP protocol + // could handle artificial frames, like AbortFrame, + // that are not originated from a native or interpreted + // program. 
+ default: + // Store interpreted frame information as a Line message: + line := &profiles.Line{} + + fileIDInfoLock, exists := r.frames.Get(traceInfo.files[i]) + if !exists { + // At this point, we do not have enough information for the frame. + // Therefore, we report a dummy entry and use the interpreter as filename. + line.FunctionIndex = createFunctionEntry(funcMap, + "UNREPORTED", frameKind.String()) + } else { + fileIDInfo := fileIDInfoLock.RLock() + if si, exists := (*fileIDInfo)[traceInfo.linenos[i]]; exists { + line.Line = int64(si.lineNumber) + + line.FunctionIndex = createFunctionEntry(funcMap, + si.functionName, si.filePath) + } else { + // At this point, we do not have enough information for the frame. + // Therefore, we report a dummy entry and use the interpreter as filename. + // To differentiate this case from the case where no information about + // the file ID is available at all, we use a different name for reported + // function. + line.FunctionIndex = createFunctionEntry(funcMap, + "UNRESOLVED", frameKind.String()) + } + fileIDInfoLock.RUnlock(&fileIDInfo) + } + loc.Line = append(loc.Line, line) + + // To be compliant with the protocol, generate a dummy mapping entry. + loc.MappingIndex = getDummyMappingIndex(fileIDtoMapping, + stringMap, attributeMap, profile, traceInfo.files[i]) + } + profile.Location = append(profile.Location, loc) + } +} + +// populateMetadata adds the information from the temporary helper maps to the profile. +func populateMetadata(profile *profiles.Profile, stringMap map[string]uint32, + funcMap map[funcInfo]uint64) { // Populate the deduplicated functions into profile. 
funcTable := make([]*profiles.Function, len(funcMap)) for v, idx := range funcMap { @@ -693,11 +763,6 @@ func (r *OTLPReporter) getProfile() (profile *profiles.Profile, startTS, endTS u for i := int64(0); i < int64(len(profile.Location)); i++ { profile.LocationIndices[i] = i } - - profile.DurationNanos = int64(endTS - startTS) - profile.TimeNanos = int64(startTS) - - return profile, startTS, endTS } // getStringMapIndex inserts or looks up the index for value in stringMap. diff --git a/tracehandler/tracehandler.go b/tracehandler/tracehandler.go index ebc39ac0..0c9fb3bc 100644 --- a/tracehandler/tracehandler.go +++ b/tracehandler/tracehandler.go @@ -127,6 +127,8 @@ func (m *traceHandler) HandleTrace(bpfTrace *host.Trace) { TID: bpfTrace.TID, APMServiceName: "", // filled in below CPU: bpfTrace.CPU, + Origin: bpfTrace.Origin, + OffTime: bpfTrace.OffTime, } if !m.reporter.SupportsReportTraceEvent() { diff --git a/tracer/tracer.go b/tracer/tracer.go index 09e107de..4654d0fb 100644 --- a/tracer/tracer.go +++ b/tracer/tracer.go @@ -977,6 +977,8 @@ func (t *Tracer) loadBpfTrace(raw []byte, cpu int) *host.Trace { APMTransactionID: *(*libpf.APMTransactionID)(unsafe.Pointer(&ptr.apm_transaction_id)), PID: libpf.PID(ptr.pid), TID: libpf.PID(ptr.tid), + Origin: int(ptr.origin), + OffTime: uint64(ptr.offtime), KTime: times.KTime(ptr.ktime), CPU: cpu, } @@ -984,11 +986,13 @@ func (t *Tracer) loadBpfTrace(raw []byte, cpu int) *host.Trace { // Trace fields included in the hash: // - PID, kernel stack ID, length & frame array // Intentionally excluded: - // - ktime, COMM, APM trace, APM transaction ID + // - ktime, COMM, APM trace, APM transaction ID, Origin and Off Time ptr.comm = [16]C.char{} ptr.apm_trace_id = C.ApmTraceID{} ptr.apm_transaction_id = C.ApmSpanID{} ptr.ktime = 0 + ptr.origin = 0 + ptr.offtime = 0 trace.Hash = host.TraceHash(xxh3.Hash128(raw).Lo) userFrameOffs := 0