Skip to content

Commit

Permalink
Create message-driven pub/sub sample (#5023)
Browse files Browse the repository at this point in the history
* move native pubsub sample to subfolder

* copy native sample as a starting point

* switch sample to message driven pub-sub using MSMQ

* create index file for sample folder

* update sample description

* update native sample description

* Apply suggestions from code review

Co-authored-by: Andreas Öhlund <andreas.ohlund@particular.net>

* rename ambiguous Start method

Co-authored-by: Andreas Öhlund <andreas.ohlund@particular.net>
  • Loading branch information
timbussmann and andreasohlund authored Sep 28, 2020
1 parent 2b1e56d commit 173eee4
Show file tree
Hide file tree
Showing 41 changed files with 409 additions and 8 deletions.
7 changes: 7 additions & 0 deletions samples/pubsub/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
title: Publish/Subscribe
summary: Sample code related to publish/subscribe
reviewed: 2020-09-28
related:
- nservicebus/messaging/publish-subscribe
---
File renamed without changes.
56 changes: 56 additions & 0 deletions samples/pubsub/message-driven/Core_6/Publisher/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using System.Threading.Tasks;
using NServiceBus;

static class Program
{
static async Task Main()
{
Console.Title = "Samples.PubSub.MessageDrivenPublisher";
var endpointConfiguration = new EndpointConfiguration("Samples.PubSub.MessageDrivenPublisher");
endpointConfiguration.UsePersistence<InMemoryPersistence>();
endpointConfiguration.UseTransport<MsmqTransport>();

endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.EnableInstallers();

var endpointInstance = await Endpoint.Start(endpointConfiguration)
.ConfigureAwait(false);
await PublishOrderEvent(endpointInstance)
.ConfigureAwait(false);
await endpointInstance.Stop()
.ConfigureAwait(false);
}

static async Task PublishOrderEvent(IEndpointInstance endpointInstance)
{
Console.WriteLine("Press '1' to publish the OrderReceived event");
Console.WriteLine("Press any other key to exit");

#region PublishLoop

while (true)
{
var key = Console.ReadKey();
Console.WriteLine();

var orderReceivedId = Guid.NewGuid();
if (key.Key == ConsoleKey.D1)
{
var orderReceived = new OrderReceived
{
OrderId = orderReceivedId
};
await endpointInstance.Publish(orderReceived)
.ConfigureAwait(false);
Console.WriteLine($"Published OrderReceived Event with Id {orderReceivedId}.");
}
else
{
return;
}
}

#endregion
}
}
File renamed without changes.
28 changes: 28 additions & 0 deletions samples/pubsub/message-driven/Core_6/Subscriber/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.Threading.Tasks;
using NServiceBus;

static class Program
{
static async Task Main()
{
Console.Title = "Samples.PubSub.MessageDrivenSubscriber";
var endpointConfiguration = new EndpointConfiguration("Samples.PubSub.MessageDrivenSubscriber");
endpointConfiguration.UsePersistence<InMemoryPersistence>();

#region SubscriptionConfiguration
var transport = endpointConfiguration.UseTransport<MsmqTransport>();
transport.Routing().RegisterPublisher(typeof(OrderReceived), "Samples.PubSub.MessageDrivenPublisher");
#endregion

endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.EnableInstallers();

var endpointInstance = await Endpoint.Start(endpointConfiguration)
.ConfigureAwait(false);
Console.WriteLine("Press any key to exit");
Console.ReadKey();
await endpointInstance.Stop()
.ConfigureAwait(false);
}
}
File renamed without changes.
56 changes: 56 additions & 0 deletions samples/pubsub/message-driven/Core_7/Publisher/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using System.Threading.Tasks;
using NServiceBus;

static class Program
{
static async Task Main()
{
Console.Title = "Samples.PubSub.MessageDrivenPublisher";
var endpointConfiguration = new EndpointConfiguration("Samples.PubSub.MessageDrivenPublisher");
endpointConfiguration.UsePersistence<InMemoryPersistence>();
endpointConfiguration.UseTransport<MsmqTransport>();

endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.EnableInstallers();

var endpointInstance = await Endpoint.Start(endpointConfiguration)
.ConfigureAwait(false);
await PublishOrderEvent(endpointInstance)
.ConfigureAwait(false);
await endpointInstance.Stop()
.ConfigureAwait(false);
}

static async Task PublishOrderEvent(IEndpointInstance endpointInstance)
{
Console.WriteLine("Press '1' to publish the OrderReceived event");
Console.WriteLine("Press any other key to exit");

#region PublishLoop

while (true)
{
var key = Console.ReadKey();
Console.WriteLine();

var orderReceivedId = Guid.NewGuid();
if (key.Key == ConsoleKey.D1)
{
var orderReceived = new OrderReceived
{
OrderId = orderReceivedId
};
await endpointInstance.Publish(orderReceived)
.ConfigureAwait(false);
Console.WriteLine($"Published OrderReceived Event with Id {orderReceivedId}.");
}
else
{
return;
}
}

#endregion
}
}
12 changes: 12 additions & 0 deletions samples/pubsub/message-driven/Core_7/Publisher/Publisher.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net48</TargetFramework>
<OutputType>Exe</OutputType>
<LangVersion>7.3</LangVersion>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Shared\Shared.csproj" />
<PackageReference Include="NServiceBus" Version="7.*" />
<PackageReference Include="NServiceBus.Transport.Msmq" Version="1.*" />
</ItemGroup>
</Project>
9 changes: 9 additions & 0 deletions samples/pubsub/message-driven/Core_7/Shared/Shared.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net48</TargetFramework>
<LangVersion>7.3</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NServiceBus" Version="7.*" />
</ItemGroup>
</Project>
28 changes: 28 additions & 0 deletions samples/pubsub/message-driven/Core_7/Subscriber/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.Threading.Tasks;
using NServiceBus;

static class Program
{
static async Task Main()
{
Console.Title = "Samples.PubSub.MessageDrivenSubscriber";
var endpointConfiguration = new EndpointConfiguration("Samples.PubSub.MessageDrivenSubscriber");
endpointConfiguration.UsePersistence<InMemoryPersistence>();

#region SubscriptionConfiguration
var transport = endpointConfiguration.UseTransport<MsmqTransport>();
transport.Routing().RegisterPublisher(typeof(OrderReceived), "Samples.PubSub.MessageDrivenPublisher");
#endregion

endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.EnableInstallers();

var endpointInstance = await Endpoint.Start(endpointConfiguration)
.ConfigureAwait(false);
Console.WriteLine("Press any key to exit");
Console.ReadKey();
await endpointInstance.Stop()
.ConfigureAwait(false);
}
}
12 changes: 12 additions & 0 deletions samples/pubsub/message-driven/Core_7/Subscriber/Subscriber.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net48</TargetFramework>
<OutputType>Exe</OutputType>
<LangVersion>7.3</LangVersion>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Shared\Shared.csproj" />
<PackageReference Include="NServiceBus" Version="7.*" />
<PackageReference Include="NServiceBus.Transport.Msmq" Version="1.*" />
</ItemGroup>
</Project>
52 changes: 52 additions & 0 deletions samples/pubsub/message-driven/sample.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
---
title: Message-driven Publish/Subscribe
summary: Persistence based Publish/Subscribe for unicast-only transports.
reviewed: 2020-09-28
component: Core
related:
- nservicebus/messaging/publish-subscribe
---

This sample shows how to publish an event from one endpoint, subscribe to the event in a separate endpoint and execute a message handler when an event is received. This sample uses the [message-driven publish-subscribe mechanism](/nservicebus/messaging/publish-subscribe#mechanics-message-driven-persistence-based) for unicast transports.

downloadbutton

Before running the sample, look over the solution structure, the projects and the classes. The projects `Publisher` and `Subscriber` are console applications that each host an instance of an NServiceBus messaging endpoint.

## Defining messages

The `Shared` project contains the definition of the messages that are sent between the processes. Open "OrderReceived.cs" to see the message that will be published by this sample. Note that this event implements an interface called `IEvent` to denote that this message is an event. To define messages without adding a dependency to NServiceBus, use [Unobtrusive Mode Messages](/nservicebus/messaging/unobtrusive-mode.md).

## Publishing the event

As the name implies, the `Publisher` project is a publisher of event messages. It uses the NServiceBus API to publish the `OrderReceived` event every time the `1` key is pressed. The created message is populated and [published](/nservicebus/messaging/publish-subscribe/) using the `Publish` API.

snippet: PublishLoop

## Subscribing to the event

To receive messages from the publisher, the subscribers [must subscribe to the message types](/nservicebus/messaging/publish-subscribe/) they are designed to handle. A subscriber must have a handler for the type of message and a configuration that tells the endpoint where to send subscriptions for messages to:

snippet: SubscriptionConfiguration

* The `Subscriber` handles and subscribes to the `OrderReceived` event type.
* The handlers in each project are in files that end with the word `Handler` for example `OrderReceivedHandler.cs`.
* `Subscriber` uses the default auto-subscription feature of the bus where the bus automatically subscribes to the configured publisher. [The auto-subscribe feature can be explicitly disabled](/nservicebus/messaging/publish-subscribe/controlling-what-is-subscribed.md) as part of the endpoint configuration.

## Run the sample

When running the sample, two console applications will open. Bring the `Publisher` endpoint to the foreground.

Click the `1` key repeatedly in the `Publisher` process console window and notice how the messages appear in the `Subscriber` console window.

## Subscription mechanics

When starting up, the `Subscriber` endpoint sends a subscription message to the `Publisher` endpoint. The `Publisher` then retrieves all subscribed endpoints from the persistence once an event is published. See the [publish-subscribe documentation](/nservicebus/messaging/publish-subscribe#mechanics-message-driven-persistence-based) for further details.

## Fault-tolerant messaging

Shut down `Subscriber` by closing its console window. Return to the `Publisher` process and publish a few more messages by pressing the `1` key several more times. Notice how the publishing process does not change and there are no errors even though the subscriber process is no longer running.

In Visual Studio, right-click the project of the closed subscriber. Restart it by right-clicking the `Subscriber` project and selecting `Debug` followed by `Start new instance`.

Note how `Subscriber` immediately receives the messages that were published while it was not running. The publisher safely places the event into the subscriber's queue without knowledge of the running status of any subscriber. Even when processes or machines restart, NServiceBus protects messages from being lost.
27 changes: 27 additions & 0 deletions samples/pubsub/native/Core_6/PubSub.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29728.190
MinimumVisualStudioVersion = 15.0.26730.12
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Shared", "Shared\Shared.csproj", "{5686FE6C-A5E3-40D1-A6BD-25F94DA612F8}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Publisher", "Publisher\Publisher.csproj", "{7036A49B-359F-4BC7-AFBA-DE3C7AB41986}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Subscriber", "Subscriber\Subscriber.csproj", "{6A699A4E-F2FD-4B71-AF73-199B499482BD}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{5686FE6C-A5E3-40D1-A6BD-25F94DA612F8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5686FE6C-A5E3-40D1-A6BD-25F94DA612F8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7036A49B-359F-4BC7-AFBA-DE3C7AB41986}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7036A49B-359F-4BC7-AFBA-DE3C7AB41986}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6A699A4E-F2FD-4B71-AF73-199B499482BD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6A699A4E-F2FD-4B71-AF73-199B499482BD}.Debug|Any CPU.Build.0 = Debug|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
EndGlobal
6 changes: 6 additions & 0 deletions samples/pubsub/native/Core_6/PubSub.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/Environment/InjectedLayers/FileInjectedLayer/=2EF370E98D1FDC4E9FD6E29F173EB613/RelativePath/@EntryValue">..\..\..\..\tools\Shared.DotSettings</s:String>
<s:Boolean x:Key="/Default/Environment/InjectedLayers/FileInjectedLayer/=2EF370E98D1FDC4E9FD6E29F173EB613/@KeyIndexDefined">True</s:Boolean>
<s:Boolean x:Key="/Default/Environment/InjectedLayers/InjectedLayerCustomization/=File2EF370E98D1FDC4E9FD6E29F173EB613/@KeyIndexDefined">True</s:Boolean>
<s:Double x:Key="/Default/Environment/InjectedLayers/InjectedLayerCustomization/=File2EF370E98D1FDC4E9FD6E29F173EB613/RelativePriority/@EntryValue">1</s:Double>
</wpf:ResourceDictionary>
File renamed without changes.
11 changes: 11 additions & 0 deletions samples/pubsub/native/Core_6/Publisher/Publisher.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net48</TargetFramework>
<OutputType>Exe</OutputType>
<LangVersion>7.3</LangVersion>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Shared\Shared.csproj" />
<PackageReference Include="NServiceBus" Version="6.*" />
</ItemGroup>
</Project>
8 changes: 8 additions & 0 deletions samples/pubsub/native/Core_6/Shared/OrderReceived.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
using System;
using NServiceBus;

public class OrderReceived :
IEvent
{
public Guid OrderId { get; set; }
}
9 changes: 9 additions & 0 deletions samples/pubsub/native/Core_6/Shared/Shared.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net48</TargetFramework>
<LangVersion>7.3</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NServiceBus" Version="6.*" />
</ItemGroup>
</Project>
15 changes: 15 additions & 0 deletions samples/pubsub/native/Core_6/Subscriber/OrderReceivedHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

public class OrderReceivedHandler :
IHandleMessages<OrderReceived>
{
static ILog log = LogManager.GetLogger<OrderReceivedHandler>();

public Task Handle(OrderReceived message, IMessageHandlerContext context)
{
log.Info($"Subscriber has received OrderReceived event with OrderId {message.OrderId}.");
return Task.CompletedTask;
}
}
File renamed without changes.
11 changes: 11 additions & 0 deletions samples/pubsub/native/Core_6/Subscriber/Subscriber.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net48</TargetFramework>
<OutputType>Exe</OutputType>
<LangVersion>7.3</LangVersion>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Shared\Shared.csproj" />
<PackageReference Include="NServiceBus" Version="6.*" />
</ItemGroup>
</Project>
Loading

0 comments on commit 173eee4

Please sign in to comment.