Skip to content

Commit 71f4b0e

Browse files
authored
Merge pull request #141 from fonsecamar/main
Mongo Source CSFLE support and Cosmos NoSQL Always Encrypted support
2 parents df9f09d + 86e7fd3 commit 71f4b0e

File tree

14 files changed

+147
-20
lines changed

14 files changed

+147
-20
lines changed

Core/Cosmos.DataTransfer.Core/Cosmos.DataTransfer.Core.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
</PropertyGroup>
2020

2121
<ItemGroup>
22-
<PackageReference Include="Azure.Core" Version="1.36.0" />
23-
<PackageReference Include="Microsoft.Data.SqlClient" Version="5.2.0" />
22+
<PackageReference Include="Azure.Core" Version="1.38.0" />
23+
<PackageReference Include="Microsoft.Data.SqlClient" Version="5.2.2" />
2424
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="6.0.1" />
2525
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
2626
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" />

Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/Cosmos.DataTransfer.CosmosExtension.csproj

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<PackageReference Include="Azure.Identity" Version="1.10.3" />
11+
<PackageReference Include="Azure.Identity" Version="1.11.4" />
12+
<PackageReference Include="Azure.Security.KeyVault.Keys" Version="4.6.0" />
1213
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.34.0" />
14+
<PackageReference Include="Microsoft.Azure.Cosmos.Encryption" Version="2.0.3" />
1315
<PackageReference Include="Microsoft.Extensions.Configuration" Version="6.0.1" />
1416
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
1517
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="6.0.0" />

Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSinkExtension.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Text;
66
using Cosmos.DataTransfer.Interfaces;
77
using Microsoft.Azure.Cosmos;
8+
using Microsoft.Azure.Cosmos.Encryption;
89
using Microsoft.Extensions.Configuration;
910
using Microsoft.Extensions.Logging;
1011
using Newtonsoft.Json;
@@ -28,7 +29,14 @@ public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfigurati
2829
Container? container;
2930
if (settings.UseRbacAuth)
3031
{
31-
container = client.GetContainer(settings.Database, settings.Container);
32+
if (settings.InitClientEncryption)
33+
{
34+
container = await client.GetContainer(settings.Database, settings.Container).InitializeEncryptionAsync();
35+
}
36+
else
37+
{
38+
container = client.GetContainer(settings.Database, settings.Container);
39+
}
3240
}
3341
else
3442
{
@@ -94,7 +102,7 @@ void ReportCount(int i)
94102
}
95103

96104
var convertedObjects = dataItems
97-
.Select(di => di.BuildDynamicObjectTree(requireStringId: true, ignoreNullValues: settings.IgnoreNullValues, preserveMixedCaseIds: settings.PreserveMixedCaseIds))
105+
.Select(di => di.BuildDynamicObjectTree(requireStringId: true, ignoreNullValues: settings.IgnoreNullValues, preserveMixedCaseIds: settings.PreserveMixedCaseIds, transformations: settings.Transformations))
98106
.Where(o => o != null)
99107
.OfType<ExpandoObject>();
100108
var batches = convertedObjects.Buffer(settings.BatchSize);

Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosDataSourceExtension.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Runtime.CompilerServices;
33
using Cosmos.DataTransfer.Interfaces;
44
using Microsoft.Azure.Cosmos;
5+
using Microsoft.Azure.Cosmos.Encryption;
56
using Microsoft.Extensions.Configuration;
67
using Microsoft.Extensions.Logging;
78

@@ -19,8 +20,17 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
1920

2021
var client = CosmosExtensionServices.CreateClient(settings, DisplayName);
2122

22-
var container = client.GetContainer(settings.Database, settings.Container);
23+
Container container;
2324

25+
if(settings.InitClientEncryption)
26+
{
27+
container = await client.GetContainer(settings.Database, settings.Container).InitializeEncryptionAsync();
28+
}
29+
else
30+
{
31+
container = client.GetContainer(settings.Database, settings.Container);
32+
}
33+
2434
await CosmosExtensionServices.VerifyContainerAccess(container, settings.Container, logger, cancellationToken);
2535

2636
var requestOptions = new QueryRequestOptions();

Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosExtensionServices.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
using System.Reflection;
77
using Azure.Core;
88
using System.Text.RegularExpressions;
9+
using Microsoft.Azure.Cosmos.Encryption;
10+
using Azure.Security.KeyVault.Keys.Cryptography;
911
using System.Net;
1012

