Skip to content

Commit 216314f

Browse files
authored
Merge pull request #186 from AzureCosmosDB/copilot/fix-180
Add MongoDB Legacy Extension for Wire Version 2 Support
2 parents f7e06c1 + 70c50c5 commit 216314f

17 files changed

+394
-0
lines changed

CosmosDbDataMigrationTool.sln

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Mongo", "Mongo", "{F18E789A
4141
EndProject
4242
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.MongoVectorExtension", "Extensions\Mongo\Cosmos.DataTransfer.MongoVectorExtension\Cosmos.DataTransfer.MongoVectorExtension.csproj", "{F6EAC33B-9F7D-433B-9328-622FB8938C24}"
4343
EndProject
44+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.MongoLegacyExtension", "Extensions\Mongo\Cosmos.DataTransfer.MongoLegacyExtension\Cosmos.DataTransfer.MongoLegacyExtension.csproj", "{EEC6B3F5-68DE-4BA5-B0B8-A91D2FC9E3DE}"
45+
EndProject
4446
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.JsonExtension.UnitTests", "Extensions\Json\Cosmos.DataTransfer.JsonExtension.UnitTests\Cosmos.DataTransfer.JsonExtension.UnitTests.csproj", "{ED1E375E-A5A3-47EA-A7D5-07344C7E152F}"
4547
EndProject
4648
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.CosmosExtension.UnitTests", "Extensions\Cosmos\Cosmos.DataTransfer.CosmosExtension.UnitTests\Cosmos.DataTransfer.CosmosExtension.UnitTests.csproj", "{C7A3910D-A7F6-4767-9A7A-19CFA4CCB7A8}"
@@ -211,6 +213,10 @@ Global
211213
{8C755891-38F4-4155-9A60-51BC6841FA36}.Debug|Any CPU.Build.0 = Debug|Any CPU
212214
{8C755891-38F4-4155-9A60-51BC6841FA36}.Release|Any CPU.ActiveCfg = Release|Any CPU
213215
{8C755891-38F4-4155-9A60-51BC6841FA36}.Release|Any CPU.Build.0 = Release|Any CPU
216+
{EEC6B3F5-68DE-4BA5-B0B8-A91D2FC9E3DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
217+
{EEC6B3F5-68DE-4BA5-B0B8-A91D2FC9E3DE}.Debug|Any CPU.Build.0 = Debug|Any CPU
218+
{EEC6B3F5-68DE-4BA5-B0B8-A91D2FC9E3DE}.Release|Any CPU.ActiveCfg = Release|Any CPU
219+
{EEC6B3F5-68DE-4BA5-B0B8-A91D2FC9E3DE}.Release|Any CPU.Build.0 = Release|Any CPU
214220
EndGlobalSection
215221
GlobalSection(SolutionProperties) = preSolution
216222
HideSolutionNode = FALSE
@@ -246,6 +252,7 @@ Global
246252
{1B927C5F-50FC-42A6-BAF6-B00E6D760543} = {A8A1CEAB-2D82-460C-9B86-74ABD17CD201}
247253
{31BC84E1-55E5-45AA-BFAC-90732F20588B} = {F18E789A-D32D-48D3-B75F-1196D7215F74}
248254
{8C755891-38F4-4155-9A60-51BC6841FA36} = {AC56D40C-6A65-42E5-8881-D126FD080774}
255+
{EEC6B3F5-68DE-4BA5-B0B8-A91D2FC9E3DE} = {F18E789A-D32D-48D3-B75F-1196D7215F74}
249256
EndGlobalSection
250257
GlobalSection(ExtensibilityGlobals) = postSolution
251258
SolutionGuid = {662B3F27-70D8-45E6-A1C0-1438A9C8A542}

Directory.Packages.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="8.0.1" />
2828
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
2929
<PackageVersion Include="MongoDB.Driver" Version="2.30.0" />
30+
<PackageVersion Include="mongocsharpdriver" Version="1.11.0" />
3031
<PackageVersion Include="Moq" Version="4.18.4" />
3132
<PackageVersion Include="MSTest.TestAdapter" Version="3.6.2" />
3233
<PackageVersion Include="MSTest.TestFramework" Version="3.6.2" />

ExampleConfigs.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,30 @@
6767
}
6868
```
6969

70+
## MongoDB Legacy (Wire v2) to Cosmos-NoSQL
71+
72+
```json
73+
{
74+
"Source": "MongoDB-Legacy (Wire v2)",
75+
"Sink": "cosmos-nosql",
76+
"SourceSettings": {
77+
"ConnectionString": "******mycluster.documents.azure.com:10255/?ssl=true",
78+
"DatabaseName": "sales",
79+
"Collection": "person"
80+
},
81+
"SinkSettings": {
82+
"ConnectionString": "AccountEndpoint=https://...",
83+
"Database": "users",
84+
"Container": "migrated",
85+
"PartitionKeyPath": "/id",
86+
"ConnectionMode": "Direct",
87+
"WriteMode": "UpsertStream",
88+
"CreatedContainerMaxThroughput": 8000,
89+
"UseAutoscaleForCreatedContainer": false
90+
}
91+
}
92+
```
93+
7094
## SqlServer to AzureTableAPI
7195

7296
```json
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
using MongoDB.Bson;
2+
using MongoDB.Driver;
3+
4+
namespace Cosmos.DataTransfer.MongoLegacyExtension;
5+
/// <summary>
/// Holds a legacy-driver <see cref="MongoDatabase"/> handle for one database and
/// hands out collections and repositories bound to it.
/// </summary>
public class Context
{
    // Assigned exactly once in the constructor, so no null-forgiving initializer is needed.
    private readonly MongoDatabase database;

    /// <summary>
    /// Connects to the server described by <paramref name="connectionString"/> and
    /// selects <paramref name="databaseName"/>.
    /// </summary>
    /// <param name="connectionString">MongoDB connection string parsed by <see cref="MongoUrl"/>.</param>
    /// <param name="databaseName">Name of the database to operate on.</param>
    /// <exception cref="ArgumentException">Either argument is null or empty.</exception>
    public Context(string connectionString, string databaseName)
    {
        // Fail with a parameter-specific message instead of an error from deep inside the driver.
        ArgumentException.ThrowIfNullOrEmpty(connectionString);
        ArgumentException.ThrowIfNullOrEmpty(databaseName);

        var mongoUrl = new MongoUrl(connectionString);
        var client = new MongoClient(mongoUrl);
        var server = client.GetServer();
        database = server.GetDatabase(databaseName);
    }

    /// <summary>
    /// Creates a repository over the collection called <paramref name="name"/>.
    /// </summary>
    /// <typeparam name="T">Document type stored in the collection.</typeparam>
    /// <param name="name">Collection name.</param>
    public virtual IRepository<T> GetRepository<T>(string name)
    {
        return new MongoRepository<T>(database.GetCollection<T>(name));
    }

    /// <summary>
    /// Returns the driver-level collection called <paramref name="name"/>.
    /// </summary>
    /// <typeparam name="T">Document type stored in the collection.</typeparam>
    /// <param name="name">Collection name.</param>
    public virtual MongoCollection<T> GetCollection<T>(string name) where T : class
    {
        return database.GetCollection<T>(name);
    }

    /// <summary>
    /// Lists the names of the collections in the selected database.
    /// </summary>
    /// <returns>A materialized list of collection names.</returns>
    public virtual async Task<IList<string>> GetCollectionNamesAsync()
    {
        // The driver call used here is synchronous; it is run on the thread pool
        // so the caller is not blocked while it executes.
        var names = await Task.Run(() => database.GetCollectionNames().ToList());
        return names;
    }
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net8.0</TargetFramework>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
7+
<OutputType>Exe</OutputType>
8+
<RuntimeIdentifiers>win-x64;linux-x64;osx-x64</RuntimeIdentifiers>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" />
13+
<PackageReference Include="mongocsharpdriver" />
14+
<PackageReference Include="System.ComponentModel.Composition" />
15+
</ItemGroup>
16+
17+
<ItemGroup>
18+
<ProjectReference Include="..\..\..\Interfaces\Cosmos.DataTransfer.Interfaces\Cosmos.DataTransfer.Interfaces.csproj" />
19+
</ItemGroup>
20+
21+
<Target Name="PublishToExtensionsFolder"
22+
AfterTargets="Build"
23+
Condition=" '$(Configuration)' == 'Debug' AND '$(PublishingToExtensionsFolder)' != 'true' ">
24+
<MSBuild
25+
Projects="$(MSBuildProjectFile)"
26+
Targets="Publish"
27+
Properties="Configuration=$(Configuration);
28+
PublishProfile=PublishToExtensionsFolder;
29+
BuildProjectReferences=false;
30+
PublishingToExtensionsFolder=true" />
31+
</Target>
32+
33+
</Project>
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using System.Linq.Expressions;
2+
3+
namespace Cosmos.DataTransfer.MongoLegacyExtension;
4+
5+
/// <summary>
/// Minimal storage abstraction over a collection of <typeparamref name="TDocument"/> items:
/// read access as a queryable sequence plus single and batched adds.
/// </summary>
/// <typeparam name="TDocument">Type of the documents held by the repository.</typeparam>
public interface IRepository<TDocument>
{
    /// <summary>Exposes the repository contents as a queryable sequence.</summary>
    IQueryable<TDocument> AsQueryable();

    /// <summary>Adds one document.</summary>
    /// <param name="item">Document to add.</param>
    ValueTask Add(TDocument item);

    /// <summary>Adds every document in a sequence.</summary>
    /// <param name="items">Documents to add.</param>
    ValueTask AddRange(IEnumerable<TDocument> items);

    /// <summary>Adds every document passed as an argument list or array.</summary>
    /// <param name="items">Documents to add.</param>
    ValueTask AddRange(params TDocument[] items);
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
using Cosmos.DataTransfer.Interfaces;
2+
using MongoDB.Bson;
3+
4+
namespace Cosmos.DataTransfer.MongoLegacyExtension;
5+
/// <summary>
/// Adapts a <see cref="BsonDocument"/> to the <see cref="IDataItem"/> contract so it can
/// flow through the data transfer pipeline.
/// </summary>
public class MongoDataItem : IDataItem
{
    private readonly BsonDocument record;

    /// <summary>Wraps <paramref name="record"/>.</summary>
    /// <param name="record">Document whose elements are exposed as fields.</param>
    /// <exception cref="ArgumentNullException"><paramref name="record"/> is null.</exception>
    public MongoDataItem(BsonDocument record)
    {
        // Reject null here; otherwise the failure would surface later as a
        // NullReferenceException in GetFieldNames/GetValue.
        this.record = record ?? throw new ArgumentNullException(nameof(record));
    }

    /// <summary>Returns the element names of the wrapped document.</summary>
    public IEnumerable<string> GetFieldNames()
    {
        return record.Names;
    }

    /// <summary>
    /// Looks up <paramref name="fieldName"/> and converts its BSON value to a .NET value
    /// via <see cref="BsonTypeMapper.MapToDotNetValue(BsonValue)"/>.
    /// </summary>
    /// <param name="fieldName">Name of the element to read.</param>
    /// <returns>The mapped value, or <c>null</c> when the document has no such element.</returns>
    public object? GetValue(string fieldName)
    {
        // Single lookup instead of Contains + indexer.
        return record.TryGetValue(fieldName, out var value)
            ? BsonTypeMapper.MapToDotNetValue(value)
            : null;
    }
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
using System.ComponentModel.Composition;
2+
using Cosmos.DataTransfer.Interfaces;
3+
using Cosmos.DataTransfer.MongoLegacyExtension.Settings;
4+
using Microsoft.Extensions.Configuration;
5+
using Microsoft.Extensions.Logging;
6+
using MongoDB.Bson;
7+
8+
namespace Cosmos.DataTransfer.MongoLegacyExtension;
9+
/// <summary>
/// Sink extension that writes incoming data items to a MongoDB collection through the
/// legacy driver wrapped by <see cref="Context"/>.
/// </summary>
[Export(typeof(IDataSinkExtension))]
public class MongoLegacyDataSinkExtension : IDataSinkExtensionWithSettings
{
    public string DisplayName => "MongoDB-Legacy (Wire v2)";

    /// <summary>
    /// Streams <paramref name="dataItems"/> into the configured collection in batches.
    /// </summary>
    /// <param name="dataItems">Items to write.</param>
    /// <param name="config">Configuration bound to <see cref="MongoLegacySinkSettings"/>.</param>
    /// <param name="dataSource">Source extension supplying the items (not used by this sink).</param>
    /// <param name="logger">Receives progress and error messages.</param>
    /// <param name="cancellationToken">Stops consumption of <paramref name="dataItems"/>.</param>
    public async Task WriteAsync(IAsyncEnumerable<IDataItem> dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default)
    {
        var settings = config.Get<MongoLegacySinkSettings>();

        if (settings == null)
        {
            logger.LogError("Failed to parse MongoDB Legacy sink settings");
            return;
        }

        // Treat empty strings like missing values: previously an empty setting passed the
        // null check and then skipped the write without logging anything.
        if (string.IsNullOrEmpty(settings.ConnectionString)
            || string.IsNullOrEmpty(settings.DatabaseName)
            || string.IsNullOrEmpty(settings.Collection))
        {
            logger.LogError("MongoDB Legacy sink settings missing required fields");
            return;
        }

        var context = new Context(settings.ConnectionString, settings.DatabaseName);
        var repo = context.GetRepository<BsonDocument>(settings.Collection);

        var batchSize = settings.BatchSize ?? 1000;
        if (batchSize <= 0)
        {
            // A non-positive size would never trigger a flush, buffering the whole
            // source in memory; fall back to the default instead.
            logger.LogWarning("Invalid batch size {BatchSize}; using default of 1000", batchSize);
            batchSize = 1000;
        }

        var objects = new List<BsonDocument>();
        int itemCount = 0;
        await foreach (var item in dataItems.WithCancellation(cancellationToken))
        {
            var dict = item.BuildDynamicObjectTree();
            objects.Add(new BsonDocument(dict));
            itemCount++;

            // ">=" rather than "==" so the buffer can never grow past the batch size.
            if (objects.Count >= batchSize)
            {
                await repo.AddRange(objects);
                logger.LogInformation("Added {ItemCount} items to collection '{Collection}'", itemCount, settings.Collection);
                objects.Clear();
            }
        }

        // Flush the final partial batch, if any.
        if (objects.Count > 0)
        {
            await repo.AddRange(objects);
        }

        if (itemCount > 0)
        {
            logger.LogInformation("Added {ItemCount} total items to collection '{Collection}'", itemCount, settings.Collection);
        }
        else
        {
            logger.LogWarning("No items added to collection '{Collection}'", settings.Collection);
        }
    }

    /// <summary>Returns an empty settings instance describing this sink's options.</summary>
    public IEnumerable<IDataExtensionSettings> GetSettings()
    {
        yield return new MongoLegacySinkSettings();
    }
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
using System.ComponentModel.Composition;
2+
using System.Runtime.CompilerServices;
3+
using Cosmos.DataTransfer.Interfaces;
4+
using Cosmos.DataTransfer.MongoLegacyExtension.Settings;
5+
using Microsoft.Extensions.Configuration;
6+
using Microsoft.Extensions.Logging;
7+
using MongoDB.Bson;
8+
9+
namespace Cosmos.DataTransfer.MongoLegacyExtension;
10+
/// <summary>
/// Source extension that reads documents from one or all collections of a MongoDB
/// database through the legacy driver wrapped by <see cref="Context"/>.
/// </summary>
[Export(typeof(IDataSourceExtension))]
internal class MongoLegacyDataSourceExtension : IDataSourceExtensionWithSettings
{
    public string DisplayName => "MongoDB-Legacy (Wire v2)";

    /// <summary>
    /// Reads every document from the configured collection, or from every collection in
    /// the database when no collection name is configured.
    /// </summary>
    /// <param name="config">Configuration bound to <see cref="MongoLegacySourceSettings"/>.</param>
    /// <param name="logger">Receives progress and error messages.</param>
    /// <param name="cancellationToken">Stops the enumeration.</param>
    public async IAsyncEnumerable<IDataItem> ReadAsync(IConfiguration config, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var settings = config.Get<MongoLegacySourceSettings>();

        if (settings == null)
        {
            logger.LogError("Failed to parse MongoDB Legacy source settings");
            yield break;
        }

        // Treat empty strings like missing values: previously an empty setting passed the
        // null check and then produced no items and no log message.
        if (string.IsNullOrEmpty(settings.ConnectionString) || string.IsNullOrEmpty(settings.DatabaseName))
        {
            logger.LogError("MongoDB Legacy source settings missing required fields");
            yield break;
        }

        var context = new Context(settings.ConnectionString, settings.DatabaseName);

        var collectionNames = !string.IsNullOrEmpty(settings.Collection)
            ? new List<string> { settings.Collection }
            : await context.GetCollectionNamesAsync();

        foreach (var collection in collectionNames)
        {
            // Hand the token to the inner iterator directly so it is actually observed there.
            await foreach (var item in EnumerateCollectionAsync(context, collection, logger, cancellationToken))
            {
                yield return item;
            }
        }
    }

    /// <summary>
    /// Yields each document of <paramref name="collectionName"/> as a <see cref="MongoDataItem"/>.
    /// </summary>
    /// <param name="context">Database context used to open the collection.</param>
    /// <param name="collectionName">Collection to read.</param>
    /// <param name="logger">Receives progress messages.</param>
    /// <param name="cancellationToken">Checked before each item is yielded.</param>
    /// <exception cref="OperationCanceledException">The token was cancelled during the read.</exception>
    public async IAsyncEnumerable<IDataItem> EnumerateCollectionAsync(Context context, string collectionName, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        logger.LogInformation("Reading collection '{Collection}' using legacy MongoDB driver", collectionName);
        var repository = context.GetRepository<BsonDocument>(collectionName);
        int itemCount = 0;

        // NOTE(review): Task.Run only wraps creation of the queryable; the documents
        // themselves appear to be fetched synchronously as the loop enumerates it.
        var records = await Task.Run(() => repository.AsQueryable(), cancellationToken);
        foreach (var record in records)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return new MongoDataItem(record);
            itemCount++;
        }

        if (itemCount > 0)
        {
            logger.LogInformation("Read {ItemCount} items from collection '{Collection}'", itemCount, collectionName);
        }
        else
        {
            logger.LogWarning("No items read from collection '{Collection}'", collectionName);
        }
    }

    /// <summary>Returns an empty settings instance describing this source's options.</summary>
    public IEnumerable<IDataExtensionSettings> GetSettings()
    {
        yield return new MongoLegacySourceSettings();
    }
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System.Linq.Expressions;
2+
using MongoDB.Driver;
3+
using System.Linq;
4+
5+
namespace Cosmos.DataTransfer.MongoLegacyExtension;
6+
7+
/// <summary>
/// <see cref="IRepository{TDocument}"/> implementation backed by a legacy-driver
/// <see cref="MongoCollection{TDocument}"/>.
/// </summary>
/// <typeparam name="TDocument">Type of the documents stored in the collection.</typeparam>
public class MongoRepository<TDocument> : IRepository<TDocument>
{
    private readonly MongoCollection<TDocument> collection;

    /// <summary>Binds the repository to <paramref name="collection"/>.</summary>
    public MongoRepository(MongoCollection<TDocument> collection) => this.collection = collection;

    /// <summary>Inserts one document, running the driver call on the thread pool.</summary>
    /// <param name="item">Document to insert.</param>
    public async ValueTask Add(TDocument item)
    {
        Task insertion = Task.Run(() => collection.Insert(item));
        await insertion;
    }

    /// <summary>Inserts a sequence of documents as one batch on the thread pool.</summary>
    /// <param name="items">Documents to insert.</param>
    public async ValueTask AddRange(IEnumerable<TDocument> items)
    {
        Task insertion = Task.Run(() => collection.InsertBatch(items));
        await insertion;
    }

    /// <summary>Inserts the given documents as one batch on the thread pool.</summary>
    /// <param name="items">Documents to insert.</param>
    public async ValueTask AddRange(params TDocument[] items)
    {
        // Forward to the sequence overload; it issues the same InsertBatch call.
        IEnumerable<TDocument> sequence = items;
        await AddRange(sequence);
    }

    /// <summary>Returns all documents of the collection as a queryable sequence.</summary>
    public IQueryable<TDocument> AsQueryable()
    {
        var everything = collection.FindAll();
        return everything.AsQueryable();
    }
}

0 commit comments

Comments
 (0)