diff --git a/collector.go b/collector.go index 6e8935a..d788c64 100644 --- a/collector.go +++ b/collector.go @@ -240,7 +240,6 @@ func (c *cpuProfileCollector) upload() { SampleRate: DefaultSampleRate, Units: "samples", AggregationType: "sum", - Format: upstream.FormatPprof, Profile: copyBuf(buf), }) c.buf.Reset() diff --git a/sample_types.go b/sample_types.go deleted file mode 100644 index 53c7438..0000000 --- a/sample_types.go +++ /dev/null @@ -1,52 +0,0 @@ -package pyroscope - -import ( - "github.com/grafana/pyroscope-go/upstream" -) - -var ( - sampleTypeConfigHeap = map[string]*upstream.SampleType{ //nolint:gochecknoglobals - "alloc_objects": { - Units: "objects", - Cumulative: false, - }, - "alloc_space": { - Units: "bytes", - Cumulative: false, - }, - "inuse_space": { - Units: "bytes", - Aggregation: "average", - Cumulative: false, - }, - "inuse_objects": { - Units: "objects", - Aggregation: "average", - Cumulative: false, - }, - } - sampleTypeConfigMutex = map[string]*upstream.SampleType{ //nolint:gochecknoglobals - "contentions": { - DisplayName: "mutex_count", - Units: "lock_samples", - Cumulative: false, - }, - "delay": { - DisplayName: "mutex_duration", - Units: "lock_nanoseconds", - Cumulative: false, - }, - } - sampleTypeConfigBlock = map[string]*upstream.SampleType{ //nolint:gochecknoglobals - "contentions": { - DisplayName: "block_count", - Units: "lock_samples", - Cumulative: false, - }, - "delay": { - DisplayName: "block_duration", - Units: "lock_nanoseconds", - Cumulative: false, - }, - } -) diff --git a/session.go b/session.go index 2bd2861..6d52cd4 100644 --- a/session.go +++ b/session.go @@ -275,15 +275,7 @@ func (ps *Session) uploadData(startTime, endTime time.Time) { SpyName: "gospy", Units: "goroutines", AggregationType: "average", - Format: upstream.FormatPprof, Profile: copyBuf(ps.goroutinesBuf.Bytes()), - SampleTypeConfig: map[string]*upstream.SampleType{ - "goroutine": { - DisplayName: "goroutines", - Units: "goroutines", - Aggregation: "average", - }, - }, }) ps.goroutinesBuf.Reset() } @@ -324,14 +316,12 @@ func (ps *Session) dumpHeapProfile(startTime time.Time, endTime time.Time) { } curMemBytes := copyBuf(ps.memBuf.Bytes()) job := &upstream.UploadJob{ - Name: ps.appName, - StartTime: startTime, - EndTime: endTime, - SpyName: "gospy", - SampleRate: 100, - Format: upstream.FormatPprof, - Profile: curMemBytes, - SampleTypeConfig: sampleTypeConfigHeap, + Name: ps.appName, + StartTime: startTime, + EndTime: endTime, + SpyName: "gospy", + SampleRate: 100, + Profile: curMemBytes, } ps.upstream.Upload(job) ps.lastGCGeneration = currentGCGeneration @@ -353,13 +343,11 @@ func (ps *Session) dumpMutexProfile(startTime time.Time, endTime time.Time) { } curMutexBuf := copyBuf(ps.mutexBuf.Bytes()) job := &upstream.UploadJob{ - Name: ps.appName, - StartTime: startTime, - EndTime: endTime, - SpyName: "gospy", - Format: upstream.FormatPprof, - Profile: curMutexBuf, - SampleTypeConfig: sampleTypeConfigMutex, + Name: ps.appName, + StartTime: startTime, + EndTime: endTime, + SpyName: "gospy", + Profile: curMutexBuf, } ps.upstream.Upload(job) } @@ -379,13 +367,11 @@ func (ps *Session) dumpBlockProfile(startTime time.Time, endTime time.Time) { } curBlockBuf := copyBuf(ps.blockBuf.Bytes()) job := &upstream.UploadJob{ - Name: ps.appName, - StartTime: startTime, - EndTime: endTime, - SpyName: "gospy", - Format: upstream.FormatPprof, - Profile: curBlockBuf, - SampleTypeConfig: sampleTypeConfigBlock, + Name: ps.appName, + StartTime: startTime, + EndTime: endTime, + SpyName: "gospy", + Profile: curBlockBuf, } ps.upstream.Upload(job) } diff --git a/upstream/remote/remote.go b/upstream/remote/remote.go index 8a21f4f..fd870d5 100644 --- a/upstream/remote/remote.go +++ b/upstream/remote/remote.go @@ -3,11 +3,9 @@ package remote import ( "bytes" "context" - "encoding/json" "errors" "fmt" "io" - "mime/multipart" "net/http" "net/url" "path" @@ -42,6 +40,7 @@ type Remote struct { wg sync.WaitGroup flushWG *sync.WaitGroup + url *url.URL } type HTTPClient interface { @@ -100,6 +99,7 @@ func NewRemote(cfg Config) (*Remote, error) { if err != nil { return nil, err } + r.url = u // authorize the token first if cfg.AuthToken == "" && isOGPyroscopeCloud(u) { @@ -150,35 +150,10 @@ func (r *Remote) Flush() { } func (r *Remote) uploadProfile(j *upstream.UploadJob) error { - u, err := url.Parse(r.cfg.Address) - if err != nil { - return fmt.Errorf("url parse: %w", err) - } - - body := &bytes.Buffer{} - - writer := multipart.NewWriter(body) - fw, err := writer.CreateFormFile("profile", "profile.pprof") - if err != nil { - return err - } - _, _ = fw.Write(j.Profile) - if j.SampleTypeConfig != nil { - fw, err = writer.CreateFormFile("sample_type_config", "sample_type_config.json") - if err != nil { - return err - } - b, err := json.Marshal(j.SampleTypeConfig) - if err != nil { - return err - } - _, _ = fw.Write(b) - } - if err = writer.Close(); err != nil { - return err - } + u := *r.url q := u.Query() + q.Set("format", "pprof") q.Set("name", j.Name) q.Set("from", strconv.FormatInt(j.StartTime.UnixNano(), 10)) q.Set("until", strconv.FormatInt(j.EndTime.UnixNano(), 10)) @@ -191,18 +166,20 @@ func (r *Remote) uploadProfile(j *upstream.UploadJob) error { u.RawQuery = q.Encode() r.logger.Debugf("uploading at %s", u.String()) - // new a request for the job - request, err := http.NewRequestWithContext(context.Background(), http.MethodPost, u.String(), body) + + request, err := http.NewRequestWithContext( + context.Background(), // todo: timeout + http.MethodPost, + u.String(), + bytes.NewReader(j.Profile), + ) if err != nil { return fmt.Errorf("new http request: %w", err) } - contentType := writer.FormDataContentType() - r.logger.Debugf("content type: %s", contentType) - request.Header.Set("Content-Type", contentType) - // request.Header.Set("Content-Type", "binary/octet-stream+"+string(j.Format)) + request.Header.Set("Content-Type", "binary/octet-stream+pprof") switch { - case r.cfg.AuthToken != "" && isOGPyroscopeCloud(u): + case r.cfg.AuthToken != "" && isOGPyroscopeCloud(&u): request.Header.Set("Authorization", "Bearer "+r.cfg.AuthToken) case r.cfg.BasicAuthUser != "" && r.cfg.BasicAuthPassword != "": request.SetBasicAuth(r.cfg.BasicAuthUser, r.cfg.BasicAuthPassword) diff --git a/upstream/remote/remote_test.go b/upstream/remote/remote_test.go index 839d546..1b9a46f 100644 --- a/upstream/remote/remote_test.go +++ b/upstream/remote/remote_test.go @@ -66,13 +66,11 @@ func TestUploadProfile(t *testing.T) { Body: io.NopCloser(bytes.NewBufferString("OK")), }, nil) - r := &Remote{ - cfg: tt.cfg, - client: mockClient, - logger: logger, - } - - err := r.uploadProfile(&upstream.UploadJob{ + r, err := NewRemote(tt.cfg) + r.client = mockClient + r.logger = logger + require.NoError(t, err) + err = r.uploadProfile(&upstream.UploadJob{ Name: "test-profile", StartTime: time.Now(), EndTime: time.Now().Add(time.Minute), diff --git a/upstream/upstream.go b/upstream/upstream.go index 6e4b2ab..7ec33e1 100644 --- a/upstream/upstream.go +++ b/upstream/upstream.go @@ -4,23 +4,11 @@ import ( "time" ) -type Format string - -const FormatPprof Format = "pprof" - type Upstream interface { Upload(job *UploadJob) Flush() } -type SampleType struct { - Units string `json:"units,omitempty"` - Aggregation string `json:"aggregation,omitempty"` - DisplayName string `json:"display-name,omitempty"` - Sampled bool `json:"sampled,omitempty"` - Cumulative bool `json:"cumulative,omitempty"` -} - type UploadJob struct { Name string StartTime time.Time @@ -29,9 +17,5 @@ type UploadJob struct { SampleRate uint32 Units string AggregationType string - Format Format Profile []byte - // Deprecated - PrevProfile []byte - SampleTypeConfig map[string]*SampleType }