Skip to content

Commit 8928229

Browse files
authored
Merge pull request #212 from AzureCosmosDB/copilot/fix-tablestorage-to-parquet-issue
Fix TableStorage to Parquet migration by converting DateTimeOffset to UTC DateTime
2 parents f87b549 + afd7c6e commit 8928229

File tree

6 files changed

+311
-6
lines changed

6 files changed

+311
-6
lines changed
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<!-- MSTest unit-test project for the Parquet extension (not packed, test-only). -->
<PropertyGroup>
4+
<TargetFramework>net8.0</TargetFramework>
5+
<LangVersion>latest</LangVersion>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
<IsPackable>false</IsPackable>
9+
<IsTestProject>true</IsTestProject>
10+
</PropertyGroup>
11+
12+
<!-- Test framework + runtime dependencies. No Version attributes here, so
     versions are presumably supplied by central package management
     (Directory.Packages.props) — TODO confirm at repo root. -->
<ItemGroup>
13+
<PackageReference Include="Microsoft.NET.Test.Sdk" />
14+
<PackageReference Include="MSTest.TestAdapter" />
15+
<PackageReference Include="MSTest.TestFramework" />
16+
<PackageReference Include="coverlet.collector" />
17+
<PackageReference Include="Azure.Data.Tables" />
18+
<PackageReference Include="Microsoft.Extensions.Configuration" />
19+
<PackageReference Include="System.Linq.Async" />
20+
<PackageReference Include="Parquet.Net" />
21+
</ItemGroup>
22+
23+
<!-- Projects under test: the Parquet writer, shared interfaces, and the
     Azure Table API extension whose entities feed the writer. -->
<ItemGroup>
24+
<ProjectReference Include="../Cosmos.DataTransfer.ParquetExtension/Cosmos.DataTransfer.ParquetExtension.csproj" />
25+
<ProjectReference Include="../../../Interfaces/Cosmos.DataTransfer.Interfaces/Cosmos.DataTransfer.Interfaces.csproj" />
26+
<ProjectReference Include="../../AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension/Cosmos.DataTransfer.AzureTableAPIExtension.csproj" />
27+
</ItemGroup>
28+
29+
<!-- Global using so every test file gets the MSTest Assert/TestClass types. -->
<ItemGroup>
30+
<Using Include="Microsoft.VisualStudio.TestTools.UnitTesting" />
31+
</ItemGroup>
32+
33+
</Project>
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
using Azure.Data.Tables;
using Cosmos.DataTransfer.AzureTableAPIExtension.Data;
using Cosmos.DataTransfer.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Parquet;

namespace Cosmos.DataTransfer.ParquetExtension.UnitTests
{
    /// <summary>
    /// Tests for <c>ParquetFormatWriter</c> focused on <see cref="DateTimeOffset"/> handling:
    /// Table Storage entities carry a DateTimeOffset Timestamp, and the writer is expected
    /// to persist it as a UTC <see cref="DateTime"/> in the Parquet output.
    /// </summary>
    [TestClass]
    public class ParquetFormatWriterTests
    {
        /// <summary>Minimal ILogger that echoes each formatted message to the console.</summary>
        private class TestLogger : ILogger
        {
            public IDisposable? BeginScope<TState>(TState state) where TState : notnull => null;

            public bool IsEnabled(LogLevel logLevel) => true;

            public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter)
                => Console.WriteLine(formatter(state, exception));
        }

        [TestMethod]
        public async Task FormatDataAsync_WithDateTimeOffsetField_ShouldNotThrow()
        {
            // Arrange: a single entity whose Timestamp is a DateTimeOffset.
            var writer = new ParquetFormatWriter();
            ILogger logger = new TestLogger();
            IConfiguration config = new ConfigurationBuilder().Build();

            var entity = new TableEntity("partition1", "row1")
            {
                ["CustomField"] = "CustomValue",
                ["NumberField"] = 42,
                Timestamp = DateTimeOffset.Now,
            };

            var dataItems = new IDataItem[] { new AzureTableAPIDataItem(entity, null, null) }.ToAsyncEnumerable();
            using var output = new MemoryStream();

            // Act & Assert: must not throw NotSupportedException for DateTimeOffset.
            await writer.FormatDataAsync(dataItems, output, config, logger, CancellationToken.None);

            // Some Parquet bytes must have been produced.
            Assert.IsTrue(output.Length > 0, "Parquet data should have been written to stream");
        }

