Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/foundation/inc/ROOT/RError.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public:
void Throw();

/// Used by R__FORWARD_ERROR in order to keep track of the stack trace.
[[nodiscard]]
static RError ForwardError(RResultBase &&result, RError::RLocation &&sourceLocation)
{
if (!result.fError) {
Expand Down
28 changes: 19 additions & 9 deletions tree/ntuple/inc/ROOT/RNTupleMerger.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class RNTuple;
namespace Internal {
class RPageAllocator;
class RClusterPool;
}
} // namespace Internal

namespace Experimental::Internal {

Expand All @@ -49,6 +49,12 @@ enum class ENTupleMergingMode {
kUnion
};

inline const char *ToString(ENTupleMergingMode mode)
{
static const char *const kMergingModeStr[] = {"Filter", "Strict", "Union"};
return kMergingModeStr[static_cast<int>(mode)];
}

enum class ENTupleMergeErrBehavior {
/// The merger will abort merging as soon as an error is encountered
kAbort,
Expand Down Expand Up @@ -98,14 +104,18 @@ class RNTupleMerger final {
std::optional<TTaskGroup> fTaskGroup;
std::unique_ptr<ROOT::RNTupleModel> fModel;

void MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc,
std::span<const RColumnMergeInfo> commonColumns,
const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet,
std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData,
const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc);

void MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData);
[[nodiscard]]
ROOT::RResult<void> MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
const ROOT::RClusterDescriptor &clusterDesc,
std::span<const RColumnMergeInfo> commonColumns,
const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet,
std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData,
const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc);

[[nodiscard]]
ROOT::RResult<void>
MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData);

/// Creates a RNTupleMerger with the given destination.
/// The model must be given if and only if `destination` has been initialized with that model
Expand Down
2 changes: 1 addition & 1 deletion tree/ntuple/inc/ROOT/RPageStorage.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ public:
if (fIsInitialized) {
throw RException(R__FAIL("already initialized"));
}
fIsInitialized = true;
InitImpl(model);
fIsInitialized = true;
}

protected:
Expand Down
88 changes: 57 additions & 31 deletions tree/ntuple/src/RNTupleMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,10 @@ CompareDescriptorStructure(const ROOT::RNTupleDescriptor &dst, const ROOT::RNTup
}

// Applies late model extension to `destination`, adding all `newFields` to it.
static void ExtendDestinationModel(std::span<const ROOT::RFieldDescriptor *> newFields, ROOT::RNTupleModel &dstModel,
RNTupleMergeData &mergeData, std::vector<RCommonField> &commonFields)
[[nodiscard]]
static ROOT::RResult<void>
ExtendDestinationModel(std::span<const ROOT::RFieldDescriptor *> newFields, ROOT::RNTupleModel &dstModel,
RNTupleMergeData &mergeData, std::vector<RCommonField> &commonFields)
{
assert(newFields.size() > 0); // no point in calling this with 0 new cols

Expand Down Expand Up @@ -636,23 +638,31 @@ static void ExtendDestinationModel(std::span<const ROOT::RFieldDescriptor *> new
ROOT::Internal::GetProjectedFieldsOfModel(dstModel).Add(std::move(field), fieldMap);
}
dstModel.Freeze();
mergeData.fDestination.UpdateSchema(changeset, mergeData.fNumDstEntries);
try {
mergeData.fDestination.UpdateSchema(changeset, mergeData.fNumDstEntries);
} catch (const ROOT::RException &ex) {
return R__FAIL(ex.GetError().GetReport());
}

commonFields.reserve(commonFields.size() + newFields.size());
for (const auto *field : newFields) {
const auto newFieldInDstId = mergeData.fDstDescriptor.FindFieldId(field->GetFieldName());
const auto &newFieldInDst = mergeData.fDstDescriptor.GetFieldDescriptor(newFieldInDstId);
commonFields.emplace_back(*field, newFieldInDst);
}

return ROOT::RResult<void>::Success();
}

