Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document recoverability pipeline #5632

Merged
merged 41 commits into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
f62a28c
introduce recoverability pipeline
lailabougria Feb 9, 2022
35923f2
wip
lailabougria Feb 10, 2022
f449097
add menu item
lailabougria Feb 10, 2022
3615db2
split up connectors and steps
lailabougria Feb 10, 2022
1a7395a
Update to core alpha 3019
andreasohlund Feb 10, 2022
b9124b4
Fix intro
andreasohlund Feb 10, 2022
b316633
revert text for v6-v7
lailabougria Feb 10, 2022
d5fd766
Add the routing component to the incoming pipeline graph
lailabougria Feb 10, 2022
0bf0f8b
wording
andreasohlund Feb 11, 2022
5d50489
More wording
andreasohlund Feb 11, 2022
0972a31
forwarding is obsoleted in v8
lailabougria Feb 11, 2022
a7728b7
Add snippet
andreasohlund Feb 11, 2022
aeb7a6f
Merge branch 'introduce-recoverability-pipeline' of https://github.co…
andreasohlund Feb 11, 2022
5c42d18
Whitespace
andreasohlund Feb 11, 2022
d4802d9
Fix link
andreasohlund Feb 11, 2022
aa51718
this file should only contain the extension bag documentation
lailabougria Feb 11, 2022
bcbef5e
Fix file name issue and text
lailabougria Feb 11, 2022
63a9f5c
fix broken link
lailabougria Feb 11, 2022
9ab0164
fixed broken graphs
lailabougria Feb 11, 2022
4d582a0
found another section that diverges between v6 and 8
lailabougria Feb 11, 2022
214badd
Fix filename
andreasohlund Feb 11, 2022
728ca3c
Clarify state propagation between parent and child pipelines
andreasohlund Feb 11, 2022
27dc49b
No forwarding in v8
andreasohlund Feb 11, 2022
82c8e45
Remove forwarding from optional stages in v8
andreasohlund Feb 11, 2022
ad7b11f
Add dispatch
andreasohlund Feb 11, 2022
02eb5b3
Typo
andreasohlund Feb 11, 2022
8090384
Update nservicebus/recoverability/pipeline.md
andreasohlund Feb 11, 2022
2164a98
Typos
andreasohlund Feb 11, 2022
27bc7b1
Update nservicebus/pipeline/index_intro_core_[6,8).partial.md
andreasohlund Feb 11, 2022
16b1e58
Update nservicebus/pipeline/index_intro_core_[8,).partial.md
andreasohlund Feb 11, 2022
48658fb
Apply lailas suggestion
andreasohlund Feb 11, 2022
e28992c
Fix link
andreasohlund Feb 11, 2022
4fa1e9e
Wording
andreasohlund Feb 11, 2022
c5db148
moved definition of the pipeline
lailabougria Feb 11, 2022
524d11b
co-authored-by:Andreas Ohlund<andreas.ohlund@particular.net>
lailabougria Feb 11, 2022
54b4800
small tweaks
lailabougria Feb 11, 2022
72ea858
Update nservicebus/pipeline/steps-stages-connectors_extensionbag_core…
andreasohlund Feb 11, 2022
6877d00
Update nservicebus/pipeline/steps-stages-connectors_incoming_core_[8.…
andreasohlund Feb 11, 2022
a610404
Update nservicebus/pipeline/steps-stages-connectors_recoverability_co…
andreasohlund Feb 11, 2022
b6ee189
Update nservicebus/pipeline/steps-stages-connectors_incoming_core_[6,…
andreasohlund Feb 11, 2022
d99a6c1
Make ctor public
andreasohlund Feb 11, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Snippets/Core/Core_8/Audit/AddAuditData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class CustomAuditDataBehavior : Behavior<IAuditContext>
{
public override Task Invoke(IAuditContext context, Func<Task> next)
{
context.AddAuditData("myKey", "myValue");
context.AuditMetadata["myKey"] = "MyValue";
return next();
}
}
Expand Down
2 changes: 1 addition & 1 deletion Snippets/Core/Core_8/Core_8.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.AspNet.Mvc" Version="5.2.7" />
<PackageReference Include="Microsoft.ServiceFabric.Services" Version="2.*" />
<PackageReference Include="NServiceBus" Version="8.0.0-alpha.1902" />
<PackageReference Include="NServiceBus" Version="8.0.0-alpha.3019" />
<PackageReference Include="NServiceBus.Callbacks" Version="3.*" />
<PackageReference Include="NServiceBus.Encryption.MessageProperty" Version="2.*" />
<PackageReference Include="NServiceBus.Metrics.ServiceControl" Version="3.*" />
Expand Down
2 changes: 1 addition & 1 deletion Snippets/Core/Core_8/Pipeline/CustomForkConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ await next()
var auditAddress = "AuditAddress";

// Fork into new pipeline
await fork(this.CreateAuditContext(message, auditAddress, context))
await fork(this.CreateAuditContext(message, auditAddress, TimeSpan.FromHours(1), context))
.ConfigureAwait(false);
}
}
Expand Down
62 changes: 62 additions & 0 deletions Snippets/Core/Core_8/Recoverability/Pipeline.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
namespace Core8.Recoverability
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Pipeline;

