2525import org .apache .commons .lang .StringUtils ;
2626import org .apache .flink .hadoop .shaded .com .google .common .base .Charsets ;
2727import org .apache .flink .hadoop .shaded .com .google .common .base .Preconditions ;
28-
28+ import com . dtstack . flink . sql . util . PluginUtil ;
2929import java .io .File ;
3030import java .io .FileInputStream ;
3131import java .net .URLEncoder ;
3232import java .util .List ;
3333import java .util .Map ;
34- import java . util . Properties ;
34+ import com . dtstack . flink . sql . ClusterMode ;
3535
36- import static com .dtstack .flink .sql .launcher .LauncherOptions .*;
37- import static com .dtstack .flink .sql .launcher .ClusterMode .*;
3836
3937
4038/**
4543 */
4644public class LauncherOptionParser {
4745
46+ public static final String OPTION_MODE = "mode" ;
47+
48+ public static final String OPTION_NAME = "name" ;
49+
50+ public static final String OPTION_SQL = "sql" ;
51+
52+ public static final String OPTION_FLINK_CONF_DIR = "flinkconf" ;
53+
54+ public static final String OPTION_YARN_CONF_DIR = "yarnconf" ;
55+
56+ public static final String OPTION_LOCAL_SQL_PLUGIN_PATH = "localSqlPluginPath" ;
57+
58+ public static final String OPTION_REMOTE_SQL_PLUGIN_PATH = "remoteSqlPluginPath" ;
59+
60+ public static final String OPTION_ADDJAR = "addjar" ;
61+
62+ public static final String OPTION_CONF_PROP = "confProp" ;
63+
64+ public static final String OPTION_SAVE_POINT_PATH = "savePointPath" ;
65+
66+ public static final String OPTION_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState" ;
67+
4868 private Options options = new Options ();
4969
5070 private BasicParser parser = new BasicParser ();
5171
52- private Properties properties = new Properties ();
72+ private LauncherOptions properties = new LauncherOptions ();
5373
5474 public LauncherOptionParser (String [] args ) {
55- options .addOption (LauncherOptions . OPTION_MODE , true , "Running mode" );
75+ options .addOption (OPTION_MODE , true , "Running mode" );
5676 options .addOption (OPTION_SQL , true , "Job sql file" );
5777 options .addOption (OPTION_NAME , true , "Job name" );
5878 options .addOption (OPTION_FLINK_CONF_DIR , true , "Flink configuration directory" );
@@ -62,11 +82,14 @@ public LauncherOptionParser(String[] args) {
6282 options .addOption (OPTION_CONF_PROP , true , "sql ref prop,eg specify event time" );
6383 options .addOption (OPTION_YARN_CONF_DIR , true , "Yarn and hadoop configuration directory" );
6484
85+ options .addOption (OPTION_SAVE_POINT_PATH , true , "Savepoint restore path" );
86+ options .addOption (OPTION_ALLOW_NON_RESTORED_STATE , true , "Flag indicating whether non restored state is allowed if the savepoint" );
87+
6588 try {
6689 CommandLine cl = parser .parse (options , args );
67- String mode = cl .getOptionValue (OPTION_MODE , MODE_LOCAL );
90+ String mode = cl .getOptionValue (OPTION_MODE , ClusterMode . local . name () );
6891 //check mode
69- properties .put ( OPTION_MODE , mode );
92+ properties .setMode ( mode );
7093
7194 String job = Preconditions .checkNotNull (cl .getOptionValue (OPTION_SQL ),
7295 "Must specify job file using option '" + OPTION_SQL + "'" );
@@ -76,78 +99,65 @@ public LauncherOptionParser(String[] args) {
7699 in .read (filecontent );
77100 String content = new String (filecontent , "UTF-8" );
78101 String sql = URLEncoder .encode (content , Charsets .UTF_8 .name ());
79- properties .put (OPTION_SQL , sql );
80-
102+ properties .setSql (sql );
81103 String localPlugin = Preconditions .checkNotNull (cl .getOptionValue (OPTION_LOCAL_SQL_PLUGIN_PATH ));
82- properties .put (OPTION_LOCAL_SQL_PLUGIN_PATH , localPlugin );
83-
104+ properties .setLocalSqlPluginPath (localPlugin );
84105 String remotePlugin = cl .getOptionValue (OPTION_REMOTE_SQL_PLUGIN_PATH );
85- if (!mode . equalsIgnoreCase ( ClusterMode . MODE_LOCAL )){
106+ if (!ClusterMode . local . name (). equals ( mode )){
86107 Preconditions .checkNotNull (remotePlugin );
87- properties .put ( OPTION_REMOTE_SQL_PLUGIN_PATH , remotePlugin );
108+ properties .setRemoteSqlPluginPath ( remotePlugin );
88109 }
89-
90110 String name = Preconditions .checkNotNull (cl .getOptionValue (OPTION_NAME ));
91- properties .put (OPTION_NAME , name );
92-
111+ properties .setName (name );
93112 String addJar = cl .getOptionValue (OPTION_ADDJAR );
94113 if (StringUtils .isNotBlank (addJar )){
95- properties .put ( OPTION_ADDJAR , addJar );
114+ properties .setAddjar ( addJar );
96115 }
97-
98116 String confProp = cl .getOptionValue (OPTION_CONF_PROP );
99117 if (StringUtils .isNotBlank (confProp )){
100- properties .put ( OPTION_CONF_PROP , confProp );
118+ properties .setConfProp ( confProp );
101119 }
102-
103120 String flinkConfDir = cl .getOptionValue (OPTION_FLINK_CONF_DIR );
104121 if (StringUtils .isNotBlank (flinkConfDir )) {
105- properties .put ( OPTION_FLINK_CONF_DIR , flinkConfDir );
122+ properties .setFlinkconf ( flinkConfDir );
106123 }
107124
108125 String yarnConfDir = cl .getOptionValue (OPTION_YARN_CONF_DIR );
109126 if (StringUtils .isNotBlank (yarnConfDir )) {
110- properties .put (OPTION_YARN_CONF_DIR , yarnConfDir );
127+ properties .setYarnconf (yarnConfDir );
128+ }
129+
130+ String savePointPath = cl .getOptionValue (OPTION_SAVE_POINT_PATH );
131+ if (StringUtils .isNotBlank (savePointPath )) {
132+ properties .setSavePointPath (savePointPath );
133+ }
134+
135+ String allow_non = cl .getOptionValue (OPTION_ALLOW_NON_RESTORED_STATE );
136+ if (StringUtils .isNotBlank (allow_non )) {
137+ properties .setAllowNonRestoredState (allow_non );
111138 }
112139
113140 } catch (Exception e ) {
114141 throw new RuntimeException (e );
115142 }
116-
117143 }
118144
119- public Properties getProperties (){
145+ public LauncherOptions getLauncherOptions (){
120146 return properties ;
121147 }
122148
123- public Object getVal (String key ){
124- return properties .get (key );
125- }
126-
127- public List <String > getAllArgList (){
128- List <String > args = Lists .newArrayList ();
129- for (Map .Entry <Object , Object > one : properties .entrySet ()){
130- args .add ("-" + one .getKey ().toString ());
131- args .add (one .getValue ().toString ());
132- }
133-
134- return args ;
135- }
136-
137- public List <String > getProgramExeArgList (){
149+ public List <String > getProgramExeArgList () throws Exception {
150+ Map <String ,Object > mapConf = PluginUtil .ObjectToMap (properties );
138151 List <String > args = Lists .newArrayList ();
139- for (Map .Entry <Object , Object > one : properties .entrySet ()){
140- String key = one .getKey (). toString () ;
152+ for (Map .Entry <String , Object > one : mapConf .entrySet ()){
153+ String key = one .getKey ();
141154 if (OPTION_FLINK_CONF_DIR .equalsIgnoreCase (key )
142155 || OPTION_YARN_CONF_DIR .equalsIgnoreCase (key )){
143156 continue ;
144157 }
145-
146158 args .add ("-" + key );
147159 args .add (one .getValue ().toString ());
148160 }
149-
150161 return args ;
151162 }
152-
153163}
0 commit comments