From 7f1e96416394b709b28225e08d288dc334bdc102 Mon Sep 17 00:00:00 2001 From: "sergio.ribeiro" Date: Wed, 11 May 2022 18:37:24 +0100 Subject: [PATCH 01/11] fix: support Kafka messages with nullable or empty partition key --- README.md | 2 +- .../Assertion/IPhysicalStorageAssert.cs | 2 + ...OrderedConsumptionPhysicalStorageAssert.cs | 23 ++++ ...eLatestConsumptionPhysicalStorageAssert.cs | 32 +++++ .../Core/Storages/InMemoryAuxiliarStorage.cs | 28 +++- .../EmptyPartitionKeyRetryDurableTests.cs | 120 ++++++++++++++++++ .../RetryDurableTests.cs | 8 +- .../API/RetryRequestHandlerBaseTests.cs | 9 +- .../RetryDurableQueueRepositoryTests.cs | 116 +++++++++++++++++ .../MongoDb/Adapters/ItemAdapterTests.cs | 7 +- .../Adapters/RetryQueueItemAdapterTests.cs | 5 +- .../Readers/RetryQueueReaderTests.cs | 6 +- ...DurableEmbeddedClusterDefinitionBuilder.cs | 8 +- .../Durable/Polling/QueueTracker.cs | 26 ++-- .../Repository/RetryDurableQueueRepository.cs | 83 +++++++----- 15 files changed, 407 insertions(+), 68 deletions(-) create mode 100644 src/KafkaFlow.Retry.IntegrationTests/EmptyPartitionKeyRetryDurableTests.cs create mode 100644 src/KafkaFlow.Retry.UnitTests/Durable/Repository/RetryDurableQueueRepositoryTests.cs diff --git a/README.md b/README.md index 3d7bc462..11b4f6f7 100644 --- a/README.md +++ b/README.md @@ -89,7 +89,7 @@ Install packages related to your context. The Core package is required for all o .WithRetryTopicName("test-topic-retry") .WithRetryConsumerBufferSize(4) .WithRetryConsumerWorkersCount(2) - .WithRetryConusmerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption) + .WithRetryConsumerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption) .WithRetryTypedHandlers( handlers => handlers .WithHandlerLifetime(InstanceLifetime.Transient) diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/IPhysicalStorageAssert.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/IPhysicalStorageAssert.cs index 2bacf35d..1ffd22d8 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/IPhysicalStorageAssert.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/IPhysicalStorageAssert.cs @@ -6,6 +6,8 @@ internal interface IPhysicalStorageAssert { + Task AssertEmptyKeyRetryDurableMessageRetryingAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int retryCount); + Task AssertRetryDurableMessageCreationAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int count); Task AssertRetryDurableMessageDoneAsync(RepositoryType repositoryType, RetryDurableTestMessage message); diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert.cs index 656e0b2c..23fd9745 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert.cs @@ -18,6 +18,29 @@ public RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert(IRepositoryP this.repositoryProvider = repositoryProvider; } + public async Task AssertEmptyKeyRetryDurableMessageRetryingAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int retryCount) + { + var retryQueue = await this + .repositoryProvider + .GetRepositoryOfType(repositoryType) + .GetRetryQueueAsync(message.Key) + .ConfigureAwait(false); + + Assert.True(retryQueue.Id != Guid.Empty, "Retry Durable Creation Get Retry Queue cannot be asserted."); + + var retryQueueItems = await this + .repositoryProvider + .GetRepositoryOfType(repositoryType) + .GetRetryQueueItemsAsync(retryQueue.Id, rqi => rqi.Count(item => item.Status == RetryQueueItemStatus.InRetry) != retryCount) + .ConfigureAwait(false); + + Assert.True(retryQueueItems != null, "Retry Durable Creation Get Retry Queue Item Message cannot be asserted."); + + Assert.Equal(retryQueueItems.Count() - 1, retryQueueItems.Max(i => i.Sort)); + Assert.True(Enum.Equals(retryQueue.Status, RetryQueueStatus.Active)); + Assert.All(retryQueueItems, i => Enum.Equals(i.Status, RetryQueueItemStatus.Waiting)); + } + public async Task AssertRetryDurableMessageCreationAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int count) { var retryQueue = await this diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/RetryDurableLatestConsumptionPhysicalStorageAssert.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/RetryDurableLatestConsumptionPhysicalStorageAssert.cs index c44bebd5..8d0f48b2 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/RetryDurableLatestConsumptionPhysicalStorageAssert.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/RetryDurableLatestConsumptionPhysicalStorageAssert.cs @@ -1,6 +1,7 @@ namespace KafkaFlow.Retry.IntegrationTests.Core.Storages.Assertion { using System; + using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using KafkaFlow.Retry.Durable.Repository.Model; @@ -18,6 +19,37 @@ public RetryDurableLatestConsumptionPhysicalStorageAssert(IRepositoryProvider re this.repositoryProvider = repositoryProvider; } + public async Task AssertEmptyKeyRetryDurableMessageRetryingAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int retryCount) + { + var retryQueue = await this + .repositoryProvider + .GetRepositoryOfType(repositoryType) + .GetRetryQueueAsync(message.Key).ConfigureAwait(false); + + Assert.True(retryQueue.Id != Guid.Empty, "Retry Durable Retrying Get Retry Queue cannot be asserted."); + + var retryQueueItems = await this + .repositoryProvider + .GetRepositoryOfType(repositoryType) + .GetRetryQueueItemsAsync( + retryQueue.Id, + rqi => + { + return rqi.Count(item => item.Status == RetryQueueItemStatus.Waiting) != retryCount; + }).ConfigureAwait(false); + + var lastRetryItem = retryQueueItems.OrderBy(x => x.Sort).Last(); + var numberOrRetryItems = retryQueueItems.Count(); + var maxSortValue = retryQueueItems.Max(i => i.Sort); + var cancelledRetryItems = retryQueueItems.Except(new List { lastRetryItem }); + + Assert.True(retryQueueItems != null, "Retry Durable Retrying Get Retry Queue Item Message cannot be asserted."); + Assert.True(Enum.Equals(retryQueue.Status, RetryQueueStatus.Active), "Actual retry queue should be in active state"); + Assert.Equal(numberOrRetryItems - 1, maxSortValue); + Assert.Equal(RetryQueueItemStatus.Waiting, lastRetryItem.Status); + Assert.All(cancelledRetryItems, i => Enum.Equals(i.Status, RetryQueueItemStatus.Cancelled)); + } + public async Task AssertRetryDurableMessageCreationAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int count) { var retryQueue = await this diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/InMemoryAuxiliarStorage.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/InMemoryAuxiliarStorage.cs index 135de05e..94c58084 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/InMemoryAuxiliarStorage.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/InMemoryAuxiliarStorage.cs @@ -11,24 +11,24 @@ internal static class InMemoryAuxiliarStorage where T : ITestMessage { private const int TimeoutSec = 60; - private static readonly ConcurrentBag Message = new ConcurrentBag(); + private static readonly ConcurrentBag Messages = new ConcurrentBag(); public static bool ThrowException { get; set; } public static void Add(T message) { - Message.Add(message); + Messages.Add(message); } public static async Task AssertCountMessageAsync(T message, int count) { var start = DateTime.Now; - while (Message.Count(x => x.Key == message.Key && x.Value == message.Value) != count) + while (Messages.Count(x => x.Key == message.Key && x.Value == message.Value) != count) { if (DateTime.Now.Subtract(start).TotalSeconds > TimeoutSec && !Debugger.IsAttached) { - Assert.True(false, "Message not received."); + Assert.True(false, $"Message not received - {message.Key}:{message.Value}."); return; } @@ -36,9 +36,27 @@ public static async Task AssertCountMessageAsync(T message, int count) } } + public static async Task AssertEmptyPartitionKeyCountMessageAsync(T message, int count, int timoutSeconds = TimeoutSec) + { + var start = DateTime.Now; + int numberOfMessages = 0; + do + { + numberOfMessages = Messages.Count(x => x.Value == message.Value); + + if (DateTime.Now.Subtract(start).TotalSeconds > timoutSeconds && !Debugger.IsAttached) + { + Assert.True(false, $"Message {message.Key}:{message.Value} not received. Expected {count}, messages received {numberOfMessages}"); + return; + } + + await Task.Delay(1000).ConfigureAwait(false); + } while (numberOfMessages != count); + } + public static void Clear() { - Message.Clear(); + Messages.Clear(); } } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/EmptyPartitionKeyRetryDurableTests.cs b/src/KafkaFlow.Retry.IntegrationTests/EmptyPartitionKeyRetryDurableTests.cs new file mode 100644 index 00000000..316ea11c --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/EmptyPartitionKeyRetryDurableTests.cs @@ -0,0 +1,120 @@ +namespace KafkaFlow.Retry.IntegrationTests +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + using KafkaFlow.Retry.IntegrationTests.Core.Bootstrappers.Fixtures; + using KafkaFlow.Retry.IntegrationTests.Core.Messages; + using KafkaFlow.Retry.IntegrationTests.Core.Producers; + using KafkaFlow.Retry.IntegrationTests.Core.Storages; + using KafkaFlow.Retry.IntegrationTests.Core.Storages.Assertion; + using KafkaFlow.Retry.IntegrationTests.Core.Storages.Repositories; + using Microsoft.Extensions.DependencyInjection; + using Xunit; + + [Collection("BootstrapperHostCollection")] + public class EmptyPartitionKeyRetryDurableTests + { + private const int defaultWaitingTimeSeconds = 120; + private readonly IRepositoryProvider repositoryProvider; + private readonly IServiceProvider serviceProvider; + + public EmptyPartitionKeyRetryDurableTests(BootstrapperHostFixture bootstrapperHostFixture) + { + this.serviceProvider = bootstrapperHostFixture.ServiceProvider; + this.repositoryProvider = bootstrapperHostFixture.ServiceProvider.GetRequiredService(); + InMemoryAuxiliarStorage.Clear(); + InMemoryAuxiliarStorage.ThrowException = true; + } + + public static IEnumerable EmptyKeyScenarios() + { + yield return new object[] + { + RepositoryType.MongoDb, + typeof(IMessageProducer), + typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert), + 3 //numberOfMessagesToBeProduced + }; + yield return new object[] + { + RepositoryType.SqlServer, + typeof(IMessageProducer), + typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert), + 3 + }; + yield return new object[] + { + RepositoryType.MongoDb, + typeof(IMessageProducer), + typeof(RetryDurableLatestConsumptionPhysicalStorageAssert), + 1 + }; + yield return new object[] + { + RepositoryType.SqlServer, + typeof(IMessageProducer), + typeof(RetryDurableLatestConsumptionPhysicalStorageAssert), + 1 + }; + } + + [Theory] + [MemberData(nameof(EmptyKeyScenarios))] + internal async Task EmptyKeyRetryDurableTest( + RepositoryType repositoryType, + Type producerType, + Type physicalStorageType, + int numberOfMessagesToBeProduced) + { + // Arrange + var numberOfMessagesByEachSameKey = 1; + var numberOfTimesThatEachMessageIsTriedWhenDone = 1; + var numberOfTimesThatEachMessageIsTriedDuringDurable = 1; + var producer = this.serviceProvider.GetRequiredService(producerType) as IMessageProducer; + var physicalStorageAssert = this.serviceProvider.GetRequiredService(physicalStorageType) as IPhysicalStorageAssert; + var messages = new List(); + for (int i = 0; i < numberOfMessagesToBeProduced; i++) + { + messages.Add(new RetryDurableTestMessage { Key = string.Empty, Value = $"Message_{i + 1}" }); + } + + await this.repositoryProvider.GetRepositoryOfType(repositoryType).CleanDatabaseAsync().ConfigureAwait(false); + + // Act + foreach (var message in messages) + { + await producer.ProduceAsync(message.Key, message).ConfigureAwait(false); + } + + RetryDurableTestMessage messageToValidate = messages[0]; + + await physicalStorageAssert + .AssertEmptyKeyRetryDurableMessageRetryingAsync(repositoryType, messageToValidate, numberOfMessagesByEachSameKey) + .ConfigureAwait(false); + + // Assert - Retrying + InMemoryAuxiliarStorage.Clear(); + + await InMemoryAuxiliarStorage + .AssertEmptyPartitionKeyCountMessageAsync(messageToValidate, numberOfTimesThatEachMessageIsTriedDuringDurable, defaultWaitingTimeSeconds) + .ConfigureAwait(false); + + await physicalStorageAssert + .AssertEmptyKeyRetryDurableMessageRetryingAsync(repositoryType, messageToValidate, numberOfTimesThatEachMessageIsTriedDuringDurable) + .ConfigureAwait(false); + + // Assert - Done + InMemoryAuxiliarStorage.ThrowException = false; + InMemoryAuxiliarStorage.Clear(); + + await InMemoryAuxiliarStorage + .AssertEmptyPartitionKeyCountMessageAsync(messageToValidate, numberOfTimesThatEachMessageIsTriedWhenDone, defaultWaitingTimeSeconds) + .ConfigureAwait(false); + + await physicalStorageAssert + .AssertRetryDurableMessageDoneAsync(repositoryType, messageToValidate) + .ConfigureAwait(false); + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs b/src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs index f141a8eb..563230ce 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs @@ -1,9 +1,5 @@ namespace KafkaFlow.Retry.IntegrationTests { - using System; - using System.Collections.Generic; - using System.Linq; - using System.Threading.Tasks; using AutoFixture; using KafkaFlow.Retry.IntegrationTests.Core.Bootstrappers.Fixtures; using KafkaFlow.Retry.IntegrationTests.Core.Messages; @@ -12,6 +8,10 @@ namespace KafkaFlow.Retry.IntegrationTests using KafkaFlow.Retry.IntegrationTests.Core.Storages.Assertion; using KafkaFlow.Retry.IntegrationTests.Core.Storages.Repositories; using Microsoft.Extensions.DependencyInjection; + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading.Tasks; using Xunit; [Collection("BootstrapperHostCollection")] diff --git a/src/KafkaFlow.Retry.UnitTests/API/RetryRequestHandlerBaseTests.cs b/src/KafkaFlow.Retry.UnitTests/API/RetryRequestHandlerBaseTests.cs index 25ab7cb0..fc12a077 100644 --- a/src/KafkaFlow.Retry.UnitTests/API/RetryRequestHandlerBaseTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/API/RetryRequestHandlerBaseTests.cs @@ -1,13 +1,14 @@ namespace KafkaFlow.Retry.UnitTests.API { - using System.Text; - using System.Threading; - using System.Threading.Tasks; using FluentAssertions; + using global::KafkaFlow.Retry.Durable.Repository.Model; using global::KafkaFlow.Retry.UnitTests.API.Surrogate; using global::KafkaFlow.Retry.UnitTests.API.Utilities; using Microsoft.AspNetCore.Http; using Moq; + using System.Text; + using System.Threading; + using System.Threading.Tasks; using Xunit; public class RetryRequestHandlerBaseTests @@ -21,7 +22,7 @@ public async Task RetryRequestHandlerBase_HandleAsync_CallsHandleRequestAsync() // Arrange var dto = new DtoSurrogate { - Text = Durable.Repository.Model.RetryQueueStatus.Active + Text = RetryQueueStatus.Active }; var mockHttpContext = HttpContextHelper.MockHttpContext(ResourcePath, HttpMethod, requestBody: dto); diff --git a/src/KafkaFlow.Retry.UnitTests/Durable/Repository/RetryDurableQueueRepositoryTests.cs b/src/KafkaFlow.Retry.UnitTests/Durable/Repository/RetryDurableQueueRepositoryTests.cs new file mode 100644 index 00000000..097ad405 --- /dev/null +++ b/src/KafkaFlow.Retry.UnitTests/Durable/Repository/RetryDurableQueueRepositoryTests.cs @@ -0,0 +1,116 @@ +namespace KafkaFlow.Retry.UnitTests.Durable.Repository +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Text; + using System.Threading.Tasks; + using global::KafkaFlow.Retry.Durable.Definitions; + using global::KafkaFlow.Retry.Durable.Encoders; + using global::KafkaFlow.Retry.Durable.Repository; + using global::KafkaFlow.Retry.Durable.Repository.Actions.Create; + using global::KafkaFlow.Retry.Durable.Repository.Actions.Read; + using global::KafkaFlow.Retry.Durable.Repository.Adapters; + using global::KafkaFlow.Retry.Durable.Repository.Model; + using Moq; + using Xunit; + + public class RetryDurableQueueRepositoryTests + { + private readonly Mock mockMessageAdapter; + private readonly Mock mockMessageHeadersAdapter; + private readonly MockRepository mockRepository; + private readonly Mock mockRetryDurableQueueRepositoryProvider; + private readonly Mock mockUtf8Encoder; + private readonly RetryDurableQueueRepository retryDurableQueueRepository; + + public RetryDurableQueueRepositoryTests() + { + mockRepository = new MockRepository(MockBehavior.Strict); + + mockRetryDurableQueueRepositoryProvider = mockRepository.Create(); + var mockIUpdateRetryQueueItemHandler = mockRepository.Create(); + mockMessageHeadersAdapter = mockRepository.Create(); + mockMessageAdapter = mockRepository.Create(); + mockUtf8Encoder = mockRepository.Create(); + var retryDurablePollingDefinition = new RetryDurablePollingDefinition(true, "0 0 0 ? * * *", 1, 1, "some_id"); + + retryDurableQueueRepository = new RetryDurableQueueRepository( + mockRetryDurableQueueRepositoryProvider.Object, + new List { mockIUpdateRetryQueueItemHandler.Object }, + mockMessageHeadersAdapter.Object, + mockMessageAdapter.Object, + mockUtf8Encoder.Object, + retryDurablePollingDefinition); + } + + [Theory] + [InlineData("key", "value")] + [InlineData(null, "value")] + [InlineData("", "value")] + public async Task AddIfQueueExistsAsync_WithValidMessage_ReturnResultStatusAdded(string messageKey, string messageValue) + { + // Arrange + byte[] messageKeyBytes = null; + if (messageKey is object) + { + messageKeyBytes = Encoding.ASCII.GetBytes(messageKey); + } + + var messageValueBytes = Encoding.ASCII.GetBytes(messageValue); + AddIfQueueExistsResultStatus addedIfQueueExistsResultStatus = AddIfQueueExistsResultStatus.Added; + Mock mockIConsumerContext = new Mock(); + mockIConsumerContext + .SetupGet(c => c.Topic) + .Returns("topic"); + mockIConsumerContext + .SetupGet(c => c.Partition) + .Returns(1); + mockIConsumerContext + .SetupGet(c => c.Offset) + .Returns(2); + mockIConsumerContext + .SetupGet(c => c.MessageTimestamp) + .Returns(new DateTime(2022, 01, 01)); + + Mock mockIMessageContext = new Mock(); + mockIMessageContext + .Setup(c => c.ConsumerContext) + .Returns(mockIConsumerContext.Object); + mockIMessageContext + .Setup(c => c.Message) + .Returns(new Message(messageKeyBytes, messageValueBytes)); + + mockMessageAdapter + .Setup(mes => mes.AdaptMessageToRepository(It.IsAny())) + .Returns(messageValueBytes); + + mockMessageHeadersAdapter + .Setup(mes => mes.AdaptMessageHeadersToRepository(It.IsAny())) + .Returns(Enumerable.Empty()); + + if (messageKey is object) + { + mockUtf8Encoder + .Setup(enc => enc.Decode(It.IsAny())) + .Returns(messageKey); + } + + mockRetryDurableQueueRepositoryProvider + .Setup(rep => rep.CheckQueueAsync(It.IsAny())) + .ReturnsAsync(new CheckQueueResult(CheckQueueResultStatus.Exists)); + mockRetryDurableQueueRepositoryProvider + .Setup(rep => rep.SaveToQueueAsync(It.IsAny())) + .ReturnsAsync(new SaveToQueueResult(SaveToQueueResultStatus.Added)); + + // Act + var result = await retryDurableQueueRepository.AddIfQueueExistsAsync( + mockIMessageContext.Object); + + // Assert + Assert.NotNull(result); + Assert.Equal(addedIfQueueExistsResultStatus, result.Status); + mockRepository.VerifyAll(); + } + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.UnitTests/Repositories/MongoDb/Adapters/ItemAdapterTests.cs b/src/KafkaFlow.Retry.UnitTests/Repositories/MongoDb/Adapters/ItemAdapterTests.cs index c3f450c5..f1b84193 100644 --- a/src/KafkaFlow.Retry.UnitTests/Repositories/MongoDb/Adapters/ItemAdapterTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/Repositories/MongoDb/Adapters/ItemAdapterTests.cs @@ -1,13 +1,14 @@ namespace KafkaFlow.Retry.UnitTests.Repositories.MongoDb.Adapters { - using System; - using System.Collections.Generic; using FluentAssertions; + using global::KafkaFlow.Retry.Durable.Common; using global::KafkaFlow.Retry.Durable.Repository.Model; using global::KafkaFlow.Retry.MongoDb.Adapters; using global::KafkaFlow.Retry.MongoDb.Adapters.Interfaces; using global::KafkaFlow.Retry.MongoDb.Model; using Moq; + using System; + using System.Collections.Generic; using Xunit; public class ItemAdapterTests @@ -48,7 +49,7 @@ public void HeaderAdapter_Adapt_WithMessageHeader_Success() Value = new byte[] { 2, 4, 6 } }, RetryQueueId = Guid.NewGuid(), - SeverityLevel = Durable.Common.SeverityLevel.High, + SeverityLevel = SeverityLevel.High, Sort = 0 }; diff --git a/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/Adapters/RetryQueueItemAdapterTests.cs b/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/Adapters/RetryQueueItemAdapterTests.cs index 02044380..45ca0551 100644 --- a/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/Adapters/RetryQueueItemAdapterTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/Adapters/RetryQueueItemAdapterTests.cs @@ -1,10 +1,11 @@ namespace KafkaFlow.Retry.UnitTests.Repositories.SqlServer.Readers.Adapters { - using System; using FluentAssertions; + using global::KafkaFlow.Retry.Durable.Common; using global::KafkaFlow.Retry.Durable.Repository.Model; using global::KafkaFlow.Retry.SqlServer.Model; using global::KafkaFlow.Retry.SqlServer.Readers.Adapters; + using System; using Xunit; public class RetryQueueItemAdapterTests @@ -26,7 +27,7 @@ public void RetryQueueItemAdapter_Adapt_Success() ModifiedStatusDate = DateTime.UtcNow, AttemptsCount = 1, RetryQueueId = 1, - SeverityLevel = Durable.Common.SeverityLevel.High, + SeverityLevel = SeverityLevel.High, Sort = 1, Status = RetryQueueItemStatus.InRetry }; diff --git a/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/RetryQueueReaderTests.cs b/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/RetryQueueReaderTests.cs index 29b769b1..754cccd8 100644 --- a/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/RetryQueueReaderTests.cs +++ b/src/KafkaFlow.Retry.UnitTests/Repositories/SqlServer/Readers/RetryQueueReaderTests.cs @@ -1,7 +1,5 @@ namespace KafkaFlow.Retry.UnitTests.Repositories.SqlServer.Readers { - using System; - using System.Collections.Generic; using FluentAssertions; using global::KafkaFlow.Retry.Durable.Common; using global::KafkaFlow.Retry.Durable.Repository.Model; @@ -9,6 +7,8 @@ using global::KafkaFlow.Retry.SqlServer.Readers; using global::KafkaFlow.Retry.SqlServer.Readers.Adapters; using Moq; + using System; + using System.Collections.Generic; using Xunit; public class RetryQueueReaderTests @@ -135,7 +135,7 @@ public void RetryQueueReader_Read_Success() Id = 1, LastExecution = DateTime.UtcNow, RetryQueueId = 1, - SeverityLevel = Durable.Common.SeverityLevel.High, + SeverityLevel = SeverityLevel.High, Sort = 1, Status = RetryQueueItemStatus.InRetry } diff --git a/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableEmbeddedClusterDefinitionBuilder.cs b/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableEmbeddedClusterDefinitionBuilder.cs index ac099a39..6d917236 100644 --- a/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableEmbeddedClusterDefinitionBuilder.cs +++ b/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableEmbeddedClusterDefinitionBuilder.cs @@ -21,8 +21,8 @@ public class RetryDurableEmbeddedClusterDefinitionBuilder private readonly IClusterConfigurationBuilder cluster; private bool enabled; private int retryConsumerBufferSize; + private RetryConsumerStrategy retryConsumerStrategy = RetryConsumerStrategy.GuaranteeOrderedConsumption; private int retryConsumerWorkersCount; - private RetryConsumerStrategy retryConusmerStrategy = RetryConsumerStrategy.GuaranteeOrderedConsumption; private string retryTopicName; private Action retryTypeHandlers; @@ -43,9 +43,9 @@ public RetryDurableEmbeddedClusterDefinitionBuilder WithRetryConsumerBufferSize( return this; } - public RetryDurableEmbeddedClusterDefinitionBuilder WithRetryConsumerStrategy(RetryConsumerStrategy retryConusmerStrategy) + public RetryDurableEmbeddedClusterDefinitionBuilder WithRetryConsumerStrategy(RetryConsumerStrategy retryConsumerStrategy) { - this.retryConusmerStrategy = retryConusmerStrategy; + this.retryConsumerStrategy = retryConsumerStrategy; return this; } @@ -144,7 +144,7 @@ RetryDurablePollingDefinition retryDurablePollingDefinition .Add(resolver => new RetryDurableConsumerCompressorMiddleware(gzipCompressor)) .Add(resolver => new RetryDurableConsumerUtf8EncoderMiddleware(utf8Encoder)) .Add(resolver => new RetryDurableConsumerNewtonsoftJsonSerializerMiddleware(newtonsoftJsonSerializer, messageType)) - .WithRetryConsumerStrategy(this.retryConusmerStrategy, retryDurableQueueRepository, utf8Encoder) + .WithRetryConsumerStrategy(this.retryConsumerStrategy, retryDurableQueueRepository, utf8Encoder) .Add(resolver => new RetryDurableConsumerValidationMiddleware( resolver.Resolve(), diff --git a/src/KafkaFlow.Retry/Durable/Polling/QueueTracker.cs b/src/KafkaFlow.Retry/Durable/Polling/QueueTracker.cs index 3bf20e7c..b972f2ac 100644 --- a/src/KafkaFlow.Retry/Durable/Polling/QueueTracker.cs +++ b/src/KafkaFlow.Retry/Durable/Polling/QueueTracker.cs @@ -11,7 +11,7 @@ internal class QueueTracker { - private static object internalLock = new object(); + private static readonly object internalLock = new object(); private readonly ILogHandler logHandler; private readonly IMessageAdapter messageAdapter; private readonly IMessageHeadersAdapter messageHeadersAdapter; @@ -67,14 +67,16 @@ internal void ScheduleJob(CancellationToken cancellationToken = default) .GetAwaiter() .GetResult(); - JobDataMap dataMap = new JobDataMap(); - dataMap.Add(QueuePollingJobConstants.RetryDurableQueueRepository, this.retryDurableQueueRepository); - dataMap.Add(QueuePollingJobConstants.RetryDurableMessageProducer, this.retryDurableMessageProducer); - dataMap.Add(QueuePollingJobConstants.RetryDurablePollingDefinition, this.retryDurablePollingDefinition); - dataMap.Add(QueuePollingJobConstants.LogHandler, this.logHandler); - dataMap.Add(QueuePollingJobConstants.MessageHeadersAdapter, this.messageHeadersAdapter); - dataMap.Add(QueuePollingJobConstants.MessageAdapter, this.messageAdapter); - dataMap.Add(QueuePollingJobConstants.Utf8Encoder, this.utf8Encoder); + JobDataMap dataMap = new JobDataMap + { + { QueuePollingJobConstants.RetryDurableQueueRepository, this.retryDurableQueueRepository }, + { QueuePollingJobConstants.RetryDurableMessageProducer, this.retryDurableMessageProducer }, + { QueuePollingJobConstants.RetryDurablePollingDefinition, this.retryDurablePollingDefinition }, + { QueuePollingJobConstants.LogHandler, this.logHandler }, + { QueuePollingJobConstants.MessageHeadersAdapter, this.messageHeadersAdapter }, + { QueuePollingJobConstants.MessageAdapter, this.messageAdapter }, + { QueuePollingJobConstants.Utf8Encoder, this.utf8Encoder } + }; IJobDetail job = JobBuilder .Create() @@ -96,8 +98,8 @@ internal void ScheduleJob(CancellationToken cancellationToken = default) .GetResult(); this.logHandler.Info( - "PollingJob Schedule", - new + "PollingJob Schedule", + new { PollingId = this.retryDurablePollingDefinition.Id, CronExpression = this.retryDurablePollingDefinition.CronExpression @@ -118,7 +120,7 @@ internal void Shutdown(CancellationToken cancellationToken = default) } } - this.logHandler.Info("PollingJob Shutdown", new {}); + this.logHandler.Info("PollingJob Shutdown", new { }); } } } \ No newline at end of file diff --git a/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs b/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs index 337ca79f..b846ba4f 100644 --- a/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs +++ b/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; using System.Linq; + using System.Text; using System.Threading.Tasks; using Dawn; using KafkaFlow.Retry.Durable; @@ -14,6 +15,7 @@ using KafkaFlow.Retry.Durable.Repository.Actions.Update; using KafkaFlow.Retry.Durable.Repository.Adapters; using KafkaFlow.Retry.Durable.Repository.Model; + using Newtonsoft.Json; using Polly; internal class RetryDurableQueueRepository : IRetryDurableQueueRepository @@ -62,17 +64,9 @@ public async Task AddIfQueueExistsAsync(IMessageContext return await this.AddIfQueueExistsAsync( context, new SaveToQueueInput( - new RetryQueueItemMessage( - context.ConsumerContext.Topic, - (byte[])context.Message.Key, - this.messageAdapter.AdaptMessageToRepository(context.Message.Value), - context.ConsumerContext.Partition, - context.ConsumerContext.Offset, - context.ConsumerContext.MessageTimestamp, - this.messageHeadersAdapter.AdaptMessageHeadersToRepository(context.Headers) - ), - this.retryDurablePollingDefinition.Id, - $"{this.retryDurablePollingDefinition.Id}-{this.utf8Encoder.Decode((byte[])context.Message.Key)}", + CreateRetryQueueItemMessage(context), + retryDurablePollingDefinition.Id, + GetQueueGroupKey(context.Message.Key), RetryQueueStatus.Active, RetryQueueItemStatus.Waiting, SeverityLevel.Unknown, @@ -162,25 +156,17 @@ public async Task SaveToQueueAsync(IMessageContext context, s return await this.SaveToQueueAsync(context, new SaveToQueueInput( - new RetryQueueItemMessage( - context.ConsumerContext.Topic, - (byte[])context.Message.Key, - this.messageAdapter.AdaptMessageToRepository(context.Message.Value), - context.ConsumerContext.Partition, - context.ConsumerContext.Offset, - context.ConsumerContext.MessageTimestamp, - this.messageHeadersAdapter.AdaptMessageHeadersToRepository(context.Headers) - ), - this.retryDurablePollingDefinition.Id, - $"{this.retryDurablePollingDefinition.Id}-{this.utf8Encoder.Decode((byte[])context.Message.Key)}", - RetryQueueStatus.Active, - RetryQueueItemStatus.Waiting, - SeverityLevel.Unknown, - refDate, - refDate, - refDate, - 0, - description + CreateRetryQueueItemMessage(context), + this.retryDurablePollingDefinition.Id, + GetQueueGroupKey(context.Message.Key), + RetryQueueStatus.Active, + RetryQueueItemStatus.Waiting, + SeverityLevel.Unknown, + refDate, + refDate, + refDate, + 0, + description ) ).ConfigureAwait(false); } @@ -257,6 +243,28 @@ private async Task AddIfQueueExistsAsync(IMessageContext } } + private RetryQueueItemMessage CreateRetryQueueItemMessage(IMessageContext context) + { + var partitionKey = default(byte[]); + if (context.Message.Key is object) + { + partitionKey = Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(context.Message.Key)); + } + + var message = this.messageAdapter.AdaptMessageToRepository(context.Message.Value); + var headers = this.messageHeadersAdapter.AdaptMessageHeadersToRepository(context.Headers); + + return new RetryQueueItemMessage( + context.ConsumerContext.Topic, + partitionKey, + message, + context.ConsumerContext.Partition, + context.ConsumerContext.Offset, + context.ConsumerContext.MessageTimestamp, + headers + ); + } + private RetryDurableException GetCheckQueueException(string message, QueuePendingItemsInput input) { var kafkaException = new RetryDurableException(new RetryError(RetryErrorCode.DataProvider_CheckQueuePendingItems), message); @@ -279,6 +287,21 @@ private RetryDurableException GetCheckQueueException(string message, QueueNewest return kafkaException; } + private string GetQueueGroupKey(object messageKey) + { + string messageKeyAsString; + if (messageKey is null) + { + messageKeyAsString = string.Empty; + } + else + { + messageKeyAsString = utf8Encoder.Decode((byte[])messageKey); + } + + return $"{this.retryDurablePollingDefinition.Id}-{messageKeyAsString}"; + } + private async Task SaveToQueueAsync(IMessageContext context, SaveToQueueInput input) { try From abbcaca582f4d3c4bd287ef16b6404b7cef0f221 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leandro=20Magalh=C3=A3es?= Date: Fri, 27 May 2022 15:23:38 +0100 Subject: [PATCH 02/11] chore: update contributing file --- CONTRIBUTING.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d7276135..3877a026 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -20,6 +20,7 @@ Here are a few things you can do that will increase the likelihood of your pull - Keep your change as focused as possible. If there are multiple changes you would like to make that are not dependent upon each other, submit them as separate pull requests - Write [good commit messages](http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html) following [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) - Open a pull request with a title following [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) +- Verify if the Kafka.Flow is at least in the last MINOR version available (MAJOR.MINOR.PATCH) [Kafka.Flow Releases](https://github.com/Farfetch/kafka-flow/releases) ## Resources From d4e8f20be94271960775aab944a908e19101dd44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leandro=20Magalh=C3=A3es?= Date: Fri, 27 May 2022 17:00:05 +0100 Subject: [PATCH 03/11] chore: peer review --- .../Durable/Repository/RetryDurableQueueRepository.cs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs b/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs index b846ba4f..813b4a6d 100644 --- a/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs +++ b/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs @@ -289,15 +289,7 @@ private RetryDurableException GetCheckQueueException(string message, QueueNewest private string GetQueueGroupKey(object messageKey) { - string messageKeyAsString; - if (messageKey is null) - { - messageKeyAsString = string.Empty; - } - else - { - messageKeyAsString = utf8Encoder.Decode((byte[])messageKey); - } + var messageKeyAsString = messageKey is null ? string.Empty : utf8Encoder.Decode((byte[])messageKey); return $"{this.retryDurablePollingDefinition.Id}-{messageKeyAsString}"; } From 8094691da7ac82cb28dcdddfd172b33f89469fc0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leandro=20Magalh=C3=A3es?= Date: Wed, 1 Jun 2022 16:42:36 +0100 Subject: [PATCH 04/11] chore: peer review --- .../Durable/Repository/RetryDurableQueueRepository.cs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs b/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs index 813b4a6d..6d77bb59 100644 --- a/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs +++ b/src/KafkaFlow.Retry/Durable/Repository/RetryDurableQueueRepository.cs @@ -245,11 +245,7 @@ private async Task AddIfQueueExistsAsync(IMessageContext private RetryQueueItemMessage CreateRetryQueueItemMessage(IMessageContext context) { - var partitionKey = default(byte[]); - if (context.Message.Key is object) - { - partitionKey = Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(context.Message.Key)); - } + var partitionKey = context.Message.Key is object ? Encoding.ASCII.GetBytes(JsonConvert.SerializeObject(context.Message.Key)) : new byte[0]; var message = this.messageAdapter.AdaptMessageToRepository(context.Message.Value); var headers = this.messageHeadersAdapter.AdaptMessageHeadersToRepository(context.Headers); @@ -311,7 +307,7 @@ private async Task SaveToQueueAsync(IMessageContext context, return result; } - catch (System.Text.DecoderFallbackException ex) + catch (DecoderFallbackException ex) { var unrecoverableException = new RetryDurableException( new RetryError(RetryErrorCode.DataProvider_UnrecoverableException), From 74f6554f63c0b9567abb9576632c460f2ab5fe60 Mon Sep 17 00:00:00 2001 From: "sergio.ribeiro" Date: Fri, 1 Jul 2022 18:57:34 +0100 Subject: [PATCH 05/11] chore: add integration test for Null partition key --- .../Repositories/MongoDbRepository.cs | 2 +- .../Repositories/SqlServerRepository.cs | 2 +- .../EmptyPartitionKeyRetryDurableTests.cs | 142 +++++++++++++++++- 3 files changed, 141 insertions(+), 5 deletions(-) diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/MongoDbRepository.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/MongoDbRepository.cs index 45907e25..f7170ae3 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/MongoDbRepository.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/MongoDbRepository.cs @@ -145,7 +145,7 @@ public async Task GetRetryQueueAsync(string queueGroupKey) await Task.Delay(100).ConfigureAwait(false); - var retryQueueCursor = await this.retryQueuesCollection.FindAsync(x => x.QueueGroupKey.Contains(queueGroupKey)).ConfigureAwait(false); + var retryQueueCursor = await this.retryQueuesCollection.FindAsync(x => x.QueueGroupKey.Contains(queueGroupKey ?? string.Empty)).ConfigureAwait(false); var retryQueues = await retryQueueCursor.ToListAsync().ConfigureAwait(false); if (retryQueues.Any()) { diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs index a69b0592..abd2b3fa 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Repositories/SqlServerRepository.cs @@ -183,7 +183,7 @@ FROM [RetryQueues] WHERE QueueGroupKey LIKE '%'+@QueueGroupKey ORDER BY Id"; - command.Parameters.AddWithValue("QueueGroupKey", queueGroupKey); + command.Parameters.AddWithValue("QueueGroupKey", queueGroupKey ?? string.Empty); retryQueue = await this.ExecuteSingleLineReaderAsync(command).ConfigureAwait(false); } diff --git a/src/KafkaFlow.Retry.IntegrationTests/EmptyPartitionKeyRetryDurableTests.cs b/src/KafkaFlow.Retry.IntegrationTests/EmptyPartitionKeyRetryDurableTests.cs index 316ea11c..d1e35806 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/EmptyPartitionKeyRetryDurableTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/EmptyPartitionKeyRetryDurableTests.cs @@ -2,7 +2,10 @@ namespace KafkaFlow.Retry.IntegrationTests { using System; using System.Collections.Generic; + using System.Text; using System.Threading.Tasks; + using Confluent.Kafka; + using KafkaFlow.Retry.IntegrationTests.Core.Bootstrappers; using KafkaFlow.Retry.IntegrationTests.Core.Bootstrappers.Fixtures; using KafkaFlow.Retry.IntegrationTests.Core.Messages; using KafkaFlow.Retry.IntegrationTests.Core.Producers; @@ -15,9 +18,11 @@ namespace KafkaFlow.Retry.IntegrationTests [Collection("BootstrapperHostCollection")] public class EmptyPartitionKeyRetryDurableTests { - private const int defaultWaitingTimeSeconds = 120; + private const int defaultWaitingTimeSeconds = 240; + private readonly BootstrapperHostFixture bootstrapperHostFixture; private readonly IRepositoryProvider repositoryProvider; private readonly IServiceProvider serviceProvider; + private RepositoryType testRepositoryType; public EmptyPartitionKeyRetryDurableTests(BootstrapperHostFixture bootstrapperHostFixture) { @@ -25,6 +30,25 @@ public EmptyPartitionKeyRetryDurableTests(BootstrapperHostFixture bootstrapperHo this.repositoryProvider = bootstrapperHostFixture.ServiceProvider.GetRequiredService(); InMemoryAuxiliarStorage.Clear(); InMemoryAuxiliarStorage.ThrowException = true; + this.bootstrapperHostFixture = bootstrapperHostFixture; + } + + ~EmptyPartitionKeyRetryDurableTests() + { + BootstrapperKafka.RecreateKafkaTopicsAsync(bootstrapperHostFixture.KafkaSettings.Brokers, new string[] { + "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-mongo-db", + "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-mongo-db-retry", + "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-sql-server", + "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-sql-server-retry", + "test-kafka-flow-retry-retry-durable-latest-consumption-mongo-db", + "test-kafka-flow-retry-retry-durable-latest-consumption-mongo-db-retry", + "test-kafka-flow-retry-retry-durable-latest-consumption-sql-server", + "test-kafka-flow-retry-retry-durable-latest-consumption-sql-server-retry" + }) + .GetAwaiter() + .GetResult(); + + repositoryProvider.GetRepositoryOfType(testRepositoryType).CleanDatabaseAsync().GetAwaiter().GetResult(); } public static IEnumerable EmptyKeyScenarios() @@ -59,6 +83,38 @@ public static IEnumerable EmptyKeyScenarios() }; } + public static IEnumerable NullKeyScenarios() + { + yield return new object[] + { + RepositoryType.MongoDb, + typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert), + "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-mongo-db", + 2 //numberOfMessagesToBeProduced + }; + yield return new object[] + { + RepositoryType.SqlServer, + typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert), + "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-sql-server", + 2 + }; + yield return new object[] + { + RepositoryType.MongoDb, + typeof(RetryDurableLatestConsumptionPhysicalStorageAssert), + "test-kafka-flow-retry-retry-durable-latest-consumption-mongo-db", + 1 + }; + yield return new object[] + { + RepositoryType.SqlServer, + typeof(RetryDurableLatestConsumptionPhysicalStorageAssert), + "test-kafka-flow-retry-retry-durable-latest-consumption-sql-server", + 1 + }; + } + [Theory] [MemberData(nameof(EmptyKeyScenarios))] internal async Task EmptyKeyRetryDurableTest( @@ -68,6 +124,7 @@ internal async Task EmptyKeyRetryDurableTest( int numberOfMessagesToBeProduced) { // Arrange + testRepositoryType = repositoryType; var numberOfMessagesByEachSameKey = 1; var numberOfTimesThatEachMessageIsTriedWhenDone = 1; var numberOfTimesThatEachMessageIsTriedDuringDurable = 1; @@ -89,9 +146,75 @@ internal async Task EmptyKeyRetryDurableTest( RetryDurableTestMessage messageToValidate = messages[0]; + await AssertRetryAndConsumeMessages(repositoryType, + numberOfMessagesByEachSameKey, + numberOfTimesThatEachMessageIsTriedWhenDone, + numberOfTimesThatEachMessageIsTriedDuringDurable, + physicalStorageAssert, + messageToValidate + ).ConfigureAwait(false); + } + + [Theory] + [MemberData(nameof(NullKeyScenarios))] + internal async Task NullKeyRetryDurableTest( + RepositoryType repositoryType, + Type physicalStorageType, + string topicName, + int numberOfMessagesToBeProduced) + { + // Arrange + testRepositoryType = repositoryType; + var numberOfMessagesByEachSameKey = 1; + var numberOfTimesThatEachMessageIsTriedWhenDone = 1; + var numberOfTimesThatEachMessageIsTriedDuringDurable = 1; + + var config = new ProducerConfig + { + BootstrapServers = bootstrapperHostFixture.KafkaSettings.Brokers, + }; + + Error actualError = null; + var producer = new ProducerBuilder(config) + .SetValueSerializer(new RetryDurableTestMessageSerializer()) + .SetErrorHandler((producer, error) => + { + actualError = error; + }) + .Build(); + + var physicalStorageAssert = this.serviceProvider.GetRequiredService(physicalStorageType) as IPhysicalStorageAssert; + + var messages = new List>(); + for (int i = 0; i < numberOfMessagesToBeProduced; i++) + { + messages.Add(new Message { Key = null, Value = new RetryDurableTestMessage { Key = null, Value = $"Message_{i + 1}" } }); + } + + // Act + foreach (var message in messages) + { + await producer.ProduceAsync(topicName, message).ConfigureAwait(false); + } + + RetryDurableTestMessage messageToValidate = messages[0].Value; + + await AssertRetryAndConsumeMessages(repositoryType, + numberOfMessagesByEachSameKey, + numberOfTimesThatEachMessageIsTriedWhenDone, + numberOfTimesThatEachMessageIsTriedDuringDurable, + physicalStorageAssert, + messageToValidate + ).ConfigureAwait(false); + + Assert.Null(actualError); + } + + private static async Task AssertRetryAndConsumeMessages(RepositoryType repositoryType, int numberOfMessagesByEachSameKey, int numberOfTimesThatEachMessageIsTriedWhenDone, int numberOfTimesThatEachMessageIsTriedDuringDurable, IPhysicalStorageAssert physicalStorageAssert, RetryDurableTestMessage messageToValidate) + { await physicalStorageAssert - .AssertEmptyKeyRetryDurableMessageRetryingAsync(repositoryType, messageToValidate, numberOfMessagesByEachSameKey) - .ConfigureAwait(false); + .AssertEmptyKeyRetryDurableMessageRetryingAsync(repositoryType, messageToValidate, numberOfMessagesByEachSameKey) + .ConfigureAwait(false); // Assert - Retrying InMemoryAuxiliarStorage.Clear(); @@ -116,5 +239,18 @@ await physicalStorageAssert .AssertRetryDurableMessageDoneAsync(repositoryType, messageToValidate) .ConfigureAwait(false); } + + private class RetryDurableTestMessageSerializer : Confluent.Kafka.ISerializer + { + public byte[] Serialize(RetryDurableTestMessage data, SerializationContext context) + { + if (data == null) + { + return null; + } + + return Encoding.UTF8.GetBytes(Newtonsoft.Json.JsonConvert.SerializeObject(data)); + } + } } } \ No newline at end of file From 5de9d34c600e27f66990aa5d41f53db19c06e532 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leandro=20Magalh=C3=A3es?= Date: Thu, 7 Jul 2022 17:29:48 +0100 Subject: [PATCH 06/11] chore: empty or null partition key tests running at once --- ...> EmptyOrNullPartitionKeyRetryDurableTests.cs} | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) rename src/KafkaFlow.Retry.IntegrationTests/{EmptyPartitionKeyRetryDurableTests.cs => EmptyOrNullPartitionKeyRetryDurableTests.cs} (94%) diff --git a/src/KafkaFlow.Retry.IntegrationTests/EmptyPartitionKeyRetryDurableTests.cs b/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs similarity index 94% rename from src/KafkaFlow.Retry.IntegrationTests/EmptyPartitionKeyRetryDurableTests.cs rename to src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs index d1e35806..410c5cea 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/EmptyPartitionKeyRetryDurableTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs @@ -16,25 +16,21 @@ namespace KafkaFlow.Retry.IntegrationTests using Xunit; [Collection("BootstrapperHostCollection")] - public class EmptyPartitionKeyRetryDurableTests + public class EmptyOrNullPartitionKeyRetryDurableTests { private const int defaultWaitingTimeSeconds = 240; private readonly BootstrapperHostFixture bootstrapperHostFixture; private readonly IRepositoryProvider repositoryProvider; private readonly IServiceProvider serviceProvider; - private RepositoryType testRepositoryType; - public EmptyPartitionKeyRetryDurableTests(BootstrapperHostFixture bootstrapperHostFixture) + public EmptyOrNullPartitionKeyRetryDurableTests(BootstrapperHostFixture bootstrapperHostFixture) { this.serviceProvider = bootstrapperHostFixture.ServiceProvider; this.repositoryProvider = bootstrapperHostFixture.ServiceProvider.GetRequiredService(); InMemoryAuxiliarStorage.Clear(); InMemoryAuxiliarStorage.ThrowException = true; this.bootstrapperHostFixture = bootstrapperHostFixture; - } - ~EmptyPartitionKeyRetryDurableTests() - { BootstrapperKafka.RecreateKafkaTopicsAsync(bootstrapperHostFixture.KafkaSettings.Brokers, new string[] { "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-mongo-db", "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-mongo-db-retry", @@ -48,7 +44,8 @@ public EmptyPartitionKeyRetryDurableTests(BootstrapperHostFixture bootstrapperHo .GetAwaiter() .GetResult(); - repositoryProvider.GetRepositoryOfType(testRepositoryType).CleanDatabaseAsync().GetAwaiter().GetResult(); + repositoryProvider.GetRepositoryOfType(RepositoryType.MongoDb).CleanDatabaseAsync().GetAwaiter().GetResult(); + repositoryProvider.GetRepositoryOfType(RepositoryType.SqlServer).CleanDatabaseAsync().GetAwaiter().GetResult(); } public static IEnumerable EmptyKeyScenarios() @@ -124,7 +121,6 @@ internal async Task EmptyKeyRetryDurableTest( int numberOfMessagesToBeProduced) { // Arrange - testRepositoryType = repositoryType; var numberOfMessagesByEachSameKey = 1; var numberOfTimesThatEachMessageIsTriedWhenDone = 1; var numberOfTimesThatEachMessageIsTriedDuringDurable = 1; @@ -164,7 +160,6 @@ internal async Task NullKeyRetryDurableTest( int numberOfMessagesToBeProduced) { // Arrange - testRepositoryType = repositoryType; var numberOfMessagesByEachSameKey = 1; var numberOfTimesThatEachMessageIsTriedWhenDone = 1; var numberOfTimesThatEachMessageIsTriedDuringDurable = 1; @@ -240,7 +235,7 @@ await physicalStorageAssert .ConfigureAwait(false); } - private class RetryDurableTestMessageSerializer : Confluent.Kafka.ISerializer + private class RetryDurableTestMessageSerializer : ISerializer { public byte[] Serialize(RetryDurableTestMessage data, SerializationContext context) { From 07094506a558ec4e8b0561e756a2a7f1f9737b3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leandro=20Magalh=C3=A3es?= Date: Thu, 7 Jul 2022 17:38:19 +0100 Subject: [PATCH 07/11] chore: remove warnings from codacy --- .../EmptyOrNullPartitionKeyRetryDurableTests.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs b/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs index 410c5cea..05b31bd7 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs @@ -31,7 +31,7 @@ public EmptyOrNullPartitionKeyRetryDurableTests(BootstrapperHostFixture bootstra InMemoryAuxiliarStorage.ThrowException = true; this.bootstrapperHostFixture = bootstrapperHostFixture; - BootstrapperKafka.RecreateKafkaTopicsAsync(bootstrapperHostFixture.KafkaSettings.Brokers, new string[] { + BootstrapperKafka.RecreateKafkaTopicsAsync(bootstrapperHostFixture.KafkaSettings.Brokers, new[] { "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-mongo-db", "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-mongo-db-retry", "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-sql-server", @@ -172,7 +172,7 @@ internal async Task NullKeyRetryDurableTest( Error actualError = null; var producer = new ProducerBuilder(config) .SetValueSerializer(new RetryDurableTestMessageSerializer()) - .SetErrorHandler((producer, error) => + .SetErrorHandler((_, error) => { actualError = error; }) From c17742e08ab42df81b12b271621462fd8996f08b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leandro=20Magalh=C3=A3es?= Date: Thu, 7 Jul 2022 17:46:34 +0100 Subject: [PATCH 08/11] chore: dicard context propertie --- .../EmptyOrNullPartitionKeyRetryDurableTests.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs b/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs index 05b31bd7..f85ff527 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs @@ -239,6 +239,8 @@ private class RetryDurableTestMessageSerializer : ISerializer Date: Thu, 14 Jul 2022 18:19:43 +0100 Subject: [PATCH 09/11] chore: add topics to bootstrapper kafka --- .../Core/Bootstrappers/BootstrapperKafka.cs | 586 +++++++++++++++++- .../Fixtures/BootstrapperFixtureTemplate.cs | 20 +- .../Fixtures/BootstrapperHostFixture.cs | 45 ++ ...ranteeOrderedConsumptionMongoDbProducer.cs | 6 + ...nteeOrderedConsumptionSqlServerProducer.cs | 6 + ...DurableLatestConsumptionMongoDbProducer.cs | 6 + ...rableLatestConsumptionSqlServerProducer.cs | 6 + ...ranteeOrderedConsumptionMongoDbProducer.cs | 6 + ...nteeOrderedConsumptionSqlServerProducer.cs | 6 + ...DurableLatestConsumptionMongoDbProducer.cs | 6 + ...rableLatestConsumptionSqlServerProducer.cs | 6 + ...mptyOrNullPartitionKeyRetryDurableTests.cs | 41 +- 12 files changed, 711 insertions(+), 29 deletions(-) create mode 100644 src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableGuaranteeOrderedConsumptionMongoDbProducer.cs create mode 100644 src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableGuaranteeOrderedConsumptionSqlServerProducer.cs create mode 100644 src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableLatestConsumptionMongoDbProducer.cs create mode 100644 src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableLatestConsumptionSqlServerProducer.cs create mode 100644 src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableGuaranteeOrderedConsumptionMongoDbProducer.cs create mode 100644 src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableGuaranteeOrderedConsumptionSqlServerProducer.cs create mode 100644 src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableLatestConsumptionMongoDbProducer.cs create mode 100644 src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableLatestConsumptionSqlServerProducer.cs diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperKafka.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperKafka.cs index e1ef74fd..51fb61df 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperKafka.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/BootstrapperKafka.cs @@ -73,8 +73,592 @@ await adminClient } } - internal static IClusterConfigurationBuilder SetupRetryDurableGuaranteeOrderedConsumptionMongoDbCluster( + internal static IClusterConfigurationBuilder SetupEmptyRetryDurableGuaranteeOrderedConsumptionMongoDbCluster( + this IClusterConfigurationBuilder cluster, + string mongoDbConnectionString, + string mongoDbDatabaseName, + string mongoDbRetryQueueCollectionName, + string mongoDbRetryQueueItemCollectionName) + { + cluster + .AddProducer( + producer => producer + .DefaultTopic("test-kafka-flow-retry-empty-retry-durable-guarantee-ordered-consumption-mongo-db") + .WithCompression(Confluent.Kafka.CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer())) + .AddConsumer( + consumer => consumer + .Topic("test-kafka-flow-retry-empty-retry-durable-guarantee-ordered-consumption-mongo-db") + .WithGroupId("test-consumer-kafka-flow-retry-empty-retry-durable-guarantee-ordered-consumption-mongo-db") + .WithBufferSize(100) + .WithWorkersCount(10) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer(typeof(RetryDurableTestMessage)) + .RetryDurable( + (configure) => configure + .Handle() + .WithMessageType(typeof(RetryDurableTestMessage)) + .WithMessageSerializeSettings( + new JsonSerializerSettings + { + DateTimeZoneHandling = DateTimeZoneHandling.Utc, + TypeNameHandling = TypeNameHandling.Auto + }) + .WithEmbeddedRetryCluster( + cluster, + configure => configure + .Enabled(true) + .WithRetryTopicName("test-kafka-flow-retry-empty-retry-durable-guarantee-ordered-consumption-mongo-db-retry") + .WithRetryConsumerBufferSize(100) + .WithRetryConsumerWorkersCount(10) + .WithRetryConsumerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption) + .WithRetryTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler())) + .WithQueuePollingJobConfiguration( + configure => configure + .Enabled(true) + .WithId("custom_search_key_empty_durable_guarantee_ordered_consumption_mongo_db") + .WithCronExpression("0/30 * * ? * * *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(256)) + .WithMongoDbDataProvider( + mongoDbConnectionString, + mongoDbDatabaseName, + mongoDbRetryQueueCollectionName, + mongoDbRetryQueueItemCollectionName) + .WithRetryPlanBeforeRetryDurable( + configure => configure + .TryTimes(3) + .WithTimeBetweenTriesPlan( + TimeSpan.FromMilliseconds(250), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromMilliseconds(1000)) + .ShouldPauseConsumer(false))) + .AddTypedHandlers( + handlers => + handlers + .WithHandlerLifetime(InstanceLifetime.Singleton) + .AddHandler()))); + return cluster; + } + + internal static IClusterConfigurationBuilder SetupEmptyRetryDurableGuaranteeOrderedConsumptionSqlServerCluster( + this IClusterConfigurationBuilder cluster, + string sqlServerConnectionString, + string sqlServerDatabaseName) + { + cluster + .AddProducer( + producer => producer + .DefaultTopic("test-kafka-flow-retry-empty-retry-durable-guarantee-ordered-consumption-sql-server") + .WithCompression(Confluent.Kafka.CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer())) + .AddConsumer( + consumer => consumer + .Topic("test-kafka-flow-retry-empty-retry-durable-guarantee-ordered-consumption-sql-server") + .WithGroupId("test-consumer-kafka-flow-retry-empty-retry-durable-guarantee-ordered-consumption-sql-server") + .WithBufferSize(100) + .WithWorkersCount(10) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer(typeof(RetryDurableTestMessage)) + .RetryDurable( + (configure) => configure + .Handle() + .WithMessageType(typeof(RetryDurableTestMessage)) + .WithMessageSerializeSettings( + new JsonSerializerSettings + { + DateTimeZoneHandling = DateTimeZoneHandling.Utc, + TypeNameHandling = TypeNameHandling.Auto + }) + .WithEmbeddedRetryCluster( + cluster, + configure => configure + .Enabled(true) + .WithRetryTopicName("test-kafka-flow-retry-empty-retry-durable-guarantee-ordered-consumption-sql-server-retry") + .WithRetryConsumerBufferSize(100) + .WithRetryConsumerWorkersCount(10) + .WithRetryConsumerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption) + .WithRetryTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler())) + .WithQueuePollingJobConfiguration( + configure => configure + .Enabled(true) + .WithId("custom_search_key_empty_durable_guarantee_ordered_consumption_sql_server") + .WithCronExpression("0/30 * * ? * * *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(256)) + .WithSqlServerDataProvider( + sqlServerConnectionString, + sqlServerDatabaseName) + .WithRetryPlanBeforeRetryDurable( + configure => configure + .TryTimes(3) + .WithTimeBetweenTriesPlan( + TimeSpan.FromMilliseconds(250), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromMilliseconds(1000)) + .ShouldPauseConsumer(false))) + .AddTypedHandlers( + handlers => + handlers + .WithHandlerLifetime(InstanceLifetime.Singleton) + .AddHandler()))); + return cluster; + } + + internal static IClusterConfigurationBuilder SetupEmptyRetryDurableLatestConsumptionMongoDbCluster( + this IClusterConfigurationBuilder cluster, + string mongoDbConnectionString, + string mongoDbDatabaseName, + string mongoDbRetryQueueCollectionName, + string mongoDbRetryQueueItemCollectionName) + { + cluster + .AddProducer( + producer => producer + .DefaultTopic("test-kafka-flow-retry-empty-retry-durable-latest-consumption-mongo-db") + .WithCompression(Confluent.Kafka.CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer())) + .AddConsumer( + consumer => consumer + .Topic("test-kafka-flow-retry-empty-retry-durable-latest-consumption-mongo-db") + .WithGroupId("test-consumer-kafka-flow-retry-empty-retry-durable-latest-consumption-mongo-db") + .WithBufferSize(100) + .WithWorkersCount(10) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer(typeof(RetryDurableTestMessage)) + .RetryDurable( + (configure) => configure + .Handle() + .WithMessageType(typeof(RetryDurableTestMessage)) + .WithMessageSerializeSettings( + new JsonSerializerSettings + { + DateTimeZoneHandling = DateTimeZoneHandling.Utc, + TypeNameHandling = TypeNameHandling.Auto + }) + .WithEmbeddedRetryCluster( + cluster, + configure => configure + .Enabled(true) + .WithRetryTopicName("test-kafka-flow-retry-empty-retry-durable-latest-consumption-mongo-db-retry") + .WithRetryConsumerBufferSize(100) + .WithRetryConsumerWorkersCount(10) + .WithRetryConsumerStrategy(RetryConsumerStrategy.LatestConsumption) + .WithRetryTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler())) + .WithQueuePollingJobConfiguration( + configure => configure + .Enabled(true) + .WithId("custom_search_key_empty_durable_latest_consumption_mongo_db") + .WithCronExpression("0/30 * * ? * * *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(256)) + .WithMongoDbDataProvider( + mongoDbConnectionString, + mongoDbDatabaseName, + mongoDbRetryQueueCollectionName, + mongoDbRetryQueueItemCollectionName) + .WithRetryPlanBeforeRetryDurable( + configure => configure + .TryTimes(3) + .WithTimeBetweenTriesPlan( + TimeSpan.FromMilliseconds(250), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromMilliseconds(1000)) + .ShouldPauseConsumer(false))) + .AddTypedHandlers( + handlers => + handlers + .WithHandlerLifetime(InstanceLifetime.Singleton) + .AddHandler()))); + return cluster; + } + + internal static IClusterConfigurationBuilder SetupEmptyRetryDurableLatestConsumptionSqlServerCluster( + this IClusterConfigurationBuilder cluster, + string sqlServerConnectionString, + string sqlServerDatabaseName) + { + cluster + .AddProducer( + producer => producer + .DefaultTopic("test-kafka-flow-retry-empty-retry-durable-latest-consumption-sql-server") + .WithCompression(Confluent.Kafka.CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer())) + .AddConsumer( + consumer => consumer + .Topic("test-kafka-flow-retry-empty-retry-durable-latest-consumption-sql-server") + .WithGroupId("test-consumer-kafka-flow-retry-empty-retry-durable-latest-consumption-sql-server") + .WithBufferSize(100) + .WithWorkersCount(10) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer(typeof(RetryDurableTestMessage)) + .RetryDurable( + (configure) => configure + .Handle() + .WithMessageType(typeof(RetryDurableTestMessage)) + .WithMessageSerializeSettings( + new JsonSerializerSettings + { + DateTimeZoneHandling = DateTimeZoneHandling.Utc, + TypeNameHandling = TypeNameHandling.Auto + }) + .WithEmbeddedRetryCluster( + cluster, + configure => configure + .Enabled(true) + .WithRetryTopicName("test-kafka-flow-retry-empty-retry-durable-latest-consumption-sql-server-retry") + .WithRetryConsumerBufferSize(100) + .WithRetryConsumerWorkersCount(10) + .WithRetryConsumerStrategy(RetryConsumerStrategy.LatestConsumption) + .WithRetryTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler())) + .WithQueuePollingJobConfiguration( + configure => configure + .Enabled(true) + .WithId("custom_search_key_empty_durable_latest_consumption_sql_server") + .WithCronExpression("0/30 * * ? * * *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(256)) + .WithSqlServerDataProvider( + sqlServerConnectionString, + sqlServerDatabaseName) + .WithRetryPlanBeforeRetryDurable( + configure => configure + .TryTimes(3) + .WithTimeBetweenTriesPlan( + TimeSpan.FromMilliseconds(250), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromMilliseconds(1000)) + .ShouldPauseConsumer(false))) + .AddTypedHandlers( + handlers => + handlers + .WithHandlerLifetime(InstanceLifetime.Singleton) + .AddHandler()))); + return cluster; + } + + internal static IClusterConfigurationBuilder SetupNullRetryDurableGuaranteeOrderedConsumptionMongoDbCluster( + this IClusterConfigurationBuilder cluster, + string mongoDbConnectionString, + string mongoDbDatabaseName, + string mongoDbRetryQueueCollectionName, + string mongoDbRetryQueueItemCollectionName) + { + cluster + .AddProducer( + producer => producer + .DefaultTopic("test-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-mongo-db") + .WithCompression(Confluent.Kafka.CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer())) + .AddConsumer( + consumer => consumer + .Topic("test-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-mongo-db") + .WithGroupId("test-consumer-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-mongo-db") + .WithBufferSize(100) + .WithWorkersCount(10) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer(typeof(RetryDurableTestMessage)) + .RetryDurable( + (configure) => configure + .Handle() + .WithMessageType(typeof(RetryDurableTestMessage)) + .WithMessageSerializeSettings( + new JsonSerializerSettings + { + DateTimeZoneHandling = DateTimeZoneHandling.Utc, + TypeNameHandling = TypeNameHandling.Auto + }) + .WithEmbeddedRetryCluster( + cluster, + configure => configure + .Enabled(true) + .WithRetryTopicName("test-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-mongo-db-retry") + .WithRetryConsumerBufferSize(100) + .WithRetryConsumerWorkersCount(10) + .WithRetryConsumerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption) + .WithRetryTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler())) + .WithQueuePollingJobConfiguration( + configure => configure + .Enabled(true) + .WithId("custom_search_key_null_durable_guarantee_ordered_consumption_mongo_db") + .WithCronExpression("0/30 * * ? * * *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(256)) + .WithMongoDbDataProvider( + mongoDbConnectionString, + mongoDbDatabaseName, + mongoDbRetryQueueCollectionName, + mongoDbRetryQueueItemCollectionName) + .WithRetryPlanBeforeRetryDurable( + configure => configure + .TryTimes(3) + .WithTimeBetweenTriesPlan( + TimeSpan.FromMilliseconds(250), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromMilliseconds(1000)) + .ShouldPauseConsumer(false))) + .AddTypedHandlers( + handlers => + handlers + .WithHandlerLifetime(InstanceLifetime.Singleton) + .AddHandler()))); + return cluster; + } + + internal static IClusterConfigurationBuilder SetupNullRetryDurableGuaranteeOrderedConsumptionSqlServerCluster( + this IClusterConfigurationBuilder cluster, + string sqlServerConnectionString, + string sqlServerDatabaseName) + { + cluster + .AddProducer( + producer => producer + .DefaultTopic("test-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-sql-server") + .WithCompression(Confluent.Kafka.CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer())) + .AddConsumer( + consumer => consumer + .Topic("test-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-sql-server") + .WithGroupId("test-consumer-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-sql-server") + .WithBufferSize(100) + .WithWorkersCount(10) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer(typeof(RetryDurableTestMessage)) + .RetryDurable( + (configure) => configure + .Handle() + .WithMessageType(typeof(RetryDurableTestMessage)) + .WithMessageSerializeSettings( + new JsonSerializerSettings + { + DateTimeZoneHandling = DateTimeZoneHandling.Utc, + TypeNameHandling = TypeNameHandling.Auto + }) + .WithEmbeddedRetryCluster( + cluster, + configure => configure + .Enabled(true) + .WithRetryTopicName("test-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-sql-server-retry") + .WithRetryConsumerBufferSize(100) + .WithRetryConsumerWorkersCount(10) + .WithRetryConsumerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption) + .WithRetryTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler())) + .WithQueuePollingJobConfiguration( + configure => configure + .Enabled(true) + .WithId("custom_search_key_null_durable_guarantee_ordered_consumption_sql_server") + .WithCronExpression("0/30 * * ? * * *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(256)) + .WithSqlServerDataProvider( + sqlServerConnectionString, + sqlServerDatabaseName) + .WithRetryPlanBeforeRetryDurable( + configure => configure + .TryTimes(3) + .WithTimeBetweenTriesPlan( + TimeSpan.FromMilliseconds(250), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromMilliseconds(1000)) + .ShouldPauseConsumer(false))) + .AddTypedHandlers( + handlers => + handlers + .WithHandlerLifetime(InstanceLifetime.Singleton) + .AddHandler()))); + return cluster; + } + + internal static IClusterConfigurationBuilder SetupNullRetryDurableLatestConsumptionMongoDbCluster( + this IClusterConfigurationBuilder cluster, + string mongoDbConnectionString, + string mongoDbDatabaseName, + string mongoDbRetryQueueCollectionName, + string mongoDbRetryQueueItemCollectionName) + { + cluster + .AddProducer( + producer => producer + .DefaultTopic("test-kafka-flow-retry-null-retry-durable-latest-consumption-mongo-db") + .WithCompression(Confluent.Kafka.CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer())) + .AddConsumer( + consumer => consumer + .Topic("test-kafka-flow-retry-null-retry-durable-latest-consumption-mongo-db") + .WithGroupId("test-consumer-kafka-flow-retry-null-retry-durable-latest-consumption-mongo-db") + .WithBufferSize(100) + .WithWorkersCount(10) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer(typeof(RetryDurableTestMessage)) + .RetryDurable( + (configure) => configure + .Handle() + .WithMessageType(typeof(RetryDurableTestMessage)) + .WithMessageSerializeSettings( + new JsonSerializerSettings + { + DateTimeZoneHandling = DateTimeZoneHandling.Utc, + TypeNameHandling = TypeNameHandling.Auto + }) + .WithEmbeddedRetryCluster( + cluster, + configure => configure + .Enabled(true) + .WithRetryTopicName("test-kafka-flow-retry-null-retry-durable-latest-consumption-mongo-db-retry") + .WithRetryConsumerBufferSize(100) + .WithRetryConsumerWorkersCount(10) + .WithRetryConsumerStrategy(RetryConsumerStrategy.LatestConsumption) + .WithRetryTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler())) + .WithQueuePollingJobConfiguration( + configure => configure + .Enabled(true) + .WithId("custom_search_key_null_durable_latest_consumption_mongo_db") + .WithCronExpression("0/30 * * ? * * *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(256)) + .WithMongoDbDataProvider( + mongoDbConnectionString, + mongoDbDatabaseName, + mongoDbRetryQueueCollectionName, + mongoDbRetryQueueItemCollectionName) + .WithRetryPlanBeforeRetryDurable( + configure => configure + .TryTimes(3) + .WithTimeBetweenTriesPlan( + TimeSpan.FromMilliseconds(250), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromMilliseconds(1000)) + .ShouldPauseConsumer(false))) + .AddTypedHandlers( + handlers => + handlers + .WithHandlerLifetime(InstanceLifetime.Singleton) + .AddHandler()))); + return cluster; + } + + internal static IClusterConfigurationBuilder SetupNullRetryDurableLatestConsumptionSqlServerCluster( this IClusterConfigurationBuilder cluster, + string sqlServerConnectionString, + string sqlServerDatabaseName) + { + cluster + .AddProducer( + producer => producer + .DefaultTopic("test-kafka-flow-retry-null-retry-durable-latest-consumption-sql-server") + .WithCompression(Confluent.Kafka.CompressionType.Gzip) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer())) + .AddConsumer( + consumer => consumer + .Topic("test-kafka-flow-retry-null-retry-durable-latest-consumption-sql-server") + .WithGroupId("test-consumer-kafka-flow-retry-null-retry-durable-latest-consumption-sql-server") + .WithBufferSize(100) + .WithWorkersCount(10) + .WithAutoOffsetReset(AutoOffsetReset.Latest) + .AddMiddlewares( + middlewares => middlewares + .AddSingleTypeSerializer(typeof(RetryDurableTestMessage)) + .RetryDurable( + (configure) => configure + .Handle() + .WithMessageType(typeof(RetryDurableTestMessage)) + .WithMessageSerializeSettings( + new JsonSerializerSettings + { + DateTimeZoneHandling = DateTimeZoneHandling.Utc, + TypeNameHandling = TypeNameHandling.Auto + }) + .WithEmbeddedRetryCluster( + cluster, + configure => configure + .Enabled(true) + .WithRetryTopicName("test-kafka-flow-retry-null-retry-durable-latest-consumption-sql-server-retry") + .WithRetryConsumerBufferSize(100) + .WithRetryConsumerWorkersCount(10) + .WithRetryConsumerStrategy(RetryConsumerStrategy.LatestConsumption) + .WithRetryTypedHandlers( + handlers => handlers + .WithHandlerLifetime(InstanceLifetime.Transient) + .AddHandler())) + .WithQueuePollingJobConfiguration( + configure => configure + .Enabled(true) + .WithId("custom_search_key_null_durable_latest_consumption_sql_server") + .WithCronExpression("0/30 * * ? * * *") + .WithExpirationIntervalFactor(1) + .WithFetchSize(256)) + .WithSqlServerDataProvider( + sqlServerConnectionString, + sqlServerDatabaseName) + .WithRetryPlanBeforeRetryDurable( + configure => configure + .TryTimes(3) + .WithTimeBetweenTriesPlan( + TimeSpan.FromMilliseconds(250), + TimeSpan.FromMilliseconds(500), + TimeSpan.FromMilliseconds(1000)) + .ShouldPauseConsumer(false))) + .AddTypedHandlers( + handlers => + handlers + .WithHandlerLifetime(InstanceLifetime.Singleton) + .AddHandler()))); + return cluster; + } + + internal static IClusterConfigurationBuilder SetupRetryDurableGuaranteeOrderedConsumptionMongoDbCluster( + this IClusterConfigurationBuilder cluster, string mongoDbConnectionString, string mongoDbDatabaseName, string mongoDbRetryQueueCollectionName, diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperFixtureTemplate.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperFixtureTemplate.cs index 9de79b52..81c15b50 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperFixtureTemplate.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperFixtureTemplate.cs @@ -50,7 +50,25 @@ protected async Task InitializeKafkaAsync(IConfiguration configuration) "test-kafka-flow-retry-retry-durable-latest-consumption-mongo-db", "test-kafka-flow-retry-retry-durable-latest-consumption-mongo-db-retry", "test-kafka-flow-retry-retry-durable-latest-consumption-sql-server", - "test-kafka-flow-retry-retry-durable-latest-consumption-sql-server-retry" + "test-kafka-flow-retry-retry-durable-latest-consumption-sql-server-retry", + + "test-kafka-flow-retry-empty-retry-durable-guarantee-ordered-consumption-mongo-db", + "test-kafka-flow-retry-empty-retry-durable-guarantee-ordered-consumption-mongo-db-retry", + "test-kafka-flow-retry-empty-retry-durable-guarantee-ordered-consumption-sql-server", + "test-kafka-flow-retry-empty-retry-durable-guarantee-ordered-consumption-sql-server-retry", + "test-kafka-flow-retry-empty-retry-durable-latest-consumption-mongo-db", + "test-kafka-flow-retry-empty-retry-durable-latest-consumption-mongo-db-retry", + "test-kafka-flow-retry-empty-retry-durable-latest-consumption-sql-server", + "test-kafka-flow-retry-empty-retry-durable-latest-consumption-sql-server-retry", + + "test-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-mongo-db", + "test-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-mongo-db-retry", + "test-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-sql-server", + "test-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-sql-server-retry", + "test-kafka-flow-retry-null-retry-durable-latest-consumption-mongo-db", + "test-kafka-flow-retry-null-retry-durable-latest-consumption-mongo-db-retry", + "test-kafka-flow-retry-null-retry-durable-latest-consumption-sql-server", + "test-kafka-flow-retry-null-retry-durable-latest-consumption-sql-server-retry" }; await BootstrapperKafka.RecreateKafkaTopicsAsync(this.KafkaSettings.Brokers, topics); diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperHostFixture.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperHostFixture.cs index bf1c7fff..9c153c02 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperHostFixture.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Bootstrappers/Fixtures/BootstrapperHostFixture.cs @@ -89,25 +89,70 @@ private void SetupServices(HostBuilderContext context, IServiceCollection servic this.MongoDbSettings.DatabaseName, this.MongoDbSettings.RetryQueueCollectionName, this.MongoDbSettings.RetryQueueItemCollectionName) + .SetupEmptyRetryDurableGuaranteeOrderedConsumptionMongoDbCluster( + this.MongoDbSettings.ConnectionString, + this.MongoDbSettings.DatabaseName, + this.MongoDbSettings.RetryQueueCollectionName, + this.MongoDbSettings.RetryQueueItemCollectionName) + .SetupNullRetryDurableGuaranteeOrderedConsumptionMongoDbCluster( + this.MongoDbSettings.ConnectionString, + this.MongoDbSettings.DatabaseName, + this.MongoDbSettings.RetryQueueCollectionName, + this.MongoDbSettings.RetryQueueItemCollectionName) .SetupRetryDurableGuaranteeOrderedConsumptionSqlServerCluster( this.SqlServerSettings.ConnectionString, this.SqlServerSettings.DatabaseName) + .SetupEmptyRetryDurableGuaranteeOrderedConsumptionSqlServerCluster( + this.SqlServerSettings.ConnectionString, + this.SqlServerSettings.DatabaseName) + .SetupNullRetryDurableGuaranteeOrderedConsumptionSqlServerCluster( + this.SqlServerSettings.ConnectionString, + this.SqlServerSettings.DatabaseName) .SetupRetryDurableLatestConsumptionMongoDbCluster( this.MongoDbSettings.ConnectionString, this.MongoDbSettings.DatabaseName, this.MongoDbSettings.RetryQueueCollectionName, this.MongoDbSettings.RetryQueueItemCollectionName) + .SetupEmptyRetryDurableLatestConsumptionMongoDbCluster( + this.MongoDbSettings.ConnectionString, + this.MongoDbSettings.DatabaseName, + this.MongoDbSettings.RetryQueueCollectionName, + this.MongoDbSettings.RetryQueueItemCollectionName) + .SetupNullRetryDurableLatestConsumptionMongoDbCluster( + this.MongoDbSettings.ConnectionString, + this.MongoDbSettings.DatabaseName, + this.MongoDbSettings.RetryQueueCollectionName, + this.MongoDbSettings.RetryQueueItemCollectionName) .SetupRetryDurableLatestConsumptionSqlServerCluster( this.SqlServerSettings.ConnectionString, this.SqlServerSettings.DatabaseName) + .SetupEmptyRetryDurableLatestConsumptionSqlServerCluster( + this.SqlServerSettings.ConnectionString, + this.SqlServerSettings.DatabaseName) + .SetupNullRetryDurableLatestConsumptionSqlServerCluster( + this.SqlServerSettings.ConnectionString, + this.SqlServerSettings.DatabaseName) )); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(sp => this.RepositoryProvider); services.AddSingleton(); services.AddSingleton(); diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableGuaranteeOrderedConsumptionMongoDbProducer.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableGuaranteeOrderedConsumptionMongoDbProducer.cs new file mode 100644 index 00000000..27c3fd7c --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableGuaranteeOrderedConsumptionMongoDbProducer.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.Retry.IntegrationTests.Core.Producers +{ + internal class EmptyRetryDurableGuaranteeOrderedConsumptionMongoDbProducer + { + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableGuaranteeOrderedConsumptionSqlServerProducer.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableGuaranteeOrderedConsumptionSqlServerProducer.cs new file mode 100644 index 00000000..5c99f437 --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableGuaranteeOrderedConsumptionSqlServerProducer.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.Retry.IntegrationTests.Core.Producers +{ + internal class EmptyRetryDurableGuaranteeOrderedConsumptionSqlServerProducer + { + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableLatestConsumptionMongoDbProducer.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableLatestConsumptionMongoDbProducer.cs new file mode 100644 index 00000000..614032d1 --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableLatestConsumptionMongoDbProducer.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.Retry.IntegrationTests.Core.Producers +{ + internal class EmptyRetryDurableLatestConsumptionMongoDbProducer + { + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableLatestConsumptionSqlServerProducer.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableLatestConsumptionSqlServerProducer.cs new file mode 100644 index 00000000..104aefb8 --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/EmptyRetryDurableLatestConsumptionSqlServerProducer.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.Retry.IntegrationTests.Core.Producers +{ + internal class EmptyRetryDurableLatestConsumptionSqlServerProducer + { + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableGuaranteeOrderedConsumptionMongoDbProducer.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableGuaranteeOrderedConsumptionMongoDbProducer.cs new file mode 100644 index 00000000..29a44b71 --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableGuaranteeOrderedConsumptionMongoDbProducer.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.Retry.IntegrationTests.Core.Producers +{ + internal class NullRetryDurableGuaranteeOrderedConsumptionMongoDbProducer + { + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableGuaranteeOrderedConsumptionSqlServerProducer.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableGuaranteeOrderedConsumptionSqlServerProducer.cs new file mode 100644 index 00000000..92e712dc --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableGuaranteeOrderedConsumptionSqlServerProducer.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.Retry.IntegrationTests.Core.Producers +{ + internal class NullRetryDurableGuaranteeOrderedConsumptionSqlServerProducer + { + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableLatestConsumptionMongoDbProducer.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableLatestConsumptionMongoDbProducer.cs new file mode 100644 index 00000000..5d19f62e --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableLatestConsumptionMongoDbProducer.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.Retry.IntegrationTests.Core.Producers +{ + internal class NullRetryDurableLatestConsumptionMongoDbProducer + { + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableLatestConsumptionSqlServerProducer.cs b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableLatestConsumptionSqlServerProducer.cs new file mode 100644 index 00000000..2759cf08 --- /dev/null +++ b/src/KafkaFlow.Retry.IntegrationTests/Core/Producers/NullRetryDurableLatestConsumptionSqlServerProducer.cs @@ -0,0 +1,6 @@ +namespace KafkaFlow.Retry.IntegrationTests.Core.Producers +{ + internal class NullRetryDurableLatestConsumptionSqlServerProducer + { + } +} \ No newline at end of file diff --git a/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs b/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs index f85ff527..6e1d32a7 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs @@ -5,7 +5,6 @@ namespace KafkaFlow.Retry.IntegrationTests using System.Text; using System.Threading.Tasks; using Confluent.Kafka; - using KafkaFlow.Retry.IntegrationTests.Core.Bootstrappers; using KafkaFlow.Retry.IntegrationTests.Core.Bootstrappers.Fixtures; using KafkaFlow.Retry.IntegrationTests.Core.Messages; using KafkaFlow.Retry.IntegrationTests.Core.Producers; @@ -13,6 +12,7 @@ namespace KafkaFlow.Retry.IntegrationTests using KafkaFlow.Retry.IntegrationTests.Core.Storages.Assertion; using KafkaFlow.Retry.IntegrationTests.Core.Storages.Repositories; using Microsoft.Extensions.DependencyInjection; + using Newtonsoft.Json; using Xunit; [Collection("BootstrapperHostCollection")] @@ -31,19 +31,6 @@ public EmptyOrNullPartitionKeyRetryDurableTests(BootstrapperHostFixture bootstra InMemoryAuxiliarStorage.ThrowException = true; this.bootstrapperHostFixture = bootstrapperHostFixture; - BootstrapperKafka.RecreateKafkaTopicsAsync(bootstrapperHostFixture.KafkaSettings.Brokers, new[] { - "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-mongo-db", - "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-mongo-db-retry", - "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-sql-server", - "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-sql-server-retry", - "test-kafka-flow-retry-retry-durable-latest-consumption-mongo-db", - "test-kafka-flow-retry-retry-durable-latest-consumption-mongo-db-retry", - "test-kafka-flow-retry-retry-durable-latest-consumption-sql-server", - "test-kafka-flow-retry-retry-durable-latest-consumption-sql-server-retry" - }) - .GetAwaiter() - .GetResult(); - repositoryProvider.GetRepositoryOfType(RepositoryType.MongoDb).CleanDatabaseAsync().GetAwaiter().GetResult(); repositoryProvider.GetRepositoryOfType(RepositoryType.SqlServer).CleanDatabaseAsync().GetAwaiter().GetResult(); } @@ -53,28 +40,28 @@ public static IEnumerable EmptyKeyScenarios() yield return new object[] { RepositoryType.MongoDb, - typeof(IMessageProducer), + typeof(IMessageProducer), typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert), 3 //numberOfMessagesToBeProduced }; yield return new object[] { RepositoryType.SqlServer, - typeof(IMessageProducer), + typeof(IMessageProducer), typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert), 3 }; yield return new object[] { RepositoryType.MongoDb, - typeof(IMessageProducer), + typeof(IMessageProducer), typeof(RetryDurableLatestConsumptionPhysicalStorageAssert), 1 }; yield return new object[] { RepositoryType.SqlServer, - typeof(IMessageProducer), + typeof(IMessageProducer), typeof(RetryDurableLatestConsumptionPhysicalStorageAssert), 1 }; @@ -85,29 +72,29 @@ public static IEnumerable NullKeyScenarios() yield return new object[] { RepositoryType.MongoDb, - typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert), - "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-mongo-db", + typeof(NullRetryDurableGuaranteeOrderedConsumptionMongoDbProducer), + "test-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-mongo-db", 2 //numberOfMessagesToBeProduced }; yield return new object[] { RepositoryType.SqlServer, - typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert), - "test-kafka-flow-retry-retry-durable-guarantee-ordered-consumption-sql-server", + typeof(NullRetryDurableGuaranteeOrderedConsumptionSqlServerProducer), + "test-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-sql-server", 2 }; yield return new object[] { RepositoryType.MongoDb, - typeof(RetryDurableLatestConsumptionPhysicalStorageAssert), - "test-kafka-flow-retry-retry-durable-latest-consumption-mongo-db", + typeof(NullRetryDurableLatestConsumptionMongoDbProducer), + "test-kafka-flow-retry-null-retry-durable-latest-consumption-mongo-db", 1 }; yield return new object[] { RepositoryType.SqlServer, - typeof(RetryDurableLatestConsumptionPhysicalStorageAssert), - "test-kafka-flow-retry-retry-durable-latest-consumption-sql-server", + typeof(NullRetryDurableLatestConsumptionSqlServerProducer), + "test-kafka-flow-retry-null-retry-durable-latest-consumption-sql-server", 1 }; } @@ -246,7 +233,7 @@ public byte[] Serialize(RetryDurableTestMessage data, SerializationContext conte return null; } - return Encoding.UTF8.GetBytes(Newtonsoft.Json.JsonConvert.SerializeObject(data)); + return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data)); } } } From 2f661b25a630844a445ca457ee1b5ed54b169fc5 Mon Sep 17 00:00:00 2001 From: "sergio.ribeiro" Date: Thu, 14 Jul 2022 18:45:04 +0100 Subject: [PATCH 10/11] chore: undo to storage type on Null Key Scenarios test --- .../EmptyOrNullPartitionKeyRetryDurableTests.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs b/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs index 6e1d32a7..6a0bb2ea 100644 --- a/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs +++ b/src/KafkaFlow.Retry.IntegrationTests/EmptyOrNullPartitionKeyRetryDurableTests.cs @@ -72,28 +72,28 @@ public static IEnumerable NullKeyScenarios() yield return new object[] { RepositoryType.MongoDb, - typeof(NullRetryDurableGuaranteeOrderedConsumptionMongoDbProducer), + typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert), "test-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-mongo-db", 2 //numberOfMessagesToBeProduced }; yield return new object[] { RepositoryType.SqlServer, - typeof(NullRetryDurableGuaranteeOrderedConsumptionSqlServerProducer), + typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert), "test-kafka-flow-retry-null-retry-durable-guarantee-ordered-consumption-sql-server", 2 }; yield return new object[] { RepositoryType.MongoDb, - typeof(NullRetryDurableLatestConsumptionMongoDbProducer), + typeof(RetryDurableLatestConsumptionPhysicalStorageAssert), "test-kafka-flow-retry-null-retry-durable-latest-consumption-mongo-db", 1 }; yield return new object[] { RepositoryType.SqlServer, - typeof(NullRetryDurableLatestConsumptionSqlServerProducer), + typeof(RetryDurableLatestConsumptionPhysicalStorageAssert), "test-kafka-flow-retry-null-retry-durable-latest-consumption-sql-server", 1 }; From fb79e845eb68440284eb566d6c21cedd674b2405 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Leandro=20Magalh=C3=A3es?= Date: Fri, 15 Jul 2022 12:03:26 +0100 Subject: [PATCH 11/11] chore: change verbosity on dotnet test execution --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 388bd9f0..5b03139b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -162,7 +162,7 @@ jobs: auto create topic: "true" # Optional, auto create kafka topic - name: Test - run: dotnet test --no-build -c Release --verbosity normal --collect:"XPlat Code Coverage" --results-directory:"../../coverage-outputs" -m:1 src/KafkaFlow.Retry.sln + run: dotnet test --no-build -c Release --verbosity detailed --collect:"XPlat Code Coverage" --results-directory:"../../coverage-outputs" -m:1 src/KafkaFlow.Retry.sln # - name: Merge coverage results # run: |