1113
namespace Cosmos.DataTransfer.CosmosExtension
@@ -41,7 +43,17 @@ public static CosmosClient CreateClient(CosmosSettingsBase settings, string disp
4143
if (settings.UseRbacAuth)
4244
{
4345
TokenCredential tokenCredential = new DefaultAzureCredential(includeInteractiveCredentials: settings.EnableInteractiveCredentials);
44-
cosmosClient = new CosmosClient(settings.AccountEndpoint, tokenCredential, clientOptions);
46+
47+
if(settings.InitClientEncryption)
48+
{
49+
var keyResolver = new KeyResolver(tokenCredential);
50+
cosmosClient = new CosmosClient(settings.AccountEndpoint, tokenCredential, clientOptions)
51+
.WithEncryption(keyResolver, KeyEncryptionKeyResolverName.AzureKeyVault);
52+
}
53+
else
54+
{
55+
cosmosClient = new CosmosClient(settings.AccountEndpoint, tokenCredential, clientOptions);
56+
}
4557
}
4658
else
4759
{

Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosSettingsBase.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public abstract class CosmosSettingsBase : IValidatableObject
1515
public bool UseRbacAuth { get; set; }
1616
public string? AccountEndpoint { get; set; }
1717
public bool EnableInteractiveCredentials { get; set; } = true;
18+
public bool InitClientEncryption { get; set; } = false;
1819

1920
public virtual IEnumerable<ValidationResult> Validate(ValidationContext validationContext)
2021
{
@@ -26,6 +27,10 @@ public virtual IEnumerable<ValidationResult> Validate(ValidationContext validati
2627
{
2728
yield return new ValidationResult("AccountEndpoint must be specified when UseRbacAuth is true", new[] { nameof(AccountEndpoint) });
2829
}
30+
if (!UseRbacAuth && InitClientEncryption)
31+
{
32+
yield return new ValidationResult("InitClientEncryption can only be used when UseRbacAuth is true", new[] { nameof(InitClientEncryption) });
33+
}
2934
}
3035
}
3136
}

Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/CosmosSinkSettings.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public class CosmosSinkSettings : CosmosSettingsBase, IDataExtensionSettings
1818
public DataWriteMode WriteMode { get; set; } = DataWriteMode.Insert;
1919
public bool IgnoreNullValues { get; set; } = false;
2020
public List<string>? PartitionKeyPaths { get; set; }
21+
public Dictionary<string, DataItemTransformation>? Transformations { get; set; }
2122

2223
public override IEnumerable<ValidationResult> Validate(ValidationContext validationContext)
2324
{
@@ -58,6 +59,11 @@ public override IEnumerable<ValidationResult> Validate(ValidationContext validat
5859
{
5960
yield return new ValidationResult("PartitionKeyPath must be specified when WriteMode is set to InsertStream or UpsertStream", new[] { nameof(PartitionKeyPath), nameof(PartitionKeyPaths), nameof(WriteMode) });
6061
}
62+
63+
if (HasInvalidTransformations())
64+
{
65+
yield return new ValidationResult("Transformations must always specify SourceFieldName and either DestinationFieldName or DestinationFieldType", new[] { nameof(Transformations) });
66+
}
6167
}
6268

6369
private bool MissingPartitionKeys()
@@ -70,5 +76,11 @@ private bool MissingPartitionKeys()
7076

7177
return true;
7278
}
79+
80+
private bool HasInvalidTransformations()
81+
{
82+
return Transformations != null &&
83+
Transformations.Any(t => string.IsNullOrEmpty(t.Value.DestinationFieldName) && string.IsNullOrEmpty(t.Value.DestinationFieldTypeCode));
84+
}
7385
}
7486
}

Extensions/Cosmos/README.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ Source and sink settings also both require parameters to specify the data locati
1616

1717
Source supports an optional `IncludeMetadataFields` parameter (`false` by default) to enable inclusion of built-in Cosmos fields prefixed with `"_"`, for example `"_etag"` and `"_ts"`. An optional PartitionKeyValue setting allows for filtering to a single partition. The optional Query setting allows further filtering using a Cosmos SQL statement. An optional `WebProxy` parameter (`null` by default) enables connections through a proxy.
1818

