Skip to content

Commit dd15711

Browse files
authored
Merge pull request #200 from AzureCosmosDB/copilot/fix-198
Add MongoDB query filtering support to enable document filtering during migration
2 parents 31d75aa + 3b507fd commit dd15711

File tree

13 files changed

+364
-5
lines changed

13 files changed

+364
-5
lines changed

CosmosDbDataMigrationTool.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Interfaces", "Interfaces",
115115
EndProject
116116
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cosmos.DataTransfer.Common.UnitTests", "Interfaces\Cosmos.DataTransfer.Common.UnitTests\Cosmos.DataTransfer.Common.UnitTests.csproj", "{8C755891-38F4-4155-9A60-51BC6841FA36}"
117117
EndProject
118+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cosmos.DataTransfer.MongoExtension.UnitTests", "Extensions\Mongo\Cosmos.DataTransfer.MongoExtension.UnitTests\Cosmos.DataTransfer.MongoExtension.UnitTests.csproj", "{3139E1AA-189A-4E62-8358-0507F503CEFF}"
119+
EndProject
118120
Global
119121
GlobalSection(SolutionConfigurationPlatforms) = preSolution
120122
Debug|Any CPU = Debug|Any CPU
@@ -217,6 +219,10 @@ Global
217219
{EEC6B3F5-68DE-4BA5-B0B8-A91D2FC9E3DE}.Debug|Any CPU.Build.0 = Debug|Any CPU
218220
{EEC6B3F5-68DE-4BA5-B0B8-A91D2FC9E3DE}.Release|Any CPU.ActiveCfg = Release|Any CPU
219221
{EEC6B3F5-68DE-4BA5-B0B8-A91D2FC9E3DE}.Release|Any CPU.Build.0 = Release|Any CPU
222+
{3139E1AA-189A-4E62-8358-0507F503CEFF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
223+
{3139E1AA-189A-4E62-8358-0507F503CEFF}.Debug|Any CPU.Build.0 = Debug|Any CPU
224+
{3139E1AA-189A-4E62-8358-0507F503CEFF}.Release|Any CPU.ActiveCfg = Release|Any CPU
225+
{3139E1AA-189A-4E62-8358-0507F503CEFF}.Release|Any CPU.Build.0 = Release|Any CPU
220226
EndGlobalSection
221227
GlobalSection(SolutionProperties) = preSolution
222228
HideSolutionNode = FALSE
@@ -253,6 +259,7 @@ Global
253259
{31BC84E1-55E5-45AA-BFAC-90732F20588B} = {F18E789A-D32D-48D3-B75F-1196D7215F74}
254260
{8C755891-38F4-4155-9A60-51BC6841FA36} = {AC56D40C-6A65-42E5-8881-D126FD080774}
255261
{EEC6B3F5-68DE-4BA5-B0B8-A91D2FC9E3DE} = {F18E789A-D32D-48D3-B75F-1196D7215F74}
262+
{3139E1AA-189A-4E62-8358-0507F503CEFF} = {F18E789A-D32D-48D3-B75F-1196D7215F74}
256263
EndGlobalSection
257264
GlobalSection(ExtensibilityGlobals) = postSolution
258265
SolutionGuid = {662B3F27-70D8-45E6-A1C0-1438A9C8A542}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
<!-- Unit-test project for the Mongo extension: MSTest test framework/adapter, Moq, and a project reference to the extension under test. -->
2+
3+
<PropertyGroup>
4+
<TargetFramework>net8.0</TargetFramework>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
7+
8+
<IsPackable>false</IsPackable>
<!-- NOTE(review): CollectCoverage / CoverletOutputFormat / CoverletOutput are the coverlet.msbuild property names, while only coverlet.collector is referenced below; confirm these settings actually take effect. -->
<!-- The cobertura report path points into the sibling extension project's folder, not this test project's folder. -->
9+
<CollectCoverage>true</CollectCoverage>
10+
<CoverletOutputFormat>cobertura</CoverletOutputFormat>
11+
<CoverletOutput>../Cosmos.DataTransfer.MongoExtension/coverage.cobertura.xml</CoverletOutput>
12+
</PropertyGroup>
13+
14+
<ItemGroup>
<!-- No Version attributes here: versions are presumably supplied by central package management elsewhere in the repository (TODO confirm). -->
15+
<PackageReference Include="Microsoft.Extensions.Configuration" />
16+
<PackageReference Include="Microsoft.NET.Test.Sdk" />
17+
<PackageReference Include="MSTest.TestAdapter" />
18+
<PackageReference Include="MSTest.TestFramework" />
19+
<PackageReference Include="Newtonsoft.Json" />
20+
<PackageReference Include="Moq" />
21+
<PackageReference Include="MongoDB.Driver" />
22+
<PackageReference Include="coverlet.collector">
23+
<PrivateAssets>all</PrivateAssets>
24+
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
25+
</PackageReference>
26+
</ItemGroup>
27+
28+
<ItemGroup>
<!-- The assembly under test. -->
29+
<ProjectReference Include="..\Cosmos.DataTransfer.MongoExtension\Cosmos.DataTransfer.MongoExtension.csproj" />
30+
</ItemGroup>
31+
32+
<ItemGroup>
<!-- Copy the JSON query fixtures in Data\ to the build output so the tests can read them at run time. -->
33+
<None Update="Data\*.json">
34+
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
35+
</None>
36+
</ItemGroup>
37+
38+
</Project>
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"timestamp":{"$gte":"2025-01-01","$lt":"2025-02-01"}}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"status":"active","type":"user"}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
using Cosmos.DataTransfer.MongoExtension.Settings;
2+
using Microsoft.Extensions.Configuration;
3+
using Microsoft.Extensions.Logging;
4+
using Moq;
5+
using MongoDB.Bson;
6+
7+
namespace Cosmos.DataTransfer.MongoExtension.UnitTests;
8+
/// <summary>
/// Unit tests for <see cref="MongoDataSourceExtension"/> and for parsing query filters
/// (inline JSON and the JSON fixtures in the <c>Data</c> folder) into BSON documents.
/// </summary>
[TestClass]
public class MongoDataSourceExtensionTests
{
    // MSTest constructs a new test-class instance for every test method, so a field
    // initializer gives each test its own extension and keeps the field non-nullable.
    private readonly MongoDataSourceExtension _extension = new();

