2828import org .apache .calcite .sql .SqlNode ;
2929import org .apache .flink .api .java .typeutils .RowTypeInfo ;
3030
31+ import java .util .Arrays ;
3132import java .util .List ;
3233
3334/**
@@ -43,6 +44,40 @@ public Elasticsearch7AsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, L
4344 super (rowTypeInfo , joinInfo , outFieldInfoList , sideTableInfo );
4445 }
4546
47+ @ Override
48+ public void parseSelectFields (JoinInfo joinInfo ){
49+ String sideTableName = joinInfo .getSideTableName ();
50+ String nonSideTableName = joinInfo .getNonSideTable ();
51+ List <String > fields = Lists .newArrayList ();
52+ int sideTableFieldIndex ;
53+
54+ for ( int i =0 ; i <outFieldInfoList .size (); i ++){
55+ FieldInfo fieldInfo = outFieldInfoList .get (i );
56+ if (fieldInfo .getTable ().equalsIgnoreCase (sideTableName )){
57+ String sideFieldName = sideTableInfo .getPhysicalFields ().getOrDefault (fieldInfo .getFieldName (), fieldInfo .getFieldName ());
58+ fields .add (sideFieldName );
59+ sideTableFieldIndex = Arrays .asList (sideTableInfo .getFields ()).indexOf (sideFieldName );
60+ if (sideTableFieldIndex == -1 ){
61+ throw new RuntimeException (String .format ("unknown filed {%s} in sideTable {%s} " , sideFieldName , sideTableName ));
62+ }
63+ sideSelectFieldsType .put (sideTableFieldIndex , getTargetFieldType (fieldInfo .getFieldName ()));
64+ sideFieldIndex .put (i , sideTableFieldIndex );
65+ sideFieldNameIndex .put (i , sideFieldName );
66+ }else if (fieldInfo .getTable ().equalsIgnoreCase (nonSideTableName )){
67+ int nonSideIndex = rowTypeInfo .getFieldIndex (fieldInfo .getFieldName ());
68+ inFieldIndex .put (i , nonSideIndex );
69+ }else {
70+ throw new RuntimeException ("unknown table " + fieldInfo .getTable ());
71+ }
72+ }
73+
74+ if (fields .size () == 0 ){
75+ throw new RuntimeException ("select non field from table " + sideTableName );
76+ }
77+
78+ sideSelectFields = String .join ("," , fields );
79+ }
80+
4681 @ Override
4782 public void buildEqualInfo (JoinInfo joinInfo , AbstractSideTableInfo sideTableInfo ) {
4883
0 commit comments