diff --git a/src/Akka.Streams.Kafka.Tests/Integration/TransactionalIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/TransactionalIntegrationTests.cs index 1f45a247..a0b51d5a 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/TransactionalIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/TransactionalIntegrationTests.cs @@ -8,6 +8,7 @@ using Akka.Streams.Kafka.Helpers; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; +using Akka.Streams.TestKit; using Confluent.Kafka; using FluentAssertions; using Xunit; @@ -22,42 +23,42 @@ public TransactionalIntegrationTests(ITestOutputHelper output, KafkaFixture fixt { } - [Fact(Skip = "Missing producer transactions support, see https://github.com/akkadotnet/Akka.Streams.Kafka/issues/85")] + [Fact] public async Task Transactional_source_with_sink_Should_work() { - var settings = CreateConsumerSettings(CreateGroup(1)); + var consumerSettings = CreateConsumerSettings(CreateGroup(1)); var sourceTopic = CreateTopic(1); var targetTopic = CreateTopic(2); var transactionalId = Guid.NewGuid().ToString(); const int totalMessages = 10; - - var control = KafkaConsumer.TransactionalSource(settings, Subscriptions.Topics(sourceTopic)) - .Via(Business>()) + + await ProduceStrings(sourceTopic, Enumerable.Range(1, totalMessages), ProducerSettings); + + var control = KafkaConsumer.TransactionalSource(consumerSettings, Subscriptions.Topics(sourceTopic)) .Select(message => { return ProducerMessage.Single( - new ProducerRecord(targetTopic, message.Record.Key, message.Record.Value), + new ProducerRecord(targetTopic, message.Record.Message.Key, message.Record.Message.Value), passThrough: message.PartitionOffset); }) .ToMaterialized(KafkaProducer.TransactionalSink(ProducerSettings, transactionalId), Keep.Both) .MapMaterializedValue(DrainingControl.Create) .Run(Materializer); - var consumer = ConsumeStrings(targetTopic, totalMessages); + var consumer = ConsumeStrings(targetTopic, totalMessages, 
CreateConsumerSettings(CreateGroup(2))); - await ProduceStrings(sourceTopic, Enumerable.Range(1, totalMessages), ProducerSettings); - - AssertTaskCompletesWithin(TimeSpan.FromSeconds(totalMessages), consumer.IsShutdown); - AssertTaskCompletesWithin(TimeSpan.FromSeconds(totalMessages), control.DrainAndShutdown()); + AssertTaskCompletesWithin(TimeSpan.FromSeconds(30), consumer.IsShutdown); + AssertTaskCompletesWithin(TimeSpan.FromSeconds(30), control.DrainAndShutdown()); consumer.DrainAndShutdown().Result.Should().HaveCount(totalMessages); } - private Flow Business() => Flow.Create(); - - private DrainingControl>> ConsumeStrings(string topic, int count) + private DrainingControl>> ConsumeStrings( + string topic, + int count, + ConsumerSettings settings) { - return KafkaConsumer.PlainSource(CreateConsumerSettings(CreateGroup(1)), Subscriptions.Topics(topic)) + return KafkaConsumer.PlainSource(settings, Subscriptions.Topics(topic)) .Take(count) .ToMaterialized(Sink.Seq>(), Keep.Both) .MapMaterializedValue(DrainingControl>>.Create) diff --git a/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs b/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs index 3fa261b0..650eddae 100644 --- a/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs +++ b/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs @@ -211,13 +211,10 @@ public static FlowWithContext, C, IResults, } /// - /// API IS FOR INTERNAL USAGE: see https://github.com/akkadotnet/Akka.Streams.Kafka/issues/85 - /// /// Publish records to Kafka topics and then continue the flow. The flow can only be used with a that /// emits a . The flow requires a unique `transactional.id` across all app /// instances. The flow will override producer properties to enable Kafka exactly-once transactional support. 
/// - [InternalApi] public static Flow, IResults, NotUsed> TransactionalFlow( ProducerSettings setting, string transactionalId) @@ -237,12 +234,9 @@ public static Flow, IResults - /// API IS FOR INTERNAL USAGE: see https://github.com/akkadotnet/Akka.Streams.Kafka/issues/85 - /// /// Sink that is aware of the from a . /// It will initialize, begin, produce, and commit the consumer offset as part of a transaction. /// - [InternalApi] public static Sink, Task> TransactionalSink( ProducerSettings settings, string transactionalId) diff --git a/src/Akka.Streams.Kafka/Messages/CommittedMarker.cs b/src/Akka.Streams.Kafka/Messages/CommittedMarker.cs index 0e4b9cc5..e536cac3 100644 --- a/src/Akka.Streams.Kafka/Messages/CommittedMarker.cs +++ b/src/Akka.Streams.Kafka/Messages/CommittedMarker.cs @@ -30,17 +30,33 @@ internal sealed class PartitionOffsetCommittedMarker : GroupTopicPartitionOffset /// Committed marker /// public ICommittedMarker CommittedMarker { get; } + /// + /// Consumer group metadata + /// + public IConsumerGroupMetadata ConsumerGroupMetadata { get; } - public PartitionOffsetCommittedMarker(string groupId, string topic, int partition, Offset offset, ICommittedMarker committedMarker) + public PartitionOffsetCommittedMarker( + string groupId, + string topic, + int partition, + Offset offset, + ICommittedMarker committedMarker, + IConsumerGroupMetadata consumerGroupMetadata) : base(groupId, topic, partition, offset) { CommittedMarker = committedMarker; + ConsumerGroupMetadata = consumerGroupMetadata; } - public PartitionOffsetCommittedMarker(GroupTopicPartition groupTopicPartition, Offset offset, ICommittedMarker committedMarker) + public PartitionOffsetCommittedMarker( + GroupTopicPartition groupTopicPartition, + Offset offset, + ICommittedMarker committedMarker, + IConsumerGroupMetadata consumerGroupMetadata) : base(groupTopicPartition, offset) { CommittedMarker = committedMarker; + ConsumerGroupMetadata = consumerGroupMetadata; } } } \ No newline at end of 
file diff --git a/src/Akka.Streams.Kafka/Settings/ProducerSettings.cs b/src/Akka.Streams.Kafka/Settings/ProducerSettings.cs index a72e5d0c..baad4eab 100644 --- a/src/Akka.Streams.Kafka/Settings/ProducerSettings.cs +++ b/src/Akka.Streams.Kafka/Settings/ProducerSettings.cs @@ -9,9 +9,15 @@ namespace Akka.Streams.Kafka.Settings { public sealed class ProducerSettings { - public ProducerSettings(ISerializer keySerializer, ISerializer valueSerializer, int parallelism, - string dispatcherId, TimeSpan flushTimeout, TimeSpan eosCommitInterval, - IImmutableDictionary properties) + public ProducerSettings( + ISerializer keySerializer, + ISerializer valueSerializer, + int parallelism, + string dispatcherId, + TimeSpan flushTimeout, + TimeSpan eosCommitInterval, + TimeSpan maxBlock, + IImmutableDictionary properties) { KeySerializer = keySerializer; ValueSerializer = valueSerializer; @@ -19,6 +25,7 @@ public ProducerSettings(ISerializer keySerializer, ISerializer val DispatcherId = dispatcherId; FlushTimeout = flushTimeout; EosCommitInterval = eosCommitInterval; + MaxBlock = maxBlock; Properties = properties; } @@ -26,6 +33,11 @@ public ProducerSettings(ISerializer keySerializer, ISerializer val public ISerializer ValueSerializer { get; } public int Parallelism { get; } public string DispatcherId { get; } + /// + /// Configures how long producer methods can block. + /// See also: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_max.block.ms + /// + public TimeSpan MaxBlock { get; } public TimeSpan FlushTimeout { get; } /// /// The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`. @@ -69,6 +81,7 @@ private ProducerSettings Copy( string dispatcherId = null, TimeSpan? flushTimeout = null, TimeSpan? eosCommitInterval = null, + TimeSpan? maxBlock = null, IImmutableDictionary properties = null) => new ProducerSettings( keySerializer: keySerializer ?? 
this.KeySerializer, @@ -77,6 +90,7 @@ private ProducerSettings Copy( dispatcherId: dispatcherId ?? this.DispatcherId, flushTimeout: flushTimeout ?? this.FlushTimeout, eosCommitInterval: eosCommitInterval ?? this.EosCommitInterval, + maxBlock: maxBlock ?? this.MaxBlock, properties: properties ?? this.Properties); public static ProducerSettings Create(ActorSystem system, ISerializer keySerializer, ISerializer valueSerializer) @@ -98,6 +112,7 @@ public static ProducerSettings Create(Akka.Configuration.Config co dispatcherId: config.GetString("use-dispatcher", "akka.kafka.default-dispatcher"), flushTimeout: config.GetTimeSpan("flush-timeout", TimeSpan.FromSeconds(2)), eosCommitInterval: config.GetTimeSpan("eos-commit-interval", TimeSpan.FromMilliseconds(100)), + maxBlock: config.GetTimeSpan("max.block.ms", TimeSpan.FromSeconds(60)), properties: ImmutableDictionary.Empty); } diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/BaseSingleSourceLogic.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/BaseSingleSourceLogic.cs index 8d4c866e..9e676843 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/BaseSingleSourceLogic.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/BaseSingleSourceLogic.cs @@ -35,6 +35,7 @@ internal abstract class BaseSingleSourceLogic : GraphStageLogic private readonly ConcurrentQueue> _buffer = new ConcurrentQueue>(); protected IImmutableSet TopicPartitions { get; set; } = ImmutableHashSet.Create(); + public IConsumerGroupMetadata ConsumerGroupMetadata { get; private set; } protected StageActor SourceActor { get; private set; } internal IActorRef ConsumerActor { get; private set; } @@ -66,6 +67,9 @@ public override void PreStart() ConsumerActor = CreateConsumerActor(); SourceActor.Watch(ConsumerActor); + // get consumer metadata before consuming messages + ConsumerActor.Tell(KafkaConsumerActorMetadata.Internal.ConsumerGroupMetadataRequest.Instance, SourceActor.Ref); + ConfigureSubscription(); } @@ -109,6 +113,10 @@ 
protected virtual void MessageHandling((IActorRef, object) args) { switch (args.Item2) { + case KafkaConsumerActorMetadata.Internal.ConsumerGroupMetadata metadata: + ConsumerGroupMetadata = metadata.Metadata; + break; + case KafkaConsumerActorMetadata.Internal.Messages msg: // might be more than one in flight when we assign/revoke tps if (msg.RequestId == _requestId) @@ -150,7 +158,8 @@ private void Pump() { if (_buffer.TryDequeue(out var message)) { - Push(_shape.Outlet, _messageBuilder.CreateMessage(message)); + var result = _messageBuilder.CreateMessage(message, ConsumerGroupMetadata); + Push(_shape.Outlet, result); Pump(); } else if (!_requested && TopicPartitions.Any()) diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SubSourceLogic.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SubSourceLogic.cs index ff2f80b5..ab5a0be7 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SubSourceLogic.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/SubSourceLogic.cs @@ -62,6 +62,7 @@ private class CloseRevokedPartitions { } /// private IImmutableSet _partitionsToRevoke = ImmutableHashSet.Empty; + public IConsumerGroupMetadata ConsumerGroupMetadata { get; private set; } protected StageActor SourceActor { get; private set; } public IActorRef ConsumerActor { get; private set; } @@ -107,6 +108,10 @@ public override void PreStart() { switch (args.Item2) { + case KafkaConsumerActorMetadata.Internal.ConsumerGroupMetadata metadata: + ConsumerGroupMetadata = metadata.Metadata; + break; + case Status.Failure failure: var exception = failure.Cause; switch (_decider(failure.Cause)) @@ -140,6 +145,8 @@ public override void PreStart() $"kafka-consumer-{_actorNumber}"); SourceActor.Watch(ConsumerActor); + + ConsumerActor.Tell(KafkaConsumerActorMetadata.Internal.ConsumerGroupMetadataRequest.Instance, SourceActor.Ref); switch (_subscription) { @@ -285,8 +292,11 @@ private void EmitSubSourcesForPendingPartitions() _pendingPartitions = 
_pendingPartitions.Remove(topicPartition); _partitionsInStartup = _partitionsInStartup.Add(topicPartition); - var subSourceStage = new SubSourceStreamStage(topicPartition, ConsumerActor, _subsourceStartedCallback, - _subsourceCancelledCallback, _messageBuilder, _decider, _actorNumber); + var subSourceStage = new SubSourceStreamStage( + topicPartition, ConsumerActor, ConsumerGroupMetadata, + _subsourceStartedCallback, _subsourceCancelledCallback, + _messageBuilder, _decider, _actorNumber); + var subsource = Source.FromGraph(subSourceStage); Push(_shape.Outlet, (topicPartition, subsource)); @@ -359,6 +369,7 @@ private class SubSourceStreamStage : GraphStage> private readonly TopicPartition _topicPartition; private readonly IActorRef _consumerActor; private readonly Action<(TopicPartition, IControl)> _subSourceStartedCallback; + private readonly IConsumerGroupMetadata _consumerGroupMetadata; private readonly Action<(TopicPartition, Option>)> _subSourceCancelledCallback; private readonly IMessageBuilder _messageBuilder; private readonly int _actorNumber; @@ -367,16 +378,20 @@ private class SubSourceStreamStage : GraphStage> public Outlet Out { get; } public override SourceShape Shape { get; } - public SubSourceStreamStage(TopicPartition topicPartition, IActorRef consumerActor, - Action<(TopicPartition, IControl)> subSourceStartedCallback, - Action<(TopicPartition, Option>)> subSourceCancelledCallback, - IMessageBuilder messageBuilder, - Decider decider, - int actorNumber) + public SubSourceStreamStage( + TopicPartition topicPartition, + IActorRef consumerActor, + IConsumerGroupMetadata consumerGroupMetadata, + Action<(TopicPartition, IControl)> subSourceStartedCallback, + Action<(TopicPartition, Option>)> subSourceCancelledCallback, + IMessageBuilder messageBuilder, + Decider decider, + int actorNumber) { _topicPartition = topicPartition; _consumerActor = consumerActor; _subSourceStartedCallback = subSourceStartedCallback; + _consumerGroupMetadata = 
consumerGroupMetadata; _subSourceCancelledCallback = subSourceCancelledCallback; _messageBuilder = messageBuilder; _decider = decider; @@ -388,8 +403,9 @@ public SubSourceStreamStage(TopicPartition topicPartition, IActorRef consumerAct protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) { - return new SubSourceStreamStageLogic(Shape, _topicPartition, _consumerActor, _actorNumber, _messageBuilder, _decider, - _subSourceStartedCallback, _subSourceCancelledCallback); + return new SubSourceStreamStageLogic( + Shape, _topicPartition, _consumerActor, _actorNumber, _messageBuilder, _decider, + _consumerGroupMetadata, _subSourceStartedCallback, _subSourceCancelledCallback); } private class SubSourceStreamStageLogic : GraphStageLogic @@ -404,12 +420,14 @@ private class SubSourceStreamStageLogic : GraphStageLogic private bool _requested = false; private StageActor _subSourceActor; private readonly Decider _decider; + private readonly IConsumerGroupMetadata _consumerGroupMetadata; private readonly ConcurrentQueue> _buffer = new ConcurrentQueue>(); public PromiseControl Control { get; } public SubSourceStreamStageLogic(SourceShape shape, TopicPartition topicPartition, IActorRef consumerActor, int actorNumber, IMessageBuilder messageBuilder, Decider decider, + IConsumerGroupMetadata consumerGroupMetadata, Action<(TopicPartition, IControl)> subSourceStartedCallback, Action<(TopicPartition, Option>)> subSourceCancelledCallback) : base(shape) @@ -420,6 +438,7 @@ public SubSourceStreamStageLogic(SourceShape shape, TopicPartition topicPa _actorNumber = actorNumber; _messageBuilder = messageBuilder; _decider = decider; + _consumerGroupMetadata = consumerGroupMetadata; _subSourceStartedCallback = subSourceStartedCallback; _requestMessages = new KafkaConsumerActorMetadata.Internal.RequestMessages(0, ImmutableHashSet.Create(topicPartition)); @@ -493,7 +512,7 @@ private void Pump() { if (_buffer.TryDequeue(out var message)) { - Push(_shape.Outlet, 
_messageBuilder.CreateMessage(message)); + Push(_shape.Outlet, _messageBuilder.CreateMessage(message, _consumerGroupMetadata)); Pump(); } else if (!_requested) diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/TransactionalSourceLogic.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/TransactionalSourceLogic.cs index 292dc5e4..9bdb3ace 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/TransactionalSourceLogic.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Abstract/TransactionalSourceLogic.cs @@ -119,7 +119,8 @@ private void DrainHandling((IActorRef, object) arg, Action<(IActorRef, object)> } /// - public TransactionalMessage CreateMessage(ConsumeResult record) => _messageBuilder.CreateMessage(record); + public TransactionalMessage CreateMessage(ConsumeResult record, IConsumerGroupMetadata consumerGroupMetadata) + => _messageBuilder.CreateMessage(record, ConsumerGroupMetadata); /// public string GroupId => _settings.GroupId; diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs index 16e01e8d..fa6d40ce 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActor.cs @@ -142,6 +142,10 @@ protected override bool Receive(object message) HandleSubscription(subscribe); return true; + case KafkaConsumerActorMetadata.Internal.ConsumerGroupMetadataRequest metadataRequest: + Sender.Tell(new KafkaConsumerActorMetadata.Internal.ConsumerGroupMetadata(_consumer.ConsumerGroupMetadata)); + return true; + case KafkaConsumerActorMetadata.Internal.RequestMessages requestMessages: Context.Watch(Sender); CheckOverlappingRequests("RequestMessages", Sender, requestMessages.Topics); @@ -237,7 +241,8 @@ private void ApplySettings(ConsumerSettings updatedSettings) _log.Debug($"Creating Kafka consumer with settings: {JsonConvert.SerializeObject(_settings)}"); _consumer = 
_settings.CreateKafkaConsumer( - consumeErrorHandler: (c, e) => ProcessError(new KafkaException(e)), + consumeErrorHandler: (c, e) => + ProcessError(new KafkaException(e)), partitionAssignedHandler: (c, tp) => _partitionAssignmentHandler.OnPartitionsAssigned(tp.ToImmutableHashSet()), partitionRevokedHandler: (c, tp) => diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActorMetadata.cs b/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActorMetadata.cs index 269e1d7e..dca4acb4 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActorMetadata.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/Actors/KafkaConsumerActorMetadata.cs @@ -47,6 +47,28 @@ internal static Props GetProps(IActorRef owner, ConsumerSettings set [InternalApi] public class Internal { + public class ConsumerGroupMetadataRequest + { + private ConsumerGroupMetadataRequest() { } + public static readonly ConsumerGroupMetadataRequest Instance = new ConsumerGroupMetadataRequest(); + } + + /// + /// Consumer group metadata generated by + /// + public class ConsumerGroupMetadata + { + public ConsumerGroupMetadata(IConsumerGroupMetadata metadata) + { + Metadata = metadata; + } + + /// + /// Metadata + /// + public IConsumerGroupMetadata Metadata { get; } + } + /// /// Messages /// @@ -57,7 +79,9 @@ public class Messages /// /// Request Id /// List of consumed messages - public Messages(int requestId, ImmutableList> messagesList) + public Messages( + int requestId, + ImmutableList> messagesList) { RequestId = requestId; MessagesList = messagesList; diff --git a/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs b/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs index 3987c507..7ea58dc3 100644 --- a/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs +++ b/src/Akka.Streams.Kafka/Stages/Consumers/MessageBuilders.cs @@ -17,10 +17,9 @@ public interface IMessageBuilder /// Creates downstream message /// /// - /// We pass consumer here, 
because there is no way to get consumer instance from - /// some global configuration, like Alpakka does getting consumer actor ref + /// We pass consumer group metadata here, because Confluent driver requires it instead of GroupId /// - TMessage CreateMessage(ConsumeResult record); + TMessage CreateMessage(ConsumeResult record, IConsumerGroupMetadata consumerGroupMetadata); } /// @@ -28,7 +27,7 @@ public interface IMessageBuilder /// public class PlainMessageBuilder : IMessageBuilder> { - public ConsumeResult CreateMessage(ConsumeResult record) => record; + public ConsumeResult CreateMessage(ConsumeResult record, IConsumerGroupMetadata consumerGroupMetadata) => record; } /// @@ -50,7 +49,7 @@ internal abstract class CommittableMessageBuilderBase : IMessageBuilder record); /// - public CommittableMessage CreateMessage(ConsumeResult record) + public CommittableMessage CreateMessage(ConsumeResult record, IConsumerGroupMetadata consumerGroupMetadata) { var offset = new GroupTopicPartitionOffset(GroupId, record.Topic, record.Partition, record.Offset); return new CommittableMessage(record, new CommittableOffset(Committer, offset, MetadataFromRecord(record))); @@ -113,7 +112,9 @@ public OffsetContextBuilder(IInternalCommitter committer, ConsumerSettings } /// - public (ConsumeResult, ICommittableOffset) CreateMessage(ConsumeResult record) + public (ConsumeResult, ICommittableOffset) CreateMessage( + ConsumeResult record, + IConsumerGroupMetadata consumerGroupMetadata) { var offset = new GroupTopicPartitionOffset(GroupId, record.Topic, record.Partition, record.Offset); return (record, new CommittableOffset(Committer, offset, _metadataFromMessage(record))); @@ -134,6 +135,10 @@ internal interface ITransactionalMessageBuilderStage : IMessageBuild /// ICommittedMarker CommittedMarker { get; } /// + /// Consumer group metadata + /// + IConsumerGroupMetadata ConsumerGroupMetadata { get; } + /// /// On message callback /// /// @@ -153,7 +158,9 @@ public 
TransactionalMessageBuilder(ITransactionalMessageBuilderStage - public TransactionalMessage CreateMessage(ConsumeResult record) + public TransactionalMessage CreateMessage( + ConsumeResult record, + IConsumerGroupMetadata consumerGroupMetadata) { _transactionalMessageBuilderStage.OnMessage(record); @@ -162,7 +169,8 @@ public TransactionalMessage CreateMessage(ConsumeResult record) record.Topic, record.Partition, record.Offset, - _transactionalMessageBuilderStage.CommittedMarker); + _transactionalMessageBuilderStage.CommittedMarker, + consumerGroupMetadata); return new TransactionalMessage(record, offset); } diff --git a/src/Akka.Streams.Kafka/Stages/Producers/TransactionalProducerStage.cs b/src/Akka.Streams.Kafka/Stages/Producers/TransactionalProducerStage.cs index 4c9fee59..3b592c59 100644 --- a/src/Akka.Streams.Kafka/Stages/Producers/TransactionalProducerStage.cs +++ b/src/Akka.Streams.Kafka/Stages/Producers/TransactionalProducerStage.cs @@ -43,7 +43,7 @@ public TransactionalProducerStage(bool closeProducerOnStop, ProducerSettings protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) { - return new TransactionalProducerStageLogic(this, inheritedAttributes, _settings.EosCommitInterval); + return new TransactionalProducerStageLogic(this, inheritedAttributes, _settings); } } @@ -52,7 +52,7 @@ internal class TransactionalProducerStageLogic : DefaultProd private const string CommitSchedulerKey = "commit"; private readonly TransactionalProducerStage _stage; - private readonly TimeSpan _commitInterval; + private readonly ProducerSettings _settings; private readonly Decider _decider; private readonly TimeSpan _messageDrainInterval = TimeSpan.FromMilliseconds(10); @@ -63,16 +63,16 @@ internal class TransactionalProducerStageLogic : DefaultProd public TransactionalProducerStageLogic( TransactionalProducerStage stage, Attributes attributes, - TimeSpan commitInterval) + ProducerSettings settings) : base(stage, attributes) { _stage = stage; - 
_commitInterval = commitInterval; + _settings = settings; var supervisionStrategy = attributes.GetAttribute(null); _decider = supervisionStrategy != null ? supervisionStrategy.Decider : Deciders.StoppingDecider; - _onInternalCommitCallback = GetAsyncCallback(() => ScheduleOnce(CommitSchedulerKey, commitInterval)); + _onInternalCommitCallback = GetAsyncCallback(() => ScheduleOnce(CommitSchedulerKey, _settings.EosCommitInterval)); } public override void PreStart() @@ -82,7 +82,7 @@ public override void PreStart() InitTransactions(); BeginTransaction(); ResumeDemand(tryToPull: false); - ScheduleOnce(CommitSchedulerKey, _commitInterval); + ScheduleOnce(CommitSchedulerKey, _settings.EosCommitInterval); } private void ResumeDemand(bool tryToPull = true) @@ -128,7 +128,7 @@ private void MaybeCommitTransaction(bool beginNewTransaction = true) } else { - ScheduleOnce(CommitSchedulerKey, _commitInterval); + ScheduleOnce(CommitSchedulerKey, _settings.EosCommitInterval); } } @@ -159,13 +159,11 @@ private void CommitTransaction(NonemptyTransactionBatch batch, bool beginNewTran { var groupId = batch.GroupId; Log.Debug("Committing transaction for consumer group '{0}' with offsets: {1}", groupId, batch.Offsets); - var offsetMap = batch.OffsetMap(); + var offsetMap = batch.OffsetMap().Select(pair => new TopicPartitionOffset(pair.Key, pair.Value.Offset)); + + Producer.SendOffsetsToTransaction(offsetMap, batch.ConsumerGroupMetadata, _settings.MaxBlock); + Producer.CommitTransaction(_settings.MaxBlock); - // TODO: Add producer work with transactions - /* scala code: - producer.sendOffsetsToTransaction(offsetMap.asJava, group) - producer.commitTransaction() - */ Log.Debug("Committed transaction for consumer group '{0}' with offsets: {1}", groupId, batch.Offsets); _batchOffsets = new EmptyTransactionBatch(); batch.InternalCommit().ContinueWith(t => @@ -183,22 +181,19 @@ private void CommitTransaction(NonemptyTransactionBatch batch, bool beginNewTran private void InitTransactions() { 
Log.Debug("Initializing transactions"); - // TODO: Add producer work with transactions - // producer.initTransactions() + Producer.InitTransactions(_settings.MaxBlock); } private void BeginTransaction() { Log.Debug("Beginning new transaction"); - // TODO: Add producer work with transactions - // producer.beginTransaction() + Producer.BeginTransaction(); } private void AbortTransaction() { Log.Debug("Aborting transaction"); - // TODO: Add producer work with transactions - // producer.abortTransaction() + Producer.AbortTransaction(_settings.MaxBlock); } @@ -219,25 +214,24 @@ public void CommittingFailed() private class NonemptyTransactionBatch : ITransactionBatch { - private readonly PartitionOffsetCommittedMarker _head; - private readonly IImmutableDictionary _tail; private readonly ICommittedMarker _committedMarker; public IImmutableDictionary Offsets { get; } public string GroupId { get; } + public IConsumerGroupMetadata ConsumerGroupMetadata { get; } public NonemptyTransactionBatch(PartitionOffsetCommittedMarker head, IImmutableDictionary tail = null) { - _head = head; - _tail = tail ?? 
ImmutableDictionary.Empty; _committedMarker = head.CommittedMarker; GroupId = head.GroupId; + ConsumerGroupMetadata = head.ConsumerGroupMetadata; - var previousHighest = _tail.GetValueOrDefault(head.GroupTopicPartition, new Offset(-1)).Value; + var previousHighest = tail1.GetValueOrDefault(head.GroupTopicPartition, new Offset(-1)).Value; var highestOffset = new Offset(Math.Max(head.Offset, previousHighest)); - Offsets = _tail.AddRange(new []{ new KeyValuePair(head.GroupTopicPartition, highestOffset) }); + Offsets = tail1.AddRange(new []{ new KeyValuePair(head.GroupTopicPartition, highestOffset) }); } /// diff --git a/src/Akka.Streams.Kafka/reference.conf b/src/Akka.Streams.Kafka/reference.conf index 33d587f4..006261f0 100644 --- a/src/Akka.Streams.Kafka/reference.conf +++ b/src/Akka.Streams.Kafka/reference.conf @@ -13,6 +13,15 @@ akka.kafka.producer { # When this value is empty, the dispatcher configured for the stream # will be used. use-dispatcher = "akka.kafka.default-dispatcher" + + # The configuration controls how long the KafkaProducer's send(), partitionsFor(), initTransactions(), + # sendOffsetsToTransaction(), commitTransaction() and abortTransaction() methods will block. + # For send() this timeout bounds the total time waiting for both metadata fetch and buffer allocation + # (blocking in the user-supplied serializers or partitioner is not counted against this timeout). + # For partitionsFor() this timeout bounds the time spent waiting for metadata if it is unavailable. + # The transaction-related methods always block, but may timeout if the transaction coordinator could not be + # discovered or did not respond within the timeout. + max.block.ms = "60000" } # Properties for akka.kafka.ConsumerSettings can be