Skip to content

Commit 73a152a

Browse files
committed
[hotfix-37535][core&elasticsearch]Fixed dimension table join not out of the data.
1 parent b9d2880 commit 73a152a

File tree

3 files changed

+6
-4
lines changed

3 files changed

+6
-4
lines changed

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
3535

3636
import java.io.Serializable;
37+
import java.util.Arrays;
3738
import java.util.List;
3839
import java.util.Map;
3940

@@ -92,17 +93,20 @@ public void parseSelectFields(JoinInfo joinInfo){
9293
String sideTableName = joinInfo.getSideTableName();
9394
String nonSideTableName = joinInfo.getNonSideTable();
9495
List<String> fields = Lists.newArrayList();
95-
int sideTableFieldIndex = 0;
96+
int sideTableFieldIndex;
9697

9798
for( int i=0; i<outFieldInfoList.size(); i++){
9899
FieldInfo fieldInfo = outFieldInfoList.get(i);
99100
if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
100101
String sideFieldName = sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName());
101102
fields.add(sideFieldName);
103+
sideTableFieldIndex = Arrays.asList(sideTableInfo.getFields()).indexOf(sideFieldName);
104+
if (sideTableFieldIndex == -1){
105+
throw new RuntimeException(String.format("unknown filed {%s} in sideTable {%s} ", sideFieldName, sideTableName));
106+
}
102107
sideSelectFieldsType.put(sideTableFieldIndex, getTargetFieldType(fieldInfo.getFieldName()));
103108
sideFieldIndex.put(i, sideTableFieldIndex);
104109
sideFieldNameIndex.put(i, sideFieldName);
105-
sideTableFieldIndex++;
106110
}else if(fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)){
107111
int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
108112
inFieldIndex.put(i, nonSideIndex);

elasticsearch7/elasticsearch7-side/elasticsearch7-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch7/Elasticsearch7AllReqRow.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,6 @@ private SearchSourceBuilder initConfiguration(BoolQueryBuilder boolQueryBuilder)
208208
}
209209

210210
searchSourceBuilder.size(getFetchSize());
211-
searchSourceBuilder.sort("_id", SortOrder.DESC);
212211

213212
// fields included in the source data
214213
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields().trim(), ",");

elasticsearch7/elasticsearch7-side/elasticsearch7-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch7/Elasticsearch7AsyncReqRow.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,6 @@ public void close() throws Exception {
218218
private SearchSourceBuilder initConfiguration() {
219219
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
220220
searchSourceBuilder.size(getFetchSize());
221-
searchSourceBuilder.sort("_id", SortOrder.DESC);
222221
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields().trim(), ",");
223222
searchSourceBuilder.fetchSource(sideFieldNames, null);
224223

0 commit comments

Comments
 (0)