#region custom-recoverability-action

public class EnableExternalBodyStorageBehavior : Behavior<IRecoverabilityContext>
{
private readonly IExternalBodyStorage storage;

public EnableExternalBodyStorageBehavior(IExternalBodyStorage storage)
{
this.storage = storage;
}

public async override Task Invoke(IRecoverabilityContext context, Func<Task> next)
{
if(context.RecoverabilityAction is MoveToError errorAction)
{
var message = context.ErrorContext.Message;
var bodyUrl = await storage.StoreBody(message.MessageId, message.Body);

context.Metadata["body-url"] = bodyUrl;

context.RecoverabilityAction = new SkipFailedMessageBody(errorAction.ErrorQueue);
}

await next();
}

class SkipFailedMessageBody : MoveToError
{
public SkipFailedMessageBody(string errorQueue) : base(errorQueue)
{
}

public override IReadOnlyCollection<IRoutingContext> GetRoutingContexts(IRecoverabilityActionContext context)
{
var routingContexts = base.GetRoutingContexts(context);

foreach (var routingContext in routingContexts)
{
// clear out the message body
routingContext.Message.UpdateBody(ReadOnlyMemory<byte>.Empty);
}

return routingContexts;
}
}
}

#endregion

public interface IExternalBodyStorage
{
Task<string> StoreBody(string messageId, ReadOnlyMemory<byte> body);
}
}
2 changes: 2 additions & 0 deletions menu/menu.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@
Title: Custom Policy
- Url: nservicebus/recoverability/subscribing-to-error-notifications
Title: Error notifications
- Url: nservicebus/recoverability/pipeline
Title: Recoverability pipeline
- Url: nservicebus/pipeline
Title: Pipeline
Articles:
Expand Down
15 changes: 5 additions & 10 deletions nservicebus/pipeline/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,24 @@
title: Message Handling Pipeline
summary: Overview of the message handling pipeline
reviewed: 2020-05-07
component: Core
redirects:
- nservicebus/nservicebus-pipeline-intro
related:
- samples/header-manipulation
- samples/pipeline/fix-messages-using-behavior
---

NServiceBus has the concept of a _pipeline_ which refers to the series of actions taken when an incoming message is processed and an outgoing message is sent.
partial: intro

## Customizing the pipeline
## Pipeline customization

There are several ways to customize this pipeline with varying levels of complexity.
There are several ways to customize the pipelines with varying levels of complexity.

* [Manipulate the pipeline with behaviors](/nservicebus/pipeline/manipulate-with-behaviors.md)
* [Steps, stages and connectors](/nservicebus/pipeline/steps-stages-connectors.md)
* [Message mutators](/nservicebus/pipeline/message-mutators.md)
* [Abort the pipeline](/nservicebus/pipeline/aborting.md)
* [React to pipeline events](/nservicebus/pipeline/events.md)

