66 "fmt"
77 "net/url"
88 "os"
9- "sync"
109 "time"
1110
1211 "cdr.dev/slog"
@@ -28,14 +27,13 @@ var (
2827 minAgentAPIV2 = "v2.9"
2928)
3029
31- // Coder establishes a connection to the Coder instance located at coderURL and
32- // authenticates using token. It then establishes a dRPC connection to the Agent
33- // API and begins sending logs. If the version of Coder does not support the
34- // Agent API, it will fall back to using the PatchLogs endpoint. The closer is
35- // used to close the logger and to wait at most logSendGracePeriod for logs to
36- // be sent. Cancelling the context will close the logs immediately without
37- // waiting for logs to be sent.
38- func Coder (ctx context.Context , coderURL * url.URL , token string ) (logger Func , closer func (), err error ) {
30+ // Coder establishes a connection to the Coder instance located at
31+ // coderURL and authenticates using token. It then establishes a
32+ // dRPC connection to the Agent API and begins sending logs.
33+ // If the version of Coder does not support the Agent API, it will
34+ // fall back to using the PatchLogs endpoint.
35+ // The returned function is used to block until all logs are sent.
36+ func Coder (ctx context.Context , coderURL * url.URL , token string ) (Func , func (), error ) {
3937 // To troubleshoot issues, we need some way of logging.
4038 metaLogger := slog .Make (sloghuman .Sink (os .Stderr ))
4139 defer metaLogger .Sync ()
@@ -46,26 +44,18 @@ func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, c
4644 }
4745 if semver .Compare (semver .MajorMinor (bi .Version ), minAgentAPIV2 ) < 0 {
4846 metaLogger .Warn (ctx , "Detected Coder version incompatible with AgentAPI v2, falling back to deprecated API" , slog .F ("coder_version" , bi .Version ))
49- logger , closer = sendLogsV1 (ctx , client , metaLogger .Named ("send_logs_v1" ))
50- return logger , closer , nil
47+ sendLogs , flushLogs : = sendLogsV1 (ctx , client , metaLogger .Named ("send_logs_v1" ))
48+ return sendLogs , flushLogs , nil
5149 }
52- // Note that ctx passed to initRPC will be inherited by the
53- // underlying connection, nothing we can do about that here.
5450 dac , err := initRPC (ctx , client , metaLogger .Named ("init_rpc" ))
5551 if err != nil {
5652 // Logged externally
5753 return nil , nil , fmt .Errorf ("init coder rpc client: %w" , err )
5854 }
5955 ls := agentsdk .NewLogSender (metaLogger .Named ("coder_log_sender" ))
6056 metaLogger .Warn (ctx , "Sending logs via AgentAPI v2" , slog .F ("coder_version" , bi .Version ))
61- logger , closer = sendLogsV2 (ctx , dac , ls , metaLogger .Named ("send_logs_v2" ))
62- var closeOnce sync.Once
63- return logger , func () {
64- closer ()
65- closeOnce .Do (func () {
66- _ = dac .DRPCConn ().Close ()
67- })
68- }, nil
57+ sendLogs , doneFunc := sendLogsV2 (ctx , dac , ls , metaLogger .Named ("send_logs_v2" ))
58+ return sendLogs , doneFunc , nil
6959}
7060
7161type coderLogSender interface {
@@ -84,7 +74,7 @@ func initClient(coderURL *url.URL, token string) *agentsdk.Client {
8474func initRPC (ctx context.Context , client * agentsdk.Client , l slog.Logger ) (proto.DRPCAgentClient20 , error ) {
8575 var c proto.DRPCAgentClient20
8676 var err error
87- retryCtx , retryCancel := context .WithTimeout (ctx , rpcConnectTimeout )
77+ retryCtx , retryCancel := context .WithTimeout (context . Background () , rpcConnectTimeout )
8878 defer retryCancel ()
8979 attempts := 0
9080 for r := retry .New (100 * time .Millisecond , time .Second ); r .Wait (retryCtx ); {
@@ -105,67 +95,65 @@ func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto
10595
10696// sendLogsV1 uses the PatchLogs endpoint to send logs.
10797// This is deprecated, but required for backward compatibility with older versions of Coder.
108- func sendLogsV1 (ctx context.Context , client * agentsdk.Client , l slog.Logger ) (logger Func , closer func ()) {
98+ func sendLogsV1 (ctx context.Context , client * agentsdk.Client , l slog.Logger ) (Func , func ()) {
10999 // nolint: staticcheck // required for backwards compatibility
110- sendLog , flushAndClose := agentsdk .LogsSender (agentsdk .ExternalLogSourceID , client .PatchLogs , slog.Logger {})
111- var mu sync.Mutex
100+ sendLogs , flushLogs := agentsdk .LogsSender (agentsdk .ExternalLogSourceID , client .PatchLogs , slog.Logger {})
112101 return func (lvl Level , msg string , args ... any ) {
113102 log := agentsdk.Log {
114103 CreatedAt : time .Now (),
115104 Output : fmt .Sprintf (msg , args ... ),
116105 Level : codersdk .LogLevel (lvl ),
117106 }
118- mu .Lock ()
119- defer mu .Unlock ()
120- if err := sendLog (ctx , log ); err != nil {
107+ if err := sendLogs (ctx , log ); err != nil {
121108 l .Warn (ctx , "failed to send logs to Coder" , slog .Error (err ))
122109 }
123110 }, func () {
124- ctx , cancel := context .WithTimeout (ctx , logSendGracePeriod )
125- defer cancel ()
126- if err := flushAndClose (ctx ); err != nil {
111+ if err := flushLogs (ctx ); err != nil {
127112 l .Warn (ctx , "failed to flush logs" , slog .Error (err ))
128113 }
129114 }
130115}
131116
132117// sendLogsV2 uses the v2 agent API to send logs. Only compatibile with coder versions >= 2.9.
133- func sendLogsV2 (ctx context.Context , dest agentsdk.LogDest , ls coderLogSender , l slog.Logger ) (logger Func , closer func ()) {
134- sendCtx , sendCancel := context .WithCancel (ctx )
118+ func sendLogsV2 (ctx context.Context , dest agentsdk.LogDest , ls coderLogSender , l slog.Logger ) (Func , func ()) {
135119 done := make (chan struct {})
136120 uid := uuid .New ()
137121 go func () {
138122 defer close (done )
139- if err := ls .SendLoop (sendCtx , dest ); err != nil {
123+ if err := ls .SendLoop (ctx , dest ); err != nil {
140124 if ! errors .Is (err , context .Canceled ) {
141125 l .Warn (ctx , "failed to send logs to Coder" , slog .Error (err ))
142126 }
143127 }
128+
129+ // Wait for up to 10 seconds for logs to finish sending.
130+ sendCtx , sendCancel := context .WithTimeout (context .Background (), logSendGracePeriod )
131+ defer sendCancel ()
132+ // Try once more to send any pending logs
133+ if err := ls .SendLoop (sendCtx , dest ); err != nil {
134+ if ! errors .Is (err , context .DeadlineExceeded ) {
135+ l .Warn (ctx , "failed to send remaining logs to Coder" , slog .Error (err ))
136+ }
137+ }
138+ ls .Flush (uid )
139+ if err := ls .WaitUntilEmpty (sendCtx ); err != nil {
140+ if ! errors .Is (err , context .DeadlineExceeded ) {
141+ l .Warn (ctx , "log sender did not empty" , slog .Error (err ))
142+ }
143+ }
144144 }()
145145
146- var closeOnce sync.Once
147- return func (l Level , msg string , args ... any ) {
148- ls .Enqueue (uid , agentsdk.Log {
149- CreatedAt : time .Now (),
150- Output : fmt .Sprintf (msg , args ... ),
151- Level : codersdk .LogLevel (l ),
152- })
153- }, func () {
154- closeOnce .Do (func () {
155- // Trigger a flush and wait for logs to be sent.
156- ls .Flush (uid )
157- ctx , cancel := context .WithTimeout (ctx , logSendGracePeriod )
158- defer cancel ()
159- err := ls .WaitUntilEmpty (ctx )
160- if err != nil {
161- l .Warn (ctx , "log sender did not empty" , slog .Error (err ))
162- }
146+ logFunc := func (l Level , msg string , args ... any ) {
147+ ls .Enqueue (uid , agentsdk.Log {
148+ CreatedAt : time .Now (),
149+ Output : fmt .Sprintf (msg , args ... ),
150+ Level : codersdk .LogLevel (l ),
151+ })
152+ }
163153
164- // Stop the send loop.
165- sendCancel ()
166- })
154+ doneFunc := func () {
155+ <- done
156+ }
167157
168- // Wait for the send loop to finish.
169- <- done
170- }
158+ return logFunc , doneFunc
171159}
0 commit comments