Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public static void print(Reader iter, PrintStream out) throws IOException {
int i = 1;
for (SummarySerializer summaryStore : stores) {
out.printf("%sSummary %d of %d generated by : %s\n", indent, i, stores.size(),
summaryStore.getSummarizerConfiguration());
summaryStore.summarizerConfiguration());
i++;
summaryStore.print(indent, indent, out);
}
Expand Down Expand Up @@ -297,11 +297,11 @@ public SummaryCollection getSummaries(List<RowRange> ranges) {
}
for (SummarySerializer summaryStore : summaryStores) {
if (summaryStore.exceededMaxSize()) {
initial.add(new SummaryCollection.FileSummary(summaryStore.getSummarizerConfiguration()));
initial.add(new SummaryCollection.FileSummary(summaryStore.summarizerConfiguration()));
} else {
Map<String,Long> summary = summaryStore.getSummary(ranges, factory);
boolean exceeded = summaryStore.exceedsRange(ranges);
initial.add(new SummaryCollection.FileSummary(summaryStore.getSummarizerConfiguration(),
initial.add(new SummaryCollection.FileSummary(summaryStore.summarizerConfiguration(),
summary, exceeded));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,34 +46,17 @@

/**
* This class supports serializing summaries and periodically storing summaries. The implementations
* attempts to generate around 10 summaries that are evenly spaced. This allows asking for summaries
* attempt to generate around 10 summaries that are evenly spaced. This allows asking for summaries
* for sub-ranges of data in a rfile.
*
* <p>
* At first summaries are created for every 1000 keys values. After 10 summaries are added, the 10
* At first, summaries are created for every 1000 keys values. After 10 summaries are added, the 10
* summaries are merged to 5 and summaries are then created for every 2000 key values. The code
* keeps merging summaries and doubling the amount of key values per summary. This results in each
* summary covering about the same number of key values.
*/
class SummarySerializer {

private final SummarizerConfiguration sconf;
private final LgSummaries[] allSummaries;

private SummarySerializer(SummarizerConfiguration sconf, LgSummaries[] allSummaries) {
this.sconf = sconf;
this.allSummaries = allSummaries;
}

private SummarySerializer(SummarizerConfiguration sconf) {
this.sconf = sconf;
// this indicates max size was exceeded
this.allSummaries = null;
}

public SummarizerConfiguration getSummarizerConfiguration() {
return sconf;
}
record SummarySerializer(SummarizerConfiguration summarizerConfiguration,
LgSummaries[] allSummaries) {

public void print(String prefix, String indent, PrintStream out) {

Expand All @@ -88,16 +71,21 @@ public void print(String prefix, String indent, PrintStream out) {

public Map<String,Long> getSummary(List<RowRange> ranges, SummarizerFactory sf) {

Summarizer kvs = sf.getSummarizer(sconf);
Preconditions.checkState(allSummaries != null,
"Summaries were not stored because they exceeded the maximum size");

Summarizer kvs = sf.getSummarizer(summarizerConfiguration);

Map<String,Long> summary = new HashMap<>();
for (LgSummaries lgs : allSummaries) {
lgs.getSummary(ranges, kvs.combiner(sconf), summary);
lgs.getSummary(ranges, kvs.combiner(summarizerConfiguration), summary);
}
return summary;
}

public boolean exceedsRange(List<RowRange> ranges) {
Preconditions.checkState(allSummaries != null,
"Summaries were not stored because they exceeded the maximum size");
ranges.forEach(SummarySerializer::validateSummaryRange);
return Arrays.stream(allSummaries).anyMatch(lgs -> ranges.stream().anyMatch(lgs::exceedsRange));
}
Expand All @@ -106,8 +94,7 @@ public boolean exceededMaxSize() {
return allSummaries == null;
}

private static class SummaryStoreImpl
implements org.apache.accumulo.core.client.summary.Summarizer.StatisticConsumer {
private static class SummaryStoreImpl implements Summarizer.StatisticConsumer {

HashMap<String,Long> summaries;

Expand All @@ -122,7 +109,7 @@ private static class LgBuilder {
private final SummarizerConfiguration conf;
private Collector collector;

private final int maxSummaries = 10;
private static final int MAX_SUMMARIES = 10;

private int cutoff = 1000;
private int count = 0;
Expand Down Expand Up @@ -190,7 +177,7 @@ private void addSummary(Text row, Map<String,Long> summary, int count) {
Preconditions.checkState(!finished);
summaries.add(new SummaryInfo(row, summary, count));

if (summaries.size() % 2 == 0 && summaries.size() > maxSummaries) {
if (summaries.size() % 2 == 0 && summaries.size() > MAX_SUMMARIES) {
summaries = merge(summaries.size());
cutoff *= 2;
}
Expand Down Expand Up @@ -391,7 +378,8 @@ static SummarySerializer load(SummarizerConfiguration sconf, DataInputStream in)
throws IOException {
boolean exceededMaxSize = in.readBoolean();
if (exceededMaxSize) {
return new SummarySerializer(sconf);
// null indicates max size was exceeded
return new SummarySerializer(sconf, null);
} else {
WritableUtils.readVInt(in);
// load symbol table
Expand All @@ -411,17 +399,7 @@ static SummarySerializer load(SummarizerConfiguration sconf, DataInputStream in)
}
}

private static class LgSummaries {

private final Text firstRow;
private final SummaryInfo[] summaries;
private final String lgroupName;

LgSummaries(Text firstRow, SummaryInfo[] summaries, String lgroupName) {
this.firstRow = firstRow;
this.summaries = summaries;
this.lgroupName = lgroupName;
}
private record LgSummaries(Text firstRow, SummaryInfo[] summaries, String lgroupName) {

boolean exceedsRange(RowRange range) {

Expand Down