Skip to content

Commit 027e773

Browse files
committed
[ntuple] Add Merger tests for Streamer fields
Also change the failure in GenerateZeroPages from a R__LOG_FATAL to a ROOT::Result failure
1 parent ca5199c commit 027e773

File tree

3 files changed

+267
-36
lines changed

3 files changed

+267
-36
lines changed

tree/ntuple/inc/ROOT/RNTupleMerger.hxx

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class RNTuple;
3333
namespace Internal {
3434
class RPageAllocator;
3535
class RClusterPool;
36-
}
36+
} // namespace Internal
3737

3838
namespace Experimental::Internal {
3939

@@ -49,6 +49,12 @@ enum class ENTupleMergingMode {
4949
kUnion
5050
};
5151

52+
inline const char *ToString(ENTupleMergingMode mode)
53+
{
54+
static const char *const kMergingModeStr[] = {"Filter", "Strict", "Union"};
55+
return kMergingModeStr[static_cast<int>(mode)];
56+
}
57+
5258
enum class ENTupleMergeErrBehavior {
5359
/// The merger will abort merging as soon as an error is encountered
5460
kAbort,
@@ -98,14 +104,18 @@ class RNTupleMerger final {
98104
std::optional<TTaskGroup> fTaskGroup;
99105
std::unique_ptr<ROOT::RNTupleModel> fModel;
100106

101-
void MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc,
102-
std::span<const RColumnMergeInfo> commonColumns,
103-
const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet,
104-
std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData,
105-
const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc);
106-
107-
void MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
108-
std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData);
107+
[[nodiscard]]
108+
ROOT::RResult<void> MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
109+
const ROOT::RClusterDescriptor &clusterDesc,
110+
std::span<const RColumnMergeInfo> commonColumns,
111+
const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet,
112+
std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData,
113+
const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc);
114+
115+
[[nodiscard]]
116+
ROOT::RResult<void>
117+
MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
118+
std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData);
109119

110120
/// Creates a RNTupleMerger with the given destination.
111121
/// The model must be given if and only if `destination` has been initialized with that model

tree/ntuple/src/RNTupleMerger.cxx

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -646,12 +646,14 @@ static void ExtendDestinationModel(std::span<const ROOT::RFieldDescriptor *> new
646646
}
647647

648648
// Generates default (zero) values for the given columns
649-
static void GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<const RColumnMergeInfo> columns,
650-
RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc,
651-
const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
649+
[[nodiscard]]
650+
static ROOT::RResult<void>
651+
GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<const RColumnMergeInfo> columns,
652+
RSealedPageMergeData &sealedPageData, ROOT::Internal::RPageAllocator &pageAlloc,
653+
const ROOT::RNTupleDescriptor &dstDescriptor, const RNTupleMergeData &mergeData)
652654
{
653655
if (!nEntriesToGenerate)
654-
return;
656+
return ROOT::RResult<void>::Success();
655657

656658
for (const auto &column : columns) {
657659
const ROOT::RFieldDescriptor *field = column.fParentFieldDescriptor;
@@ -681,12 +683,10 @@ static void GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<con
681683
const auto structure = field->GetStructure();
682684

683685
if (structure == ROOT::ENTupleStructure::kStreamer) {
684-
R__LOG_FATAL(NTupleMergeLog())
685-
<< "RNTuple::Merge"
686-
"Destination RNTuple contains a streamer field (%s) that is not present in one of the sources. "
687-
"Creating a default value for a streamer field is ill-defined, therefore the merging process will abort."
688-
<< field->GetFieldName();
689-
continue;
686+
return R__FAIL(
687+
"Destination RNTuple contains a streamer field (" + field->GetFieldName() +
688+
") that is not present in one of the sources. "
689+
"Creating a default value for a streamer field is ill-defined, therefore the merging process will abort.");
690690
}
691691

692692
// NOTE: we cannot have a Record here because it has no associated columns.
@@ -725,21 +725,23 @@ static void GenerateZeroPagesForColumns(size_t nEntriesToGenerate, std::span<con
725725
sealedPageData.fPagesV.back().cend());
726726
}
727727
}
728+
return ROOT::RResult<void>::Success();
728729
}
729730

