Skip to content

Commit ae91141

Browse files
authored
Merge pull request #8 from vany0114/payment-handler-idempotent
Event bus handlers idempotency
2 parents ec82503 + cdf144f commit ae91141

File tree

16 files changed

+254
-39
lines changed

16 files changed

+254
-39
lines changed

ChangeLog.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
## 2.0.3
2+
**Event bus handlers idempotency:**
3+
* Add *Duber.Infrastructure.EventBus.Idempotency* project to handle idempotency at integration events level.
4+
* Make *TripFinishedIntegrationEvent* idempotent in order to avoid it can be paid more than once due to concurrency, retries, etc.
5+
16
## 2.0.2
27
**Notifications service:**
38
* Create an independent service to manage the notifications in order to decouple it from the frontend and to allow a better scaling out for both, frontend and notifications service.

microservices-netcore-docker-servicefabric.sln

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Duber.Infrastructure.Resili
8686
EndProject
8787
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Duber.Infrastructure.Resilience.Sql", "src\Infrastructure\Duber.Infrastructure.Resilience.Sql\Duber.Infrastructure.Resilience.Sql.csproj", "{C2AC0E1E-2B02-474F-9DB5-AF461A6C2FDD}"
8888
EndProject
89-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Duber.Trip.Notifications", "src\Application\Duber.Trip.Notifications\Duber.Trip.Notifications.csproj", "{8AAEEC31-B3CE-4583-9B42-1447B8694611}"
89+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Duber.Trip.Notifications", "src\Application\Duber.Trip.Notifications\Duber.Trip.Notifications.csproj", "{8AAEEC31-B3CE-4583-9B42-1447B8694611}"
90+
EndProject
91+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Duber.Infrastructure.EventBus.Idempotency", "src\Infrastructure\EventBus\Duber.Infrastructure.EventBus.Idempotency\Duber.Infrastructure.EventBus.Idempotency.csproj", "{426346B6-6631-40E7-9187-8721A0C23E0E}"
9092
EndProject
9193
Global
9294
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -296,6 +298,14 @@ Global
296298
{8AAEEC31-B3CE-4583-9B42-1447B8694611}.Release|Any CPU.Build.0 = Release|Any CPU
297299
{8AAEEC31-B3CE-4583-9B42-1447B8694611}.Release|x64.ActiveCfg = Release|Any CPU
298300
{8AAEEC31-B3CE-4583-9B42-1447B8694611}.Release|x64.Build.0 = Release|Any CPU
301+
{426346B6-6631-40E7-9187-8721A0C23E0E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
302+
{426346B6-6631-40E7-9187-8721A0C23E0E}.Debug|Any CPU.Build.0 = Debug|Any CPU
303+
{426346B6-6631-40E7-9187-8721A0C23E0E}.Debug|x64.ActiveCfg = Debug|Any CPU
304+
{426346B6-6631-40E7-9187-8721A0C23E0E}.Debug|x64.Build.0 = Debug|Any CPU
305+
{426346B6-6631-40E7-9187-8721A0C23E0E}.Release|Any CPU.ActiveCfg = Release|Any CPU
306+
{426346B6-6631-40E7-9187-8721A0C23E0E}.Release|Any CPU.Build.0 = Release|Any CPU
307+
{426346B6-6631-40E7-9187-8721A0C23E0E}.Release|x64.ActiveCfg = Release|Any CPU
308+
{426346B6-6631-40E7-9187-8721A0C23E0E}.Release|x64.Build.0 = Release|Any CPU
299309
EndGlobalSection
300310
GlobalSection(SolutionProperties) = preSolution
301311
HideSolutionNode = FALSE
@@ -337,6 +347,7 @@ Global
337347
{FB2CE5CF-AF46-41E0-95CC-88D8134DC324} = {69B890C5-C137-4967-8E15-26D07F312907}
338348
{C2AC0E1E-2B02-474F-9DB5-AF461A6C2FDD} = {69B890C5-C137-4967-8E15-26D07F312907}
339349
{8AAEEC31-B3CE-4583-9B42-1447B8694611} = {386D271D-8E5B-40F7-B017-EE38215FFAA7}
350+
{426346B6-6631-40E7-9187-8721A0C23E0E} = {FEDB83C2-C14A-4196-8973-2A5C68CE2CBA}
340351
EndGlobalSection
341352
GlobalSection(ExtensibilityGlobals) = postSolution
342353
SolutionGuid = {6B8CD1A2-251A-4CE0-94BB-12805E3492A0}

src/Application/Duber.Trip.API/Application/DomainEventHandlers/TripUpdatedDomainEventHandlerAsync.cs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using AutoMapper;
44
using Duber.Domain.Trip.Events;
55
using Duber.Infrastructure.EventBus.Abstractions;
6+
using Duber.Infrastructure.EventBus.Idempotency;
67
using Duber.Trip.API.Application.IntegrationEvents;
78
using Duber.Trip.API.Application.Model;
89
using Kledex.Events;
@@ -17,7 +18,8 @@ public class TripUpdatedDomainEventHandlerAsync : IEventHandlerAsync<TripUpdated
1718
private readonly IMapper _mapper;
1819
private readonly ILogger<TripUpdatedDomainEventHandlerAsync> _logger;
1920

20-
public TripUpdatedDomainEventHandlerAsync(IEventBus eventBus, IMapper mapper, ILogger<TripUpdatedDomainEventHandlerAsync> logger)
21+
public TripUpdatedDomainEventHandlerAsync(IEventBus eventBus, IMapper mapper,
22+
ILogger<TripUpdatedDomainEventHandlerAsync> logger)
2123
{
2224
_eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
2325
_mapper = mapper ?? throw new ArgumentNullException(nameof(mapper));
@@ -36,32 +38,56 @@ public async Task HandleAsync(TripUpdatedDomainEvent @event)
3638
if (@event.Status.Name == TripStatus.Finished.Name)
3739
{
3840
if (!@event.Distance.HasValue || !@event.Duration.HasValue || !@event.UserTripId.HasValue)
39-
throw new ArgumentException("Distance, duration and user id are required to trigger a TripFinishedIntegrationEvent");
41+
throw new ArgumentException(
42+
"Distance, duration and user id are required to trigger a TripFinishedIntegrationEvent");
4043

4144
_logger.LogInformation($"Trip {@event.AggregateRootId} has finished.");
42-
_eventBus.Publish(new TripFinishedIntegrationEvent(
45+
var tripFinishedIntegrationEvent = new TripFinishedIntegrationEvent(
4346
@event.AggregateRootId,
4447
@event.Distance.Value,
4548
@event.Duration.Value,
46-
new PaymentMethod { Id = @event.PaymentMethod.Id, Name = @event.PaymentMethod.Name },
49+
new PaymentMethod {Id = @event.PaymentMethod.Id, Name = @event.PaymentMethod.Name},
4750
@event.UserTripId.Value,
48-
@event.ConnectionId));
51+
@event.ConnectionId);
52+
53+
_eventBus.Publish(new IdempotentIntegrationEvent<TripFinishedIntegrationEvent>(tripFinishedIntegrationEvent, @event.AggregateRootId.ToString()));
4954
}
5055
else if (@event.Status.Name == TripStatus.Cancelled.Name)
5156
{
5257
if (!@event.Duration.HasValue || !@event.UserTripId.HasValue)
53-
throw new ArgumentException("Duration and user id are required to trigger a TripCancelledIntegrationEvent");
58+
throw new ArgumentException(
59+
"Duration and user id are required to trigger a TripCancelledIntegrationEvent");
5460

5561
_logger.LogInformation($"Trip {@event.AggregateRootId} has been canceled.");
56-
_eventBus.Publish(new TripCancelledIntegrationEvent(
62+
var tripCancelledIntegrationEvent = new TripCancelledIntegrationEvent(
5763
@event.AggregateRootId,
5864
@event.Duration.Value,
59-
new PaymentMethod { Id = @event.PaymentMethod.Id, Name = @event.PaymentMethod.Name },
65+
new PaymentMethod {Id = @event.PaymentMethod.Id, Name = @event.PaymentMethod.Name},
6066
@event.UserTripId.Value,
61-
@event.ConnectionId));
67+
@event.ConnectionId);
68+
69+
_eventBus.Publish(new IdempotentIntegrationEvent<TripCancelledIntegrationEvent>(tripCancelledIntegrationEvent, @event.AggregateRootId.ToString()));
6270
}
6371

6472
await Task.CompletedTask;
6573
}
6674
}
75+
76+
public class TripUpdatedIdempotentEventHandler : IdempotentIntegrationEventHandler<TripFinishedIntegrationEvent>
77+
{
78+
private readonly ILogger<TripUpdatedIdempotentEventHandler> _logger;
79+
80+
public TripUpdatedIdempotentEventHandler(IEventBus eventBus, IIdempotencyStoreProvider storeProvider,
81+
ILogger<TripUpdatedIdempotentEventHandler> logger) :
82+
base(eventBus, storeProvider)
83+
{
84+
_logger = logger;
85+
}
86+
87+
protected override void HandleDuplicatedRequest(TripFinishedIntegrationEvent message)
88+
{
89+
_logger.LogInformation($"TripFinishedIntegrationEvent was already handled. Trip {message.TripId}.");
90+
base.HandleDuplicatedRequest(message);
91+
}
92+
}
6793
}

