Skip to content

Commit ff7e8ac

Browse files
feat: put if absent & insert or ignore (#3692)
- insert or ignore in c++/java - put if absent for load data - abort imediately in spark data writer - appendentries fix: delete or put - doc - refactor spark connector config setup
1 parent dfd860e commit ff7e8ac

File tree

60 files changed

+659
-456
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+659
-456
lines changed

demo/usability_testing/data_mocker.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import Optional
77
import numpy as np
88
import pandas as pd
9+
import dateutil
910

1011

1112
# to support save csv, and faster parquet, we don't use faker-cli directly
@@ -146,8 +147,9 @@ def type_converter(sql_type):
146147
if sql_type in ['varchar', 'string']:
147148
# TODO(hw): set max length
148149
return 'pystr', {}
150+
# timestamp should > 0 cuz tablet insert will check it, use utc
149151
if sql_type in ['date', 'timestamp']:
150-
return 'iso8601', {}
152+
return 'iso8601', {"tzinfo": dateutil.tz.UTC}
151153
if sql_type in ['float', 'double']:
152154
return 'pyfloat', ranges[sql_type]
153155
return 'py' + sql_type, {}

docs/zh/openmldb_sql/ddl/CREATE_TABLE_STATEMENT.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,8 @@ IndexOption ::=
233233
| ----------- | ------------------------------------------------------------ | ---------------------------------------------------- | ------------------------------------------------------------ |
234234
| `ABSOLUTE` | TTL的值代表过期时间。配置值为时间段如`100m, 12h, 1d, 365d`。最大可以配置的过期时间为`15768000m`(即30年) | 当记录过期时,会被淘汰。 | `INDEX(KEY=col1, TS=std_time, TTL_TYPE=absolute, TTL=100m)`<br />OpenMLDB将会删除100分钟之前的数据。 |
235235
| `LATEST` | TTL的值代表最大存活条数。即同一个索引下面,最大允许存在的数据条数。最大可以配置1000条 | 记录超过最大条数时,会被淘汰。 | `INDEX(KEY=col1, TS=std_time, TTL_TYPE=LATEST, TTL=10)`。OpenMLDB只会保留最近10条记录,删除以前的记录。 |
236-
| `ABSORLAT` | 配置过期时间和最大存活条数。配置值是一个2元组,形如`(100m, 10), (1d, 1)`。最大可以配置`(15768000m, 1000)`| 当且仅当记录过期****记录超过最大条数时,才会淘汰。 | `INDEX(key=c1, ts=c6, ttl=(120min, 100), ttl_type=absorlat)`。当记录超过100条,**或者**当记录过期时,会被淘汰 |
237-
| `ABSANDLAT` | 配置过期时间和最大存活条数。配置值是一个2元组,形如`(100m, 10), (1d, 1)`。最大可以配置`(15768000m, 1000)`| 当记录过期****记录超过最大条数时,记录会被淘汰。 | `INDEX(key=c1, ts=c6, ttl=(120min, 100), ttl_type=absandlat)`。当记录超过100条,**而且**记录过期时,会被淘汰 |
236+
| `ABSORLAT` | 配置过期时间和最大存活条数。配置值是一个2元组,形如`(100m, 10), (1d, 1)`。最大可以配置`(15768000m, 1000)`| 当且仅当记录过期****记录超过最大条数时,才会淘汰。 | `INDEX(key=c1, ts=c6, ttl=(120m, 100), ttl_type=absorlat)`。当记录超过100条,**或者**当记录过期时,会被淘汰 |
237+
| `ABSANDLAT` | 配置过期时间和最大存活条数。配置值是一个2元组,形如`(100m, 10), (1d, 1)`。最大可以配置`(15768000m, 1000)`| 当记录过期****记录超过最大条数时,记录会被淘汰。 | `INDEX(key=c1, ts=c6, ttl=(120m, 100), ttl_type=absandlat)`。当记录超过100条,**而且**记录过期时,会被淘汰 |
238238

239239
```{note}
240240
最大过期时间和最大存活条数的限制,是出于性能考虑。如果你一定要配置更大的TTL值,请使用UpdateTTL来增大(可无视max限制),或者调整nameserver配置`absolute_ttl_max`和`latest_ttl_max`,重启生效。

docs/zh/openmldb_sql/dml/INSERT_STATEMENT.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ OpenMLDB 支持一次插入单行或多行数据。
55
## syntax
66

77
```
8-
INSERT INFO tbl_name (column_list) VALUES (value_list) [, value_list ...]
8+
INSERT [[OR] IGNORE] INTO tbl_name (column_list) VALUES (value_list) [, value_list ...]
99
1010
column_list:
1111
col_name [, col_name] ...
@@ -16,6 +16,7 @@ value_list:
1616

1717
**说明**
1818
- `INSERT` 只能用在在线模式
19+
- 默认`INSERT`不会去重,`INSERT OR IGNORE` 则可以忽略已存在于表中的数据,可以反复重试。
1920

2021
## Examples
2122

docs/zh/openmldb_sql/dml/LOAD_DATA_STATEMENT.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ FilePathPattern
5858
| load_mode | String | cluster | `load_mode='local'`仅支持从csv本地文件导入在线存储, 它通过本地客户端同步插入数据;<br /> `load_mode='cluster'`仅支持集群版, 通过spark插入数据,支持同步或异步模式 |
5959
| thread | Integer | 1 | 仅在本地文件导入时生效,即`load_mode='local'`或者单机版,表示本地插入数据的线程数。 最大值为`50`|
6060
| writer_type | String | single | 集群版在线导入中插入数据的writer类型。可选值为`single``batch`,默认为`single``single`表示数据即读即写,节省内存。`batch`则是将整个rdd分区读完,确认数据类型有效性后,再写入集群,需要更多内存。在部分情况下,`batch`模式有利于筛选未写入的数据,方便重试这部分数据。 |
61+
| put_if_absent | Boolean | false | 在源数据无重复行也不与表中已有数据重复时,可以使用此选项避免插入重复数据,特别是job失败后可以重试。等价于使用`INSERT OR IGNORE`。更多详情见下文。 |
6162

6263
```{note}
6364
在集群版中,`LOAD DATA INFILE`语句会根据当前执行模式(execute_mode)决定将数据导入到在线或离线存储。单机版中没有存储区别,只会导入到在线存储中,同时也不支持`deep_copy`选项。
@@ -73,6 +74,7 @@ FilePathPattern
7374
7475
所以,请尽量使用绝对路径。单机测试中,本地文件用`file://`开头;生产环境中,推荐使用hdfs等文件系统。
7576
```
77+
7678
## SQL语句模版
7779

7880
```sql
@@ -158,3 +160,12 @@ null,null
158160
第二行两列都是两个双引号。
159161
- cluster模式默认quote为`"`,所以这一行是两个空字符串。
160162
- local模式默认quote为`\0`,所以这一行两列都是两个双引号。local模式quote可以配置为`"`,但escape规则是`""`为单个`"`,和Spark不一致,具体见[issue3015](https://github.com/4paradigm/OpenMLDB/issues/3015)
163+
164+
## PutIfAbsent说明
165+
166+
PutIfAbsent是一个特殊的选项,它可以避免插入重复数据,仅需一个配置,操作简单,特别适合load datajob失败后重试,等价于使用`INSERT OR IGNORE`。如果你想要导入的数据中存在重复,那么通过PutIfAbsent导入,会导致部分数据丢失。如果你需要保留重复数据,不应使用此选项,建议通过其他方式去重后再导入。
167+
168+
PutIfAbsent需要去重这一额外开销,所以,它的性能与去重的复杂度有关:
169+
170+
- 表中只存在ts索引,且同一key+ts的数据量少于10k时(为了精确去重,在同一个key+ts下会逐行对比整行数据),PutIfAbsent的性能表现不会很差,通常导入时间在普通导入时间的2倍以内。
171+
- 表中如果存在time索引(ts列为空),或者ts索引同一key+ts的数据量大于100k时,PutIfAbsent的性能会很差,导入时间可能超过普通导入时间的10倍,无法正常使用。这样的数据条件下,更建议进行去重后再导入。

docs/zh/quickstart/beginner_must_read.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,16 @@ OpenMLDB是在线离线存储计算分离的,所以,你需要明确自己导
6969

7070
关于如何设计你的数据流入流出,可参考[实时决策系统中 OpenMLDB 的常见架构整合方式](../tutorial/app_arch.md)
7171

72+
### 在线表
73+
74+
在线表是存在内存中的数据,同时也会使用硬盘进行备份恢复。在线表的数据,可以通过`select count(*) from t1`来检查条数,或者使用`show table status`来查看表状态(可能有一定延迟,可以稍等再查)。
75+
76+
在线表是可以有多个索引的,通过`desc <table>`可以查看。写入一条数据时每个索引中都会写入一条,区别是各个索引的分类排序不同。但由于索引还有TTL淘汰机制,各个索引的数据量可能不一致。`select count(*) from t1``show table status`的结果是第一个索引的数据量,它并不代表其他索引的数据量。SQL查询会使用哪一个索引,是由SQL Engine选择的最优索引,可以通过SQL物理计划来查看。
77+
78+
建表时,可以指定索引,也可以不指定,不指定时,会默认创建一个索引。如果是默认索引,它无ts列(用当前time作为排序列,我们称为time索引)将会永不淘汰数据,可以以它为标准检查数据量是否准确,但这样的索引会占用太多的内存,目前也不可以删除第一条索引(计划未来支持),可以通过NS Client修改TTL淘汰数据,减少它的内存占用。
79+
80+
time索引(无ts的索引)还会影响PutIfAbsent导入。如果你的数据导入可能中途失败,无其他方法进行删除或去重,想要使用PutIfAbsent来进行导入重试时,请参考[PutIfAbsent说明](../openmldb_sql/dml/LOAD_DATA_STATEMENT.md#putifabsent说明)对自己的数据进行评估,避免PutIfAbsent效率太差。
81+
7282
## 源数据
7383

7484
### LOAD DATA

hybridse/include/node/node_manager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ class NodeManager {
166166
SqlNode *MakeInsertTableNode(const std::string &db_name,
167167
const std::string &table_name,
168168
const ExprListNode *column_names,
169-
const ExprListNode *values);
169+
const ExprListNode *values, InsertStmt::InsertMode insert_mode);
170170
CreateStmt *MakeCreateTableNode(bool op_if_not_exist,
171171
const std::string &db_name,
172172
const std::string &table_name,

hybridse/include/node/sql_node.h

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1865,26 +1865,43 @@ class ColumnDefNode : public SqlNode {
18651865

18661866
class InsertStmt : public SqlNode {
18671867
public:
1868+
// ref zetasql ASTInsertStatement
1869+
enum InsertMode {
1870+
DEFAULT_MODE, // plain INSERT
1871+
REPLACE, // INSERT OR REPLACE
1872+
UPDATE, // INSERT OR UPDATE
1873+
IGNORE // INSERT OR IGNORE
1874+
};
1875+
18681876
InsertStmt(const std::string &db_name,
18691877
const std::string &table_name,
18701878
const std::vector<std::string> &columns,
1871-
const std::vector<ExprNode *> &values)
1879+
const std::vector<ExprNode *> &values,
1880+
InsertMode insert_mode)
18721881
: SqlNode(kInsertStmt, 0, 0),
18731882
db_name_(db_name),
18741883
table_name_(table_name),
18751884
columns_(columns),
18761885
values_(values),
1877-
is_all_(columns.empty()) {}
1886+
is_all_(columns.empty()),
1887+
insert_mode_(insert_mode) {}
18781888

1879-
InsertStmt(const std::string &db_name, const std::string &table_name, const std::vector<ExprNode *> &values)
1880-
: SqlNode(kInsertStmt, 0, 0), db_name_(db_name), table_name_(table_name), values_(values), is_all_(true) {}
1889+
InsertStmt(const std::string &db_name, const std::string &table_name, const std::vector<ExprNode *> &values,
1890+
InsertMode insert_mode)
1891+
: SqlNode(kInsertStmt, 0, 0),
1892+
db_name_(db_name),
1893+
table_name_(table_name),
1894+
values_(values),
1895+
is_all_(true),
1896+
insert_mode_(insert_mode) {}
18811897
void Print(std::ostream &output, const std::string &org_tab) const;
18821898

18831899
const std::string db_name_;
18841900
const std::string table_name_;
18851901
const std::vector<std::string> columns_;
18861902
const std::vector<ExprNode *> values_;
18871903
const bool is_all_;
1904+
const InsertMode insert_mode_;
18881905
};
18891906

18901907
class StorageModeNode : public SqlNode {

hybridse/src/node/node_manager.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -792,9 +792,10 @@ AllNode *NodeManager::MakeAllNode(const std::string &relation_name, const std::s
792792
}
793793

794794
SqlNode *NodeManager::MakeInsertTableNode(const std::string &db_name, const std::string &table_name,
795-
const ExprListNode *columns_expr, const ExprListNode *values) {
795+
const ExprListNode *columns_expr, const ExprListNode *values,
796+
InsertStmt::InsertMode insert_mode) {
796797
if (nullptr == columns_expr) {
797-
InsertStmt *node_ptr = new InsertStmt(db_name, table_name, values->children_);
798+
InsertStmt *node_ptr = new InsertStmt(db_name, table_name, values->children_, insert_mode);
798799
return RegisterNode(node_ptr);
799800
} else {
800801
std::vector<std::string> column_names;
@@ -811,7 +812,7 @@ SqlNode *NodeManager::MakeInsertTableNode(const std::string &db_name, const std:
811812
}
812813
}
813814
}
814-
InsertStmt *node_ptr = new InsertStmt(db_name, table_name, column_names, values->children_);
815+
InsertStmt *node_ptr = new InsertStmt(db_name, table_name, column_names, values->children_, insert_mode);
815816
return RegisterNode(node_ptr);
816817
}
817818
}

hybridse/src/node/sql_node_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,8 @@ TEST_F(SqlNodeTest, MakeInsertNodeTest) {
308308
value_expr_list->PushBack(value4);
309309
ExprListNode *insert_values = node_manager_->MakeExprList();
310310
insert_values->PushBack(value_expr_list);
311-
SqlNode *node_ptr = node_manager_->MakeInsertTableNode("", "t1", column_expr_list, insert_values);
311+
SqlNode *node_ptr = node_manager_->MakeInsertTableNode("", "t1", column_expr_list, insert_values,
312+
InsertStmt::InsertMode::DEFAULT_MODE);
312313

313314
ASSERT_EQ(kInsertStmt, node_ptr->GetType());
314315
InsertStmt *insert_stmt = dynamic_cast<InsertStmt *>(node_ptr);

hybridse/src/planv2/ast_node_converter.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1962,8 +1962,9 @@ base::Status ConvertInsertStatement(const zetasql::ASTInsertStatement* root, nod
19621962
}
19631963
CHECK_TRUE(nullptr == root->query(), common::kSqlAstError, "Un-support insert statement with query");
19641964

1965-
CHECK_TRUE(zetasql::ASTInsertStatement::InsertMode::DEFAULT_MODE == root->insert_mode(), common::kSqlAstError,
1966-
"Un-support insert mode ", root->GetSQLForInsertMode());
1965+
CHECK_TRUE(zetasql::ASTInsertStatement::InsertMode::DEFAULT_MODE == root->insert_mode() ||
1966+
zetasql::ASTInsertStatement::InsertMode::IGNORE == root->insert_mode(),
1967+
common::kSqlAstError, "Un-support insert mode ", root->GetSQLForInsertMode());
19671968
CHECK_TRUE(nullptr == root->returning(), common::kSqlAstError,
19681969
"Un-support insert statement with return clause currently", root->GetSQLForInsertMode());
19691970
CHECK_TRUE(nullptr == root->assert_rows_modified(), common::kSqlAstError,
@@ -2000,8 +2001,8 @@ base::Status ConvertInsertStatement(const zetasql::ASTInsertStatement* root, nod
20002001
if (names.size() == 2) {
20012002
db_name = names[0];
20022003
}
2003-
*output =
2004-
dynamic_cast<node::InsertStmt*>(node_manager->MakeInsertTableNode(db_name, table_name, column_list, rows));
2004+
*output = dynamic_cast<node::InsertStmt*>(node_manager->MakeInsertTableNode(
2005+
db_name, table_name, column_list, rows, static_cast<node::InsertStmt::InsertMode>(root->insert_mode())));
20052006
return base::Status::OK();
20062007
}
20072008
base::Status ConvertDropStatement(const zetasql::ASTDropStatement* root, node::NodeManager* node_manager,

0 commit comments

Comments
 (0)