-
Notifications
You must be signed in to change notification settings - Fork 41
[FLINK-15171] string serialization benchmark to use proper i/o buffer implementation as on SerializationFrameworkMiniBenchmarks #44
base: master
Are you sure you want to change the base?
Conversation
…Benchmark use proper i/o buffer implementation as on SerializationFrameworkMiniBenchmarks
| @Benchmark | ||
| public byte[] writePojo() throws IOException { | ||
| return write(pojoSerializer, pojo); | ||
| @OperationsPerInvocation(INVOCATIONS) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could this be moved to the top of the class?
| @OperationsPerInvocation(INVOCATIONS) | ||
| public int writePojo() throws IOException { | ||
| stream.pruneBuffer(); | ||
| for (int i = 0; i < INVOCATIONS; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you motivate why do we need multiple invocations in a single benchmark? Because of potential costs of stream.pruneBuffer(); or pojoBuffer.reset();? Are they measurable?
As it is, I would be a little be concerned what magic JIT can do after inlining pojoSerializer.serialize(pojo, stream); and unrolling the loop. That might be desireable (vectorisation) but as we are invoking it over and over again with the same parameters, some other magic could yield false results.
| public OffheapInputWrapper(byte[] initialPayload) throws Exception { | ||
| reader = new SpillingAdaptiveSpanningRecordDeserializer<>(new String[0]); | ||
| MemorySegment segment = MemorySegmentFactory.allocateUnpooledOffHeapMemory(initialPayload.length, this); | ||
| segment.put(0, initialPayload); | ||
| buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, true, initialPayload.length); | ||
| Field nonSpanningWrapper = reader.getClass().getDeclaredField("nonSpanningWrapper"); | ||
| nonSpanningWrapper.setAccessible(true); | ||
| dataInput = (DataInputView) nonSpanningWrapper.get(reader); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a couple of issues here.
First, this relays on a non public Flink's API (SpillingAdaptiveSpanningRecordDeserializer), which means it can change at any point of time and brake the benchmark builds.
Secondly, this is using reflections to access a private field of an Internal class, which makes this benchmark even more fragile (and could cause compilation issues like this recent one).
Is there a way to avoid using internal classes? If there is really a no way, then it could be fixed similarly how #40 is refactoring the 15199 (take a look at org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils class).
Here in this case, the OffheapInputWrapper class should be defined and tested in flink repository, with some comment/annotation that it's being used by micro benchmarks. + code should be adjusted that the reflection is no longer needed.
|
@shuttie this repo will be mitigated to https://github.com/apache/flink-benchmarks, please create this PR under that repo once FLINK-17281 is completed. |
This is a follow-up to the upstream FLINK-15171 PR.
Originally
PojoSerializationBenchmarkandStringSerializationBenchmarkused simple jdk defaultbyte[]-backed input-output streams and views to test the serialization performance. But this resulted in a severe performance mismatch with the e2e tests inSerializationFrameworkMiniBenchmarks: by default Flink is using own offheap-based implementation of memory operations, which has completely different performance characteristics.So, for example, on
StringSerializationBenchmarkFLINK-14346 implementation of string serialization heavily outperformed the original implementation, being faster up to 15 times. But onSerializationFrameworkMiniBenchmarksthere was performance degradation instead.In this PR we do the following improvements:
OffheapInputWrapperwhich is using exactly the same implementations of memory read operations as in default Flink withHybridMemorySegmentDataOutputSerializerto match the default one used in Flink for memory write operations.StringSerializationBenchmarkandPojoSerializationBenchmarknow use proper read-write primitives, matching the behavior ofSerializationFrameworkMiniBenchmarks.These improvements allowed to reliably reproduce the mysterious performance regression discussed in FLINK-15171.