diff --git a/test/src/unit-capi-config.cc b/test/src/unit-capi-config.cc index 08d02937752..b87e6bb538a 100644 --- a/test/src/unit-capi-config.cc +++ b/test/src/unit-capi-config.cc @@ -266,6 +266,7 @@ void check_save_to_file() { ss << "sm.consolidation.steps 4294967295\n"; ss << "sm.consolidation.timestamp_end " << std::to_string(UINT64_MAX) << "\n"; ss << "sm.consolidation.timestamp_start 0\n"; + ss << "sm.consolidation.with_timestamps true\n"; ss << "sm.dedup_coords false\n"; ss << "sm.enable_signal_handlers true\n"; ss << "sm.encryption_type NO_ENCRYPTION\n"; @@ -677,6 +678,7 @@ TEST_CASE("C API: Test config iter", "[capi][config]") { all_param_values["sm.consolidation.timestamp_start"] = "0"; all_param_values["sm.consolidation.timestamp_end"] = std::to_string(UINT64_MAX); + all_param_values["sm.consolidation.with_timestamps"] = "true"; all_param_values["sm.consolidation.purge_deleted_cells"] = "false"; all_param_values["sm.consolidation.step_min_frags"] = "4294967295"; all_param_values["sm.consolidation.step_max_frags"] = "4294967295"; diff --git a/tiledb/api/c_api/config/config_api_external.h b/tiledb/api/c_api/config/config_api_external.h index 7c1e7002df8..4896b217d25 100644 --- a/tiledb/api/c_api/config/config_api_external.h +++ b/tiledb/api/c_api/config/config_api_external.h @@ -207,6 +207,11 @@ TILEDB_EXPORT void tiledb_config_free(tiledb_config_t** config) TILEDB_NOEXCEPT; * `sm.consolidation.timestamp_start` and this value (inclusive).
* Only for `fragments` and `array_meta` consolidation mode.
* **Default**: UINT64_MAX + * - `sm.consolidation.with_timestamps`
+ * Consolidation with timestamps will include, for each cells, the + * timestamp at which the cell was written, allowing post-consolidation + * time travel.
+ * **Default**: "true" * - `sm.encryption_key`
* The key for encrypted arrays.
* **Default**: "" diff --git a/tiledb/sm/array/array_directory.cc b/tiledb/sm/array/array_directory.cc index c0b0a913765..074f6907ed9 100644 --- a/tiledb/sm/array/array_directory.cc +++ b/tiledb/sm/array/array_directory.cc @@ -71,13 +71,15 @@ ArrayDirectory::ArrayDirectory( const URI& uri, uint64_t timestamp_start, uint64_t timestamp_end, - ArrayDirectoryMode mode) + ArrayDirectoryMode mode, + bool allow_partial_fragment_overlap) : resources_(resources) , uri_(uri.add_trailing_slash()) , stats_(resources_.get().vfs().stats()->create_child("ArrayDirectory")) , timestamp_start_(timestamp_start) , timestamp_end_(timestamp_end) , mode_(mode) + , allow_partial_fragment_overlap_(allow_partial_fragment_overlap) , loaded_(false) { auto st = load(); if (!st.ok()) { @@ -1312,11 +1314,15 @@ Status ArrayDirectory::is_fragment( bool ArrayDirectory::consolidation_with_timestamps_supported( const URI& uri) const { + auto consolidation_with_timestamps = resources_.get().config().get( + "sm.consolidation.with_timestamps", Config::must_find); + // FragmentID::array_format_version() returns UINT32_MAX for versions <= 2 // so we should explicitly exclude this case when checking if consolidation // with timestamps is supported on a fragment FragmentID fragment_id{uri}; - return mode_ == ArrayDirectoryMode::READ && + return allow_partial_fragment_overlap_ && consolidation_with_timestamps && + mode_ == ArrayDirectoryMode::READ && fragment_id.array_format_version() >= constants::consolidation_with_timestamps_min_version; } diff --git a/tiledb/sm/array/array_directory.h b/tiledb/sm/array/array_directory.h index 03c13646ec5..3715ba2f2c6 100644 --- a/tiledb/sm/array/array_directory.h +++ b/tiledb/sm/array/array_directory.h @@ -303,13 +303,17 @@ class ArrayDirectory { * [`timestamp_start`, `timestamp_end`] will be considered when * fetching URIs. * @param mode The mode to load the array directory in. + * @param allow_partial_fragment_overlap If we want to allow matching + * fragments that overlap only partially with the timestamp range. This + * applies in some cases of consolidated fragments with timestamps. */ ArrayDirectory( ContextResources& resources, const URI& uri, uint64_t timestamp_start, uint64_t timestamp_end, - ArrayDirectoryMode mode = ArrayDirectoryMode::READ); + ArrayDirectoryMode mode = ArrayDirectoryMode::READ, + bool allow_partial_fragment_overlap = true); /** Destructor. */ ~ArrayDirectory() = default; @@ -644,6 +648,10 @@ class ArrayDirectory { /** Mode for the array directory. */ ArrayDirectoryMode mode_; + /** True if we allow matching fragments with partial timestamp range overlap + */ + bool allow_partial_fragment_overlap_; + /** True if `load` has been run. */ bool loaded_; diff --git a/tiledb/sm/config/config.cc b/tiledb/sm/config/config.cc index 5d4424136d7..b9cca0cd71b 100644 --- a/tiledb/sm/config/config.cc +++ b/tiledb/sm/config/config.cc @@ -156,6 +156,7 @@ const std::string Config::SM_CONSOLIDATION_MODE = "fragments"; const std::string Config::SM_CONSOLIDATION_TIMESTAMP_START = "0"; const std::string Config::SM_CONSOLIDATION_TIMESTAMP_END = std::to_string(UINT64_MAX); +const std::string Config::SM_CONSOLIDATION_WITH_TIMESTAMPS = "true"; const std::string Config::SM_VACUUM_MODE = "fragments"; const std::string Config::SM_VACUUM_TIMESTAMP_START = "0"; const std::string Config::SM_VACUUM_TIMESTAMP_END = std::to_string(UINT64_MAX); @@ -386,6 +387,9 @@ const std::map default_config_values = { std::make_pair( "sm.consolidation.timestamp_end", Config::SM_CONSOLIDATION_TIMESTAMP_END), + std::make_pair( + "sm.consolidation.with_timestamps", + Config::SM_CONSOLIDATION_WITH_TIMESTAMPS), std::make_pair("sm.vacuum.mode", Config::SM_VACUUM_MODE), std::make_pair("sm.var_offsets.bitsize", Config::SM_OFFSETS_BITSIZE), std::make_pair( diff --git a/tiledb/sm/config/config.h b/tiledb/sm/config/config.h index 053e0a680ea..df5eeb5321d 100644 --- a/tiledb/sm/config/config.h +++ b/tiledb/sm/config/config.h @@ -409,6 +409,11 @@ class Config { */ static const std::string SM_GROUP_TIMESTAMP_END; + /** + * Enable or disable consolidation with timestamps. + */ + static const std::string SM_CONSOLIDATION_WITH_TIMESTAMPS; + /** * If `true` MBRs will be loaded at the same time as the rest of fragment * info, otherwise they will be loaded lazily when some info related to MBRs diff --git a/tiledb/sm/consolidator/array_meta_consolidator.cc b/tiledb/sm/consolidator/array_meta_consolidator.cc index 35456729da4..e129428b8c5 100644 --- a/tiledb/sm/consolidator/array_meta_consolidator.cc +++ b/tiledb/sm/consolidator/array_meta_consolidator.cc @@ -135,7 +135,12 @@ void ArrayMetaConsolidator::vacuum(const char* array_name) { auto& vfs = resources_.vfs(); auto& compute_tp = resources_.compute_tp(); auto array_dir = ArrayDirectory( - resources_, URI(array_name), 0, std::numeric_limits::max()); + resources_, + URI(array_name), + 0, + std::numeric_limits::max(), + ArrayDirectoryMode::READ, + false); // Delete the array metadata and vacuum files vfs.remove_files(&compute_tp, array_dir.array_meta_uris_to_vacuum()); diff --git a/tiledb/sm/consolidator/commits_consolidator.cc b/tiledb/sm/consolidator/commits_consolidator.cc index 35e7cffa5f1..88453c28eda 100644 --- a/tiledb/sm/consolidator/commits_consolidator.cc +++ b/tiledb/sm/consolidator/commits_consolidator.cc @@ -86,7 +86,8 @@ Status CommitsConsolidator::consolidate( URI(array_name), 0, utils::time::timestamp_now_ms(), - ArrayDirectoryMode::COMMITS); + ArrayDirectoryMode::COMMITS, + false); // Don't try to consolidate empty array if (array_dir.commit_uris_to_consolidate().size() == 0) { @@ -113,7 +114,8 @@ void CommitsConsolidator::vacuum(const char* array_name) { URI(array_name), 0, utils::time::timestamp_now_ms(), - ArrayDirectoryMode::COMMITS); + ArrayDirectoryMode::COMMITS, + false); // Delete the commits and vacuum files auto& vfs = resources_.vfs(); diff --git a/tiledb/sm/consolidator/fragment_consolidator.cc b/tiledb/sm/consolidator/fragment_consolidator.cc index cd96a98bb5d..deb6da6adbd 100644 --- a/tiledb/sm/consolidator/fragment_consolidator.cc +++ b/tiledb/sm/consolidator/fragment_consolidator.cc @@ -1081,11 +1081,13 @@ Status FragmentConsolidator::set_config(const Config& config) { throw_if_not_ok(merged_config.get( "sm.consolidation.timestamp_end", &config_.timestamp_end_, &found)); assert(found); + RETURN_NOT_OK(merged_config.get( + "sm.consolidation.with_timestamps", &config_.with_timestamps_, &found)); + assert(found); std::string reader = merged_config.get("sm.query.sparse_global_order.reader", &found); assert(found); config_.use_refactored_reader_ = reader.compare("refactored") == 0; - config_.with_timestamps_ = true; config_.with_delete_meta_ = false; // Sanity checks @@ -1101,6 +1103,10 @@ Status FragmentConsolidator::set_config(const Config& config) { throw FragmentConsolidatorException( "Invalid configuration; Amplification config parameter must be " "non-negative"); + if (config_.with_timestamps_ && !config_.use_refactored_reader_) + throw FragmentConsolidatorException( + ("Invalid configuration; Consolidation with timestamps requires " + "refactored reader")); return Status::Ok(); } diff --git a/tiledb/sm/consolidator/fragment_meta_consolidator.cc b/tiledb/sm/consolidator/fragment_meta_consolidator.cc index 3ab957f9a35..c8c615c7a08 100644 --- a/tiledb/sm/consolidator/fragment_meta_consolidator.cc +++ b/tiledb/sm/consolidator/fragment_meta_consolidator.cc @@ -186,7 +186,13 @@ void FragmentMetaConsolidator::vacuum(const char* array_name) { // Get the consolidated fragment metadata URIs to be deleted // (all except the last one) - ArrayDirectory array_dir(resources_, URI(array_name), 0, UINT64_MAX); + ArrayDirectory array_dir( + resources_, + URI(array_name), + 0, + UINT64_MAX, + ArrayDirectoryMode::READ, + false); const auto& fragment_meta_uris = array_dir.fragment_meta_uris(); // Get the latest timestamp diff --git a/tiledb/sm/cpp_api/config.h b/tiledb/sm/cpp_api/config.h index 837d8e9d8e2..002fddcbf2f 100644 --- a/tiledb/sm/cpp_api/config.h +++ b/tiledb/sm/cpp_api/config.h @@ -377,6 +377,11 @@ class Config { * `sm.consolidation.timestamp_start` and this value (inclusive).
* Only for `fragments` and `array_meta` consolidation mode.
* **Default**: UINT64_MAX + * - `sm.consolidation.with_timestamps`
+ * Consolidation with timestamps will include, for each cells, the + * timestamp at which the cell was written, allowing post-consolidation + * time travel.
+ * **Default**: "true" * - `sm.encryption_key`
* The key for encrypted arrays.
* **Default**: ""