Q&A - struggling to get a transform of type: reduce
working, the same config works _flawlessly_ when used as multiline:
on a source of type: file
#23837
-
QuestionI have a log file that is almost always well structured but every once in a while there's an errant stack trace. Here's what my sample log looks like:
Essentially "well formed" lines always start with a capital letter and the stack trace either starts with a lower case letter OR whitespace. Given the above example log, I would expect three log events to come out of this:
But I am struggling to get this working correctly. When I have this: # transform.cassandra_system_log_stacktrace_merger
# merge_strategies:
# message: concat_newline The result is: ❯ vector tap --outputs-of cassandra_system_log,cassandra_system_log_stacktrace_merger --meta --interval 50
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"WARN [prometheus-netty-pool-0] 2024-09-03 22:24:16,343 Harvester.java:250 - Metrics collector com.zegelin.cassandra.exporter.collector.StorageServiceMBeanMetricFamilyCollector failed to collect. Skipping.","source_type":"file","timestamp":"2025-09-23T23:07:20.200396Z"}}
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"java.lang.RuntimeException: No nodes present in the cluster. Has this node finished starting up?","source_type":"file","timestamp":"2025-09-23T23:07:20.200626Z"}}
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"\tat org.apache.cassandra.dht.Murmur3Partitioner.describeOwnership(Murmur3Partitioner.java:262)","source_type":"file","timestamp":"2025-09-23T23:07:20.200631Z"}}
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"\tat org.apache.cassandra.service.StorageService.getOwnership(StorageService.java:5000)","source_type":"file","timestamp":"2025-09-23T23:07:20.200634Z"}}
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"\t<. another 100 or so lines here.>","source_type":"file","timestamp":"2025-09-23T23:07:20.200638Z"}}
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"\tat io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)","source_type":"file","timestamp":"2025-09-23T23:07:20.200641Z"}}
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"\tat java.lang.Thread.run(Thread.java:750)","source_type":"file","timestamp":"2025-09-23T23:07:20.200645Z"}}
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"INFO [IndexSummaryManager:1] 2024-09-29 17:24:05,083 IndexSummaryRedistribution.java:78 - Redistributing index summaries","source_type":"file","timestamp":"2025-09-23T23:07:20.200648Z"}}
{"component_id":"cassandra_system_log_stacktrace_merger","component_kind":"transform","component_type":"reduce","event":{"file":"trim-system.log","host":"myMachine","message":"WARN [prometheus-netty-pool-0] 2024-09-03 22:24:16,343 Harvester.java:250 - Metrics collector com.zegelin.cassandra.exporter.collector.StorageServiceMBeanMetricFamilyCollector failed to collect. Skipping.","source_type":"file","timestamp":"2025-09-23T23:07:20.200396Z","timestamp_end":"2025-09-23T23:07:20.200648Z"}}
So if you look at each And the behavior is even more confusing when I change the config: merge_strategies:
message: concat_newline I re-run the same few log lines through vector and this is the ❯ VECTOR_LOG=info vector tap --outputs-of cassandra_system_log,cassandra_system_log_stacktrace_merger --meta --interval 50
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"WARN [prometheus-netty-pool-0] 2024-09-03 22:24:16,343 Harvester.java:250 - Metrics collector com.zegelin.cassandra.exporter.collector.StorageServiceMBeanMetricFamilyCollector failed to collect. Skipping.","source_type":"file","timestamp":"2025-09-23T23:11:11.752927Z"}}
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"java.lang.RuntimeException: No nodes present in the cluster. Has this node finished starting up?","source_type":"file","timestamp":"2025-09-23T23:11:11.752973Z"}}
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"\tat org.apache.cassandra.dht.Murmur3Partitioner.describeOwnership(Murmur3Partitioner.java:262)","source_type":"file","timestamp":"2025-09-23T23:11:11.752977Z"}}
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"\tat org.apache.cassandra.service.StorageService.getOwnership(StorageService.java:5000)","source_type":"file","timestamp":"2025-09-23T23:11:11.752980Z"}}
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"\t<. another 100 or so lines here.>","source_type":"file","timestamp":"2025-09-23T23:11:11.752984Z"}}
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"\tat io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)","source_type":"file","timestamp":"2025-09-23T23:11:11.752987Z"}}
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"\tat java.lang.Thread.run(Thread.java:750)","source_type":"file","timestamp":"2025-09-23T23:11:11.752991Z"}}
{"component_id":"cassandra_system_log","component_kind":"source","component_type":"file","event":{"file":"trim-system.log","host":"myMachine","message":"INFO [IndexSummaryManager:1] 2024-09-29 17:24:05,083 IndexSummaryRedistribution.java:78 - Redistributing index summaries","source_type":"file","timestamp":"2025-09-23T23:11:11.752995Z"}}
{"component_id":"cassandra_system_log_stacktrace_merger","component_kind":"transform","component_type":"reduce","event":{"file":"trim-system.log","host":"myMachine","message":"WARN [prometheus-netty-pool-0] 2024-09-03 22:24:16,343 Harvester.java:250 - Metrics collector com.zegelin.cassandra.exporter.collector.StorageServiceMBeanMetricFamilyCollector failed to collect. Skipping.\njava.lang.RuntimeException: No nodes present in the cluster. Has this node finished starting up?\n\tat org.apache.cassandra.dht.Murmur3Partitioner.describeOwnership(Murmur3Partitioner.java:262)\n\tat org.apache.cassandra.service.StorageService.getOwnership(StorageService.java:5000)\n\t<. another 100 or so lines here.>\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)\n\tat java.lang.Thread.run(Thread.java:750)\nINFO [IndexSummaryManager:1] 2024-09-29 17:24:05,083 IndexSummaryRedistribution.java:78 - Redistributing index summaries","source_type":"file","timestamp":"2025-09-23T23:11:11.752927Z","timestamp_end":"2025-09-23T23:11:11.752995Z"}} Same deal: 8 individual events for the file source, as expected. Vector ConfigThis is the "toy" that I've been testing with: data_dir: "./vector-data"
timezone: "UTC"
api:
enabled: true
schema:
log_namespace: false
sources:
cassandra_system_log:
type: file
ignore_checkpoints: true
include:
# A test-case that contains a single `WARN` line, then a few lines of stack trace and then an INFO line
# Ideally, we should get _three_ events out of this: the WARN line, the multi-line stack trace, and then the INFO line
- trim-system.log
transforms:
cassandra_system_log_stacktrace_merger:
type: reduce
inputs:
- cassandra_system_log
starts_when:
type: "vrl"
# For reasons that do not make a ton of sense to me, Vector does not allow both a starts_when and an ends_when
# You can specify only one or the other.
# After spending a lot of time looking at cassandra logs, it looks like "official" lines start with a capital level (INFO, WARN, ERROR, DEBUG... etc)
# and the stacktraces(s) that we are trying to capture do NOT start with a capital letter. Either they start with a class name (`java.*`) almost exclusively
# or they start with whitespace (the indented stack frame lines).
# So we can use regex to say that we start a new event when we see a line that either starts with a non capital letter OR starts with whitespace.
# Vector will `concat_newline` until the regex does not match anymore which would indicate that we are now on a new "official" log line.
source: |
msg = string!(.message)
match(msg, r'^([[:lower:]]+\\.|\\s)')
merge_strategies:
message: concat_newline
sinks:
test_json_file:
type: file
inputs:
- cassandra_system_log_stacktrace_merger
path: pipe-test.out.json
encoding:
timestamp_format: rfc3339
codec: json
json:
pretty: true And in Vector LogsNo response |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
I was able to figure this out. Here's the fix: match(msg, r'^\S')
# match(msg, r'^([[:lower:]]+\\.|\\s)') The regex that I was using would match on EVERY indented stack frame line which meant that each line was getting flushed as it's own event. As soon as the expression is changed to only look for a line that starts with whitespace, the internal logic that sets the "oh, this is a continuation, don't flush $previousLine" flag is set, properly. The other question I had was "why can't I use a |
Beta Was this translation helpful? Give feedback.
I was able to figure this out. Here's the fix:
The regex that I was using would match on EVERY indented stack frame line which meant that each line was getting flushed as it's own event. As soon as the expression is changed to only look for a line that starts with whitespace, the internal logic that sets the "oh, this is a continuation, don't flush $previousLine" flag is set, properly.
The other question I had was "why can't I use a
starts_when
andends_when
together. This is partially answered in #9615