Skip to content

Commit 5ca1721

Browse files
authored
Introduce IngestionChunkWriter built on top of MEVD (#6951)
1 parent acd0cdb commit 5ca1721

File tree

12 files changed

+502
-0
lines changed

12 files changed

+502
-0
lines changed

eng/Versions.props

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,5 +168,8 @@
168168
Starting with 3.0.0, xunit.runner.visualstudio only supports net472, but we target net462
169169
-->
170170
<XUnitRunnerVisualStudioVersion>2.8.2</XUnitRunnerVisualStudioVersion>
171+
<!-- MEVD is still part of the Semantic Kernel repo -->
172+
<MicrosoftExtensionsVectorDataAbstractionsVersion>9.7.0</MicrosoftExtensionsVectorDataAbstractionsVersion>
173+
<MicrosoftSemanticKernelConnectorsVersion>1.66.0-preview</MicrosoftSemanticKernelConnectorsVersion>
171174
</PropertyGroup>
172175
</Project>

eng/packages/General.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="$(MicrosoftCodeAnalysisVersion)" />
1414
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="$(MicrosoftCodeAnalysisVersion)" />
1515
<PackageVersion Include="Microsoft.CodeAnalysis" Version="$(MicrosoftCodeAnalysisVersion)" />
16+
<PackageVersion Include="Microsoft.Extensions.VectorData.Abstractions" Version="$(MicrosoftExtensionsVectorDataAbstractionsVersion)" />
1617
<PackageVersion Include="Microsoft.IO.RecyclableMemoryStream" Version="3.0.0" />
1718
<PackageVersion Include="Microsoft.ML.Tokenizers" Version="$(MicrosoftMLTokenizersVersion)" />
1819
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />

eng/packages/TestOnly.props

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
<PackageVersion Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="9.0.0" />
1313
<PackageVersion Include="Microsoft.Extensions.Configuration.UserSecrets" Version="9.0.0" />
1414
<PackageVersion Include="Microsoft.ML.Tokenizers.Data.O200kBase" Version="$(MicrosoftMLTokenizersVersion)" />
15+
<PackageVersion Include="Microsoft.SemanticKernel.Connectors.InMemory" Version="$(MicrosoftSemanticKernelConnectorsVersion)" />
16+
<PackageVersion Include="Microsoft.SemanticKernel.Connectors.SqliteVec" Version="$(MicrosoftSemanticKernelConnectorsVersion)" />
1517
<PackageVersion Include="Microsoft.TemplateEngine.Authoring.TemplateVerifier" Version="9.0.201" />
1618
<PackageVersion Include="Microsoft.TemplateEngine.TestHelper" Version="9.0.200-rtm.25066.4" />
1719
<PackageVersion Include="Moq.AutoMock" Version="3.1.0" />

src/Libraries/Microsoft.Extensions.DataIngestion/Microsoft.Extensions.DataIngestion.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,8 @@
1717
<ProjectReference Include="..\Microsoft.Extensions.DataIngestion.Abstractions\Microsoft.Extensions.DataIngestion.Abstractions.csproj" />
1818
</ItemGroup>
1919

20+
<ItemGroup>
21+
<PackageReference Include="Microsoft.Extensions.VectorData.Abstractions" />
22+
</ItemGroup>
23+
2024
</Project>
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Microsoft.Extensions.VectorData;
9+
using Microsoft.Shared.Diagnostics;
10+
11+
namespace Microsoft.Extensions.DataIngestion;
12+
13+
/// <summary>
/// Writes chunks to the <see cref="VectorStore"/> using the default schema.
/// </summary>
/// <remarks>
/// The underlying collection is created lazily from the first chunk that is written; the metadata keys of that
/// chunk define the additional data properties of the collection. When
/// <see cref="VectorStoreWriterOptions.IncrementalIngestion"/> is enabled, the records that already exist for the
/// document of the first chunk are deleted once all new chunks have been upserted.
/// </remarks>
/// <typeparam name="T">The type of the chunk content.</typeparam>
public sealed class VectorStoreWriter<T> : IngestionChunkWriter<T>
{
    // The names are lowercase with no special characters to ensure compatibility with various vector stores.
    private const string KeyName = "key";
    private const string EmbeddingName = "embedding";
    private const string ContentName = "content";
    private const string ContextName = "context";
    private const string DocumentIdName = "documentid";

    private readonly VectorStore _vectorStore;
    private readonly int _dimensionCount;
    private readonly VectorStoreWriterOptions _options;
    private readonly bool _keysAreStrings;

    // Created on the first WriteAsync call, because the schema depends on the first chunk's metadata.
    private VectorStoreCollection<object, Dictionary<string, object?>>? _vectorStoreCollection;

    /// <summary>
    /// Initializes a new instance of the <see cref="VectorStoreWriter{T}"/> class.
    /// </summary>
    /// <param name="vectorStore">The <see cref="VectorStore"/> to use to store the <see cref="IngestionChunk{T}"/> instances.</param>
    /// <param name="dimensionCount">The number of dimensions that the vector has. This value is required when creating collections.</param>
    /// <param name="options">The options for the vector store writer. When <see langword="null"/>, default options are used.</param>
    /// <exception cref="ArgumentNullException">When <paramref name="vectorStore"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">When <paramref name="dimensionCount"/> is less than or equal to zero.</exception>
    public VectorStoreWriter(VectorStore vectorStore, int dimensionCount, VectorStoreWriterOptions? options = default)
    {
        _vectorStore = Throw.IfNull(vectorStore);
        _dimensionCount = Throw.IfLessThanOrEqual(dimensionCount, 0);
        _options = options ?? new VectorStoreWriterOptions();

        // Not all vector stores support string as the key type, examples:
        // Qdrant: https://github.com/microsoft/semantic-kernel/blob/28ea2f4df872e8fd03ef0792ebc9e1989b4be0ee/dotnet/src/VectorData/Qdrant/QdrantCollection.cs#L104
        // When https://github.com/microsoft/semantic-kernel/issues/13141 gets released,
        // we need to remove this workaround.
        _keysAreStrings = vectorStore.GetType().Name != "QdrantVectorStore";
    }

    /// <summary>
    /// Gets the underlying <see cref="VectorStoreCollection{TKey,TRecord}"/> used to store the chunks.
    /// </summary>
    /// <remarks>
    /// The collection is initialized when <see cref="WriteAsync(IAsyncEnumerable{IngestionChunk{T}}, CancellationToken)"/> is called for the first time.
    /// </remarks>
    /// <exception cref="InvalidOperationException">The collection has not been initialized yet.
    /// Call <see cref="WriteAsync(IAsyncEnumerable{IngestionChunk{T}}, CancellationToken)"/> first.</exception>
    public VectorStoreCollection<object, Dictionary<string, object?>> VectorStoreCollection
        => _vectorStoreCollection ?? throw new InvalidOperationException("The collection has not been initialized yet. Call WriteAsync first.");

    /// <summary>
    /// Upserts every chunk of <paramref name="chunks"/> into the collection, creating the collection on first use.
    /// </summary>
    /// <param name="chunks">The chunks to store. Each chunk becomes one record with a freshly generated key.</param>
    /// <param name="cancellationToken">The token used to cancel enumeration and the vector store calls.</param>
    /// <returns>A task that completes when all chunks are written and any pre-existing chunks are deleted.</returns>
    /// <exception cref="ArgumentNullException">When <paramref name="chunks"/> is null.</exception>
    public override async Task WriteAsync(IAsyncEnumerable<IngestionChunk<T>> chunks, CancellationToken cancellationToken = default)
    {
        _ = Throw.IfNull(chunks);

        IReadOnlyList<object>? preExistingKeys = null;
        await foreach (IngestionChunk<T> chunk in chunks.WithCancellation(cancellationToken))
        {
            if (_vectorStoreCollection is null)
            {
                _vectorStoreCollection = _vectorStore.GetDynamicCollection(_options.CollectionName, GetVectorStoreRecordDefinition(chunk));

                await _vectorStoreCollection.EnsureCollectionExistsAsync(cancellationToken).ConfigureAwait(false);
            }

            // We obtain the IDs of the pre-existing chunks for given document,
            // and delete them after we finish inserting the new chunks,
            // to avoid a situation where we delete the chunks and then fail to insert the new ones.
            // The lookup happens only once, for the document of the first chunk.
            preExistingKeys ??= await GetPreExistingChunksIdsAsync(chunk.Document, cancellationToken).ConfigureAwait(false);

            var key = Guid.NewGuid();
            Dictionary<string, object?> record = new()
            {
                [KeyName] = _keysAreStrings ? key.ToString() : key,
                [ContentName] = chunk.Content,

                // The raw content is stored in the vector property as well; the vector store is expected
                // to convert it to the actual vector type it supports (see GetVectorStoreRecordDefinition).
                [EmbeddingName] = chunk.Content,
                [ContextName] = chunk.Context,
                [DocumentIdName] = chunk.Document.Identifier,
            };

            if (chunk.HasMetadata)
            {
                foreach (var metadata in chunk.Metadata)
                {
                    record[metadata.Key] = metadata.Value;
                }
            }

            await _vectorStoreCollection.UpsertAsync(record, cancellationToken).ConfigureAwait(false);
        }

        if (preExistingKeys?.Count > 0)
        {
            await _vectorStoreCollection!.DeleteAsync(preExistingKeys, cancellationToken).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Disposes the collection (when it was created) and the vector store, then the base class.
    /// </summary>
    /// <param name="disposing">Forwarded unchanged to the base implementation.</param>
    protected override void Dispose(bool disposing)
    {
        try
        {
            _vectorStoreCollection?.Dispose();
        }
        finally
        {
            // Runs even when disposing the collection throws.
            _vectorStore.Dispose();
            base.Dispose(disposing);
        }
    }

    /// <summary>
    /// Builds the collection schema: key, vector, content, context and document id properties,
    /// plus one data property per metadata entry of <paramref name="representativeChunk"/>.
    /// </summary>
    /// <param name="representativeChunk">The chunk whose metadata keys and value types define the extra properties.</param>
    /// <returns>The definition passed to the vector store when obtaining the dynamic collection.</returns>
    private VectorStoreCollectionDefinition GetVectorStoreRecordDefinition(IngestionChunk<T> representativeChunk)
    {
        VectorStoreCollectionDefinition definition = new()
        {
            Properties =
            {
                new VectorStoreKeyProperty(KeyName, _keysAreStrings ? typeof(string) : typeof(Guid)),

                // By using T as the type here we allow the vector store
                // to handle the conversion from T to the actual vector type it supports.
                new VectorStoreVectorProperty(EmbeddingName, typeof(T), _dimensionCount)
                {
                    DistanceFunction = _options.DistanceFunction,
                    IndexKind = _options.IndexKind
                },
                new VectorStoreDataProperty(ContentName, typeof(T)),
                new VectorStoreDataProperty(ContextName, typeof(string)),
                new VectorStoreDataProperty(DocumentIdName, typeof(string))
                {
                    // Indexed because the incremental ingestion lookup filters on this property.
                    IsIndexed = true
                }
            }
        };

        if (representativeChunk.HasMetadata)
        {
            foreach (var metadata in representativeChunk.Metadata)
            {
                Type propertyType = metadata.Value.GetType();
                definition.Properties.Add(new VectorStoreDataProperty(metadata.Key, propertyType)
                {
                    // We use lowercase storage names to ensure compatibility with various vector stores.
#pragma warning disable CA1308 // Normalize strings to uppercase
                    StorageName = metadata.Key.ToLowerInvariant()
#pragma warning restore CA1308 // Normalize strings to uppercase

                    // We could consider indexing for certain keys like classification etc. but for now we leave it as non-indexed.
                    // The reason is that not every DB supports it, moreover we would need to expose the ability to configure it.
                });
            }
        }

        return definition;
    }

    /// <summary>
    /// Collects the keys of all records already stored for <paramref name="document"/>.
    /// </summary>
    /// <param name="document">The document whose identifier is matched against the document id property.</param>
    /// <param name="cancellationToken">The token used to cancel the queries.</param>
    /// <returns>
    /// The keys of the matching records, or an empty list when
    /// <see cref="VectorStoreWriterOptions.IncrementalIngestion"/> is disabled.
    /// </returns>
    private async Task<IReadOnlyList<object>> GetPreExistingChunksIdsAsync(IngestionDocument document, CancellationToken cancellationToken)
    {
        if (!_options.IncrementalIngestion)
        {
            return [];
        }

        // Each Vector Store has a different max top count limit, so we use low value and loop.
        const int MaxTopCount = 1_000;

        List<object> keys = [];
        int insertedCount;
        do
        {
            insertedCount = 0;

            // Skip the records collected so far. Without it every iteration would return the same first page,
            // and the loop would never terminate for documents with MaxTopCount or more chunks.
            // NOTE(review): no explicit ordering is requested, so paging relies on the store returning
            // records in a stable order across calls - confirm for the targeted connectors.
            FilteredRecordRetrievalOptions<Dictionary<string, object?>> pageOptions = new()
            {
                Skip = keys.Count
            };

            await foreach (var record in _vectorStoreCollection!.GetAsync(
                filter: r => (string)r[DocumentIdName]! == document.Identifier,
                top: MaxTopCount,
                options: pageOptions,
                cancellationToken: cancellationToken).ConfigureAwait(false))
            {
                keys.Add(record[KeyName]!);
                insertedCount++;
            }
        }
        while (insertedCount == MaxTopCount);

        return keys;
    }
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using Microsoft.Shared.Diagnostics;
5+
6+
namespace Microsoft.Extensions.DataIngestion;
7+
8+
/// <summary>
/// Configures how a <see cref="VectorStoreWriter{T}"/> names, creates and maintains its collection.
/// </summary>
public sealed class VectorStoreWriterOptions
{
    /// <summary>
    /// Gets or sets the collection name. Reading it before any assignment yields "chunks".
    /// </summary>
    public string CollectionName
    {
        get
        {
            // No name was assigned yet: fall back to the default one.
            return field ?? "chunks";
        }

        set
        {
            // Null and empty names are rejected by the guard.
            field = Throw.IfNullOrEmpty(value);
        }
    }

    /// <summary>
    /// Gets or sets the distance function applied to the vector property when the collection is created.
    /// </summary>
    /// <remarks>
    /// When not provided, the default specific to given database will be used. Check <see cref="VectorData.DistanceFunction"/> for available values.
    /// </remarks>
    public string? DistanceFunction { get; set; }

    /// <summary>
    /// Gets or sets the index kind applied to the vector property when the collection is created.
    /// </summary>
    public string? IndexKind { get; set; }

    /// <summary>
    /// Gets or sets a value indicating whether ingestion is incremental. Defaults to <see langword="true"/>.
    /// </summary>
    /// <remarks>
    /// When enabled, the writer deletes the chunks previously stored for the document after the new ones are inserted,
    /// so the new chunks effectively replace the old ones.
    /// </remarks>
    public bool IncrementalIngestion { get; set; } = true;
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using Microsoft.Extensions.VectorData;
5+
using Microsoft.SemanticKernel.Connectors.InMemory;
6+
7+
namespace Microsoft.Extensions.DataIngestion.Tests;
8+
9+
/// <summary>
/// Binds the tests inherited from <see cref="VectorStoreWriterTests"/> to an <see cref="InMemoryVectorStore"/>.
/// </summary>
public class InMemoryVectorStoreWriterTests : VectorStoreWriterTests
{
    /// <summary>
    /// Creates an in-memory store whose options carry the supplied embedding generator.
    /// </summary>
    /// <param name="testEmbeddingGenerator">The generator assigned to the store options.</param>
    /// <returns>A new <see cref="InMemoryVectorStore"/>.</returns>
    protected override VectorStore CreateVectorStore(TestEmbeddingGenerator<string> testEmbeddingGenerator)
    {
        return new InMemoryVectorStore(new() { EmbeddingGenerator = testEmbeddingGenerator });
    }
}

test/Libraries/Microsoft.Extensions.DataIngestion.Tests/Microsoft.Extensions.DataIngestion.Tests.csproj

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,23 @@
55
<NoWarn>$(NoWarn);S3967</NoWarn>
66
<!-- Project reference can be removed -->
77
<NoWarn>$(NoWarn);RT0002</NoWarn>
8+
9+
<!-- https://github.com/dotnet/efcore/issues/33472#issuecomment-2042146335 -->
10+
<PlatformTarget Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">x64</PlatformTarget>
811
</PropertyGroup>
912

1013
<ItemGroup>
1114
<ProjectReference Include="..\..\..\src\Libraries\Microsoft.Extensions.DataIngestion\Microsoft.Extensions.DataIngestion.csproj" />
1215
</ItemGroup>
1316

17+
<ItemGroup>
18+
<PackageReference Include="Microsoft.SemanticKernel.Connectors.InMemory" />
19+
<PackageReference Include="Microsoft.SemanticKernel.Connectors.SqliteVec" />
20+
</ItemGroup>
21+
22+
<ItemGroup>
23+
<!-- We don't run Sqlite tests on Full Framework due to some native dependency issues -->
24+
<Compile Remove="SqliteVectorStoreWriterTests.cs" Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'" />
25+
</ItemGroup>
26+
1427
</Project>
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System;
5+
using System.IO;
6+
using Microsoft.Extensions.VectorData;
7+
using Microsoft.SemanticKernel.Connectors.SqliteVec;
8+
9+
namespace Microsoft.Extensions.DataIngestion.Tests;
10+
11+
/// <summary>
/// Binds the tests inherited from <see cref="VectorStoreWriterTests"/> to a <see cref="SqliteVectorStore"/>
/// backed by a temporary file that is deleted on dispose.
/// </summary>
public sealed class SqliteVectorStoreWriterTests : VectorStoreWriterTests, IDisposable
{
    // Path of the temporary file used as the SQLite data source for this test class instance.
    private readonly string _databasePath = Path.GetTempFileName();

    /// <summary>
    /// Deletes the temporary database file.
    /// </summary>
    public void Dispose()
    {
        File.Delete(_databasePath);
    }

    /// <summary>
    /// Creates a SQLite store over the temporary file, with connection pooling turned off in the connection string.
    /// </summary>
    /// <param name="testEmbeddingGenerator">The generator assigned to the store options.</param>
    /// <returns>A new <see cref="SqliteVectorStore"/>.</returns>
    protected override VectorStore CreateVectorStore(TestEmbeddingGenerator<string> testEmbeddingGenerator)
    {
        string connectionString = $"Data Source={_databasePath};Pooling=false";
        return new SqliteVectorStore(connectionString, new() { EmbeddingGenerator = testEmbeddingGenerator });
    }
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Threading.Tasks;
7+
8+
namespace Microsoft.Extensions.DataIngestion;
9+
10+
// Minimal async LINQ helpers for the tests.
// Once .NET 10 is shipped, we are going to switch to System.Linq.AsyncEnumerable.
internal static class IAsyncEnumerableExtensions
{
    /// <summary>
    /// Exposes a synchronous sequence as an asynchronous one, yielding to the scheduler before every element.
    /// </summary>
    internal static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<T> source)
    {
        using IEnumerator<T> enumerator = source.GetEnumerator();
        while (enumerator.MoveNext())
        {
            // Force an asynchronous hop so consumers really exercise their async paths.
            await Task.Yield();
            yield return enumerator.Current;
        }
    }

    /// <summary>
    /// Enumerates the whole sequence and returns how many elements it produced.
    /// </summary>
    internal static async ValueTask<int> CountAsync<T>(this IAsyncEnumerable<T> source)
    {
        int total = 0;

        await using IAsyncEnumerator<T> enumerator = source.GetAsyncEnumerator();
        while (await enumerator.MoveNextAsync())
        {
            total += 1;
        }

        return total;
    }

    /// <summary>
    /// Returns the only element of the sequence.
    /// </summary>
    /// <exception cref="InvalidOperationException">The sequence is empty or has more than one element.</exception>
    internal static async ValueTask<T> SingleAsync<T>(this IAsyncEnumerable<T> source)
    {
        await using IAsyncEnumerator<T> enumerator = source.GetAsyncEnumerator();

        // Empty sequence.
        if (!await enumerator.MoveNextAsync())
        {
            throw new InvalidOperationException();
        }

        T only = enumerator.Current;

        // A second element means the sequence is not a singleton.
        if (await enumerator.MoveNextAsync())
        {
            throw new InvalidOperationException();
        }

        return only;
    }
}

0 commit comments

Comments
 (0)