Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 238 additions & 27 deletions NOTICE.TXT

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions logstash-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ dependencies {
exclude group: 'com.google.guava', module: 'guava'
}
implementation 'org.javassist:javassist:3.30.2-GA'
implementation 'org.hdrhistogram:HdrHistogram:2.2.2'
testImplementation "org.apache.logging.log4j:log4j-core:${log4jVersion}:tests"
testImplementation 'org.hamcrest:hamcrest:2.2'
testImplementation 'org.hamcrest:hamcrest-library:2.2'
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/api/commands/stats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def plugin_stats(stats, plugin_type)

def report(stats, extended_stats = nil, opts = {})
ret = {
:batch => stats[:batch],
:events => stats[:events],
:flow => stats[:flow],
:plugins => {
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ module Environment
Setting::ExistingFilePath.new("api.ssl.keystore.path", nil, false).nullable,
Setting::Password.new("api.ssl.keystore.password", nil, false).nullable,
Setting::StringArray.new("api.ssl.supported_protocols", nil, true, %w[TLSv1 TLSv1.1 TLSv1.2 TLSv1.3]),
Setting::SettingString.new("pipeline.batch.metrics", "false", true, ["false", "true"]),
Setting::SettingString.new("queue.type", "memory", true, ["persisted", "memory"]),
Setting::Boolean.new("queue.drain", false),
Setting::Bytes.new("queue.page_capacity", "64mb"),
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/instrument/metric_type.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def self.create(type, namespaces, key)
when :gauge then return org.logstash.instrument.metrics.gauge.LazyDelegatingGauge.new(key.to_s)
when :uptime then return org.logstash.instrument.metrics.UptimeMetric.new(key.to_s)
when :timer then return org.logstash.instrument.metrics.timer.TimerMetric::create(key.to_s)
when :histogram then return org.logstash.instrument.metrics.histogram.HistogramMetric.new(key.to_s)
else fail NameError, "Unknown Metric Type `#{type}`"
end
end
Expand Down
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def self.included(base)
"path.dead_letter_queue",
"path.queue",
"pipeline.batch.delay",
"pipeline.batch.metrics",
"pipeline.batch.size",
"pipeline.id",
"pipeline.reloadable",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def threaded_read_client
end

context "WrappedSynchronousQueue" do
let(:queue) { LogStash::WrappedSynchronousQueue.new(1024) }
let(:queue) { LogStash::WrappedSynchronousQueue.new(1024, "false") }

before do
read_client.set_events_metric(metric.namespace([:stats, :events]))
Expand Down
1 change: 1 addition & 0 deletions logstash-core/spec/logstash/queue_factory_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
LogStash::Setting::SettingNumeric.new("queue.checkpoint.writes", 1024),
LogStash::Setting::Boolean.new("queue.checkpoint.retry", false),
LogStash::Setting::SettingString.new("pipeline.id", pipeline_id),
LogStash::Setting::SettingString.new("pipeline.batch.metrics", "false", true, ["false", "true"]),
LogStash::Setting::SettingPositiveInteger.new("pipeline.batch.size", 125),
LogStash::Setting::SettingPositiveInteger.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum)
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
require "logstash/instrument/collector"

describe LogStash::WrappedSynchronousQueue do
subject {LogStash::WrappedSynchronousQueue.new(5)}
subject {LogStash::WrappedSynchronousQueue.new(5, "false")}

describe "queue clients" do
context "when requesting a write client" do
Expand Down
5 changes: 4 additions & 1 deletion logstash-core/spec/support/pipeline/pipeline_helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def sample_one(sample_event, &block)
end

describe "\"#{name}\"" do
let(:collector) {LogStash::Instrument::Collector.new}
let(:metric) { LogStash::Instrument::Metric.new(collector).namespace(:null) }

let(:pipeline) do
settings.set_value("queue.drain", true)
LogStash::JavaPipeline.new(
Expand All @@ -82,7 +85,7 @@ def sample_one(sample_event, &block)
"config_string", "config_string",
"input { spec_sampler_input {} }\n" + config + "\noutput { spec_sampler_output {} }"
), settings
)
), metric
)
end
let(:event) do
Expand Down
2 changes: 2 additions & 0 deletions logstash-core/src/main/java/org/logstash/Rubyfier.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.ext.JrubyTimestampExtLibrary;
import org.logstash.instrument.metrics.histogram.HistogramSnapshot;
import org.logstash.secret.SecretVariable;

public final class Rubyfier {
Expand Down Expand Up @@ -133,6 +134,7 @@ private static Map<Class<?>, Rubyfier.Converter> initConverters() {
)
);
converters.put(SecretVariable.class, JAVAUTIL_CONVERTER);
converters.put(HistogramSnapshot.class, JAVAUTIL_CONVERTER);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for reviewer