Unit testing of custom extensions is supported by [the `NServiceBus.Testing` library](/nservicebus/testing/#testing-a-behavior).

## Features built using the pipeline

* [Data Bus](/nservicebus/messaging/databus/)
* [Message Property Encryption](/nservicebus/security/property-encryption.md)
* [Recoverability](/nservicebus/recoverability/)
Unit testing of custom extensions is supported by [the `NServiceBus.Testing` library](/nservicebus/testing/#testing-a-behavior).
1 change: 1 addition & 0 deletions nservicebus/pipeline/index_intro_core_[6,8).partial.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
NServiceBus uses the concept of _pipelines_. A pipeline refers to the series of actions taken as a result of the triggering action, e.g. an incoming message triggers the incoming pipeline, or an outgoing message triggers the outgoing pipeline.
7 changes: 7 additions & 0 deletions nservicebus/pipeline/index_intro_core_[8,).partial.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
NServiceBus uses the concept of _pipelines_. A pipeline refers to the series of actions taken as a result of the triggering action.

For example:

* The incoming pipeline is triggered due to an incoming message
* The outgoing pipeline is triggered due to an outgoing message
* The [recoverability](/nservicebus/recoverability/) pipeline is triggered due to failure during processing
17 changes: 13 additions & 4 deletions nservicebus/pipeline/steps-stages-connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,19 @@ related:
- nservicebus/pipeline/manipulate-with-behaviors
---

NServiceBus has the concept of a pipeline execution order that is executed when a message is received or dispatched. A *pipeline* refers to the series of actions taken when an incoming message is processed or an outgoing message is sent. This allows users to take full control of the incoming and outgoing message processing.
partial: intro

There are two explicit pipelines: one for the outgoing messages and one for the incoming messages.
partial: stages

Each pipeline is composed of *steps*. A step is an identifiable value in the pipeline used to programmatically define order of execution. Each step represents a behavior which will be executed at the given place within the pipeline. To add additional behavior to the pipeline by registering a new step or replace the behavior of an existing step with the custom implementation.
partial: incoming

partial: outgoing

partial: recoverability

partial: optional

partial: extensionbag

partial: connectors

partial: extra
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
## Stage connectors

Stage connectors connect from the current stage (i.e. `IOutgoingLogicalMessageContext`) to another stage (i.e. `IOutgoingPhysicalMessageContext`). In order to override an existing stage, inherit from `StageConnector<TFromContext, TToContext>` and then replace an existing stage connector. Most pipeline extensions can be done by inheriting from `Behavior<TContext>`. It is rarely necessary to replace existing stage connectors. When implementing a stage connector, ensure that all required data is passed along for the next stage.

snippet: CustomStageConnector

## Fork Connectors

```mermaid
graph LR
subgraph Root pipeline
A[TFromContext] --- B{Fork Connector}
B --- C[TFromContext]
end
subgraph Forked pipeline
B --> D[TForkContext]
end
```

Fork connectors fork from a current stage (i.e. `IIncomingPhysicalMessageContext`) to another independent pipeline (i.e. `IAuditContext`). In order to override an existing fork connector inherit from `ForkConnector<TFromContext, TForkContext>` and then replace an existing fork connector.

snippet: CustomForkConnector

## Stage Fork Connector

```mermaid
graph LR
subgraph Root stage
A[TFromContext] --- B{StageFork<br/>Connector}
B --- C[TToContext]
end
subgraph Forked pipeline
B --> D[TForkContext]
end
```

Stage fork connectors are essentially a combination of a stage connector and a fork connector. They have the ability to connect from the current stage (i.e. `ITransportReceiveContext`) to another stage (i.e. `IIncomingPhysicalMessageContext`) and fork to another independent pipeline (i.e. `IBatchedDispatchContext`). In order to override an existing stage fork connector inherit from `StageForkConnector<TFromContext, TToContext, TForkContext` and then replace an existing stage fork connector.

snippet: CustomStageForkConnector
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## Extension bag

Pipeline contexts have an extension bag which can be used to used to create, read, update or delete custom state with a key identifier. For example, this can be used to *set* metadata- or pipeline-specific state in an incoming behavior that can be used in later pipeline stages if needed. State stored via the extension bag will be unavailable once the extension bag runs out of scope at the end of the pipeline.

State set during a *forked* pipeline will not be available to the *forking* pipeline. For example, state changes during the *outgoing* pipeline will not be available in the *incoming* pipeline. If state has to be propagated, have the *forking* pipeline set a context object that the *forked* pipeline later can get and modify as needed.

snippet: SetContextBetweenIncomingAndOutgoing

This file was deleted.

Loading