        [TestMethod]
        public async Task FormatDataAsync_WithMultipleTableEntities_ShouldHandleTimestamp()
        {
            // Arrange: five entities, each with a distinct DateTimeOffset timestamp.
            var writer = new ParquetFormatWriter();
            ILogger logger = new TestLogger();
            IConfiguration config = new ConfigurationBuilder().Build();

            var entities = Enumerable.Range(0, 5)
                .Select(i => (IDataItem)new AzureTableAPIDataItem(
                    new TableEntity($"partition{i}", $"row{i}")
                    {
                        ["Name"] = $"Entity{i}",
                        ["Value"] = i * 100,
                        Timestamp = DateTimeOffset.Now.AddMinutes(i),
                    },
                    null,
                    null))
                .ToList();

            var dataItems = entities.ToAsyncEnumerable();
            using var output = new MemoryStream();

            // Act
            await writer.FormatDataAsync(dataItems, output, config, logger, CancellationToken.None);

            // Assert: some Parquet bytes must have been produced.
            Assert.IsTrue(output.Length > 0, "Parquet data should have been written to stream");
        }

        [TestMethod]
        public async Task FormatDataAsync_WithDateTimeOffset_ShouldConvertToDateTime()
        {
            // Arrange: a +5h offset makes the UTC conversion observable (local != UTC).
            var writer = new ParquetFormatWriter();
            ILogger logger = new TestLogger();
            IConfiguration config = new ConfigurationBuilder().Build();

            var testTimestamp = new DateTimeOffset(2024, 1, 15, 10, 30, 45, TimeSpan.FromHours(5));
            var entity = new TableEntity("partition1", "row1")
            {
                ["Name"] = "TestEntity",
                Timestamp = testTimestamp,
            };

            var dataItems = new IDataItem[] { new AzureTableAPIDataItem(entity, null, null) }.ToAsyncEnumerable();
            using var output = new MemoryStream();

            // Act
            await writer.FormatDataAsync(dataItems, output, config, logger, CancellationToken.None);

            // Assert: reopen the stream and inspect both the schema and the stored value.
            output.Position = 0;
            using var parquetReader = await ParquetReader.CreateAsync(output);

            var timestampField = parquetReader.Schema.Fields.FirstOrDefault(f => f.Name == "Timestamp");
            Assert.IsNotNull(timestampField, "Timestamp field should exist in schema");

            // The schema must declare DateTime, not DateTimeOffset.
            var dataField = timestampField as Parquet.Schema.DataField;
            Assert.IsNotNull(dataField, "Timestamp field should be a DataField");
            Assert.AreEqual(typeof(DateTime), dataField.ClrType, "Timestamp should be stored as DateTime");

            // Read the single stored value back.
            using var rowGroupReader = parquetReader.OpenRowGroupReader(0);
            var timestampColumn = await rowGroupReader.ReadColumnAsync(dataField);

            var timestampValue = timestampColumn.Data.GetValue(0) as DateTime?;
            Assert.IsNotNull(timestampValue, "Timestamp value should not be null");
            Assert.AreEqual(testTimestamp.UtcDateTime, timestampValue.Value, "Timestamp value should match the original DateTimeOffset.UtcDateTime");

            // Differing from the wall-clock DateTime proves UTC conversion really happened.
            Assert.AreNotEqual(testTimestamp.DateTime, timestampValue.Value, "Timestamp should be converted to UTC, not preserve local time");
        }
    }
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
using Azure.Data.Tables;
using Cosmos.DataTransfer.AzureTableAPIExtension.Data;
using Cosmos.DataTransfer.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Parquet;

namespace Cosmos.DataTransfer.ParquetExtension.UnitTests
{
    /// <summary>
    /// Roundtrip tests: Table Storage entity -> Parquet -> back to a TableEntity,
    /// verifying that a DateTimeOffset Timestamp survives as a UTC DateTime.
    /// </summary>
    [TestClass]
    public class RoundtripTests
    {
        /// <summary>Minimal ILogger that echoes each formatted message to the console.</summary>
        private class TestLogger : ILogger
        {
            public IDisposable? BeginScope<TState>(TState state) where TState : notnull => null;

