From 32cfb9272e50b16d87b1eb32983191c9599b0580 Mon Sep 17 00:00:00 2001 From: azagrebin Date: Wed, 6 Feb 2019 15:38:57 +0100 Subject: [PATCH] [FLINK-10471] Add Apache Flink specific compaction filter to evict expired state which has time-to-live --- CMakeLists.txt | 2 + Makefile | 4 + TARGETS | 8 + Vagrantfile | 2 +- java/CMakeLists.txt | 3 + java/Makefile | 1 + java/crossbuild/Vagrantfile | 2 +- java/rocksjni/flink_compactionfilterjni.cc | 239 ++++++++++++ .../org/rocksdb/FlinkCompactionFilter.java | 175 +++++++++ .../src/test/java/org/rocksdb/FilterTest.java | 2 +- .../rocksdb/FlinkCompactionFilterTest.java | 356 ++++++++++++++++++ src.mk | 3 + utilities/flink/flink_compaction_filter.cc | 206 ++++++++++ utilities/flink/flink_compaction_filter.h | 191 ++++++++++ .../flink/flink_compaction_filter_test.cc | 226 +++++++++++ 15 files changed, 1417 insertions(+), 3 deletions(-) create mode 100644 java/rocksjni/flink_compactionfilterjni.cc create mode 100644 java/src/main/java/org/rocksdb/FlinkCompactionFilter.java create mode 100644 java/src/test/java/org/rocksdb/FlinkCompactionFilterTest.java create mode 100644 utilities/flink/flink_compaction_filter.cc create mode 100644 utilities/flink/flink_compaction_filter.h create mode 100644 utilities/flink/flink_compaction_filter_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 302575cb7..4b21a6888 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -728,6 +728,7 @@ set(SOURCES utilities/debug.cc utilities/env_mirror.cc utilities/env_timed.cc + utilities/flink/flink_compaction_filter.cc utilities/leveldb_options/leveldb_options.cc utilities/memory/memory_util.cc utilities/merge_operators/bytesxor.cc @@ -1095,6 +1096,7 @@ if(WITH_TESTS) utilities/cassandra/cassandra_format_test.cc utilities/cassandra/cassandra_row_merge_test.cc utilities/cassandra/cassandra_serialize_test.cc + utilities/flink/flink_compaction_filter_test.cc utilities/checkpoint/checkpoint_test.cc utilities/memory/memory_test.cc utilities/merge_operators/string_append/stringappend_test.cc diff --git a/Makefile b/Makefile index c44c5ab66..0c2f8b8e6 100644 --- a/Makefile +++ b/Makefile @@ -569,6 +569,7 @@ TESTS = \ compaction_picker_test \ version_builder_test \ file_indexer_test \ + flink_compaction_filter_test \ write_batch_test \ write_batch_with_index_test \ write_controller_test\ @@ -1324,6 +1325,9 @@ cassandra_serialize_test: utilities/cassandra/cassandra_serialize_test.o $(LIBOB hash_table_test: utilities/persistent_cache/hash_table_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +flink_compaction_filter_test: utilities/flink/flink_compaction_filter_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + histogram_test: monitoring/histogram_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index c511d7ca9..8c1a36079 100644 --- a/TARGETS +++ b/TARGETS @@ -331,6 +331,7 @@ cpp_library( "utilities/debug.cc", "utilities/env_mirror.cc", "utilities/env_timed.cc", + "utilities/flink/flink_compaction_filter.cc", "utilities/leveldb_options/leveldb_options.cc", "utilities/memory/memory_util.cc", "utilities/merge_operators/bytesxor.cc", @@ -1084,6 +1085,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "flink_compaction_filter_test", + "utilities/flink/flink_compaction_filter_test.cc", + "serial", + [], + [], + ], [ "flush_job_test", "db/flush_job_test.cc", diff --git a/Vagrantfile b/Vagrantfile index 07f2e99fd..3dcedaf76 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -2,7 +2,7 @@ Vagrant.configure("2") do |config| config.vm.provider "virtualbox" do |v| - v.memory = 4096 + v.memory = 6096 v.cpus = 2 end diff --git a/java/CMakeLists.txt b/java/CMakeLists.txt index 828803bfb..56b95834d 100644 --- a/java/CMakeLists.txt +++ b/java/CMakeLists.txt @@ -29,6 +29,7 @@ set(JNI_NATIVE_SOURCES rocksjni/config_options.cc rocksjni/env.cc rocksjni/env_options.cc + rocksjni/flink_compactionfilterjni.cc rocksjni/filter.cc rocksjni/ingest_external_file_options.cc rocksjni/iterator.cc @@ -140,6 +141,7 @@ set(JAVA_MAIN_CLASSES src/main/java/org/rocksdb/EnvOptions.java src/main/java/org/rocksdb/Experimental.java src/main/java/org/rocksdb/Filter.java + src/main/java/org/rocksdb/FlinkCompactionFilter.java src/main/java/org/rocksdb/FlushOptions.java src/main/java/org/rocksdb/HashLinkedListMemTableConfig.java src/main/java/org/rocksdb/HashSkipListMemTableConfig.java @@ -415,6 +417,7 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4" OR (${Java_VERSION_MINOR} STREQUAL "7" org.rocksdb.Env org.rocksdb.EnvOptions org.rocksdb.Filter + org.rocksdb.FlinkCompactionFilter org.rocksdb.FlushOptions org.rocksdb.HashLinkedListMemTableConfig org.rocksdb.HashSkipListMemTableConfig diff --git a/java/Makefile b/java/Makefile index 12eb95f03..c46eab1f7 100644 --- a/java/Makefile +++ b/java/Makefile @@ -30,6 +30,7 @@ NATIVE_JAVA_CLASSES = \ org.rocksdb.DirectSlice\ org.rocksdb.Env\ org.rocksdb.EnvOptions\ + org.rocksdb.FlinkCompactionFilter\ org.rocksdb.FlushOptions\ org.rocksdb.Filter\ org.rocksdb.IngestExternalFileOptions\ diff --git a/java/crossbuild/Vagrantfile b/java/crossbuild/Vagrantfile index 0ee50de2c..a3035e683 100644 --- a/java/crossbuild/Vagrantfile +++ b/java/crossbuild/Vagrantfile @@ -33,7 +33,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config| end config.vm.provider "virtualbox" do |v| - v.memory = 2048 + v.memory = 6048 v.cpus = 4 v.customize ["modifyvm", :id, "--nictype1", "virtio" ] end diff --git a/java/rocksjni/flink_compactionfilterjni.cc b/java/rocksjni/flink_compactionfilterjni.cc new file mode 100644 index 000000000..9f0954b43 --- /dev/null +++ b/java/rocksjni/flink_compactionfilterjni.cc @@ -0,0 +1,239 @@ +#include // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include +#include + +#include "include/org_rocksdb_FlinkCompactionFilter.h" +#include "loggerjnicallback.h" +#include "portal.h" +#include "rocksjni/jnicallback.h" +#include "utilities/flink/flink_compaction_filter.h" + +using namespace ROCKSDB_NAMESPACE::flink; + +class JniCallbackBase : public ROCKSDB_NAMESPACE::JniCallback { + public: + JniCallbackBase(JNIEnv* env, jobject jcallback_obj) + : JniCallback(env, jcallback_obj) {} + + protected: + inline void CheckAndRethrowException(JNIEnv* env) const { + if (env->ExceptionCheck()) { + env->ExceptionDescribe(); + env->Throw(env->ExceptionOccurred()); + } + } +}; + +// This list element filter operates on list state for which byte length of +// elements is unknown (variable), the list element serializer has to be used in +// this case to compute the offset of the next element. The filter wraps java +// object implenented in Flink. The java object holds element serializer and +// performs filtering. +class JavaListElementFilter + : public ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::ListElementFilter, + JniCallbackBase { + public: + JavaListElementFilter(JNIEnv* env, jobject jlist_filter) + : JniCallbackBase(env, jlist_filter) { + jclass jclazz = ROCKSDB_NAMESPACE::JavaClass::getJClass( + env, "org/rocksdb/FlinkCompactionFilter$ListElementFilter"); + if (jclazz == nullptr) { + // exception occurred accessing class + return; + } + m_jnext_unexpired_offset_methodid = + env->GetMethodID(jclazz, "nextUnexpiredOffset", "([BJJ)I"); + assert(m_jnext_unexpired_offset_methodid != nullptr); + } + + std::size_t NextUnexpiredOffset(const ROCKSDB_NAMESPACE::Slice& list, + int64_t ttl, + int64_t current_timestamp) const override { + jboolean attached_thread = JNI_FALSE; + JNIEnv* env = getJniEnv(&attached_thread); + jbyteArray jlist = ROCKSDB_NAMESPACE::JniUtil::copyBytes(env, list); + CheckAndRethrowException(env); + if (jlist == nullptr) { + return static_cast(-1); + } + auto jl_ttl = static_cast(ttl); + auto jl_current_timestamp = static_cast(current_timestamp); + jint next_offset = + env->CallIntMethod(m_jcallback_obj, m_jnext_unexpired_offset_methodid, + jlist, jl_ttl, jl_current_timestamp); + CheckAndRethrowException(env); + env->DeleteLocalRef(jlist); + releaseJniEnv(attached_thread); + return static_cast(next_offset); + }; + + private: + jmethodID m_jnext_unexpired_offset_methodid; +}; + +class JavaListElemenFilterFactory + : public ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter:: + ListElementFilterFactory, + JniCallbackBase { + public: + JavaListElemenFilterFactory(JNIEnv* env, jobject jlist_filter_factory) + : JniCallbackBase(env, jlist_filter_factory) { + jclass jclazz = ROCKSDB_NAMESPACE::JavaClass::getJClass( + env, "org/rocksdb/FlinkCompactionFilter$ListElementFilterFactory"); + if (jclazz == nullptr) { + // exception occurred accessing class + return; + } + m_jcreate_filter_methodid = env->GetMethodID( + jclazz, "createListElementFilter", + "()Lorg/rocksdb/FlinkCompactionFilter$ListElementFilter;"); + assert(m_jcreate_filter_methodid != nullptr); + } + + FlinkCompactionFilter::ListElementFilter* CreateListElementFilter( + std::shared_ptr /*logger*/) const override { + jboolean attached_thread = JNI_FALSE; + JNIEnv* env = getJniEnv(&attached_thread); + auto jlist_filter = + env->CallObjectMethod(m_jcallback_obj, m_jcreate_filter_methodid); + auto list_filter = new JavaListElementFilter(env, jlist_filter); + CheckAndRethrowException(env); + releaseJniEnv(attached_thread); + return list_filter; + }; + + private: + jmethodID m_jcreate_filter_methodid; +}; + +class JavaTimeProvider + : public ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::TimeProvider, + JniCallbackBase { + public: + JavaTimeProvider(JNIEnv* env, jobject jtime_provider) + : JniCallbackBase(env, jtime_provider) { + jclass jclazz = ROCKSDB_NAMESPACE::JavaClass::getJClass( + env, "org/rocksdb/FlinkCompactionFilter$TimeProvider"); + if (jclazz == nullptr) { + // exception occurred accessing class + return; + } + m_jcurrent_timestamp_methodid = + env->GetMethodID(jclazz, "currentTimestamp", "()J"); + assert(m_jcurrent_timestamp_methodid != nullptr); + } + + int64_t CurrentTimestamp() const override { + jboolean attached_thread = JNI_FALSE; + JNIEnv* env = getJniEnv(&attached_thread); + auto jtimestamp = + env->CallLongMethod(m_jcallback_obj, m_jcurrent_timestamp_methodid); + CheckAndRethrowException(env); + releaseJniEnv(attached_thread); + return static_cast(jtimestamp); + }; + + private: + jmethodID m_jcurrent_timestamp_methodid; +}; + +static FlinkCompactionFilter::ListElementFilterFactory* +createListElementFilterFactory(JNIEnv* env, jint ji_list_elem_len, + jobject jlist_filter_factory) { + FlinkCompactionFilter::ListElementFilterFactory* list_filter_factory = + nullptr; + if (ji_list_elem_len > 0) { + auto fixed_size = static_cast(ji_list_elem_len); + list_filter_factory = + new FlinkCompactionFilter::FixedListElementFilterFactory( + fixed_size, static_cast(0)); + } else if (jlist_filter_factory != nullptr) { + list_filter_factory = + new JavaListElemenFilterFactory(env, jlist_filter_factory); + } + return list_filter_factory; +} + +/*x + * Class: org_rocksdb_FlinkCompactionFilter + * Method: createNewFlinkCompactionFilterConfigHolder + * Signature: ()J + */ +jlong Java_org_rocksdb_FlinkCompactionFilter_createNewFlinkCompactionFilterConfigHolder( + JNIEnv* /* env */, jclass /* jcls */) { + using namespace ROCKSDB_NAMESPACE::flink; + return reinterpret_cast( + new std::shared_ptr( + new FlinkCompactionFilter::ConfigHolder())); +} + +/* + * Class: org_rocksdb_FlinkCompactionFilter + * Method: disposeFlinkCompactionFilterConfigHolder + * Signature: (J)V + */ +void Java_org_rocksdb_FlinkCompactionFilter_disposeFlinkCompactionFilterConfigHolder( + JNIEnv* /* env */, jclass /* jcls */, jlong handle) { + using namespace ROCKSDB_NAMESPACE::flink; + auto* config_holder = + reinterpret_cast*>( + handle); + delete config_holder; +} + +/* + * Class: org_rocksdb_FlinkCompactionFilter + * Method: createNewFlinkCompactionFilter0 + * Signature: (JJJ)J + */ +jlong Java_org_rocksdb_FlinkCompactionFilter_createNewFlinkCompactionFilter0( + JNIEnv* env, jclass /* jcls */, jlong config_holder_handle, + jobject jtime_provider, jlong logger_handle) { + using namespace ROCKSDB_NAMESPACE::flink; + auto config_holder = + *(reinterpret_cast*>( + config_holder_handle)); + auto time_provider = new JavaTimeProvider(env, jtime_provider); + auto logger = + logger_handle == 0 + ? nullptr + : *(reinterpret_cast< + std::shared_ptr*>( + logger_handle)); + return reinterpret_cast(new FlinkCompactionFilter( + config_holder, + std::unique_ptr(time_provider), + logger)); +} + +/* + * Class: org_rocksdb_FlinkCompactionFilter + * Method: configureFlinkCompactionFilter + * Signature: (JIIJJILorg/rocksdb/FlinkCompactionFilter$ListElementFilter;)Z + */ +jboolean Java_org_rocksdb_FlinkCompactionFilter_configureFlinkCompactionFilter( + JNIEnv* env, jclass /* jcls */, jlong handle, jint ji_state_type, + jint ji_timestamp_offset, jlong jl_ttl_milli, + jlong jquery_time_after_num_entries, jint ji_list_elem_len, + jobject jlist_filter_factory) { + auto state_type = + static_cast(ji_state_type); + auto timestamp_offset = static_cast(ji_timestamp_offset); + auto ttl = static_cast(jl_ttl_milli); + auto query_time_after_num_entries = + static_cast(jquery_time_after_num_entries); + auto config_holder = + *(reinterpret_cast*>( + handle)); + auto list_filter_factory = createListElementFilterFactory( + env, ji_list_elem_len, jlist_filter_factory); + auto config = new FlinkCompactionFilter::Config{ + state_type, timestamp_offset, ttl, query_time_after_num_entries, + std::unique_ptr( + list_filter_factory)}; + return static_cast(config_holder->Configure(config)); +} \ No newline at end of file diff --git a/java/src/main/java/org/rocksdb/FlinkCompactionFilter.java b/java/src/main/java/org/rocksdb/FlinkCompactionFilter.java new file mode 100644 index 000000000..7f64469bd --- /dev/null +++ b/java/src/main/java/org/rocksdb/FlinkCompactionFilter.java @@ -0,0 +1,175 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +package org.rocksdb; + +/** + * Just a Java wrapper around FlinkCompactionFilter implemented in C++. + * + * Note: this compaction filter is a special implementation, designed for usage only in Apache Flink + * project. + */ +public class FlinkCompactionFilter extends AbstractCompactionFilter { + public enum StateType { + // WARNING!!! Do not change the order of enum entries as it is important for jni translation + Disabled, + Value, + List + } + + public FlinkCompactionFilter(ConfigHolder configHolder, TimeProvider timeProvider) { + this(configHolder, timeProvider, null); + } + + public FlinkCompactionFilter( + ConfigHolder configHolder, TimeProvider timeProvider, Logger logger) { + super(createNewFlinkCompactionFilter0( + configHolder.nativeHandle_, timeProvider, logger == null ? 0 : logger.nativeHandle_)); + } + + private native static long createNewFlinkCompactionFilter0( + long configHolderHandle, TimeProvider timeProvider, long loggerHandle); + private native static long createNewFlinkCompactionFilterConfigHolder(); + private native static void disposeFlinkCompactionFilterConfigHolder(long configHolderHandle); + private native static boolean configureFlinkCompactionFilter(long configHolderHandle, + int stateType, int timestampOffset, long ttl, long queryTimeAfterNumEntries, + int fixedElementLength, ListElementFilterFactory listElementFilterFactory); + + public interface ListElementFilter { + /** + * Gets offset of the first unexpired element in the list. + * + *

Native code wraps this java object and calls it for list state + * for which element byte length is unknown and Flink custom type serializer has to be used + * to compute offset of the next element in serialized form. + * + * @param list serialised list of elements with timestamp + * @param ttl time-to-live of the list elements + * @param currentTimestamp current timestamp to check expiration against + * @return offset of the first unexpired element in the list + */ + @SuppressWarnings("unused") + int nextUnexpiredOffset(byte[] list, long ttl, long currentTimestamp); + } + + public interface ListElementFilterFactory { + @SuppressWarnings("unused") ListElementFilter createListElementFilter(); + } + + public static class Config { + final StateType stateType; + final int timestampOffset; + final long ttl; + /** + * Number of state entries to process by compaction filter before updating current timestamp. + */ + final long queryTimeAfterNumEntries; + final int fixedElementLength; + final ListElementFilterFactory listElementFilterFactory; + + private Config(StateType stateType, int timestampOffset, long ttl, + long queryTimeAfterNumEntries, int fixedElementLength, + ListElementFilterFactory listElementFilterFactory) { + this.stateType = stateType; + this.timestampOffset = timestampOffset; + this.ttl = ttl; + this.queryTimeAfterNumEntries = queryTimeAfterNumEntries; + this.fixedElementLength = fixedElementLength; + this.listElementFilterFactory = listElementFilterFactory; + } + + @SuppressWarnings("WeakerAccess") + public static Config createNotList( + StateType stateType, int timestampOffset, long ttl, long queryTimeAfterNumEntries) { + return new Config(stateType, timestampOffset, ttl, queryTimeAfterNumEntries, -1, null); + } + + @SuppressWarnings("unused") + public static Config createForValue(long ttl, long queryTimeAfterNumEntries) { + return createNotList(StateType.Value, 0, ttl, queryTimeAfterNumEntries); + } + + @SuppressWarnings("unused") + public static Config createForMap(long ttl, long queryTimeAfterNumEntries) { + return createNotList(StateType.Value, 1, ttl, queryTimeAfterNumEntries); + } + + @SuppressWarnings("WeakerAccess") + public static Config createForFixedElementList( + long ttl, long queryTimeAfterNumEntries, int fixedElementLength) { + return new Config(StateType.List, 0, ttl, queryTimeAfterNumEntries, fixedElementLength, null); + } + + @SuppressWarnings("WeakerAccess") + public static Config createForList(long ttl, long queryTimeAfterNumEntries, + ListElementFilterFactory listElementFilterFactory) { + return new Config( + StateType.List, 0, ttl, queryTimeAfterNumEntries, -1, listElementFilterFactory); + } + } + + private static class ConfigHolder extends RocksObject { + ConfigHolder() { + super(createNewFlinkCompactionFilterConfigHolder()); + } + + @Override + protected void disposeInternal(long handle) { + disposeFlinkCompactionFilterConfigHolder(handle); + } + } + + /** Provides current timestamp to check expiration, it must be thread safe. */ + public interface TimeProvider { long currentTimestamp(); } + + public static class FlinkCompactionFilterFactory + extends AbstractCompactionFilterFactory { + private final ConfigHolder configHolder; + private final TimeProvider timeProvider; + private final Logger logger; + + @SuppressWarnings("unused") + public FlinkCompactionFilterFactory(TimeProvider timeProvider) { + this(timeProvider, null); + } + + @SuppressWarnings("WeakerAccess") + public FlinkCompactionFilterFactory(TimeProvider timeProvider, Logger logger) { + this.configHolder = new ConfigHolder(); + this.timeProvider = timeProvider; + this.logger = logger; + } + + @Override + public void close() { + super.close(); + configHolder.close(); + if (logger != null) { + logger.close(); + } + } + + @Override + public FlinkCompactionFilter createCompactionFilter(Context context) { + return new FlinkCompactionFilter(configHolder, timeProvider, logger); + } + + @Override + public String name() { + return "FlinkCompactionFilterFactory"; + } + + @SuppressWarnings("WeakerAccess") + public void configure(Config config) { + boolean already_configured = + !configureFlinkCompactionFilter(configHolder.nativeHandle_, config.stateType.ordinal(), + config.timestampOffset, config.ttl, config.queryTimeAfterNumEntries, + config.fixedElementLength, config.listElementFilterFactory); + if (already_configured) { + throw new IllegalStateException("Compaction filter is already configured"); + } + } + } +} diff --git a/java/src/test/java/org/rocksdb/FilterTest.java b/java/src/test/java/org/rocksdb/FilterTest.java index dc5c19fbc..e308ffefb 100644 --- a/java/src/test/java/org/rocksdb/FilterTest.java +++ b/java/src/test/java/org/rocksdb/FilterTest.java @@ -16,7 +16,7 @@ public class FilterTest { @Test public void filter() { - // new Bloom filter + // new Bloom filterFactory final BlockBasedTableConfig blockConfig = new BlockBasedTableConfig(); try(final Options options = new Options()) { diff --git a/java/src/test/java/org/rocksdb/FlinkCompactionFilterTest.java b/java/src/test/java/org/rocksdb/FlinkCompactionFilterTest.java new file mode 100644 index 000000000..40320e9d5 --- /dev/null +++ b/java/src/test/java/org/rocksdb/FlinkCompactionFilterTest.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.rocksdb; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.FlinkCompactionFilter.StateType; +import org.rocksdb.FlinkCompactionFilter.TimeProvider; + +public class FlinkCompactionFilterTest { + private static final int LONG_LENGTH = 8; + private static final int INT_LENGTH = 4; + private static final String MERGE_OPERATOR_NAME = "stringappendtest"; + private static final byte DELIMITER = ','; + private static final long TTL = 100; + private static final long QUERY_TIME_AFTER_NUM_ENTRIES = 100; + private static final int TEST_TIMESTAMP_OFFSET = 2; + private static final Random rnd = new Random(); + + private TestTimeProvider timeProvider; + private List stateContexts; + private List cfDescs; + private List cfHandles; + + @Rule public TemporaryFolder dbFolder = new TemporaryFolder(); + + @Before + public void init() { + timeProvider = new TestTimeProvider(); + timeProvider.time = rnd.nextLong(); + stateContexts = + Arrays.asList(new StateContext(StateType.Value, timeProvider, TEST_TIMESTAMP_OFFSET), + new FixedElementListStateContext(timeProvider), + new NonFixedElementListStateContext(timeProvider)); + cfDescs = new ArrayList<>(); + cfHandles = new ArrayList<>(); + cfDescs.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); + for (StateContext stateContext : stateContexts) { + cfDescs.add(stateContext.getCfDesc()); + } + } + + @After + public void cleanup() { + for (StateContext stateContext : stateContexts) { + stateContext.cfDesc.getOptions().close(); + stateContext.filterFactory.close(); + } + } + + @Test + public void checkStateTypeEnumOrder() { + // if the order changes it also needs to be adjusted + // in utilities/flink/flink_compaction_filter.h + // and in utilities/flink/flink_compaction_filter_test.cc + assertThat(StateType.Disabled.ordinal()).isEqualTo(0); + assertThat(StateType.Value.ordinal()).isEqualTo(1); + assertThat(StateType.List.ordinal()).isEqualTo(2); + } + + @Test + public void testCompactionFilter() throws RocksDBException { + try (DBOptions options = createDbOptions(); RocksDB rocksDb = setupDb(options)) { + try { + for (StateContext stateContext : stateContexts) { + stateContext.updateValueWithTimestamp(rocksDb); + stateContext.checkUnexpired(rocksDb); + rocksDb.compactRange(stateContext.columnFamilyHandle); + stateContext.checkUnexpired(rocksDb); + } + + timeProvider.time += TTL + TTL / 2; // expire state + + for (StateContext stateContext : stateContexts) { + stateContext.checkUnexpired(rocksDb); + rocksDb.compactRange(stateContext.columnFamilyHandle); + stateContext.checkExpired(rocksDb); + rocksDb.compactRange(stateContext.columnFamilyHandle); + } + } finally { + for (ColumnFamilyHandle cfHandle : cfHandles) { + cfHandle.close(); + } + } + } + } + + private static DBOptions createDbOptions() { + return new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true); + } + + private RocksDB setupDb(DBOptions options) throws RocksDBException { + RocksDB db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath(), cfDescs, cfHandles); + for (int i = 0; i < stateContexts.size(); i++) { + stateContexts.get(i).columnFamilyHandle = cfHandles.get(i + 1); + } + return db; + } + + private static class StateContext { + private final String cf; + final String key; + final ColumnFamilyDescriptor cfDesc; + final String userValue; + final long currentTime; + final FlinkCompactionFilter.FlinkCompactionFilterFactory filterFactory; + + ColumnFamilyHandle columnFamilyHandle; + + private StateContext(StateType type, TimeProvider timeProvider, int timestampOffset) { + this.currentTime = timeProvider.currentTimestamp(); + userValue = type.name() + "StateValue"; + cf = getClass().getSimpleName() + "StateCf"; + key = type.name() + "StateKey"; + filterFactory = + new FlinkCompactionFilter.FlinkCompactionFilterFactory(timeProvider, createLogger()); + filterFactory.configure(createConfig(type, timestampOffset)); + cfDesc = new ColumnFamilyDescriptor(getASCII(cf), getOptionsWithFilter(filterFactory)); + } + + private Logger createLogger() { + try (DBOptions opts = new DBOptions().setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL)) { + return new Logger(opts) { + @Override + protected void log(InfoLogLevel infoLogLevel, String logMsg) { + System.out.println(infoLogLevel + ": " + logMsg); + } + }; + } + } + + FlinkCompactionFilter.Config createConfig(StateType type, int timestampOffset) { + return FlinkCompactionFilter.Config.createNotList( + type, timestampOffset, TTL, QUERY_TIME_AFTER_NUM_ENTRIES); + } + + private static ColumnFamilyOptions getOptionsWithFilter( + FlinkCompactionFilter.FlinkCompactionFilterFactory filterFactory) { + return new ColumnFamilyOptions() + .setCompactionFilterFactory(filterFactory) + .setMergeOperatorName(MERGE_OPERATOR_NAME); + } + + public String getKey() { + return key; + } + + ColumnFamilyDescriptor getCfDesc() { + return cfDesc; + } + + byte[] getValueWithTimestamp(RocksDB db) throws RocksDBException { + return db.get(columnFamilyHandle, getASCII(key)); + } + + void updateValueWithTimestamp(RocksDB db) throws RocksDBException { + db.put(columnFamilyHandle, getASCII(key), valueWithTimestamp()); + } + + byte[] valueWithTimestamp() { + return valueWithTimestamp(TEST_TIMESTAMP_OFFSET); + } + + byte[] valueWithTimestamp(@SuppressWarnings("SameParameterValue") int offset) { + return valueWithTimestamp(offset, currentTime); + } + + byte[] valueWithTimestamp(int offset, long timestamp) { + ByteBuffer buffer = getByteBuffer(offset); + buffer.put(new byte[offset]); + appendValueWithTimestamp(buffer, userValue, timestamp); + return buffer.array(); + } + + void appendValueWithTimestamp(ByteBuffer buffer, String value, long timestamp) { + buffer.putLong(timestamp); + buffer.putInt(value.length()); + buffer.put(getASCII(value)); + } + + ByteBuffer getByteBuffer(int offset) { + int length = offset + LONG_LENGTH + INT_LENGTH + userValue.length(); + return ByteBuffer.allocate(length); + } + + byte[] unexpiredValue() { + return valueWithTimestamp(); + } + + byte[] expiredValue() { + return null; + } + + void checkUnexpired(RocksDB db) throws RocksDBException { + assertThat(getValueWithTimestamp(db)).isEqualTo(unexpiredValue()); + } + + void checkExpired(RocksDB db) throws RocksDBException { + assertThat(getValueWithTimestamp(db)).isEqualTo(expiredValue()); + } + } + + private static class FixedElementListStateContext extends StateContext { + private FixedElementListStateContext(TimeProvider timeProvider) { + super(StateType.List, timeProvider, 0); + } + + @Override + FlinkCompactionFilter.Config createConfig(StateType type, int timestampOffset) { + // return FlinkCompactionFilter.Config.createForList(TTL, QUERY_TIME_AFTER_NUM_ENTRIES, + // ELEM_FILTER_FACTORY); + return FlinkCompactionFilter.Config.createForFixedElementList( + TTL, QUERY_TIME_AFTER_NUM_ENTRIES, 13 + userValue.getBytes().length); + } + + @Override + void updateValueWithTimestamp(RocksDB db) throws RocksDBException { + db.merge(columnFamilyHandle, getASCII(key), listExpired(3)); + db.merge(columnFamilyHandle, getASCII(key), mixedList(2, 3)); + db.merge(columnFamilyHandle, getASCII(key), listUnexpired(4)); + } + + @Override + byte[] unexpiredValue() { + return mixedList(5, 7); + } + + byte[] mergeBytes(byte[]... bytes) { + int length = 0; + for (byte[] a : bytes) { + length += a.length; + } + ByteBuffer buffer = ByteBuffer.allocate(length); + for (byte[] a : bytes) { + buffer.put(a); + } + return buffer.array(); + } + + @Override + byte[] expiredValue() { + return listUnexpired(7); + } + + private byte[] mixedList(int numberOfExpiredElements, int numberOfUnexpiredElements) { + assert numberOfExpiredElements > 0; + assert numberOfUnexpiredElements > 0; + return mergeBytes(listExpired(numberOfExpiredElements), new byte[] {DELIMITER}, + listUnexpired(numberOfUnexpiredElements)); + } + + private byte[] listExpired(int numberOfElements) { + return list(numberOfElements, currentTime); + } + + private byte[] listUnexpired(int numberOfElements) { + return list(numberOfElements, currentTime + TTL); + } + + private byte[] list(int numberOfElements, long timestamp) { + ByteBuffer buffer = getByteBufferForList(numberOfElements); + for (int i = 0; i < numberOfElements; i++) { + appendValueWithTimestamp(buffer, userValue, timestamp); + if (i < numberOfElements - 1) { + buffer.put(DELIMITER); + } + } + return buffer.array(); + } + + private ByteBuffer getByteBufferForList(int numberOfElements) { + int length = ((LONG_LENGTH + INT_LENGTH + userValue.length() + 1) * numberOfElements) - 1; + return ByteBuffer.allocate(length); + } + } + + private static class NonFixedElementListStateContext extends FixedElementListStateContext { + private static FlinkCompactionFilter.ListElementFilterFactory ELEM_FILTER_FACTORY = + new ListElementFilterFactory(); + + private NonFixedElementListStateContext(TimeProvider timeProvider) { + super(timeProvider); + } + + @Override + FlinkCompactionFilter.Config createConfig(StateType type, int timestampOffset) { + // return FlinkCompactionFilter.Config.createForList(TTL, QUERY_TIME_AFTER_NUM_ENTRIES, + // ELEM_FILTER_FACTORY); + return FlinkCompactionFilter.Config.createForList( + TTL, QUERY_TIME_AFTER_NUM_ENTRIES, ELEM_FILTER_FACTORY); + } + + private static class ListElementFilterFactory + implements FlinkCompactionFilter.ListElementFilterFactory { + @Override + public FlinkCompactionFilter.ListElementFilter createListElementFilter() { + return new FlinkCompactionFilter.ListElementFilter() { + @Override + public int nextUnexpiredOffset(byte[] list, long ttl, long currentTimestamp) { + int currentOffset = 0; + while (currentOffset < list.length) { + ByteBuffer bf = ByteBuffer.wrap(list, currentOffset, list.length - currentOffset); + long timestamp = bf.getLong(); + if (timestamp + ttl > currentTimestamp) { + break; + } + int elemLen = bf.getInt(8); + currentOffset += 13 + elemLen; + } + return currentOffset; + } + }; + } + } + } + + private static byte[] getASCII(String str) { + return str.getBytes(StandardCharsets.US_ASCII); + } + + private static class TestTimeProvider implements TimeProvider { + private long time; + + @Override + public long currentTimestamp() { + return time; + } + } +} \ No newline at end of file diff --git a/src.mk b/src.mk index 1c28f3e0e..3ca7b9011 100644 --- a/src.mk +++ b/src.mk @@ -210,6 +210,7 @@ LIB_SOURCES = \ utilities/debug.cc \ utilities/env_mirror.cc \ utilities/env_timed.cc \ + utilities/flink/flink_compaction_filter.cc \ utilities/leveldb_options/leveldb_options.cc \ utilities/memory/memory_util.cc \ utilities/merge_operators/max.cc \ @@ -463,6 +464,7 @@ MAIN_SOURCES = \ utilities/cassandra/cassandra_row_merge_test.cc \ utilities/cassandra/cassandra_serialize_test.cc \ utilities/checkpoint/checkpoint_test.cc \ + utilities/flink/flink_compaction_filter_test.cc \ utilities/memory/memory_test.cc \ utilities/merge_operators/string_append/stringappend_test.cc \ utilities/object_registry_test.cc \ @@ -500,6 +502,7 @@ JNI_NATIVE_SOURCES = \ java/rocksjni/config_options.cc \ java/rocksjni/env.cc \ java/rocksjni/env_options.cc \ + java/rocksjni/flink_compactionfilterjni.cc \ java/rocksjni/ingest_external_file_options.cc \ java/rocksjni/filter.cc \ java/rocksjni/iterator.cc \ diff --git a/utilities/flink/flink_compaction_filter.cc b/utilities/flink/flink_compaction_filter.cc new file mode 100644 index 000000000..4cbdd7e7d --- /dev/null +++ b/utilities/flink/flink_compaction_filter.cc @@ -0,0 +1,206 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "utilities/flink/flink_compaction_filter.h" + +#include +#include + +namespace ROCKSDB_NAMESPACE { +namespace flink { + +int64_t DeserializeTimestamp(const char* src, std::size_t offset) { + uint64_t result = 0; + for (unsigned long i = 0; i < sizeof(uint64_t); i++) { + result |= static_cast(static_cast(src[offset + i])) + << ((sizeof(int64_t) - 1 - i) * BITS_PER_BYTE); + } + return static_cast(result); +} + +CompactionFilter::Decision Decide(const char* ts_bytes, const int64_t ttl, + const std::size_t timestamp_offset, + const int64_t current_timestamp, + const std::shared_ptr& logger) { + int64_t timestamp = DeserializeTimestamp(ts_bytes, timestamp_offset); + const int64_t ttlWithoutOverflow = + timestamp > 0 ? std::min(JAVA_MAX_LONG - timestamp, ttl) : ttl; + Debug(logger.get(), + "Last access timestamp: %" PRId64 " ms, ttlWithoutOverflow: %" PRId64 + " ms, Current timestamp: %" PRId64 " ms", + timestamp, ttlWithoutOverflow, current_timestamp); + return timestamp + ttlWithoutOverflow <= current_timestamp + ? CompactionFilter::Decision::kRemove + : CompactionFilter::Decision::kKeep; +} + +FlinkCompactionFilter::ConfigHolder::ConfigHolder() + : config_(const_cast(&DISABLED_CONFIG)){}; + +FlinkCompactionFilter::ConfigHolder::~ConfigHolder() { + Config* config = config_.load(); + if (config != &DISABLED_CONFIG) { + delete config; + } +} + +// at the moment Flink configures filters (can be already created) only once +// when user creates state otherwise it can lead to ListElementFilter leak in +// Config or race between its delete in Configure() and usage in FilterV2() the +// method returns true if it was configured before +bool FlinkCompactionFilter::ConfigHolder::Configure(Config* config) { + bool not_configured = GetConfig() == &DISABLED_CONFIG; + if (not_configured) { + assert(config->query_time_after_num_entries_ >= 0); + config_ = config; + } + return not_configured; +} + +FlinkCompactionFilter::Config* +FlinkCompactionFilter::ConfigHolder::GetConfig() { + return config_.load(); +} + +std::size_t FlinkCompactionFilter::FixedListElementFilter::NextUnexpiredOffset( + const Slice& list, int64_t ttl, int64_t current_timestamp) const { + std::size_t offset = 0; + while (offset < list.size()) { + Decision decision = Decide(list.data(), ttl, offset + timestamp_offset_, + current_timestamp, logger_); + if (decision != Decision::kKeep) { + std::size_t new_offset = offset + fixed_size_; + if (new_offset >= JAVA_MAX_SIZE || new_offset < offset) { + return JAVA_MAX_SIZE; + } + offset = new_offset; + } else { + break; + } + } + return offset; +} + +const char* FlinkCompactionFilter::Name() const { + return "FlinkCompactionFilter"; +} + +FlinkCompactionFilter::FlinkCompactionFilter( + std::shared_ptr config_holder, + std::unique_ptr time_provider) + : FlinkCompactionFilter(std::move(config_holder), std::move(time_provider), + nullptr){}; + +FlinkCompactionFilter::FlinkCompactionFilter( + std::shared_ptr config_holder, + std::unique_ptr time_provider, std::shared_ptr logger) + : config_holder_(std::move(config_holder)), + time_provider_(std::move(time_provider)), + logger_(std::move(logger)), + config_cached_(const_cast(&DISABLED_CONFIG)){}; + +inline void FlinkCompactionFilter::InitConfigIfNotYet() const { + const_cast(this)->config_cached_ = + config_cached_ == &DISABLED_CONFIG ? config_holder_->GetConfig() + : config_cached_; +} + +CompactionFilter::Decision FlinkCompactionFilter::FilterV2( + int /*level*/, const Slice& key, ValueType value_type, + const Slice& existing_value, std::string* new_value, + std::string* /*skip_until*/) const { + InitConfigIfNotYet(); + CreateListElementFilterIfNull(); + UpdateCurrentTimestampIfStale(); + + const char* data = existing_value.data(); + + Debug(logger_.get(), + "Call FlinkCompactionFilter::FilterV2 - Key: %s, Data: %s, Value type: " + "%d, " + "State type: %d, TTL: %" PRId64 " ms, timestamp_offset: %zu", + key.ToString().c_str(), existing_value.ToString(true).c_str(), + value_type, config_cached_->state_type_, config_cached_->ttl_, + config_cached_->timestamp_offset_); + + // too short value to have timestamp at all + const bool tooShortValue = + existing_value.size() < + config_cached_->timestamp_offset_ + TIMESTAMP_BYTE_SIZE; + + const StateType state_type = config_cached_->state_type_; + const bool value_or_merge = + value_type == ValueType::kValue || value_type == ValueType::kMergeOperand; + const bool value_state = + state_type == StateType::Value && value_type == ValueType::kValue; + const bool list_entry = state_type == StateType::List && value_or_merge; + const bool toDecide = value_state || list_entry; + const bool list_filter = list_entry && list_element_filter_; + + Decision decision = Decision::kKeep; + if (!tooShortValue && toDecide) { + decision = list_filter ? ListDecide(existing_value, new_value) + : Decide(data, config_cached_->ttl_, + config_cached_->timestamp_offset_, + current_timestamp_, logger_); + } + Debug(logger_.get(), "Decision: %d", static_cast(decision)); + return decision; +} + +CompactionFilter::Decision FlinkCompactionFilter::ListDecide( + const Slice& existing_value, std::string* new_value) const { + std::size_t offset = 0; + if (offset < existing_value.size()) { + Decision decision = Decide(existing_value.data(), config_cached_->ttl_, + offset + config_cached_->timestamp_offset_, + current_timestamp_, logger_); + if (decision != Decision::kKeep) { + offset = + ListNextUnexpiredOffset(existing_value, offset, config_cached_->ttl_); + if (offset >= JAVA_MAX_SIZE) { + return Decision::kKeep; + } + } + } + if (offset >= existing_value.size()) { + return Decision::kRemove; + } else if (offset > 0) { + SetUnexpiredListValue(existing_value, offset, new_value); + return Decision::kChangeValue; + } + return Decision::kKeep; +} + +std::size_t FlinkCompactionFilter::ListNextUnexpiredOffset( + const Slice& existing_value, size_t offset, int64_t ttl) const { + std::size_t new_offset = list_element_filter_->NextUnexpiredOffset( + existing_value, ttl, current_timestamp_); + if (new_offset >= JAVA_MAX_SIZE || new_offset < offset) { + Error(logger_.get(), "Wrong next offset in list filter: %zu -> %zu", offset, + new_offset); + new_offset = JAVA_MAX_SIZE; + } else { + Debug(logger_.get(), "Next unexpired offset: %zu -> %zu", offset, + new_offset); + } + return new_offset; +} + +void FlinkCompactionFilter::SetUnexpiredListValue( + const Slice& existing_value, std::size_t offset, + std::string* new_value) const { + new_value->clear(); + auto new_value_char = existing_value.data() + offset; + auto new_value_size = existing_value.size() - offset; + new_value->assign(new_value_char, new_value_size); + Logger* logger = logger_.get(); + if (logger && logger->GetInfoLogLevel() <= InfoLogLevel::DEBUG_LEVEL) { + Slice new_value_slice = Slice(new_value_char, new_value_size); + Debug(logger, "New list value: %s", new_value_slice.ToString(true).c_str()); + } +} +} // namespace flink +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/flink/flink_compaction_filter.h b/utilities/flink/flink_compaction_filter.h new file mode 100644 index 000000000..3b3b651ea --- /dev/null +++ b/utilities/flink/flink_compaction_filter.h @@ -0,0 +1,191 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include + +#include +#include +#include +#include +#include + +#include "rocksdb/compaction_filter.h" +#include "rocksdb/slice.h" + +namespace ROCKSDB_NAMESPACE { +namespace flink { + +static const std::size_t BITS_PER_BYTE = static_cast(8); +static const std::size_t TIMESTAMP_BYTE_SIZE = static_cast(8); +static const int64_t JAVA_MIN_LONG = static_cast(0x8000000000000000); +static const int64_t JAVA_MAX_LONG = static_cast(0x7fffffffffffffff); +static const std::size_t JAVA_MAX_SIZE = static_cast(0x7fffffff); + +/** + * Compaction filter for removing expired Flink state entries with ttl. + * + * Note: this compaction filter is a special implementation, designed for usage + * only in Apache Flink project. + */ +class FlinkCompactionFilter : public CompactionFilter { + public: + enum StateType { + // WARNING!!! Do not change the order of enum entries as it is important for + // jni translation + Disabled, + Value, + List + }; + + // Provides current timestamp to check expiration, it must thread safe. + class TimeProvider { + public: + virtual ~TimeProvider() = default; + virtual int64_t CurrentTimestamp() const = 0; + }; + + // accepts serialized list state and checks elements for expiration starting + // from the head stops upon discovery of unexpired element and returns its + // offset or returns offset greater or equal to list byte length. + class ListElementFilter { + public: + virtual ~ListElementFilter() = default; + virtual std::size_t NextUnexpiredOffset( + const Slice& list, int64_t ttl, int64_t current_timestamp) const = 0; + }; + + // this filter can operate directly on list state bytes + // because the byte length of list element and last acess timestamp position + // are known. + class FixedListElementFilter : public ListElementFilter { + public: + explicit FixedListElementFilter(std::size_t fixed_size, + std::size_t timestamp_offset, + std::shared_ptr logger) + : fixed_size_(fixed_size), + timestamp_offset_(timestamp_offset), + logger_(std::move(logger)) {} + std::size_t NextUnexpiredOffset(const Slice& list, int64_t ttl, + int64_t current_timestamp) const override; + + private: + std::size_t fixed_size_; + std::size_t timestamp_offset_; + std::shared_ptr logger_; + }; + + // Factory is needed to create one filter per filter/thread + // and avoid concurrent access to the filter state + class ListElementFilterFactory { + public: + virtual ~ListElementFilterFactory() = default; + virtual ListElementFilter* CreateListElementFilter( + std::shared_ptr logger) const = 0; + }; + + class FixedListElementFilterFactory : public ListElementFilterFactory { + public: + explicit FixedListElementFilterFactory(std::size_t fixed_size, + std::size_t timestamp_offset) + : fixed_size_(fixed_size), timestamp_offset_(timestamp_offset) {} + FixedListElementFilter* CreateListElementFilter( + std::shared_ptr logger) const override { + return new FixedListElementFilter(fixed_size_, timestamp_offset_, logger); + }; + + private: + std::size_t fixed_size_; + std::size_t timestamp_offset_; + }; + + struct Config { + StateType state_type_; + std::size_t timestamp_offset_; + int64_t ttl_; + // Number of state entries to process by compaction filter before updating + // current timestamp. + int64_t query_time_after_num_entries_; + std::unique_ptr list_element_filter_factory_; + }; + + // Allows to configure at once all FlinkCompactionFilters created by the + // factory. The ConfigHolder holds the shared Config. + class ConfigHolder { + public: + explicit ConfigHolder(); + ~ConfigHolder(); + bool Configure(Config* config); + Config* GetConfig(); + + private: + std::atomic config_; + }; + + explicit FlinkCompactionFilter(std::shared_ptr config_holder, + std::unique_ptr time_provider); + + explicit FlinkCompactionFilter(std::shared_ptr config_holder, + std::unique_ptr time_provider, + std::shared_ptr logger); + + const char* Name() const override; + Decision FilterV2(int level, const Slice& key, ValueType value_type, + const Slice& existing_value, std::string* new_value, + std::string* skip_until) const override; + + bool IgnoreSnapshots() const override { return true; } + + private: + inline void InitConfigIfNotYet() const; + + Decision ListDecide(const Slice& existing_value, + std::string* new_value) const; + + inline std::size_t ListNextUnexpiredOffset(const Slice& existing_value, + std::size_t offset, + int64_t ttl) const; + + inline void SetUnexpiredListValue(const Slice& existing_value, + std::size_t offset, + std::string* new_value) const; + + inline void CreateListElementFilterIfNull() const { + if (!list_element_filter_ && config_cached_->list_element_filter_factory_) { + const_cast(this)->list_element_filter_ = + std::unique_ptr( + config_cached_->list_element_filter_factory_ + ->CreateListElementFilter(logger_)); + } + } + + inline void UpdateCurrentTimestampIfStale() const { + bool is_stale = + record_counter_ >= config_cached_->query_time_after_num_entries_; + if (is_stale) { + const_cast(this)->record_counter_ = 0; + const_cast(this)->current_timestamp_ = + time_provider_->CurrentTimestamp(); + } + const_cast(this)->record_counter_ = + record_counter_ + 1; + } + + std::shared_ptr config_holder_; + std::unique_ptr time_provider_; + std::shared_ptr logger_; + Config* config_cached_; + std::unique_ptr list_element_filter_; + int64_t current_timestamp_ = std::numeric_limits::max(); + int64_t record_counter_ = std::numeric_limits::max(); +}; + +static const FlinkCompactionFilter::Config DISABLED_CONFIG = + FlinkCompactionFilter::Config{FlinkCompactionFilter::StateType::Disabled, 0, + std::numeric_limits::max(), + std::numeric_limits::max(), nullptr}; + +} // namespace flink +} // namespace ROCKSDB_NAMESPACE diff --git a/utilities/flink/flink_compaction_filter_test.cc b/utilities/flink/flink_compaction_filter_test.cc new file mode 100644 index 000000000..26613ae68 --- /dev/null +++ b/utilities/flink/flink_compaction_filter_test.cc @@ -0,0 +1,226 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "utilities/flink/flink_compaction_filter.h" + +#include + +#include "test_util/testharness.h" + +namespace ROCKSDB_NAMESPACE { +namespace flink { + +#define DISABLED FlinkCompactionFilter::StateType::Disabled +#define VALUE FlinkCompactionFilter::StateType::Value +#define LIST FlinkCompactionFilter::StateType::List + +#define KVALUE CompactionFilter::ValueType::kValue +#define KMERGE CompactionFilter::ValueType::kMergeOperand +#define KBLOB CompactionFilter::ValueType::kBlobIndex + +#define KKEEP CompactionFilter::Decision::kKeep +#define KREMOVE CompactionFilter::Decision::kRemove +#define KCHANGE CompactionFilter::Decision::kChangeValue + +#define EXPIRE (time += ttl + 20) + +#define EXPECT_ARR_EQ(arr1, arr2, num) \ + EXPECT_TRUE(0 == memcmp(arr1, arr2, num)); + +static const std::size_t TEST_TIMESTAMP_OFFSET = static_cast(2); + +static const std::size_t LIST_ELEM_FIXED_LEN = static_cast(8 + 4); + +static const int64_t QUERY_TIME_AFTER_NUM_ENTRIES = static_cast(10); + +class ConsoleLogger : public Logger { + public: + using Logger::Logv; + ConsoleLogger() : Logger(InfoLogLevel::DEBUG_LEVEL) {} + + void Logv(const char* format, va_list ap) override { + vprintf(format, ap); + printf("\n"); + } +}; + +int64_t time = 0; + +class TestTimeProvider : public FlinkCompactionFilter::TimeProvider { + public: + int64_t CurrentTimestamp() const override { return time; } +}; + +std::random_device rd; // NOLINT +std::mt19937 mt(rd()); // NOLINT +std::uniform_int_distribution rnd(JAVA_MIN_LONG, + JAVA_MAX_LONG); // NOLINT + +int64_t ttl = 100; + +Slice key = Slice("key"); // NOLINT +char data[24]; +std::string new_list = ""; // NOLINT +std::string stub = ""; // NOLINT + +FlinkCompactionFilter::StateType state_type; +CompactionFilter::ValueType value_type; +FlinkCompactionFilter* filter; // NOLINT + +void SetTimestamp(int64_t timestamp, size_t offset = 0, char* value = data) { + for (unsigned long i = 0; i < sizeof(uint64_t); i++) { + value[offset + i] = + static_cast(static_cast(timestamp) >> + ((sizeof(int64_t) - 1 - i) * BITS_PER_BYTE)); + } +} + +CompactionFilter::Decision decide(size_t data_size = sizeof(data)) { + return filter->FilterV2(0, key, value_type, Slice(data, data_size), &new_list, + &stub); +} + +void Init( + FlinkCompactionFilter::StateType stype, CompactionFilter::ValueType vtype, + FlinkCompactionFilter::ListElementFilterFactory* fixed_len_filter_factory, + size_t timestamp_offset, bool expired = false) { + time = expired ? time + ttl + 20 : time; + state_type = stype; + value_type = vtype; + + auto config_holder = std::make_shared(); + auto time_provider = new TestTimeProvider(); + auto logger = std::make_shared(); + + filter = new FlinkCompactionFilter( + config_holder, + std::unique_ptr(time_provider), + logger); + auto config = new FlinkCompactionFilter::Config{ + state_type, timestamp_offset, ttl, QUERY_TIME_AFTER_NUM_ENTRIES, + std::unique_ptr( + fixed_len_filter_factory)}; + EXPECT_EQ(decide(), KKEEP); // test disabled config + EXPECT_TRUE(config_holder->Configure(config)); + EXPECT_FALSE(config_holder->Configure(config)); +} + +void InitValue(FlinkCompactionFilter::StateType stype, + CompactionFilter::ValueType vtype, bool expired = false, + size_t timestamp_offset = TEST_TIMESTAMP_OFFSET) { + time = rnd(mt); + SetTimestamp(time, timestamp_offset); + Init(stype, vtype, nullptr, timestamp_offset, expired); +} + +void InitList(CompactionFilter::ValueType vtype, bool all_expired = false, + bool first_elem_expired = false, size_t timestamp_offset = 0) { + time = rnd(mt); + SetTimestamp(first_elem_expired ? time - ttl - 20 : time, + timestamp_offset); // elem 1 ts + SetTimestamp(time, LIST_ELEM_FIXED_LEN + timestamp_offset); // elem 2 ts + auto fixed_len_filter_factory = + new FlinkCompactionFilter::FixedListElementFilterFactory( + LIST_ELEM_FIXED_LEN, static_cast(0)); + Init(LIST, vtype, fixed_len_filter_factory, timestamp_offset, all_expired); +} + +void Deinit() { delete filter; } + +TEST(FlinkStateTtlTest, CheckStateTypeEnumOrder) { // NOLINT + // if the order changes it also needs to be adjusted in Java client: + // in org.rocksdb.FlinkCompactionFilter + // and in org.rocksdb.FlinkCompactionFilterTest + EXPECT_EQ(DISABLED, 0); + EXPECT_EQ(VALUE, 1); + EXPECT_EQ(LIST, 2); +} + +TEST(FlinkStateTtlTest, SkipShortDataWithoutTimestamp) { // NOLINT + InitValue(VALUE, KVALUE, true); + EXPECT_EQ(decide(TIMESTAMP_BYTE_SIZE - 1), KKEEP); + Deinit(); +} + +TEST(FlinkValueStateTtlTest, Unexpired) { // NOLINT + InitValue(VALUE, KVALUE); + EXPECT_EQ(decide(), KKEEP); + Deinit(); +} + +TEST(FlinkValueStateTtlTest, Expired) { // NOLINT + InitValue(VALUE, KVALUE, true); + EXPECT_EQ(decide(), KREMOVE); + Deinit(); +} + +TEST(FlinkValueStateTtlTest, CachedTimeUpdate) { // NOLINT + InitValue(VALUE, KVALUE); + EXPECT_EQ(decide(), KKEEP); // also implicitly cache current timestamp + EXPIRE; // advance current timestamp to expire but cached should be used + // QUERY_TIME_AFTER_NUM_ENTRIES - 2: + // -1 -> for decide disabled in InitValue + // and -1 -> for decide right after InitValue + for (int64_t i = 0; i < QUERY_TIME_AFTER_NUM_ENTRIES - 2; i++) { + EXPECT_EQ(decide(), KKEEP); + } + EXPECT_EQ(decide(), KREMOVE); // advanced current timestamp should be updated + // in cache and expire state + Deinit(); +} + +TEST(FlinkValueStateTtlTest, WrongFilterValueType) { // NOLINT + InitValue(VALUE, KMERGE, true); + EXPECT_EQ(decide(), KKEEP); + Deinit(); +} + +TEST(FlinkListStateTtlTest, Unexpired) { // NOLINT + InitList(KMERGE); + EXPECT_EQ(decide(), KKEEP); + Deinit(); + + InitList(KVALUE); + EXPECT_EQ(decide(), KKEEP); + Deinit(); +} + +TEST(FlinkListStateTtlTest, Expired) { // NOLINT + InitList(KMERGE, true); + EXPECT_EQ(decide(), KREMOVE); + Deinit(); + + InitList(KVALUE, true); + EXPECT_EQ(decide(), KREMOVE); + Deinit(); +} + +TEST(FlinkListStateTtlTest, HalfExpired) { // NOLINT + InitList(KMERGE, false, true); + EXPECT_EQ(decide(), KCHANGE); + EXPECT_ARR_EQ(new_list.data(), data + LIST_ELEM_FIXED_LEN, + LIST_ELEM_FIXED_LEN); + Deinit(); + + InitList(KVALUE, false, true); + EXPECT_EQ(decide(), KCHANGE); + EXPECT_ARR_EQ(new_list.data(), data + LIST_ELEM_FIXED_LEN, + LIST_ELEM_FIXED_LEN); + Deinit(); +} + +TEST(FlinkListStateTtlTest, WrongFilterValueType) { // NOLINT + InitList(KBLOB, true); + EXPECT_EQ(decide(), KKEEP); + Deinit(); +} + +} // namespace flink +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}