2323import com .dtstack .flink .sql .table .AbstractSourceTableInfo ;
2424import org .apache .commons .lang3 .StringUtils ;
2525import org .apache .flink .api .common .serialization .DeserializationSchema ;
26- import org .apache .flink .api .common .typeinfo .TypeInformation ;
2726import org .apache .flink .runtime .execution .SuppressRestartsException ;
2827import org .apache .flink .streaming .api .datastream .DataStreamSource ;
2928import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
4039import java .io .BufferedReader ;
4140import java .io .File ;
4241import java .io .FileInputStream ;
42+ import java .io .FileNotFoundException ;
4343import java .io .IOException ;
4444import java .io .InputStream ;
4545import java .io .InputStreamReader ;
4646import java .net .URI ;
4747import java .util .Locale ;
48- import java .util .Objects ;
48+ import java .util .concurrent . atomic . AtomicBoolean ;
4949
5050/**
5151 * @author tiezhu
@@ -63,113 +63,108 @@ public class FileSource implements IStreamSourceGener<Table>, SourceFunction<Row
6363 /**
6464 * Flag to mark the main work loop as alive.
6565 */
66- private volatile boolean running = true ;
66+ private final AtomicBoolean running = new AtomicBoolean ( true ) ;
6767
68- private FileSourceTableInfo fileSourceTableInfo ;
68+ private URI fileUri ;
6969
7070 private InputStream inputStream ;
7171
72- public void setDeserializationSchema (DeserializationSchema <Row > deserializationSchema ) {
73- this .deserializationSchema = deserializationSchema ;
74- }
75-
76- public void setFileSourceTableInfo (FileSourceTableInfo fileSourceTableInfo ) {
77- this .fileSourceTableInfo = fileSourceTableInfo ;
78- }
79-
80- public FileSourceTableInfo getFileSourceTableInfo () {
81- return fileSourceTableInfo ;
82- }
72+ private String charset ;
8373
8474 @ Override
8575 public Table genStreamSource (AbstractSourceTableInfo sourceTableInfo ,
8676 StreamExecutionEnvironment env ,
8777 StreamTableEnvironment tableEnv ) {
8878 FileSource fileSource = new FileSource ();
8979 FileSourceTableInfo tableInfo = (FileSourceTableInfo ) sourceTableInfo ;
90- fileSource .setFileSourceTableInfo (tableInfo );
91- fileSource .setDeserializationSchema (tableInfo .buildDeserializationSchema ());
92-
93- TypeInformation <Row > rowTypeInformation = tableInfo .getRowTypeInformation ();
94- String operatorName = tableInfo .getOperatorName ();
9580
96- DataStreamSource <?> source = env .addSource (fileSource , operatorName , rowTypeInformation );
81+ DataStreamSource <?> source = fileSource .initDataStream (tableInfo , env );
82+ String fields = StringUtils .join (tableInfo .getFields (), "," );
9783
98- String fields = StringUtils .join (fileSource .getFileSourceTableInfo ().getFields (), "," );
9984 return tableEnv .fromDataStream (source , fields );
10085 }
10186
87+ public DataStreamSource <?> initDataStream (FileSourceTableInfo tableInfo ,
88+ StreamExecutionEnvironment env ) {
89+ deserializationSchema = tableInfo .getDeserializationSchema ();
90+ fileUri = URI .create (tableInfo .getFilePath () + SP + tableInfo .getFileName ());
91+ charset = tableInfo .getCharsetName ();
92+ return env .addSource (
93+ this ,
94+ tableInfo .getOperatorName (),
95+ tableInfo .buildRowTypeInfo ());
96+ }
97+
10298 /**
10399 * 根据存储位置的不同,获取不同的input stream
104100 *
105- * @param sourceTableInfo source table info
101+ * @param fileUri file uri
106102 * @return input stream
107- * @throws Exception reader exception
108103 */
109- private InputStream getInputStream (FileSourceTableInfo sourceTableInfo ) throws Exception {
110- switch (sourceTableInfo .getLocation ().toLowerCase (Locale .ROOT )) {
111- case "local" :
112- return fromLocalFile (sourceTableInfo );
113- case "hdfs" :
114- return fromHdfsFile (sourceTableInfo );
115- default :
116- throw new IllegalArgumentException ();
104+ private InputStream getInputStream (URI fileUri ) {
105+ try {
106+ String scheme = fileUri .getScheme () == null ? "local" : fileUri .getScheme ();
107+ switch (scheme .toLowerCase (Locale .ROOT )) {
108+ case "local" :
109+ return fromLocalFile (fileUri );
110+ case "hdfs" :
111+ return fromHdfsFile (fileUri );
112+ default :
113+ throw new UnsupportedOperationException (
114+ String .format ("Unsupported type [%s] of file." , scheme )
115+ );
116+ }
117+ } catch (IOException e ) {
118+ throw new SuppressRestartsException (e );
117119 }
118120 }
119121
120122 /**
121123 * 从HDFS上获取文件内容
122124 *
123- * @param sourceTableInfo source table info
125+ * @param fileUri file uri of file
124126 * @return hdfs file input stream
125- * @throws Exception reader exception
127+ * @throws IOException reader exception
126128 */
127- private InputStream fromHdfsFile (FileSourceTableInfo sourceTableInfo ) throws Exception {
128- String filePath = sourceTableInfo .getFilePath ();
129- String fileName = sourceTableInfo .getFileName ();
130- String path = filePath + SP + fileName ;
131-
129+ private InputStream fromHdfsFile (URI fileUri ) throws IOException {
132130 Configuration conf = new Configuration ();
133- conf .addResource (new Path (sourceTableInfo .getHdfsSite ()));
134- conf .addResource (new Path (sourceTableInfo .getCoreSite ()));
135- FileSystem fs = FileSystem .newInstance (new URI (filePath ), conf , sourceTableInfo .getHdfsUser ());
136- return fs .open (new Path (path ));
131+
132+ // get conf from HADOOP_CONF_DIR
133+ String hadoopConfDir = System .getenv ("HADOOP_CONF_DIR" );
134+ String confHome = hadoopConfDir == null ? "." : hadoopConfDir ;
135+
136+ conf .addResource (new Path (confHome + SP + "hdfs-site.xml" ));
137+
138+ FileSystem fs = FileSystem .get (fileUri , conf );
139+ return fs .open (new Path (fileUri .getPath ()));
137140 }
138141
139142 /**
140143 * 从本地获取文件内容
141144 *
142- * @param sourceTableInfo source table
145+ * @param fileUri file uri of file
143146 * @return local file input stream
144- * @throws Exception read exception
147+ * @throws FileNotFoundException read exception
145148 */
146- private InputStream fromLocalFile (FileSourceTableInfo sourceTableInfo ) throws Exception {
147- String filePath = sourceTableInfo .getFilePath ();
148- String fileName = sourceTableInfo .getFileName ();
149- String path = filePath + SP + fileName ;
150- File file = new File (path );
149+ private InputStream fromLocalFile (URI fileUri ) throws FileNotFoundException {
150+ File file = new File (fileUri .getPath ());
151151 if (file .exists ()) {
152152 return new FileInputStream (file );
153153 } else {
154154 throw new SuppressRestartsException (new IOException (
155- String .format (
156- "File [%s] not exist. File path: [%s]" ,
157- sourceTableInfo .getFileName (),
158- sourceTableInfo .getFilePath ())
159- ));
155+ String .format ("File not exist. File path: [%s]" , fileUri .getPath ())));
160156 }
161157 }
162158
163-
164159 @ Override
165160 public void run (SourceContext <Row > ctx ) throws Exception {
166- inputStream = getInputStream (fileSourceTableInfo );
167- BufferedReader bufferedReader = new BufferedReader (new InputStreamReader (inputStream ));
161+ inputStream = getInputStream (fileUri );
162+ BufferedReader bufferedReader = new BufferedReader (new InputStreamReader (inputStream , charset ));
168163
169- while (running ) {
164+ while (running . get () ) {
170165 String line = bufferedReader .readLine ();
171166 if (line == null ) {
172- running = false ;
167+ running . compareAndSet ( true , false ) ;
173168 inputStream .close ();
174169 break ;
175170 } else {
@@ -182,8 +177,8 @@ public void run(SourceContext<Row> ctx) throws Exception {
182177 @ Override
183178 public void cancel () {
184179 LOG .info ("File source cancel.." );
185- running = false ;
186- if (Objects . nonNull ( inputStream ) ) {
180+ running . compareAndSet ( true , false ) ;
181+ if (inputStream != null ) {
187182 try {
188183 inputStream .close ();
189184 } catch (IOException ioException ) {
0 commit comments