Skip to content

Commit 31d75aa

Browse files
authored
Merge pull request #194 from AzureCosmosDB/copilot/fix-193
Add ReplaceIfExists setting for Azure Table API upsert functionality
2 parents 3dadaac + d2e1abb commit 31d75aa

File tree

7 files changed

+171
-6
lines changed

7 files changed

+171
-6
lines changed

Directory.Packages.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
<PackageVersion Include="Microsoft.Extensions.Configuration" Version="8.0.0" />
2222
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="8.0.0" />
2323
<PackageVersion Include="Microsoft.Extensions.Configuration.Binder" Version="8.0.2" />
24+
<PackageVersion Include="Microsoft.Extensions.Configuration.Json" Version="8.0.0" />
2425
<PackageVersion Include="Microsoft.Extensions.Configuration.UserSecrets" Version="8.0.1" />
2526
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
2627
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.2" />
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
using Microsoft.VisualStudio.TestTools.UnitTesting;
2+
using Cosmos.DataTransfer.AzureTableAPIExtension.Settings;
3+
using Microsoft.Extensions.Configuration;
4+
using System.Text.Json;
5+
6+
namespace Cosmos.DataTransfer.AzureTableAPIExtension.UnitTests
{
    /// <summary>
    /// Unit tests for the <c>WriteMode</c> option exposed by <see cref="AzureTableAPIDataSinkSettings"/>.
    /// </summary>
    [TestClass]
    public class AzureTableAPIDataSinkExtensionTests
    {
        /// <summary>
        /// A freshly constructed settings object must already carry <see cref="EntityWriteMode.Create"/>.
        /// </summary>
        [TestMethod]
        public void AzureTableAPIDataSinkSettings_WriteMode_DefaultsToCreate()
        {
            var settings = new AzureTableAPIDataSinkSettings();

            // Compare the property directly: a null value fails the assertion,
            // so no "??" fallback is needed to detect a missing default.
            Assert.AreEqual(EntityWriteMode.Create, settings.WriteMode, "WriteMode should default to Create");
        }

        /// <summary>
        /// Explicitly assigning <see cref="EntityWriteMode.Create"/> is preserved by the property.
        /// </summary>
        [TestMethod]
        public void AzureTableAPIDataSinkSettings_WriteMode_CanBeSetToCreate()
        {
            var settings = new AzureTableAPIDataSinkSettings()
            {
                WriteMode = EntityWriteMode.Create
            };

            Assert.AreEqual(EntityWriteMode.Create, settings.WriteMode, "WriteMode should be settable to Create");
        }

        /// <summary>
        /// Explicitly assigning <see cref="EntityWriteMode.Replace"/> is preserved by the property.
        /// </summary>
        [TestMethod]
        public void AzureTableAPIDataSinkSettings_WriteMode_CanBeSetToReplace()
        {
            var settings = new AzureTableAPIDataSinkSettings()
            {
                WriteMode = EntityWriteMode.Replace
            };

            Assert.AreEqual(EntityWriteMode.Replace, settings.WriteMode, "WriteMode should be settable to Replace");
        }

        /// <summary>
        /// Explicitly assigning <see cref="EntityWriteMode.Merge"/> is preserved by the property.
        /// </summary>
        [TestMethod]
        public void AzureTableAPIDataSinkSettings_WriteMode_CanBeSetToMerge()
        {
            var settings = new AzureTableAPIDataSinkSettings()
            {
                WriteMode = EntityWriteMode.Merge
            };

            Assert.AreEqual(EntityWriteMode.Merge, settings.WriteMode, "WriteMode should be settable to Merge");
        }

        /// <summary>
        /// The property is nullable, so callers may clear it.
        /// </summary>
        [TestMethod]
        public void AzureTableAPIDataSinkSettings_WriteMode_CanBeNull()
        {
            var settings = new AzureTableAPIDataSinkSettings()
            {
                WriteMode = null
            };

            Assert.IsNull(settings.WriteMode, "WriteMode should be settable to null");
        }

        /// <summary>
        /// Each enum member name, supplied as a JSON string, binds to the matching <see cref="EntityWriteMode"/> value.
        /// </summary>
        [TestMethod]
        public void AzureTableAPIDataSinkSettings_WriteMode_JsonSerializationSupported()
        {
            Assert.AreEqual(
                EntityWriteMode.Create,
                ReadWriteModeFromJson("""{"WriteMode": "Create"}"""),
                "WriteMode should be deserialized from JSON string 'Create'");

            Assert.AreEqual(
                EntityWriteMode.Replace,
                ReadWriteModeFromJson("""{"WriteMode": "Replace"}"""),
                "WriteMode should be deserialized from JSON string 'Replace'");

            Assert.AreEqual(
                EntityWriteMode.Merge,
                ReadWriteModeFromJson("""{"WriteMode": "Merge"}"""),
                "WriteMode should be deserialized from JSON string 'Merge'");
        }