    /// <summary>
    /// Builds the full path of a query fixture. The project copies <c>Data\*.json</c> to the
    /// output directory, so the path is anchored there rather than at the current working directory.
    /// </summary>
    /// <param name="fileName">File name of the fixture inside the <c>Data</c> folder.</param>
    /// <returns>Path of the fixture under the test output directory.</returns>
    private static string GetDataFilePath(string fileName) =>
        Path.Combine(AppContext.BaseDirectory, "Data", fileName);

    [TestMethod]
    public void MongoDataSourceExtension_ShouldHaveDisplayName()
    {
        // Assert
        Assert.AreEqual("MongoDB", _extension.DisplayName);
    }

    [TestMethod]
    public void MongoDataSourceExtension_ShouldReturnSettings()
    {
        // Act - GetSettings() is a lazy iterator; materialize it once instead of enumerating twice.
        var settings = _extension.GetSettings().ToList();

        // Assert
        Assert.AreNotEqual(0, settings.Count);
        Assert.IsInstanceOfType(settings[0], typeof(MongoSourceSettings));
    }

    [TestMethod]
    public void BsonDocumentParsing_ShouldWorkWithValidJson()
    {
        // Arrange
        var jsonQuery = """{"status": "active", "type": "user"}""";

        // Act
        var bsonDoc = BsonDocument.Parse(jsonQuery);

        // Assert
        Assert.IsNotNull(bsonDoc);
        Assert.AreEqual("active", bsonDoc["status"].AsString);
        Assert.AreEqual("user", bsonDoc["type"].AsString);
    }

    [TestMethod]
    public void BsonDocumentParsing_ShouldWorkWithComplexQuery()
    {
        // Arrange
        var jsonQuery = """{"timestamp":{"$gte":"2025-01-01","$lt":"2025-02-01"}}""";

        // Act
        var bsonDoc = BsonDocument.Parse(jsonQuery);

        // Assert
        Assert.IsNotNull(bsonDoc);
        Assert.IsTrue(bsonDoc.Contains("timestamp"));

        var timestampFilter = bsonDoc["timestamp"].AsBsonDocument;
        Assert.AreEqual("2025-01-01", timestampFilter["$gte"].AsString);
        Assert.AreEqual("2025-02-01", timestampFilter["$lt"].AsString);
    }

    [TestMethod]
    public void BsonDocumentParsing_ShouldThrowOnInvalidJson()
    {
        // Arrange
        var invalidJsonQuery = """{"status": "active", "type": }"""; // Invalid JSON: no value after "type"

        // Act & Assert - scope the expectation to the Parse call itself, so an exception
        // raised anywhere else in the test cannot make it pass by accident.
        Assert.ThrowsException<FormatException>(() => BsonDocument.Parse(invalidJsonQuery));
    }

    [TestMethod]
    public void QueryFileReading_ShouldWorkWithValidFile()
    {
        // Arrange
        var testFilePath = GetDataFilePath("simple-query.json");

        // Act & Assert - File should exist and be readable
        Assert.IsTrue(File.Exists(testFilePath), $"Fixture not found: {testFilePath}");
        var content = File.ReadAllText(testFilePath);
        Assert.IsFalse(string.IsNullOrWhiteSpace(content));

        // Should be valid JSON that can be parsed to BSON
        var bsonDoc = BsonDocument.Parse(content);
        Assert.IsNotNull(bsonDoc);
    }