730731
// Merges all columns appearing both in the source and destination RNTuples, just copying them if their
731732
// compression matches ("fast merge") or by unsealing and resealing them with the proper compression.
732-
void RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
733-
const ROOT::RClusterDescriptor &clusterDesc,
734-
std::span<const RColumnMergeInfo> commonColumns,
735-
const RCluster::ColumnSet_t &commonColumnSet,
736-
std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData,
737-
const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc)
733+
ROOT::RResult<void>
734+
RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
735+
const ROOT::RClusterDescriptor &clusterDesc,
736+
std::span<const RColumnMergeInfo> commonColumns,
737+
const RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster,
738+
RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData,
739+
ROOT::Internal::RPageAllocator &pageAlloc)
738740
{
739741
assert(nCommonColumnsInCluster == commonColumnSet.size());
740742
assert(nCommonColumnsInCluster <= commonColumns.size());
741743
if (nCommonColumnsInCluster == 0)
742-
return;
744+
return ROOT::RResult<void>::Success();
743745

744746
const RCluster *cluster = clusterPool.GetCluster(clusterDesc.GetId(), commonColumnSet);
745747
// we expect the cluster pool to contain the requested set of columns, since they were
@@ -794,8 +796,10 @@ void RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool
794796
// TODO: also avoid doing this if we added no real page of this column to the destination yet.
795797
if (columnDesc.GetFirstElementIndex() > clusterDesc.GetFirstEntryIndex() && mergeData.fNumDstEntries > 0) {
796798
const auto nMissingEntries = columnDesc.GetFirstElementIndex() - clusterDesc.GetFirstEntryIndex();
797-
GenerateZeroPagesForColumns(nMissingEntries, {&column, 1}, sealedPageData, pageAlloc, mergeData.fDstDescriptor,
798-
mergeData);
799+
auto res = GenerateZeroPagesForColumns(nMissingEntries, {&column, 1}, sealedPageData, pageAlloc,
800+
mergeData.fDstDescriptor, mergeData);
801+
if (!res)
802+
return R__FORWARD_ERROR(res);
799803
}
800804

801805
// Loop over the pages
@@ -843,15 +847,18 @@ void RNTupleMerger::MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool
843847
sealedPageData.fGroups.emplace_back(column.fOutputId, sealedPageData.fPagesV.back().cbegin(),
844848
sealedPageData.fPagesV.back().cend());
845849
} // end loop over common columns
850+
851+
return ROOT::RResult<void>::Success();
846852
}
847853

848854
// Iterates over all clusters of `source` and merges their pages into `destination`.
849855
// It is assumed that all columns in `commonColumns` are present (and compatible) in both the source and
850856
// the destination's schemas.
851857
// The pages may be "fast-merged" (i.e. simply copied with no decompression/recompression) if the target
852858
// compression is unspecified or matches the original compression settings.
853-
void RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
854-
std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData)
859+
ROOT::RResult<void>
860+
RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
861+
std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData)
855862
{
856863
ROOT::Internal::RClusterPool clusterPool{source};
857864

@@ -893,10 +900,15 @@ void RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<const RCo
893900
missingColumns.push_back(commonColumns[i]);
894901

895902
RSealedPageMergeData sealedPageData;
896-
MergeCommonColumns(clusterPool, clusterDesc, commonColumns, commonColumnSet, nCommonColumnsInCluster,
897-
sealedPageData, mergeData, *fPageAlloc);
898-
GenerateZeroPagesForColumns(nClusterEntries, missingColumns, sealedPageData, *fPageAlloc,
899-
mergeData.fDstDescriptor, mergeData);
903+
auto res = MergeCommonColumns(clusterPool, clusterDesc, commonColumns, commonColumnSet, nCommonColumnsInCluster,
904+
sealedPageData, mergeData, *fPageAlloc);
905+
if (!res)
906+
return R__FORWARD_ERROR(res);
907+
908+
res = GenerateZeroPagesForColumns(nClusterEntries, missingColumns, sealedPageData, *fPageAlloc,
909+
mergeData.fDstDescriptor, mergeData);
910+
if (!res)
911+
return R__FORWARD_ERROR(res);
900912

901913
// Commit the pages and the clusters
902914
mergeData.fDestination.CommitSealedPageV(sealedPageData.fGroups);
@@ -913,6 +925,7 @@ void RNTupleMerger::MergeSourceClusters(RPageSource &source, std::span<const RCo
913925
// However, as of today, we aren't really handling such huge files, and even relatively big ones
914926
// such as the CMS dataset have a page list size of about only 2 MB.
915927
// So currently we simply merge all cluster groups into one.
928+
return ROOT::RResult<void>::Success();
916929
}
917930

918931
static std::optional<std::type_index> ColumnInMemoryType(std::string_view fieldType, ENTupleColumnType onDiskType)
@@ -976,7 +989,8 @@ static void AddColumnsFromField(std::vector<RColumnMergeInfo> &columns, const RO
976989
// 2. when merging a deferred column into an existing column (in which case we need to fill the "hole" with
977990
// zeroes). For the first case srcFieldDesc and dstFieldDesc are the same (see the calling site of this function),
978991
// but for the second case they're not, and we need to pick the source field because we will then check the
979-
// column's *input* id inside fParentFieldDescriptor to see if it's a suppressed column (see GenerateZeroPagesForColumns()).
992+
// column's *input* id inside fParentFieldDescriptor to see if it's a suppressed column (see
993+
// GenerateZeroPagesForColumns()).
980994
info.fParentFieldDescriptor = &srcFieldDesc;
981995
// Save the parent field descriptor since this may be either the source or destination descriptor depending on
982996
// whether this is an extraDstField or a commonField. We will need this in GenerateZeroPagesForColumns() to
@@ -1182,7 +1196,9 @@ ROOT::RResult<void> RNTupleMerger::Merge(std::span<RPageSource *> sources, const
11821196

11831197
// handle extra dst fields & common fields
11841198
auto columnInfos = GatherColumnInfos(descCmp, srcDescriptor.GetRef(), mergeData);
1185-
MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
1199+
auto res = MergeSourceClusters(*source, columnInfos.fCommonColumns, columnInfos.fExtraDstColumns, mergeData);
1200+
if (!res)
1201+
return R__FORWARD_ERROR(res);
11861202
} // end loop over sources
11871203

11881204
if (fDestination->GetNEntries() == 0)

0 commit comments

Comments
 (0)