Skip to content

Commit 0836cd0

Browse files
feat: insert error improve (#3725)
#3708 print failed column value if trans failed print failed row when load data failed apiserver print col/row or idx
1 parent 3deb139 commit 0836cd0

File tree

13 files changed

+481
-191
lines changed

13 files changed

+481
-191
lines changed

docs/zh/openmldb_sql/ddl/DESC_STATEMENT.md

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ USE db1;
3737
-- SUCCEED: Database changed
3838
```
3939

40-
创建两张表:
40+
创建一张自定义索引的表:
4141

4242
```sql
4343
CREATE TABLE t1 (col0 STRING, col1 int, std_time TIMESTAMP, INDEX(KEY=col1, TS=std_time, TTL_TYPE=absolute, TTL=30d));
@@ -64,6 +64,37 @@ desc t1;
6464

6565
```
6666

67+
有离线数据的表:
68+
69+
```sql
70+
--- ------- ----------- ------ ---------
71+
# Field Type Null Default
72+
--- ------- ----------- ------ ---------
73+
1 c1 Varchar YES
74+
2 c2 Int YES
75+
3 c3 BigInt YES
76+
4 c4 Float YES
77+
5 c5 Double YES
78+
6 c6 Timestamp YES
79+
7 c7 Date YES
80+
--- ------- ----------- ------ ---------
81+
--- -------------------- ------ ---- ------ ---------------
82+
# name keys ts ttl ttl_type
83+
--- -------------------- ------ ---- ------ ---------------
84+
1 INDEX_0_1705743486 c1 - 0min kAbsoluteTime
85+
--- -------------------- ------ ---- ------ ---------------
86+
---------------------------------------------------------- ------------------------------------------ --------- ---------
87+
Data path Symbolic paths Format Options
88+
---------------------------------------------------------- ------------------------------------------ --------- ---------
89+
file:///tmp/openmldb_offline_storage/demo_db/demo_table1 file:///work/taxi-trip/data/data.parquet parquet
90+
---------------------------------------------------------- ------------------------------------------ --------- ---------
91+
92+
--------------- --------------
93+
compress_type storage_mode
94+
--------------- --------------
95+
NoCompress Memory
96+
--------------- --------------
97+
```
6798

6899

69100
## 相关语句

docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md

Lines changed: 68 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
# LOAD DATA INFILE
2-
`LOAD DATA INFILE`语句能高效地将文件中的数据读取到数据库中的表中。`LOAD DATA INFILE``SELECT INTO OUTFILE`互补。要将数据从 table导出到文件,请使用[SELECT INTO OUTFILE](../dql/SELECT_INTO_STATEMENT.md)。要将文件数据导入到 table 中,请使用`LOAD DATA INFILE`
2+
`LOAD DATA INFILE`语句能高效地将文件中的数据读取到数据库中的表中。`LOAD DATA INFILE``SELECT INTO OUTFILE`互补。要将数据从 table导出到文件,请使用[SELECT INTO OUTFILE](../dql/SELECT_INTO_STATEMENT.md)。要将文件数据导入到 table 中,请使用`LOAD DATA INFILE`注意,导入的文件schema顺序应与表的schema顺序一致。
33

44
```{note}
55
无论何种load_mode,INFILE 的 filePath既可以是单个文件名,也可以是目录,也可以使用`*`通配符。
66
- load_mode=cluster的具体格式等价于`DataFrameReader.read.load(String)`,可以使用spark shell来read你想要的文件路径,确认能否读入成功。如果目录中存在多格式的文件,只会选择 LoadDataInfileOptionsList 中指定的FORMAT格式文件。
7-
- load_mode=local则使用glob选择出符合的所有文件,不会检查单个文件的格式,所以,请保证满足条件的是csv格式,建议使用`*.csv`限制文件格式。
7+
- load_mode=local则使用glob选择出符合的所有文件,不会检查单个文件的格式,所以,请保证满足条件的文件都是csv格式,建议使用`*.csv`限制文件格式。
88
```
99

1010
## Syntax
@@ -55,7 +55,7 @@ FilePathPattern
5555
| quote | String | " | 输入数据的包围字符串。字符串长度<=1。<br />load_mode=`cluster`默认为双引号`"`。配置包围字符后,被包围字符包围的内容将作为一个整体解析。例如,当配置包围字符串为"#"时, `1, 1.0, #This is a string field, even there is a comma#, normal_string`将为解析为三个filed,第一个是整数1,第二个是浮点1.0,第三个是一个字符串,第四个虽然没有quote,但也是一个字符串。<br /> **local_mode=`local`默认为`\0`,也可使用空字符串赋值,不处理包围字符。** |
5656
| mode | String | "error_if_exists" | 导入模式:<br />`error_if_exists`: 仅离线模式可用,若离线表已有数据则报错。<br />`overwrite`: 仅离线模式可用,数据将覆盖离线表数据。<br />`append`:离线在线均可用,若文件已存在,数据将追加到原文件后面。<br /> **local_mode=`local`默认为`append`** |
5757
| deep_copy | Boolean | true | `deep_copy=false`仅支持离线load, 可以指定`INFILE` Path为该表的离线存储地址,从而不需要硬拷贝。 |
58-
| load_mode | String | cluster | `load_mode='local'`仅支持从csv本地文件导入在线存储, 它通过本地客户端同步插入数据;<br /> `load_mode='cluster'`仅支持集群版, 通过spark插入数据,支持同步或异步模式 |
58+
| load_mode | String | cluster | `load_mode='local'`仅支持从csv本地文件导入在线存储, 它通过本地客户端同步插入数据;<br /> `load_mode='cluster'`仅支持集群版, 通过spark插入数据,支持同步或异步模式 <br />local模式的使用限制见[local导入模式说明](#local导入模式说明) |
5959
| thread | Integer | 1 | 仅在本地文件导入时生效,即`load_mode='local'`或者单机版,表示本地插入数据的线程数。 最大值为`50`|
6060
| writer_type | String | single | 集群版在线导入中插入数据的writer类型。可选值为`single``batch`,默认为`single``single`表示数据即读即写,节省内存。`batch`则是将整个rdd分区读完,确认数据类型有效性后,再写入集群,需要更多内存。在部分情况下,`batch`模式有利于筛选未写入的数据,方便重试这部分数据。 |
6161
| put_if_absent | Boolean | false | 在源数据无重复行也不与表中已有数据重复时,可以使用此选项避免插入重复数据,特别是job失败后可以重试。等价于使用`INSERT OR IGNORE`。更多详情见下文。 |
@@ -121,7 +121,36 @@ LOAD DATA INFILE 'hive://db1.t1' INTO TABLE t1;
121121

122122
## 离线导入规则
123123

124-
表的离线信息可通过`desc <table>`查看。我们将数据地址分为两类,离线地址是OpenMLDB的内部存储路径,硬拷贝将写入此地址,仅一个;软链接地址是软链接导入的地址列表。
124+
表的离线信息可通过`desc <table>`查看。我们将数据地址分为两类,Data path与Symbolic path,离线地址Data path是OpenMLDB的内部存储路径,硬拷贝将写入此地址,仅一个;软链接地址Symbolic path,则是软链接导入的地址列表,可以是多个。
125+
```
126+
--- ------- ----------- ------ ---------
127+
# Field Type Null Default
128+
--- ------- ----------- ------ ---------
129+
1 c1 Varchar YES
130+
2 c2 Int YES
131+
3 c3 BigInt YES
132+
4 c4 Float YES
133+
5 c5 Double YES
134+
6 c6 Timestamp YES
135+
7 c7 Date YES
136+
--- ------- ----------- ------ ---------
137+
--- -------------------- ------ ---- ------ ---------------
138+
# name keys ts ttl ttl_type
139+
--- -------------------- ------ ---- ------ ---------------
140+
1 INDEX_0_1705743486 c1 - 0min kAbsoluteTime
141+
--- -------------------- ------ ---- ------ ---------------
142+
---------------------------------------------------------- ------------------------------------------ --------- ---------
143+
Data path Symbolic paths Format Options
144+
---------------------------------------------------------- ------------------------------------------ --------- ---------
145+
file:///tmp/openmldb_offline_storage/demo_db/demo_table1 file:///work/taxi-trip/data/data.parquet parquet
146+
---------------------------------------------------------- ------------------------------------------ --------- ---------
147+
148+
--------------- --------------
149+
compress_type storage_mode
150+
--------------- --------------
151+
NoCompress Memory
152+
--------------- --------------
153+
```
125154
根据模式的不同,对离线信息的修改也不同。
126155
- overwrite模式,将会覆盖原有的所有字段,包括离线地址、软链接地址、格式、读取选项,仅保留当前overwrite进入的信息。
127156
- overwrite 硬拷贝,离线地址如果存在数据将被覆盖,软链接全部清空,格式更改为内部默认格式parquet,读取选项全部清空。
@@ -144,28 +173,46 @@ curl http://<ns_endpoint>/NameServer/UpdateOfflineTableInfo -d '{"db":"<db_name>
144173
## CSV源数据格式说明
145174

146175
导入支持csv和parquet两种数据格式,csv的格式需要特别注意,下面举例说明。
147-
148-
```
149-
c1, c2
150-
,
151-
"",""
152-
ab,cd
153-
"ef","gh"
154-
null,null
155-
```
156-
这个csv源数据中,第一行两个空值(blank value)。
157-
- cluster模式空值会被当作`null`(无论null_value是什么)。
158-
- local模式空值会被当作空字符串,具体见[issue3015](https://github.com/4paradigm/OpenMLDB/issues/3015)
159-
160-
第二行两列都是两个双引号。
161-
- cluster模式默认quote为`"`,所以这一行是两个空字符串。
162-
- local模式默认quote为`\0`,所以这一行两列都是两个双引号。local模式quote可以配置为`"`,但escape规则是`""`为单个`"`,和Spark不一致,具体见[issue3015](https://github.com/4paradigm/OpenMLDB/issues/3015)
176+
1. csv的列分隔符默认为`,`,不允许出现空格,否则,"a, b"将被解析为两列,第一列为`a`,第二列为` b`(有一个空格)。
177+
1. local模式会trim掉列分隔符两边的空格,所以`a, b`会被解析为两列,第一列为`a`,第二列为`b`。但从规范上来说,csv的列分隔符左右不应该有空格,请不要依赖这个特性。
178+
2. cluster和local模式对于空值的处理不同,具体为:
179+
```
180+
c1, c2
181+
,
182+
"",""
183+
ab,cd
184+
"ef","gh"
185+
null,null
186+
```
187+
这个csv源数据中,第一行两个空值(blank value)。
188+
- cluster模式空值会被当作`null`(无论null_value是什么)。
189+
- local模式空值会被当作空字符串,具体见[issue3015](https://github.com/4paradigm/OpenMLDB/issues/3015)。
190+
191+
第二行两列都是两个双引号。
192+
- cluster模式默认quote为`"`,所以这一行是两个空字符串。
193+
- local模式默认quote为`\0`,所以这一行两列都是两个双引号。local模式quote可以配置为`"`,但escape规则是`""`为单个`"`,和Spark不一致,具体见[issue3015](https://github.com/4paradigm/OpenMLDB/issues/3015)。
194+
195+
3. cluster的csv格式支持两种格式的timestamp,但同一次load只会选择一种格式,不会混合使用。如果csv中存在两种格式的timestamp,会导致解析失败。选择哪种格式由第一行数据决定,如果第一行数据是`2020-01-01 00:00:00`,则后续所有timestamp都会按照`yyyy-MM-dd HH:mm:ss`格式解析;如果第一行数据是整型`1577808000000`,则后续所有timestamp都会按照整型格式解析。
196+
1. timestamp可以为字符串格式,比如`"2020-01-01 00:00:00"`。
197+
2. date可以是年月日(`yyyy-MM-dd`)或者年月日时分秒(`yyyy-MM-dd HH:mm:ss`)。
198+
4. local的csv格式只支持整型timestamp,date类型为年月日,例如`2022-2-2`
199+
1. timestamp和date均不可以为字符串格式,比如`"2020-01-01"`将解析失败。
200+
2. date不可以是年月日时分秒,例如`2022-2-2 00:00:00`将解析失败。
201+
5. local的字符串不支持quote转义,所以如果你的字符串中存在quote字符,请使用cluster模式。
202+
6. cluster如果读取csv时解析失败,将会把失败的列值设为NULL,继续导入流程,但local模式会直接报错,不会继续导入。
163203

164204
## PutIfAbsent说明
165205

166-
PutIfAbsent是一个特殊的选项,它可以避免插入重复数据,仅需一个配置,操作简单,特别适合load datajob失败后重试,等价于使用`INSERT OR IGNORE`。如果你想要导入的数据中存在重复,那么通过PutIfAbsent导入,会导致部分数据丢失。如果你需要保留重复数据,不应使用此选项,建议通过其他方式去重后再导入。
206+
PutIfAbsent是一个特殊的选项,它可以避免插入重复数据,仅需一个配置,操作简单,特别适合load datajob失败后重试,等价于使用`INSERT OR IGNORE`。如果你想要导入的数据中存在重复,那么通过PutIfAbsent导入,会导致部分数据丢失。如果你需要保留重复数据,不应使用此选项,建议通过其他方式去重后再导入。local模式暂不支持此选项。
167207

168208
PutIfAbsent需要去重这一额外开销,所以,它的性能与去重的复杂度有关:
169209

170210
- 表中只存在ts索引,且同一key+ts的数据量少于10k时(为了精确去重,在同一个key+ts下会逐行对比整行数据),PutIfAbsent的性能表现不会很差,通常导入时间在普通导入时间的2倍以内。
171211
- 表中如果存在time索引(ts列为空),或者ts索引同一key+ts的数据量大于100k时,PutIfAbsent的性能会很差,导入时间可能超过普通导入时间的10倍,无法正常使用。这样的数据条件下,更建议进行去重后再导入。
212+
213+
## local导入模式说明
214+
215+
load_mode可使用local模式,但与cluster模式有一些不同,如果你部署了TaskManager,我们建议使用cluster模式。不同之处如下:
216+
217+
- local模式仅支持在线,不支持离线。也只支持csv格式,不支持parquet格式。
218+
- csv的读取支持有限,(SplitLineWithDelimiterForStrings)

docs/zh/quickstart/sdk/rest_api.md

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
```
3535

3636
- 目前仅支持插入一条数据。
37-
- 数据需严格按照 schema 排列。
37+
- 数据需严格按照表 schema 排列。
3838

3939
请求数据样例:
4040

@@ -152,13 +152,13 @@ curl http://127.0.0.1:8080/dbs/demo_db/deployments/demo_data_service -X POST -d'
152152

153153
请求参数:
154154

155-
| **参数** | **类型** | **必需** | **说明** |
156-
| -------- | -------- | -------- | ------------------------------------------------------------ |
157-
| mode | String || 可配 `offsync` , `offasync`, `online` |
158-
| sql | String | | |
159-
| input | Object | | |
160-
| schema | Array || 可支持数据类型(大小写不敏感):`Bool`, `Int16`, `Int32`, `Int64`, `Float`, `Double`, `String`, `Date` and `Timestamp`. |
161-
| data | Array || |
155+
| **参数** | **类型** | **必需** | **说明** |
156+
| -------- | -------- | -------- | ----------------------------------------------------------------------------------------------------------------------- |
157+
| mode | String || 可配 `offsync` , `offasync`, `online` |
158+
| sql | String || |
159+
| input | Object || |
160+
| schema | Array || 可支持数据类型(大小写不敏感):`Bool`, `Int16`, `Int32`, `Int64`, `Float`, `Double`, `String`, `Date` and `Timestamp`. |
161+
| data | Array || schema和data字段必须同时存在 |
162162

163163
**请求数据样例**
164164

java/hybridse-sdk/src/main/java/com/_4paradigm/hybridse/sdk/RequestEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
*/
3333
public class RequestEngine implements AutoCloseable {
3434

35-
private static final Logger logger = LoggerFactory.getLogger(SqlEngine.class);
35+
private static final Logger logger = LoggerFactory.getLogger(RequestEngine.class);
3636

3737
private SimpleCatalog catalog;
3838
private EngineOptions options;

java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/codec/ClassicRowBuilder.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
import com._4paradigm.openmldb.proto.Type.DataType;
1919
import com._4paradigm.openmldb.proto.Common.ColumnDesc;
20-
import org.joda.time.DateTime;
21-
2220
import java.nio.ByteBuffer;
2321
import java.nio.ByteOrder;
2422
import java.sql.Date;

java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataSingleWriter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,16 @@ public void write(InternalRow record) throws IOException {
6969
ResultSetMetaData metaData = preparedStatement.getMetaData();
7070
Preconditions.checkState(record.numFields() == metaData.getColumnCount());
7171
OpenmldbDataWriter.addRow(record, preparedStatement);
72-
// check return for put result
72+
7373
// you can cache failed rows and throw exception when commit/close,
7474
// but it still may interrupt other writers(pending or slow writers)
75+
76+
// check return for put result
7577
if(!preparedStatement.execute()) {
7678
throw new IOException("execute failed");
7779
}
7880
} catch (Exception e) {
79-
throw new IOException("write row to openmldb failed on " + record, e);
81+
throw new IOException("write row to openmldb failed on " + OpenmldbDataWriter.readable(record, preparedStatement), e);
8082
}
8183
}
8284

java/openmldb-spark-connector/src/main/java/com/_4paradigm/openmldb/spark/write/OpenmldbDataWriter.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com._4paradigm.openmldb.sdk.SdkOption;
2424
import com._4paradigm.openmldb.sdk.SqlException;
2525
import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor;
26+
import com._4paradigm.openmldb.spark.OpenmldbTable;
2627
import com.google.common.base.Preconditions;
2728
import org.apache.spark.sql.catalyst.InternalRow;
2829
import org.apache.spark.sql.connector.write.DataWriter;
@@ -75,7 +76,7 @@ public void write(InternalRow record) throws IOException {
7576
addRow(record, preparedStatement);
7677
preparedStatement.addBatch();
7778
} catch (Exception e) {
78-
throw new IOException("convert to openmldb row failed on " + record, e);
79+
throw new IOException("convert to openmldb row failed on " + readable(record, preparedStatement), e);
7980
}
8081
}
8182

@@ -126,6 +127,19 @@ static void addRow(InternalRow record, PreparedStatement preparedStatement) thro
126127
}
127128
}
128129

130+
static String readable(InternalRow record, PreparedStatement preparedStatement) {
131+
try {
132+
ResultSetMetaData metaData = preparedStatement.getMetaData();
133+
StringBuilder sb = new StringBuilder();
134+
for (int i = 0; i < record.numFields(); i++) {
135+
sb.append(record.get(i, OpenmldbTable.sdkTypeToSparkType(metaData.getColumnType(i + 1)))).append(",");
136+
}
137+
return sb.toString();
138+
} catch (SQLException e) {
139+
return "readable error: " + e.getMessage();
140+
}
141+
}
142+
129143
@Override
130144
public WriterCommitMessage commit() throws IOException {
131145
try {

0 commit comments

Comments
 (0)