            public bool IsEnabled(LogLevel logLevel) => true;

            public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter)
                => Console.WriteLine(formatter(state, exception));
        }

        [TestMethod]
        public async Task Roundtrip_ParquetToTableStorage_DateTimeShouldConvertToDateTimeOffset()
        {
            // Arrange: a source entity whose Timestamp carries a non-zero (+5h) offset.
            var writer = new ParquetFormatWriter();
            ILogger logger = new TestLogger();
            IConfiguration config = new ConfigurationBuilder().Build();

            var testTimestamp = new DateTimeOffset(2024, 1, 15, 10, 30, 45, TimeSpan.FromHours(5));
            var sourceEntity = new TableEntity("partition1", "row1")
            {
                ["Name"] = "TestEntity",
                ["Value"] = 100,
                Timestamp = testTimestamp,
            };

            var dataItems = new IDataItem[] { new AzureTableAPIDataItem(sourceEntity, null, null) }.ToAsyncEnumerable();
            using var stream = new MemoryStream();

            // Act 1: write to Parquet (the DateTimeOffset becomes a UTC DateTime).
            await writer.FormatDataAsync(dataItems, stream, config, logger, CancellationToken.None);

            // Act 2: read every column of the first row back out of the Parquet stream.
            stream.Position = 0;
            using var parquetReader = await ParquetReader.CreateAsync(stream);
            using var rowGroupReader = parquetReader.OpenRowGroupReader(0);

            var readData = new Dictionary<string, object?>();
            foreach (var field in parquetReader.Schema.GetDataFields())
            {
                var column = await rowGroupReader.ReadColumnAsync(field);
                readData[field.Name] = column.Data.GetValue(0);
            }

            // Act 3: rebuild a TableEntity, simulating a Parquet -> Table Storage migration.
            var dataItemFromParquet = new ParquetDictionaryDataItem(readData);
            var targetEntity = dataItemFromParquet.ToTableEntity(null, null);

            // Assert: the Timestamp column made it into the Parquet data ...
            Assert.IsTrue(readData.ContainsKey("Timestamp"), "Timestamp should be in Parquet data");

            var timestampFromParquet = readData["Timestamp"];
            Assert.IsNotNull(timestampFromParquet, "Timestamp should not be null");
            Assert.IsInstanceOfType(timestampFromParquet, typeof(DateTime), "Parquet stores as DateTime");

            // ... and came back as a DateTime explicitly marked UTC.
            var dateTimeValue = (DateTime)timestampFromParquet;
            Assert.AreEqual(DateTimeKind.Utc, dateTimeValue.Kind, "DateTime from Parquet should be UTC");

            // The key assertion: a TableEntity accepts that DateTime without complaint.
            Assert.IsNotNull(targetEntity, "Should be able to create TableEntity");
            Assert.IsTrue(targetEntity.ContainsKey("Timestamp"), "TableEntity should have Timestamp");

            var storedValue = targetEntity["Timestamp"];
            Assert.IsInstanceOfType(storedValue, typeof(DateTime), "TableEntity stores DateTime as DateTime");

            // The absolute point in time is preserved (UTC instant matches the source).
            var storedDateTime = (DateTime)storedValue;
            Assert.AreEqual(testTimestamp.UtcDateTime, storedDateTime, "UTC time should be preserved");
        }

