Skip to content

Commit 7f1e964

Browse files
fix: support Kafka messages with nullable or empty partition key
1 parent e6e7d41 commit 7f1e964

File tree

15 files changed

+407
-68
lines changed

15 files changed

+407
-68
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ Install packages related to your context. The Core package is required for all o
8989
.WithRetryTopicName("test-topic-retry")
9090
.WithRetryConsumerBufferSize(4)
9191
.WithRetryConsumerWorkersCount(2)
92-
.WithRetryConusmerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption)
92+
.WithRetryConsumerStrategy(RetryConsumerStrategy.GuaranteeOrderedConsumption)
9393
.WithRetryTypedHandlers(
9494
handlers => handlers
9595
.WithHandlerLifetime(InstanceLifetime.Transient)

src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/IPhysicalStorageAssert.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
internal interface IPhysicalStorageAssert
88
{
9+
Task AssertEmptyKeyRetryDurableMessageRetryingAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int retryCount);
10+
911
Task AssertRetryDurableMessageCreationAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int count);
1012

1113
Task AssertRetryDurableMessageDoneAsync(RepositoryType repositoryType, RetryDurableTestMessage message);

src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,29 @@ public RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert(IRepositoryP
1818
this.repositoryProvider = repositoryProvider;
1919
}
2020

