Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Here are a few things you can do that will increase the likelihood of your pull
- Keep your change as focused as possible. If there are multiple changes you would like to make that are not dependent upon each other, submit them as separate pull requests
- Write [good commit messages](http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html) following [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/)
- Open a pull request with a title following [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/)
- Verify if the Kafka.Flow is at least in the last MINOR version available (MAJOR.MINOR.PATCH) [Kafka.Flow Releases](https://github.com/Farfetch/kafka-flow/releases)

## Merging a pull request

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -90,25 +90,70 @@ private void SetupServices(HostBuilderContext context, IServiceCollection servic
this.MongoDbSettings.DatabaseName,
this.MongoDbSettings.RetryQueueCollectionName,
this.MongoDbSettings.RetryQueueItemCollectionName)
.SetupEmptyRetryDurableGuaranteeOrderedConsumptionMongoDbCluster(
this.MongoDbSettings.ConnectionString,
this.MongoDbSettings.DatabaseName,
this.MongoDbSettings.RetryQueueCollectionName,
this.MongoDbSettings.RetryQueueItemCollectionName)
.SetupNullRetryDurableGuaranteeOrderedConsumptionMongoDbCluster(
this.MongoDbSettings.ConnectionString,
this.MongoDbSettings.DatabaseName,
this.MongoDbSettings.RetryQueueCollectionName,
this.MongoDbSettings.RetryQueueItemCollectionName)
.SetupRetryDurableGuaranteeOrderedConsumptionSqlServerCluster(
this.SqlServerSettings.ConnectionString,
this.SqlServerSettings.DatabaseName)
.SetupEmptyRetryDurableGuaranteeOrderedConsumptionSqlServerCluster(
this.SqlServerSettings.ConnectionString,
this.SqlServerSettings.DatabaseName)
.SetupNullRetryDurableGuaranteeOrderedConsumptionSqlServerCluster(
this.SqlServerSettings.ConnectionString,
this.SqlServerSettings.DatabaseName)
.SetupRetryDurableLatestConsumptionMongoDbCluster(
this.MongoDbSettings.ConnectionString,
this.MongoDbSettings.DatabaseName,
this.MongoDbSettings.RetryQueueCollectionName,
this.MongoDbSettings.RetryQueueItemCollectionName)
.SetupEmptyRetryDurableLatestConsumptionMongoDbCluster(
this.MongoDbSettings.ConnectionString,
this.MongoDbSettings.DatabaseName,
this.MongoDbSettings.RetryQueueCollectionName,
this.MongoDbSettings.RetryQueueItemCollectionName)
.SetupNullRetryDurableLatestConsumptionMongoDbCluster(
this.MongoDbSettings.ConnectionString,
this.MongoDbSettings.DatabaseName,
this.MongoDbSettings.RetryQueueCollectionName,
this.MongoDbSettings.RetryQueueItemCollectionName)
.SetupRetryDurableLatestConsumptionSqlServerCluster(
this.SqlServerSettings.ConnectionString,
this.SqlServerSettings.DatabaseName)
.SetupEmptyRetryDurableLatestConsumptionSqlServerCluster(
this.SqlServerSettings.ConnectionString,
this.SqlServerSettings.DatabaseName)
.SetupNullRetryDurableLatestConsumptionSqlServerCluster(
this.SqlServerSettings.ConnectionString,
this.SqlServerSettings.DatabaseName)
));

services.AddSingleton<RetrySimpleProducer>();
services.AddSingleton<RetryForeverProducer>();

services.AddSingleton<RetryDurableGuaranteeOrderedConsumptionMongoDbProducer>();
services.AddSingleton<EmptyRetryDurableGuaranteeOrderedConsumptionMongoDbProducer>();
services.AddSingleton<NullRetryDurableGuaranteeOrderedConsumptionMongoDbProducer>();

services.AddSingleton<RetryDurableGuaranteeOrderedConsumptionSqlServerProducer>();
services.AddSingleton<EmptyRetryDurableGuaranteeOrderedConsumptionSqlServerProducer>();
services.AddSingleton<NullRetryDurableGuaranteeOrderedConsumptionSqlServerProducer>();

