Skip to content

Commit 6797a3f

Browse files
committed
wip
1 parent b6a630b commit 6797a3f

File tree

13 files changed

+1451
-1
lines changed

13 files changed

+1451
-1
lines changed

src/integration/doc.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
// Package integration contains integration tests for Log Cache components.
2+
package integration

src/integration/gateway_test.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package integration_test
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io"
7+
"net/http"
8+
"os/exec"
9+
"time"
10+
11+
logcache "code.cloudfoundry.org/go-log-cache/v2/rpc/logcache_v1"
12+
"code.cloudfoundry.org/log-cache/integration/integrationfakes"
13+
. "github.com/onsi/ginkgo/v2"
14+
. "github.com/onsi/gomega"
15+
"github.com/onsi/gomega/gexec"
16+
)
17+
18+
var _ = Describe("Gateway", func() {
19+
var (
20+
fakeLogCache *integrationfakes.FakeLogCache
21+
22+
gatewayPort int
23+
gateway *gexec.Session
24+
)
25+
26+
BeforeEach(func() {
27+
port := 8000 + GinkgoParallelProcess()
28+
fakeLogCache = integrationfakes.NewFakeLogCache(port, nil)
29+
30+
gatewayPort = 8081 + GinkgoParallelProcess()
31+
})
32+
33+
JustBeforeEach(func() {
34+
fakeLogCache.Start()
35+
36+
command := exec.Command(componentPaths.Gateway)
37+
envVars := map[string]string{
38+
"ADDR": fmt.Sprintf(":%d", gatewayPort),
39+
"LOG_CACHE_ADDR": fakeLogCache.Address(),
40+
"METRICS_PORT": "0",
41+
}
42+
for k, v := range envVars {
43+
command.Env = append(command.Env, fmt.Sprintf("%s=%s", k, v))
44+
}
45+
46+
var err error
47+
gateway, err = gexec.Start(command, GinkgoWriter, GinkgoWriter)
48+
Expect(err).ShouldNot(HaveOccurred())
49+
})
50+
51+
JustAfterEach(func() {
52+
gateway.Interrupt().Wait(2 * time.Second)
53+
fakeLogCache.Stop()
54+
})
55+
56+
Context("/api/v1/info endpoint", func() {
57+
var resp *http.Response
58+
59+
JustBeforeEach(func() {
60+
u := fmt.Sprintf("http://localhost:%d/api/v1/info", gatewayPort)
61+
Eventually(func() error {
62+
var err error
63+
resp, err = http.Get(u)
64+
return err
65+
}, "5s").ShouldNot(HaveOccurred())
66+
})
67+
68+
AfterEach(func() {
69+
resp.Body.Close()
70+
})
71+
72+
It("returns 200", func() {
73+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
74+
})
75+
76+
It("sets Content-Type header", func() {
77+
Expect(resp.Header.Get("Content-Type")).To(Equal("application/json"))
78+
})
79+
80+
It("sets Content-Length header", func() {
81+
Expect(resp.Header.Get("Content-Length")).To(MatchRegexp("\\d+"))
82+
})
83+
84+
Context("response body", func() {
85+
var body []byte
86+
87+
JustBeforeEach(func() {
88+
var err error
89+
body, err = io.ReadAll(resp.Body)
90+
Expect(err).ToNot(HaveOccurred())
91+
})
92+
93+
It("is a JSON with version and uptime information", func() {
94+
result := struct {
95+
Version string `json:"version"`
96+
VMUptime string `json:"vm_uptime"`
97+
}{}
98+
err := json.Unmarshal(body, &result)
99+
Expect(err).ToNot(HaveOccurred())
100+
Expect(result.Version).To(Equal("1.2.3"))
101+
Expect(result.VMUptime).To(MatchRegexp("\\d+"))
102+
})
103+
104+
It("has a newline at the end", func() {
105+
Expect(string(body)).To(MatchRegexp(".*\\n$"))
106+
})
107+
})
108+
})
109+
110+
Context("api/v1/read/:sourceID endpoint", func() {
111+
DescribeTableSubtree("with valid source IDs", func(sourceID string) {
112+
var resp *http.Response
113+
114+
JustBeforeEach(func() {
115+
u := fmt.Sprintf("http://localhost:%d/api/v1/read/%s", gatewayPort, sourceID)
116+
Eventually(func() error {
117+
var err error
118+
resp, err = http.Get(u)
119+
return err
120+
}, "5s").ShouldNot(HaveOccurred())
121+
})
122+
123+
AfterEach(func() {
124+
resp.Body.Close()
125+
})
126+
127+
It("returns 200", func() {
128+
Expect(resp.StatusCode).To(Equal(http.StatusOK))
129+
})
130+
131+
It("sets Content-Type header", func() {
132+
Expect(resp.Header.Get("Content-Type")).To(Equal("application/json"))
133+
})
134+
135+
It("sets Content-Length header", func() {
136+
Expect(resp.Header.Get("Content-Length")).To(MatchRegexp("\\d+"))
137+
})
138+
139+
PIt("forwards the request to Log Cache", func() {
140+
// Expect(fakeLogCache.requestCount).To(Equal(1))
141+
})
142+
143+
Context("response body", func() {
144+
var body []byte
145+
146+
JustBeforeEach(func() {
147+
var err error
148+
body, err = io.ReadAll(resp.Body)
149+
Expect(err).ToNot(HaveOccurred())
150+
})
151+
152+
PIt("is a JSON with envelopes", func() {
153+
var rr logcache.ReadResponse
154+
err := json.Unmarshal(body, &rr)
155+
Expect(err).ToNot(HaveOccurred())
156+
Expect(rr.Envelopes).To(HaveLen(0))
157+
})
158+
159+
It("has a newline at the end", func() {
160+
Expect(string(body)).To(MatchRegexp(".*\\n$"))
161+
})
162+
})
163+
},
164+
Entry("regular", "myid"),
165+
Entry("URL encoded", "my%2Fid"),
166+
Entry("with slash", "my/id"),
167+
Entry("with dash", "my-id"),
168+
)
169+
})
170+
})
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package integration_test
2+
3+
import (
4+
"encoding/json"
5+
"testing"
6+
7+
. "github.com/onsi/ginkgo/v2"
8+
. "github.com/onsi/gomega"
9+
"github.com/onsi/gomega/gexec"
10+
)
11+
12+
type ComponentPaths struct {
13+
Gateway string `json:"gateway_path"`
14+
CFAuthProxy string `json:"cf_auth_proxy_path"`
15+
LogCache string `json:"log_cache_path"`
16+
SyslogServer string `json:"syslog_server_path"`
17+
}
18+
19+
func NewComponentPaths() ComponentPaths {
20+
cps := ComponentPaths{}
21+
22+
path, err := gexec.Build("code.cloudfoundry.org/log-cache/cmd/gateway", "-ldflags", "-X main.buildVersion=1.2.3")
23+
Expect(err).NotTo(HaveOccurred())
24+
cps.Gateway = path
25+
26+
path, err = gexec.Build("code.cloudfoundry.org/log-cache/cmd/cf-auth-proxy")
27+
Expect(err).NotTo(HaveOccurred())
28+
cps.CFAuthProxy = path
29+
30+
path, err = gexec.Build("code.cloudfoundry.org/log-cache/cmd/log-cache")
31+
Expect(err).NotTo(HaveOccurred())
32+
cps.LogCache = path
33+
34+
path, err = gexec.Build("code.cloudfoundry.org/log-cache/cmd/syslog-server")
35+
Expect(err).NotTo(HaveOccurred())
36+
cps.SyslogServer = path
37+
38+
return cps
39+
}
40+
41+
func (cps *ComponentPaths) Marshal() []byte {
42+
data, err := json.Marshal(cps)
43+
Expect(err).NotTo(HaveOccurred())
44+
return data
45+
}
46+
47+
func TestIntegration(t *testing.T) {
48+
RegisterFailHandler(Fail)
49+
RunSpecs(t, "Integration Suite")
50+
}
51+
52+
var componentPaths ComponentPaths
53+
54+
var _ = SynchronizedBeforeSuite(func() []byte {
55+
cps := NewComponentPaths()
56+
return cps.Marshal()
57+
}, func(data []byte) {
58+
Expect(json.Unmarshal(data, &componentPaths)).To(Succeed())
59+
})
60+
61+
var _ = SynchronizedAfterSuite(func() {}, func() {
62+
gexec.CleanupBuildArtifacts()
63+
})
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package integrationfakes
2+
3+
import (
4+
"context"
5+
"crypto/tls"
6+
"fmt"
7+
"net"
8+
9+
logcache "code.cloudfoundry.org/go-log-cache/v2/rpc/logcache_v1"
10+
"google.golang.org/grpc"
11+
"google.golang.org/grpc/credentials"
12+
)
13+
14+
// FakeLogCache is a fake implementation of log-cache.
15+
type FakeLogCache struct {
16+
port int
17+
addr string
18+
c *tls.Config
19+
s *grpc.Server
20+
21+
serveCh chan error
22+
23+
logcache.UnimplementedEgressServer
24+
logcache.UnimplementedIngressServer
25+
logcache.UnimplementedPromQLQuerierServer
26+
}
27+
28+
// NewFakeLogCache creates a new instance of FakeLogCache with the specified
29+
// port and TLS configuration.
30+
func NewFakeLogCache(port int, c *tls.Config) *FakeLogCache {
31+
return &FakeLogCache{
32+
port: port,
33+
c: c,
34+
serveCh: make(chan error),
35+
}
36+
}
37+
38+
// Start attempts to claim a net listener on FakeLogCache's port and
39+
// start a log-cache gRPC server in a separate goroutine. The server uses
40+
// FakeLogCache's TLS config if it was provided. This is a non-blocking
41+
// operation and returns an error if it fails.
42+
//
43+
// If FakeLogCache is started, it must be stopped with Stop().
44+
func (f *FakeLogCache) Start() error {
45+
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", f.port))
46+
if err != nil {
47+
return err
48+
}
49+
f.addr = lis.Addr().String()
50+
51+
f.s = grpc.NewServer()
52+
if f.c != nil {
53+
f.s = grpc.NewServer(grpc.Creds(credentials.NewTLS(f.c)))
54+
}
55+
56+
logcache.RegisterEgressServer(f.s, f)
57+
logcache.RegisterIngressServer(f.s, f)
58+
logcache.RegisterPromQLQuerierServer(f.s, f)
59+
60+
go func() {
61+
f.serveCh <- f.s.Serve(lis)
62+
}()
63+
64+
return nil
65+
}
66+
67+
// Address returns the address of the FakeLogCache.
68+
func (f *FakeLogCache) Address() string {
69+
return f.addr
70+
}
71+
72+
func (f *FakeLogCache) Read(ctx context.Context, req *logcache.ReadRequest) (*logcache.ReadResponse, error) {
73+
fmt.Printf("Read: %#v\n", req)
74+
return nil, nil
75+
}
76+
77+
// Stop tells the FakeLogCache server to stop and waits for it to shutdown.
78+
func (f *FakeLogCache) Stop() error {
79+
f.s.Stop()
80+
return <-f.serveCh
81+
}

src/internal/gateway/gateway.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"log"
77
"net"
88
"net/http"
9+
"strconv"
910
"time"
1011

1112
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
@@ -179,7 +180,11 @@ func (g *Gateway) listenAndServe() {
179180
}
180181

181182
func (g *Gateway) handleInfoEndpoint(w http.ResponseWriter, r *http.Request) {
182-
_, err := w.Write([]byte(fmt.Sprintf(`{"version":"%s","vm_uptime":"%d"}`+"\n", g.logCacheVersion, g.uptimeFn())))
183+
b := []byte(fmt.Sprintf(`{"version":"%s","vm_uptime":"%d"}`+"\n", g.logCacheVersion, g.uptimeFn()))
184+
185+
w.Header().Set("Content-Type", "application/json")
186+
w.Header().Set("Content-Length", strconv.Itoa(len(b)))
187+
_, err := w.Write(b)
183188
if err != nil {
184189
g.log.Println("Cannot send result for the info endpoint")
185190
}

0 commit comments

Comments
 (0)