21+
public async Task AssertEmptyKeyRetryDurableMessageRetryingAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int retryCount)
22+
{
23+
var retryQueue = await this
24+
.repositoryProvider
25+
.GetRepositoryOfType(repositoryType)
26+
.GetRetryQueueAsync(message.Key)
27+
.ConfigureAwait(false);
28+
29+
Assert.True(retryQueue.Id != Guid.Empty, "Retry Durable Creation Get Retry Queue cannot be asserted.");
30+
31+
var retryQueueItems = await this
32+
.repositoryProvider
33+
.GetRepositoryOfType(repositoryType)
34+
.GetRetryQueueItemsAsync(retryQueue.Id, rqi => rqi.Count(item => item.Status == RetryQueueItemStatus.InRetry) != retryCount)
35+
.ConfigureAwait(false);
36+
37+
Assert.True(retryQueueItems != null, "Retry Durable Creation Get Retry Queue Item Message cannot be asserted.");
38+
39+
Assert.Equal(retryQueueItems.Count() - 1, retryQueueItems.Max(i => i.Sort));
40+
Assert.True(Enum.Equals(retryQueue.Status, RetryQueueStatus.Active));
41+
Assert.All(retryQueueItems, i => Enum.Equals(i.Status, RetryQueueItemStatus.Waiting));
42+
}
43+
2144
public async Task AssertRetryDurableMessageCreationAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int count)
2245
{
2346
var retryQueue = await this

src/KafkaFlow.Retry.IntegrationTests/Core/Storages/Assertion/RetryDurableLatestConsumptionPhysicalStorageAssert.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace KafkaFlow.Retry.IntegrationTests.Core.Storages.Assertion
22
{
33
using System;
4+
using System.Collections.Generic;
45
using System.Linq;
56
using System.Threading.Tasks;
67
using KafkaFlow.Retry.Durable.Repository.Model;
@@ -18,6 +19,37 @@ public RetryDurableLatestConsumptionPhysicalStorageAssert(IRepositoryProvider re
1819
this.repositoryProvider = repositoryProvider;
1920
}
2021

22+
public async Task AssertEmptyKeyRetryDurableMessageRetryingAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int retryCount)
23+
{
24+
var retryQueue = await this
25+
.repositoryProvider
26+
.GetRepositoryOfType(repositoryType)
27+
.GetRetryQueueAsync(message.Key).ConfigureAwait(false);
28+
29+
Assert.True(retryQueue.Id != Guid.Empty, "Retry Durable Retrying Get Retry Queue cannot be asserted.");
30+
31+
var retryQueueItems = await this
32+
.repositoryProvider
33+
.GetRepositoryOfType(repositoryType)
34+
.GetRetryQueueItemsAsync(
35+
retryQueue.Id,
36+
rqi =>
37+
{
38+
return rqi.Count(item => item.Status == RetryQueueItemStatus.Waiting) != retryCount;
39+
}).ConfigureAwait(false);
40+
41+
var lastRetryItem = retryQueueItems.OrderBy(x => x.Sort).Last();
42+
var numberOrRetryItems = retryQueueItems.Count();
43+
var maxSortValue = retryQueueItems.Max(i => i.Sort);
44+
var cancelledRetryItems = retryQueueItems.Except(new List<RetryQueueItem> { lastRetryItem });
45+
46+
Assert.True(retryQueueItems != null, "Retry Durable Retrying Get Retry Queue Item Message cannot be asserted.");
47+
Assert.True(Enum.Equals(retryQueue.Status, RetryQueueStatus.Active), "Actual retry queue should be in active state");
48+
Assert.Equal(numberOrRetryItems - 1, maxSortValue);
49+
Assert.Equal(RetryQueueItemStatus.Waiting, lastRetryItem.Status);
50+
Assert.All(cancelledRetryItems, i => Enum.Equals(i.Status, RetryQueueItemStatus.Cancelled));
51+
}
52+
2153
public async Task AssertRetryDurableMessageCreationAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int count)
2254
{
2355
var retryQueue = await this

src/KafkaFlow.Retry.IntegrationTests/Core/Storages/InMemoryAuxiliarStorage.cs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,34 +11,52 @@
1111
internal static class InMemoryAuxiliarStorage<T> where T : ITestMessage
1212
{
1313
private const int TimeoutSec = 60;
14-
private static readonly ConcurrentBag<T> Message = new ConcurrentBag<T>();
14+
private static readonly ConcurrentBag<T> Messages = new ConcurrentBag<T>();
1515

1616
public static bool ThrowException { get; set; }
1717

1818
public static void Add(T message)
1919
{
20-
Message.Add(message);
20+
Messages.Add(message);
2121
}
2222

2323
public static async Task AssertCountMessageAsync(T message, int count)
2424
{
2525
var start = DateTime.Now;
2626

27-
while (Message.Count(x => x.Key == message.Key && x.Value == message.Value) != count)
27+
while (Messages.Count(x => x.Key == message.Key && x.Value == message.Value) != count)
2828
{
2929
if (DateTime.Now.Subtract(start).TotalSeconds > TimeoutSec && !Debugger.IsAttached)
3030
{
31-
Assert.True(false, "Message not received.");
31+
Assert.True(false, $"Message not received - {message.Key}:{message.Value}.");
3232
return;
3333
}
3434

3535
await Task.Delay(100).ConfigureAwait(false);
3636
}
3737
}
3838

39+
public static async Task AssertEmptyPartitionKeyCountMessageAsync(T message, int count, int timoutSeconds = TimeoutSec)
40+
{
41+
var start = DateTime.Now;
42+
int numberOfMessages = 0;
43+
do
44+
{
45+
numberOfMessages = Messages.Count(x => x.Value == message.Value);
46+
47+
if (DateTime.Now.Subtract(start).TotalSeconds > timoutSeconds && !Debugger.IsAttached)
48+
{
49+
Assert.True(false, $"Message {message.Key}:{message.Value} not received. Expected {count}, messages received {numberOfMessages}");
50+
return;
51+
}
52+
53+
await Task.Delay(1000).ConfigureAwait(false);
54+
} while (numberOfMessages != count);
55+
}
56+
3957
public static void Clear()
4058
{
41-
Message.Clear();
59+
Messages.Clear();
4260
}
4361
}
4462
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
namespace KafkaFlow.Retry.IntegrationTests
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Threading.Tasks;
6+
using KafkaFlow.Retry.IntegrationTests.Core.Bootstrappers.Fixtures;
7+
using KafkaFlow.Retry.IntegrationTests.Core.Messages;
8+
using KafkaFlow.Retry.IntegrationTests.Core.Producers;
9+
using KafkaFlow.Retry.IntegrationTests.Core.Storages;
10+
using KafkaFlow.Retry.IntegrationTests.Core.Storages.Assertion;
11+
using KafkaFlow.Retry.IntegrationTests.Core.Storages.Repositories;
12+
using Microsoft.Extensions.DependencyInjection;
13+
using Xunit;
14+
15+
[Collection("BootstrapperHostCollection")]
16+
public class EmptyPartitionKeyRetryDurableTests
17+
{
18+
private const int defaultWaitingTimeSeconds = 120;
19+
private readonly IRepositoryProvider repositoryProvider;
20+
private readonly IServiceProvider serviceProvider;
21+
22+
public EmptyPartitionKeyRetryDurableTests(BootstrapperHostFixture bootstrapperHostFixture)
23+
{
24+
this.serviceProvider = bootstrapperHostFixture.ServiceProvider;
25+
this.repositoryProvider = bootstrapperHostFixture.ServiceProvider.GetRequiredService<IRepositoryProvider>();
26+
InMemoryAuxiliarStorage<RetryDurableTestMessage>.Clear();
27+
InMemoryAuxiliarStorage<RetryDurableTestMessage>.ThrowException = true;
28+
}
29+
30+
public static IEnumerable<object[]> EmptyKeyScenarios()
31+
{
32+
yield return new object[]
33+
{
34+
RepositoryType.MongoDb,
35+
typeof(IMessageProducer<RetryDurableGuaranteeOrderedConsumptionMongoDbProducer>),
36+
typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert),
37+
3 //numberOfMessagesToBeProduced
38+
};
39+
yield return new object[]
40+
{
41+
RepositoryType.SqlServer,
42+
typeof(IMessageProducer<RetryDurableGuaranteeOrderedConsumptionSqlServerProducer>),
43+
typeof(RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert),
44+
3
45+
};
46+
yield return new object[]
47+
{
48+
RepositoryType.MongoDb,
49+
typeof(IMessageProducer<RetryDurableLatestConsumptionMongoDbProducer>),
50+
typeof(RetryDurableLatestConsumptionPhysicalStorageAssert),
51+
1
52+
};
53+
yield return new object[]
54+
{
55+
RepositoryType.SqlServer,
56+
typeof(IMessageProducer<RetryDurableLatestConsumptionSqlServerProducer>),
57+
typeof(RetryDurableLatestConsumptionPhysicalStorageAssert),
58+
1
59+
};
60+
}
61+
62+
[Theory]
63+
[MemberData(nameof(EmptyKeyScenarios))]
64+
internal async Task EmptyKeyRetryDurableTest(
65+
RepositoryType repositoryType,
66+
Type producerType,
67+
Type physicalStorageType,
68+
int numberOfMessagesToBeProduced)
69+
{
70+
// Arrange
71+
var numberOfMessagesByEachSameKey = 1;
72+
var numberOfTimesThatEachMessageIsTriedWhenDone = 1;
73+
var numberOfTimesThatEachMessageIsTriedDuringDurable = 1;
74+
var producer = this.serviceProvider.GetRequiredService(producerType) as IMessageProducer;
75+
var physicalStorageAssert = this.serviceProvider.GetRequiredService(physicalStorageType) as IPhysicalStorageAssert;
76+
var messages = new List<RetryDurableTestMessage>();
77+
for (int i = 0; i < numberOfMessagesToBeProduced; i++)
78+
{
79+
messages.Add(new RetryDurableTestMessage { Key = string.Empty, Value = $"Message_{i + 1}" });
80+
}
81+
82+
await this.repositoryProvider.GetRepositoryOfType(repositoryType).CleanDatabaseAsync().ConfigureAwait(false);
83+
84+
// Act
85+
foreach (var message in messages)
86+
{
87+
await producer.ProduceAsync(message.Key, message).ConfigureAwait(false);
88+
}
89+
90+
RetryDurableTestMessage messageToValidate = messages[0];
91+
92+
await physicalStorageAssert
93+
.AssertEmptyKeyRetryDurableMessageRetryingAsync(repositoryType, messageToValidate, numberOfMessagesByEachSameKey)
94+
.ConfigureAwait(false);
95+
96+
// Assert - Retrying
97+
InMemoryAuxiliarStorage<RetryDurableTestMessage>.Clear();
98+
99+
await InMemoryAuxiliarStorage<RetryDurableTestMessage>
100+
.AssertEmptyPartitionKeyCountMessageAsync(messageToValidate, numberOfTimesThatEachMessageIsTriedDuringDurable, defaultWaitingTimeSeconds)
101+
.ConfigureAwait(false);
102+
103+
await physicalStorageAssert
104+
.AssertEmptyKeyRetryDurableMessageRetryingAsync(repositoryType, messageToValidate, numberOfTimesThatEachMessageIsTriedDuringDurable)
105+
.ConfigureAwait(false);
106+
107+
// Assert - Done
108+
InMemoryAuxiliarStorage<RetryDurableTestMessage>.ThrowException = false;
109+
InMemoryAuxiliarStorage<RetryDurableTestMessage>.Clear();
110+
111+
await InMemoryAuxiliarStorage<RetryDurableTestMessage>
112+
.AssertEmptyPartitionKeyCountMessageAsync(messageToValidate, numberOfTimesThatEachMessageIsTriedWhenDone, defaultWaitingTimeSeconds)
113+
.ConfigureAwait(false);
114+
115+
await physicalStorageAssert
116+
.AssertRetryDurableMessageDoneAsync(repositoryType, messageToValidate)
117+
.ConfigureAwait(false);
118+
}
119+
}
120+
}

