From 4aa2e40da315675641ac38b9b7b5895fb7e295bb Mon Sep 17 00:00:00 2001 From: Kenji Okimoto Date: Wed, 27 Sep 2017 13:27:59 +0900 Subject: [PATCH] Use Thread::Mutex to resolve race condition The problematic sequence: 1. Add last line to `@buffer` 2. Call `flush_buffer` in `flush_timeout_buffer` (via periodical timer) 3. `flush_buffer` Sequence 2 and 3 may race. --- lib/fluent/plugin/filter_concat.rb | 44 ++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/lib/fluent/plugin/filter_concat.rb b/lib/fluent/plugin/filter_concat.rb index f990aa7..6510ce3 100644 --- a/lib/fluent/plugin/filter_concat.rb +++ b/lib/fluent/plugin/filter_concat.rb @@ -35,6 +35,7 @@ def initialize @buffer = Hash.new {|h, k| h[k] = [] } @timeout_map = Hash.new {|h, k| h[k] = Fluent::Engine.now } + @mutex = Thread::Mutex.new end def configure(conf) @@ -122,9 +123,13 @@ def process(tag, time, record) def process_line(stream_identity, tag, time, record) new_es = Fluent::MultiEventStream.new - @buffer[stream_identity] << [tag, time, record] + @mutex.synchronize do + @buffer[stream_identity] << [tag, time, record] + end if @buffer[stream_identity].size >= @n_lines - new_time, new_record = flush_buffer(stream_identity) + new_time, new_record = @mutex.synchronize do + flush_buffer(stream_identity) + end time = new_time if @use_first_timestamp new_es.add(time, new_record) end @@ -136,26 +141,36 @@ def process_regexp(stream_identity, tag, time, record) case when firstline?(record[@key]) if @buffer[stream_identity].empty? - @buffer[stream_identity] << [tag, time, record] + @mutex.synchronize do + @buffer[stream_identity] << [tag, time, record] + end if lastline?(record[@key]) - new_time, new_record = flush_buffer(stream_identity) + new_time, new_record = @mutex.synchronize do + flush_buffer(stream_identity) + end time = new_time if @use_first_timestamp new_es.add(time, new_record) end else - new_time, new_record = flush_buffer(stream_identity, [tag, time, record]) + new_time, new_record = @mutex.synchronize do + flush_buffer(stream_identity, [tag, time, record]) + end time = new_time if @use_first_timestamp new_es.add(time, new_record) if lastline?(record[@key]) - new_time, new_record = flush_buffer(stream_identity) + new_time, new_record = @mutex.synchronize do + flush_buffer(stream_identity) + end time = new_time if @use_first_timestamp new_es.add(time, new_record) end return new_es end when lastline?(record[@key]) - @buffer[stream_identity] << [tag, time, record] - new_time, new_record = flush_buffer(stream_identity) + @mutex.synchronize do + @buffer[stream_identity] << [tag, time, record] + new_time, new_record = flush_buffer(stream_identity) + end time = new_time if @use_first_timestamp new_es.add(time, new_record) return new_es @@ -166,9 +181,13 @@ def process_regexp(stream_identity, tag, time, record) else if continuous_line?(record[@key]) # Continuation of the previous line - @buffer[stream_identity] << [tag, time, record] + @mutex.synchronize do + @buffer[stream_identity] << [tag, time, record] + end else - new_time, new_record = flush_buffer(stream_identity) + new_time, new_record = @mutex.synchronize do + flush_buffer(stream_identity) + end time = new_time if @use_first_timestamp new_es.add(time, new_record) new_es.add(time, record) @@ -211,7 +230,10 @@ def flush_timeout_buffer @timeout_map.each do |stream_identity, previous_timestamp| next if @flush_interval > (now - previous_timestamp) next if @buffer[stream_identity].empty? - time, flushed_record = flush_buffer(stream_identity) + next if @mutex.locked? + time, flushed_record = @mutex.synchronize do + flush_buffer(stream_identity) + end timeout_stream_identities << stream_identity tag = stream_identity.split(":").first message = "Timeout flush: #{stream_identity}"