services.AddSingleton<RetryDurableLatestConsumptionMongoDbProducer>();
services.AddSingleton<EmptyRetryDurableLatestConsumptionMongoDbProducer>();
services.AddSingleton<NullRetryDurableLatestConsumptionMongoDbProducer>();

services.AddSingleton<RetryDurableLatestConsumptionSqlServerProducer>();
services.AddSingleton<EmptyRetryDurableLatestConsumptionSqlServerProducer>();
services.AddSingleton<NullRetryDurableLatestConsumptionSqlServerProducer>();

services.AddSingleton<IRepositoryProvider>(sp => this.RepositoryProvider);
services.AddSingleton<RetryDurableLatestConsumptionPhysicalStorageAssert>();
services.AddSingleton<RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaFlow.Retry.IntegrationTests.Core.Producers
{
internal class EmptyRetryDurableGuaranteeOrderedConsumptionMongoDbProducer
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaFlow.Retry.IntegrationTests.Core.Producers
{
internal class EmptyRetryDurableGuaranteeOrderedConsumptionSqlServerProducer
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaFlow.Retry.IntegrationTests.Core.Producers
{
internal class EmptyRetryDurableLatestConsumptionMongoDbProducer
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaFlow.Retry.IntegrationTests.Core.Producers
{
internal class EmptyRetryDurableLatestConsumptionSqlServerProducer
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaFlow.Retry.IntegrationTests.Core.Producers
{
internal class NullRetryDurableGuaranteeOrderedConsumptionMongoDbProducer
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaFlow.Retry.IntegrationTests.Core.Producers
{
internal class NullRetryDurableGuaranteeOrderedConsumptionSqlServerProducer
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaFlow.Retry.IntegrationTests.Core.Producers
{
internal class NullRetryDurableLatestConsumptionMongoDbProducer
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace KafkaFlow.Retry.IntegrationTests.Core.Producers
{
internal class NullRetryDurableLatestConsumptionSqlServerProducer
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

internal interface IPhysicalStorageAssert
{
Task AssertEmptyKeyRetryDurableMessageRetryingAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int retryCount);

Task AssertRetryDurableMessageCreationAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int count);

Task AssertRetryDurableMessageDoneAsync(RepositoryType repositoryType, RetryDurableTestMessage message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,29 @@ public RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert(IRepositoryP
this.repositoryProvider = repositoryProvider;
}

public async Task AssertEmptyKeyRetryDurableMessageRetryingAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int retryCount)
{
var retryQueue = await this
.repositoryProvider
.GetRepositoryOfType(repositoryType)
.GetRetryQueueAsync(message.Key)
.ConfigureAwait(false);

Assert.True(retryQueue.Id != Guid.Empty, "Retry Durable Creation Get Retry Queue cannot be asserted.");

var retryQueueItems = await this
.repositoryProvider
.GetRepositoryOfType(repositoryType)
.GetRetryQueueItemsAsync(retryQueue.Id, rqi => rqi.Count(item => item.Status == RetryQueueItemStatus.InRetry) != retryCount)
.ConfigureAwait(false);

Assert.True(retryQueueItems != null, "Retry Durable Creation Get Retry Queue Item Message cannot be asserted.");

Assert.Equal(retryQueueItems.Count() - 1, retryQueueItems.Max(i => i.Sort));
Assert.True(Enum.Equals(retryQueue.Status, RetryQueueStatus.Active));
Assert.All(retryQueueItems, i => Enum.Equals(i.Status, RetryQueueItemStatus.Waiting));
}

public async Task AssertRetryDurableMessageCreationAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int count)
{
var retryQueue = await this
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow.Retry.IntegrationTests.Core.Storages.Assertion
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using KafkaFlow.Retry.Durable.Repository.Model;
Expand All @@ -18,6 +19,37 @@ public RetryDurableLatestConsumptionPhysicalStorageAssert(IRepositoryProvider re
this.repositoryProvider = repositoryProvider;
}

public async Task AssertEmptyKeyRetryDurableMessageRetryingAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int retryCount)
{
var retryQueue = await this
.repositoryProvider
.GetRepositoryOfType(repositoryType)
.GetRetryQueueAsync(message.Key).ConfigureAwait(false);

Assert.True(retryQueue.Id != Guid.Empty, "Retry Durable Retrying Get Retry Queue cannot be asserted.");

var retryQueueItems = await this
.repositoryProvider
.GetRepositoryOfType(repositoryType)
.GetRetryQueueItemsAsync(
retryQueue.Id,
rqi =>
{
return rqi.Count(item => item.Status == RetryQueueItemStatus.Waiting) != retryCount;
}).ConfigureAwait(false);

var lastRetryItem = retryQueueItems.OrderBy(x => x.Sort).Last();
var numberOrRetryItems = retryQueueItems.Count();
var maxSortValue = retryQueueItems.Max(i => i.Sort);
var cancelledRetryItems = retryQueueItems.Except(new List<RetryQueueItem> { lastRetryItem });

Assert.True(retryQueueItems != null, "Retry Durable Retrying Get Retry Queue Item Message cannot be asserted.");
Assert.True(Enum.Equals(retryQueue.Status, RetryQueueStatus.Active), "Actual retry queue should be in active state");
Assert.Equal(numberOrRetryItems - 1, maxSortValue);
Assert.Equal(RetryQueueItemStatus.Waiting, lastRetryItem.Status);
Assert.All(cancelledRetryItems, i => Enum.Equals(i.Status, RetryQueueItemStatus.Cancelled));
}

public async Task AssertRetryDurableMessageCreationAsync(RepositoryType repositoryType, RetryDurableTestMessage message, int count)
{
var retryQueue = await this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,52 @@
internal static class InMemoryAuxiliarStorage<T> where T : ITestMessage
{
private const int TimeoutSec = 60;
private static readonly ConcurrentBag<T> Message = new ConcurrentBag<T>();
private static readonly ConcurrentBag<T> Messages = new ConcurrentBag<T>();

public static bool ThrowException { get; set; }

public static void Add(T message)
{
Message.Add(message);
Messages.Add(message);
}

public static async Task AssertCountMessageAsync(T message, int count)
{
var start = DateTime.Now;

while (Message.Count(x => x.Key == message.Key && x.Value == message.Value) != count)
while (Messages.Count(x => x.Key == message.Key && x.Value == message.Value) != count)
{
if (DateTime.Now.Subtract(start).TotalSeconds > TimeoutSec && !Debugger.IsAttached)
{
Assert.True(false, "Message not received.");
Assert.True(false, $"Message not received - {message.Key}:{message.Value}.");
return;
}

await Task.Delay(100).ConfigureAwait(false);
}
}

public static async Task AssertEmptyPartitionKeyCountMessageAsync(T message, int count, int timoutSeconds = TimeoutSec)
{
var start = DateTime.Now;
int numberOfMessages = 0;
do
{
numberOfMessages = Messages.Count(x => x.Value == message.Value);

if (DateTime.Now.Subtract(start).TotalSeconds > timoutSeconds && !Debugger.IsAttached)
{
Assert.True(false, $"Message {message.Key}:{message.Value} not received. Expected {count}, messages received {numberOfMessages}");
return;
}

await Task.Delay(1000).ConfigureAwait(false);
} while (numberOfMessages != count);
}

public static void Clear()
{
Message.Clear();
Messages.Clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public async Task<RetryQueue> GetRetryQueueAsync(string queueGroupKey)

await Task.Delay(100).ConfigureAwait(false);

var retryQueueCursor = await this.retryQueuesCollection.FindAsync(x => x.QueueGroupKey.Contains(queueGroupKey)).ConfigureAwait(false);
var retryQueueCursor = await this.retryQueuesCollection.FindAsync(x => x.QueueGroupKey.Contains(queueGroupKey ?? string.Empty)).ConfigureAwait(false);
var retryQueues = await retryQueueCursor.ToListAsync().ConfigureAwait(false);
if (retryQueues.Any())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ FROM [RetryQueues]
WHERE QueueGroupKey LIKE '%'+@QueueGroupKey
ORDER BY Id";

command.Parameters.AddWithValue("QueueGroupKey", queueGroupKey);
command.Parameters.AddWithValue("QueueGroupKey", queueGroupKey ?? string.Empty);
retryQueue = await this.ExecuteSingleLineReaderAsync(command).ConfigureAwait(false);
}

Expand Down
Loading