Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ public PartialPath getDevicePath() {
return devicePath;
}

public String getAbbrString() {
return devicePath.toString()
+ ", isAligned: "
+ isAligned
+ ", measurements: "
+ measurementSchemaInfoList.stream()
.map(IMeasurementSchemaInfo::getName)
.collect(Collectors.toList());
}

public boolean isAligned() {
return isAligned;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
Expand All @@ -56,6 +57,8 @@
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -70,6 +73,7 @@
import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;

class AutoCreateSchemaExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(AutoCreateSchemaExecutor.class);
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final Coordinator coordinator;
private final ITemplateManager templateManager;
Expand Down Expand Up @@ -425,6 +429,11 @@ void autoCreateMissingMeasurements(
template = templateManager.getTemplate(entry.getValue().left.getId());
schemaTree.appendTemplateDevice(
devicePath, template.isDirectAligned(), template.getId(), template);
LOGGER.info(
"Schema tree updated by template, Devices: {}",
schemaTree.getAllDevices().stream()
.map(DeviceSchemaInfo::getAbbrString)
.collect(Collectors.toSet()));
}
}

Expand Down Expand Up @@ -595,6 +604,11 @@ private void internalCreateTimeSeries(
new InternalCreateMultiTimeSeriesStatement(devicesNeedAutoCreateTimeSeries), context);

schemaTree.appendMeasurementPaths(measurementPathList);
LOGGER.info(
"Schema tree updated by already existing, Devices: {}",
schemaTree.getAllDevices().stream()
.map(DeviceSchemaInfo::getAbbrString)
.collect(Collectors.toSet()));

Map<PartialPath, Set<String>> alreadyExistingMeasurementMap = new HashMap<>();
for (MeasurementPath measurementPath : measurementPathList) {
Expand Down Expand Up @@ -624,6 +638,11 @@ private void internalCreateTimeSeries(
null,
null,
entry.getValue().left);
LOGGER.info(
"Schema tree updated by created, Devices: {}",
schemaTree.getAllDevices().stream()
.map(DeviceSchemaInfo::getAbbrString)
.collect(Collectors.toSet()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -53,7 +55,7 @@
import java.util.stream.IntStream;

public class ClusterSchemaFetcher implements ISchemaFetcher {

private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSchemaFetcher.class);
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

private final Coordinator coordinator = Coordinator.getInstance();
Expand Down Expand Up @@ -285,6 +287,11 @@ public ISchemaTree fetchSchemaListWithAutoCreate(
final List<Integer> indexOfDevicesWithMissingMeasurements = new ArrayList<>();
for (int i = 0; i < devicePathList.size(); i++) {
schemaTree.mergeSchemaTree(schemaCache.get(devicePathList.get(i), measurementsList.get(i)));
LOGGER.info(
"Schema tree updated by cache, Devices: {}",
schemaTree.getAllDevices().stream()
.map(DeviceSchemaInfo::getAbbrString)
.collect(Collectors.toSet()));
final List<Integer> indexOfMissingMeasurements =
checkMissingMeasurements(schemaTree, devicePathList.get(i), measurementsList.get(i));
if (!indexOfMissingMeasurements.isEmpty()) {
Expand All @@ -308,6 +315,11 @@ public ISchemaTree fetchSchemaListWithAutoCreate(
context);
if (!remoteSchemaTree.isEmpty()) {
schemaTree.mergeSchemaTree(remoteSchemaTree);
LOGGER.info(
"Schema tree updated by remote fetch, Devices: {}",
schemaTree.getAllDevices().stream()
.map(DeviceSchemaInfo::getAbbrString)
.collect(Collectors.toSet()));
}

if (!config.isAutoCreateSchemaEnabled()) {
Expand Down