src/Application/Duber.Trip.API/Controllers/TripController.cs

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,8 @@ namespace Duber.Trip.API.Controllers
1616
public class TripController : Controller
1717
{
1818
private readonly IDispatcher _dispatcher;
19-
private IMapper _mapper;
19+
private readonly IMapper _mapper;
2020
private readonly IRepository<Domain.Trip.Model.Trip> _repository;
21-
private readonly String _fakeUser = Guid.NewGuid().ToString();
22-
private const string Source = "Duber.Trip.Api";
2321

2422
public TripController(IDispatcher dispatcher, IMapper mapper, IRepository<Domain.Trip.Model.Trip> repository)
2523
{
@@ -62,15 +60,9 @@ public async Task<IActionResult> GetTrip(Guid tripId)
6260
[ProducesResponseType((int)HttpStatusCode.InternalServerError)]
6361
public async Task<IActionResult> CreateTrip([FromBody]ViewModel.CreateTripCommand command)
6462
{
65-
// TODO: make command immutable
6663
// BadRequest and InternalServerError could be throw in HttpGlobalExceptionFilter
67-
var tripId = Guid.NewGuid();
6864
var domainCommand = _mapper.Map<CreateTripCommand>(command);
69-
domainCommand.AggregateRootId = tripId;
70-
domainCommand.Source = Source;
71-
domainCommand.UserId = _fakeUser;
72-
73-
await _dispatcher.SendAsync(domainCommand);
65+
var tripId = await _dispatcher.SendAsync<Guid>(domainCommand);
7466
return Created(HttpContext.Request.GetUri().AbsoluteUri, tripId);
7567
}
7668

@@ -85,12 +77,9 @@ public async Task<IActionResult> CreateTrip([FromBody]ViewModel.CreateTripComman
8577
[ProducesResponseType((int)HttpStatusCode.InternalServerError)]
8678
public async Task<IActionResult> AcceptTrip([FromBody]ViewModel.UpdateTripCommand command)
8779
{
88-
// TODO: make command immutable
8980
// BadRequest and InternalServerError could be throw in HttpGlobalExceptionFilter, and also by ValidatorActionFilter due to the UpdateTripCommandValidator.
9081
var domainCommand = _mapper.Map<UpdateTripCommand>(command);
9182
domainCommand.Action = Action.Accept;
92-
domainCommand.Source = Source;
93-
domainCommand.UserId = _fakeUser;
9483

9584
await _dispatcher.SendAsync(domainCommand);
9685
return Ok();
@@ -107,12 +96,9 @@ public async Task<IActionResult> AcceptTrip([FromBody]ViewModel.UpdateTripComman
10796
[ProducesResponseType((int)HttpStatusCode.InternalServerError)]
10897
public async Task<IActionResult> StartTrip([FromBody]ViewModel.UpdateTripCommand command)
10998
{
110-
// TODO: make command immutable
11199
// BadRequest and InternalServerError could be throw in HttpGlobalExceptionFilter, and also by ValidatorActionFilter due to the UpdateTripCommandValidator.
112100
var domainCommand = _mapper.Map<UpdateTripCommand>(command);
113101
domainCommand.Action = Action.Start;
114-
domainCommand.Source = Source;
115-
domainCommand.UserId = _fakeUser;
116102

117103
await _dispatcher.SendAsync(domainCommand);
118104
return Ok();
@@ -129,12 +115,9 @@ public async Task<IActionResult> StartTrip([FromBody]ViewModel.UpdateTripCommand
129115
[ProducesResponseType((int)HttpStatusCode.InternalServerError)]
130116
public async Task<IActionResult> CancelTrip([FromBody]ViewModel.UpdateTripCommand command)
131117
{
132-
// TODO: make command immutable
133118
// BadRequest and InternalServerError could be throw in HttpGlobalExceptionFilter, and also by ValidatorActionFilter due to the UpdateTripCommandValidator.
134119
var domainCommand = _mapper.Map<UpdateTripCommand>(command);
135120
domainCommand.Action = Action.Cancel;
136-
domainCommand.Source = Source;
137-
domainCommand.UserId = _fakeUser;
138121

139122
await _dispatcher.SendAsync(domainCommand);
140123
return Ok();
@@ -151,12 +134,9 @@ public async Task<IActionResult> CancelTrip([FromBody]ViewModel.UpdateTripComman
151134
[ProducesResponseType((int)HttpStatusCode.InternalServerError)]
152135
public async Task<IActionResult> UpdateCurrentLocation([FromBody]ViewModel.UpdateCurrentLocationTripCommand command)
153136
{
154-
// TODO: make command immutable
155137
// BadRequest and InternalServerError could be throw in HttpGlobalExceptionFilter, and also by ValidatorActionFilter due to the UpdateTripCommandValidator.
156138
var domainCommand = _mapper.Map<UpdateTripCommand>(command);
157139
domainCommand.Action = Action.UpdateCurrentLocation;
158-
domainCommand.Source = Source;
159-
domainCommand.UserId = _fakeUser;
160140

161141
await _dispatcher.SendAsync(domainCommand);
162142
return Ok();

src/Application/Duber.Trip.API/Duber.Trip.API.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
<ItemGroup>
3535
<ProjectReference Include="..\..\Domain\Trip\Duber.Domain.Trip\Duber.Domain.Trip.csproj" />
36+
<ProjectReference Include="..\..\Infrastructure\EventBus\Duber.Infrastructure.EventBus.Idempotency\Duber.Infrastructure.EventBus.Idempotency.csproj" />
3637
<ProjectReference Include="..\..\Infrastructure\EventBus\Duber.Infrastructure.EventBus.RabbitMQ\Duber.Infrastructure.EventBus.RabbitMQ.csproj" />
3738
<ProjectReference Include="..\..\Infrastructure\EventBus\Duber.Infrastructure.EventBus.ServiceBus\Duber.Infrastructure.EventBus.ServiceBus.csproj" />
3839
<ProjectReference Include="..\..\Infrastructure\EventBus\Duber.Infrastructure.EventBus\Duber.Infrastructure.EventBus.csproj" />

src/Application/Duber.Trip.API/Extensions/ServiceCollectionExtensions.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
using System.Linq;
1010
using System.Reflection;
1111
using AutoMapper;
12+
using Duber.Infrastructure.EventBus.Abstractions;
13+
using Duber.Infrastructure.EventBus.Idempotency;
14+
using Duber.Trip.API.Application.IntegrationEvents;
1215
using Kledex;
1316
using Kledex.Commands;
1417
using Kledex.Configuration;
@@ -19,6 +22,8 @@
1922
using Kledex.Store.Cosmos.Mongo.Configuration;
2023
using Microsoft.OpenApi.Models;
2124
using Kledex.Store.Cosmos.Mongo.Extensions;
25+
using Microsoft.AspNetCore.Builder;
26+
using MongoDB.Bson.Serialization;
2227

2328
namespace Duber.Trip.API.Extensions
2429
{
@@ -107,5 +112,25 @@ public static IServiceCollection AddCustomAutoMapper(this IServiceCollection ser
107112
services.AddSingleton(sp => autoMapperConfig.CreateMapper());
108113
return services;
109114
}
115+
116+
public static IServiceCollection AddIdempotency(this IServiceCollection services)
117+
{
118+
services.AddTransient<IIdempotencyStoreProvider, IdempotencyStoreProvider>();
119+
services.RegisterIdempotentHandlers(typeof(TripUpdatedIdempotentEventHandler));
120+
121+
BsonClassMap.RegisterClassMap<IdempotentMessage>(cm =>
122+
{
123+
cm.AutoMap();
124+
cm.SetIgnoreExtraElements(true);
125+
});
126+
127+
return services;
128+
}
129+
130+
public static void UseIdempotency(this IApplicationBuilder app)
131+
{
132+
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
133+
eventBus.Subscribe<IdempotentIntegrationEvent<TripFinishedIntegrationEvent>, TripUpdatedIdempotentEventHandler>();
134+
}
110135
}
111136
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
using System.Threading.Tasks;
2+
using Duber.Infrastructure.EventBus.Idempotency;
3+
using Kledex.Store.Cosmos.Mongo.Configuration;
4+
using Microsoft.Extensions.Options;
5+
using MongoDB.Driver;
6+
7+
namespace Duber.Trip.API.Infrastructure.Repository
8+
{
9+
public class IdempotencyStoreProvider : IIdempotencyStoreProvider
10+
{
11+
private readonly IMongoDatabase _db;
12+
13+
public IdempotencyStoreProvider(IOptions<MongoOptions> settings)
14+
{
15+
var mongoClient = new MongoClient(settings.Value.ConnectionString);
16+
_db = mongoClient.GetDatabase(settings.Value.DatabaseName);
17+
}
18+
19+
public async Task SaveAsync(IdempotentMessage message)
20+
{
21+
var exists = await ExistsAsync(message.MessageId);
22+
if (exists) return;
23+
24+
var collection = _db.GetCollection<IdempotentMessage>("IdempotentMessages");
25+
await collection.InsertOneAsync(message);
26+
}
27+
28+
public async Task<bool> ExistsAsync(string id)
29+
{
30+
var collection = _db.GetCollection<IdempotentMessage>("IdempotentMessages");
31+
var filter = Builders<IdempotentMessage>.Filter.Eq("MessageId", id);
32+
var message = await collection.FindAsync(filter).Result.FirstOrDefaultAsync();
33+
34+
return message != null;
35+
}
36+
}
37+
}

src/Application/Duber.Trip.API/Startup.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
using System;
2-
using System.Collections.Generic;
32
using Autofac;
43
using Autofac.Extensions.DependencyInjection;
5-
using AutoMapper;
64
using Duber.Infrastructure.EventBus.RabbitMQ.IoC;
75
using Duber.Infrastructure.EventBus.ServiceBus.IoC;
8-
using Duber.Trip.API.Application.Mapping;
96
using Duber.Trip.API.Application.Validations;
107
using Duber.Trip.API.Extensions;
118
using Duber.Trip.API.Infrastructure.Filters;
@@ -42,7 +39,9 @@ public IServiceProvider ConfigureServices(IServiceCollection services)
4239
.AddNewtonsoftJson(options => options.SerializerSettings.ReferenceLoopHandling = Newtonsoft.Json.ReferenceLoopHandling.Ignore)
4340
.AddFluentValidation(x => x.RegisterValidatorsFromAssemblyContaining<UpdateTripCommandValidator>());
4441

45-
services.AddCQRS(Configuration);
42+
services.AddCQRS(Configuration)
43+
.AddIdempotency();
44+
4645
services.AddOptions()
4746
.AddCors(options =>
4847
{
@@ -81,6 +80,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILoggerF
8180
app.UseCors("CorsPolicy");
8281
app.UseRouting();
8382
app.UseEndpoints(endpoints => { endpoints.MapControllers(); });
83+
app.UseIdempotency();
8484

8585
app.UseSwagger()
8686
.UseSwaggerUI(c =>

src/Domain/Trip/Duber.Domain.Trip/Commands/Handlers/CreateTripCommandHandlerAsync.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ public class CreateTripCommandHandlerAsync : ICommandHandlerAsync<CreateTripComm
88
public async Task<CommandResponse> HandleAsync(CreateTripCommand command)
99
{
1010
var trip = new Model.Trip(
11-
command.AggregateRootId,
1211
command.UserTripId,
1312
command.DriverId,
1413
command.From,
@@ -22,7 +21,8 @@ public async Task<CommandResponse> HandleAsync(CreateTripCommand command)
2221
await Task.CompletedTask;
2322
return new CommandResponse
2423
{
25-
Events = trip.Events
24+
Events = trip.Events,
25+
Result = trip.Id
2626
};
2727
}
2828
}

src/Domain/Trip/Duber.Domain.Trip/Model/Trip.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public Trip()
6262
{
6363
}
6464

65-
public Trip(Guid id, int userId, int driverId, Location from, Location to, PaymentMethod paymentMethod, string plate, string brand, string model, string connectionId) : base(id)
65+
public Trip(int userId, int driverId, Location from, Location to, PaymentMethod paymentMethod, string plate, string brand, string model, string connectionId) : base()
6666
{
6767
if (userId <= 0) throw new TripDomainArgumentNullException(nameof(userId));
6868
if (driverId <= 0) throw new TripDomainArgumentNullException(nameof(driverId));

0 commit comments

Comments
 (0)