src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
namespace KafkaFlow.Retry.IntegrationTests
22
{
3-
using System;
4-
using System.Collections.Generic;
5-
using System.Linq;
6-
using System.Threading.Tasks;
73
using AutoFixture;
84
using KafkaFlow.Retry.IntegrationTests.Core.Bootstrappers.Fixtures;
95
using KafkaFlow.Retry.IntegrationTests.Core.Messages;
@@ -12,6 +8,10 @@ namespace KafkaFlow.Retry.IntegrationTests
128
using KafkaFlow.Retry.IntegrationTests.Core.Storages.Assertion;
139
using KafkaFlow.Retry.IntegrationTests.Core.Storages.Repositories;
1410
using Microsoft.Extensions.DependencyInjection;
11+
using System;
12+
using System.Collections.Generic;
13+
using System.Linq;
14+
using System.Threading.Tasks;
1515
using Xunit;
1616

1717
[Collection("BootstrapperHostCollection")]

src/KafkaFlow.Retry.UnitTests/API/RetryRequestHandlerBaseTests.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
namespace KafkaFlow.Retry.UnitTests.API
22
{
3-
using System.Text;
4-
using System.Threading;
5-
using System.Threading.Tasks;
63
using FluentAssertions;
4+
using global::KafkaFlow.Retry.Durable.Repository.Model;
75
using global::KafkaFlow.Retry.UnitTests.API.Surrogate;
86
using global::KafkaFlow.Retry.UnitTests.API.Utilities;
97
using Microsoft.AspNetCore.Http;
108
using Moq;
9+
using System.Text;
10+
using System.Threading;
11+
using System.Threading.Tasks;
1112
using Xunit;
1213

1314
public class RetryRequestHandlerBaseTests
@@ -21,7 +22,7 @@ public async Task RetryRequestHandlerBase_HandleAsync_CallsHandleRequestAsync()
2122
// Arrange
2223
var dto = new DtoSurrogate
2324
{
24-
Text = Durable.Repository.Model.RetryQueueStatus.Active
25+
Text = RetryQueueStatus.Active
2526
};
2627

2728
var mockHttpContext = HttpContextHelper.MockHttpContext(ResourcePath, HttpMethod, requestBody: dto);

0 commit comments

Comments
 (0)