
Commit b81c210

Merge pull request #8 from DarioBalinzo/7.x
resolves #5
2 parents f6114f6 + cf4cf7e commit b81c210

26 files changed: +1214 -684 lines changed

.github/workflows/maven.yml

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
1+
# This workflow will build a Java project with Maven
2+
# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven
3+
4+
name: Java CI with Maven
5+
6+
on: [ push ]
7+
8+
jobs:
9+
  build:
10+
11+
    runs-on: ubuntu-latest
12+
13+
    steps:
14+
    - uses: actions/checkout@v2
15+
    - name: Set up JDK 1.8
16+
      uses: actions/setup-java@v1
17+
      with:
18+
        java-version: 1.8
19+
    - name: Build with Maven
20+
      run: mvn -B package --file pom.xml

.gitignore

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
1+
/elastic-source-connect.iml
2+
/target/
3+
/.idea/

README.md

Lines changed: 47 additions & 13 deletions
@@ -1,28 +1,50 @@
1-
# kafka-connect-elasticsearch-source
2-
Kafka Connect Elasticsearch Source: fetch data from elasting indices using scroll API. Fetch only new data using a strictly incremental / temporal field.
3-
It supports dynamic schema and nested objects/ arrays.
1+
# Kafka-connect-elasticsearch-source
2+
Kafka Connect Elasticsearch Source: fetches data from Elasticsearch and sends it to Kafka. The connector fetches only new data using a strictly incrementing / temporal field (such as a timestamp or an incrementing id).
3+
It supports dynamic schemas and nested objects/arrays.
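
As a rough illustration of what "fetching only new data using a strictly incrementing field" means, here is a minimal in-memory sketch. It is not the connector's implementation: the `IncrementalCursor` and `Doc` names are hypothetical, and a plain list stands in for an Elasticsearch index. The idea is simply to remember the largest value seen so far and return only documents above it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class IncrementalCursor {

    // Hypothetical document carrying an incrementing field (e.g. a timestamp).
    static final class Doc {
        final long timestamp;
        final String body;

        Doc(long timestamp, String body) {
            this.timestamp = timestamp;
            this.body = body;
        }
    }

    // Largest field value returned so far; nothing has been seen initially.
    private long lastSeen = Long.MIN_VALUE;

    // Returns documents whose field is strictly greater than the cursor,
    // ordered by that field, and advances the cursor past them.
    List<Doc> poll(List<Doc> index) {
        List<Doc> fresh = new ArrayList<>();
        for (Doc d : index) {
            if (d.timestamp > lastSeen) {
                fresh.add(d);
            }
        }
        fresh.sort(Comparator.comparingLong(d -> d.timestamp));
        if (!fresh.isEmpty()) {
            lastSeen = fresh.get(fresh.size() - 1).timestamp;
        }
        return fresh;
    }

    public static void main(String[] args) {
        List<Doc> index = new ArrayList<>();
        index.add(new Doc(2L, "second"));
        index.add(new Doc(1L, "first"));

        IncrementalCursor cursor = new IncrementalCursor();
        System.out.println(cursor.poll(index).size()); // both documents are new
        System.out.println(cursor.poll(index).size()); // nothing new on the second poll
    }
}
```

With this kind of cursor, a document that arrives later with a value equal to or lower than the cursor would never be returned, which is why the field needs to be strictly incrementing.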
4+
5+
## Requirements:
6+
- Elasticsearch 6.x and 7.x
7+
- Java >= 8
8+
- Maven
9+
10+
## Bugs or new Ideas?
11+
- Issues tracker: https://github.com/DarioBalinzo/kafka-connect-elasticsearch-source/issues
12+
- Feel free to open an issue to discuss new ideas (or propose new solutions with a PR)
13+
- Do you use this project in production? Would you like new features? This connector will always be free and open source,
14+
but if you want to support its development, please consider buying me a coffee (https://www.paypal.me/coffeeDarioBalinzo).
415

516
## Installation:
17+
Compile the project with:
18+
```bash
19+
mvn clean package -DskipTests
20+
```
621

7-
Download (https://github.com/DarioBalinzo/kafka-connect-elasticsearch-source/raw/master/target/kafka-connect-elastic-source-connect-0.1.jar) the jar and put into the connect classpath (e.g ``/usr/share/java/kafka-connect-elasticsearch`` ) or set ``plugin.path`` parameter appropriately.
22+
You can also compile and run both unit and integration tests (Docker is required) with:
23+
```bash
24+
mvn clean package
25+
```
26+
27+
Copy the jar with dependencies from the target folder into the Connect classpath (e.g. ``/usr/share/java/kafka-connect-elasticsearch``) or set the ``plugin.path`` parameter appropriately.
828

929
## Example
1030
Using Kafka Connect in distributed mode, a sample config file to fetch ``metric*`` indices and produce output topics with the ``es_`` prefix:
1131

1232

1333
```json
14-
{ "name": "elastic-source",
15-
"config": {"connector.class":"com.github.dariobalinzo.ElasticSourceConnector",
16-
"tasks.max": "1",
17-
"es.host" : "localhost",
18-
"es.port" : "9200",
19-
"index.prefix" : "metric",
20-
"topic.prefix" : "es_",
21-
"incrementing.field.name" : "@timestamp"
34+
{
35+
"name": "elastic-source",
36+
"config": {
37+
"connector.class":"com.github.dariobalinzo.ElasticSourceConnector",
38+
"tasks.max": "1",
39+
"es.host" : "localhost",
40+
"es.port" : "9200",
41+
"index.prefix" : "my_awesome_index",
42+
"topic.prefix" : "es_",
43+
"incrementing.field.name" : "@timestamp"
2244
}
2345
}
2446
```
25-
To start the connector we send the json config with post:
47+
To start the connector with curl:
2648
```bash
2749
curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors | jq
2850
```
@@ -32,6 +54,11 @@ To check the status:
3254
curl localhost:8083/connectors/elastic-source/status | jq
3355
```
3456

57+
To stop the connector:
58+
```bash
59+
curl -X DELETE localhost:8083/connectors/elastic-source | jq
60+
```
61+
3562

3663
## Documentation
3764

@@ -50,6 +77,13 @@ curl localhost:8083/connectors/elastic-source/status | jq
5077
* Type: string
5178
* Importance: high
5279
* Dependents: ``index.prefix``
80+
81+
``es.scheme``
82+
ElasticSearch scheme (http/https)
83+
84+
* Type: string
85+
* Importance: medium
86+
* Default: ``http``
5387

5488
``es.user``
5589
Elasticsearch username

pom.xml

Lines changed: 34 additions & 6 deletions
@@ -22,11 +22,12 @@
2222
<properties>
2323
<maven.compiler.source>1.8</maven.compiler.source>
2424
<maven.compiler.target>1.8</maven.compiler.target>
25+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
2526
</properties>
2627

2728
<groupId>com.github.dariobalinzo</groupId>
2829
<artifactId>elastic-source-connect</artifactId>
29-
<version>0.1</version>
30+
<version>1.0</version>
3031

3132
<licenses>
3233
<license>
@@ -37,12 +38,10 @@
3738
</licenses>
3839

3940
<dependencies>
40-
<!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
4141
<dependency>
4242
<groupId>org.apache.kafka</groupId>
4343
<artifactId>connect-api</artifactId>
44-
<version>1.1.0</version>
45-
<scope>compile</scope>
44+
<version>2.6.0</version>
4645
</dependency>
4746
<dependency>
4847
<groupId>org.elasticsearch.client</groupId>
@@ -54,11 +53,40 @@
5453
<artifactId>elasticsearch-rest-client</artifactId>
5554
<version>6.2.3</version>
5655
</dependency>
57-
<!-- https://mvnrepository.com/artifact/junit/junit -->
5856
<dependency>
5957
<groupId>junit</groupId>
6058
<artifactId>junit</artifactId>
61-
<version>4.12</version>
59+
<version>4.13.1</version>
60+
<scope>test</scope>
61+
</dependency>
62+
<dependency>
63+
<groupId>org.testcontainers</groupId>
64+
<artifactId>testcontainers</artifactId>
65+
<version>1.15.0-rc2</version>
66+
<scope>test</scope>
67+
</dependency>
68+
<dependency>
69+
<groupId>org.testcontainers</groupId>
70+
<artifactId>elasticsearch</artifactId>
71+
<version>1.15.0-rc2</version>
72+
<scope>test</scope>
73+
</dependency>
74+
<dependency>
75+
<groupId>ch.qos.logback</groupId>
76+
<artifactId>logback-core</artifactId>
77+
<version>1.2.3</version>
78+
<scope>test</scope>
79+
</dependency>
80+
<dependency>
81+
<groupId>ch.qos.logback</groupId>
82+
<artifactId>logback-classic</artifactId>
83+
<version>1.2.3</version>
84+
<scope>test</scope>
85+
</dependency>
86+
<dependency>
87+
<groupId>org.mockito</groupId>
88+
<artifactId>mockito-all</artifactId>
89+
<version>1.10.19</version>
6290
<scope>test</scope>
6391
</dependency>
6492
</dependencies>

src/main/java/com/github/dariobalinzo/ElasticSourceConnector.java

Lines changed: 33 additions & 38 deletions
@@ -16,31 +16,28 @@
1616

1717
package com.github.dariobalinzo;
1818

19+
import com.github.dariobalinzo.elastic.ElasticConnection;
20+
import com.github.dariobalinzo.elastic.ElasticRepository;
1921
import com.github.dariobalinzo.task.ElasticSourceTask;
20-
import com.github.dariobalinzo.utils.ElasticConnection;
21-
import com.github.dariobalinzo.utils.Utils;
22-
import com.github.dariobalinzo.utils.Version;
2322
import org.apache.kafka.common.config.ConfigDef;
2423
import org.apache.kafka.common.config.ConfigException;
25-
import org.apache.kafka.common.config.types.Password;
2624
import org.apache.kafka.connect.connector.Task;
2725
import org.apache.kafka.connect.errors.ConnectException;
2826
import org.apache.kafka.connect.source.SourceConnector;
29-
import org.elasticsearch.client.Response;
3027
import org.slf4j.Logger;
3128
import org.slf4j.LoggerFactory;
3229

33-
import java.io.IOException;
3430
import java.util.ArrayList;
3531
import java.util.HashMap;
3632
import java.util.List;
3733
import java.util.Map;
3834

3935
public class ElasticSourceConnector extends SourceConnector {
36+
private static Logger logger = LoggerFactory.getLogger(ElasticSourceConnector.class);
4037

41-
private static final Logger logger = LoggerFactory.getLogger(ElasticSourceConnector.class);
4238
private ElasticSourceConnectorConfig config;
4339
private ElasticConnection elasticConnection;
40+
private ElasticRepository elasticRepository;
4441
private Map<String, String> configProperties;
4542

4643
@Override
@@ -50,7 +47,6 @@ public String version() {
5047

5148
@Override
5249
public void start(Map<String, String> props) {
53-
5450
try {
5551
configProperties = props;
5652
config = new ElasticSourceConnectorConfig(props);
@@ -59,30 +55,33 @@ public void start(Map<String, String> props) {
5955
+ "error", e);
6056
}
6157

62-
final String esHost = config.getString(ElasticSourceConnectorConfig.ES_HOST_CONF);
58+
String esScheme = config.getString(ElasticSourceConnectorConfig.ES_SCHEME_CONF);
59+
String esHost = config.getString(ElasticSourceConnectorConfig.ES_HOST_CONF);
6360

6461
//using rest config all the parameters are strings
65-
final int esPort = Integer.parseInt(config.getString(ElasticSourceConnectorConfig.ES_PORT_CONF));
62+
int esPort = Integer.parseInt(config.getString(ElasticSourceConnectorConfig.ES_PORT_CONF));
6663

67-
final String esUser = config.getString(ElasticSourceConnectorConfig.ES_USER_CONF);
68-
final String esPwd = config.getString(ElasticSourceConnectorConfig.ES_PWD_CONF);
64+
String esUser = config.getString(ElasticSourceConnectorConfig.ES_USER_CONF);
65+
String esPwd = config.getString(ElasticSourceConnectorConfig.ES_PWD_CONF);
6966

70-
final int maxConnectionAttempts = Integer.parseInt(config.getString(
67+
int maxConnectionAttempts = Integer.parseInt(config.getString(
7168
ElasticSourceConnectorConfig.CONNECTION_ATTEMPTS_CONFIG
7269
));
73-
final long connectionRetryBackoff = Long.parseLong(config.getString(
70+
long connectionRetryBackoff = Long.parseLong(config.getString(
7471
ElasticSourceConnectorConfig.CONNECTION_BACKOFF_CONFIG
7572
));
7673
if (esUser == null || esUser.isEmpty()) {
7774
elasticConnection = new ElasticConnection(
7875
esHost,
76+
esScheme,
7977
esPort,
8078
maxConnectionAttempts,
8179
connectionRetryBackoff
8280
);
8381
} else {
8482
elasticConnection = new ElasticConnection(
8583
esHost,
84+
esScheme,
8685
esPort,
8786
esUser,
8887
esPwd,
@@ -92,11 +91,7 @@ public void start(Map<String, String> props) {
9291

9392
}
9493

95-
// Initial connection attempt
96-
if (!elasticConnection.testConnection()) {
97-
throw new ConfigException("cannot connect to es");
98-
}
99-
94+
elasticRepository = new ElasticRepository(elasticConnection);
10095
}
10196

10297
@Override
@@ -107,43 +102,43 @@ public Class<? extends Task> taskClass() {
107102

108103
@Override
109104
public List<Map<String, String>> taskConfigs(int maxTasks) {
110-
111-
Response resp;
112-
try {
113-
resp = elasticConnection.getClient()
114-
.getLowLevelClient()
115-
.performRequest("GET", "_cat/indices");
116-
} catch (IOException e) {
117-
logger.error("error in searching index names");
118-
throw new RuntimeException(e);
119-
}
120-
121-
122-
List<String> currentIndexes = Utils.getIndexList(resp,
105+
List<String> currentIndexes = elasticRepository.catIndices(
123106
config.getString(ElasticSourceConnectorConfig.INDEX_PREFIX_CONFIG)
124107
);
125108
int numGroups = Math.min(currentIndexes.size(), maxTasks);
126-
List<List<String>> tablesGrouped = Utils.groupPartitions(currentIndexes, numGroups);
127-
List<Map<String, String>> taskConfigs = new ArrayList<>(tablesGrouped.size());
128-
for (List<String> taskIndices : tablesGrouped) {
109+
List<List<String>> indexGrouped = groupPartitions(currentIndexes, numGroups);
110+
List<Map<String, String>> taskConfigs = new ArrayList<>(indexGrouped.size());
111+
for (List<String> taskIndices : indexGrouped) {
129112
Map<String, String> taskProps = new HashMap<>(configProperties);
130113
taskProps.put(ElasticSourceConnectorConfig.INDICES_CONFIG,
131-
String.join(",",taskIndices));
114+
String.join(",", taskIndices));
132115
taskConfigs.add(taskProps);
133116
}
134117
return taskConfigs;
135118
}
136119

137120
@Override
138121
public void stop() {
139-
140122
logger.info("stopping elastic source");
141123
elasticConnection.closeQuietly();
142-
143124
}
144125

145126
@Override
146127
public ConfigDef config() {
147128
return ElasticSourceConnectorConfig.CONFIG_DEF;
148129
}
130+
131+
132+
private List<List<String>> groupPartitions(List<String> currentIndices, int numGroups) {
133+
List<List<String>> result = new ArrayList<>(numGroups);
134+
for (int i = 0; i < numGroups; ++i) {
135+
result.add(new ArrayList<>());
136+
}
137+
138+
for (int i = 0; i < currentIndices.size(); ++i) {
139+
result.get(i % numGroups).add(currentIndices.get(i));
140+
}
141+
142+
return result;
143+
}
149144
}
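
The `groupPartitions` helper that this change moves into `ElasticSourceConnector` deals index names out to task groups in round-robin order, and `taskConfigs` then joins each group with commas. The standalone sketch below reproduces that logic so it can be run in isolation; the class name `RoundRobinGrouping` and the sample index names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RoundRobinGrouping {

    // Distributes the given index names over numGroups lists in round-robin
    // order, following the groupPartitions helper shown in the diff above.
    static List<List<String>> groupPartitions(List<String> indices, int numGroups) {
        List<List<String>> result = new ArrayList<>(numGroups);
        for (int i = 0; i < numGroups; ++i) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < indices.size(); ++i) {
            result.get(i % numGroups).add(indices.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample index names (hypothetical).
        List<String> indices = Arrays.asList(
                "metric-1", "metric-2", "metric-3", "metric-4", "metric-5");
        int maxTasks = 2;

        // Same cap as in taskConfigs: never more groups than indices.
        int numGroups = Math.min(indices.size(), maxTasks);

        for (List<String> group : groupPartitions(indices, numGroups)) {
            // Each group becomes one comma-separated task property value.
            System.out.println(String.join(",", group));
        }
        // prints:
        // metric-1,metric-3,metric-5
        // metric-2,metric-4
    }
}
```

Because `numGroups` is capped at the number of matching indices, an empty index list yields zero groups and the modulo in the second loop is never evaluated.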

src/main/java/com/github/dariobalinzo/ElasticSourceConnectorConfig.java

Lines changed: 14 additions & 5 deletions
@@ -32,6 +32,10 @@ public class ElasticSourceConnectorConfig extends AbstractConfig {
3232
private final static String ES_HOST_DOC = "ElasticSearch host";
3333
private final static String ES_HOST_DISPLAY = "Elastic host";
3434

35+
public final static String ES_SCHEME_CONF = "es.scheme";
36+
private final static String ES_SCHEME_DOC = "Elasticsearch scheme (default: http)";
37+
private final static String ES_SCHEME_DISPLAY = "Elasticsearch scheme";
38+
private static final String ES_SCHEME_DEFAULT = "http";
3539

3640
public final static String ES_PORT_CONF = "es.port";
3741
private final static String ES_PORT_DOC = "ElasticSearch port";
@@ -127,6 +131,16 @@ private static void addDatabaseOptions(ConfigDef config) {
127131
Width.LONG,
128132
ES_HOST_DISPLAY,
129133
Collections.singletonList(INDEX_PREFIX_CONFIG)
134+
).define(
135+
ES_SCHEME_CONF,
136+
Type.STRING,
137+
ES_SCHEME_DEFAULT,
138+
Importance.MEDIUM,
139+
ES_SCHEME_DOC,
140+
DATABASE_GROUP,
141+
++orderInGroup,
142+
Width.LONG,
143+
ES_SCHEME_DISPLAY
130144
).define(
131145
ES_PORT_CONF,
132146
Type.STRING,
@@ -260,16 +274,11 @@ private static void addConnectorOptions(ConfigDef config) {
260274
}
261275

262276
public ElasticSourceConnectorConfig(Map<String, String> properties) {
263-
264277
super(CONFIG_DEF, properties);
265-
266278
}
267279

268280
protected ElasticSourceConnectorConfig(ConfigDef subclassConfigDef, Map<String, String> props) {
269281
super(subclassConfigDef, props);
270282
}
271283

272-
public static void main(String[] args) {
273-
System.out.println(CONFIG_DEF.toEnrichedRst());
274-
}
275284
}
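
The new `es.scheme` option defaults to `http` and is passed to `ElasticConnection` together with the host and port. How `ElasticConnection` uses it is not shown in this diff, so the sketch below only illustrates the general idea with the standard library: read the three properties from a map, fall back to `http`, and build a base URI. The `EsEndpoint` class and the http/https check are assumptions for illustration; the `ConfigDef` entry above does not validate the value.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

class EsEndpoint {

    // Mirrors ES_SCHEME_DEFAULT from the connector config.
    static final String DEFAULT_SCHEME = "http";

    // Builds a base URI from es.scheme, es.host and es.port.
    // The scheme check is an illustrative assumption, not connector behaviour.
    static URI fromProps(Map<String, String> props) {
        String scheme = props.getOrDefault("es.scheme", DEFAULT_SCHEME);
        if (!scheme.equals("http") && !scheme.equals("https")) {
            throw new IllegalArgumentException(
                    "es.scheme must be http or https, got: " + scheme);
        }
        String host = props.get("es.host");
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("es.host is required");
        }
        // As in the connector, the port arrives as a string and is parsed here.
        int port = Integer.parseInt(props.get("es.port"));
        return URI.create(scheme + "://" + host + ":" + port);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("es.host", "localhost");
        props.put("es.port", "9200");
        System.out.println(fromProps(props)); // scheme falls back to http

        props.put("es.scheme", "https");
        System.out.println(fromProps(props));
    }
}
```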
