-
Notifications
You must be signed in to change notification settings - Fork 841
Introduce IngestionChunkWriter built on top of MEVD #6951
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
6a80ab5
add MEVD dependency
adamsitnik 8508492
move code as is
adamsitnik 0ae48c5
solve the warnings
adamsitnik 96e9704
add tests
adamsitnik 11c225b
fix the build 1/n
adamsitnik 906a5c9
fix the build 2/n
adamsitnik a926479
address code review feedback
adamsitnik File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
200 changes: 200 additions & 0 deletions
200
src/Libraries/Microsoft.Extensions.DataIngestion/Writers/VectorStoreWriter.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,200 @@ | ||
| // Licensed to the .NET Foundation under one or more agreements. | ||
| // The .NET Foundation licenses this file to you under the MIT license. | ||
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Microsoft.Extensions.VectorData; | ||
| using Microsoft.Shared.Diagnostics; | ||
|
|
||
| namespace Microsoft.Extensions.DataIngestion; | ||
|
|
||
| /// <summary> | ||
| /// Writes chunks to the <see cref="VectorStore"/> using the default schema. | ||
| /// </summary> | ||
| /// <typeparam name="T">The type of the chunk content.</typeparam> | ||
| public sealed class VectorStoreWriter<T> : IngestionChunkWriter<T> | ||
adamsitnik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| { | ||
| // The names are lowercase with no special characters to ensure compatibility with various vector stores. | ||
adamsitnik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| private const string KeyName = "key"; | ||
| private const string EmbeddingName = "embedding"; | ||
| private const string ContentName = "content"; | ||
| private const string ContextName = "context"; | ||
| private const string DocumentIdName = "documentid"; | ||
|
|
||
| private readonly VectorStore _vectorStore; | ||
| private readonly int _dimensionCount; | ||
| private readonly VectorStoreWriterOptions _options; | ||
| private readonly bool _keysAreStrings; | ||
|
|
||
| private VectorStoreCollection<object, Dictionary<string, object?>>? _vectorStoreCollection; | ||
|
|
||
|     /// <summary> | ||
|     /// Initializes a new instance of the <see cref="VectorStoreWriter{T}"/> class. | ||
|     /// </summary> | ||
|     /// <param name="vectorStore">The <see cref="VectorStore"/> in which the <see cref="IngestionChunk{T}"/> instances are stored.</param> | ||
|     /// <param name="dimensionCount">The number of dimensions of the vector. It is used to define the vector property when the collection is created.</param> | ||
|     /// <param name="options">The options for the vector store writer. When <see langword="null"/>, a default <see cref="VectorStoreWriterOptions"/> instance is used.</param> | ||
|     /// <exception cref="ArgumentNullException"><paramref name="vectorStore"/> is <see langword="null"/>.</exception> | ||
|     /// <exception cref="ArgumentOutOfRangeException"><paramref name="dimensionCount"/> is less than or equal to zero.</exception> | ||
|     public VectorStoreWriter(VectorStore vectorStore, int dimensionCount, VectorStoreWriterOptions? options = default) | ||
|     { | ||
|         _vectorStore = Throw.IfNull(vectorStore); | ||
|         _dimensionCount = Throw.IfLessThanOrEqual(dimensionCount, 0); | ||
|         _options = options ?? new(); | ||
|
|
||
|         // Not every vector store supports strings as the key type. Qdrant is one example: | ||
|         // https://github.com/microsoft/semantic-kernel/blob/28ea2f4df872e8fd03ef0792ebc9e1989b4be0ee/dotnet/src/VectorData/Qdrant/QdrantCollection.cs#L104 | ||
|         // The store is recognized by its type name and Guid keys are used for it instead. | ||
|         // Remove this workaround once https://github.com/microsoft/semantic-kernel/issues/13141 is released. | ||
|         bool isQdrant = string.Equals(_vectorStore.GetType().Name, "QdrantVectorStore", StringComparison.Ordinal); | ||
|         _keysAreStrings = !isQdrant; | ||
|     } | ||
|
|
||
|     /// <summary> | ||
|     /// Gets the <see cref="VectorStoreCollection{TKey,TRecord}"/> into which the chunks are written. | ||
|     /// </summary> | ||
|     /// <remarks> | ||
|     /// The collection is created lazily, by the first call to | ||
|     /// <see cref="WriteAsync(IAsyncEnumerable{IngestionChunk{T}}, CancellationToken)"/> that receives at least one chunk. | ||
|     /// </remarks> | ||
|     /// <exception cref="InvalidOperationException">The collection has not been initialized yet. | ||
|     /// Call <see cref="WriteAsync(IAsyncEnumerable{IngestionChunk{T}}, CancellationToken)"/> first.</exception> | ||
|     public VectorStoreCollection<object, Dictionary<string, object?>> VectorStoreCollection | ||
|     { | ||
|         get | ||
|         { | ||
|             if (_vectorStoreCollection is null) | ||
|             { | ||
|                 throw new InvalidOperationException("The collection has not been initialized yet. Call WriteAsync first."); | ||
|             } | ||
|
|
||
|             return _vectorStoreCollection; | ||
|         } | ||
|     } | ||
|
|
||
|     /// <inheritdoc/> | ||
|     public override async Task WriteAsync(IAsyncEnumerable<IngestionChunk<T>> chunks, CancellationToken cancellationToken = default) | ||
|     { | ||
|         _ = Throw.IfNull(chunks); | ||
|
|
||
|         IReadOnlyList<object>? preExistingKeys = null; | ||
|
|
||
|         // This is library code: like every other await in this method, the enumeration | ||
|         // does not need to resume on the caller's synchronization context. | ||
|         await foreach (IngestionChunk<T> chunk in chunks.WithCancellation(cancellationToken).ConfigureAwait(false)) | ||
|         { | ||
|             if (_vectorStoreCollection is null) | ||
|             { | ||
|                 // The collection is created lazily, because its schema is derived from the first chunk | ||
|                 // (the metadata of that chunk defines the additional data properties). | ||
|                 _vectorStoreCollection = _vectorStore.GetDynamicCollection(_options.CollectionName, GetVectorStoreRecordDefinition(chunk)); | ||
|
|
||
|                 await _vectorStoreCollection.EnsureCollectionExistsAsync(cancellationToken).ConfigureAwait(false); | ||
|             } | ||
|
|
||
|             // We obtain the IDs of the pre-existing chunks for the given document, | ||
|             // and delete them after we finish inserting the new chunks, | ||
|             // to avoid a situation where we delete the chunks and then fail to insert the new ones. | ||
|             // The lookup is performed only once, for the document of the first chunk, | ||
|             // so all the chunks of a single call are expected to belong to the same document. | ||
|             preExistingKeys ??= await GetPreExistingChunksIdsAsync(chunk.Document, cancellationToken).ConfigureAwait(false); | ||
|
|
||
|             var key = Guid.NewGuid(); | ||
|             Dictionary<string, object?> record = new() | ||
|             { | ||
|                 [KeyName] = _keysAreStrings ? key.ToString() : key, | ||
|                 [ContentName] = chunk.Content, | ||
|
|
||
|                 // The content is also assigned to the vector property: | ||
|                 // the vector store is expected to convert it to the vector type it supports. | ||
|                 [EmbeddingName] = chunk.Content, | ||
|                 [ContextName] = chunk.Context, | ||
|                 [DocumentIdName] = chunk.Document.Identifier, | ||
|             }; | ||
|
|
||
|             if (chunk.HasMetadata) | ||
|             { | ||
|                 // Metadata is written last, so an entry that uses one of the built-in names replaces that value. | ||
|                 foreach (var metadata in chunk.Metadata) | ||
|                 { | ||
|                     record[metadata.Key] = metadata.Value; | ||
|                 } | ||
|             } | ||
|
|
||
|             await _vectorStoreCollection.UpsertAsync(record, cancellationToken).ConfigureAwait(false); | ||
|         } | ||
|
|
||
|         // A non-empty list implies that at least one chunk was processed, so the collection is initialized here. | ||
|         if (preExistingKeys?.Count > 0) | ||
|         { | ||
|             await _vectorStoreCollection!.DeleteAsync(preExistingKeys, cancellationToken).ConfigureAwait(false); | ||
|         } | ||
|     } | ||
|
|
||
|     /// <inheritdoc/> | ||
|     protected override void Dispose(bool disposing) | ||
|     { | ||
|         try | ||
|         { | ||
|             // Managed objects are released only for an explicit Dispose call (disposing is true). | ||
|             if (disposing) | ||
|             { | ||
|                 try | ||
|                 { | ||
|                     // The collection is null when WriteAsync has never received a chunk. | ||
|                     _vectorStoreCollection?.Dispose(); | ||
|                 } | ||
|                 finally | ||
|                 { | ||
|                     // The store is disposed even when disposing the collection throws. | ||
|                     _vectorStore.Dispose(); | ||
|                 } | ||
|             } | ||
|         } | ||
|         finally | ||
|         { | ||
|             base.Dispose(disposing); | ||
|         } | ||
|     } | ||
|
|
||
|     // Builds the schema of the collection: the fixed key, embedding, content, context and document id | ||
|     // properties, plus one data property for every metadata entry of the representative chunk. | ||
|     private VectorStoreCollectionDefinition GetVectorStoreRecordDefinition(IngestionChunk<T> representativeChunk) | ||
|     { | ||
|         VectorStoreCollectionDefinition schema = new(); | ||
|         var properties = schema.Properties; | ||
|
|
||
|         properties.Add(new VectorStoreKeyProperty(KeyName, _keysAreStrings ? typeof(string) : typeof(Guid))); | ||
|
|
||
|         // T is used as the type of the vector property, which lets the vector store | ||
|         // convert from T to the actual vector type that it supports. | ||
|         properties.Add(new VectorStoreVectorProperty(EmbeddingName, typeof(T), _dimensionCount) | ||
|         { | ||
|             DistanceFunction = _options.DistanceFunction, | ||
|             IndexKind = _options.IndexKind | ||
|         }); | ||
|         properties.Add(new VectorStoreDataProperty(ContentName, typeof(T))); | ||
|         properties.Add(new VectorStoreDataProperty(ContextName, typeof(string))); | ||
|
|
||
|         // The document id is the only indexed data property: it is used to filter the chunks of a document. | ||
|         properties.Add(new VectorStoreDataProperty(DocumentIdName, typeof(string)) | ||
|         { | ||
|             IsIndexed = true | ||
|         }); | ||
|
|
||
|         if (!representativeChunk.HasMetadata) | ||
|         { | ||
|             return schema; | ||
|         } | ||
|
|
||
|         foreach (var entry in representativeChunk.Metadata) | ||
|         { | ||
|             Type valueType = entry.Value.GetType(); | ||
|             VectorStoreDataProperty metadataProperty = new(entry.Key, valueType); | ||
|
|
||
|             // Storage names are lowercase to ensure compatibility with various vector stores. | ||
| #pragma warning disable CA1308 // Normalize strings to uppercase | ||
|             metadataProperty.StorageName = entry.Key.ToLowerInvariant(); | ||
| #pragma warning restore CA1308 // Normalize strings to uppercase | ||
|
|
||
|             // Indexing could be considered for certain keys (classification etc.), but for now they stay non-indexed: | ||
|             // not every DB supports it, and the ability to configure it would need to be exposed. | ||
|             properties.Add(metadataProperty); | ||
|         } | ||
|
|
||
|         return schema; | ||
|     } | ||
|
|
||
|     // Returns the keys of all the records already stored for the given document, | ||
|     // or an empty list when incremental ingestion is disabled. | ||
|     private async Task<IReadOnlyList<object>> GetPreExistingChunksIdsAsync(IngestionDocument document, CancellationToken cancellationToken) | ||
|     { | ||
|         if (!_options.IncrementalIngestion) | ||
|         { | ||
|             return []; | ||
|         } | ||
|
|
||
|         // Each Vector Store has a different max top count limit, so we use a low value and fetch page by page. | ||
|         const int MaxTopCount = 1_000; | ||
|
|
||
|         List<object> keys = []; | ||
|         int fetchedCount; | ||
|         do | ||
|         { | ||
|             fetchedCount = 0; | ||
|
|
||
|             // Every page must skip the records that were already collected. Without it each iteration | ||
|             // would return the same first page, and a document with MaxTopCount or more chunks would loop forever. | ||
|             // NOTE(review): no explicit ordering is requested, so stable paging relies on the store - confirm. | ||
|             await foreach (var record in _vectorStoreCollection!.GetAsync( | ||
|                 filter: stored => (string)stored[DocumentIdName]! == document.Identifier, | ||
|                 top: MaxTopCount, | ||
|                 options: new() { Skip = keys.Count }, | ||
|                 cancellationToken: cancellationToken).ConfigureAwait(false)) | ||
|             { | ||
|                 keys.Add(record[KeyName]!); | ||
|                 fetchedCount++; | ||
|             } | ||
|         } | ||
|         while (fetchedCount == MaxTopCount); // a full page means that there may be more records to fetch | ||
|
|
||
|         return keys; | ||
|     } | ||
| } | ||
43 changes: 43 additions & 0 deletions
43
src/Libraries/Microsoft.Extensions.DataIngestion/Writers/VectorStoreWriterOptions.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| // Licensed to the .NET Foundation under one or more agreements. | ||
| // The .NET Foundation licenses this file to you under the MIT license. | ||
|
|
||
| using Microsoft.Shared.Diagnostics; | ||
|
|
||
| namespace Microsoft.Extensions.DataIngestion; | ||
|
|
||
| /// <summary> | ||
| /// Represents the options that configure a <see cref="VectorStoreWriter{T}"/>. | ||
| /// </summary> | ||
| public sealed class VectorStoreWriterOptions | ||
| { | ||
|     /// <summary> | ||
|     /// Gets or sets the name of the collection. The default is "chunks". | ||
|     /// </summary> | ||
|     public string CollectionName | ||
|     { | ||
|         get | ||
|         { | ||
|             return field ?? "chunks"; | ||
|         } | ||
|
|
||
|         set | ||
|         { | ||
|             field = Throw.IfNullOrEmpty(value); | ||
|         } | ||
|     } | ||
|
|
||
|     /// <summary> | ||
|     /// Gets or sets the distance function to use when creating the collection. | ||
|     /// </summary> | ||
|     /// <remarks> | ||
|     /// When not provided, the default specific to the given database will be used. See <see cref="VectorData.DistanceFunction"/> for the available values. | ||
|     /// </remarks> | ||
|     public string? DistanceFunction { get; set; } | ||
|
|
||
|     /// <summary> | ||
|     /// Gets or sets the index kind to use when creating the collection. | ||
|     /// </summary> | ||
|     public string? IndexKind { get; set; } | ||
|
|
||
|     /// <summary> | ||
|     /// Gets or sets a value indicating whether to perform incremental ingestion. The default is <see langword="true"/>. | ||
|     /// </summary> | ||
|     /// <remarks> | ||
|     /// When enabled, the writer deletes the chunks that already existed for the given document after inserting the new ones. | ||
|     /// Effectively, the ingestion "replaces" the existing chunks of the document with the new ones. | ||
|     /// </remarks> | ||
|     public bool IncrementalIngestion { get; set; } = true; | ||
| } |
13 changes: 13 additions & 0 deletions
13
test/Libraries/Microsoft.Extensions.DataIngestion.Tests/InMemoryVectorStoreWriterTests.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| // Licensed to the .NET Foundation under one or more agreements. | ||
| // The .NET Foundation licenses this file to you under the MIT license. | ||
|
|
||
| using Microsoft.Extensions.VectorData; | ||
| using Microsoft.SemanticKernel.Connectors.InMemory; | ||
|
|
||
| namespace Microsoft.Extensions.DataIngestion.Tests; | ||
|
|
||
| // Runs the shared VectorStoreWriterTests against the in-memory vector store. | ||
| public class InMemoryVectorStoreWriterTests : VectorStoreWriterTests | ||
| { | ||
|     protected override VectorStore CreateVectorStore(TestEmbeddingGenerator<string> testEmbeddingGenerator) | ||
|     { | ||
|         // The store is configured with the test embedding generator. | ||
|         return new InMemoryVectorStore(new() { EmbeddingGenerator = testEmbeddingGenerator }); | ||
|     } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
19 changes: 19 additions & 0 deletions
19
test/Libraries/Microsoft.Extensions.DataIngestion.Tests/SqliteVectorStoreWriterTests.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| // Licensed to the .NET Foundation under one or more agreements. | ||
| // The .NET Foundation licenses this file to you under the MIT license. | ||
|
|
||
| using System; | ||
| using System.IO; | ||
| using Microsoft.Extensions.VectorData; | ||
| using Microsoft.SemanticKernel.Connectors.SqliteVec; | ||
|
|
||
| namespace Microsoft.Extensions.DataIngestion.Tests; | ||
|
|
||
| // Runs the shared VectorStoreWriterTests against a SQLite vector store backed by a temporary file. | ||
| public sealed class SqliteVectorStoreWriterTests : VectorStoreWriterTests, IDisposable | ||
| { | ||
|     // Created for each instance of the test class and removed in Dispose. | ||
|     private readonly string _tempFile = Path.GetTempFileName(); | ||
|
|
||
|     public void Dispose() | ||
|     { | ||
|         File.Delete(_tempFile); | ||
|     } | ||
|
|
||
|     protected override VectorStore CreateVectorStore(TestEmbeddingGenerator<string> testEmbeddingGenerator) | ||
|     { | ||
|         string connectionString = $"Data Source={_tempFile};Pooling=false"; | ||
|
|
||
|         return new SqliteVectorStore(connectionString, new() { EmbeddingGenerator = testEmbeddingGenerator }); | ||
|     } | ||
| } |
52 changes: 52 additions & 0 deletions
52
test/Libraries/Microsoft.Extensions.DataIngestion.Tests/Utils/IAsyncEnumerableExtensions.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| // Licensed to the .NET Foundation under one or more agreements. | ||
| // The .NET Foundation licenses this file to you under the MIT license. | ||
|
|
||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Threading.Tasks; | ||
|
|
||
| namespace Microsoft.Extensions.DataIngestion; | ||
|
|
||
| // Once .NET 10 is shipped, we are going to switch to System.Linq.AsyncEnumerable. | ||
adamsitnik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| internal static class IAsyncEnumerableExtensions | ||
| { | ||
|     // Exposes a synchronous sequence as an asynchronous one, yielding to the scheduler before every item. | ||
|     internal static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<T> source) | ||
|     { | ||
|         foreach (T item in source) | ||
|         { | ||
|             await Task.Yield(); | ||
|             yield return item; | ||
|         } | ||
|     } | ||
|
|
||
|     // Enumerates the whole sequence and returns the number of its items. | ||
|     internal static async ValueTask<int> CountAsync<T>(this IAsyncEnumerable<T> source) | ||
|     { | ||
|         int count = 0; | ||
|         await foreach (T _ in source) | ||
|         { | ||
|             count++; | ||
|         } | ||
|
|
||
|         return count; | ||
|     } | ||
|
|
||
|     // Returns the only item of the sequence. | ||
|     // Throws InvalidOperationException when the sequence is empty or contains more than one item. | ||
|     internal static async ValueTask<T> SingleAsync<T>(this IAsyncEnumerable<T> source) | ||
|     { | ||
|         bool found = false; | ||
|         T result = default!; | ||
|         await foreach (T item in source) | ||
|         { | ||
|             if (found) | ||
|             { | ||
|                 // A message makes the failure self-explanatory in the test output. | ||
|                 throw new InvalidOperationException("Sequence contains more than one element"); | ||
|             } | ||
|
|
||
|             result = item; | ||
|             found = true; | ||
|         } | ||
|
|
||
|         return found | ||
|             ? result | ||
|             : throw new InvalidOperationException("Sequence contains no elements"); | ||
|     } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.