        [TestMethod]
        public void DateTime_ImplicitConversion_ToDateTimeOffset()
        {
            // A UTC DateTime converts implicitly to DateTimeOffset with a zero offset —
            // the conversion Azure Table Storage would rely on when ingesting the value.
            var utcDateTime = new DateTime(2024, 1, 15, 10, 30, 45, DateTimeKind.Utc);

            DateTimeOffset result = utcDateTime;

            Assert.AreEqual(utcDateTime, result.UtcDateTime);
            Assert.AreEqual(TimeSpan.Zero, result.Offset);
        }
    }
}

Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetDataCol.cs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,20 @@ public ParquetDataCol()
3232
public ParquetDataCol(string name, Type coltype)
3333
{
3434
ColumnName = name;
35-
ColumnType = coltype;
36-
if (coltype != System.Type.Missing.GetType())
35+
36+
// Convert DateTimeOffset to DateTime for Parquet compatibility
37+
if (coltype == typeof(DateTimeOffset) || coltype == typeof(DateTimeOffset?))
3738
{
38-
ParquetDataType = MapDataType(name, coltype);
39+
ColumnType = typeof(DateTime);
40+
ParquetDataType = MapDataType(name, typeof(DateTime));
41+
}
42+
else
43+
{
44+
ColumnType = coltype;
45+
if (coltype != System.Type.Missing.GetType())
46+
{
47+
ParquetDataType = MapDataType(name, coltype);
48+
}
3949
}
4050
}
4151

@@ -51,7 +61,15 @@ public void AddColumnValue(long row, object? value)
5161
return;
5262
}
5363

54-
SparseColumnData[row] = value;
64+
// Convert DateTimeOffset to DateTime (UTC) for Parquet compatibility
65+
if (value is DateTimeOffset dateTimeOffset)
66+
{
67+
SparseColumnData[row] = dateTimeOffset.UtcDateTime;
68+
}
69+
else
70+
{
71+
SparseColumnData[row] = value;
72+
}
5573
}
5674
}
5775
}

Extensions/Parquet/Cosmos.DataTransfer.ParquetExtension/ParquetFormatWriter.cs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,28 @@ private void ProcessColumns(IDataItem item, long row)
7171
}
7272
else if (coltype != Type.Missing.GetType() && current.ColumnType != coltype)
7373
{
74-
if (current != null)
74+
// Handle DateTimeOffset type compatibility with DateTime
75+
var isDateTimeOffsetToDateTime =
76+
(coltype == typeof(DateTimeOffset) || coltype == typeof(DateTimeOffset?)) &&
77+
current.ColumnType == typeof(DateTime);
78+
var isDateTimeToDateTimeOffset =
79+
(current.ColumnType == typeof(DateTimeOffset) || current.ColumnType == typeof(DateTimeOffset?)) &&
80+
coltype == typeof(DateTime);
81+
82+
if (!isDateTimeOffsetToDateTime && !isDateTimeToDateTimeOffset && current != null)
7583
{
7684
current.ColumnType = coltype;
7785
if (coltype != null)
7886
{
79-
current.ParquetDataType = new DataField(col, coltype, true);
87+
// Convert DateTimeOffset to DateTime for Parquet compatibility
88+
if (coltype == typeof(DateTimeOffset) || coltype == typeof(DateTimeOffset?))
89+
{
90+
current.ParquetDataType = new DataField(col, typeof(DateTime), true);
91+
}
92+
else
93+
{
94+
current.ParquetDataType = new DataField(col, coltype, true);
95+
}
8096
}
8197
}
8298
}

Extensions/Parquet/README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,17 @@ The Parquet extension provides formatter capabilities for reading from and writi
66
77
> **Note**: When specifying the Parquet extension as the Source or Sink property in configuration, utilize the names listed below.
88
9+
## DateTimeOffset Handling
10+
11+
**Important**: `DateTimeOffset` values are automatically converted to UTC `DateTime` when writing to Parquet format. This conversion is necessary because Parquet.NET dropped support for `DateTimeOffset` in version 4.3.0 due to ambiguity issues (see [Parquet.NET release notes](https://github.com/aloneguid/parquet-dotnet/releases/tag/4.3.0)).
12+
13+
When migrating data with `DateTimeOffset` fields (such as the `Timestamp` property in Azure Table Storage entities):
14+
- The absolute point in time is preserved by converting to UTC
15+
- Timezone offset information is lost during conversion
16+
- All timestamps are stored consistently in UTC format
17+
18+
This behavior enables migrations from sources like Azure Table Storage to Parquet, which would otherwise fail with a `NotSupportedException`.
19+
920
Supported storage sinks:
1021
- File - **Parquet**
1122
- Azure Blob Storage - **Parquet-AzureBlob**

0 commit comments

Comments
 (0)