Skip to content

Commit b7bfc38

Browse files
committed
[ntuple] Add Merger tests for Streamer fields
Also change the failure in GenerateZeroPages from a R__LOG_FATAL to a ROOT::Result failure
1 parent 58e1757 commit b7bfc38

File tree

3 files changed

+267
-36
lines changed

3 files changed

+267
-36
lines changed

tree/ntuple/inc/ROOT/RNTupleMerger.hxx

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class RNTuple;
3333
namespace Internal {
3434
class RPageAllocator;
3535
class RClusterPool;
36-
}
36+
} // namespace Internal
3737

3838
namespace Experimental::Internal {
3939

@@ -49,6 +49,12 @@ enum class ENTupleMergingMode {
4949
kUnion
5050
};
5151

52+
inline const char *ToString(ENTupleMergingMode mode)
53+
{
54+
static const char *const kMergingModeStr[] = {"Filter", "Strict", "Union"};
55+
return kMergingModeStr[static_cast<int>(mode)];
56+
}
57+
5258
enum class ENTupleMergeErrBehavior {
5359
/// The merger will abort merging as soon as an error is encountered
5460
kAbort,
@@ -98,14 +104,18 @@ class RNTupleMerger final {
98104
std::optional<TTaskGroup> fTaskGroup;
99105
std::unique_ptr<ROOT::RNTupleModel> fModel;
100106

101-
void MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc,
102-
std::span<const RColumnMergeInfo> commonColumns,
103-
const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet,
104-
std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData,
105-
const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc);
106-
107-
void MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
108-
std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData);
107+
[[nodiscard]]
108+
ROOT::RResult<void> MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
109+
const ROOT::RClusterDescriptor &clusterDesc,
110+
std::span<const RColumnMergeInfo> commonColumns,
111+
const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet,
112+
std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData,
113+
const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc);
114+
115+
[[nodiscard]]
116+
ROOT::RResult<void>
117+
MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
118+
std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData);
109119

110120
/// Creates a RNTupleMerger with the given destination.
111121
/// The model must be given if and only if `destination` has been initialized with that model

tree/ntuple/src/RNTupleMerger.cxx

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -647,12 +647,14 @@ static void ExtendDestinationModel(std::span<const ROOT::RFieldDescriptor *> new
647647
}
648648

649649
// Generates default (zero) values for the given columns
650-
static void GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<const RColumnMergeInfo> columns,
651-
RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc,
652-
const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
650+
[[nodiscard]]
651+
static ROOT::RResult<void>
652+
GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<const RColumnMergeInfo> columns,
653+
RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc,
654+
const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
653655
{
654656
if (!nEntriesToGenerate)
655-
return;
657+
return ROOT::RResult<void>::Success();
656658

657659
for (const auto &column : columns) {
658660
const ROOT::RFieldDescriptor *field = column.fParentFieldDescriptor;
@@ -682,12 +684,10 @@ static void GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<con
682684
const auto structure = field->GetStructure();
683685

684686
if (structure == ROOT::ENTupleStructure::kStreamer) {
685-
R__LOG_FATAL(NTupleMergeLog())
686-
<< "RNTuple::Merge"
687-
"Destination RNTuple contains a streamer field (%s) that is not present in one of the sources. "
688-
"Creating a default value for a streamer field is ill-defined, therefore the merging process will abort."
689-
<< field->GetFieldName();
690-
continue;
687+
return R__FAIL(
688+
"Destination RNTuple contains a streamer field (" + field->GetFieldName() +
689+
") that is not present in one of the sources. "
690+
"Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.");
691691
}
692692

693693
// NOTE: we cannot have a Record here because it has no associated columns.
@@ -726,21 +726,23 @@ static void GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<con
726726
sealedPageData.fPagesV.back().cend());
727727
}
728728
}
729+
return ROOT::RResult<void>::Success();
729730
}
730731