    [TestMethod]
    public void QueryFileReading_ShouldWorkWithComplexQueryFile()
    {
        // Arrange
        var testFilePath = GetDataFilePath("date-range-query.json");

        // Act & Assert - File should exist and be readable
        Assert.IsTrue(File.Exists(testFilePath), $"Fixture not found: {testFilePath}");
        var content = File.ReadAllText(testFilePath);
        Assert.IsFalse(string.IsNullOrWhiteSpace(content));

        // Should be valid JSON that can be parsed to BSON
        var bsonDoc = BsonDocument.Parse(content);
        Assert.IsNotNull(bsonDoc);
        Assert.IsTrue(bsonDoc.Contains("timestamp"));
    }
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
using Cosmos.DataTransfer.MongoExtension.Settings;
2+
3+
namespace Cosmos.DataTransfer.MongoExtension.UnitTests;
4+
/// <summary>
/// Tests covering the <c>Query</c> property of <see cref="MongoSourceSettings"/>:
/// its value on a new instance and the values it accepts.
/// </summary>
[TestClass]
public class MongoSourceSettingsTests
{
    [TestMethod]
    public void MongoSourceSettings_ShouldHaveQueryProperty()
    {
        // A newly constructed settings object is expected to carry no query.
        var sut = new MongoSourceSettings();

        Assert.IsNull(sut.Query);
    }

    [TestMethod]
    public void MongoSourceSettings_ShouldAllowQueryToBeSet()
    {
        const string expectedQuery = """{"status": "active"}""";

        var sut = new MongoSourceSettings { Query = expectedQuery };

        Assert.AreEqual(expectedQuery, sut.Query);
    }

    [TestMethod]
    public void MongoSourceSettings_ShouldAllowNullQuery()
    {
        // Explicitly assigning null must be accepted and read back as null.
        var sut = new MongoSourceSettings { Query = null };

        Assert.IsNull(sut.Query);
    }

    [TestMethod]
    public void MongoSourceSettings_ShouldAllowEmptyQuery()
    {
        // An empty string is stored as-is rather than being normalized.
        var sut = new MongoSourceSettings { Query = string.Empty };

        Assert.AreEqual(string.Empty, sut.Query);
    }
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
global using Microsoft.VisualStudio.TestTools.UnitTesting;

Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Cosmos.DataTransfer.MongoExtension.csproj

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@
77
<OutputType>Exe</OutputType>
88
</PropertyGroup>
99

10+
<ItemGroup>
11+
<AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleTo">
12+
<_Parameter1>Cosmos.DataTransfer.MongoExtension.UnitTests</_Parameter1>
13+
</AssemblyAttribute>
14+
</ItemGroup>
15+
1016
<ItemGroup>
1117
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
1218
<PackageReference Include="MongoDB.Driver" />

Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/IRepository.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Linq.Expressions;
2+
using MongoDB.Driver;
23

34
namespace Cosmos.DataTransfer.MongoExtension;
45

@@ -11,4 +12,5 @@ public interface IRepository<TDocument>
1112
ValueTask Remove(Expression<Func<TDocument, bool>> filter);
1213
ValueTask RemoveRange(Expression<Func<TDocument, bool>> filter);
1314
IQueryable<TDocument> AsQueryable();
15+
IAsyncEnumerable<TDocument> FindAsync(FilterDefinition<TDocument> filter);
1416
}

Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Microsoft.Extensions.Configuration;
66
using Microsoft.Extensions.Logging;
77
using MongoDB.Bson;
8+
using MongoDB.Driver;
89

910
namespace Cosmos.DataTransfer.MongoExtension;
1011
[Export(typeof(IDataSourceExtension))]
@@ -27,30 +28,97 @@ public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogge
2728

2829
foreach (var collection in collectionNames)
2930
{
30-
await foreach (var item in EnumerateCollectionAsync(context, collection, logger).WithCancellation(cancellationToken))
31+
await foreach (var item in EnumerateCollectionAsync(context, collection, settings.Query, logger).WithCancellation(cancellationToken))
3132
{
3233
yield return item;
3334
}
3435
}
3536
}
3637
}
3738

