Skip to content

Add back config to toggle the preservation of timestamps in consolidated fragments #5515

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions test/src/unit-capi-config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ void check_save_to_file() {
ss << "sm.consolidation.steps 4294967295\n";
ss << "sm.consolidation.timestamp_end " << std::to_string(UINT64_MAX) << "\n";
ss << "sm.consolidation.timestamp_start 0\n";
ss << "sm.consolidation.with_timestamps true\n";
ss << "sm.dedup_coords false\n";
ss << "sm.enable_signal_handlers true\n";
ss << "sm.encryption_type NO_ENCRYPTION\n";
Expand Down Expand Up @@ -677,6 +678,7 @@ TEST_CASE("C API: Test config iter", "[capi][config]") {
all_param_values["sm.consolidation.timestamp_start"] = "0";
all_param_values["sm.consolidation.timestamp_end"] =
std::to_string(UINT64_MAX);
all_param_values["sm.consolidation.with_timestamps"] = "true";
all_param_values["sm.consolidation.purge_deleted_cells"] = "false";
all_param_values["sm.consolidation.step_min_frags"] = "4294967295";
all_param_values["sm.consolidation.step_max_frags"] = "4294967295";
Expand Down
5 changes: 5 additions & 0 deletions tiledb/api/c_api/config/config_api_external.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ TILEDB_EXPORT void tiledb_config_free(tiledb_config_t** config) TILEDB_NOEXCEPT;
* `sm.consolidation.timestamp_start` and this value (inclusive). <br>
* Only for `fragments` and `array_meta` consolidation mode. <br>
* **Default**: UINT64_MAX
* - `sm.consolidation.with_timestamps` <br>
* Consolidation with timestamps will include, for each cells, the
* timestamp at which the cell was written, allowing post-consolidation
* time travel. <br>
* **Default**: "true"
* - `sm.encryption_key` <br>
* The key for encrypted arrays. <br>
* **Default**: ""
Expand Down
10 changes: 8 additions & 2 deletions tiledb/sm/array/array_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ ArrayDirectory::ArrayDirectory(
const URI& uri,
uint64_t timestamp_start,
uint64_t timestamp_end,
ArrayDirectoryMode mode)
ArrayDirectoryMode mode,
bool allow_partial_fragment_overlap)
: resources_(resources)
, uri_(uri.add_trailing_slash())
, stats_(resources_.get().vfs().stats()->create_child("ArrayDirectory"))
, timestamp_start_(timestamp_start)
, timestamp_end_(timestamp_end)
, mode_(mode)
, allow_partial_fragment_overlap_(allow_partial_fragment_overlap)
, loaded_(false) {
auto st = load();
if (!st.ok()) {
Expand Down Expand Up @@ -1312,11 +1314,15 @@ Status ArrayDirectory::is_fragment(

bool ArrayDirectory::consolidation_with_timestamps_supported(
const URI& uri) const {
auto consolidation_with_timestamps = resources_.get().config().get<bool>(
"sm.consolidation.with_timestamps", Config::must_find);

// FragmentID::array_format_version() returns UINT32_MAX for versions <= 2
// so we should explicitly exclude this case when checking if consolidation
// with timestamps is supported on a fragment
FragmentID fragment_id{uri};
return mode_ == ArrayDirectoryMode::READ &&
return allow_partial_fragment_overlap_ && consolidation_with_timestamps &&
mode_ == ArrayDirectoryMode::READ &&
fragment_id.array_format_version() >=
constants::consolidation_with_timestamps_min_version;
}
Expand Down
10 changes: 9 additions & 1 deletion tiledb/sm/array/array_directory.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,17 @@ class ArrayDirectory {
* [`timestamp_start`, `timestamp_end`] will be considered when
* fetching URIs.
* @param mode The mode to load the array directory in.
* @param allow_partial_fragment_overlap If we want to allow matching
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the the only usage of this is "if false, consolidation_with_timestamps_supported returns false". What's the connection to overlapping fragments?

* fragments that overlap only partially with the timestamp range. This
* applies in some cases of consolidated fragments with timestamps.
*/
ArrayDirectory(
ContextResources& resources,
const URI& uri,
uint64_t timestamp_start,
uint64_t timestamp_end,
ArrayDirectoryMode mode = ArrayDirectoryMode::READ);
ArrayDirectoryMode mode = ArrayDirectoryMode::READ,
bool allow_partial_fragment_overlap = true);

/** Destructor. */
~ArrayDirectory() = default;
Expand Down Expand Up @@ -644,6 +648,10 @@ class ArrayDirectory {
/** Mode for the array directory. */
ArrayDirectoryMode mode_;

/** True if we allow matching fragments with partial timestamp range overlap
*/
bool allow_partial_fragment_overlap_;

/** True if `load` has been run. */
bool loaded_;

Expand Down
4 changes: 4 additions & 0 deletions tiledb/sm/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ const std::string Config::SM_CONSOLIDATION_MODE = "fragments";
const std::string Config::SM_CONSOLIDATION_TIMESTAMP_START = "0";
const std::string Config::SM_CONSOLIDATION_TIMESTAMP_END =
std::to_string(UINT64_MAX);
const std::string Config::SM_CONSOLIDATION_WITH_TIMESTAMPS = "true";
const std::string Config::SM_VACUUM_MODE = "fragments";
const std::string Config::SM_VACUUM_TIMESTAMP_START = "0";
const std::string Config::SM_VACUUM_TIMESTAMP_END = std::to_string(UINT64_MAX);
Expand Down Expand Up @@ -386,6 +387,9 @@ const std::map<std::string, std::string> default_config_values = {
std::make_pair(
"sm.consolidation.timestamp_end",
Config::SM_CONSOLIDATION_TIMESTAMP_END),
std::make_pair(
"sm.consolidation.with_timestamps",
Config::SM_CONSOLIDATION_WITH_TIMESTAMPS),
std::make_pair("sm.vacuum.mode", Config::SM_VACUUM_MODE),
std::make_pair("sm.var_offsets.bitsize", Config::SM_OFFSETS_BITSIZE),
std::make_pair(
Expand Down
5 changes: 5 additions & 0 deletions tiledb/sm/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,11 @@ class Config {
*/
static const std::string SM_GROUP_TIMESTAMP_END;

/**
* Enable or disable consolidation with timestamps.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CC @nickvigilante

I think this comment gets pulled into docs, right?

This is not specific enough, especially if it is user-facing documentation. There's no description here of what the consolidation result actually looks like for the different options, which I think is really important given that one of the options results in something which might qualify as "data loss" for an unwitting customer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I'm leaving my comment for posterity, but I see that there is more specific documentation in config_api_external.h.

However, those docs aren't specific enough for my liking, the options should be annotated with a brief description of what happens to duplicate coordinates in consolidated fragments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*/
static const std::string SM_CONSOLIDATION_WITH_TIMESTAMPS;

/**
* If `true` MBRs will be loaded at the same time as the rest of fragment
* info, otherwise they will be loaded lazily when some info related to MBRs
Expand Down
7 changes: 6 additions & 1 deletion tiledb/sm/consolidator/array_meta_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ void ArrayMetaConsolidator::vacuum(const char* array_name) {
auto& vfs = resources_.vfs();
auto& compute_tp = resources_.compute_tp();
auto array_dir = ArrayDirectory(
resources_, URI(array_name), 0, std::numeric_limits<uint64_t>::max());
resources_,
URI(array_name),
0,
std::numeric_limits<uint64_t>::max(),
ArrayDirectoryMode::READ,
false);

// Delete the array metadata and vacuum files
vfs.remove_files(&compute_tp, array_dir.array_meta_uris_to_vacuum());
Expand Down
6 changes: 4 additions & 2 deletions tiledb/sm/consolidator/commits_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ Status CommitsConsolidator::consolidate(
URI(array_name),
0,
utils::time::timestamp_now_ms(),
ArrayDirectoryMode::COMMITS);
ArrayDirectoryMode::COMMITS,
false);

// Don't try to consolidate empty array
if (array_dir.commit_uris_to_consolidate().size() == 0) {
Expand All @@ -113,7 +114,8 @@ void CommitsConsolidator::vacuum(const char* array_name) {
URI(array_name),
0,
utils::time::timestamp_now_ms(),
ArrayDirectoryMode::COMMITS);
ArrayDirectoryMode::COMMITS,
false);

// Delete the commits and vacuum files
auto& vfs = resources_.vfs();
Expand Down
8 changes: 7 additions & 1 deletion tiledb/sm/consolidator/fragment_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1081,11 +1081,13 @@ Status FragmentConsolidator::set_config(const Config& config) {
throw_if_not_ok(merged_config.get<uint64_t>(
"sm.consolidation.timestamp_end", &config_.timestamp_end_, &found));
assert(found);
RETURN_NOT_OK(merged_config.get<bool>(
"sm.consolidation.with_timestamps", &config_.with_timestamps_, &found));
assert(found);
std::string reader =
merged_config.get("sm.query.sparse_global_order.reader", &found);
assert(found);
config_.use_refactored_reader_ = reader.compare("refactored") == 0;
config_.with_timestamps_ = true;
config_.with_delete_meta_ = false;

// Sanity checks
Expand All @@ -1101,6 +1103,10 @@ Status FragmentConsolidator::set_config(const Config& config) {
throw FragmentConsolidatorException(
"Invalid configuration; Amplification config parameter must be "
"non-negative");
if (config_.with_timestamps_ && !config_.use_refactored_reader_)
throw FragmentConsolidatorException(
("Invalid configuration; Consolidation with timestamps requires "
"refactored reader"));

return Status::Ok();
}
Expand Down
8 changes: 7 additions & 1 deletion tiledb/sm/consolidator/fragment_meta_consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,13 @@ void FragmentMetaConsolidator::vacuum(const char* array_name) {

// Get the consolidated fragment metadata URIs to be deleted
// (all except the last one)
ArrayDirectory array_dir(resources_, URI(array_name), 0, UINT64_MAX);
ArrayDirectory array_dir(
resources_,
URI(array_name),
0,
UINT64_MAX,
ArrayDirectoryMode::READ,
false);
const auto& fragment_meta_uris = array_dir.fragment_meta_uris();

// Get the latest timestamp
Expand Down
5 changes: 5 additions & 0 deletions tiledb/sm/cpp_api/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,11 @@ class Config {
* `sm.consolidation.timestamp_start` and this value (inclusive). <br>
* Only for `fragments` and `array_meta` consolidation mode. <br>
* **Default**: UINT64_MAX
* - `sm.consolidation.with_timestamps` <br>
* Consolidation with timestamps will include, for each cells, the
* timestamp at which the cell was written, allowing post-consolidation
* time travel. <br>
* **Default**: "true"
* - `sm.encryption_key` <br>
* The key for encrypted arrays. <br>
* **Default**: ""
Expand Down
Loading