Skip to content

Commit 3b97f3d

Browse files
committed
Switch to Parallel and add a logging line
1 parent 0a94314 commit 3b97f3d

File tree

1 file changed

+14
-25
lines changed

1 file changed

+14
-25
lines changed

Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension/AzureTableAPIDataSinkExtension.cs

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -45,39 +45,28 @@ public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfigurati
4545

4646
var tableClient = serviceClient.GetTableClient(settings.Table);
4747

48-
await tableClient.CreateIfNotExistsAsync();
48+
await tableClient.CreateIfNotExistsAsync().ConfigureAwait(false);
4949

5050
var maxConcurrency = settings.MaxConcurrentEntityWrites ?? 10;
5151

5252
logger.LogInformation("Writing data to Azure Table Storage with a maximum of {MaxConcurrency} concurrent writes.", maxConcurrency);
5353

54-
var semaphore = new SemaphoreSlim(maxConcurrency);
55-
var tasks = new List<Task>();
54+
logger.LogInformation("Using PartitionKeyFieldName: `{ParitionKeyFieldName}` and RowKeyFieldName: `{RowKeyFieldName}`", settings.PartitionKeyFieldName, settings.RowKeyFieldName);
5655

57-
await foreach (var item in dataItems.WithCancellation(cancellationToken))
56+
await Parallel.ForEachAsync<IDataItem>(dataItems,
57+
new ParallelOptions { MaxDegreeOfParallelism = maxConcurrency, CancellationToken = cancellationToken },
58+
async (item, ct) =>
5859
{
59-
await semaphore.WaitAsync(cancellationToken);
60-
61-
tasks.Add(Task.Run(async () =>
60+
try
6261
{
63-
try
64-
{
65-
var entity = item.ToTableEntity(settings.PartitionKeyFieldName, settings.RowKeyFieldName);
66-
await AddEntityWithRetryAsync(tableClient, entity, cancellationToken);
67-
}
68-
catch (Exception ex)
69-
{
70-
logger.LogError(ex, "Error adding entity to table.");
71-
}
72-
finally
73-
{
74-
semaphore.Release();
75-
}
76-
}, cancellationToken));
77-
}
78-
79-
await Task.WhenAll(tasks);
80-
62+
var entity = item.ToTableEntity(settings.PartitionKeyFieldName, settings.RowKeyFieldName);
63+
await AddEntityWithRetryAsync(tableClient, entity, ct);
64+
}
65+
catch (Exception ex)
66+
{
67+
logger.LogError(ex, "Error adding entity to table.");
68+
}
69+
});
8170
logger.LogInformation("Finished writing data to Azure Table Storage.");
8271
}
8372

0 commit comments

Comments
 (0)