Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions eng/Versions.props
Original file line number Diff line number Diff line change
Expand Up @@ -168,5 +168,8 @@
Starting with 3.0.0, xunit.runner.visualstudio only supports net472, but we target net462
-->
<XUnitRunnerVisualStudioVersion>2.8.2</XUnitRunnerVisualStudioVersion>
<!-- MEVD is still part of the Semantic Kernel repo -->
<MicrosoftExtensionsVectorDataAbstractionsVersion>9.7.0</MicrosoftExtensionsVectorDataAbstractionsVersion>
<MicrosoftSemanticKernelConnectorsVersion>1.66.0-preview</MicrosoftSemanticKernelConnectorsVersion>
</PropertyGroup>
</Project>
1 change: 1 addition & 0 deletions eng/packages/General.props
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="$(MicrosoftCodeAnalysisVersion)" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="$(MicrosoftCodeAnalysisVersion)" />
<PackageVersion Include="Microsoft.CodeAnalysis" Version="$(MicrosoftCodeAnalysisVersion)" />
<PackageVersion Include="Microsoft.Extensions.VectorData.Abstractions" Version="$(MicrosoftExtensionsVectorDataAbstractionsVersion)" />
<PackageVersion Include="Microsoft.IO.RecyclableMemoryStream" Version="3.0.0" />
<PackageVersion Include="Microsoft.ML.Tokenizers" Version="$(MicrosoftMLTokenizersVersion)" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
Expand Down
2 changes: 2 additions & 0 deletions eng/packages/TestOnly.props
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
<PackageVersion Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.Configuration.UserSecrets" Version="9.0.0" />
<PackageVersion Include="Microsoft.ML.Tokenizers.Data.O200kBase" Version="$(MicrosoftMLTokenizersVersion)" />
<PackageVersion Include="Microsoft.SemanticKernel.Connectors.InMemory" Version="$(MicrosoftSemanticKernelConnectorsVersion)" />
<PackageVersion Include="Microsoft.SemanticKernel.Connectors.SqliteVec" Version="$(MicrosoftSemanticKernelConnectorsVersion)" />
<PackageVersion Include="Microsoft.TemplateEngine.Authoring.TemplateVerifier" Version="9.0.201" />
<PackageVersion Include="Microsoft.TemplateEngine.TestHelper" Version="9.0.200-rtm.25066.4" />
<PackageVersion Include="Moq.AutoMock" Version="3.1.0" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@
<ProjectReference Include="..\Microsoft.Extensions.DataIngestion.Abstractions\Microsoft.Extensions.DataIngestion.Abstractions.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.VectorData.Abstractions" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.VectorData;
using Microsoft.Shared.Diagnostics;

namespace Microsoft.Extensions.DataIngestion;

