109 changes: 58 additions & 51 deletions README.md
@@ -6,35 +6,36 @@ The connector is supplied as source code which you can easily build into a JAR f

## Contents

- [Building the connector](#building-the-connector)
- [Build Testing and Quickstart](#build-testing-and-quickstart)
- [Running the connector](#running-the-connector)
- [Configuration](#configuration)
- [Document formats](#document-formats)
- [Issues and contributions](#issues-and-contributions)
- [License](#license)

## Building the connector

To build the connector, you must have the following installed:

- [git](https://git-scm.com/)
- [Maven 3.0 or later](https://maven.apache.org)
- Java 8 or later
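
You can quickly confirm that suitable versions are on your PATH with the standard version commands (exact output varies by platform):

```shell
git --version
mvn -v
java -version
```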

Clone the repository with the following command:

```shell
git clone https://github.com/ibm-messaging/kafka-connect-elastic-sink.git
```

Change directory into the `kafka-connect-elastic-sink` directory:

```shell
cd kafka-connect-elastic-sink
```

Build the connector using Maven:

```shell
mvn clean package
```
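
If the build succeeds, the self-contained JAR referenced later in this README is written under `target/`. A quick way to confirm (the version in the file name will vary with the project version):

```shell
# The connector JAR with all dependencies bundled in
ls target/kafka-connect-elastic-sink-*-jar-with-dependencies.jar
```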
@@ -50,10 +51,11 @@ For more information see the [README](quickstart/README-QS.md) in that directory
## Running the connector

To run the connector, you must have:
- The JAR from building the connector
- A properties file containing the configuration for the connector
- Apache Kafka 2.0.0 or later, either standalone or included as part of an offering such as IBM Event Streams
- Elasticsearch 7.0.0 or later

The connector can be run in a Kafka Connect worker in either standalone (single process) or distributed mode. It's a good idea to start in standalone mode.

@@ -64,9 +66,9 @@ You need two configuration files. One is for the configuration that applies to a
To run the connector in standalone mode from the directory into which you installed Apache Kafka, you use a command like this:

```shell
export REPO=<path to cloned repository>
export CLASSPATH=$CLASSPATH:$REPO/target/kafka-connect-elastic-sink-<version>-jar-with-dependencies.jar
bin/connect-standalone.sh config/connect-standalone.properties $REPO/config/elastic-sink.properties
```

### Running in distributed mode
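
In distributed mode you start the connector by submitting its configuration to the Kafka Connect REST API rather than passing a properties file on the command line. The following is a minimal sketch only; the worker address, connector name, topic, and Elasticsearch address are assumptions, and the connector class name should be taken from the supplied configuration files rather than from this example:

```shell
# Submit the connector configuration to a distributed worker
# (assumed here to be listening on localhost:8083)
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "elastic-sink",
  "config": {
    "connector.class": "com.ibm.eventstreams.connect.elasticsink.ElasticSinkConnector",
    "topics": "topic1",
    "es.connection": "elasticsearch.example.com:9200",
    "es.document.builder": "com.ibm.eventstreams.connect.elasticsink.builders.JsonDocumentBuilder",
    "es.index.builder": "com.ibm.eventstreams.connect.elasticsink.builders.DefaultIndexBuilder",
    "es.identifier.builder": "com.ibm.eventstreams.connect.elasticsink.builders.DefaultIdentifierBuilder",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}'
```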
@@ -104,54 +106,59 @@ The following instructions assume you are running on OpenShift and have Strimzi
1. `oc describe kafkaconnects2i <kafkaConnectClusterName>` to check that the Elasticsearch sink connector is in the list of available connector plugins.

#### Start an instance of the Elasticsearch sink connector using KafkaConnector

1. `cp deploy/strimzi.kafkaconnector.yaml kafkaconnector.yaml`
1. Update the `kafkaconnector.yaml` file to replace all of the values in `<>`, adding any additional configuration properties (an illustrative sketch of a completed file follows this list).
1. `oc apply -f kafkaconnector.yaml` to start the connector.
1. `oc get kafkaconnector` to list the connectors. You can use `oc describe` to get more details on the connector, such as its status.
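
The sketch below shows roughly what a completed `kafkaconnector.yaml` might look like. The API version, cluster name, connector class, and all configuration values here are illustrative assumptions; keep `deploy/strimzi.kafkaconnector.yaml` as the authoritative starting point:

```yaml
# API version depends on your installed Strimzi release
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: elastic-sink-connector
  labels:
    # Must match the name of your Kafka Connect cluster
    strimzi.io/cluster: my-connect-cluster
spec:
  # Assumed connector class name; use the value from deploy/strimzi.kafkaconnector.yaml
  class: com.ibm.eventstreams.connect.elasticsink.ElasticSinkConnector
  tasksMax: 1
  config:
    topics: topic1
    es.connection: elasticsearch.example.com:9200
    es.document.builder: com.ibm.eventstreams.connect.elasticsink.builders.JsonDocumentBuilder
    es.index.builder: com.ibm.eventstreams.connect.elasticsink.builders.DefaultIndexBuilder
    es.identifier.builder: com.ibm.eventstreams.connect.elasticsink.builders.DefaultIdentifierBuilder
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
```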

## Configuration

See this [configuration file](config/elastic-sink.properties) for all exposed configuration attributes. Required attributes for the connector are
the address of the Elasticsearch server, and the Kafka topics that will be collected. You do not include the protocol (http or https) as part
of the server address.
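
For example, the minimal required settings might look like this (the topic names and server address are illustrative):

```properties
topics=topic1,topic2
es.connection=elasticsearch.example.com:9200
```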

The only other required properties are the `es.document.builder` and `es.index.builder` classes which should not be changed unless you write your
own Java classes for alternative document and index formatting.

Additional properties allow for security configuration and tuning of how the calls are made to the Elasticsearch server.

The configuration options for the Kafka Connect sink connector for Elasticsearch are as follows:

| Name | Description | Type | Default | Valid values |
|----------------------------|------------------------------------------------------------------------|---------|----------------|--------------------------------------|
| topics | A list of topics to use as input | string | | topic1[,topic2,...] |
| es.connection | The connection endpoint for Elasticsearch | string | | host:port |
| es.document.builder | The class used to build the document content | string | | Class implementing DocumentBuilder |
| es.identifier.builder | The class used to build the document identifier | string | | Class implementing IdentifierBuilder |
| es.index.builder | The class used to build the document index | string | | Class implementing IndexBuilder |
| key.converter | The class used to convert Kafka record keys | string | | See below |
| value.converter | The class used to convert Kafka record values | string | | See below |
| es.http.connections | The maximum number of HTTP connections to Elasticsearch | integer | 5 | |
| es.http.timeout.idle | Timeout (seconds) for idle HTTP connections to Elasticsearch | integer | 30 | |
| es.http.timeout.connection | Time (seconds) allowed for initial HTTP connection to Elasticsearch | integer | 10 | |
| es.http.timeout.operation | Time (seconds) allowed for calls to Elasticsearch | integer | 6 | |
| es.max.failures | Maximum number of failed attempts to commit an update to Elasticsearch | integer | 5 | |
| es.http.proxy.host | Hostname for HTTP proxy | string | | Hostname |
| es.http.proxy.port | Port number for HTTP proxy | integer | 8080 | Port number |
| es.user.name | The user name for authenticating with Elasticsearch | string | | |
| es.password | The password for authenticating with Elasticsearch | string | | |
| es.tls.keystore.location | The path to the keystore to use for TLS connections | string | JVM keystore | Local path to a keystore file |
| es.tls.keystore.password | The password of the keystore to use for TLS connections | string | | |
| es.tls.keystore.type | The type of the keystore to use for TLS connections | string | JKS | JKS, PKCS12, etc. |
| es.tls.truststore.location | The path to the truststore to use for TLS connections | string | JVM truststore | Local path to a truststore file |
| es.tls.truststore.password | The password of the truststore to use for TLS connections | string | | |
| es.tls.truststore.type | The type of the truststore to use for TLS connections | string | JKS | JKS, PKCS12, etc. |
| es.tls.skip.verification | Whether to skip SSL verification for the Elasticsearch connection | boolean | false | true, false |

### Supported configurations

In order to ensure reliable indexing of documents, the following configurations are only supported with these values:

| Name | Valid values |
|-----------------|---------------------------------------------------------------------------------------------------|
| key.converter | `org.apache.kafka.connect.storage.StringConverter`, `org.apache.kafka.connect.json.JsonConverter` |
| value.converter | `org.apache.kafka.connect.json.JsonConverter` |

### Performance tuning

@@ -168,13 +175,13 @@ For TLS-protected communication to Elasticsearch, you must provide appropriate c

Given a file `es-secrets.properties` with the contents:

```properties
secret-key=password
```

Update the worker configuration file to specify the FileConfigProvider which is included by default:

```properties
# Additional properties for the worker configuration to enable use of ConfigProviders
# multiple comma-separated provider types can be specified here
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
```

Update the connector configuration file to reference `secret-key` in the file:

```properties
es.password=${file:es-secret.properties:secret-key}
```

## Document formats

The documents inserted into Elasticsearch by this connector are JSON objects. The conversion from the incoming Kafka records to the Elasticsearch documents is performed using a *document builder*. The supplied `JsonDocumentBuilder` converts the Kafka record's value into a document containing a JSON object. The connector inserts documents into the store using Elasticsearch type `_doc`.

To ensure the documents can be indexed reliably, the incoming Kafka records must also be JSON objects. This is ensured by setting the `value.converter` configuration to `org.apache.kafka.connect.json.JsonConverter` which only accepts well-formed JSON objects.
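
As a rough illustration (the topic name, Elasticsearch address, and index name are assumptions, not values defined by this README), a record whose value is a JSON object is indexed as a document with the same body, and you can check the result with a standard Elasticsearch search request:

```shell
# Produce a JSON value to a topic the connector is configured to consume
echo '{"id": 123, "customer": "alice", "amount": 42.5}' | \
  bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

# Search the target index to confirm the document arrived; replace <index> with
# the index produced by your configured index builder
curl "http://elasticsearch.example.com:9200/<index>/_search?pretty"
```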
@@ -204,7 +212,6 @@ By setting the `es.identifier.builder` configuration to `com.ibm.eventstreams.co

This mode of operation is suitable if the Kafka records are independent events and you want each of them to be indexed in Elasticsearch separately.


### Document ID based on Kafka record key

By setting the `es.identifier.builder` configuration to `com.ibm.eventstreams.connect.elasticsink.builders.KeyIdentifierBuilder`, each Kafka record replaces any existing document in Elasticsearch which has the same key. The Kafka record key is used as the document ID. This means the document IDs are only as unique as the Kafka record keys. The records must have keys.
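
A sketch of the settings for this mode, assuming string record keys (the property names come from the configuration table above):

```properties
# Use the Kafka record key as the Elasticsearch document ID
es.identifier.builder=com.ibm.eventstreams.connect.elasticsink.builders.KeyIdentifierBuilder
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
```

With these settings, producing a second record with the same key overwrites the previously indexed document rather than creating a new one.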
@@ -229,7 +236,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

<http://www.apache.org/licenses/LICENSE-2.0>

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
15 changes: 9 additions & 6 deletions config/elastic-sink.properties
@@ -34,16 +34,19 @@ es.identifier.builder=com.ibm.eventstreams.connect.elasticsink.builders.DefaultI

# Authentication to Elasticsearch. Defaults are empty
#es.user.name=
#es.password=

# Keystore and truststore configurations for TLS connection to Elasticsearch
# TLS is used for connection to Elasticsearch if any of these are set
#es.tls.keystore.location=
#es.tls.keystore.password=
#es.tls.keystore.type=JKS
#es.tls.truststore.location=
#es.tls.truststore.password=
#es.tls.truststore.type=JKS
#es.tls.skip.verification=false

# HTTP Proxy information. Default port is 8080
#es.http.proxy.host=
#es.http.proxy.port=8080

@@ -55,7 +58,7 @@ es.identifier.builder=com.ibm.eventstreams.connect.elasticsink.builders.DefaultI
#es.http.connections=10
# Maximum number of uninterrupted failures before considering the connection failed
#es.max.failures=5

# These converters control conversion of data between the internal Kafka Connect representation and the messages in Kafka.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
2 changes: 1 addition & 1 deletion pom.xml
@@ -40,7 +40,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<kafka.version>4.0.0</kafka.version>
</properties>
<dependencies>
<dependency>