73 commits
f4dfec7
New container with Kafka 0.10 based on ches/kafka:0.10.0.0
Jun 21, 2016
0f065a8
Added Kafka 0.10.0 docker image
Jun 28, 2016
79cf6f4
Ongoing work on 0.10 support and HighLevelConsumer
Aug 3, 2016
82fb038
Reorganized files to get rid of separate libraries in the new impleme…
pulyaevskiy Jan 19, 2017
0210148
Base refactore of produce API
pulyaevskiy Jan 22, 2017
fc8c106
Introduced default session for simpler bootstrap
pulyaevskiy Jan 22, 2017
e754b04
Added basic implementation for producers
pulyaevskiy Jan 22, 2017
32526d6
WIP on Consumer implementation
pulyaevskiy Jan 23, 2017
f889816
Refactored group membership API
pulyaevskiy Jan 25, 2017
53fae24
Updated test filename
Jan 26, 2017
9c01daa
Moved consumer group to ng
Jan 28, 2017
3111cff
Work-in-progress on Consumer implementation
pulyaevskiy Jan 29, 2017
ce4aa92
Added refactored offset fetch api
pulyaevskiy Jan 30, 2017
627e7a9
Refactored fetch API with a simple test case
pulyaevskiy Jan 30, 2017
f9edef0
Work-in-progress on Consumer implementation
pulyaevskiy Jan 31, 2017
469ce89
Completed minimum possible implementation of consumer
pulyaevskiy Feb 1, 2017
50ebeec
Small updates to consumer docs; fixed consumer test
Mar 4, 2017
b650f7e
Cleaned up session creation
pulyaevskiy Mar 4, 2017
b975969
Moved all tests to single location, cleaned up bootstrap; upgraded so…
pulyaevskiy Mar 5, 2017
65e192f
Cleaning up naming convention, errors and api classes
pulyaevskiy Mar 5, 2017
008b5d6
Completed naming clean-up in kafka api layer
pulyaevskiy Mar 5, 2017
e116e52
Moved previous implementation to _old
pulyaevskiy Mar 5, 2017
fdfa287
Another clean-up of some common variable names
pulyaevskiy Mar 5, 2017
ada8666
Re-introduced ConsumerGroup abstraction for convenience
pulyaevskiy Mar 6, 2017
1870d6a
Work-in-progress on committing offsets in Consumer
pulyaevskiy Mar 6, 2017
ecba46a
Switched back to StreamIterator, refactored offset commits
pulyaevskiy Mar 7, 2017
eb29395
Refactoring consumer's state machine
pulyaevskiy Mar 8, 2017
3bda60c
Refactoring consumer's state machine 2
pulyaevskiy Mar 8, 2017
e4294f0
Work on consumer
Mar 9, 2017
940f3a9
Added some extra logging for better debugging
pulyaevskiy Mar 9, 2017
2f5f982
Ongoing work on refactoring consumer poll loop
Mar 10, 2017
89a505f
Fixed test
pulyaevskiy Mar 10, 2017
691de33
Better polling logic in Consumer
pulyaevskiy Mar 12, 2017
5167ab0
Minor cleanup in Consumer
pulyaevskiy Mar 12, 2017
4f8d8fa
Updated JoinGroup request to v1
pulyaevskiy Mar 12, 2017
aba9bb2
Updated messages to v1
pulyaevskiy Mar 13, 2017
a8b3a20
Fix for timestamp type decoding code
pulyaevskiy Mar 13, 2017
4d19580
Added ApiVersions request/response
pulyaevskiy Mar 13, 2017
215fad6
Working on dynamic api version resolution
pulyaevskiy Mar 13, 2017
8c0f8c4
Resolve api versions before sending requests to a broker
Mar 14, 2017
9677e1a
Refactored version handling in requests and encoders
Mar 15, 2017
91bb2bb
Added basic Producer configuration (not integrated yet)
Mar 15, 2017
8478acb
Cleaning up public interface and access to Session objects
Mar 16, 2017
d2c7853
Deleted legacy implementations
Mar 16, 2017
5d17134
Moved new code out from subfolder
Mar 16, 2017
dd59e6b
Cleaned up old naming
Mar 16, 2017
9cebb03
Refactored Producer to implement StreamSink; man, Dart streams are aw…
Mar 17, 2017
76b71fe
Resolve record offsets after produce
Mar 18, 2017
3714cbd
Attempt to fix travis build
Mar 19, 2017
850772d
Increased retry delay when fetching consumer coordinator
Mar 19, 2017
dc305a3
Updated code to support Dart 2.7, updated package dependency versions…
larryaasen Apr 9, 2020
7ce7d50
Updated travis config.
larryaasen Apr 9, 2020
b8348bc
Fixed code that broke the tests.
larryaasen Apr 9, 2020
af2ca10
Removed Dart 1.x new keywords from examples.
larryaasen Apr 9, 2020
87f7712
Updated code in kafka-0.10 branch to support Dart 2.7 (#14)
larryaasen Apr 10, 2020
34f38d5
Updated docker image ches/kafka to latest. Updated docker image jploc…
larryaasen Apr 15, 2020
c71a7eb
Merge pull request #1 from dart-kafka/kafka-0.10
larryaasen Apr 18, 2020
818fb9e
Updated to the latest Docker Official Image of zookeeper, 3.6.0.
larryaasen Apr 18, 2020
b9ada21
Merge pull request #2 from larryaasen/updates
larryaasen Apr 18, 2020
a05ba01
Merge pull request #3 from larryaasen/dart_2_7
larryaasen Apr 18, 2020
0c4010c
Merge pull request #4 from larryaasen/updates
larryaasen Apr 18, 2020
898700f
Disabled the uploading to codecov.io from Travis job to ensure test s…
larryaasen Apr 18, 2020
66bfe03
Removed the remaining code coverage steps in Travis build until it ge…
larryaasen Apr 18, 2020
b059b06
Updated Docker images to the latest (#15)
larryaasen Apr 18, 2020
8c399a9
Fixed bug in ConsumerStreamIterator.attachStream() where '==' was bei…
larryaasen Apr 24, 2020
b30ad34
Merge pull request #5 from dart-kafka/kafka-0.10
larryaasen Apr 24, 2020
2db1601
Changed import.
larryaasen Apr 24, 2020
a9ababa
Merge branch 'kafka-0.10' of https://github.com/larryaasen/kafka into…
larryaasen Apr 24, 2020
ece25a2
Added one log statement useful for debugging commit request.
larryaasen Apr 25, 2020
56eafa1
Changed Docker being built from ches/kafka, which was Kafka 0.10.2.1,…
larryaasen May 30, 2020
69699d1
Renamed folder tool/kafka-0.10.1.0 to tool/kafka to better reflect th…
larryaasen May 30, 2020
7bf91ba
More renaming of tool/kafka-0.10.1.0 to tool/kafka.
larryaasen May 30, 2020
57999d7
Merge pull request #16 from larryaasen/kafka-0.10
pulyaevskiy May 30, 2020
2 changes: 0 additions & 2 deletions .analysis_options

This file was deleted.

5 changes: 5 additions & 0 deletions .gitignore
@@ -6,3 +6,8 @@ packages
.packages
notes.md
coverage/
.atom/
.vscode/
.idea/
package_config.json
.DS_Store
25 changes: 8 additions & 17 deletions .travis.yml
@@ -1,7 +1,3 @@
sudo: required

dist: trusty

language: dart

services:
@@ -11,20 +7,15 @@ dart:
- stable

before_install:
- docker build -t kafka-cluster tool/kafka-cluster/
- docker run -d --name kafka-cluster -p 2181:2181 -p 9092:9092 -p 9093:9093 --env ADVERTISED_HOST=127.0.0.1 kafka-cluster
- docker ps -a
- sleep 5
- docker exec kafka-cluster bash -c '$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper=localhost:2181 --topic dartKafkaTest --partitions 3 --replication-factor 2'
- docker exec kafka-cluster bash -c '$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper=localhost:2181'
- ./tool/rebuild.sh

script:
- pub run test -r expanded test/all.dart
- pub global activate coverage
- dart --observe=8111 test/all.dart &
- sleep 20
- pub global run coverage:collect_coverage --port=8111 -o coverage.json --resume-isolates
- pub global run coverage:format_coverage --package-root=packages --report-on lib --in coverage.json --out lcov.info --lcov
- pub run test test/all.dart
# - pub global activate coverage
# - dart --observe=8111 test/all.dart &
# - sleep 10
# - pub global run coverage:collect_coverage --port=8111 -o coverage.json --resume-isolates
# - pub global run coverage:format_coverage --packages=.packages --report-on lib --in coverage.json --out lcov.info --lcov

after_success:
- bash <(curl -s https://codecov.io/bash)
# - bash <(curl -s https://codecov.io/bash)
27 changes: 26 additions & 1 deletion DEVELOPMENT.md
@@ -4,7 +4,7 @@ Requirements:

* Docker Toolbox (OS X)

## Starting Kafka container locally
## Kafka 0.8.x

```
docker build -t kafka-cluster tool/kafka-cluster/
@@ -22,3 +22,28 @@ Now you should be able to run tests with:
```
pub run test -j 1
```

## Kafka 0.10.0.0

We're using the `ches/kafka` base image, so the instructions are a bit different.

```
# Zookeeper is in a separate container now
docker run -d --name zookeeper --publish 2181:2181 jplock/zookeeper:3.4.6

# Build our image
docker build -t kafka tool/kafka/

ZK_IP=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' zookeeper)

# or for fish users
set -x ZK_IP (docker inspect --format '{{ .NetworkSettings.IPAddress }}' zookeeper)

# Start Kafka container
docker run -d --name kafka --publish 9092:9092 --publish 9093:9093 \
--env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 \
--env ZOOKEEPER_IP=$ZK_IP \
kafka
```

Kafka brokers will be available on `127.0.0.1:9092` and `127.0.0.1:9093`.
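
Once both containers are running, a quick way to verify broker connectivity from Dart is to fetch some offsets with the `Session` API (a minimal sketch based on `example/offsets.dart` in this repository; the topic name `dartKafkaTest` is only an assumption and must already exist, e.g. created with `kafka-topics.sh`):

```dart
import 'dart:async';

import 'package:kafka/kafka.dart';

Future main() async {
  // Connect to one of the brokers started above.
  var session = Session(['127.0.0.1:9092']);
  var master = OffsetMaster(session);
  // Fetching the earliest offsets is a cheap round-trip that confirms the
  // broker is reachable. The topic name and partition are assumptions here.
  var offsets = await master.fetchEarliest([
    TopicPartition('dartKafkaTest', 0),
  ]);
  print(offsets);
  await session.close(); // Always close the session in the end.
}
```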
199 changes: 40 additions & 159 deletions README.md
@@ -1,198 +1,79 @@
# Dart Kafka

[![Build Status](https://travis-ci.org/pulyaevskiy/dart-kafka.svg?branch=master)](https://travis-ci.org/pulyaevskiy/dart-kafka)
[![Build Status](https://travis-ci.org/dart-kafka/kafka.svg?branch=kafka-0.10)](https://travis-ci.org/dart-kafka/kafka)
[![Coverage](https://codecov.io/gh/pulyaevskiy/dart-kafka/branch/master/graph/badge.svg)](https://codecov.io/gh/pulyaevskiy/dart-kafka)
[![License](https://img.shields.io/badge/license-BSD--2-blue.svg)](https://raw.githubusercontent.com/pulyaevskiy/dart-kafka/master/LICENSE)

Kafka client library written in Dart.

### Current status

This library is a work-in-progress and has not been used in production yet.

### Things that are not supported yet.

* Snappy compression.
* SSL

## Installation

There is no Pub package yet, but it will be published as soon as the APIs are
stable enough.

For now you can use a git dependency in your `pubspec.yaml`:

```yaml
dependencies:
kafka:
git: https://github.com/pulyaevskiy/dart-kafka.git
```

And then import it as usual:

```dart
import 'package:kafka/kafka.dart';
```

## Features
_To be updated with first release._

This library provides several high-level API objects to interact with Kafka:

* __KafkaSession__ - responsible for managing connections to Kafka brokers and
coordinating all requests. Also provides access to metadata information.
* __Producer__ - publishes messages to Kafka topics.
* __Consumer__ - consumes messages from Kafka topics and stores its state (current
offsets). Leverages the ConsumerMetadata API via ConsumerGroup.
* __Fetcher__ - consumes messages from Kafka without storing state.
* __OffsetMaster__ - provides convenience on top of the Offset API, making it easy to
retrieve the earliest and latest offsets of particular topic-partitions.
* __ConsumerGroup__ - provides convenience on top of the Consumer Metadata API to easily
fetch or commit consumer offsets.

## Producer

A simple implementation of a Kafka producer. It supports auto-detection of leaders for
topic-partitions and creates separate `ProduceRequest`s for each broker.
Requests are sent in parallel and all responses are aggregated in a special
`ProduceResult` object.
The producer publishes messages to the Kafka cluster. Here is a simple example
of using the producer to send `String` records:

```dart
// file:produce.dart
import 'dart:io';
import 'package:kafka/kafka.dart';

main(List<String> arguments) async {
var host = new ContactPoint('127.0.0.1', 9092);
var session = new KafkaSession([host]);

var producer = new Producer(session, 1, 1000);
var result = await producer.produce([
new ProduceEnvelope('topicName', 0, [new Message('msgForPartition0'.codeUnits)]),
new ProduceEnvelope('topicName', 1, [new Message('msgForPartition1'.codeUnits)])
]);
print(result.hasErrors);
print(result.offsets);
session.close(); // make sure to always close the session when the work is done.
}
```

Result:

```shell
$ dart produce.dart
false
{dartKafkaTest: {0: 213075, 1: 201680}}
```

## Consumer

A high-level implementation of a Kafka consumer which stores its state using
Kafka's ConsumerMetadata API.

> If you don't want to keep the state of consumed offsets, take a look at `Fetcher`,
> which was designed specifically for this use case.

Consumer returns messages as a `Stream`, so all standard stream operations
are applicable. However, Kafka topics are ordered streams of messages
with sequential offsets, and the Consumer implementation preserves the order of
messages received from the server. For this purpose all messages are wrapped in
a special `MessageEnvelope` object with the following methods:

```
/// Signals to the consumer that the message has been processed and its offset
/// can be committed.
void commit(String metadata);

/// Signals that the message has been processed and we are ready for
/// the next one. The offset of this message will **not** be committed.
void ack();

/// Signals the consumer to cancel any further deliveries and close the stream.
void cancel();
```

You must call `commit()` or `ack()` for each processed message, otherwise the
Consumer won't send the next message to the stream.

The simplest example of a consumer:

```dart
import 'dart:io';
import 'dart:async';

import 'package:kafka/kafka.dart';

void main(List<String> arguments) async {
var host = new ContactPoint('127.0.0.1', 9092);
var session = new KafkaSession([host]);
var group = new ConsumerGroup(session, 'consumerGroupName');
var topics = {
'topicName': [0, 1] // list of partitions to consume from.
};

var consumer = new Consumer(session, group, topics, 100, 1);
await for (MessageEnvelope envelope in consumer.consume(limit: 3)) {
// Assuming that messages were produced by the Producer from the previous example.
var value = new String.fromCharCodes(envelope.message.value);
print('Got message: ${envelope.offset}, ${value}');
envelope.commit('metadata'); // Important.
}
session.close(); // make sure to always close the session when the work is done.
Future main() async {
var config = new ProducerConfig(bootstrapServers: ['127.0.0.1:9092']);
var producer = new Producer<String, String>(
new StringSerializer(), new StringSerializer(), config);
var record = new ProducerRecord('example-topic', 0, 'key', 'value');
producer.add(record);
var result = await record.result;
print(result);
await producer.close();
}
```

It is also possible to consume messages in batches for improved efficiency:

```dart
import 'dart:io';
import 'dart:async';
import 'package:kafka/kafka.dart';
The producer implements the `StreamSink` interface, so it is possible to send
individual records via `add()` as well as streams of records via
`addStream()`.

void main(List<String> arguments) async {
var host = new ContactPoint('127.0.0.1', 9092);
var session = new KafkaSession([host]);
var group = new ConsumerGroup(session, 'consumerGroupName');
var topics = {
'topicName': [0, 1] // list of partitions to consume from.
};

var consumer = new Consumer(session, group, topics, 100, 1);
await for (BatchEnvelope batch in consumer.batchConsume(20)) {
batch.items.forEach((MessageEnvelope envelope) {
// use envelope as usual
});
batch.commit('metadata'); // use batch control methods instead of individual messages.
}
session.close(); // make sure to always close the session when the work is done.
}
```
The producer buffers records internally so that they can be sent in bulk to
the server. This does not necessarily mean increased latency for record
delivery. When a record is added with `add()` it is sent immediately
(although there is an asynchronous gap between the call to `add()` and the actual
send) as long as there is an available slot in the IO pool. By default the producer
allows up to 5 in-flight requests per broker connection,
so a delay can occur only if all the slots are already occupied.

### Consumer offset reset strategy
## Note on new API design

Because Kafka topics can be configured to delete old messages periodically,
it is possible that your consumer offset may become invalid (simply because
that message/offset no longer exists in the Kafka topic).
The public API of this library has been completely re-written since the original
version (which supported only Kafka 0.8.x and was never published on Pub).

In such cases `Consumer` provides a configurable strategy with the following options:
The new design tries to accomplish two main goals:

* `OffsetOutOfRangeBehavior.throwError`
* `OffsetOutOfRangeBehavior.resetToEarliest` (default)
* `OffsetOutOfRangeBehavior.resetToLatest`
#### 1. Follow official Java client semantics and contract.

By default, if it gets an `OffsetOutOfRange` server error it will reset its offsets
to the earliest available in the consumed topic and partitions, which essentially
means consuming all available messages from the beginning.
`Producer` and `Consumer` try to preserve the characteristics of the
original Java implementations. This is why the configurations for both
are almost identical to the ones in the official client. The way serialization is implemented is also based on the Java code.

To modify this behavior, simply set the `onOffsetOutOfRange` property of the consumer to
one of the above values:
#### 2. Streams-compatible public API.

```
var consumer = new Consumer(session, group, topics, 100, 1);
consumer.onOffsetOutOfRange = OffsetOutOfRangeBehavior.throwError;
```
The main reason is to allow better interoperability with other libraries.
`Producer` implements `StreamSink` for this specific reason, so instead of
having a `send()` method (as in the Java client) there are `add()` and
`addStream()`.
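
For example, a whole batch of records can be piped into the producer from any Dart stream (a minimal sketch reusing the `ProducerConfig` and `StringSerializer` setup from the producer example above; the topic name, partition, keys and values are purely illustrative):

```dart
import 'dart:async';

import 'package:kafka/kafka.dart';

Future main() async {
  var config = new ProducerConfig(bootstrapServers: ['127.0.0.1:9092']);
  var producer = new Producer<String, String>(
      new StringSerializer(), new StringSerializer(), config);

  // Any stream of ProducerRecord objects can be piped into the producer
  // via addStream(), since Producer implements StreamSink.
  var records = new Stream.fromIterable([
    new ProducerRecord('example-topic', 0, 'key-1', 'value-1'),
    new ProducerRecord('example-topic', 0, 'key-2', 'value-2'),
  ]);
  await producer.addStream(records);
  await producer.close();
}
```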

## Supported protocol versions

The current version targets version `0.8.2` of the Kafka protocol. There are no plans
to support earlier versions.
The current version targets version `0.10` of the Kafka protocol.
There are no plans to support earlier versions.

## License

4 changes: 4 additions & 0 deletions analysis_options.yaml
@@ -0,0 +1,4 @@
analyzer:
strong-mode:
implicit-casts: true
implicit-dynamic: true
15 changes: 15 additions & 0 deletions example/offsets.dart
@@ -0,0 +1,15 @@
import 'dart:async';

import 'package:kafka/kafka.dart';

Future main() async {
var session = Session(['127.0.0.1:9092']);
var master = OffsetMaster(session);
var offsets = await master.fetchEarliest([
TopicPartition('simple_topic', 0),
TopicPartition('simple_topic', 1),
TopicPartition('simple_topic', 2),
]);
print(offsets);
await session.close(); // Always close session in the end.
}
25 changes: 25 additions & 0 deletions example/simple_consumer.dart
@@ -0,0 +1,25 @@
import 'dart:async';

import 'package:kafka/kafka.dart';
import 'package:logging/logging.dart';

Future main() async {
Logger.root.level = Level.ALL;
Logger.root.onRecord.listen(print);

var session = Session(['127.0.0.1:9092']);
var consumer = Consumer<String, String>(
'simple_consumer', StringDeserializer(), StringDeserializer(), session);

await consumer.subscribe(['simple_topic']);
var queue = consumer.poll();
while (await queue.moveNext()) {
var records = queue.current;
for (var record in records.records) {
print(
"[${record.topic}:${record.partition}], offset: ${record.offset}, ${record.key}, ${record.value}, ts: ${record.timestamp}");
}
await consumer.commit();
}
await session.close();
}
22 changes: 22 additions & 0 deletions example/simple_producer.dart
@@ -0,0 +1,22 @@
import 'dart:async';
import 'package:kafka/kafka.dart';

Future main() async {
// Logger.root.level = Level.ALL;
// Logger.root.onRecord.listen(print);

var config = ProducerConfig(bootstrapServers: ['127.0.0.1:9092']);
var producer =
Producer<String, String>(StringSerializer(), StringSerializer(), config);

for (var i = 0; i < 10; i++) {
// Loop through a list of partitions.
for (var p in [0, 1, 2]) {
var rec =
ProducerRecord('simple_topic', p, 'key:${p},$i', 'value:${p},$i');
producer.add(rec);
rec.result.then(print);
}
}
await producer.close();
}