/// <summary>
/// Writes chunks to the <see cref="VectorStore"/> using the default schema.
/// </summary>
/// <typeparam name="T">The type of the chunk content.</typeparam>
public sealed class VectorStoreWriter<T> : IngestionChunkWriter<T>
{
// The names are lowercase with no special characters to ensure compatibility with various vector stores.
private const string KeyName = "key";
private const string EmbeddingName = "embedding";
private const string ContentName = "content";
private const string ContextName = "context";
private const string DocumentIdName = "documentid";

private readonly VectorStore _vectorStore;
private readonly int _dimensionCount;
private readonly VectorStoreWriterOptions _options;
private readonly bool _keysAreStrings;

private VectorStoreCollection<object, Dictionary<string, object?>>? _vectorStoreCollection;

/// <summary>
/// Initializes a new instance of the <see cref="VectorStoreWriter{T}"/> class.
/// </summary>
/// <param name="vectorStore">The <see cref="VectorStore"/> to use to store the <see cref="IngestionChunk{T}"/> instances.</param>
/// <param name="dimensionCount">The number of dimensions that the vector has. This value is required when creating collections.</param>
/// <param name="options">The options for the vector store writer.</param>
/// <exception cref="ArgumentNullException">When <paramref name="vectorStore"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">When <paramref name="dimensionCount"/> is less or equal zero.</exception>
public VectorStoreWriter(VectorStore vectorStore, int dimensionCount, VectorStoreWriterOptions? options = default)
{
_vectorStore = Throw.IfNull(vectorStore);
_dimensionCount = Throw.IfLessThanOrEqual(dimensionCount, 0);
_options = options ?? new VectorStoreWriterOptions();

// Not all vector store support string as the key type, examples:
// Qdrant: https://github.com/microsoft/semantic-kernel/blob/28ea2f4df872e8fd03ef0792ebc9e1989b4be0ee/dotnet/src/VectorData/Qdrant/QdrantCollection.cs#L104
// When https://github.com/microsoft/semantic-kernel/issues/13141 gets released,
// we need to remove this workaround.
_keysAreStrings = vectorStore.GetType().Name != "QdrantVectorStore";
}

/// <summary>
/// Gets the underlying <see cref="VectorStoreCollection{TKey,TRecord}"/> used to store the chunks.
/// </summary>
/// <remarks>
/// The collection is initialized when <see cref="WriteAsync(IAsyncEnumerable{IngestionChunk{T}}, CancellationToken)"/> is called for the first time.
/// </remarks>
/// <exception cref="InvalidOperationException">The collection has not been initialized yet.
/// Call <see cref="WriteAsync(IAsyncEnumerable{IngestionChunk{T}}, CancellationToken)"/> first.</exception>
public VectorStoreCollection<object, Dictionary<string, object?>> VectorStoreCollection
=> _vectorStoreCollection ?? throw new InvalidOperationException("The collection has not been initialized yet. Call WriteAsync first.");

/// <inheritdoc/>
public override async Task WriteAsync(IAsyncEnumerable<IngestionChunk<T>> chunks, CancellationToken cancellationToken = default)
{
_ = Throw.IfNull(chunks);

IReadOnlyList<object>? preExistingKeys = null;
await foreach (IngestionChunk<T> chunk in chunks.WithCancellation(cancellationToken))
{
if (_vectorStoreCollection is null)
{
_vectorStoreCollection = _vectorStore.GetDynamicCollection(_options.CollectionName, GetVectorStoreRecordDefinition(chunk));

await _vectorStoreCollection.EnsureCollectionExistsAsync(cancellationToken).ConfigureAwait(false);
}

// We obtain the IDs of the pre-existing chunks for given document,
// and delete them after we finish inserting the new chunks,
// to avoid a situation where we delete the chunks and then fail to insert the new ones.
preExistingKeys ??= await GetPreExistingChunksIdsAsync(chunk.Document, cancellationToken).ConfigureAwait(false);

var key = Guid.NewGuid();
Dictionary<string, object?> record = new()
{
[KeyName] = _keysAreStrings ? key.ToString() : key,
[ContentName] = chunk.Content,
[EmbeddingName] = chunk.Content,
[ContextName] = chunk.Context,
[DocumentIdName] = chunk.Document.Identifier,
};

if (chunk.HasMetadata)
{
foreach (var metadata in chunk.Metadata)
{
record[metadata.Key] = metadata.Value;
}
}

await _vectorStoreCollection.UpsertAsync(record, cancellationToken).ConfigureAwait(false);
}

if (preExistingKeys?.Count > 0)
{
await _vectorStoreCollection!.DeleteAsync(preExistingKeys, cancellationToken).ConfigureAwait(false);
}
}

/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
try
{
_vectorStoreCollection?.Dispose();
}
finally
{
_vectorStore.Dispose();
base.Dispose(disposing);
}
}

private VectorStoreCollectionDefinition GetVectorStoreRecordDefinition(IngestionChunk<T> representativeChunk)
{
VectorStoreCollectionDefinition definition = new()
{
Properties =
{
new VectorStoreKeyProperty(KeyName, _keysAreStrings ? typeof(string) : typeof(Guid)),

// By using T as the type here we allow the vector store
// to handle the conversion from T to the actual vector type it supports.
new VectorStoreVectorProperty(EmbeddingName, typeof(T), _dimensionCount)
{
DistanceFunction = _options.DistanceFunction,
IndexKind = _options.IndexKind
},
new VectorStoreDataProperty(ContentName, typeof(T)),
new VectorStoreDataProperty(ContextName, typeof(string)),
new VectorStoreDataProperty(DocumentIdName, typeof(string))
{
IsIndexed = true
}
}
};

if (representativeChunk.HasMetadata)
{
foreach (var metadata in representativeChunk.Metadata)
{
Type propertyType = metadata.Value.GetType();
definition.Properties.Add(new VectorStoreDataProperty(metadata.Key, propertyType)
{
// We use lowercase storage names to ensure compatibility with various vector stores.
#pragma warning disable CA1308 // Normalize strings to uppercase
StorageName = metadata.Key.ToLowerInvariant()
#pragma warning restore CA1308 // Normalize strings to uppercase

// We could consider indexing for certain keys like classification etc. but for now we leave it as non-indexed.
// The reason is that not every DB supports it, moreover we would need to expose the ability to configure it.
});
}
}

return definition;
}

