Skip to content

Commit 2d2007e

Browse files
committed
wip
1 parent dfee5bf commit 2d2007e

File tree

19 files changed

+2242
-2
lines changed

19 files changed

+2242
-2
lines changed

src/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ require (
3333
code.cloudfoundry.org/go-log-cache/v3 v3.0.3
3434
code.cloudfoundry.org/go-loggregator/v10 v10.0.1
3535
github.com/go-chi/chi/v5 v5.1.0
36+
github.com/google/go-cmp v0.6.0
3637
github.com/shirou/gopsutil/v4 v4.24.8
3738
)
3839

@@ -52,7 +53,6 @@ require (
5253
github.com/go-ole/go-ole v1.3.0 // indirect
5354
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
5455
github.com/golang/snappy v0.0.4 // indirect
55-
github.com/google/go-cmp v0.6.0 // indirect
5656
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect
5757
github.com/klauspost/compress v1.17.9 // indirect
5858
github.com/lufia/plan9stats v0.0.0-20240909124753-873cd0166683 // indirect

src/integration/doc.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
// Package integration contains integration tests for Log Cache components.
2+
package integration

src/integration/gateway_test.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package integration_test
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"os/exec"
9+
"time"
10+
11+
logcache "code.cloudfoundry.org/go-log-cache/v3/rpc/logcache_v1"
12+
"github.com/google/go-cmp/cmp/cmpopts"
13+
. "github.com/onsi/ginkgo/v2"
14+
. "github.com/onsi/gomega"
15+
"github.com/onsi/gomega/gexec"
16+
17+
"code.cloudfoundry.org/log-cache/integration/integrationfakes"
18+
)
19+
20+
var _ = Describe("Gateway", func() {
21+
var (
22+
fakeLogCache *integrationfakes.FakeLogCache
23+
24+
gatewayPort int
25+
gateway *gexec.Session
26+
)
27+
28+
BeforeEach(func() {
29+
port := 8000 + GinkgoParallelProcess()
30+
fakeLogCache = integrationfakes.NewFakeLogCache(port, nil)
31+
fakeLogCache.Start()
32+
33+
gatewayPort = 8080 + GinkgoParallelProcess()
34+
})
35+
36+
JustBeforeEach(func() {
37+
command := exec.Command(componentPaths.Gateway)
38+
envVars := map[string]string{
39+
"ADDR": fmt.Sprintf(":%d", gatewayPort),
40+
"LOG_CACHE_ADDR": fakeLogCache.Address(),
41+
"METRICS_PORT": "0",
42+
}
43+
for k, v := range envVars {
44+
command.Env = append(command.Env, fmt.Sprintf("%s=%s", k, v))
45+
}
46+
47+
var err error
48+
gateway, err = gexec.Start(command, GinkgoWriter, GinkgoWriter)
49+
Expect(err).ShouldNot(HaveOccurred())
50+
})
51+
52+
JustAfterEach(func() {
53+
gateway.Interrupt().Wait(2 * time.Second)
54+
})
55+
56+
AfterEach(func() {
57+
fakeLogCache.Stop()
58+
})
59+
60+
Context("/api/v1/info endpoint", func() {
61+
var resp *http.Response
62+
63+
JustBeforeEach(func() {
64+
u := fmt.Sprintf("http://localhost:%d/api/v1/info", gatewayPort)
65+
Eventually(func() error {
66+
var err error
67+
resp, err = http.Get(u)
68+
return err
69+
}, "5s").ShouldNot(HaveOccurred())
70+
})
71+
72+
AfterEach(func() {
73+
resp.Body.Close()
74+
})
75+
76+
It("returns 200", func() {
77+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
78+
})
79+
80+
It("sets Content-Type header", func() {
81+
Expect(resp.Header.Get("Content-Type")).To(Equal("application/json"))
82+
})
83+
84+
It("sets Content-Length header", func() {
85+
Expect(resp.Header.Get("Content-Length")).To(MatchRegexp("\\d+"))
86+
})
87+
88+
Context("response body", func() {
89+
var body []byte
90+
91+
JustBeforeEach(func() {
92+
var err error
93+
body, err = io.ReadAll(resp.Body)
94+
Expect(err).ToNot(HaveOccurred())
95+
})
96+
97+
It("is a JSON with version and uptime information", func() {
98+
result := struct {
99+
Version string `json:"version"`
100+
VMUptime string `json:"vm_uptime"`
101+
}{}
102+
err := json.Unmarshal(body, &result)
103+
Expect(err).ToNot(HaveOccurred())
104+
Expect(result.Version).To(Equal("1.2.3"))
105+
Expect(result.VMUptime).To(MatchRegexp("\\d+"))
106+
})
107+
108+
It("has a newline at the end", func() {
109+
Expect(string(body)).To(MatchRegexp(".*\\n$"))
110+
})
111+
})
112+
})
113+
114+
Context("api/v1/read/:sourceID endpoint", func() {
115+
DescribeTableSubtree("with valid source IDs",
116+
func(sourceID string) {
117+
var resp *http.Response
118+
119+
JustBeforeEach(func() {
120+
u := fmt.Sprintf("http://localhost:%d/api/v1/read/%s", gatewayPort, sourceID)
121+
Eventually(func() error {
122+
var err error
123+
resp, err = http.Get(u)
124+
return err
125+
}, "5s").ShouldNot(HaveOccurred())
126+
})
127+
128+
AfterEach(func() {
129+
resp.Body.Close()
130+
})
131+
132+
It("returns 200", func() {
133+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
134+
})
135+
136+
It("sets Content-Type header", func() {
137+
Expect(resp.Header.Get("Content-Type")).To(Equal("application/json"))
138+
})
139+
140+
It("sets Content-Length header", func() {
141+
Expect(resp.Header.Get("Content-Length")).To(MatchRegexp("\\d+"))
142+
})
143+
144+
It("forwards the request to Log Cache", func() {
145+
reqs := fakeLogCache.ReadRequests()
146+
Expect(len(reqs)).To(Equal(1))
147+
Expect(reqs[0]).To(BeComparableTo(&logcache.ReadRequest{
148+
SourceId: sourceID,
149+
}, cmpopts.IgnoreUnexported(logcache.ReadRequest{})))
150+
})
151+
152+
Context("response body", func() {
153+
var body []byte
154+
155+
JustBeforeEach(func() {
156+
var err error
157+
body, err = io.ReadAll(resp.Body)
158+
Expect(err).ToNot(HaveOccurred())
159+
})
160+
161+
PIt("is a JSON with envelopes", func() {
162+
var rr logcache.ReadResponse
163+
err := json.Unmarshal(body, &rr)
164+
Expect(err).ToNot(HaveOccurred())
165+
Expect(rr.Envelopes).To(HaveLen(0))
166+
})
167+
168+
It("has a newline at the end", func() {
169+
Expect(string(body)).To(MatchRegexp(".*\\n$"))
170+
})
171+
})
172+
},
173+
Entry("regular", "myid"),
174+
Entry("URL encoded", "my%2Fid"),
175+
Entry("with slash", "my/id"),
176+
Entry("with dash", "my-id"),
177+
)
178+
})
179+
})
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package integration_test
2+
3+
import (
4+
"encoding/json"
5+
"testing"
6+
7+
. "github.com/onsi/ginkgo/v2"
8+
. "github.com/onsi/gomega"
9+
"github.com/onsi/gomega/gexec"
10+
)
11+
12+
type ComponentPaths struct {
13+
Gateway string `json:"gateway_path"`
14+
CFAuthProxy string `json:"cf_auth_proxy_path"`
15+
LogCache string `json:"log_cache_path"`
16+
SyslogServer string `json:"syslog_server_path"`
17+
}
18+
19+
func NewComponentPaths() ComponentPaths {
20+
cps := ComponentPaths{}
21+
22+
path, err := gexec.Build("code.cloudfoundry.org/log-cache/cmd/gateway", "-ldflags", "-X main.buildVersion=1.2.3")
23+
Expect(err).NotTo(HaveOccurred())
24+
cps.Gateway = path
25+
26+
path, err = gexec.Build("code.cloudfoundry.org/log-cache/cmd/cf-auth-proxy")
27+
Expect(err).NotTo(HaveOccurred())
28+
cps.CFAuthProxy = path
29+
30+
path, err = gexec.Build("code.cloudfoundry.org/log-cache/cmd/log-cache")
31+
Expect(err).NotTo(HaveOccurred())
32+
cps.LogCache = path
33+
34+
path, err = gexec.Build("code.cloudfoundry.org/log-cache/cmd/syslog-server")
35+
Expect(err).NotTo(HaveOccurred())
36+
cps.SyslogServer = path
37+
38+
return cps
39+
}
40+
41+
func (cps *ComponentPaths) Marshal() []byte {
42+
data, err := json.Marshal(cps)
43+
Expect(err).NotTo(HaveOccurred())
44+
return data
45+
}
46+
47+
func TestIntegration(t *testing.T) {
48+
RegisterFailHandler(Fail)
49+
RunSpecs(t, "Integration Suite")
50+
}
51+
52+
var componentPaths ComponentPaths
53+
54+
var _ = SynchronizedBeforeSuite(func() []byte {
55+
cps := NewComponentPaths()
56+
return cps.Marshal()
57+
}, func(data []byte) {
58+
Expect(json.Unmarshal(data, &componentPaths)).To(Succeed())
59+
})
60+
61+
var _ = SynchronizedAfterSuite(func() {}, func() {
62+
gexec.CleanupBuildArtifacts()
63+
})
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package integrationfakes
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"fmt"
7+
"net"
8+
"sync"
9+
10+
logcache "code.cloudfoundry.org/go-log-cache/v3/rpc/logcache_v1"
11+
"google.golang.org/grpc"
12+
"google.golang.org/grpc/credentials"
13+
)
14+
15+
// FakeLogCache is a fake implementation of log-cache.
16+
type FakeLogCache struct {
17+
port int // Port to listen on.
18+
addr string // Address of the net listener.
19+
c *tls.Config // TLS config to apply to gRPC; no TLS if nil.
20+
s *grpc.Server // gRPC server responding to Log Cache gRPC requests.
21+
serveCh chan error // Channel to catch errors when the serve errors from the gRPC server.
22+
23+
readMu sync.Mutex // Mutex to prevent race conditions with FakeLogCache.Read().
24+
readRequests []*logcache.ReadRequest // Slice of requests made to FakeLogCache.Read().
25+
readResponse *logcache.ReadResponse
26+
readErr error
27+
28+
logcache.UnimplementedEgressServer
29+
logcache.UnimplementedIngressServer
30+
logcache.UnimplementedPromQLQuerierServer
31+
}
32+
33+
// NewFakeLogCache creates a new instance of FakeLogCache with the specified
34+
// port and TLS configuration.
35+
func NewFakeLogCache(port int, c *tls.Config) *FakeLogCache {
36+
return &FakeLogCache{
37+
port: port,
38+
c: c,
39+
serveCh: make(chan error),
40+
}
41+
}
42+
43+
// Start attempts to claim a net listener on FakeLogCache's port and
44+
// start a log-cache gRPC server in a separate goroutine. The server uses
45+
// FakeLogCache's TLS config if it was provided. This is a non-blocking
46+
// operation and returns an error if it fails.
47+
//
48+
// If FakeLogCache is started, it must be stopped with Stop().
49+
func (f *FakeLogCache) Start() error {
50+
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", f.port))
51+
if err != nil {
52+
return err
53+
}
54+
f.addr = lis.Addr().String()
55+
56+
var opts []grpc.ServerOption
57+
if f.c != nil {
58+
opts = append(opts, grpc.Creds(credentials.NewTLS(f.c)))
59+
}
60+
f.s = grpc.NewServer(opts...)
61+
62+
logcache.RegisterEgressServer(f.s, f)
63+
logcache.RegisterIngressServer(f.s, f)
64+
logcache.RegisterPromQLQuerierServer(f.s, f)
65+
66+
go func() {
67+
f.serveCh <- f.s.Serve(lis)
68+
}()
69+
70+
return nil
71+
}
72+
73+
// Address returns the address of the FakeLogCache.
74+
func (f *FakeLogCache) Address() string {
75+
return f.addr
76+
}
77+
78+
// Read accepts incoming gRPC requests to read from Log Cache, captures the
79+
// requests and returns a fake response.
80+
func (f *FakeLogCache) Read(ctx context.Context, req *logcache.ReadRequest) (*logcache.ReadResponse, error) {
81+
fmt.Printf("Read: %#v\n", req)
82+
f.readMu.Lock()
83+
defer f.readMu.Unlock()
84+
f.readRequests = append(f.readRequests, req)
85+
return f.readResponse, f.readErr
86+
}
87+
88+
func (f *FakeLogCache) ReadRequests() []*logcache.ReadRequest {
89+
f.readMu.Lock()
90+
defer f.readMu.Unlock()
91+
return f.readRequests
92+
}
93+
94+
// Stop tells the FakeLogCache server to stop and waits for it to shutdown.
95+
func (f *FakeLogCache) Stop() error {
96+
f.s.Stop()
97+
return <-f.serveCh
98+
}

src/internal/gateway/gateway.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"log"
77
"net"
88
"net/http"
9+
"strconv"
910
"time"
1011

1112
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
@@ -179,7 +180,11 @@ func (g *Gateway) listenAndServe() {
179180
}
180181

181182
func (g *Gateway) handleInfoEndpoint(w http.ResponseWriter, r *http.Request) {
182-
_, err := w.Write([]byte(fmt.Sprintf(`{"version":"%s","vm_uptime":"%d"}`+"\n", g.logCacheVersion, g.uptimeFn())))
183+
b := []byte(fmt.Sprintf(`{"version":"%s","vm_uptime":"%d"}`+"\n", g.logCacheVersion, g.uptimeFn()))
184+
185+
w.Header().Set("Content-Type", "application/json")
186+
w.Header().Set("Content-Length", strconv.Itoa(len(b)))
187+
_, err := w.Write(b)
183188
if err != nil {
184189
g.log.Println("Cannot send result for the info endpoint")
185190
}

0 commit comments

Comments
 (0)