Skip to content

Commit ca4fc52

Browse files
authored
[MEDI] Don't stop document processing on enricher error (#7005)
* Introduce the EnricherOptions option bag.
* Implement batching.
* Don't validate results returned by IChatClient.
* Don't expose FileInfo as the source via IngestionResult, as it could be a Stream in the future; just expose the document ID.
* Enricher failures should not fail the whole ingestion pipeline, as they are best-effort enhancements.
1 parent b839e41 commit ca4fc52

20 files changed

+544
-330
lines changed

src/Libraries/Microsoft.Extensions.DataIngestion/IngestionPipeline.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private async IAsyncEnumerable<IngestionResult> ProcessAsync(IEnumerable<FileInf
154154
processFileActivity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier);
155155
_logger?.ReadDocument(document.Identifier);
156156

157-
await IngestAsync(document, processFileActivity, cancellationToken).ConfigureAwait(false);
157+
document = await IngestAsync(document, processFileActivity, cancellationToken).ConfigureAwait(false);
158158
}
159159
catch (Exception ex)
160160
{
@@ -164,12 +164,13 @@ private async IAsyncEnumerable<IngestionResult> ProcessAsync(IEnumerable<FileInf
164164
failure = ex;
165165
}
166166

167-
yield return new IngestionResult(fileInfo, document, failure);
167+
string documentId = document?.Identifier ?? fileInfo.FullName;
168+
yield return new IngestionResult(documentId, document, failure);
168169
}
169170
}
170171
}
171172

172-
private async Task IngestAsync(IngestionDocument document, Activity? parentActivity, CancellationToken cancellationToken)
173+
private async Task<IngestionDocument> IngestAsync(IngestionDocument document, Activity? parentActivity, CancellationToken cancellationToken)
173174
{
174175
foreach (IngestionDocumentProcessor processor in DocumentProcessors)
175176
{
@@ -188,5 +189,7 @@ private async Task IngestAsync(IngestionDocument document, Activity? parentActiv
188189
_logger?.WritingChunks(GetShortName(_writer));
189190
await _writer.WriteAsync(chunks, cancellationToken).ConfigureAwait(false);
190191
_logger?.WroteChunks(document.Identifier);
192+
193+
return document;
191194
}
192195
}

src/Libraries/Microsoft.Extensions.DataIngestion/IngestionResult.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
using System;
5-
using System.IO;
65
using Microsoft.Shared.Diagnostics;
76

87
namespace Microsoft.Extensions.DataIngestion;
@@ -13,9 +12,9 @@ namespace Microsoft.Extensions.DataIngestion;
1312
public sealed class IngestionResult
1413
{
1514
/// <summary>
16-
/// Gets the source file that was ingested.
15+
/// Gets the ID of the document that was ingested.
1716
/// </summary>
18-
public FileInfo Source { get; }
17+
public string DocumentId { get; }
1918

2019
/// <summary>
2120
/// Gets the ingestion document created from the source file, if reading the document has succeeded.
@@ -32,9 +31,9 @@ public sealed class IngestionResult
3231
/// </summary>
3332
public bool Succeeded => Exception is null;
3433

35-
internal IngestionResult(FileInfo source, IngestionDocument? document, Exception? exception)
34+
internal IngestionResult(string documentId, IngestionDocument? document, Exception? exception)
3635
{
37-
Source = Throw.IfNull(source);
36+
DocumentId = Throw.IfNullOrEmpty(documentId);
3837
Document = document;
3938
Exception = exception;
4039
}

src/Libraries/Microsoft.Extensions.DataIngestion/Log.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,11 @@ internal static partial class Log
3131

3232
[LoggerMessage(6, LogLevel.Error, "An error occurred while ingesting document '{identifier}'.")]
3333
internal static partial void IngestingFailed(this ILogger logger, Exception exception, string identifier);
34+
35+
[LoggerMessage(7, LogLevel.Error, "The AI chat service returned {resultCount} instead of {expectedCount} results.")]
36+
internal static partial void UnexpectedResultsCount(this ILogger logger, int resultCount, int expectedCount);
37+
38+
[LoggerMessage(8, LogLevel.Error, "Unexpected enricher failure.")]
39+
internal static partial void UnexpectedEnricherFailure(this ILogger logger, Exception exception);
3440
}
3541
}

