
Commit 29a91a2

add config and scripts to set up logs for local dev
1 parent 4ebbf45 commit 29a91a2

File tree

5 files changed: 224 additions & 0 deletions

bin/clickhouse-logs-init

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
#!/usr/bin/env bash
# temporary script to init the logs clickhouse schema until we have migrations set up properly
bin/check_kafka_clickhouse_up

cat bin/clickhouse-logs.sql | docker-compose exec -T clickhouse clickhouse-client
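
A quick way to exercise this (a sketch, assuming the dev docker-compose stack is already up and the ClickHouse service is named clickhouse, as in the compose files below):

# From the repo root:
bin/clickhouse-logs-init

# Sanity-check that the tables landed:
docker-compose exec -T clickhouse clickhouse-client -q "SHOW TABLES LIKE 'log%'"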

bin/clickhouse-logs.sql

Lines changed: 159 additions & 0 deletions
@@ -0,0 +1,159 @@
-- temporary sql to initialise log tables for local development
-- will be removed once we have migrations set up
CREATE TABLE IF NOT EXISTS logs16
(
    `uuid` String,
    `team_id` Int32,
    `trace_id` String,
    `span_id` String,
    `trace_flags` Int32,
    `timestamp` DateTime64(6),
    `observed_timestamp` DateTime64(6),
    `created_at` DateTime64(6),
    `body` String,
    `severity_text` String,
    `severity_number` Int32,
    `service_name` String,
    `resource_attributes` Map(String, String),
    `resource_id` String,
    `instrumentation_scope` String,
    `event_name` String,
    `attributes` Map(String, String),
    `attributes_map_str` Map(String, String),
    `attributes_map_float` Map(String, Float64),
    `attributes_map_datetime` Map(String, DateTime64(6)),
    `attribute_keys` Array(String),
    `attribute_values` Array(String),
    `level` String ALIAS severity_text,
    INDEX idx_severity_text_set severity_text TYPE set(10) GRANULARITY 1,
    INDEX idx_attributes_str_keys mapKeys(attributes_map_str) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_attributes_str_values mapValues(attributes_map_str) TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_body_ngram body TYPE ngrambf_v1(3, 20000, 4, 0) GRANULARITY 1
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/logs16', '{replica}')
PARTITION BY toDate(timestamp)
ORDER BY (team_id, toStartOfMinute(timestamp) DESC, service_name, severity_text, toUnixTimestamp(timestamp) DESC, trace_id, span_id)
SETTINGS
    allow_remote_fs_zero_copy_replication = 1,
    allow_experimental_reverse_key = 1;
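
For reference, a hedged example of querying the table above from the dev stack — level is just an ALIAS for severity_text (actual casing depends on what the producer sends), and team_id = 1 is an illustrative placeholder, not something from this commit:

# Recent error-level lines for one team:
docker-compose exec -T clickhouse clickhouse-client -q "
  SELECT timestamp, service_name, body
  FROM logs16
  WHERE team_id = 1
    AND level = 'error'
    AND timestamp > now() - INTERVAL 1 HOUR
  ORDER BY timestamp DESC
  LIMIT 20"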

CREATE OR REPLACE TABLE logs AS logs16 ENGINE = Distributed('posthog', 'default', 'logs16');
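
Application reads go through this Distributed wrapper rather than logs16 directly; that presumes a cluster named 'posthog' is defined in the local ClickHouse config (a requirement of the Distributed engine, and an assumption about the dev setup rather than something this commit adds). A minimal smoke test:

docker-compose exec -T clickhouse clickhouse-client -q "SELECT count() FROM logs"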

CREATE TABLE IF NOT EXISTS log_attributes
(
    `team_id` Int32,
    `time_bucket` DateTime64(0),
    `service_name` LowCardinality(String),
    `attribute_key` LowCardinality(String),
    `attribute_value` String,
    `attribute_count` SimpleAggregateFunction(sum, UInt64),
    INDEX idx_attribute_key attribute_key TYPE bloom_filter(0.01) GRANULARITY 1,
    INDEX idx_attribute_value attribute_value TYPE bloom_filter(0.001) GRANULARITY 1,
    INDEX idx_attribute_key_n3 attribute_key TYPE ngrambf_v1(3, 32768, 3, 0) GRANULARITY 1,
    INDEX idx_attribute_value_n3 attribute_value TYPE ngrambf_v1(3, 32768, 3, 0) GRANULARITY 1
)
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/log_attributes', '{replica}')
PARTITION BY toDate(time_bucket)
ORDER BY (team_id, service_name, time_bucket, attribute_key, attribute_value);

SET enable_dynamic_type = 1;
CREATE MATERIALIZED VIEW IF NOT EXISTS log_to_log_attributes TO log_attributes
(
    `team_id` Int32,
    `time_bucket` DateTime64(0),
    `service_name` LowCardinality(String),
    `attribute_key` LowCardinality(String),
    `attribute_value` String,
    `attribute_count` SimpleAggregateFunction(sum, UInt64)
)
AS SELECT
    team_id,
    time_bucket,
    service_name,
    attribute_key,
    attribute_value,
    attribute_count
FROM
(
    SELECT
        team_id AS team_id,
        toStartOfInterval(timestamp, toIntervalMinute(10)) AS time_bucket,
        service_name AS service_name,
        arrayJoin(arrayMap((k, v) -> (k, if(length(v) > 256, '', v)), arrayFilter((k, v) -> (length(k) < 256), CAST(attributes, 'Array(Tuple(String, String))')))) AS attribute,
        attribute.1 AS attribute_key,
        CAST(JSONExtract(attribute.2, 'Dynamic'), 'String') AS attribute_value,
        sumSimpleState(1) AS attribute_count
    FROM logs16
    GROUP BY
        team_id,
        time_bucket,
        service_name,
        attribute
);
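
Because attribute_count is a SimpleAggregateFunction, rows for the same key are only merged lazily at part-merge time, so reads should re-aggregate with sum(). A hedged example — the key 'http.status_code' and team_id = 1 are illustrative placeholders:

# Top values recorded for one attribute key over the last day:
docker-compose exec -T clickhouse clickhouse-client -q "
  SELECT attribute_value, sum(attribute_count) AS n
  FROM log_attributes
  WHERE team_id = 1
    AND attribute_key = 'http.status_code'
    AND time_bucket > now() - INTERVAL 1 DAY
  GROUP BY attribute_value
  ORDER BY n DESC
  LIMIT 10"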

CREATE OR REPLACE TABLE kafka_logs_avro
(
    `uuid` String,
    `team_id` Int32,
    `trace_id` String,
    `span_id` String,
    `trace_flags` Int32,
    `timestamp` DateTime64(6),
    `observed_timestamp` DateTime64(6),
    `created_at` DateTime64(6),
    `body` String,
    `severity_text` String,
    `severity_number` Int32,
    `service_name` String,
    `resource_attributes` Map(String, String),
    `resource_id` String,
    `instrumentation_scope` String,
    `event_name` String,
    `attributes` Map(String, Nullable(String)),
    `attributes_map_str` Map(String, Nullable(String)),
    `attributes_map_float` Map(String, Nullable(Float64)),
    `attributes_map_datetime` Map(String, Nullable(DateTime64(6))),
    `attribute_keys` Array(Nullable(String)),
    `attribute_values` Array(Nullable(String))
)
ENGINE = Kafka('kafka:9092', 'logs_avro', 'clickhouse-logs-avro', 'Avro')
SETTINGS
    kafka_skip_broken_messages = 100,
    kafka_security_protocol = 'PLAINTEXT',
    kafka_thread_per_consumer = 1,
    kafka_num_consumers = 1,
    kafka_poll_timeout_ms = 15000,
    kafka_poll_max_batch_size = 100,
    kafka_max_block_size = 1000;

DROP TABLE IF EXISTS kafka_logs_avro_mv;

CREATE MATERIALIZED VIEW kafka_logs_avro_mv TO logs16
(
    `uuid` String,
    `team_id` Int32,
    `trace_id` String,
    `span_id` String,
    `trace_flags` Int32,
    `timestamp` DateTime64(6),
    `observed_timestamp` DateTime64(6),
    `created_at` DateTime64(6),
    `body` String,
    `severity_text` String,
    `severity_number` Int32,
    `service_name` String,
    `resource_attributes` Map(String, String),
    `resource_id` String,
    `instrumentation_scope` String,
    `event_name` String,
    `attributes` Map(String, Nullable(String)),
    `attributes_map_str` Map(String, Nullable(String)),
    `attributes_map_float` Map(String, Nullable(Float64)),
    `attributes_map_datetime` Map(String, Nullable(DateTime64(6))),
    `attribute_keys` Array(Nullable(String)),
    `attribute_values` Array(Nullable(String))
)
AS SELECT *
FROM kafka_logs_avro
SETTINGS materialize_skip_indexes_on_insert = 1, distributed_background_insert_sleep_time_ms = 5000, distributed_background_insert_batch = true;

SELECT 'clickhouse logs tables initialised successfully!';
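
Once this runs, anything produced to the logs_avro topic should surface in logs16 via the materialized view. Two rough checks — note that system.kafka_consumers only exists on reasonably recent ClickHouse versions, so treat the second query as best-effort:

# Are rows flowing end to end?
docker-compose exec -T clickhouse clickhouse-client -q "SELECT count(), max(observed_timestamp) FROM logs16"

# Consumer state for the Kafka engine table (recent ClickHouse only):
docker-compose exec -T clickhouse clickhouse-client -q "SELECT table, last_poll_time, num_messages_read FROM system.kafka_consumers WHERE table = 'kafka_logs_avro'"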

docker-compose.base.yml

Lines changed: 21 additions & 0 deletions
@@ -246,6 +246,25 @@ services:
      REDIS_URL: 'redis://redis:6379/'
      CAPTURE_MODE: recordings

+  log-capture:
+    image: ghcr.io/posthog/posthog/log-capture:master
+    build:
+      context: rust/
+      args:
+        BIN: log-capture
+    restart: on-failure
+    environment:
+      BIND_HOST: '0.0.0.0'
+      BIND_PORT: '3308'
+      RUST_LOG: info,rdkafka=warn
+      RUST_BACKTRACE: '1'
+      KAFKA_HOSTS: kafka:9092
+      JWT_SECRET: '<randomly generated secret key>'
+      KAFKA_TOPIC: logs_avro
+    networks:
+      - otel_network
+      - default
+
  property-defs-rs:
    image: ghcr.io/posthog/posthog/property-defs-rs:master
    build:
@@ -412,8 +431,10 @@ services:
    image: otel/opentelemetry-collector-contrib:latest
    container_name: otel-collector-local
    command: [--config=/etc/otel-collector-config.yaml]
+    user: '0:0'
    volumes:
      - ./otel-collector-config.dev.yaml:/etc/otel-collector-config.yaml
+      - /var/lib/docker/containers:/var/lib/docker/containers:ro
    ports:
      - '4317:4317' # OTLP gRPC receiver (mapped to host)
      - '4318:4318' # OTLP HTTP receiver (mapped to host)
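
To bring up just the new service and watch it consume (a sketch; note that JWT_SECRET above is a placeholder and has to match whatever signed the bearer token the collector sends):

docker-compose -f docker-compose.dev.yml up -d log-capture
docker-compose -f docker-compose.dev.yml logs -f log-capture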

docker-compose.dev.yml

Lines changed: 8 additions & 0 deletions
@@ -246,6 +246,14 @@ services:
      service: otel-collector
    depends_on:
      - jaeger
+      - log-capture
+
+  log-capture:
+    extends:
+      file: docker-compose.base.yml
+      service: log-capture
+    depends_on:
+      - kafka

  jaeger:
    extends:

otel-collector-config.dev.yaml

Lines changed: 31 additions & 0 deletions
@@ -1,4 +1,20 @@
receivers:
+  filelog:
+    include:
+      - /var/lib/docker/containers/*/*.log
+    include_file_name: true
+    include_file_path: true
+    operators:
+      - id: container-parser
+        max_log_size: 102400
+        type: container
+        format: docker
+        on_error: send_quiet
+        add_metadata_from_filepath: false
+      - type: json_parser
+        parse_from: body
+        on_error: send_quiet
+        if: "hasPrefix(body, '{')"
  otlp:
    protocols:
      grpc:
@@ -11,6 +27,13 @@ exporters:
    endpoint: 'jaeger-local:4317' # Sending OTLP gRPC to Jaeger
    tls:
      insecure: true # For local communication to Jaeger
+  otlp/logs:
+    endpoint: 'log-capture:3308'
+    compression: none
+    tls:
+      insecure: true
+    headers:
+      authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ0ZWFtX2lkIjoiMiIsImV4cCI6OTIyMzM3MjAzNjg1NDc3Nn0._HnjzroZ52kR9_7rKauMgnuCKU_ouwdvF_Jv--i9PTc

extensions: # Declaring the extensions
  health_check: # Default configuration is usually fine
@@ -25,4 +48,12 @@ service:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlp]
+    logs:
+      exporters:
+        - otlp/logs
+      processors:
+        - batch
+      receivers:
+        - otlp
+        - filelog
  extensions: [health_check, zpages] # Enabling the declared extensions
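
With the logs pipeline above in place, the quickest end-to-end test is hand-rolling a single OTLP/HTTP log record against the collector's mapped 4318 port. This uses the standard /v1/logs JSON encoding and assumes the http protocol is enabled on the otlp receiver (as the 4318 port mapping suggests); all field values are illustrative:

# Send one log record to the collector over OTLP/HTTP:
curl -s http://localhost:4318/v1/logs \
  -H 'Content-Type: application/json' \
  -d '{
    "resourceLogs": [{
      "resource": { "attributes": [{ "key": "service.name", "value": { "stringValue": "curl-test" } }] },
      "scopeLogs": [{
        "logRecords": [{
          "timeUnixNano": "'$(date +%s)'000000000",
          "severityText": "INFO",
          "body": { "stringValue": "hello from curl" }
        }]
      }]
    }]
  }'

If everything is wired up, the record should land in logs16 a few seconds later (subject to the Kafka batch settings above).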
