Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 122 additions & 24 deletions connector/profilingmetricsconnector/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package profilingmetricsconnector // import "github.com/elastic/opentelemetry-co

import (
"fmt"
"log/slog"
"regexp"

"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -38,22 +39,26 @@ type frameInfo struct {
filename string
}

const nativeLibraryAttrName = "shlib_name"
const (
nativeLibraryAttrName = "shlib_name"
syscallAttrName = "syscall_name"
)

var (
metricUser = metric{name: "samples.user.count", desc: "Number of samples executing userspace code (self)"}
metricKernel = metric{name: "samples.kernel.count", desc: "Number of samples executing kernel code (self)"}
metricNative = metric{name: "samples.native.count", desc: "Number of samples executing native code (self)"}
metricJVM = metric{name: "samples.jvm.count", desc: "Number of samples executing HotSpot code (self)"}
metricPython = metric{name: "samples.cpython.count", desc: "Number of samples executing Python code (self)"}
metricGo = metric{name: "samples.go.count", desc: "Number of samples executing Go code (self)"}
metricV8JS = metric{name: "samples.v8js.count", desc: "Number of samples executing V8 JS code (self)"}
metricPHP = metric{name: "samples.php.count", desc: "Number of samples executing PHP code (self)"}
metricPerl = metric{name: "samples.perl.count", desc: "Number of samples executing Perl code (self)"}
metricRuby = metric{name: "samples.ruby.count", desc: "Number of samples executing Ruby code (self)"}
metricDotnet = metric{name: "samples.dotnet.count", desc: "Number of samples executing Dotnet code (self)"}
metricRust = metric{name: "samples.rust.count", desc: "Number of samples executing Rust code (self)"}
metricBeam = metric{name: "samples.beam.count", desc: "Number of samples executing Beam code (self)"}
metricUser = metric{name: "samples.user.count", desc: "Number of samples executing userspace code (self)"}
metricKernel = metric{name: "samples.kernel.count", desc: "Number of samples executing kernel code (self)"}
metricSyscall = metric{name: "samples.syscall.count", desc: "Number of samples executing syscall code (self)"}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding one more metric, we can use the same method as for metricNative: we know that there cannot be multiple syscalls per stack trace, so we can enrich metricKernel with the syscall name attribute.

We can do that inside classifyFrame (which we can rename to classifyFrames) and have it take an extra argument, kernelCounts map[string]int64.

metricNative = metric{name: "samples.native.count", desc: "Number of samples executing native code (self)"}
metricJVM = metric{name: "samples.jvm.count", desc: "Number of samples executing HotSpot code (self)"}
metricPython = metric{name: "samples.cpython.count", desc: "Number of samples executing Python code (self)"}
metricGo = metric{name: "samples.go.count", desc: "Number of samples executing Go code (self)"}
metricV8JS = metric{name: "samples.v8js.count", desc: "Number of samples executing V8 JS code (self)"}
metricPHP = metric{name: "samples.php.count", desc: "Number of samples executing PHP code (self)"}
metricPerl = metric{name: "samples.perl.count", desc: "Number of samples executing Perl code (self)"}
metricRuby = metric{name: "samples.ruby.count", desc: "Number of samples executing Ruby code (self)"}
metricDotnet = metric{name: "samples.dotnet.count", desc: "Number of samples executing Dotnet code (self)"}
metricRust = metric{name: "samples.rust.count", desc: "Number of samples executing Rust code (self)"}
metricBeam = metric{name: "samples.beam.count", desc: "Number of samples executing Beam code (self)"}

allowedFrameTypes = map[string]metric{
frameTypeNative: metricNative,
Expand All @@ -70,10 +75,14 @@ var (
frameTypeBeam: metricBeam,
}

// match shared libraries
rx = regexp.MustCompile(`(?:.*/)?(.+)\.so`)

// match syscalls
syscallRx = regexp.MustCompile(`^(?:__x64_sys|__arm64_sys|ksys)_(\w+)`)
)

func fetchFrameInfo(dictionary pprofile.ProfilesDictionary,
func fetchLeafFrameInfo(dictionary pprofile.ProfilesDictionary,
locationIndices pcommon.Int32Slice,
sampleLocationIndex int,
) (frameInfo, error) {
Expand Down Expand Up @@ -148,17 +157,15 @@ func fetchFrameInfo(dictionary pprofile.ProfilesDictionary,
// classifyFrame classifies sample into one or more categories based on frame type.
// This takes place by incrementing the associated metric count.
func classifyFrame(dictionary pprofile.ProfilesDictionary,
locationIndices pcommon.Int32Slice,
sample pprofile.Sample,
counts map[metric]int64,
nativeCounts map[string]int64,
locationIndices pcommon.Int32Slice, sample pprofile.Sample,
counts map[metric]int64, nativeCounts map[string]int64,
) error {
fi, err := fetchFrameInfo(dictionary, locationIndices, 0)
leaf, err := fetchLeafFrameInfo(dictionary, locationIndices, 0)
if err != nil {
return err
}

leafFrameType := fi.typ
leafFrameType := leaf.typ
// We don't need a separate metric for total number of samples, as this can always be
// derived from summing the metricKernel and metricUser counts.
metric := allowedFrameTypes[leafFrameType]
Expand All @@ -177,7 +184,7 @@ func classifyFrame(dictionary pprofile.ProfilesDictionary,
}

// Extract native library name and increment associated count
if sm := rx.FindStringSubmatch(fi.filename); sm != nil {
if sm := rx.FindStringSubmatch(leaf.filename); sm != nil {
nativeCounts[sm[1]]++
} else {
counts[metric]++
Expand All @@ -186,22 +193,97 @@ func classifyFrame(dictionary pprofile.ProfilesDictionary,
return nil
}

// identifySyscall walks the frames referenced by locationIndices, in the order
// given, and attributes the stack to the first kernel frame whose function
// name matches syscallRx.
//
// On a match, the regex capture (the syscall name without its architecture
// prefix) is used as the key into syscallCounts, its count is increased by
// multiplier, and the walk stops: a stack contributes to at most one syscall.
// Stacks without a matching kernel frame leave syscallCounts untouched.
//
// Any index that does not fall inside its dictionary table (negative or past
// the end) is logged and skipped instead of being dereferenced, so malformed
// profile data is tolerated. The returned error is currently always nil.
func identifySyscall(dictionary pprofile.ProfilesDictionary,
	locationIndices pcommon.Int32Slice,
	syscallCounts map[string]int64, multiplier int64,
) error {
	attrTable := dictionary.AttributeTable()
	locationTable := dictionary.LocationTable()
	strTable := dictionary.StringTable()
	funcTable := dictionary.FunctionTable()

	// Table lengths do not change during the walk; read them once.
	attrTblLen := attrTable.Len()
	locTblLen := locationTable.Len()
	strTblLen := strTable.Len()
	funcTblLen := funcTable.Len()

	for _, li := range locationIndices.All() {
		if !indexInTable(li, locTblLen) {
			slog.Error("identifySyscall", slog.Any("li", li),
				slog.Any("locTblLen", locTblLen))
			continue
		}
		loc := locationTable.At(int(li))

		for _, attrIdx := range loc.AttributeIndices().All() {
			if !indexInTable(attrIdx, attrTblLen) {
				slog.Error("identifySyscall", slog.Any("attrIdx", attrIdx),
					slog.Any("attrTblLen", attrTblLen))
				continue
			}
			attr := attrTable.At(int(attrIdx))

			keyIdx := attr.KeyStrindex()
			if !indexInTable(keyIdx, strTblLen) {
				slog.Error("identifySyscall", slog.Any("attr.KeyStrindex()", keyIdx),
					slog.Any("strTblLen", strTblLen))
				continue
			}

			// Only the frame-type attribute is of interest, and only when it
			// marks the location as a kernel frame.
			if strTable.At(int(keyIdx)) != string(semconv.ProfileFrameTypeKey) {
				continue
			}
			if attr.Value().Str() != frameTypeKernel {
				continue
			}

			for _, ln := range loc.Line().All() {
				fnIdx := ln.FunctionIndex()
				if !indexInTable(fnIdx, funcTblLen) {
					slog.Error("identifySyscall", slog.Any("ln.FunctionIndex()", fnIdx),
						slog.Any("funcTblLen", funcTblLen))
					continue
				}
				fn := funcTable.At(int(fnIdx))

				nameIdx := fn.NameStrindex()
				if !indexInTable(nameIdx, strTblLen) {
					slog.Error("identifySyscall", slog.Any("fn.NameStrindex()", nameIdx),
						slog.Any("strTblLen", strTblLen))
					continue
				}
				fnName := strTable.At(int(nameIdx))

				// Use the submatch offsets and slice fnName directly instead of
				// materialising a []string of submatches. With one capture
				// group a successful match yields exactly four offsets.
				indices := syscallRx.FindStringSubmatchIndex(fnName)
				if len(indices) == 4 {
					syscall := fnName[indices[2]:indices[3]]
					syscallCounts[syscall] += multiplier
					// First syscall frame wins; nothing else to do for this stack.
					return nil
				}
			}
		}
	}
	return nil
}

// indexInTable reports whether idx is a valid position in a table holding
// tableLen entries. The comparison is done in int so that the table length is
// never narrowed to int32.
func indexInTable(idx int32, tableLen int) bool {
	return idx >= 0 && int(idx) < tableLen
}

func (c *profilesToMetricsConnector) addFrameMetrics(dictionary pprofile.ProfilesDictionary,
profile pprofile.Profile, scopeMetrics pmetric.ScopeMetrics,
) {
stackTable := dictionary.StackTable()

counts := make(map[metric]int64)
nativeCounts := make(map[string]int64)
syscallCounts := make(map[string]int64)

// Process all samples and extract metric counts
for _, sample := range profile.Sample().All() {
multiplier := int64(sample.TimestampsUnixNano().Len())
stack := stackTable.At(int(sample.StackIndex()))
if err := classifyFrame(dictionary, stack.LocationIndices(),
sample, counts, nativeCounts); err != nil {
// Should not happen with well-formed profile data
// TODO: Add error metric or log error
continue
slog.Error("classifyFrame", slog.Any("error", err))
}

if err := identifySyscall(dictionary, stack.LocationIndices(),
syscallCounts, multiplier); err != nil {
// Should not happen with well-formed profile data
slog.Error("identifySyscall", slog.Any("error", err))
}
}

Expand Down Expand Up @@ -236,4 +318,20 @@ func (c *profilesToMetricsConnector) addFrameMetrics(dictionary pprofile.Profile
dp.SetIntValue(count)
dp.Attributes().PutStr(nativeLibraryAttrName, libraryName)
}

for sysCall, count := range syscallCounts {
m := scopeMetrics.Metrics().AppendEmpty()
m.SetName(c.config.MetricsPrefix + metricSyscall.name)
m.SetDescription(metricSyscall.desc)
m.SetUnit("1")

sum := m.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)

dp := sum.DataPoints().AppendEmpty()
dp.SetTimestamp(profile.Time())
dp.SetIntValue(count)
dp.Attributes().PutStr(syscallAttrName, sysCall)
}
}
46 changes: 34 additions & 12 deletions connector/profilingmetricsconnector/frame_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,24 @@ func (m *metricsConsumerStub) ConsumeMetrics(ctx context.Context, md pmetric.Met
assert.Equal(m.t, pmetric.AggregationTemporalityDelta,
sum.AggregationTemporality())
assert.Equal(m.t, 1, sum.DataPoints().Len())
dp := sum.DataPoints().At(0)
// For native metrics, this is convenient way to test library name extraction
if strings.HasSuffix(name, metricNative.name) {
if shlibName, exists := dp.Attributes().Get(nativeLibraryAttrName); exists {
name = fmt.Sprintf("%v/%v", name, shlibName.AsString())
for _, dp := range sum.DataPoints().All() {
switch {
case strings.HasSuffix(name, metricNative.name):
// For native metrics, this is convenient way to test library name extraction
if shlibName, exists := dp.Attributes().Get(nativeLibraryAttrName); exists {
name = fmt.Sprintf("%v/%v", name, shlibName.AsString())
}
case strings.HasSuffix(name, metricSyscall.name):
// For syscall metrics, this is convenient way to test syscall name extraction
if syscallName, exists := dp.Attributes().Get(syscallAttrName); exists {
name = fmt.Sprintf("%v/%v", name, syscallName.AsString())
}
default:
// Non-native metrics should not have attributes attached
assert.Equal(m.t, 0, dp.Attributes().Len())
}
} else {
// Non-native metrics should not have attributes attached
assert.Equal(m.t, 0, dp.Attributes().Len())
m.counts[name] += dp.IntValue()
}
m.counts[name] += dp.IntValue()
}
}
}
Expand All @@ -98,6 +105,7 @@ func newProfiles() (pprofile.Profiles,
pprofile.KeyValueAndUnitSlice,
pprofile.LocationSlice,
pprofile.StackSlice,
pprofile.FunctionSlice,
) {
profiles := pprofile.NewProfiles()
dict := profiles.Dictionary()
Expand All @@ -107,6 +115,7 @@ func newProfiles() (pprofile.Profiles,
locTable := dict.LocationTable()
mappingTable := dict.MappingTable()
stackTable := dict.StackTable()
funcTable := dict.FunctionTable()

strTable.Append("")
strTable.Append("samples")
Expand All @@ -116,8 +125,9 @@ func newProfiles() (pprofile.Profiles,
mappingTable.AppendEmpty()
attrTable.AppendEmpty()
stackTable.AppendEmpty()
funcTable.AppendEmpty()

return profiles, dict, strTable, attrTable, locTable, stackTable
return profiles, dict, strTable, attrTable, locTable, stackTable, funcTable
}

// newProfile initializes and appends a Profile to a Profiles instance.
Expand Down Expand Up @@ -146,11 +156,12 @@ func TestConsumeProfiles_FrameMetrics(t *testing.T) {
}

// Create a Profile and higher-level envelopes
profiles, _, strTable, attrTable, locTable, stackTable := newProfiles()
profiles, _, strTable, attrTable, locTable, stackTable, _ := newProfiles()
prof := newProfile(profiles)

// Create a profiles object with a sample that has a location with a frame type attribute.
sample := prof.Sample().AppendEmpty()
sample.TimestampsUnixNano().Append(1, 2, 3, 4)

// Add an attribute for frame type
attr := attrTable.AppendEmpty()
Expand Down Expand Up @@ -191,7 +202,7 @@ func TestConsumeProfiles_FrameMetricsMultiple(t *testing.T) {
}

// Create a Profile and higher-level envelopes
profiles, dict, strTable, attrTable, locTable, stackTable := newProfiles()
profiles, dict, strTable, attrTable, locTable, stackTable, funcTable := newProfiles()
prof := newProfile(profiles)

mappingTable := dict.MappingTable()
Expand All @@ -218,6 +229,7 @@ func TestConsumeProfiles_FrameMetricsMultiple(t *testing.T) {
locPy.AttributeIndices().Append(2)
locKernel := locTable.AppendEmpty()
locKernel.AttributeIndices().Append(3)
locKernel.Line().AppendEmpty().SetFunctionIndex(1)

locNative := locTable.AppendEmpty()
locNative.AttributeIndices().Append(4)
Expand Down Expand Up @@ -249,20 +261,29 @@ func TestConsumeProfiles_FrameMetricsMultiple(t *testing.T) {
// Eight samples
sampleKernel := prof.Sample().AppendEmpty()
sampleKernel.SetStackIndex(3)
sampleKernel.TimestampsUnixNano().Append(1, 2, 3)
sampleNative := prof.Sample().AppendEmpty()
sampleNative.SetStackIndex(4)
sampleNative = prof.Sample().AppendEmpty()
sampleNative.SetStackIndex(5)
sampleNative.TimestampsUnixNano().Append(2)
sampleGo := prof.Sample().AppendEmpty()
sampleGo.SetStackIndex(1)
sampleGo.TimestampsUnixNano().Append(3)
samplePy := prof.Sample().AppendEmpty()
samplePy.SetStackIndex(2)
samplePy = prof.Sample().AppendEmpty()
samplePy.SetStackIndex(2)
samplePy = prof.Sample().AppendEmpty()
samplePy.SetStackIndex(2)
samplePy.TimestampsUnixNano().Append(4)
sampleGo = prof.Sample().AppendEmpty()
sampleGo.SetStackIndex(1)
sampleGo.TimestampsUnixNano().Append(5)

syscallFunc := funcTable.AppendEmpty()
syscallFunc.SetNameStrindex(int32(strTable.Len()))
strTable.Append("__x64_sys_bpf")

err := conn.ConsumeProfiles(context.Background(), profiles)
assert.NoError(t, err)
Expand All @@ -274,6 +295,7 @@ func TestConsumeProfiles_FrameMetricsMultiple(t *testing.T) {
"frametest.samples.kernel.count": 1,
"frametest.samples.native.count": 1,
"frametest.samples.native.count/libc": 1,
"frametest.samples.syscall.count/bpf": 3,
},
m.counts)
}