/// <summary>
/// Streams the documents of one MongoDB collection as data items, optionally restricted by a query filter.
/// </summary>
/// <param name="context">Database context used to obtain the collection repository.</param>
/// <param name="collectionName">Name of the collection to read.</param>
/// <param name="query">
/// Optional filter. When null, empty or whitespace, every document is read; otherwise the value is
/// handed to the query path, which accepts either inline JSON or the path of a file containing JSON.
/// </param>
/// <param name="logger">Logger that receives progress and summary messages.</param>
/// <param name="cancellationToken">
/// Token that stops the enumeration. It is also populated from <c>WithCancellation(...)</c> on the
/// returned sequence; without this parameter such a token would be silently ignored by the iterator.
/// </param>
/// <returns>An asynchronous sequence with one <see cref="IDataItem"/> per document read.</returns>
/// <exception cref="OperationCanceledException">The token was cancelled while documents were being read.</exception>
public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(
    Context context,
    string collectionName,
    string? query,
    ILogger logger,
    [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    logger.LogInformation("Reading collection '{Collection}'", collectionName);
    var collection = context.GetRepository<BsonDocument>(collectionName);
    int itemCount = 0;

    IAsyncEnumerable<BsonDocument> documents;

    if (!string.IsNullOrWhiteSpace(query))
    {
        logger.LogInformation("Applying query filter to collection '{Collection}': {Query}", collectionName, query);
        documents = GetQueryDocumentsAsync(collection, query, collectionName, logger);
    }
    else
    {
        logger.LogInformation("No query filter specified for collection '{Collection}', reading all documents", collectionName);
        // Use existing queryable approach when no filter is specified
        documents = GetAllDocumentsAsync(collection);
    }

    await foreach (var record in documents.WithCancellation(cancellationToken))
    {
        // Check per item so a cancelled migration stops promptly between documents.
        cancellationToken.ThrowIfCancellationRequested();

        yield return new MongoDataItem(record);
        itemCount++;
    }

    if (itemCount > 0)
    {
        logger.LogInformation("Read {ItemCount} items from collection '{Collection}'", itemCount, collectionName);
    }
    else
    {
        logger.LogWarning("No items read from collection '{Collection}'", collectionName);
    }
}
5370

/// <summary>
/// Streams every document of the given collection, with no filter applied.
/// </summary>
/// <param name="collection">Repository whose documents are enumerated.</param>
/// <returns>An asynchronous sequence of all documents exposed by the repository's queryable.</returns>
private async IAsyncEnumerable<BsonDocument> GetAllDocumentsAsync(IRepository<BsonDocument> collection)
{
    // The queryable is obtained inside Task.Run; the enumeration itself runs in the loop below.
    var allDocuments = await Task.Run(() => collection.AsQueryable());

    foreach (var document in allDocuments)
    {
        yield return document;
    }
}
78+
/// <summary>
/// Streams the documents of a collection that match a user-supplied query filter.
/// </summary>
/// <param name="collection">Repository to query.</param>
/// <param name="query">
/// Either the path of an existing file whose content is the filter JSON, or the filter JSON itself.
/// </param>
/// <param name="collectionName">Collection name, used only in log messages.</param>
/// <param name="logger">Logger that receives informational and error messages.</param>
/// <returns>An asynchronous sequence of the documents returned by the repository for the filter.</returns>
/// <remarks>
/// Failures while reading the query file or parsing the JSON are logged and then rethrown unchanged.
/// </remarks>
private async IAsyncEnumerable<BsonDocument> GetQueryDocumentsAsync(IRepository<BsonDocument> collection, string query, string collectionName, ILogger logger)
{
    // Step 1: resolve the raw filter text. An existing file wins; anything else is taken as inline JSON.
    string filterJson;
    try
    {
        bool isQueryFile = File.Exists(query);
        if (!isQueryFile)
        {
            logger.LogInformation("Treating query input as direct JSON string (file does not exist): {Query}", query);
            filterJson = query;
        }
        else
        {
            logger.LogInformation("Reading query from file: {QueryFile}", query);
            filterJson = await File.ReadAllTextAsync(query);
        }
    }
    catch (Exception readError)
    {
        logger.LogError(readError, "Error reading query for collection '{Collection}': {Query}", collectionName, query);
        throw;
    }

    // Step 2: turn the text into a BSON document.
    BsonDocument parsedFilter;
    try
    {
        parsedFilter = BsonDocument.Parse(filterJson);
    }
    catch (Exception parseError)
    {
        logger.LogError(parseError, "Error parsing query JSON for collection '{Collection}': {Query}", collectionName, filterJson);
        throw;
    }

    // Step 3: wrap the document as a filter definition and stream the matches.
    var filterDefinition = new BsonDocumentFilterDefinition<BsonDocument>(parsedFilter);
    var matches = collection.FindAsync(filterDefinition);

    await foreach (var document in matches)
    {
        yield return document;
    }
}
121+
54122
public IEnumerable<IDataExtensionSettings> GetSettings()
55123
{
56124
yield return new MongoSourceSettings();

0 commit comments

Comments
 (0)