src/Libraries/Microsoft.Extensions.DataIngestion/Microsoft.Extensions.DataIngestion.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
<ItemGroup>
1717
<ProjectReference Include="..\Microsoft.Extensions.DataIngestion.Abstractions\Microsoft.Extensions.DataIngestion.Abstractions.csproj" />
18+
<ProjectReference Include="..\Microsoft.Extensions.AI\Microsoft.Extensions.AI.csproj" />
1819
</ItemGroup>
1920

2021
<ItemGroup>
@@ -26,7 +27,6 @@
2627

2728
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' != '.NETCoreApp'">
2829
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
29-
<PackageReference Include="System.Collections.Immutable" />
3030
</ItemGroup>
3131

3232
<ItemGroup Condition="'$(TargetFrameworkIdentifier)' == '.NETFramework'">

src/Libraries/Microsoft.Extensions.DataIngestion/Processors/ClassificationEnricher.cs

Lines changed: 13 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,11 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
using System;
5-
using System.Collections.Frozen;
65
using System.Collections.Generic;
7-
using System.Runtime.CompilerServices;
86
using System.Text;
97
using System.Threading;
10-
using System.Threading.Tasks;
118
using Microsoft.Extensions.AI;
9+
using Microsoft.Extensions.Logging;
1210
using Microsoft.Shared.Diagnostics;
1311