// Generates default (zero) values for the given columns
static void GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<const RColumnMergeInfo> columns,
RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc,
const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
[[nodiscard]]
static ROOT::RResult<void>
GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<const RColumnMergeInfo> columns,
RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc,
const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
{
if (!nEntriesToGenerate)
return;
return ROOT::RResult<void>::Success();

for (const auto &column : columns) {
const ROOT::RFieldDescriptor *field = column.fParentFieldDescriptor;
Expand Down Expand Up @@ -682,12 +692,10 @@ static void GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<con
const auto structure = field->GetStructure();

if (structure == ROOT::ENTupleStructure::kStreamer) {
R__LOG_FATAL(NTupleMergeLog())
<< "RNTuple::Merge"
"Destination RNTuple contains a streamer field (%s) that is not present in one of the sources. "
"Creating a default value for a streamer field is ill-defined, therefore the merging process will abort."
<< field->GetFieldName();
continue;
return R__FAIL(
"Destination RNTuple contains a streamer field (" + field->GetFieldName() +
") that is not present in one of the sources. "
"Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.");
}

// NOTE: we cannot have a Record here because it has no associated columns.
Expand Down Expand Up @@ -726,21 +734,23 @@ static void GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<con
sealedPageData.fPagesV.back().cend());
}
}
return ROOT::RResult<void>::Success();
}

// Merges all columns appearing both in the source and destination RNTuples, just copying them if their
// compression matches ("fast merge") or by unsealing and resealing them with the proper compression.
void RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
const ROOT::RClusterDescriptor &clusterDesc,
std::span<const RColumnMergeInfo> commonColumns,
const RCluster::ColumnSet_t &commonColumnSet,
std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData,
const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc)
ROOT::RResult<void>
RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
const ROOT::RClusterDescriptor &clusterDesc,
std::span<const RColumnMergeInfo> commonColumns,
const RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster,
RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData,
ROOT::Internal::RPageAllocator &pageAlloc)
{
assert(nCommonColumnsInCluster == commonColumnSet.size());
assert(nCommonColumnsInCluster <= commonColumns.size());
if (nCommonColumnsInCluster == 0)
return;
return ROOT::RResult<void>::Success();

const RCluster *cluster = clusterPool.GetCluster(clusterDesc.GetId(), commonColumnSet);
// we expect the cluster pool to contain the requested set of columns, since they were
Expand Down Expand Up @@ -795,8 +805,10 @@ void RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool
// TODO: also avoid doing this if we added no real page of this column to the destination yet.
if (columnDesc.GetFirstElementIndex() > clusterDesc.GetFirstEntryIndex() && mergeData.fNumDstEntries > 0) {
const auto nMissingEntries = columnDesc.GetFirstElementIndex() - clusterDesc.GetFirstEntryIndex();
GenerateZeroPagesForColumns(nMissingEntries, {&column, 1}, sealedPageData, pageAlloc, mergeData.fDstDescriptor,
mergeData);
auto res = GenerateZeroPagesForColumns(nMissingEntries, {&column, 1}, sealedPageData, pageAlloc,
mergeData.fDstDescriptor, mergeData);
if (!res)
return R__FORWARD_ERROR(res);
}

// Loop over the pages
Expand Down Expand Up @@ -844,15 +856,18 @@ void RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool
sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
sealedPageData.fPagesV.back().cend());
} // end loop over common columns

return ROOT::RResult<void>::Success();
}

