Skip to content

Commit 383c223

Browse files
committed
feat: add redpanda buffering layer with split ingress/egress otel collectors
1 parent aa2d445 commit 383c223

File tree

12 files changed

+444
-56
lines changed

12 files changed

+444
-56
lines changed

deployment/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { deployPostgres } from './services/postgres';
2121
import { deployProxy } from './services/proxy';
2222
import { deployPublicGraphQLAPIGateway } from './services/public-graphql-api-gateway';
2323
import { deployRedis } from './services/redis';
24+
import { deployRedpanda } from './services/redpanda';
2425
import { deployS3, deployS3AuditLog, deployS3Mirror } from './services/s3';
2526
import { deploySchema } from './services/schema';
2627
import { configureSentry } from './services/sentry';
@@ -79,6 +80,7 @@ const clickhouse = deployClickhouse();
7980
const postgres = deployPostgres();
8081
const redis = deployRedis({ environment });
8182
const kafka = deployKafka();
83+
const redpanda = deployRedpanda({ environment });
8284
const s3 = deployS3();
8385
const s3Mirror = deployS3Mirror();
8486
const s3AuditLog = deployS3AuditLog();
@@ -290,6 +292,7 @@ const otelCollector = deployOTELCollector({
290292
graphql,
291293
dbMigrations,
292294
clickhouse,
295+
redpanda,
293296
image: docker.factory.getImageId('otel-collector', imagesTag),
294297
docker,
295298
});
@@ -350,6 +353,8 @@ export const schemaApiServiceId = schema.service.id;
350353
export const webhooksApiServiceId = webhooks.service.id;
351354

352355
export const appId = app.deployment.id;
353-
export const otelCollectorId = otelCollector.deployment.id;
356+
export const otelCollectorIngressId = otelCollector.ingress.deployment.id;
357+
export const otelCollectorEgressId = otelCollector.egress.deployment.id;
358+
export const redpandaStatefulSetId = redpanda.statefulSet.id;
354359
export const publicIp = proxy.get()!.status.loadBalancer.ingress[0].ip;
355360
export const awsLambdaArtifactsFunctionUrl = lambdaFunction;

deployment/services/environment.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,16 @@ export function prepareEnvironment(input: {
8080
memoryLimit: isProduction ? '1000Mi' : '300Mi',
8181
},
8282
tracingCollector: {
83-
cpuLimit: isProduction || isStaging ? '1000m' : '100m',
84-
memoryLimit: isProduction || isStaging ? '4000Mi' : '200Mi',
83+
cpuLimit: '500m',
84+
memoryLimit: isProduction || isStaging ? '1000Mi' : '512Mi',
8585
maxReplicas: isProduction || isStaging ? 3 : 1,
8686
},
87+
redpanda: {
88+
replicas: 1,
89+
cpuLimit: '500m',
90+
memoryLimit: isProduction || isStaging ? '1000Mi' : '512Mi',
91+
storageSize: '20Gi',
92+
},
8793
},
8894
};
8995
}

deployment/services/otel-collector.ts

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { DbMigrations } from './db-migrations';
55
import { Docker } from './docker';
66
import { Environment } from './environment';
77
import { GraphQL } from './graphql';
8+
import { Redpanda } from './redpanda';
89

910
export type OTELCollector = ReturnType<typeof deployOTELCollector>;
1011

