Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ set(SOURCES
utilities/debug.cc
utilities/env_mirror.cc
utilities/env_timed.cc
utilities/flink/flink_compaction_filter.cc
utilities/leveldb_options/leveldb_options.cc
utilities/memory/memory_util.cc
utilities/merge_operators/bytesxor.cc
Expand Down Expand Up @@ -1095,6 +1096,7 @@ if(WITH_TESTS)
utilities/cassandra/cassandra_format_test.cc
utilities/cassandra/cassandra_row_merge_test.cc
utilities/cassandra/cassandra_serialize_test.cc
utilities/flink/flink_compaction_filter_test.cc
utilities/checkpoint/checkpoint_test.cc
utilities/memory/memory_test.cc
utilities/merge_operators/string_append/stringappend_test.cc
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ TESTS = \
compaction_picker_test \
version_builder_test \
file_indexer_test \
flink_compaction_filter_test \
write_batch_test \
write_batch_with_index_test \
write_controller_test\
Expand Down Expand Up @@ -1324,6 +1325,9 @@ cassandra_serialize_test: utilities/cassandra/cassandra_serialize_test.o $(LIBOB
hash_table_test: utilities/persistent_cache/hash_table_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

flink_compaction_filter_test: utilities/flink/flink_compaction_filter_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

histogram_test: monitoring/histogram_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

Expand Down
8 changes: 8 additions & 0 deletions TARGETS
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ cpp_library(
"utilities/debug.cc",
"utilities/env_mirror.cc",
"utilities/env_timed.cc",
"utilities/flink/flink_compaction_filter.cc",
"utilities/leveldb_options/leveldb_options.cc",
"utilities/memory/memory_util.cc",
"utilities/merge_operators/bytesxor.cc",
Expand Down Expand Up @@ -1084,6 +1085,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"flink_compaction_filter_test",
"utilities/flink/flink_compaction_filter_test.cc",
"serial",
[],
[],
],
[
"flush_job_test",
"db/flush_job_test.cc",
Expand Down
2 changes: 1 addition & 1 deletion Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
Vagrant.configure("2") do |config|

config.vm.provider "virtualbox" do |v|
v.memory = 4096
v.memory = 6096
v.cpus = 2
end

Expand Down
3 changes: 3 additions & 0 deletions java/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ set(JNI_NATIVE_SOURCES
rocksjni/config_options.cc
rocksjni/env.cc
rocksjni/env_options.cc
rocksjni/flink_compactionfilterjni.cc
rocksjni/filter.cc
rocksjni/ingest_external_file_options.cc
rocksjni/iterator.cc
Expand Down Expand Up @@ -140,6 +141,7 @@ set(JAVA_MAIN_CLASSES
src/main/java/org/rocksdb/EnvOptions.java
src/main/java/org/rocksdb/Experimental.java
src/main/java/org/rocksdb/Filter.java
src/main/java/org/rocksdb/FlinkCompactionFilter.java
src/main/java/org/rocksdb/FlushOptions.java
src/main/java/org/rocksdb/HashLinkedListMemTableConfig.java
src/main/java/org/rocksdb/HashSkipListMemTableConfig.java
Expand Down Expand Up @@ -415,6 +417,7 @@ if(${CMAKE_VERSION} VERSION_LESS "3.11.4" OR (${Java_VERSION_MINOR} STREQUAL "7"
org.rocksdb.Env
org.rocksdb.EnvOptions
org.rocksdb.Filter
org.rocksdb.FlinkCompactionFilter
org.rocksdb.FlushOptions
org.rocksdb.HashLinkedListMemTableConfig
org.rocksdb.HashSkipListMemTableConfig
Expand Down
1 change: 1 addition & 0 deletions java/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ NATIVE_JAVA_CLASSES = \
org.rocksdb.DirectSlice\
org.rocksdb.Env\
org.rocksdb.EnvOptions\
org.rocksdb.FlinkCompactionFilter\
org.rocksdb.FlushOptions\
org.rocksdb.Filter\
org.rocksdb.IngestExternalFileOptions\
Expand Down
2 changes: 1 addition & 1 deletion java/crossbuild/Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
end

config.vm.provider "virtualbox" do |v|
v.memory = 2048
v.memory = 6048
v.cpus = 4
v.customize ["modifyvm", :id, "--nictype1", "virtio" ]
end
Expand Down
239 changes: 239 additions & 0 deletions java/rocksjni/flink_compactionfilterjni.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
#include <climits> // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include <include/rocksdb/env.h>
#include <jni.h>

#include "include/org_rocksdb_FlinkCompactionFilter.h"
#include "loggerjnicallback.h"
#include "portal.h"
#include "rocksjni/jnicallback.h"
#include "utilities/flink/flink_compaction_filter.h"

using namespace ROCKSDB_NAMESPACE::flink;

class JniCallbackBase : public ROCKSDB_NAMESPACE::JniCallback {
public:
JniCallbackBase(JNIEnv* env, jobject jcallback_obj)
: JniCallback(env, jcallback_obj) {}

protected:
inline void CheckAndRethrowException(JNIEnv* env) const {
if (env->ExceptionCheck()) {
env->ExceptionDescribe();
env->Throw(env->ExceptionOccurred());
}
}
};

// This list element filter operates on list state for which byte length of
// elements is unknown (variable), the list element serializer has to be used in
// this case to compute the offset of the next element. The filter wraps java
// object implenented in Flink. The java object holds element serializer and
// performs filtering.
class JavaListElementFilter
: public ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::ListElementFilter,
JniCallbackBase {
public:
JavaListElementFilter(JNIEnv* env, jobject jlist_filter)
: JniCallbackBase(env, jlist_filter) {
jclass jclazz = ROCKSDB_NAMESPACE::JavaClass::getJClass(
env, "org/rocksdb/FlinkCompactionFilter$ListElementFilter");
if (jclazz == nullptr) {
// exception occurred accessing class
return;
}
m_jnext_unexpired_offset_methodid =
env->GetMethodID(jclazz, "nextUnexpiredOffset", "([BJJ)I");
assert(m_jnext_unexpired_offset_methodid != nullptr);
}

std::size_t NextUnexpiredOffset(const ROCKSDB_NAMESPACE::Slice& list,
int64_t ttl,
int64_t current_timestamp) const override {
jboolean attached_thread = JNI_FALSE;
JNIEnv* env = getJniEnv(&attached_thread);
jbyteArray jlist = ROCKSDB_NAMESPACE::JniUtil::copyBytes(env, list);
CheckAndRethrowException(env);
if (jlist == nullptr) {
return static_cast<std::size_t>(-1);
}
auto jl_ttl = static_cast<jlong>(ttl);
auto jl_current_timestamp = static_cast<jlong>(current_timestamp);
jint next_offset =
env->CallIntMethod(m_jcallback_obj, m_jnext_unexpired_offset_methodid,
jlist, jl_ttl, jl_current_timestamp);
CheckAndRethrowException(env);
env->DeleteLocalRef(jlist);
releaseJniEnv(attached_thread);
return static_cast<std::size_t>(next_offset);
};

private:
jmethodID m_jnext_unexpired_offset_methodid;
};

class JavaListElemenFilterFactory
: public ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::
ListElementFilterFactory,
JniCallbackBase {
public:
JavaListElemenFilterFactory(JNIEnv* env, jobject jlist_filter_factory)
: JniCallbackBase(env, jlist_filter_factory) {
jclass jclazz = ROCKSDB_NAMESPACE::JavaClass::getJClass(
env, "org/rocksdb/FlinkCompactionFilter$ListElementFilterFactory");
if (jclazz == nullptr) {
// exception occurred accessing class
return;
}
m_jcreate_filter_methodid = env->GetMethodID(
jclazz, "createListElementFilter",
"()Lorg/rocksdb/FlinkCompactionFilter$ListElementFilter;");
assert(m_jcreate_filter_methodid != nullptr);
}

FlinkCompactionFilter::ListElementFilter* CreateListElementFilter(
std::shared_ptr<ROCKSDB_NAMESPACE::Logger> /*logger*/) const override {
jboolean attached_thread = JNI_FALSE;
JNIEnv* env = getJniEnv(&attached_thread);
auto jlist_filter =
env->CallObjectMethod(m_jcallback_obj, m_jcreate_filter_methodid);
auto list_filter = new JavaListElementFilter(env, jlist_filter);
CheckAndRethrowException(env);
releaseJniEnv(attached_thread);
return list_filter;
};

private:
jmethodID m_jcreate_filter_methodid;
};

class JavaTimeProvider
: public ROCKSDB_NAMESPACE::flink::FlinkCompactionFilter::TimeProvider,
JniCallbackBase {
public:
JavaTimeProvider(JNIEnv* env, jobject jtime_provider)
: JniCallbackBase(env, jtime_provider) {
jclass jclazz = ROCKSDB_NAMESPACE::JavaClass::getJClass(
env, "org/rocksdb/FlinkCompactionFilter$TimeProvider");
if (jclazz == nullptr) {
// exception occurred accessing class
return;
}
m_jcurrent_timestamp_methodid =
env->GetMethodID(jclazz, "currentTimestamp", "()J");
assert(m_jcurrent_timestamp_methodid != nullptr);
}

int64_t CurrentTimestamp() const override {
jboolean attached_thread = JNI_FALSE;
JNIEnv* env = getJniEnv(&attached_thread);
auto jtimestamp =
env->CallLongMethod(m_jcallback_obj, m_jcurrent_timestamp_methodid);
CheckAndRethrowException(env);
releaseJniEnv(attached_thread);
return static_cast<int64_t>(jtimestamp);
};

private:
jmethodID m_jcurrent_timestamp_methodid;
};

static FlinkCompactionFilter::ListElementFilterFactory*
createListElementFilterFactory(JNIEnv* env, jint ji_list_elem_len,
jobject jlist_filter_factory) {
FlinkCompactionFilter::ListElementFilterFactory* list_filter_factory =
nullptr;
if (ji_list_elem_len > 0) {
auto fixed_size = static_cast<std::size_t>(ji_list_elem_len);
list_filter_factory =
new FlinkCompactionFilter::FixedListElementFilterFactory(
fixed_size, static_cast<std::size_t>(0));
} else if (jlist_filter_factory != nullptr) {
list_filter_factory =
new JavaListElemenFilterFactory(env, jlist_filter_factory);
}
return list_filter_factory;
}

/*x
* Class: org_rocksdb_FlinkCompactionFilter
* Method: createNewFlinkCompactionFilterConfigHolder
* Signature: ()J
*/
jlong Java_org_rocksdb_FlinkCompactionFilter_createNewFlinkCompactionFilterConfigHolder(
JNIEnv* /* env */, jclass /* jcls */) {
using namespace ROCKSDB_NAMESPACE::flink;
return reinterpret_cast<jlong>(
new std::shared_ptr<FlinkCompactionFilter::ConfigHolder>(
new FlinkCompactionFilter::ConfigHolder()));
}

/*
* Class: org_rocksdb_FlinkCompactionFilter
* Method: disposeFlinkCompactionFilterConfigHolder
* Signature: (J)V
*/
void Java_org_rocksdb_FlinkCompactionFilter_disposeFlinkCompactionFilterConfigHolder(
JNIEnv* /* env */, jclass /* jcls */, jlong handle) {
using namespace ROCKSDB_NAMESPACE::flink;
auto* config_holder =
reinterpret_cast<std::shared_ptr<FlinkCompactionFilter::ConfigHolder>*>(
handle);
delete config_holder;
}

/*
* Class: org_rocksdb_FlinkCompactionFilter
* Method: createNewFlinkCompactionFilter0
* Signature: (JJJ)J
*/
jlong Java_org_rocksdb_FlinkCompactionFilter_createNewFlinkCompactionFilter0(
JNIEnv* env, jclass /* jcls */, jlong config_holder_handle,
jobject jtime_provider, jlong logger_handle) {
using namespace ROCKSDB_NAMESPACE::flink;
auto config_holder =
*(reinterpret_cast<std::shared_ptr<FlinkCompactionFilter::ConfigHolder>*>(
config_holder_handle));
auto time_provider = new JavaTimeProvider(env, jtime_provider);
auto logger =
logger_handle == 0
? nullptr
: *(reinterpret_cast<
std::shared_ptr<ROCKSDB_NAMESPACE::LoggerJniCallback>*>(
logger_handle));
return reinterpret_cast<jlong>(new FlinkCompactionFilter(
config_holder,
std::unique_ptr<FlinkCompactionFilter::TimeProvider>(time_provider),
logger));
}

/*
* Class: org_rocksdb_FlinkCompactionFilter
* Method: configureFlinkCompactionFilter
* Signature: (JIIJJILorg/rocksdb/FlinkCompactionFilter$ListElementFilter;)Z
*/
jboolean Java_org_rocksdb_FlinkCompactionFilter_configureFlinkCompactionFilter(
JNIEnv* env, jclass /* jcls */, jlong handle, jint ji_state_type,
jint ji_timestamp_offset, jlong jl_ttl_milli,
jlong jquery_time_after_num_entries, jint ji_list_elem_len,
jobject jlist_filter_factory) {
auto state_type =
static_cast<FlinkCompactionFilter::StateType>(ji_state_type);
auto timestamp_offset = static_cast<size_t>(ji_timestamp_offset);
auto ttl = static_cast<int64_t>(jl_ttl_milli);
auto query_time_after_num_entries =
static_cast<int64_t>(jquery_time_after_num_entries);
auto config_holder =
*(reinterpret_cast<std::shared_ptr<FlinkCompactionFilter::ConfigHolder>*>(
handle));
auto list_filter_factory = createListElementFilterFactory(
env, ji_list_elem_len, jlist_filter_factory);
auto config = new FlinkCompactionFilter::Config{
state_type, timestamp_offset, ttl, query_time_after_num_entries,
std::unique_ptr<FlinkCompactionFilter::ListElementFilterFactory>(
list_filter_factory)};
return static_cast<jboolean>(config_holder->Configure(config));
}
Loading