private async Task<IReadOnlyList<object>> GetPreExistingChunksIdsAsync(IngestionDocument document, CancellationToken cancellationToken)
{
if (!_options.IncrementalIngestion)
{
return [];
}

// Each Vector Store has a different max top count limit, so we use low value and loop.
const int MaxTopCount = 1_000;

List<object> keys = [];
int insertedCount;
do
{
insertedCount = 0;

await foreach (var record in _vectorStoreCollection!.GetAsync(
filter: record => (string)record[DocumentIdName]! == document.Identifier,
top: MaxTopCount,
cancellationToken: cancellationToken).ConfigureAwait(false))
{
keys.Add(record[KeyName]!);
insertedCount++;
}
}
while (insertedCount == MaxTopCount);

return keys;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Microsoft.Shared.Diagnostics;

namespace Microsoft.Extensions.DataIngestion;

/// <summary>
/// Represents options for the <see cref="VectorStoreWriter{T}"/>.
/// </summary>
public sealed class VectorStoreWriterOptions
{
/// <summary>
/// Gets or sets the name of the collection. When not provided, "chunks" will be used.
/// </summary>
public string CollectionName
{
get => field ?? "chunks";
set => field = Throw.IfNullOrEmpty(value);
}

/// <summary>
/// Gets or sets the distance function to use when creating the collection.
/// </summary>
/// <remarks>
/// When not provided, the default specific to given database will be used. Check <see cref="VectorData.DistanceFunction"/> for available values.
/// </remarks>
public string? DistanceFunction { get; set; }

/// <summary>
/// Gets or sets the index kind to use when creating the collection.
/// </summary>
public string? IndexKind { get; set; }

/// <summary>
/// Gets or sets a value indicating whether to perform incremental ingestion.
/// </summary>
/// <remarks>
/// When enabled, the writer will delete the chunks for the given document after inserting the new ones.
/// Effectively the ingestion will "replace" the existing chunks for the document with the new ones.
/// </remarks>
public bool IncrementalIngestion { get; set; } = true;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Microsoft.Extensions.VectorData;
using Microsoft.SemanticKernel.Connectors.InMemory;

namespace Microsoft.Extensions.DataIngestion.Tests;

public class InMemoryVectorStoreWriterTests : VectorStoreWriterTests
{
protected override VectorStore CreateVectorStore(TestEmbeddingGenerator<string> testEmbeddingGenerator)
=> new InMemoryVectorStore(new() { EmbeddingGenerator = testEmbeddingGenerator });
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,23 @@
<NoWarn>$(NoWarn);S3967</NoWarn>
<!-- Project reference can be removed -->
<NoWarn>$(NoWarn);RT0002</NoWarn>

<!-- https://github.com/dotnet/efcore/issues/33472#issuecomment-2042146335 -->
<PlatformTarget Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">x64</PlatformTarget>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\Libraries\Microsoft.Extensions.DataIngestion\Microsoft.Extensions.DataIngestion.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.SemanticKernel.Connectors.InMemory" />
<PackageReference Include="Microsoft.SemanticKernel.Connectors.SqliteVec" />
</ItemGroup>

<ItemGroup>
<!-- We don't run Sqlite tests on Full Framework due to some native dependency issues -->
<Compile Remove="SqliteVectorStoreWriterTests.cs" Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.IO;
using Microsoft.Extensions.VectorData;
using Microsoft.SemanticKernel.Connectors.SqliteVec;

namespace Microsoft.Extensions.DataIngestion.Tests;

public sealed class SqliteVectorStoreWriterTests : VectorStoreWriterTests, IDisposable
{
private readonly string _tempFile = Path.GetTempFileName();

public void Dispose() => File.Delete(_tempFile);

protected override VectorStore CreateVectorStore(TestEmbeddingGenerator<string> testEmbeddingGenerator)
=> new SqliteVectorStore($"Data Source={_tempFile};Pooling=false", new() { EmbeddingGenerator = testEmbeddingGenerator });
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace Microsoft.Extensions.DataIngestion;

// Once .NET 10 is shipped, we are going to switch to System.Linq.AsyncEnumerable.
internal static class IAsyncEnumerableExtensions
{
internal static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(this IEnumerable<T> source)
{
foreach (T item in source)
{
await Task.Yield();
yield return item;
}
}

internal static async ValueTask<int> CountAsync<T>(this IAsyncEnumerable<T> source)
{
int count = 0;
await foreach (T _ in source)
{
count++;
}

return count;
}

internal static async ValueTask<T> SingleAsync<T>(this IAsyncEnumerable<T> source)
{
bool found = false;
T result = default!;
await foreach (T item in source)
{
if (found)
{
throw new InvalidOperationException();
}

result = item;
found = true;
}

return found
? result
: throw new InvalidOperationException();
}
}
Loading
Loading