1818
1919package com .dtstack .flink .sql .side .rdb .all ;
2020
21- import org . apache .flink .api . java . typeutils . RowTypeInfo ;
22-
21+ import com . dtstack .flink .sql . side . AbstractSideTableInfo ;
22+ import com . dtstack . flink . sql . side . BaseSideInfo ;
2323import com .dtstack .flink .sql .side .FieldInfo ;
2424import com .dtstack .flink .sql .side .JoinInfo ;
2525import com .dtstack .flink .sql .side .PredicateInfo ;
26- import com .dtstack .flink .sql .side .BaseSideInfo ;
27- import com .dtstack .flink .sql .side .AbstractSideTableInfo ;
2826import com .dtstack .flink .sql .side .rdb .table .RdbSideTableInfo ;
2927import com .dtstack .flink .sql .util .ParseUtils ;
3028import com .google .common .collect .Lists ;
3129import org .apache .calcite .sql .SqlNode ;
3230import org .apache .commons .collections .CollectionUtils ;
3331import org .apache .commons .lang3 .StringUtils ;
32+ import org .apache .flink .api .java .typeutils .RowTypeInfo ;
3433import org .slf4j .Logger ;
3534import org .slf4j .LoggerFactory ;
3635
37- import java .util .Arrays ;
3836import java .util .List ;
37+ import java .util .Map ;
38+ import java .util .Objects ;
3939import java .util .stream .Collectors ;
4040
4141/**
@@ -59,7 +59,18 @@ public RdbAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
5959 @ Override
6060 public void buildEqualInfo (JoinInfo joinInfo , AbstractSideTableInfo sideTableInfo ) {
6161 RdbSideTableInfo rdbSideTableInfo = (RdbSideTableInfo ) sideTableInfo ;
62- sqlCondition = getSelectFromStatement (getTableName (rdbSideTableInfo ), Arrays .asList (StringUtils .split (sideSelectFields , "," )), sideTableInfo .getPredicateInfoes ());
62+ List <String > selectFields = Lists .newArrayList ();
63+ Map <String , String > physicalFields = rdbSideTableInfo .getPhysicalFields ();
64+ physicalFields .keySet ().forEach (
65+ item -> {
66+ if (Objects .isNull (physicalFields .get (item ))) {
67+ selectFields .add (quoteIdentifier (item ));
68+ } else {
69+ selectFields .add (quoteIdentifier (physicalFields .get (item )) + " AS " + quoteIdentifier (item ));
70+ }
71+ }
72+ );
73+ sqlCondition = getSelectFromStatement (getTableName (rdbSideTableInfo ), selectFields , sideTableInfo .getPredicateInfoes ());
6374 LOG .info ("--------dimension sql query-------\n {}" + sqlCondition );
6475 }
6576
@@ -68,7 +79,7 @@ public String getAdditionalWhereClause() {
6879 }
6980
7081 private String getSelectFromStatement (String tableName , List <String > selectFields , List <PredicateInfo > predicateInfoes ) {
71- String fromClause = selectFields . stream (). map ( this :: quoteIdentifier ). collect ( Collectors . joining ( ", " ) );
82+ String fromClause = String . join ( ", " , selectFields );
7283 String predicateClause = predicateInfoes .stream ().map (this ::buildFilterCondition ).collect (Collectors .joining (" AND " ));
7384 String whereClause = buildWhereClause (predicateClause );
7485 return "SELECT " + fromClause + " FROM " + tableName + whereClause ;
0 commit comments