1412
namespace Microsoft.Extensions.DataIngestion;
@@ -21,30 +19,28 @@ namespace Microsoft.Extensions.DataIngestion;
2119
/// an optional fallback class for cases where no suitable classification can be determined.</remarks>
2220
public sealed class ClassificationEnricher : IngestionChunkProcessor<string>
2321
{
24-
private readonly IChatClient _chatClient;
25-
private readonly ChatOptions? _chatOptions;
26-
private readonly FrozenSet<string> _predefinedClasses;
22+
private readonly EnricherOptions _options;
2723
private readonly ChatMessage _systemPrompt;
24+
private readonly ILogger? _logger;
2825

2926
/// <summary>
3027
/// Initializes a new instance of the <see cref="ClassificationEnricher"/> class.
3128
/// </summary>
32-
/// <param name="chatClient">The chat client used for classification.</param>
29+
/// <param name="options">The options for the classification enricher.</param>
3330
/// <param name="predefinedClasses">The set of predefined classification classes.</param>
34-
/// <param name="chatOptions">Options for the chat client.</param>
3531
/// <param name="fallbackClass">The fallback class to use when no suitable classification is found. When not provided, it defaults to "Unknown".</param>
36-
public ClassificationEnricher(IChatClient chatClient, ReadOnlySpan<string> predefinedClasses,
37-
ChatOptions? chatOptions = null, string? fallbackClass = null)
32+
public ClassificationEnricher(EnricherOptions options, ReadOnlySpan<string> predefinedClasses,
33+
string? fallbackClass = null)
3834
{
39-
_chatClient = Throw.IfNull(chatClient);
40-
_chatOptions = chatOptions;
35+
_options = Throw.IfNull(options).Clone();
4136
if (string.IsNullOrWhiteSpace(fallbackClass))
4237
{
4338
fallbackClass = "Unknown";
4439
}
4540

46-
_predefinedClasses = CreatePredefinedSet(predefinedClasses, fallbackClass!);
41+
Validate(predefinedClasses, fallbackClass!);
4742
_systemPrompt = CreateSystemPrompt(predefinedClasses, fallbackClass!);
43+
_logger = _options.LoggerFactory?.CreateLogger<ClassificationEnricher>();
4844
}
4945

5046
/// <summary>
@@ -53,28 +49,10 @@ public ClassificationEnricher(IChatClient chatClient, ReadOnlySpan<string> prede
5349
public static string MetadataKey => "classification";
5450

5551
/// <inheritdoc />
56-
public override async IAsyncEnumerable<IngestionChunk<string>> ProcessAsync(IAsyncEnumerable<IngestionChunk<string>> chunks,
57-
[EnumeratorCancellation] CancellationToken cancellationToken = default)
58-
{
59-
_ = Throw.IfNull(chunks);
60-
61-
await foreach (IngestionChunk<string> chunk in chunks.WithCancellation(cancellationToken))
62-
{
63-
var response = await _chatClient.GetResponseAsync(
64-
[
65-
_systemPrompt,
66-
new(ChatRole.User, chunk.Content)
67-
], _chatOptions, cancellationToken: cancellationToken).ConfigureAwait(false);
68-
69-
chunk.Metadata[MetadataKey] = _predefinedClasses.Contains(response.Text)
70-
? response.Text
71-
: throw new InvalidOperationException($"Classification returned an unexpected class: '{response.Text}'.");
72-
73-
yield return chunk;
74-
}
75-
}
52+
public override IAsyncEnumerable<IngestionChunk<string>> ProcessAsync(IAsyncEnumerable<IngestionChunk<string>> chunks, CancellationToken cancellationToken = default)
53+
=> Batching.ProcessAsync<string>(chunks, _options, MetadataKey, _systemPrompt, _logger, cancellationToken);
7654

77-
private static FrozenSet<string> CreatePredefinedSet(ReadOnlySpan<string> predefinedClasses, string fallbackClass)
55+
private static void Validate(ReadOnlySpan<string> predefinedClasses, string fallbackClass)
7856
{
7957
if (predefinedClasses.Length == 0)
8058
{
@@ -84,15 +62,6 @@ private static FrozenSet<string> CreatePredefinedSet(ReadOnlySpan<string> predef
8462
HashSet<string> predefinedClassesSet = new(StringComparer.Ordinal) { fallbackClass };
8563
foreach (string predefinedClass in predefinedClasses)
8664
{
87-
#if NET
88-
if (predefinedClass.Contains(',', StringComparison.Ordinal))
89-
#else
90-
if (predefinedClass.IndexOf(',') >= 0)
91-
#endif
92-
{
93-
Throw.ArgumentException(nameof(predefinedClasses), $"Predefined class '{predefinedClass}' must not contain ',' character.");
94-
}
95-
9665
if (!predefinedClassesSet.Add(predefinedClass))
9766
{
9867
if (predefinedClass.Equals(fallbackClass, StringComparison.Ordinal))
@@ -103,13 +72,11 @@ private static FrozenSet<string> CreatePredefinedSet(ReadOnlySpan<string> predef
10372
Throw.ArgumentException(nameof(predefinedClasses), $"Duplicate class found: '{predefinedClass}'.");
10473
}
10574
}
106-
107-
return predefinedClassesSet.ToFrozenSet();
10875
}
10976

11077
private static ChatMessage CreateSystemPrompt(ReadOnlySpan<string> predefinedClasses, string fallbackClass)
11178
{
112-
StringBuilder sb = new("You are a classification expert. Analyze the given text and assign a single, most relevant class. Use only the following predefined classes: ");
79+
StringBuilder sb = new("You are a classification expert. For each of the following texts, assign a single, most relevant class. Use only the following predefined classes: ");
11380

11481
#if NET9_0_OR_GREATER
11582
sb.AppendJoin(", ", predefinedClasses!);
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
4+
using Microsoft.Extensions.AI;
5+
using Microsoft.Extensions.Logging;
6+
using Microsoft.Shared.Diagnostics;
7+
8+
namespace Microsoft.Extensions.DataIngestion;
9+
10+
/// <summary>
11+
/// Represents options for enrichers that use an AI chat client.
12+
/// </summary>
13+
public class EnricherOptions
14+
{
15+
/// <summary>
16+
/// Initializes a new instance of the <see cref="EnricherOptions"/> class.
17+
/// </summary>
18+
/// <param name="chatClient">The AI chat client to be used.</param>
19+
public EnricherOptions(IChatClient chatClient)
20+
{
21+
ChatClient = Throw.IfNull(chatClient);
22+
}
23+
24+
/// <summary>
25+
/// Gets the AI chat client to be used.
26+
/// </summary>
27+
public IChatClient ChatClient { get; }
28+
29+
/// <summary>
30+
/// Gets or sets the options for the <see cref="ChatClient"/>.
31+
/// </summary>
32+
public ChatOptions? ChatOptions { get; set; }
33+
34+
/// <summary>
35+
/// Gets or sets the logger factory to be used for logging.
36+
/// </summary>
37+
/// <remarks>
38+
/// Enricher failures should not fail the whole ingestion pipeline, as they are best-effort enhancements.
39+
/// This logger factory can be used to create loggers to log such failures.
40+
/// </remarks>
41+
public ILoggerFactory? LoggerFactory { get; set; }
42+
43+
/// <summary>
44+
/// Gets or sets the batch size for processing chunks. The value must be greater than zero; the default is 20.
45+
/// </summary>
46+
public int BatchSize { get; set => field = Throw.IfLessThanOrEqual(value, 0); } = 20;
47+
48+
internal EnricherOptions Clone() => new(ChatClient)
49+
{
50+
ChatOptions = ChatOptions?.Clone(),
51+
LoggerFactory = LoggerFactory,
52+
BatchSize = BatchSize
53+
};
54+
}

0 commit comments

Comments
 (0)