Skip to content

Commit 7d5db84

Browse files
authored
Merge pull request #185 from philnach/TableMemory
Table memory
2 parents 216314f + 54aff7e commit 7d5db84

File tree

4 files changed

+62
-19
lines changed

4 files changed

+62
-19
lines changed

Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension/AzureTableAPIDataSinkExtension.cs

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
using System.ComponentModel.Composition;
2+
using Azure;
23
using Azure.Data.Tables;
34
using Azure.Identity;
45
using Cosmos.DataTransfer.AzureTableAPIExtension.Data;
56
using Cosmos.DataTransfer.AzureTableAPIExtension.Settings;
67
using Cosmos.DataTransfer.Interfaces;
78
using Microsoft.Extensions.Configuration;
89
using Microsoft.Extensions.Logging;
10+
using Polly;
911

1012
namespace Cosmos.DataTransfer.AzureTableAPIExtension
1113
{
1214
[Export(typeof(IDataSinkExtension))]
1315
public class AzureTableAPIDataSinkExtension : IDataSinkExtensionWithSettings
1416
{
17+
private static readonly int[] TransientStatusCodes = { 408, 429, 500, 502, 503, 504 };
18+
1519
public string DisplayName => "AzureTableAPI";
1620

1721
public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default)
@@ -41,21 +45,54 @@ public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfigurati
4145

4246
var tableClient = serviceClient.GetTableClient(settings.Table);
4347

44-
await tableClient.CreateIfNotExistsAsync();
48+
await tableClient.CreateIfNotExistsAsync().ConfigureAwait(false);
4549

46-
var createTasks = new List<Task>();
47-
await foreach(var item in dataItems.WithCancellation(cancellationToken))
48-
{
49-
var entity = item.ToTableEntity(settings.PartitionKeyFieldName, settings.RowKeyFieldName);
50-
createTasks.Add(tableClient.AddEntityAsync(entity));
51-
}
50+
var maxConcurrency = settings.MaxConcurrentEntityWrites ?? 10;
5251

53-
await Task.WhenAll(createTasks);
52+
logger.LogInformation("Writing data to Azure Table Storage with a maximum of {MaxConcurrency} concurrent writes.", maxConcurrency);
53+
54+
logger.LogInformation("Using PartitionKeyFieldName: `{ParitionKeyFieldName}` and RowKeyFieldName: `{RowKeyFieldName}`", settings.PartitionKeyFieldName, settings.RowKeyFieldName);
55+
56+
await Parallel.ForEachAsync<IDataItem>(dataItems,
57+
new ParallelOptions { MaxDegreeOfParallelism = maxConcurrency, CancellationToken = cancellationToken },
58+
async (item, ct) =>
59+
{
60+
try
61+
{
62+
var entity = item.ToTableEntity(settings.PartitionKeyFieldName, settings.RowKeyFieldName);
63+
await AddEntityWithRetryAsync(tableClient, entity, ct);
64+
}
65+
catch (Exception ex)
66+
{
67+
logger.LogError(ex, "Error adding entity to table.");
68+
}
69+
});
70+
logger.LogInformation("Finished writing data to Azure Table Storage.");
5471
}
5572

5673
public IEnumerable<IDataExtensionSettings> GetSettings()
5774
{
5875
yield return new AzureTableAPIDataSinkSettings();
5976
}
77+
78+
/// <summary>
79+
/// Adds an entity to the Azure Table Storage with retry logic for transient errors.
80+
/// This method uses the Polly library to implement a retry policy with exponential backoff.
81+
/// </summary>
82+
/// <param name="tableClient"></param>
83+
/// <param name="entity"></param>
84+
/// <param name="cancellationToken"></param>
85+
/// <returns></returns>
86+
private static async Task AddEntityWithRetryAsync(TableClient tableClient, TableEntity entity, CancellationToken cancellationToken)
87+
{
88+
var retryPolicy = Policy
89+
.Handle<RequestFailedException>(ex => TransientStatusCodes.Contains(ex.Status))
90+
.WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
91+
92+
await retryPolicy.ExecuteAsync(async () =>
93+
{
94+
await tableClient.AddEntityAsync(entity, cancellationToken);
95+
});
96+
}
6097
}
6198
}

Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension/Cosmos.DataTransfer.AzureTableAPIExtension.csproj

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99

1010
<ItemGroup>
1111
<PackageReference Include="Azure.Data.Tables" />
12-
<PackageReference Include="Azure.Identity" />
12+
<PackageReference Include="Azure.Identity" />
1313
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
14+
<PackageReference Include="Polly" />
1415
<PackageReference Include="System.ComponentModel.Composition" />
1516
<PackageReference Include="System.Text.Json" />
1617
</ItemGroup>
@@ -19,16 +20,8 @@
1920
<ProjectReference Include="..\..\..\Interfaces\Cosmos.DataTransfer.Interfaces\Cosmos.DataTransfer.Interfaces.csproj" />
2021
</ItemGroup>
2122

22-
<Target Name="PublishToExtensionsFolder"
23-
AfterTargets="Build"
24-
Condition=" '$(Configuration)' == 'Debug' AND '$(PublishingToExtensionsFolder)' != 'true' ">
25-
<MSBuild
26-
Projects="$(MSBuildProjectFile)"
27-
Targets="Publish"
28-
Properties="Configuration=$(Configuration);
29-
PublishProfile=PublishToExtensionsFolder;
30-
BuildProjectReferences=false;
31-
PublishingToExtensionsFolder=true" />
23+
<Target Name="PublishToExtensionsFolder" AfterTargets="Build" Condition=" '$(Configuration)' == 'Debug' AND '$(PublishingToExtensionsFolder)' != 'true' ">
24+
<MSBuild Projects="$(MSBuildProjectFile)" Targets="Publish" Properties="Configuration=$(Configuration);&#xD;&#xA; PublishProfile=PublishToExtensionsFolder;&#xD;&#xA; BuildProjectReferences=false;&#xD;&#xA; PublishingToExtensionsFolder=true" />
3225
</Target>
3326

3427
</Project>

Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension/Settings/AzureTableAPIDataSinkSettings.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,10 @@
22
{
33
public class AzureTableAPIDataSinkSettings : AzureTableAPISettingsBase
44
{
5+
/// <summary>
6+
/// The Maximum number of concurrent entity writes to the Azure Table API.
7+
/// This setting is used to control the number of concurrent writes to the Azure Table API.
8+
/// </summary>
9+
public int? MaxConcurrentEntityWrites { get; set; }
510
}
611
}

Extensions/AzureTableAPI/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,14 @@ The following setting is supported for the Source:
2828

2929
- `QueryFilter` - This enables you to specify an OData filter to be applied to the data being retrieved by the AzureTableAPI Source. This is used in cases where only a subset of data from the source Table is needed in the migration. Example usage to query a subset of entities from the source table: `PartitionKey eq 'foo'`.
3030

31+
### Additional Sink Settings
32+
33+
The AzureTableAPI Source extension has an additional setting that can be configured for writing Table entities.
34+
35+
The following setting is supported for the Sink:
36+
37+
- `MaxConcurrentEntityWrites` - The Maximum number of concurrent entity writes to the Azure Table API. This setting is used to control the number of concurrent writes to the Azure Table API.
38+
3139
## Authentication Methods
3240

3341
The AzureTableAPI extension supports two authentication methods for connecting to Azure Table API services:

0 commit comments

Comments
 (0)