        /// <summary>
        /// Builds a configuration from the supplied JSON text, binds it to
        /// <see cref="AzureTableAPIDataSinkSettings"/> and returns the bound <c>WriteMode</c>.
        /// </summary>
        /// <param name="json">JSON document to load as the only configuration source.</param>
        /// <returns>The bound write mode, or <c>null</c> when binding produced no settings object or no value.</returns>
        private static EntityWriteMode? ReadWriteModeFromJson(string json)
        {
            // Dispose the backing stream deterministically once binding has finished.
            using var stream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json));

            var settings = new ConfigurationBuilder()
                .AddJsonStream(stream)
                .Build()
                .Get<AzureTableAPIDataSinkSettings>();

            return settings?.WriteMode;
        }
    }
}

Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension.UnitTests/Cosmos.DataTransfer.AzureTableAPIExtension.UnitTests.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010

1111
<ItemGroup>
1212
<PackageReference Include="Azure.Data.Tables" />
13+
<PackageReference Include="Microsoft.Extensions.Configuration" />
14+
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
15+
<PackageReference Include="Microsoft.Extensions.Configuration.Json" />
1316
<PackageReference Include="Microsoft.NET.Test.Sdk" />
1417
<PackageReference Include="MSTest.TestAdapter" />
1518
<PackageReference Include="MSTest.TestFramework" />

Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension/AzureTableAPIDataSinkExtension.cs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfigurati
4848
await tableClient.CreateIfNotExistsAsync().ConfigureAwait(false);
4949

5050
var maxConcurrency = settings.MaxConcurrentEntityWrites ?? 10;
51+
var writeMode = settings.WriteMode ?? EntityWriteMode.Create;
5152

5253
logger.LogInformation("Writing data to Azure Table Storage with a maximum of {MaxConcurrency} concurrent writes.", maxConcurrency);
5354

@@ -60,7 +61,7 @@ await Parallel.ForEachAsync<IDataItem>(dataItems,
6061
try
6162
{
6263
var entity = item.ToTableEntity(settings.PartitionKeyFieldName, settings.RowKeyFieldName);
63-
await AddEntityWithRetryAsync(tableClient, entity, ct);
64+
await AddEntityWithRetryAsync(tableClient, entity, writeMode, ct);
6465
}
6566
catch (Exception ex)
6667
{
@@ -81,17 +82,31 @@ public IEnumerable<IDataExtensionSettings> GetSettings()
8182
/// </summary>
8283
/// <param name="tableClient"></param>
8384
/// <param name="entity"></param>
85+
/// <param name="writeMode"></param>
8486
/// <param name="cancellationToken"></param>
8587
/// <returns></returns>
86-
private static async Task AddEntityWithRetryAsync(TableClient tableClient, TableEntity entity, CancellationToken cancellationToken)
88+
private static async Task AddEntityWithRetryAsync(TableClient tableClient, TableEntity entity, EntityWriteMode writeMode, CancellationToken cancellationToken)
{
    // Retry only request failures whose HTTP status appears in TransientStatusCodes.
    // Three retries with exponential back-off: 2^attempt seconds (2s, 4s, 8s).
    var retryPolicy = Policy
        .Handle<RequestFailedException>(ex => TransientStatusCodes.Contains(ex.Status))
        .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));

    // The token is handed to the policy as well as to the table call so that
    // cancellation also interrupts the back-off delay between attempts instead
    // of only the in-flight request.
    await retryPolicy.ExecuteAsync(async ct =>
    {
        switch (writeMode)
        {
            case EntityWriteMode.Create:
                // Insert only; the service rejects the call when the entity already exists.
                await tableClient.AddEntityAsync(entity, ct).ConfigureAwait(false);
                break;
            case EntityWriteMode.Replace:
                // Upsert that overwrites the stored entity.
                await tableClient.UpsertEntityAsync(entity, TableUpdateMode.Replace, ct).ConfigureAwait(false);
                break;
            case EntityWriteMode.Merge:
                // Upsert that merges the supplied properties into the stored entity.
                await tableClient.UpsertEntityAsync(entity, TableUpdateMode.Merge, ct).ConfigureAwait(false);
                break;
            default:
                // Not a RequestFailedException, so the policy does not retry it.
                throw new ArgumentOutOfRangeException(nameof(writeMode), writeMode, "Unsupported EntityWriteMode");
        }
    }, cancellationToken).ConfigureAwait(false);
}
97112
}

