Kafka Connect Elasticsearch Source: fetches data from Elasticsearch and sends it to Kafka. The connector fetches only new data using a strictly incremental / temporal field (such as a timestamp or an incrementing id). It supports dynamic schemas and nested objects/arrays.
## Requirements:
- Elasticsearch 6.x and 7.x
- Java >= 8
- Maven
## Output data serialization format:

The connector uses Kafka Connect schemas and structs, which are agnostic regarding the user serialization method (e.g. it might be Avro, JSON, etc.).
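
The wire format that actually lands in Kafka is therefore decided by the converter configured in Kafka Connect, not by this connector. As a minimal sketch, the standard JSON converter could be selected in the connector's ``config`` block (or globally on the worker) like this:

```json
{
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true"
}
```

Swapping in `io.confluent.connect.avro.AvroConverter` (together with `value.converter.schema.registry.url`) would emit Avro instead; the connector's structs are translated transparently either way.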
- Feel free to open an issue to discuss new ideas (or propose new solutions with a PR).
## Installation:
Compile the project with:
```bash
mvn clean package -DskipTests
```
You can also compile and run both unit and integration tests (Docker is mandatory) with:
```bash
mvn clean package
```
Copy the jar with dependencies from the target folder into the Connect classpath (e.g. ``/usr/share/java/kafka-connect-elasticsearch``) or set the ``plugin.path`` parameter appropriately.
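
For example, one possible way to install the plugin (the jar name pattern and the target directory are assumptions; adapt them to your build output and your Connect installation):

```bash
# Copy the packaged jar into a folder covered by the Connect plugin path.
# The *-jar-with-dependencies.jar name pattern is an assumption about the build output.
mkdir -p /usr/share/java/kafka-connect-elasticsearch
cp target/*-jar-with-dependencies.jar /usr/share/java/kafka-connect-elasticsearch/

# Alternatively, point the worker at the parent folder, e.g. in connect-distributed.properties:
# plugin.path=/usr/share/java
```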
## Example

Using Kafka Connect in distributed mode, here is a sample config file to fetch ``my_awesome_index*`` indices and to produce output topics with the ``es_`` prefix:
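
A minimal sketch of such a config is shown below; the ``connector.class`` value and the ``es.host``/``es.port`` keys are assumptions, while the remaining keys are documented in the reference below:

```json
{
  "name": "elastic-source",
  "config": {
    "connector.class": "com.github.dariobalinzo.ElasticSourceConnector",
    "tasks.max": "1",
    "es.host": "localhost",
    "es.port": "9200",
    "index.prefix": "my_awesome_index",
    "topic.prefix": "es_",
    "incrementing.field.name": "@timestamp",
    "poll.interval.ms": "5000"
  }
}
```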

``incrementing.field.name``
The name of the strictly incrementing field to use to detect new records.

* Type: any
* Importance: high


``incrementing.secondary.field.name``
In case the main incrementing field may have duplicates, this secondary field is used as a secondary sort field in order to avoid data losses when paginating (available starting from versions >= 1.4).

* Type: any
* Importance: low
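
Conceptually, each poll can be thought of as an Elasticsearch search that filters on the last value already published and sorts by the incrementing field, using the secondary field as a tie-breaker so that documents sharing the same primary value are not skipped between pages. A rough sketch of such a query (field names and the placeholder value are hypothetical, not the connector's literal request):

```json
{
  "size": 10000,
  "query": {
    "range": { "@timestamp": { "gte": "<last value already sent to Kafka>" } }
  },
  "sort": [
    { "@timestamp": "asc" },
    { "order_id": "asc" }
  ]
}
```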
``connection.attempts``
Maximum number of attempts to retrieve a valid Elasticsearch connection.

* Type: int
* Default: 3
* Importance: low
``connection.backoff.ms``
Backoff time in milliseconds between connection attempts.

* Type: long
* Default: 10000
* Importance: low
``index.prefix``
Indices prefix to include in copying.

* Type: string
* Default: ""
* Importance: medium
### Connector Configuration
``poll.interval.ms``
Frequency in ms to poll for new data in each index.

* Type: int
* Default: 5000
* Importance: high
``batch.max.rows``
Maximum number of documents to include in a single batch when polling for new data.

* Type: int
* Default: 10000
* Importance: low
``topic.prefix``
Prefix to prepend to index names to generate the name of the Kafka topic to publish data to.

* Type: string
* Importance: high
``filters.whitelist``
Whitelist filter for extracting a subset of fields from Elasticsearch JSON documents. The whitelist filter supports nested fields. To provide multiple fields use `;` as separator (e.g. `customer;order.qty;order.price`).

* Type: string
* Importance: medium
* Default: null
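
For example, with a hypothetical document and `filters.whitelist` set to `customer;order.qty`, only the listed paths would be kept:

```json
{ "customer": "jane", "order": { "qty": 2, "price": 10.5 }, "notes": "dropped by the filter" }
```

would be reduced to:

```json
{ "customer": "jane", "order": { "qty": 2 } }
```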
``filters.json_cast``
This filter casts nested fields to a JSON string, avoiding recursive parsing into a Kafka Connect schema. The json-cast filter supports nested fields. To provide multiple fields use `;` as separator (e.g. `customer;order.qty;order.price`).

* Type: string
* Importance: medium
* Default: null
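
For example, with `filters.json_cast` set to `order` (a hypothetical field), the nested object would be delivered as a single string value rather than a nested struct:

```json
{ "customer": "jane", "order": "{\"qty\":2,\"price\":10.5}" }
```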
``fieldname_converter``
Configures which field name converter should be used (allowed values: `avro` or `nop`). By default, the avro field name converter renames JSON fields that do not respect the Avro specification (https://avro.apache.org/docs/current/spec.html#names) so that they can be serialized correctly. To disable field name conversion, set this parameter to `nop`.