Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion src/Tests/PipelineTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Configuration;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Moq;

Expand Down Expand Up @@ -207,6 +208,64 @@ public async Task ConversationRestored()
Assert.IsAssignableFrom<ContentMessage>(messages[1][2]);
}

[Fact]
public async Task CanSendMessagesThroughPipeline()
{
var configuration = new ConfigurationBuilder()
.AddInMemoryCollection(new Dictionary<string, string?>()
{
{ "Meta:VerifyToken", "test-challenge" },
{ "Meta:Numbers:1234", "test-access-token" }
})
.Build();

var handler = new Mock<IWhatsAppHandler>();
handler.Setup(x => x.HandleAsync(It.IsAny<IEnumerable<IMessage>>(), It.IsAny<CancellationToken>()))
.Returns((IEnumerable<IMessage> input, CancellationToken _) =>
{
// Timestamp in WhatsApp is in Unix seconds, so we need to simulate a delay
Thread.Sleep(1000);
var message = input.OfType<ContentMessage>().Last();
return AsyncEnum<Response>([message.Reply(message.Content.ToString() + " Reply")]);
});

var services = new ServiceCollection()
.AddSingleton<IConfiguration>(configuration);

var sent = 0;

// Override default IWhatsAppClient to prevent any actual sending
var client = new Mock<IWhatsAppClient>();
client.Setup(x => x.SendAsync(It.IsAny<string>(), It.IsAny<object>(), It.IsAny<CancellationToken>()))
.Callback(() => sent++)
.ReturnsAsync(Ulid.NewUlid().ToString());

services.AddSingleton<IWhatsAppClient>(client.Object);

services.AddWhatsApp(handler.Object)
.Use(EchoAndHandle);

var pipeline = services.BuildServiceProvider().GetRequiredService<IWhatsAppHandler>();
await pipeline.HandleAsync(Text("Hello"));

// One from the echo, one from the actualy reply.
Assert.Equal(2, sent);

await pipeline.HandleAsync(Text("Again"));

Assert.Equal(4, sent);
}

static async IAsyncEnumerable<Response> EchoAndHandle(IEnumerable<IMessage> messages, IWhatsAppHandler inner, [EnumeratorCancellation] CancellationToken cancellation)
{
var content = messages.OfType<ContentMessage>().LastOrDefault();
if (content != null)
yield return content.Reply("Echo: " + content.Content.ToString());

await foreach (var response in inner.HandleAsync(messages, cancellation))
yield return response;
}

ContentMessage Text(string text) => new ContentMessage(
Ulid.NewUlid().ToString(), service, user, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), new TextContent(text));

Expand Down
6 changes: 5 additions & 1 deletion src/WhatsApp/SendResponsesHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ public async override IAsyncEnumerable<Response> HandleAsync(IEnumerable<IMessag
{
await foreach (var response in InnerHandler.HandleAsync(messages, cancellation))
{
yield return await response.SendAsync(client, cancellation);
// Only sent unsent messages.
if (string.IsNullOrEmpty(response.Id) && response.Timestamp == 0)
yield return await response.SendAsync(client, cancellation);
else
yield return response;
}
}
}
5 changes: 5 additions & 0 deletions src/WhatsApp/WhatsAppHandlerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ public IWhatsAppHandler Build(IServiceProvider? services = default)
// If we can get the IWhatsApp at all, we wrap the target handler in the
// sender one, which will process responses and send them out to WhatsApp.
if (services.GetService<IWhatsAppClient>() is { } client)
{
handler = new SendResponsesHandler(handler, client);
// We also need to make sure we send messages added across the pipeline
// So we also place ourselves as the outermost handler.
(factories ??= []).Insert(0, (inner, _) => new SendResponsesHandler(inner, client));
}

// Matches behavior of M.E.AI chat client builder
// Apply the factories in reverse order, so that the first factory added is the outermost.
Expand Down
4 changes: 3 additions & 1 deletion src/WhatsApp/WhatsAppServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ public static WhatsAppHandlerBuilder AddWhatsApp<TService1, TService2, TService3
static WhatsAppHandlerBuilder ConfigureServices(IServiceCollection services, WhatsAppHandlerBuilder builder, ServiceLifetime lifetime)
{
services.AddHttpClient("whatsapp").AddStandardResilienceHandler();
services.Add(new ServiceDescriptor(typeof(IWhatsAppClient), typeof(WhatsAppClient), lifetime));

if (services.FirstOrDefault(x => x.ServiceType == typeof(IWhatsAppClient)) == null)
services.Add(new ServiceDescriptor(typeof(IWhatsAppClient), typeof(WhatsAppClient), lifetime));

if (services.FirstOrDefault(x => x.ServiceType == typeof(QueueServiceClient)) == null)
{
Expand Down