731732
// Merges all columns appearing both in the source and destination RNTuples, just copying them if their
732733
// compression matches ("fast merge") or by unsealing and resealing them with the proper compression.
733-
void RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
734-
const ROOT::RClusterDescriptor &clusterDesc,
735-
std::span<const RColumnMergeInfo> commonColumns,
736-
const RCluster::ColumnSet_t &commonColumnSet,
737-
std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData,
738-
const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc)
734+
ROOT::RResult<void>
735+
RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
736+
const ROOT::RClusterDescriptor &clusterDesc,
737+
std::span<const RColumnMergeInfo> commonColumns,
738+
const RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster,
739+
RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData,
740+
ROOT::Internal::RPageAllocator &pageAlloc)
739741
{
740742
assert(nCommonColumnsInCluster == commonColumnSet.size());
741743
assert(nCommonColumnsInCluster <= commonColumns.size());
742744
if (nCommonColumnsInCluster == 0)
743-
return;
745+
return ROOT::RResult<void>::Success();
744746

745747
const RCluster *cluster = clusterPool.GetCluster(clusterDesc.GetId(), commonColumnSet);
746748
// we expect the cluster pool to contain the requested set of columns, since they were
@@ -795,8 +797,10 @@ void RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool
795797
// TODO: also avoid doing this if we added no real page of this column to the destination yet.
796798
if (columnDesc.GetFirstElementIndex() > clusterDesc.GetFirstEntryIndex() && mergeData.fNumDstEntries > 0) {
797799
const auto nMissingEntries = columnDesc.GetFirstElementIndex() - clusterDesc.GetFirstEntryIndex();
798-
GenerateZeroPagesForColumns(nMissingEntries, {&column, 1}, sealedPageData, pageAlloc, mergeData.fDstDescriptor,
799-
mergeData);
800+
auto res = GenerateZeroPagesForColumns(nMissingEntries, {&column, 1}, sealedPageData, pageAlloc,
801+
mergeData.fDstDescriptor, mergeData);
802+
if (!res)
803+
return R__FORWARD_ERROR(res);
800804
}
801805

802806
// Loop over the pages
@@ -844,15 +848,18 @@ void RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool
844848
sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
845849
sealedPageData.fPagesV.back().cend());
846850
} // end loop over common columns
851+
852+
return ROOT::RResult<void>::Success();
847853
}
848854

849855
// Iterates over all clusters of `source` and merges their pages into `destination`.
850856
// It is assumed that all columns in `commonColumns` are present (and compatible) in both the source and
851857
// the destination's schemas.
852858
// The pages may be "fast-merged" (i.e. simply copied with no decompression/recompression) if the target
853859
// compression is unspecified or matches the original compression settings.
854-
void RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
855-
std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData)
860+
ROOT::RResult<void>
861+
RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
862+
std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData)
856863
{
857864
ROOT::Internal::RClusterPool clusterPool{source};
858865

@@ -894,10 +901,15 @@ void RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<const RCo
894901
missingColumns.push_back(commonColumns[i]);
895902

896903
RSealedPageMergeData sealedPageData;
897-
MergeCommonColumns(clusterPool, clusterDesc, commonColumns, commonColumnSet, nCommonColumnsInCluster,
898-
sealedPageData, mergeData, *fPageAlloc);
899-
GenerateZeroPagesForColumns(nClusterEntries, missingColumns, sealedPageData, *fPageAlloc,
900-
mergeData.fDstDescriptor, mergeData);
904+
auto res = MergeCommonColumns(clusterPool, clusterDesc, commonColumns, commonColumnSet, nCommonColumnsInCluster,
905+
sealedPageData, mergeData, *fPageAlloc);
906+
if (!res)
907+
return R__FORWARD_ERROR(res);
908+
909+
res = GenerateZeroPagesForColumns(nClusterEntries, missingColumns, sealedPageData, *fPageAlloc,
910+
mergeData.fDstDescriptor, mergeData);
911+
if (!res)
912+
return R__FORWARD_ERROR(res);
901913

902914
// Commit the pages and the clusters
903915
mergeData.fDestination.CommitSealedPageV(sealedPageData.fGroups);
@@ -914,6 +926,7 @@ void RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<const RCo
914926
// However, as of today, we aren't really handling such huge files, and even relatively big ones
915927
// such as the CMS dataset have a page list size of about only 2 MB.
916928
// So currently we simply merge all cluster groups into one.
929+
return ROOT::RResult<void>::Success();
917930
}
918931

919932
static std::optional<std::type_index> ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
@@ -977,7 +990,8 @@ static void AddColumnsFromField(std::vector<RColumnMergeInfo> &columns, const RO
977990
// 2. when merging a deferred column into an existing column (in which case we need to fill the "hole" with
978991
// zeroes). For the first case srcFieldDesc and dstFieldDesc are the same (see the calling site of this function),
979992
// but for the second case they're not, and we need to pick the source field because we will then check the
980-
// column's *input* id inside fParentFieldDescriptor to see if it's a suppressed column (see GenerateZeroPagesForColumns()).
993+
// column's *input* id inside fParentFieldDescriptor to see if it's a suppressed column (see
994+
// GenerateZeroPagesForColumns()).
981995
info.fParentFieldDescriptor = &srcFieldDesc;
982996
// Save the parent field descriptor since this may be either the source or destination descriptor depending on
983997
// whether this is an extraDstField or a commonField. We will need this in GenerateZeroPagesForColumns() to
@@ -1183,7 +1197,9 @@ ROOT::RResult<void> RNTupleMerger::Merge(std::span<RPageSource *> sources, const
11831197

11841198
// handle extra dst fields & common fields
11851199
auto columnInfos = GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData);
1186-
MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
1200+
auto res = MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
1201+
if (!res)
1202+
return R__FORWARD_ERROR(res);
11871203
} // end loop over sources
11881204

11891205
if (fDestination->GetNEntries() == 0)

0 commit comments

Comments
 (0)