// Iterates over all clusters of `source` and merges their pages into `destination`.
// It is assumed that all columns in `commonColumns` are present (and compatible) in both the source and
// the destination's schemas.
// The pages may be "fast-merged" (i.e. simply copied with no decompression/recompression) if the target
// compression is unspecified or matches the original compression settings.
void RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData)
ROOT::RResult<void>
RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData)
{
ROOT::Internal::RClusterPool clusterPool{source};

Expand Down Expand Up @@ -894,10 +909,15 @@ void RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<const RCo
missingColumns.push_back(commonColumns[i]);

RSealedPageMergeData sealedPageData;
MergeCommonColumns(clusterPool, clusterDesc, commonColumns, commonColumnSet, nCommonColumnsInCluster,
sealedPageData, mergeData, *fPageAlloc);
GenerateZeroPagesForColumns(nClusterEntries, missingColumns, sealedPageData, *fPageAlloc,
mergeData.fDstDescriptor, mergeData);
auto res = MergeCommonColumns(clusterPool, clusterDesc, commonColumns, commonColumnSet, nCommonColumnsInCluster,
sealedPageData, mergeData, *fPageAlloc);
if (!res)
return R__FORWARD_ERROR(res);

res = GenerateZeroPagesForColumns(nClusterEntries, missingColumns, sealedPageData, *fPageAlloc,
mergeData.fDstDescriptor, mergeData);
if (!res)
return R__FORWARD_ERROR(res);

// Commit the pages and the clusters
mergeData.fDestination.CommitSealedPageV(sealedPageData.fGroups);
Expand All @@ -914,6 +934,7 @@ void RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<const RCo
// However, as of today, we aren't really handling such huge files, and even relatively big ones
// such as the CMS dataset have a page list size of about only 2 MB.
// So currently we simply merge all cluster groups into one.
return ROOT::RResult<void>::Success();
}

static std::optional<std::type_index> ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
Expand Down Expand Up @@ -977,7 +998,8 @@ static void AddColumnsFromField(std::vector<RColumnMergeInfo> &columns, const RO
// 2. when merging a deferred column into an existing column (in which case we need to fill the "hole" with
// zeroes). For the first case srcFieldDesc and dstFieldDesc are the same (see the calling site of this function),
// but for the second case they're not, and we need to pick the source field because we will then check the
// column's *input* id inside fParentFieldDescriptor to see if it's a suppressed column (see GenerateZeroPagesForColumns()).
// column's *input* id inside fParentFieldDescriptor to see if it's a suppressed column (see
// GenerateZeroPagesForColumns()).
info.fParentFieldDescriptor = &srcFieldDesc;
// Save the parent field descriptor since this may be either the source or destination descriptor depending on
// whether this is an extraDstField or a commonField. We will need this in GenerateZeroPagesForColumns() to
Expand Down Expand Up @@ -1170,7 +1192,9 @@ ROOT::RResult<void> RNTupleMerger::Merge(std::span<RPageSource *> sources, const
if (descCmp.fExtraSrcFields.size()) {
if (mergeOpts.fMergingMode == ENTupleMergingMode::kUnion) {
// late model extension for all fExtraSrcFields in Union mode
ExtendDestinationModel(descCmp.fExtraSrcFields, *fModel, mergeData, descCmp.fCommonFields);
auto res = ExtendDestinationModel(descCmp.fExtraSrcFields, *fModel, mergeData, descCmp.fCommonFields);
if (!res)
return R__FORWARD_ERROR(res);
} else if (mergeOpts.fMergingMode == ENTupleMergingMode::kStrict) {
// If the current source has extra fields and we're in Strict mode, error
std::string msg = "Source RNTuple has extra fields that the destination RNTuple doesn't have:";
Expand All @@ -1183,7 +1207,9 @@ ROOT::RResult<void> RNTupleMerger::Merge(std::span<RPageSource *> sources, const

// handle extra dst fields & common fields
auto columnInfos = GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData);
MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
auto res = MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
if (!res)
return R__FORWARD_ERROR(res);
} // end loop over sources

if (fDestination->GetNEntries() == 0)
Expand Down
5 changes: 5 additions & 0 deletions tree/ntuple/src/RPageStorage.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,11 @@ ROOT::Internal::RPagePersistentSink::AddColumn(ROOT::DescriptorId_t fieldId, RCo
void ROOT::Internal::RPagePersistentSink::UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset,
ROOT::NTupleSize_t firstEntry)
{
if (fIsInitialized)
for (const auto &field : changeset.fAddedFields)
if (field->GetStructure() == ENTupleStructure::kStreamer)
throw ROOT::RException(R__FAIL("a Model cannot be extended with Streamer fields"));

Comment on lines +833 to +837
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change should probably also be tested outside the merger, in tree/ntuple/test/ntuple_modelext.cxx, as it's a general restriction...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const auto &descriptor = fDescriptorBuilder.GetDescriptor();

if (descriptor.GetNLogicalColumns() > descriptor.GetNPhysicalColumns()) {
Expand Down
Loading
Loading