19+
### Always Encrypted
20+
21+
Source and Sink support Always Encrypted as an optional parameter. When `InitClientEncryption` is set to `true`, the extension will initialize the Cosmos client with the Always Encrypted feature enabled. This allows for the use of encrypted fields in the Cosmos DB container. The extension will automatically decrypt the fields when reading from the source and encrypt the fields when writing to the sink.
22+
</br>
23+
The extension will also automatically handle the encryption keys and encryption policy for the client, but it requires `UseRbacAuth` to be set to `true` and the user to have the necessary permissions to access the key vault.
24+
</br>
25+
> **Note**: To use Always Encrypted, Cosmos DB container must be pre-configured with the necessary encryption policy and the user must have the necessary permissions to access the key vault.
26+
1927
### Source
2028

2129
```json
@@ -42,6 +50,7 @@ Or with RBAC:
4250
"IncludeMetadataFields": false,
4351
"PartitionKeyValue":"123",
4452
"Query":"SELECT * FROM c WHERE c.category='event'",
53+
"InitClientEncryption": false
4554
"WebProxy":"http://yourproxy.server.com/"
4655
}
4756
```
@@ -67,6 +76,7 @@ Sink requires an additional `PartitionKeyPath` parameter which is used when crea
6776
"PreserveMixedCaseIds": false,
6877
"IgnoreNullValues": false,
6978
"IsServerlessAccount": false,
70-
"UseSharedThroughput": false
79+
"UseSharedThroughput": false,
80+
"InitClientEncryption": false
7181
}
7282
```

Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Context.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
using MongoDB.Bson;
22
using MongoDB.Driver;
33
using MongoDB.Driver.Core.Events;
4+
using MongoDB.Driver.Encryption;
45

56
namespace Cosmos.DataTransfer.MongoExtension;
67
public class Context
78
{
89
private readonly IMongoDatabase database = null!;
910

10-
public Context(string connectionString, string databaseName)
11+
public Context(string connectionString, string databaseName,
12+
string? keyVaultNamespace = null, Dictionary<string, IReadOnlyDictionary<string, object>>? kmsProviders = null)
1113
{
1214
var mongoConnectionUrl = new MongoUrl(connectionString);
1315
var mongoClientSettings = MongoClientSettings.FromUrl(mongoConnectionUrl);
@@ -16,6 +18,16 @@ public Context(string connectionString, string databaseName)
1618
System.Diagnostics.Debug.WriteLine($"{e.CommandName} - {e.Command.ToJson()}");
1719
});
1820
};
21+
22+
if(!string.IsNullOrEmpty(keyVaultNamespace) && kmsProviders?.Count != 0)
23+
{
24+
var autoEncryptionOptions = new AutoEncryptionOptions(
25+
keyVaultNamespace: CollectionNamespace.FromFullName(keyVaultNamespace),
26+
kmsProviders: kmsProviders,
27+
bypassAutoEncryption: true);
28+
mongoClientSettings.AutoEncryptionOptions = autoEncryptionOptions;
29+
}
30+
1931
var client = new MongoClient(mongoClientSettings);
2032
database = client.GetDatabase(databaseName);
2133
}

Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
1919

2020
if (!string.IsNullOrEmpty(settings.ConnectionString) && !string.IsNullOrEmpty(settings.DatabaseName))
2121
{
22-
var context = new Context(settings.ConnectionString, settings.DatabaseName);
22+
var context = new Context(settings.ConnectionString, settings.DatabaseName, settings.KeyVaultNamespace, settings.KMSProviders);
2323

2424
var collectionNames = !string.IsNullOrEmpty(settings.Collection)
2525
? new List<string> { settings.Collection }
@@ -40,7 +40,7 @@ public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(Context contex
4040
logger.LogInformation("Reading collection '{Collection}'", collectionName);
4141
var collection = context.GetRepository<BsonDocument>(collectionName);
4242
int itemCount = 0;
43-
foreach (var record in collection.AsQueryable())
43+
foreach (var record in await Task.Run(() => collection.AsQueryable()))
4444
{
4545
yield return new MongoDataItem(record);
4646
itemCount++;

0 commit comments

Comments
 (0)