From 8aec574c9358647b71142540a0a7d489539346b1 Mon Sep 17 00:00:00 2001 From: Grebennikov Roman Date: Wed, 18 Dec 2019 16:42:59 +0100 Subject: [PATCH] [FLINK-15171] make StringSerializationBenchmark and PojoSerializationBenchmark use proper i/o buffer implementation as on SerializationFrameworkMiniBenchmarks --- .../benchmark/full/OffheapInputWrapper.java | 32 ++++++++ .../full/PojoSerializationBenchmark.java | 79 +++++++++++++------ .../full/StringSerializationBenchmark.java | 47 ++++++----- 3 files changed, 112 insertions(+), 46 deletions(-) create mode 100644 src/main/java/org/apache/flink/benchmark/full/OffheapInputWrapper.java diff --git a/src/main/java/org/apache/flink/benchmark/full/OffheapInputWrapper.java b/src/main/java/org/apache/flink/benchmark/full/OffheapInputWrapper.java new file mode 100644 index 0000000..c03c99d --- /dev/null +++ b/src/main/java/org/apache/flink/benchmark/full/OffheapInputWrapper.java @@ -0,0 +1,32 @@ +package org.apache.flink.benchmark.full; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; + +import java.lang.reflect.Field; + +public class OffheapInputWrapper { + public SpillingAdaptiveSpanningRecordDeserializer reader; + public Buffer buffer; + public DataInputView dataInput; + + public OffheapInputWrapper(byte[] initialPayload) throws Exception { + reader = new SpillingAdaptiveSpanningRecordDeserializer<>(new String[0]); + MemorySegment segment = MemorySegmentFactory.allocateUnpooledOffHeapMemory(initialPayload.length, this); + segment.put(0, initialPayload); + buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, true, initialPayload.length); + Field nonSpanningWrapper = reader.getClass().getDeclaredField("nonSpanningWrapper"); + nonSpanningWrapper.setAccessible(true); + dataInput = (DataInputView) nonSpanningWrapper.get(reader); + } + + public void reset() throws Exception { + reader.setNextBuffer(buffer); + } + +} diff --git a/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java b/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java index 6858827..671d26e 100644 --- a/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/full/PojoSerializationBenchmark.java @@ -5,11 +5,9 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.benchmark.BenchmarkBase; import org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.core.memory.*; import org.apache.flink.formats.avro.typeutils.AvroSerializer; +import org.openjdk.jmh.infra.Blackhole; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.RunnerException; import org.openjdk.jmh.runner.options.Options; @@ -17,7 +15,6 @@ import org.openjdk.jmh.runner.options.VerboseMode; import org.openjdk.jmh.annotations.*; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Arrays; @@ -39,13 +36,16 @@ public class PojoSerializationBenchmark extends BenchmarkBase { TypeSerializer avroSerializer = new AvroSerializer<>(org.apache.flink.benchmark.avro.MyPojo.class); - ByteArrayInputStream pojoBuffer; - ByteArrayInputStream avroBuffer; - ByteArrayInputStream kryoBuffer; + OffheapInputWrapper pojoBuffer; + OffheapInputWrapper avroBuffer; + OffheapInputWrapper kryoBuffer; + DataOutputSerializer stream = new DataOutputSerializer(128); + + public static final int INVOCATIONS = 1000; @Setup - public void setup() throws IOException { + public void setup() throws Exception { pojo = new SerializationFrameworkMiniBenchmarks.MyPojo( 0, "myName", @@ -70,9 +70,9 @@ public void setup() throws IOException { 2, 3, "null"); - pojoBuffer = new ByteArrayInputStream(write(pojoSerializer, pojo)); - avroBuffer = new ByteArrayInputStream(write(avroSerializer, avroPojo)); - kryoBuffer = new ByteArrayInputStream(write(kryoSerializer, pojo)); + pojoBuffer = new OffheapInputWrapper(writePayload(pojoSerializer, pojo)); + avroBuffer = new OffheapInputWrapper(writePayload(avroSerializer, avroPojo)); + kryoBuffer = new OffheapInputWrapper(writePayload(kryoSerializer, pojo)); } public static void main(String[] args) @@ -86,42 +86,69 @@ public static void main(String[] args) } @Benchmark - public byte[] writePojo() throws IOException { - return write(pojoSerializer, pojo); + @OperationsPerInvocation(INVOCATIONS) + public int writePojo() throws IOException { + stream.pruneBuffer(); + for (int i = 0; i < INVOCATIONS; i++) { + pojoSerializer.serialize(pojo, stream); + } + return stream.length(); } @Benchmark - public byte[] writeAvro() throws IOException { - return write(avroSerializer, avroPojo); + @OperationsPerInvocation(INVOCATIONS) + public int writeAvro() throws IOException { + stream.pruneBuffer(); + for (int i = 0; i < INVOCATIONS; i++) { + avroSerializer.serialize(avroPojo, stream); + } + return stream.length(); } @Benchmark - public byte[] writeKryo() throws IOException { - return write(kryoSerializer, pojo); + @OperationsPerInvocation(INVOCATIONS) + public int writeKryo() throws IOException { + stream.pruneBuffer(); + for (int i = 0; i < INVOCATIONS; i++) { + kryoSerializer.serialize(pojo, stream); + } + return stream.length(); } @Benchmark - public SerializationFrameworkMiniBenchmarks.MyPojo readPojo() throws IOException { + @OperationsPerInvocation(INVOCATIONS) + public void readPojo(Blackhole bh) throws Exception { pojoBuffer.reset(); - return pojoSerializer.deserialize(new DataInputViewStreamWrapper(pojoBuffer)); + for (int i = 0; i < INVOCATIONS; i++) { + bh.consume(pojoSerializer.deserialize(pojoBuffer.dataInput)); + } } @Benchmark - public SerializationFrameworkMiniBenchmarks.MyPojo readKryo() throws IOException { + @OperationsPerInvocation(INVOCATIONS) + public void readKryo(Blackhole bh) throws Exception { kryoBuffer.reset(); - return kryoSerializer.deserialize(new DataInputViewStreamWrapper(kryoBuffer)); + for (int i = 0; i < INVOCATIONS; i++) { + bh.consume(kryoSerializer.deserialize(kryoBuffer.dataInput)); + } } @Benchmark - public org.apache.flink.benchmark.avro.MyPojo readAvro() throws IOException { + @OperationsPerInvocation(INVOCATIONS) + public void readAvro(Blackhole bh) throws Exception { avroBuffer.reset(); - return avroSerializer.deserialize(new DataInputViewStreamWrapper(avroBuffer)); + for (int i = 0; i < INVOCATIONS; i++) { + bh.consume(avroSerializer.deserialize(avroBuffer.dataInput)); + } } - private byte[] write(TypeSerializer serializer, T value) throws IOException { + private byte[] writePayload(TypeSerializer serializer, T value) throws IOException { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); DataOutputView out = new DataOutputViewStreamWrapper(buffer); - serializer.serialize(value, out); + for (int i = 0; i < INVOCATIONS; i++) { + serializer.serialize(value, out); + } + buffer.close(); return buffer.toByteArray(); } } diff --git a/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java b/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java index 72f0549..c64a32e 100644 --- a/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/full/StringSerializationBenchmark.java @@ -4,10 +4,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.benchmark.BenchmarkBase; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.core.memory.*; +import org.openjdk.jmh.infra.Blackhole; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.RunnerException; import org.openjdk.jmh.runner.options.Options; @@ -15,8 +13,6 @@ import org.openjdk.jmh.runner.options.VerboseMode; import org.openjdk.jmh.annotations.*; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Random; import java.util.concurrent.TimeUnit; @@ -50,11 +46,13 @@ public static void main(String[] args) ExecutionConfig config = new ExecutionConfig(); TypeSerializer serializer = TypeInformation.of(String.class).createSerializer(config); - ByteArrayInputStream serializedBuffer; - DataInputView serializedStream; + DataOutputSerializer serializedStream; + OffheapInputWrapper offheapInput; + + public static final int INVOCATIONS = 1000; @Setup - public void setup() throws IOException { + public void setup() throws Exception { length = Integer.parseInt(lengthStr); switch (type) { case "ascii": @@ -69,23 +67,32 @@ public void setup() throws IOException { default: throw new IllegalArgumentException(type + "charset is not supported"); } - byte[] stringBytes = stringWrite(); - serializedBuffer = new ByteArrayInputStream(stringBytes); - serializedStream = new DataInputViewStreamWrapper(serializedBuffer); + serializedStream = new DataOutputSerializer(128); + DataOutputSerializer payloadWriter = new DataOutputSerializer(128); + for (int i = 0; i < INVOCATIONS; i++) { + serializer.serialize(input, payloadWriter); + } + byte[] payload = payloadWriter.getCopyOfBuffer(); + offheapInput = new OffheapInputWrapper(payload); } @Benchmark - public byte[] stringWrite() throws IOException { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - DataOutputView out = new DataOutputViewStreamWrapper(buffer); - serializer.serialize(input, out); - return buffer.toByteArray(); + @OperationsPerInvocation(INVOCATIONS) + public int stringWrite() throws IOException { + serializedStream.pruneBuffer(); + for (int i = 0; i < INVOCATIONS; i++) { + serializer.serialize(input, serializedStream); + } + return serializedStream.length(); } @Benchmark - public String stringRead() throws IOException { - serializedBuffer.reset(); - return serializer.deserialize(serializedStream); + @OperationsPerInvocation(INVOCATIONS) + public void stringRead(Blackhole bh) throws Exception { + offheapInput.reset(); + for (int i = 0; i < INVOCATIONS; i++) { + bh.consume(serializer.deserialize(offheapInput.dataInput)); + } } private String generate(char[] charset, int length) {