Skip to content

Commit b9d2880

Browse files
author
gituser
committed
Merge branch '1.10_release_4.1.x' into 1.10_release_4.2.x
2 parents 1b532fe + 0e81e46 commit b9d2880

File tree

5 files changed

+28
-3
lines changed

5 files changed

+28
-3
lines changed

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements
9595

9696
private List<PredicateInfo> predicateInfoes = Lists.newArrayList();
9797

98+
private List<PredicateInfo> fullPredicateInfoes = Lists.newArrayList();
99+
98100
private boolean fastCheck;
99101

100102
public RowTypeInfo getRowTypeInfo(){
@@ -197,6 +199,14 @@ public void addPredicateInfo(PredicateInfo predicateInfo) {
197199
this.predicateInfoes.add(predicateInfo);
198200
}
199201

202+
public List<PredicateInfo> getFullPredicateInfoes() {
203+
return fullPredicateInfoes;
204+
}
205+
206+
public void addFullPredicateInfoes(PredicateInfo predicateInfo) {
207+
this.fullPredicateInfoes.add(predicateInfo);
208+
}
209+
200210
public Long getAsyncFailMaxNum(Long defaultValue) {
201211
return Objects.isNull(asyncFailMaxNum) ? defaultValue : asyncFailMaxNum;
202212
}

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ private void evalConstantEquation(SqlLiteral literal, SqlIdentifier identifier,
194194
.setCondition(constant.toString())
195195
.build();
196196
sideTableInfo.addPredicateInfo(predicate);
197+
sideTableInfo.addFullPredicateInfoes(predicate);
197198
}
198199

199200
private void checkSupport(SqlIdentifier identifier) {

core/src/main/java/com/dtstack/flink/sql/side/SidePredicatesParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ private void parseSql(SqlNode sqlNode, Map<String, AbstractSideTableInfo> sideTa
109109

110110
private void fillToSideTableInfo(Map<String, AbstractSideTableInfo> sideTableMap, Map<String, String> tabMapping, List<PredicateInfo> predicateInfoList) {
111111
predicateInfoList.stream().filter(info -> sideTableMap.containsKey(tabMapping.getOrDefault(info.getOwnerTable(), info.getOwnerTable())))
112-
.map(info -> sideTableMap.get(tabMapping.getOrDefault(info.getOwnerTable(), info.getOwnerTable())).getPredicateInfoes().add(info))
112+
.map(info -> sideTableMap.get(tabMapping.getOrDefault(info.getOwnerTable(), info.getOwnerTable())).getFullPredicateInfoes().add(info))
113113
.count();
114114
}
115115

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import java.util.Map;
7676
import java.util.Queue;
7777
import java.util.Set;
78+
import java.util.stream.Collectors;
7879

7980
import static org.apache.calcite.sql.SqlKind.AS;
8081
import static org.apache.calcite.sql.SqlKind.INSERT;
@@ -506,6 +507,8 @@ private void joinFun(Object pollObj,
506507
sideTableInfo = sideTableMap.get(joinInfo.getRightTableAlias());
507508
}
508509

510+
extractActualSidePredicateInfos(joinInfo, sideTableInfo);
511+
509512
if (sideTableInfo == null) {
510513
throw new RuntimeException("can't not find side table:" + joinInfo.getRightTableName());
511514
}
@@ -592,6 +595,18 @@ private void joinFun(Object pollObj,
592595
}
593596
}
594597

598+
/**
599+
* 抽取维表本次真正使用的谓词集合
600+
* @param joinInfo 维表join信息
601+
* @param sideTableInfo 维表实体信息
602+
*/
603+
private void extractActualSidePredicateInfos(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
604+
sideTableInfo.setPredicateInfoes(sideTableInfo.getFullPredicateInfoes()
605+
.stream()
606+
.filter(e -> e.getOwnerTable().equals(joinInfo.getRightTableAlias()))
607+
.collect(Collectors.toList()));
608+
}
609+
595610
private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Table table) {
596611
List<String> fieldNames = new LinkedList<>();
597612
String fieldsInfo = result.getFieldsInfoStr();

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -758,8 +758,7 @@ public static void addConstant(Map<String, Object> keyMap, AbstractSideTableInfo
758758
List<PredicateInfo> predicateInfos = sideTableInfo.getPredicateInfoes();
759759
final String name = sideTableInfo.getName();
760760
for (PredicateInfo info : predicateInfos) {
761-
if (info.getOwnerTable().equals(name)
762-
&& info.getOperatorName().equals("=")) {
761+
if (info.getOperatorName().equals("=")) {
763762
String condition = info.getCondition();
764763
Matcher matcher = stringPattern.matcher(condition);
765764
if (matcher.matches()) {

0 commit comments

Comments
 (0)