Skip to content

Commit 92e097a

Browse files
authored
✨ feat: add storage (#225)
1 parent 2839a98 commit 92e097a

29 files changed

+3450
-602
lines changed

core/logger/writer/loki.go

Lines changed: 126 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -1,128 +1,128 @@
11
package writer
22

3-
import (
4-
"bytes"
5-
"errors"
6-
"io"
7-
"log/slog"
8-
"net/http"
9-
"os"
10-
"os/signal"
11-
"syscall"
12-
"time"
13-
14-
"github.com/gogo/protobuf/proto"
15-
"github.com/golang/snappy"
16-
"github.com/grafana/loki/v3/pkg/logproto"
17-
"github.com/prometheus/common/model"
18-
)
19-
20-
/*
21-
* @Author: lwnmengjing<lwnmengjing@qq.com>
22-
* @Date: 2024/5/7 18:37:00
23-
* @Last Modified by: lwnmengjing<lwnmengjing@qq.com>
24-
* @Last Modified time: 2024/5/7 18:37:00
25-
*/
26-
27-
type LokiWriter struct {
28-
opts Options
29-
entries chan logproto.Entry
30-
}
31-
32-
func NewLokiWriter(opts ...Option) (*LokiWriter, error) {
33-
options := setDefault()
34-
for _, o := range opts {
35-
o(&options)
36-
}
37-
p := &LokiWriter{
38-
opts: options,
39-
entries: make(chan logproto.Entry, options.bufferSize),
40-
}
41-
go p.write()
42-
return p, nil
43-
}
44-
45-
func (p *LokiWriter) Write(data []byte) (n int, err error) {
46-
if p.entries == nil {
47-
p.entries = make(chan logproto.Entry, p.opts.bufferSize)
48-
}
49-
n = len(data)
50-
go func() {
51-
p.entries <- logproto.Entry{
52-
Line: string(data),
53-
Timestamp: time.Now(),
54-
}
55-
}()
56-
return n, nil
57-
}
58-
59-
func (p *LokiWriter) write() {
60-
entries := make([]logproto.Entry, 0)
61-
62-
done := make(chan os.Signal, 1)
63-
signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
64-
for {
65-
select {
66-
case <-done:
67-
err := p.send(entries)
68-
if err != nil {
69-
slog.Error("application exit, send to loki failed", slog.String("error", err.Error()))
70-
err = nil
71-
}
72-
return
73-
case <-time.After(p.opts.lokiInterval):
74-
// send to loki
75-
if len(entries) > 0 {
76-
err := p.send(entries)
77-
if err != nil {
78-
slog.Error("send to loki failed", slog.String("error", err.Error()))
79-
err = nil
80-
}
81-
entries = make([]logproto.Entry, 0)
82-
}
83-
case d := <-p.entries:
84-
entries = append(entries, d)
85-
}
86-
}
87-
}
88-
89-
func (p *LokiWriter) send(entries []logproto.Entry) error {
90-
if len(entries) == 0 {
91-
return nil
92-
}
93-
// send to loki
94-
labels := make(model.LabelSet)
95-
for k, v := range p.opts.lokiLabels {
96-
labels[model.LabelName(k)] = model.LabelValue(v)
97-
}
98-
req := &logproto.PushRequest{
99-
Streams: []logproto.Stream{
100-
{
101-
Labels: labels.String(),
102-
Entries: entries,
103-
},
104-
},
105-
}
106-
payload, err := proto.Marshal(req)
107-
if err != nil {
108-
return err
109-
}
110-
payload = snappy.Encode(nil, payload)
111-
// 发送POST请求到Loki
112-
resp, err := http.Post(p.opts.lokiURL,
113-
"application/x-protobuf",
114-
bytes.NewBuffer(payload))
115-
if err != nil {
116-
return err
117-
}
118-
defer resp.Body.Close()
119-
if resp.StatusCode > 399 {
120-
var body []byte
121-
body, err = io.ReadAll(resp.Body)
122-
if err != nil {
123-
return err
124-
}
125-
return errors.New(string(body))
126-
}
127-
return nil
128-
}
3+
//import (
4+
// "bytes"
5+
// "errors"
6+
// "io"
7+
// "log/slog"
8+
// "net/http"
9+
// "os"
10+
// "os/signal"
11+
// "syscall"
12+
// "time"
13+
//
14+
// "github.com/gogo/protobuf/proto"
15+
// "github.com/golang/snappy"
16+
// "github.com/grafana/loki/v3/pkg/logproto"
17+
// "github.com/prometheus/common/model"
18+
//)
19+
//
20+
///*
21+
// * @Author: lwnmengjing<lwnmengjing@qq.com>
22+
// * @Date: 2024/5/7 18:37:00
23+
// * @Last Modified by: lwnmengjing<lwnmengjing@qq.com>
24+
// * @Last Modified time: 2024/5/7 18:37:00
25+
// */
26+
//
27+
//type LokiWriter struct {
28+
// opts Options
29+
// entries chan logproto.Entry
30+
//}
31+
//
32+
//func NewLokiWriter(opts ...Option) (*LokiWriter, error) {
33+
// options := setDefault()
34+
// for _, o := range opts {
35+
// o(&options)
36+
// }
37+
// p := &LokiWriter{
38+
// opts: options,
39+
// entries: make(chan logproto.Entry, options.bufferSize),
40+
// }
41+
// go p.write()
42+
// return p, nil
43+
//}
44+
//
45+
//func (p *LokiWriter) Write(data []byte) (n int, err error) {
46+
// if p.entries == nil {
47+
// p.entries = make(chan logproto.Entry, p.opts.bufferSize)
48+
// }
49+
// n = len(data)
50+
// go func() {
51+
// p.entries <- logproto.Entry{
52+
// Line: string(data),
53+
// Timestamp: time.Now(),
54+
// }
55+
// }()
56+
// return n, nil
57+
//}
58+
//
59+
//func (p *LokiWriter) write() {
60+
// entries := make([]logproto.Entry, 0)
61+
//
62+
// done := make(chan os.Signal, 1)
63+
// signal.Notify(done, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
64+
// for {
65+
// select {
66+
// case <-done:
67+
// err := p.send(entries)
68+
// if err != nil {
69+
// slog.Error("application exit, send to loki failed", slog.String("error", err.Error()))
70+
// err = nil
71+
// }
72+
// return
73+
// case <-time.After(p.opts.lokiInterval):
74+
// // send to loki
75+
// if len(entries) > 0 {
76+
// err := p.send(entries)
77+
// if err != nil {
78+
// slog.Error("send to loki failed", slog.String("error", err.Error()))
79+
// err = nil
80+
// }
81+
// entries = make([]logproto.Entry, 0)
82+
// }
83+
// case d := <-p.entries:
84+
// entries = append(entries, d)
85+
// }
86+
// }
87+
//}
88+
//
89+
//func (p *LokiWriter) send(entries []logproto.Entry) error {
90+
// if len(entries) == 0 {
91+
// return nil
92+
// }
93+
// // send to loki
94+
// labels := make(model.LabelSet)
95+
// for k, v := range p.opts.lokiLabels {
96+
// labels[model.LabelName(k)] = model.LabelValue(v)
97+
// }
98+
// req := &logproto.PushRequest{
99+
// Streams: []logproto.Stream{
100+
// {
101+
// Labels: labels.String(),
102+
// Entries: entries,
103+
// },
104+
// },
105+
// }
106+
// payload, err := proto.Marshal(req)
107+
// if err != nil {
108+
// return err
109+
// }
110+
// payload = snappy.Encode(nil, payload)
111+
// // 发送POST请求到Loki
112+
// resp, err := http.Post(p.opts.lokiURL,
113+
// "application/x-protobuf",
114+
// bytes.NewBuffer(payload))
115+
// if err != nil {
116+
// return err
117+
// }
118+
// defer resp.Body.Close()
119+
// if resp.StatusCode > 399 {
120+
// var body []byte
121+
// body, err = io.ReadAll(resp.Body)
122+
// if err != nil {
123+
// return err
124+
// }
125+
// return errors.New(string(body))
126+
// }
127+
// return nil
128+
//}

0 commit comments

Comments
 (0)