The x-pack monitoring pipeline create Logstash events which contains this snapshot, and need to be converted to Ruby object in the Rubyfier.deep method.

return converters;
}

Expand Down
2 changes: 2 additions & 0 deletions logstash-core/src/main/java/org/logstash/Valuefier.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.ext.JrubyTimestampExtLibrary;
import org.logstash.instrument.metrics.histogram.HistogramSnapshot;

public final class Valuefier {

Expand Down Expand Up @@ -201,6 +202,7 @@ RubyUtil.RUBY, new Timestamp(((LocalDateTime) input).toInstant(ZoneOffset.UTC))
RubyUtil.RUBY, new Timestamp(((ZonedDateTime) input).toInstant())
)
);
converters.put(HistogramSnapshot.class, IDENTITY);
return converters;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) {
public static AbstractWrappedQueueExt create(final ThreadContext context, final IRubyObject recv,
final IRubyObject settings) throws IOException {
final String type = getSetting(context, settings, QUEUE_TYPE_CONTEXT_NAME).asJavaString();

final String histogramFlag = getSetting(context, settings, SettingKeyDefinitions.PIPELINE_BATCH_METRICS)
.asJavaString();

if (PERSISTED_TYPE.equals(type)) {
final Path queuePath = Paths.get(
getSetting(context, settings, SettingKeyDefinitions.PATH_QUEUE).asJavaString(),
Expand All @@ -93,15 +97,14 @@ public static AbstractWrappedQueueExt create(final ThreadContext context, final
}
);
} else if (MEMORY_TYPE.equals(type)) {
final int batchSize = getSetting(context, settings, SettingKeyDefinitions.PIPELINE_BATCH_SIZE)
.convertToInteger().getIntValue();
final int workers = getSetting(context, settings, SettingKeyDefinitions.PIPELINE_WORKERS)
.convertToInteger().getIntValue();
return new JrubyWrappedSynchronousQueueExt(
context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS
).initialize(
context, context.runtime.newFixnum(
getSetting(context, settings, SettingKeyDefinitions.PIPELINE_BATCH_SIZE)
.convertToInteger().getIntValue()
* getSetting(context, settings, SettingKeyDefinitions.PIPELINE_WORKERS)
.convertToInteger().getIntValue()
)
context, context.runtime.newFixnum(batchSize * workers), context.runtime.newString(histogramFlag)
);
} else {
throw context.runtime.newRaiseException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class SettingKeyDefinitions {

public static final String PIPELINE_BATCH_SIZE = "pipeline.batch.size";

public static final String PIPELINE_BATCH_METRICS = "pipeline.batch.metrics";

public static final String PATH_QUEUE = "path.queue";

public static final String QUEUE_PAGE_CAPACITY = "queue.page_capacity";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.logstash.instrument.metrics.MetricType;
import org.logstash.instrument.metrics.NullMetricExt;
import org.logstash.instrument.metrics.UpScaledMetric;
import org.logstash.instrument.metrics.histogram.HistogramMetric;
import org.logstash.instrument.metrics.timer.TimerMetric;
import org.logstash.instrument.metrics.UptimeMetric;
import org.logstash.instrument.metrics.counter.LongCounter;
Expand Down Expand Up @@ -286,6 +287,12 @@ private AbstractPipelineExt initialize(final ThreadContext context,
} catch (InvalidIRException iirex) {
throw new IllegalArgumentException(iirex);
}


// init histogram sample
final RubySymbol[] batchNamespace = buildNamespace(BATCH_KEY);
initOrGetHistogramMetric(context, batchNamespace, BATCH_EVENT_COUNT_KEY);

return this;
}

Expand Down Expand Up @@ -317,8 +324,8 @@ public final IRubyObject openQueue(final ThreadContext context) {
new IRubyObject[]{
STATS_KEY,
PIPELINES_KEY,
pipelineId.convertToString().intern(),
EVENTS_KEY
pipelineId.convertToString().intern()/*,
EVENTS_KEY*/
}
)
)
Expand Down Expand Up @@ -640,6 +647,19 @@ private TimerMetric initOrGetTimerMetric(final ThreadContext context,
return retrievedMetric.toJava(TimerMetric.class);
}

private HistogramMetric initOrGetHistogramMetric(final ThreadContext context,
final RubySymbol[] subPipelineNamespacePath,
final RubySymbol metricName) {
final IRubyObject collector = this.metric.collector(context);
if (collector.isNil()) {
return null;
}
final IRubyObject fullNamespace = pipelineNamespacedPath(subPipelineNamespacePath);

final IRubyObject retrievedMetric = collector.callMethod(context, "get", new IRubyObject[]{fullNamespace, metricName, context.runtime.newSymbol("histogram")});
return retrievedMetric.toJava(HistogramMetric.class);
}

private Optional<NumberGauge> initOrGetNumberGaugeMetric(final ThreadContext context,
final RubySymbol[] subPipelineNamespacePath,
final RubySymbol metricName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public interface QueueReadClient {
void addFilteredMetrics(int filteredSize);
void closeBatch(QueueBatch batch) throws IOException;

public <V, E extends Exception> V executeWithTimers(final TimerMetric.ExceptionalSupplier<V,E> supplier) throws E;
<V, E extends Exception> V executeWithTimers(final TimerMetric.ExceptionalSupplier<V,E> supplier) throws E;

public <E extends Exception> void executeWithTimers(final TimerMetric.ExceptionalRunnable<E> runnable) throws E;
<E extends Exception> void executeWithTimers(final TimerMetric.ExceptionalRunnable<E> runnable) throws E;

boolean isEmpty();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,35 @@
import org.logstash.RubyUtil;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.histogram.HistogramMetric;
import org.logstash.instrument.metrics.timer.TimerMetric;
import org.logstash.instrument.metrics.counter.LongCounter;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import static org.logstash.instrument.metrics.MetricKeys.BATCH_KEY;
import static org.logstash.instrument.metrics.MetricKeys.EVENTS_KEY;

/**
* Common code shared by Persistent and In-Memory queues clients implementation
* */
@JRubyClass(name = "QueueReadClientBase")
public abstract class QueueReadClientBase extends RubyObject implements QueueReadClient {

public enum BatchSizeSamplingType {
NONE, FULL;

public static BatchSizeSamplingType decode(String type) {
return switch (type) {
case "false" -> NONE;
case "true" -> FULL;
default -> throw new IllegalArgumentException("Invalid batch size type: " + type);
};
}
}

private static final long serialVersionUID = 1L;

protected int batchSize = 125;
Expand All @@ -60,6 +76,8 @@ public abstract class QueueReadClientBase extends RubyObject implements QueueRea
private transient LongCounter pipelineMetricOut;
private transient LongCounter pipelineMetricFiltered;
private transient TimerMetric pipelineMetricTime;
private transient HistogramMetric pipelineMetricBatch;
protected BatchSizeSamplingType batchSizeSamplingType = BatchSizeSamplingType.FULL;

protected QueueReadClientBase(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
Expand All @@ -86,10 +104,14 @@ public IRubyObject setEventsMetric(final IRubyObject metric) {
@JRubyMethod(name = "set_pipeline_metric")
public IRubyObject setPipelineMetric(final IRubyObject metric) {
final AbstractNamespacedMetricExt namespacedMetric = (AbstractNamespacedMetricExt) metric;
ThreadContext context = metric.getRuntime().getCurrentContext();
AbstractNamespacedMetricExt eventsNamespace = namespacedMetric.namespace(context, EVENTS_KEY);
AbstractNamespacedMetricExt batchNamespace = namespacedMetric.namespace(context, BATCH_KEY);
synchronized(namespacedMetric.getMetric()) {
pipelineMetricOut = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.OUT_KEY);
pipelineMetricFiltered = LongCounter.fromRubyBase(namespacedMetric, MetricKeys.FILTERED_KEY);
pipelineMetricTime = TimerMetric.fromRubyBase(namespacedMetric, MetricKeys.DURATION_IN_MILLIS_KEY);
pipelineMetricOut = LongCounter.fromRubyBase(eventsNamespace, MetricKeys.OUT_KEY);
pipelineMetricFiltered = LongCounter.fromRubyBase(eventsNamespace, MetricKeys.FILTERED_KEY);
pipelineMetricTime = TimerMetric.fromRubyBase(eventsNamespace, MetricKeys.DURATION_IN_MILLIS_KEY);
pipelineMetricBatch = HistogramMetric.fromRubyBase(batchNamespace, MetricKeys.BATCH_EVENT_COUNT_KEY);
}
return this;
}
Expand Down Expand Up @@ -193,6 +215,9 @@ public void startMetrics(QueueBatch batch) {
// JTODO getId has been deprecated in JDK 19, when JDK 21 is the target version use threadId() instead
long threadId = Thread.currentThread().getId();
inflightBatches.put(threadId, batch);
if (batchSizeSamplingType == BatchSizeSamplingType.FULL) {
pipelineMetricBatch.update(batch.filteredSize());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ public JrubyMemoryReadClientExt(final Ruby runtime, final RubyClass metaClass) {

@SuppressWarnings("rawtypes")
private JrubyMemoryReadClientExt(final Ruby runtime, final RubyClass metaClass,
BlockingQueue queue, int batchSize, int waitForMillis) {
BlockingQueue queue, int batchSize, int waitForMillis,
BatchSizeSamplingType batchSizeSamplingType) {
super(runtime, metaClass);
this.queue = queue;
this.batchSizeSamplingType = batchSizeSamplingType;
this.batchSize = batchSize;
this.waitForNanos = TimeUnit.NANOSECONDS.convert(waitForMillis, TimeUnit.MILLISECONDS);
this.waitForMillis = waitForMillis;
Expand All @@ -58,8 +60,14 @@ private JrubyMemoryReadClientExt(final Ruby runtime, final RubyClass metaClass,
@SuppressWarnings("rawtypes")
public static JrubyMemoryReadClientExt create(BlockingQueue queue, int batchSize,
int waitForMillis) {
return create(queue, batchSize, waitForMillis, BatchSizeSamplingType.FULL);
}

@SuppressWarnings("rawtypes")
public static JrubyMemoryReadClientExt create(BlockingQueue queue, int batchSize, int waitForMillis,
BatchSizeSamplingType batchSizeSamplingType) {
return new JrubyMemoryReadClientExt(RubyUtil.RUBY,
RubyUtil.MEMORY_READ_CLIENT_CLASS, queue, batchSize, waitForMillis);
RubyUtil.MEMORY_READ_CLIENT_CLASS, queue, batchSize, waitForMillis, batchSizeSamplingType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public final class JrubyWrappedSynchronousQueueExt extends AbstractWrappedQueueE
private static final long serialVersionUID = 1L;

private transient BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue;
private QueueReadClientBase.BatchSizeSamplingType batchMetricsSamplingType;

public JrubyWrappedSynchronousQueueExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
Expand All @@ -50,8 +51,11 @@ public JrubyWrappedSynchronousQueueExt(final Ruby runtime, final RubyClass metaC
@JRubyMethod
@SuppressWarnings("unchecked")
public JrubyWrappedSynchronousQueueExt initialize(final ThreadContext context,
IRubyObject size) {
IRubyObject size,
IRubyObject batchMetricsSampling) {
int typedSize = ((RubyNumeric)size).getIntValue();
this.batchMetricsSamplingType = QueueReadClientBase.BatchSizeSamplingType.decode(batchMetricsSampling.asJavaString());

this.queue = new ArrayBlockingQueue<>(typedSize);
return this;
}
Expand All @@ -65,7 +69,7 @@ protected JRubyAbstractQueueWriteClientExt getWriteClient(final ThreadContext co
protected QueueReadClientBase getReadClient() {
// batch size and timeout are currently hard-coded to 125 and 50ms as values observed
// to be reasonable tradeoffs between latency and throughput per PR #8707
return JrubyMemoryReadClientExt.create(queue, 125, 50);
return JrubyMemoryReadClientExt.create(queue, 125, 50, batchMetricsSamplingType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public IRubyObject timer(final ThreadContext context, final IRubyObject key) {
return getTimer(context, key);
}

@JRubyMethod
public IRubyObject histogram(final ThreadContext context, final IRubyObject key) {
return getHistogram(context, key);
}

@JRubyMethod(required = 1, optional = 1)
public IRubyObject increment(final ThreadContext context, final IRubyObject[] args) {
return doIncrement(context, args);
Expand Down Expand Up @@ -95,6 +100,8 @@ protected abstract IRubyObject getGauge(ThreadContext context, IRubyObject key,

protected abstract IRubyObject getTimer(ThreadContext context, IRubyObject key);

protected abstract IRubyObject getHistogram(ThreadContext context, IRubyObject key);

protected abstract IRubyObject doTime(ThreadContext context, IRubyObject key, Block block);

protected abstract IRubyObject doReportTime(ThreadContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ public IRubyObject timer(final ThreadContext context,
return getTimer(context, namespace, key);
}

@JRubyMethod
public IRubyObject histogram(final ThreadContext context,
final IRubyObject namespace,
final IRubyObject key) {
return getHistogram(context, namespace, key);
}

@JRubyMethod(name = "report_time")
public IRubyObject reportTime(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final IRubyObject duration) {
Expand All @@ -83,6 +90,8 @@ protected abstract IRubyObject getGauge(ThreadContext context, IRubyObject names

protected abstract IRubyObject getTimer(ThreadContext context, IRubyObject namespace, IRubyObject key);

protected abstract IRubyObject getHistogram(ThreadContext context, IRubyObject namespace, IRubyObject key);

protected abstract IRubyObject doReportTime(ThreadContext context, IRubyObject namespace,
IRubyObject key, IRubyObject duration);

Expand Down
Loading