Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ func (c *cpuProfileCollector) upload() {
SampleRate: DefaultSampleRate,
Units: "samples",
AggregationType: "sum",
Format: upstream.FormatPprof,
Profile: copyBuf(buf),
})
c.buf.Reset()
Expand Down
52 changes: 0 additions & 52 deletions sample_types.go

This file was deleted.

46 changes: 16 additions & 30 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,15 +275,7 @@ func (ps *Session) uploadData(startTime, endTime time.Time) {
SpyName: "gospy",
Units: "goroutines",
AggregationType: "average",
Format: upstream.FormatPprof,
Profile: copyBuf(ps.goroutinesBuf.Bytes()),
SampleTypeConfig: map[string]*upstream.SampleType{
"goroutine": {
DisplayName: "goroutines",
Units: "goroutines",
Aggregation: "average",
},
},
})
ps.goroutinesBuf.Reset()
}
Expand Down Expand Up @@ -324,14 +316,12 @@ func (ps *Session) dumpHeapProfile(startTime time.Time, endTime time.Time) {
}
curMemBytes := copyBuf(ps.memBuf.Bytes())
job := &upstream.UploadJob{
Name: ps.appName,
StartTime: startTime,
EndTime: endTime,
SpyName: "gospy",
SampleRate: 100,
Format: upstream.FormatPprof,
Profile: curMemBytes,
SampleTypeConfig: sampleTypeConfigHeap,
Name: ps.appName,
StartTime: startTime,
EndTime: endTime,
SpyName: "gospy",
SampleRate: 100,
Profile: curMemBytes,
}
ps.upstream.Upload(job)
ps.lastGCGeneration = currentGCGeneration
Expand All @@ -353,13 +343,11 @@ func (ps *Session) dumpMutexProfile(startTime time.Time, endTime time.Time) {
}
curMutexBuf := copyBuf(ps.mutexBuf.Bytes())
job := &upstream.UploadJob{
Name: ps.appName,
StartTime: startTime,
EndTime: endTime,
SpyName: "gospy",
Format: upstream.FormatPprof,
Profile: curMutexBuf,
SampleTypeConfig: sampleTypeConfigMutex,
Name: ps.appName,
StartTime: startTime,
EndTime: endTime,
SpyName: "gospy",
Profile: curMutexBuf,
}
ps.upstream.Upload(job)
}
Expand All @@ -379,13 +367,11 @@ func (ps *Session) dumpBlockProfile(startTime time.Time, endTime time.Time) {
}
curBlockBuf := copyBuf(ps.blockBuf.Bytes())
job := &upstream.UploadJob{
Name: ps.appName,
StartTime: startTime,
EndTime: endTime,
SpyName: "gospy",
Format: upstream.FormatPprof,
Profile: curBlockBuf,
SampleTypeConfig: sampleTypeConfigBlock,
Name: ps.appName,
StartTime: startTime,
EndTime: endTime,
SpyName: "gospy",
Profile: curBlockBuf,
}
ps.upstream.Upload(job)
}
Expand Down
49 changes: 13 additions & 36 deletions upstream/remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package remote
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"mime/multipart"
"net/http"
"net/url"
"path"
Expand Down Expand Up @@ -42,6 +40,7 @@ type Remote struct {
wg sync.WaitGroup

flushWG *sync.WaitGroup
url *url.URL
}

type HTTPClient interface {
Expand Down Expand Up @@ -100,6 +99,7 @@ func NewRemote(cfg Config) (*Remote, error) {
if err != nil {
return nil, err
}
r.url = u

// authorize the token first
if cfg.AuthToken == "" && isOGPyroscopeCloud(u) {
Expand Down Expand Up @@ -150,35 +150,10 @@ func (r *Remote) Flush() {
}

func (r *Remote) uploadProfile(j *upstream.UploadJob) error {
u, err := url.Parse(r.cfg.Address)
if err != nil {
return fmt.Errorf("url parse: %w", err)
}

body := &bytes.Buffer{}

writer := multipart.NewWriter(body)
fw, err := writer.CreateFormFile("profile", "profile.pprof")
if err != nil {
return err
}
_, _ = fw.Write(j.Profile)
if j.SampleTypeConfig != nil {
fw, err = writer.CreateFormFile("sample_type_config", "sample_type_config.json")
if err != nil {
return err
}
b, err := json.Marshal(j.SampleTypeConfig)
if err != nil {
return err
}
_, _ = fw.Write(b)
}
if err = writer.Close(); err != nil {
return err
}
u := *r.url

q := u.Query()
q.Set("format", "pprof")
q.Set("name", j.Name)
q.Set("from", strconv.FormatInt(j.StartTime.UnixNano(), 10))
q.Set("until", strconv.FormatInt(j.EndTime.UnixNano(), 10))
Expand All @@ -191,18 +166,20 @@ func (r *Remote) uploadProfile(j *upstream.UploadJob) error {
u.RawQuery = q.Encode()

r.logger.Debugf("uploading at %s", u.String())
// new a request for the job
request, err := http.NewRequestWithContext(context.Background(), http.MethodPost, u.String(), body)

request, err := http.NewRequestWithContext(
context.Background(), // todo: timeout
http.MethodPost,
u.String(),
bytes.NewReader(j.Profile),
)
if err != nil {
return fmt.Errorf("new http request: %w", err)
}
contentType := writer.FormDataContentType()
r.logger.Debugf("content type: %s", contentType)
request.Header.Set("Content-Type", contentType)
// request.Header.Set("Content-Type", "binary/octet-stream+"+string(j.Format))
request.Header.Set("Content-Type", "binary/octet-stream+pprof")

switch {
case r.cfg.AuthToken != "" && isOGPyroscopeCloud(u):
case r.cfg.AuthToken != "" && isOGPyroscopeCloud(&u):
request.Header.Set("Authorization", "Bearer "+r.cfg.AuthToken)
case r.cfg.BasicAuthUser != "" && r.cfg.BasicAuthPassword != "":
request.SetBasicAuth(r.cfg.BasicAuthUser, r.cfg.BasicAuthPassword)
Expand Down
12 changes: 5 additions & 7 deletions upstream/remote/remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,11 @@ func TestUploadProfile(t *testing.T) {
Body: io.NopCloser(bytes.NewBufferString("OK")),
}, nil)

r := &Remote{
cfg: tt.cfg,
client: mockClient,
logger: logger,
}

err := r.uploadProfile(&upstream.UploadJob{
r, err := NewRemote(tt.cfg)
r.client = mockClient
r.logger = logger
require.NoError(t, err)
err = r.uploadProfile(&upstream.UploadJob{
Name: "test-profile",
StartTime: time.Now(),
EndTime: time.Now().Add(time.Minute),
Expand Down
16 changes: 0 additions & 16 deletions upstream/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,11 @@ import (
"time"
)

type Format string

const FormatPprof Format = "pprof"

type Upstream interface {
Upload(job *UploadJob)
Flush()
}

type SampleType struct {
Units string `json:"units,omitempty"`
Aggregation string `json:"aggregation,omitempty"`
DisplayName string `json:"display-name,omitempty"`
Sampled bool `json:"sampled,omitempty"`
Cumulative bool `json:"cumulative,omitempty"`
}

type UploadJob struct {
Name string
StartTime time.Time
Expand All @@ -29,9 +17,5 @@ type UploadJob struct {
SampleRate uint32
Units string
AggregationType string
Format Format
Profile []byte
// Deprecated
PrevProfile []byte
SampleTypeConfig map[string]*SampleType
}
Loading