@@ -15,9 +16,12 @@ export function deployOTELCollector(args: {
1516
clickhouse: Clickhouse;
1617
dbMigrations: DbMigrations;
1718
graphql: GraphQL;
19+
redpanda: Redpanda;
1820
}) {
19-
return new ServiceDeployment(
20-
'otel-collector',
21+
const kafkaBroker = args.redpanda.brokerEndpoint;
22+
23+
const ingress = new ServiceDeployment(
24+
'otel-collector-ingress',
2125
{
2226
image: args.image,
2327
imagePullSecret: args.docker.secret,
@@ -26,6 +30,7 @@ export function deployOTELCollector(args: {
2630
HIVE_OTEL_AUTH_ENDPOINT: serviceLocalEndpoint(args.graphql.service).apply(
2731
value => value + '/otel-auth',
2832
),
33+
KAFKA_BROKER: kafkaBroker,
2934
},
3035
/**
3136
* We are using the healthcheck extension.
@@ -44,7 +49,36 @@ export function deployOTELCollector(args: {
4449
autoScaling: {
4550
maxReplicas: args.environment.podsConfig.tracingCollector.maxReplicas,
4651
cpu: {
47-
limit: args.environment.podsConfig.tracingCollector.cpuLimit,
52+
limit: '500m',
53+
cpuAverageToScale: 80,
54+
},
55+
},
56+
},
57+
[args.dbMigrations],
58+
).deploy();
59+
60+
// Egress: Redpanda -> ClickHouse
61+
const egress = new ServiceDeployment(
62+
'otel-collector-egress',
63+
{
64+
image: args.image,
65+
imagePullSecret: args.docker.secret,
66+
env: {
67+
...args.environment.envVars,
68+
KAFKA_BROKER: kafkaBroker,
69+
},
70+
probePort: 13133,
71+
readinessProbe: '/',
72+
livenessProbe: '/',
73+
startupProbe: '/',
74+
exposesMetrics: true,
75+
replicas: args.environment.podsConfig.tracingCollector.maxReplicas,
76+
pdb: true,
77+
memoryLimit: args.environment.podsConfig.tracingCollector.memoryLimit,
78+
autoScaling: {
79+
maxReplicas: args.environment.podsConfig.tracingCollector.maxReplicas,
80+
cpu: {
81+
limit: '500m',
4882
cpuAverageToScale: 80,
4983
},
5084
},
@@ -57,4 +91,12 @@ export function deployOTELCollector(args: {
5791
.withSecret('CLICKHOUSE_PASSWORD', args.clickhouse.secret, 'password')
5892
.withSecret('CLICKHOUSE_PROTOCOL', args.clickhouse.secret, 'protocol')
5993
.deploy();
94+
95+
return {
96+
ingress,
97+
egress,
98+
// For backward compatibility, expose ingress as the main deployment
99+
deployment: ingress.deployment,
100+
service: ingress.service,
101+
};
60102
}

deployment/services/redpanda.ts

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
import * as k8s from '@pulumi/kubernetes';
2+
import * as pulumi from '@pulumi/pulumi';
3+
import { Environment } from './environment';
4+
5+
export type Redpanda = ReturnType<typeof deployRedpanda>;
6+
7+
export function deployRedpanda(args: { environment: Environment }) {
8+
const labels = { app: 'redpanda' };
9+
10+
// StatefulSet for Redpanda
11+
const statefulSet = new k8s.apps.v1.StatefulSet('redpanda', {
12+
metadata: {
13+
name: 'redpanda',
14+
},
15+
spec: {
16+
serviceName: 'redpanda',
17+
replicas: args.environment.podsConfig.redpanda.replicas,
18+
selector: {
19+
matchLabels: labels,
20+
},
21+
template: {
22+
metadata: {
23+
labels,
24+
},
25+
spec: {
26+
containers: [
27+
{
28+
name: 'redpanda',
29+
image: 'redpandadata/redpanda:v25.3.1',
30+
resources: {
31+
limits: {
32+
cpu: args.environment.podsConfig.redpanda.cpuLimit,
33+
memory: args.environment.podsConfig.redpanda.memoryLimit,
34+
},
35+
},
36+
args: [
37+
'redpanda',
38+
'start',
39+
'--smp',
40+
'1',
41+
'--kafka-addr',
42+
'PLAINTEXT://0.0.0.0:9092',
43+
'--advertise-kafka-addr',
44+
pulumi.interpolate`PLAINTEXT://\${HOSTNAME}.redpanda.default.svc.cluster.local:9092`,
45+
],
46+
ports: [
47+
{ containerPort: 9092, name: 'kafka' },
48+
{ containerPort: 8082, name: 'http' },
49+
{ containerPort: 33145, name: 'rpc' },
50+
{ containerPort: 9644, name: 'admin' },
51+
],
52+
volumeMounts: [
53+
{
54+
name: 'datadir',
55+
mountPath: '/var/lib/redpanda/data',
56+
},
57+
],
58+
livenessProbe: {
59+
httpGet: {
60+
path: '/ready',
61+
port: 9644,
62+
},
63+
initialDelaySeconds: 10,
64+
terminationGracePeriodSeconds: 60,
65+
periodSeconds: 10,
66+
failureThreshold: 5,
67+
timeoutSeconds: 5,
68+
},
69+
readinessProbe: {
70+
httpGet: {
71+
path: '/ready',
72+
port: 9644,
73+
},
74+
initialDelaySeconds: 10,
75+
periodSeconds: 15,
76+
failureThreshold: 5,
77+
timeoutSeconds: 5,
78+
},
79+
},
80+
],
81+
},
82+
},
83+
volumeClaimTemplates: [
84+
{
85+
metadata: {
86+
name: 'datadir',
87+
},
88+
spec: {
89+
accessModes: ['ReadWriteOnce'],
90+
resources: {
91+
requests: {
92+
storage: args.environment.podsConfig.redpanda.storageSize,
93+
},
94+
},
95+
},
96+
},
97+
],
98+
},
99+
});
100+
101+
// Headless Service for StatefulSet (used for internal cluster communication)
102+
const headlessService = new k8s.core.v1.Service('redpanda-headless', {
103+
metadata: {
104+
name: 'redpanda',
105+
},
106+
spec: {
107+
clusterIP: 'None',
108+
selector: labels,
109+
ports: [
110+
{ name: 'kafka', port: 9092, targetPort: 9092 },
111+
{ name: 'http', port: 8082, targetPort: 8082 },
112+
{ name: 'rpc', port: 33145, targetPort: 33145 },
113+
{ name: 'admin', port: 9644, targetPort: 9644 },
114+
],
115+
},
116+
});
117+
118+
// ClusterIP Service for clients (load balances across all pods)
119+
const clientService = new k8s.core.v1.Service('redpanda-client-service', {
120+
metadata: {
121+
name: 'redpanda-client',
122+
},
123+
spec: {
124+
type: 'ClusterIP',
125+
selector: labels,
126+
ports: [
127+
{ name: 'kafka', port: 9092, targetPort: 9092 },
128+
{ name: 'http', port: 8082, targetPort: 8082 },
129+
],
130+
},
131+
});
132+
133+
// Create otel-traces topic
134+
const topicCreationJob = new k8s.batch.v1.Job(
135+
'redpanda-topic-creation',
136+
{
137+
metadata: {
138+
name: 'redpanda-topic-creation',
139+
},
140+
spec: {
141+
template: {
142+
spec: {
143+
restartPolicy: 'OnFailure',
144+
containers: [
145+
{
146+
name: 'rpk',
147+
image: 'redpandadata/redpanda:v25.3.1',
148+
imagePullPolicy: 'Always',
149+
command: [
150+
'/bin/bash',
151+
'-c',
152+
`
153+
# Wait for Redpanda to be ready
154+
for i in {1..60}; do
155+
if rpk cluster health --brokers redpanda-0.redpanda:9092 2>/dev/null | grep -q 'Healthy'; then
156+
echo "Redpanda cluster is ready"
157+
break
158+
fi
159+
echo "Waiting for Redpanda cluster... ($i/60)"
160+
sleep 5
161+
done
162+
163+
# Create topic with partitioning only (no replication)
164+
rpk topic create otel-traces \\
165+
--brokers redpanda-0.redpanda:9092 \\
166+
--replicas 1 \\
167+
--partitions 10 \\
168+
--config retention.ms=2592000000 \\
169+
--config compression.type=snappy \\
170+
--config max.message.bytes=10485760 \\
171+
|| echo "Topic may already exist"
172+
173+
# Verify topic creation
174+
rpk topic describe otel-traces --brokers redpanda-0.redpanda:9092
175+
`,
176+
],
177+
},
178+
],
179+
},
180+
},
181+
},
182+
},
183+
{ dependsOn: [statefulSet, headlessService] },
184+
);
185+
186+
return {
187+
statefulSet,
188+
headlessService,
189+
clientService,
190+
topicCreationJob,
191+
// Client service endpoint - auto-discovers all brokers
192+
brokerEndpoint: 'redpanda-client:9092',
193+
};
194+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# OpenTelemetry Collector Builder (ocb) manifest for the EGRESS collector:
# it consumes buffered traces from Redpanda via the Kafka receiver and
# writes them to ClickHouse. All component versions are pinned to the
# same collector release (v0.140.0) as the `dist.version`.
dist:
  version: 0.140.0
  name: otelcol-custom
  description: Custom OTel Collector distribution
  output_path: ./otelcol-custom

receivers:
  # Kafka protocol consumer — reads from the Redpanda buffer.
  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.140.0

processors:
  # Batches spans before export to reduce ClickHouse insert load.
  - gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.140.0

exporters:
  # debugexporter kept for local troubleshooting of the pipeline.
  - gomod: go.opentelemetry.io/collector/exporter/debugexporter v0.140.0
  - gomod:
      github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter v0.140.0

extensions:
  # Health-check extension backs the k8s liveness/readiness probes.
  - gomod:
      github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension
      v0.140.0
docker/configs/otel-collector/builder-config.yaml renamed to docker/configs/otel-collector/builder-config-ingress.yaml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ processors:
1818

1919
exporters:
2020
- gomod: go.opentelemetry.io/collector/exporter/debugexporter v0.140.0
21-
- gomod:
22-
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter v0.140.0
21+
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.140.0
2322

2423
extensions:
2524
- gomod:

0 commit comments

Comments
 (0)