Skip to content

Commit 1dc388e

Browse files
authored
Merge pull request #203 from AzureCosmosDB/copilot/fix-202
Replace static ItemProgressTracker with .NET IProgress interface pattern for comprehensive data transfer progress tracking
2 parents c7850c4 + c098d7f commit 1dc388e

File tree

19 files changed

+492
-14
lines changed

19 files changed

+492
-14
lines changed

ExampleConfigs.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
"SinkSettings":
3939
{
4040
"FilePath": "c:\\data\\cosmicworks\\customers.json",
41-
"Indented": true
41+
"Indented": true,
42+
"ItemProgressFrequency": 1000
4243
}
4344
}
4445
```
@@ -172,7 +173,8 @@
172173
"ContainerName": "operations-archive",
173174
"AccountEndpoint": "https://<storage-account>.blob.core.windows.net",
174175
"EnableInteractiveCredentials": true,
175-
"BlobName": "jan-alerts"
176+
"BlobName": "jan-alerts",
177+
"ItemProgressFrequency": 1000
176178
},
177179
"Operations": [
178180
]

Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/AzureBlobDataSink.cs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,21 @@
33
using Azure.Storage.Blobs.Models;
44
using Azure.Storage.Blobs.Specialized;
55
using Cosmos.DataTransfer.Interfaces;
6+
using Cosmos.DataTransfer.Common;
67
using Microsoft.Extensions.Configuration;
78
using Microsoft.Extensions.Logging;
89

910
namespace Cosmos.DataTransfer.AzureBlobStorage
1011
{
11-
public class AzureBlobDataSink : IComposableDataSink
12+
public class AzureBlobDataSink : IComposableDataSink, IProgressAwareComposableDataSink
1213
{
1314
public async Task WriteToTargetAsync(Func<Stream, Task> writeToStream, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default)
15+
{
16+
// Call the progress-aware version with null progress
17+
await WriteToTargetAsync(writeToStream, config, dataSource, logger, null, cancellationToken);
18+
}
19+
20+
public async Task WriteToTargetAsync(Func<Stream, Task> writeToStream, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, IProgress<DataTransferProgress>? progress, CancellationToken cancellationToken = default)
1421
{
1522
var settings = config.Get<AzureBlobSinkSettings>();
1623
settings.Validate();
@@ -49,6 +56,29 @@ public async Task WriteToTargetAsync(Func<Stream, Task> writeToStream, IConfigur
4956
})
5057
}, cancellationToken);
5158
await writeToStream(blobStream);
59+
60+
// Log final summary after upload completes
61+
var finalBlob = account.GetBlobClient(settings.BlobName);
62+
var properties = await finalBlob.GetPropertiesAsync(cancellationToken: cancellationToken);
63+
64+
// Get the item count from the progress reporter if available
65+
int itemCount = 0;
66+
if (progress is DataTransferProgressReporter progressReporter &&
67+
progressReporter.Context != null)
68+
{
69+
itemCount = progressReporter.Context.GetCurrentProgress().ItemCount;
70+
}
71+
72+
if (itemCount > 0)
73+
{
74+
logger.LogInformation("Successfully transferred {TotalBytes} total bytes from {ItemCount} items to blob '{BlobName}' in container '{ContainerName}'",
75+
properties.Value.ContentLength, itemCount, settings.BlobName, settings.ContainerName);
76+
}
77+
else
78+
{
79+
logger.LogInformation("Successfully transferred {TotalBytes} total bytes to blob '{BlobName}' in container '{ContainerName}'",
80+
properties.Value.ContentLength, settings.BlobName, settings.ContainerName);
81+
}
5282
}
5383

5484
public IEnumerable<IDataExtensionSettings> GetSettings()

Extensions/AzureBlob/Cosmos.DataTransfer.AzureBlobStorage/Cosmos.DataTransfer.AzureBlobStorage.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
<ItemGroup>
1717
<ProjectReference Include="..\..\..\Interfaces\Cosmos.DataTransfer.Interfaces\Cosmos.DataTransfer.Interfaces.csproj" />
18+
<ProjectReference Include="..\..\..\Interfaces\Cosmos.DataTransfer.Common\Cosmos.DataTransfer.Common.csproj" />
1819
</ItemGroup>
1920

2021
</Project>

Extensions/Csv/Cosmos.DataTransfer.CsvExtension/CsvFormatWriter.cs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22
using System.Runtime.CompilerServices;
33
using Cosmos.DataTransfer.CsvExtension.Settings;
44
using Cosmos.DataTransfer.Interfaces;
5+
using Cosmos.DataTransfer.Common;
56
using CsvHelper;
67
using CsvHelper.Configuration;
78
using Microsoft.Extensions.Configuration;
89
using Microsoft.Extensions.Logging;
910

1011
namespace Cosmos.DataTransfer.CsvExtension;
1112

12-
public class CsvFormatWriter : IFormattedDataWriter
13+
public class CsvFormatWriter : IFormattedDataWriter, IProgressAwareFormattedDataWriter
1314
{
1415
public IEnumerable<IDataExtensionSettings> GetSettings()
1516
{
@@ -18,9 +19,18 @@ public IEnumerable<IDataExtensionSettings> GetSettings()
1819

1920
public async Task FormatDataAsync(IAsyncEnumerable<IDataItem> dataItems, Stream target, IConfiguration config, ILogger logger, CancellationToken cancellationToken = default)
2021
{
21-
var settings = config.Get<CsvWriterSettings>();
22+
// Call the progress-aware version with null progress
23+
await FormatDataAsync(dataItems, target, config, logger, null, cancellationToken);
24+
}
25+
26+
public async Task FormatDataAsync(IAsyncEnumerable<IDataItem> dataItems, Stream target, IConfiguration config, ILogger logger, IProgress<DataTransferProgress>? progress, CancellationToken cancellationToken = default)
27+
{
28+
var settings = config.Get<CsvWriterSettings>() ?? new CsvWriterSettings();
2229
settings.Validate();
2330

31+
// Track progress locally
32+
int itemCount = 0;
33+
2434
await using var textWriter = new StreamWriter(target, leaveOpen: true);
2535
await using var writer = new CsvWriter(textWriter, new CsvConfiguration(settings.GetCultureInfo())
2636
{
@@ -53,8 +63,21 @@ public async Task FormatDataAsync(IAsyncEnumerable<IDataItem> dataItems, Stream
5363
}
5464

5565
firstRecord = false;
66+
itemCount++;
67+
68+
// Report progress if progress reporter is available
69+
if (progress != null && itemCount % settings.ItemProgressFrequency == 0)
70+
{
71+
progress.Report(new DataTransferProgress(itemCount, 0, $"Formatted {itemCount} items for transfer"));
72+
}
5673
}
5774

5875
await writer.FlushAsync();
76+
77+
// Report final count
78+
if (progress != null && itemCount > 0)
79+
{
80+
progress.Report(new DataTransferProgress(itemCount, 0, null));
81+
}
5982
}
6083
}

Extensions/Csv/Cosmos.DataTransfer.CsvExtension/Settings/CsvWriterSettings.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ public class CsvWriterSettings : IDataExtensionSettings, IValidatableObject
99
public bool IncludeHeader { get; set; } = true;
1010
public string? Delimiter { get; set; } = ",";
1111
public string? Culture { get; set; } = "InvariantCulture";
12+
public int ItemProgressFrequency { get; set; } = 1000;
13+
1214
public CultureInfo GetCultureInfo() {
1315
switch (this.Culture?.ToLower())
1416
{

Extensions/Csv/README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,13 @@ or e.g., `"en"`, `"en-GB"`, or `"en-US"` for English standards (period, `.`, as
4545
Note, if using a culture with comma as decimal separator, specify a different delimiter (e.g., semi-colon, `;`), else all numbers
4646
will be written enclosed with quotes.
4747

48+
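Additionally, an optional `ItemProgressFrequency` parameter (`1000` by default) controls how often item processing progress is logged during migration: the value is the number of items processed between progress messages, so the default produces one message per 1,000 items.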
49+
4850
```json
4951
{
5052
"Delimiter": ",",
5153
"IncludeHeader": true,
52-
"Culture": "Invariant"
54+
"Culture": "Invariant",
55+
"ItemProgressFrequency": 1000
5356
}
5457
```
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
using Microsoft.Extensions.Configuration;
2+
using Microsoft.Extensions.Logging;
3+
using Cosmos.DataTransfer.JsonExtension;
4+
using Cosmos.DataTransfer.Common.UnitTests;
5+
using Cosmos.DataTransfer.Common;
6+
using Cosmos.DataTransfer.Interfaces;
7+
8+
namespace Cosmos.DataTransfer.JsonExtension.UnitTests
9+
{
10+
[TestClass]
11+
public class ItemCountingTest
12+
{
13+
private class TestLogger : ILogger
14+
{
15+
private readonly List<string> _logs = new List<string>();
16+
17+
public IDisposable BeginScope<TState>(TState state) => null!;
18+
public bool IsEnabled(LogLevel logLevel) => true;
19+
20+
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
21+
{
22+
_logs.Add(formatter(state, exception));
23+
}
24+
25+
public List<string> GetLogs() => _logs;
26+
}
27+
28+
private class TestProgressReporter : IProgress<DataTransferProgress>
29+
{
30+
public DataTransferProgress? LastProgress { get; private set; }
31+
public List<DataTransferProgress> AllProgress { get; } = new List<DataTransferProgress>();
32+
33+
public void Report(DataTransferProgress value)
34+
{
35+
LastProgress = value;
36+
AllProgress.Add(new DataTransferProgress(value.ItemCount, value.BytesTransferred, value.Message));
37+
}
38+
}
39+
40+
[TestMethod]
41+
public async Task JsonFormatWriter_CountsItems_LogsCorrectly()
42+
{
43+
// Arrange
44+
var formatter = new JsonFormatWriter();
45+
var logger = new TestLogger();
46+
var progressReporter = new TestProgressReporter();
47+
48+
var data = new List<DictionaryDataItem>
49+
{
50+
new(new Dictionary<string, object?> { { "Id", 1 }, { "Name", "One" } }),
51+
new(new Dictionary<string, object?> { { "Id", 2 }, { "Name", "Two" } }),
52+
new(new Dictionary<string, object?> { { "Id", 3 }, { "Name", "Three" } }),
53+
new(new Dictionary<string, object?> { { "Id", 4 }, { "Name", "Four" } }),
54+
new(new Dictionary<string, object?> { { "Id", 5 }, { "Name", "Five" } }),
55+
};
56+
57+
var config = new ConfigurationBuilder()
58+
.AddInMemoryCollection(new Dictionary<string, string?>
59+
{
60+
{ "ItemProgressFrequency", "2" } // Log every 2 items for testing
61+
})
62+
.Build();
63+
64+
using var stream = new MemoryStream();
65+
66+
// Act
67+
await formatter.FormatDataAsync(data.Cast<Cosmos.DataTransfer.Interfaces.IDataItem>().ToAsyncEnumerable(), stream, config, logger, progressReporter);
68+
69+
// Assert
70+
stream.Position = 0;
71+
using var reader = new StreamReader(stream);
72+
var result = await reader.ReadToEndAsync();
73+
74+
// Verify JSON was written correctly (basic sanity check)
75+
Assert.IsTrue(result.Contains("\"Id\":1"));
76+
Assert.IsTrue(result.Contains("\"Id\":5"));
77+
78+
// Verify progress reporting behavior
79+
var progressReports = progressReporter.AllProgress;
80+
81+
// Should have progress reports for items 2, 4, and final count (5)
82+
Assert.AreEqual(3, progressReports.Count, "Should have 3 progress reports (2, 4, and final 5)");
83+
84+
// Verify specific progress content
85+
Assert.AreEqual(2, progressReports[0].ItemCount);
86+
Assert.AreEqual(4, progressReports[1].ItemCount);
87+
Assert.AreEqual(5, progressReports[2].ItemCount);
88+
89+
// Verify the final count is available
90+
Assert.AreEqual(5, progressReporter.LastProgress?.ItemCount);
91+
}
92+
93+
[TestMethod]
94+
public async Task FormatDataAsync_TracksItemsWithProgress_MakesCountAvailable()
95+
{
96+
// Arrange
97+
var logger = new TestLogger();
98+
var formatter = new JsonFormatWriter();
99+
var progressReporter = new TestProgressReporter();
100+
var data = new[]
101+
{
102+
new JsonDictionaryDataItem(new Dictionary<string, object?> { { "Id", 1 }, { "Name", "One" } }),
103+
new JsonDictionaryDataItem(new Dictionary<string, object?> { { "Id", 2 }, { "Name", "Two" } }),
104+
new JsonDictionaryDataItem(new Dictionary<string, object?> { { "Id", 3 }, { "Name", "Three" } }),
105+
};
106+
107+
var config = new ConfigurationBuilder()
108+
.AddInMemoryCollection(new Dictionary<string, string?>
109+
{
110+
{ "ItemProgressFrequency", "2" }, // Log every 2 items for testing
111+
{ "BlobName", "test-data.json" },
112+
{ "ContainerName", "test-container" }
113+
})
114+
.Build();
115+
116+
using var stream = new MemoryStream();
117+
118+
// Act
119+
await formatter.FormatDataAsync(data.Cast<Cosmos.DataTransfer.Interfaces.IDataItem>().ToAsyncEnumerable(), stream, config, logger, progressReporter);
120+
121+
// Assert
122+
var progressReports = progressReporter.AllProgress;
123+
124+
// Should have progress reports for 2 items and final count (3)
125+
Assert.AreEqual(2, progressReports.Count, "Should have 2 progress reports (2 and final 3)");
126+
Assert.AreEqual(2, progressReports[0].ItemCount);
127+
Assert.AreEqual(3, progressReports[1].ItemCount);
128+
129+
// Verify the final count is available for the sink to use
130+
Assert.AreEqual(3, progressReporter.LastProgress?.ItemCount);
131+
}
132+
133+
[TestMethod]
134+
public void DataTransferProgressReporter_SharesCountAcrossContext()
135+
{
136+
// Arrange
137+
var logger = new TestLogger();
138+
var context = new DataTransferContext();
139+
var progressReporter = new DataTransferProgressReporter(logger, 1000, "item", context);
140+
141+
// Act - Simulate format writer setting count
142+
progressReporter.Report(new DataTransferProgress(1));
143+
progressReporter.Report(new DataTransferProgress(2));
144+
progressReporter.Report(new DataTransferProgress(3));
145+
146+
// Assert - Simulate sink reading count from context
147+
var currentProgress = context.GetCurrentProgress();
148+
Assert.AreEqual(3, currentProgress.ItemCount, "Sink should be able to read the item count set by format writer through context");
149+
}
150+
}
151+
}

Extensions/Json/Cosmos.DataTransfer.JsonExtension/JsonFormatWriter.cs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,22 @@
77

88
namespace Cosmos.DataTransfer.JsonExtension;
99

10-
public class JsonFormatWriter : IFormattedDataWriter
10+
public class JsonFormatWriter : IFormattedDataWriter, IProgressAwareFormattedDataWriter
1111
{
1212
public async Task FormatDataAsync(IAsyncEnumerable<IDataItem> dataItems, Stream target, IConfiguration config, ILogger logger, CancellationToken cancellationToken = default)
1313
{
14-
var settings = config.Get<JsonFormatWriterSettings>();
14+
// Call the progress-aware version with null progress
15+
await FormatDataAsync(dataItems, target, config, logger, null, cancellationToken);
16+
}
17+
18+
public async Task FormatDataAsync(IAsyncEnumerable<IDataItem> dataItems, Stream target, IConfiguration config, ILogger logger, IProgress<DataTransferProgress>? progress, CancellationToken cancellationToken = default)
19+
{
20+
var settings = config.Get<JsonFormatWriterSettings>() ?? new JsonFormatWriterSettings();
1521
settings.Validate();
1622

23+
// Track progress locally
24+
int itemCount = 0;
25+
1726
await using var writer = new Utf8JsonWriter(target, new JsonWriterOptions
1827
{
1928
Indented = settings.Indented
@@ -23,6 +32,14 @@ public async Task FormatDataAsync(IAsyncEnumerable<IDataItem> dataItems, Stream
2332
await foreach (var item in dataItems.WithCancellation(cancellationToken))
2433
{
2534
DataItemJsonConverter.WriteDataItem(writer, item, settings.IncludeNullFields);
35+
itemCount++;
36+
37+
// Report progress if progress reporter is available
38+
if (progress != null && itemCount % settings.ItemProgressFrequency == 0)
39+
{
40+
progress.Report(new DataTransferProgress(itemCount, 0, $"Formatted {itemCount} items for transfer"));
41+
}
42+
2643
int max = settings.BufferSizeMB * 1024 * 1024;
2744
if (writer.BytesPending > max)
2845
{
@@ -31,6 +48,12 @@ public async Task FormatDataAsync(IAsyncEnumerable<IDataItem> dataItems, Stream
3148
}
3249

3350
writer.WriteEndArray();
51+
52+
// Report final count
53+
if (progress != null && itemCount > 0)
54+
{
55+
progress.Report(new DataTransferProgress(itemCount, 0, null));
56+
}
3457
}
3558

3659
public IEnumerable<IDataExtensionSettings> GetSettings()

Extensions/Json/Cosmos.DataTransfer.JsonExtension/Settings/JsonFormatWriterSettings.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@ public class JsonFormatWriterSettings : IDataExtensionSettings
77
public bool IncludeNullFields { get; set; }
88
public bool Indented { get; set; }
99
public int BufferSizeMB { get; set; } = 200;
10+
public int ItemProgressFrequency { get; set; } = 1000;
1011
}
1112
}

Extensions/Json/README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,13 @@ Source does not require any formatter specific settings.
3333

3434
Sink supports an optional `Indented` parameter (`false` by default) and an optional `IncludeNullFields` parameter (`false` by default) to control the formatting of the JSON output. Sink also supports an optional `BufferSizeMB` parameter (`200` by default) to constrain the in-memory stream buffer.
3535

36+
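Additionally, an optional `ItemProgressFrequency` parameter (`1000` by default) controls how often item processing progress is logged during migration: the value is the number of items processed between progress messages, so the default produces one message per 1,000 items.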
37+
3638
```json
3739
{
3840
"Indented": true,
3941
"IncludeNullFields": true,
40-
"BufferSizeMB": 200
42+
"BufferSizeMB": 200,
43+
"ItemProgressFrequency": 1000
4144
}
4245
```

0 commit comments

Comments
 (0)