Extensions/AzureTableAPI/Cosmos.DataTransfer.AzureTableAPIExtension/Settings/AzureTableAPIDataSinkSettings.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,13 @@ public class AzureTableAPIDataSinkSettings : AzureTableAPISettingsBase
77
/// This setting is used to control the number of concurrent writes to the Azure Table API.
88
/// </summary>
99
public int? MaxConcurrentEntityWrites { get; set; }
10+
11+
/// <summary>
12+
/// Specifies the behavior when writing entities to the table.
13+
/// Create: Adds new entities only, fails if entity already exists (default).
14+
/// Replace: Upserts entities, completely replacing existing ones.
15+
/// Merge: Upserts entities, merging properties with existing ones.
16+
/// </summary>
17+
public EntityWriteMode? WriteMode { get; set; } = EntityWriteMode.Create;
1018
}
1119
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
namespace Cosmos.DataTransfer.AzureTableAPIExtension.Settings
{
    /// <summary>
    /// Selects how the Azure Table API sink writes each entity to the destination table.
    /// </summary>
    public enum EntityWriteMode
    {
        /// <summary>
        /// Insert only. The write fails when an entity with the same partition key and row key
        /// is already present. Performed with <c>AddEntityAsync</c>.
        /// </summary>
        Create = 0,

        /// <summary>
        /// Insert or overwrite. An existing entity is replaced in its entirety by the supplied one.
        /// Performed with <c>UpsertEntityAsync</c> and <c>TableUpdateMode.Replace</c>.
        /// </summary>
        Replace = 1,

        /// <summary>
        /// Insert or merge. The supplied entity's properties are merged into the entity already
        /// stored in the table. Performed with <c>UpsertEntityAsync</c> and <c>TableUpdateMode.Merge</c>.
        /// </summary>
        Merge = 2
    }
}

Extensions/AzureTableAPI/README.md

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,15 @@ The following setting is supported for the Source:
3030

3131
### Additional Sink Settings
3232

33-
The AzureTableAPI Source extension has an additional setting that can be configured for writing Table entities.
33+
The AzureTableAPI Sink extension has additional settings that can be configured for writing Table entities.
3434

35-
The following setting is supported for the Sink:
35+
The following settings are supported for the Sink:
3636

3737
- `MaxConcurrentEntityWrites` - The maximum number of concurrent entity writes to the Azure Table API. Defaults to `10` when not specified.
38+
- `WriteMode` - Specifies the behavior when writing entities to the table. Options are:
39+
- `Create` (default): Creates new entities only. Fails if an entity with the same partition key and row key already exists.
40+
- `Replace`: Upserts entities, completely replacing existing ones if they exist.
41+
- `Merge`: Upserts entities, merging properties with existing entities if they exist.
3842

3943
## Authentication Methods
4044

@@ -47,6 +51,7 @@ The AzureTableAPI extension supports two authentication methods for connecting t
4751
4852
### Example RBAC Settings
4953

54+
**Source Example (RBAC)**
5055
```json
5156
{
5257
"UseRbacAuth": true,
@@ -59,6 +64,20 @@ The AzureTableAPI extension supports two authentication methods for connecting t
5964
}
6065
```
6166

67+
**Sink Example (RBAC)**
68+
```json
69+
{
70+
"UseRbacAuth": true,
71+
"AccountEndpoint": "https://<storage-account-name>.table.core.windows.net",
72+
"Table": "DestinationTable1",
73+
"PartitionKeyFieldName": "State",
74+
"RowKeyFieldName": "id",
75+
"EnableInteractiveCredentials": true,
76+
"WriteMode": "Merge",
77+
"MaxConcurrentEntityWrites": 10
78+
}
79+
```
80+
6281
### Example ConnectionString Source and Sink Settings Usage
6382

6483
The following are a couple of example `settings.json` files for configuring the AzureTableAPI Source and Sink extensions.
@@ -82,6 +101,8 @@ The following are a couple example `settings.json` files for configuring the Azu
82101
"ConnectionString": "DefaultEndpointsProtocol=https;AccountName=<storage-account-name>;AccountKey=<key>;EndpointSuffix=core.windows.net",
83102
"Table": "DestinationTable1",
84103
"PartitionKeyFieldName": "State",
85-
"RowKeyFieldName": "id"
104+
"RowKeyFieldName": "id",
105+
"WriteMode": "Replace",
106+
"MaxConcurrentEntityWrites": 5
86107
}
87108
```

0 commit comments

Comments
 (0)