
Commit fab410f

[feat-35405][file] add file source. Support fileFormat [json] and [csv]. Support location [hdfs] and [local].
1 parent: 4121a31

File tree

14 files changed (+1224, -4 lines)

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfo.java

Lines changed: 3 additions & 0 deletions

```diff
@@ -41,6 +41,9 @@ public abstract class AbstractTableInfo implements Serializable {
 
     public static final String PARALLELISM_KEY = "parallelism";
     public static final String ERROR_LIMIT = "errorLimit";
+    public static final Boolean DEFAULT_FALSE = false;
+    public static final Boolean DEFAULT_TRUE = true;
+    public static final Object DEFAULT_NULL = null;
     private final List<String> fieldList = Lists.newArrayList();
     private final List<String> fieldTypeList = Lists.newArrayList();
     private final List<Class> fieldClassList = Lists.newArrayList();
```
core/src/main/java/com/dtstack/flink/sql/util/DtStringUtil.java

Lines changed: 5 additions & 3 deletions

```diff
@@ -252,10 +252,12 @@ public static String col2string(Object column, String type) {
         return result.toString();
     }
 
-    public static String getPluginTypeWithoutVersion(String engineType){
-
+    public static String getPluginTypeWithoutVersion(String engineType) {
         Matcher matcher = NO_VERSION_PATTERN.matcher(engineType);
-        if(!matcher.find()){
+
+        if (!engineType.equals("kafka")) {
+            return engineType;
+        } else if (!matcher.find()) {
             return engineType;
         }
```
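After this change, only the literal engine type "kafka" ever reaches the NO_VERSION_PATTERN branch; every other engine type, including the new "file" type, is returned verbatim. A behavior sketch under that reading of the hunk (the tail of the method is cut off in the diff, so this describes the visible lines only):

```java
import com.dtstack.flink.sql.util.DtStringUtil;

public class PluginTypeSketch {
    public static void main(String[] args) {
        // Not equal to "kafka": returned as-is, the pattern is never consulted.
        System.out.println(DtStringUtil.getPluginTypeWithoutVersion("file"));    // file
        System.out.println(DtStringUtil.getPluginTypeWithoutVersion("kafka11")); // kafka11
        // Exactly "kafka": falls through to the NO_VERSION_PATTERN matching.
        System.out.println(DtStringUtil.getPluginTypeWithoutVersion("kafka"));
    }
}
```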

core/src/main/java/com/dtstack/flink/sql/util/MathUtil.java

Lines changed: 12 additions & 0 deletions

```diff
@@ -263,4 +263,16 @@ public static Timestamp getTimestamp(Object obj) {
         throw new RuntimeException("not support type of " + obj.getClass() + " convert to Date.");
     }
 
+    public static Character getChar(Object obj) {
+        if (obj == null) {
+            return null;
+        }
+        if (obj instanceof Character) {
+            return (Character) obj;
+        }
+        if (obj instanceof String) {
+            return String.valueOf(obj).charAt(0);
+        }
+        throw new RuntimeException("not support type of " + obj.getClass() + " convert to Char");
+    }
 }
```
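Usage sketch for the new helper, matching the committed behavior (note that an empty String would reach charAt(0) and throw a StringIndexOutOfBoundsException):

```java
import com.dtstack.flink.sql.util.MathUtil;

public class GetCharSketch {
    public static void main(String[] args) {
        System.out.println(MathUtil.getChar(null));  // null
        System.out.println(MathUtil.getChar('a'));   // 'a' (Character passes through)
        System.out.println(MathUtil.getChar("abc")); // 'a' (first char of the String)
        MathUtil.getChar(42);                        // throws RuntimeException
    }
}
```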

docs/plugin/filesource.md

Lines changed: 184 additions & 0 deletions
## 1. Format

```
CREATE TABLE tableName(
    colName colType,
    ...
)WITH(
    type = 'file',
    format = 'csv',
    fieldDelimiter = ',',
    fileName = 'xxxx',
    filePath = 'xx/xxx',
    location = 'local',
    nullLiteral = 'null',
    allowComment = 'true',
    arrayElementDelimiter = ',',
    quoteCharacter = '"',
    escapeCharacter = '\\',
    ignoreParseErrors = 'true',
    hdfsSite = 'xxx/hdfs-site.xml',
    coreSite = 'xxx/core-site.xml',
    hdfsUser = 'root',
    charsetName = 'UTF-8'
);
```

## 2. Supported formats

Files can be read from HDFS or the local file system, in CSV, JSON, or Avro format.

## 3. Table definition

|Parameter|Meaning|
|----|---|
| tableName | name used in SQL, i.e. the name registered with flink-table-env |
| colName | column name |
| colType | column type; see [supported colType values](../colType.md) |

## 4. Parameters

Common parameters:

|Parameter|Default|Required|Description|
|----|---|---|---|
|type|file||type of the current table|
|format|csv||file format; only csv, json and avro are supported|
|fileName|||file name|
|filePath|||absolute path to the file|
|location|local||where the file is stored; only HDFS and local are supported|
|charsetName|UTF-8||file character encoding|

### 4.1 CSV parameters

|Parameter|Default|Required|Description|
|----|---|---|---|
|ignoreParseErrors|true||whether to ignore records that fail to parse|
|fieldDelimiter|,||field delimiter of the csv data|
|nullLiteral|"null"||literal that represents null values in the csv data|
|allowComments|true||whether comment lines are allowed|
|arrayElementDelimiter|,||delimiter between array elements|
|quoteCharacter|`"`||quote character|
|escapeCharacter|`\`||escape character|

### 4.2 Avro parameters

|Parameter|Default|Required|Description|
|----|---|---|---|
|avroFormat|||required when format = 'avro'|

### 4.3 HDFS parameters

|Parameter|Default|Required|Description|
|----|---|---|---|
|hdfsSite|${HADOOP_CONF_HOME}/hdfs-site.xml||location of hdfs-site.xml|
|coreSite|${HADOOP_CONF_HOME}/core-site.xml||location of core-site.xml|
|hdfsUser|root||user for HDFS access; defaults to [root]|

### 4.4 JSON parameters

JSON takes no format-specific parameters.

## 5. Examples

Sample data:

csv

```csv
712382,1/1/2017 0:00,1/1/2017 0:03,223,7051,Wellesley St E / Yonge St Green P,7089,Church St / Wood St,Member
```

json

```json
{
  "trip_id": "712382",
  "trip_start_time": "1/1/2017 0:00",
  "trip_stop_time": "1/1/2017 0:03",
  "trip_duration_seconds": "223",
  "from_station_id": "7051",
  "from_station_name": "Wellesley St E / Yonge St Green P",
  "to_station_id": "7089",
  "to_station_name": "Church St / Wood St",
  "user_type": "Member"
}
```

### 5.1 csv

```sql
CREATE TABLE SourceOne
(
    trip_id varchar,
    trip_start_time varchar,
    trip_stop_time varchar,
    trip_duration_seconds varchar,
    from_station_id varchar,
    from_station_name varchar,
    to_station_id varchar,
    to_station_name varchar,
    user_type varchar
) WITH (
    type = 'file',
    format = 'csv',
    fieldDelimiter = ',',
    fileName = '2017-Q1.csv',
    filePath = '/data',
    location = 'local',
    charsetName = 'UTF-8'
);
```

### 5.2 json

```sql
CREATE TABLE SourceOne
(
    trip_id varchar,
    trip_start_time varchar,
    trip_stop_time varchar,
    trip_duration_seconds varchar,
    from_station_id varchar,
    from_station_name varchar,
    to_station_id varchar,
    to_station_name varchar,
    user_type varchar
) WITH (
    type = 'file',
    format = 'json',
    fieldDelimiter = ',',
    fileName = '2017-Q1.json',
    filePath = '/data',
    charsetName = 'UTF-8'
);
```

### 5.3 HDFS

```sql
CREATE TABLE SourceOne
(
    trip_id varchar,
    trip_start_time varchar,
    trip_stop_time varchar,
    trip_duration_seconds varchar,
    from_station_id varchar,
    from_station_name varchar,
    to_station_id varchar,
    to_station_name varchar,
    user_type varchar
) WITH (
    type = 'file',
    format = 'json',
    fieldDelimiter = ',',
    fileName = '2017-Q1.json',
    filePath = 'hdfs://ns1/data',
    location = 'hdfs',
    hdfsSite = '/Users/wtz/dtstack/conf/yarn/kudu1/hdfs-site.xml',
    coreSite = '/Users/wtz/dtstack/conf/yarn/kudu1/core-site.xml',
    hdfsUser = 'admin',
    charsetName = 'UTF-8'
);
```
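For orientation, here is a minimal sketch of how the hdfsSite, coreSite, and hdfsUser settings map onto the standard Hadoop client API. The plugin's actual reader class is not shown on this commit page, so the wiring below is an assumption for illustration, reusing the values from the example above:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.Charset;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsFileReadSketch {
    public static void main(String[] args) throws Exception {
        // coreSite / hdfsSite from the WITH(...) clause
        Configuration conf = new Configuration(false);
        conf.addResource(new Path("/Users/wtz/dtstack/conf/yarn/kudu1/core-site.xml"));
        conf.addResource(new Path("/Users/wtz/dtstack/conf/yarn/kudu1/hdfs-site.xml"));

        // filePath, accessed as hdfsUser
        FileSystem fs = FileSystem.get(URI.create("hdfs://ns1/data"), conf, "admin");

        // fileName under filePath, decoded with charsetName
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                fs.open(new Path("/data/2017-Q1.json")), Charset.forName("UTF-8")))) {
            reader.lines().forEach(System.out::println);
        }
    }
}
```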

file/file-source/pom.xml

Lines changed: 79 additions & 0 deletions
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>sql.file</artifactId>
        <groupId>com.dtstack.flink</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sql.source.file</artifactId>
    <name>file-source</name>
    <packaging>jar</packaging>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.slf4j</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.2</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <!-- here the phase you need -->
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <copy todir="${basedir}/../../sqlplugins/filesource">
                                    <fileset dir="target/">
                                        <include name="${project.artifactId}-${project.version}.jar" />
                                    </fileset>
                                </copy>

                                <move file="${basedir}/../../sqlplugins/filesource/${project.artifactId}-${project.version}.jar"
                                      tofile="${basedir}/../../sqlplugins/filesource/${project.name}-${git.branch}.jar" />
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
