From 0025253cb658b6ce82cf0c9dd498e306f142a634 Mon Sep 17 00:00:00 2001 From: muhammad k khan Date: Fri, 7 Feb 2020 12:48:57 -0600 Subject: [PATCH] update the pm almost there --- .../SchoolBook.Admin/SchoolBook.Admin.sln | 17 +++ .../SyntaxTests/BasicSyntaxTests.cs | 9 ++ .../Aggregates/ModelEvent.cs | 34 +++++ .../Aggregates/ProcessManager.cs | 117 +++++++++++++++++- .../Aggregates/ProcessStep.cs | 88 +++++++++++++ .../Aggregates/ProcessStepStatus.cs | 6 + .../CommandHandlers/HandleSetupProcess.cs | 42 ++++++- .../Services/ProcessManager.cs | 4 + 8 files changed, 310 insertions(+), 7 deletions(-) create mode 100644 SchoolBook/SchoolBook.Admin/SchoolBook.Admin.sln create mode 100644 lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ModelEvent.cs create mode 100644 lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ProcessStep.cs create mode 100644 lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ProcessStepStatus.cs diff --git a/SchoolBook/SchoolBook.Admin/SchoolBook.Admin.sln b/SchoolBook/SchoolBook.Admin/SchoolBook.Admin.sln new file mode 100644 index 0000000..5ec3d53 --- /dev/null +++ b/SchoolBook/SchoolBook.Admin/SchoolBook.Admin.sln @@ -0,0 +1,17 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SchoolBook.Admin", "SchoolBook.Admin.csproj", "{CF32E94B-FDAC-46BA-B387-0A3AC1911FCA}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {CF32E94B-FDAC-46BA-B387-0A3AC1911FCA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CF32E94B-FDAC-46BA-B387-0A3AC1911FCA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CF32E94B-FDAC-46BA-B387-0A3AC1911FCA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CF32E94B-FDAC-46BA-B387-0A3AC1911FCA}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection +EndGlobal diff --git a/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager.tests/SyntaxTests/BasicSyntaxTests.cs b/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager.tests/SyntaxTests/BasicSyntaxTests.cs index 791fa11..89bf186 100644 --- a/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager.tests/SyntaxTests/BasicSyntaxTests.cs +++ b/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager.tests/SyntaxTests/BasicSyntaxTests.cs @@ -1,6 +1,7 @@ using System.Linq; using lifebook.core.eventstore.domain.models; using lifebook.core.processmanager.Syntax; +using Newtonsoft.Json.Linq; using NUnit.Framework; namespace lifebook.core.processmanager.tests.SyntaxTests @@ -49,5 +50,13 @@ public void ProcessManager_Can_Be_Configured_To_Only_Have_Many_Step() .Configuration; Assert.AreEqual(3, configuration.GetProcessManagerSteps.Count); } + + [Test] + public void BasicTest() + { + dynamic viewbag = new JObject(); + viewbag.Quote = JObject.FromObject(new object()); + System.Console.WriteLine(viewbag.ToString()); + } } } diff --git a/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ModelEvent.cs b/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ModelEvent.cs new file mode 100644 index 0000000..b0d4af5 --- /dev/null +++ b/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ModelEvent.cs @@ -0,0 +1,34 @@ +using System; +using System.IO; +using System.Runtime.Serialization.Formatters.Binary; +using lifebook.core.cqrses.Domains; +using Newtonsoft.Json.Linq; + +namespace lifebook.core.processmanager.Aggregates +{ + internal class ModelEvent + { + public string EventName { get; internal set; } + public int EventVersion { get; internal set; } + public JObject Data { get; internal set; } + + internal AggregateEvent Merge(AggregateEvent aggregateEvent) + { + var deepCopy = JObject.FromObject(aggregateEvent).DeepClone().ToObject(); + deepCopy.Data = new eventstore.domain.models.Data(ObjectToByteArray(Data)); + deepCopy.EventName = EventName; + deepCopy.EventVersion = EventVersion; + return deepCopy; + } + + private static byte[] ObjectToByteArray(Object obj) + { + BinaryFormatter bf = new BinaryFormatter(); + using (var ms = new MemoryStream()) + { + bf.Serialize(ms, obj); + return ms.ToArray(); + } + } + } +} \ No newline at end of file diff --git a/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ProcessManager.cs b/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ProcessManager.cs index 1292c90..abb6c6f 100644 --- a/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ProcessManager.cs +++ b/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ProcessManager.cs @@ -1,18 +1,40 @@ using System; using System.Collections.Generic; +using System.Collections.ObjectModel; using System.Linq; using System.Reflection; using lifebook.core.cqrses.Domains; +using lifebook.core.messagebus.Models; using lifebook.core.processmanager.Attributes; +using lifebook.core.processmanager.Models; +using lifebook.core.processmanager.ProcessStates; +using lifebook.core.processmanager.ProcessStates.ProcessSetup; +using lifebook.core.processmanager.Syntax; +using Newtonsoft.Json.Linq; namespace lifebook.core.processmanager.Aggregates { + // TODO: initalizing the ProcessManager internal class ProcessManager { private static Dictionary eventNameToMethods = new Dictionary(); private static object _lock = new object(); + private List _uncommitedEvents = new List(); + internal ReadOnlyCollection GetUncommitedEvents => _uncommitedEvents.AsReadOnly(); + private readonly Guid processId; + private List _steps = new List(); + private ProcessStep.ProcessStepBuilder _currentStep; - public void Handle(List events) + public ProcessManager(Guid processId) + { + this.processId = processId; + } + // aggregate properties + private bool _initalized = false; + private ProcessIdentity _processIdentity; + private dynamic _queueInfo; + + internal void Handle(List events) { BuildEventNameToMethodInfoDictionary(); foreach (var evt in events) @@ -24,6 +46,97 @@ public void Handle(List events) } } + internal bool AmIFirstStep() + { + return _initalized; + } + + internal void InitalizeProcess(EventBusMessage a, SetupProcess pRequest) + { + _initalized = true; + _processIdentity = pRequest.ProcessIdentity; + _queueInfo = new + { + a.ExchangeName, + a.RoutingKey, + a.MessageName + }; + CreateUncommitedEvent("ProcessInitalized", 1, new JObject() + { + ["Initalized"] = true, + ["ProcessIdentity"] = JObject.FromObject(_processIdentity), + ["QueueInfo"] = JObject.FromObject(_queueInfo), + ["Steps"] = CreateStepsForInitalProcess(a, pRequest), + ["Key"] = processId, + }); + } + + internal void ChangeProcessData(dynamic getViewBag) + { + CreateUncommitedEvent("ProcessDataChanged", 1, new JObject() + { + ["Data"] = JObject.FromObject(getViewBag) + }); + } + + private JToken CreateStepsForInitalProcess(EventBusMessage a, SetupProcess pRequest) + { + var stepNames = pRequest.ProcessManager.EventNameToProcessStepDictionary.Select(p => new { StepName = p.Key, Step = p.Value}); + + return JArray.FromObject( + stepNames.Select(kv => ProcessStep.ProcessStepBuilder + .WithRequiredProperties(a.Data.AggregateEvent.CorrelationId, kv.StepName, kv.Step.EventSpecifier)) + ); + } + + internal void InitalizeProcessStep(ProcessManagerStep action, AggregateEvent a) + { + _currentStep = ProcessStep.ProcessStepBuilder.WithRequiredProperties(a.CorrelationId, action.StepDescription, action.EventSpecifier); + _currentStep + .Initalized() + .CausedBy(a) + .ChangeProcessStatus(ProcessStepStatus.Started); + CreateUncommitedEvent("ProcessStepInitalized", 1, JObject.FromObject(_currentStep.Build())); + } + + internal void CompleteProcessStep() + { + _currentStep.ChangeProcessStatus(ProcessStepStatus.Completed); + var processStep = _currentStep.Build(); + + CreateUncommitedEvent("ProcessStepCompleted", 1, new JObject() + { + ["StepName"] = processStep.StepName, + ["Status"] = processStep.Status.ToString(), + ["StatusInt"] = (int)processStep.Status + }); + } + + internal void FailProcessStep(Exception exception) + { + _currentStep.AddException(exception); + _currentStep.ChangeProcessStatus(ProcessStepStatus.Failed); + var processStep = _currentStep.Build(); + CreateUncommitedEvent("ProcessStepFailed", 1, new JObject() + { + ["StepName"] = processStep.StepName, + ["Status"] = processStep.Status.ToString(), + ["StatusInt"] = (int)processStep.Status + }); + } + + private void CreateUncommitedEvent(string eventName, int eventVersion, JObject data) + { + _uncommitedEvents.Add( + new ModelEvent() + { + EventName = eventName, + EventVersion = eventVersion, + Data = data + } + ); + } + private void BuildEventNameToMethodInfoDictionary() { if (eventNameToMethods.Count == 0) @@ -32,7 +145,7 @@ private void BuildEventNameToMethodInfoDictionary() { if (eventNameToMethods.Count == 0) { - var mi = GetType().GetMethods(System.Reflection.BindingFlags.NonPublic) + var mi = GetType().GetMethods(BindingFlags.NonPublic) .Where(m => m.GetCustomAttributes(typeof(UponProcessEvent), false).Count() > 0); foreach (var m in mi) diff --git a/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ProcessStep.cs b/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ProcessStep.cs new file mode 100644 index 0000000..1ac1772 --- /dev/null +++ b/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ProcessStep.cs @@ -0,0 +1,88 @@ +using System; +using System.Collections.Generic; +using lifebook.core.cqrses.Domains; +using lifebook.core.eventstore.domain.models; + +namespace lifebook.core.processmanager.Aggregates +{ + internal class ProcessStep + { + public bool Initiated { get; internal set; } + public Guid CorrelationId { get; internal set; } + public Guid? CausationId { get; internal set; } + public string StepName { get; internal set; } + public EventSpecifier[] EventSpecifier { get; internal set; } + public AggregateEvent CausedBy { get; internal set; } + public List Exceptions { get; internal set; } = new List(); + public ProcessStepStatus Status { get; internal set; } + public int Hash { get => HashCode.ToHashCode(); } + public HashCode HashCode + { + get + { + var hashCode = new HashCode(); + hashCode.Add($"{DateTime.Now.ToLongTimeString()}"); + hashCode.Add($"{CorrelationId}"); + hashCode.Add($"{Initiated}"); + return HashCode; + } + } + + private ProcessStep() { } + + public class ProcessStepBuilder + { + private readonly ProcessStep _processStep; + + private ProcessStepBuilder(ProcessStep processStep) + { + _processStep = processStep; + } + + public static ProcessStepBuilder UsingProcessStep(ProcessStep processStep) + { + return new ProcessStepBuilder(processStep); + } + + public static ProcessStepBuilder WithRequiredProperties(Guid correlationId, string stepName, EventSpecifier[] eventSpecifier) + { + var processStep = new ProcessStep(); + processStep.CorrelationId = correlationId; + processStep.StepName = stepName; + processStep.EventSpecifier = eventSpecifier; + return new ProcessStepBuilder(processStep); + } + + public ProcessStepBuilder Initalized() + { + _processStep.Initiated = true; + return this; + } + + public ProcessStepBuilder CausedBy(AggregateEvent aggregateEvent) + { + _processStep.CausedBy = aggregateEvent; + _processStep.CausationId = aggregateEvent.CausationId; + return this; + } + + + public ProcessStepBuilder AddException(Exception exception) + { + _processStep.Exceptions.Add(exception); + return this; + } + + public ProcessStepBuilder ChangeProcessStatus(ProcessStepStatus status) + { + _processStep.Status = status; + return this; + } + + internal ProcessStep Build() + { + return _processStep; + } + } + } +} diff --git a/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ProcessStepStatus.cs b/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ProcessStepStatus.cs new file mode 100644 index 0000000..ec96b8c --- /dev/null +++ b/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Aggregates/ProcessStepStatus.cs @@ -0,0 +1,6 @@ +public enum ProcessStepStatus +{ + Failed, + Completed, + Started +} \ No newline at end of file diff --git a/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/ProcessStates/ProcessSetup/CommandHandlers/HandleSetupProcess.cs b/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/ProcessStates/ProcessSetup/CommandHandlers/HandleSetupProcess.cs index 2cd1998..62c73ed 100644 --- a/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/ProcessStates/ProcessSetup/CommandHandlers/HandleSetupProcess.cs +++ b/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/ProcessStates/ProcessSetup/CommandHandlers/HandleSetupProcess.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Castle.MicroKernel; @@ -25,13 +27,39 @@ public async Task Handle(SetupProcess request, Cancellati var bus = request.ProcessManager.ProcessManagerServices.Messagebus.TryConnectingDirectlyToQueue(request.ProcessManager.ProcessManagerServices.MessageQueueInformation); bus.Subscribe(request.ProcessManager.ProcessManagerServices.MessageQueueInformation, async a => { - var aggregate = new ProcessManager(); DetermineProccessIdFromEvent(a.Data.AggregateEvent, out Guid pid); + var aggregate = new ProcessManager(pid); + var category = + new StreamCategorySpecifier(request.ProcessManager. + ProcessManagerServices.ServiceName, request.ProcessManager.ProcessManagerServices.Instance, "Process", pid); var events = await request.ProcessManager.ProcessManagerServices.EventReader - .ReadAllEventsFromSingleStreamCategoryAsync( - new StreamCategorySpecifier(request.ProcessManager.ProcessManagerServices.ServiceName, request.ProcessManager.ProcessManagerServices.Instance, "Process", pid) + .ReadAllEventsFromSingleStreamCategoryAsync(category ); aggregate.Handle(events); + if(aggregate.AmIFirstStep()) + { + aggregate.InitalizeProcess(a, request); + } + + if (request.ProcessManager.EventNameToProcessStepDictionary.ContainsKey(a.Data.AggregateEvent.EventName)) + { + var action = request.ProcessManager.EventNameToProcessStepDictionary[a.Data.AggregateEvent.EventName]; + try + { + aggregate.InitalizeProcessStep(action, a.Data.AggregateEvent); + await action.StepAction(a.Data.AggregateEvent); + aggregate.CompleteProcessStep(); + aggregate.ChangeProcessData(request.ProcessManager.GetViewBag); + } + catch (Exception ex) + { + // TODO: build in resilit logic here + aggregate.FailProcessStep(); + } + } + + var commitEvents = aggregate.GetUncommitedEvents.Select(me => me.Merge(a.Data.AggregateEvent)); + await request.ProcessManager.ProcessManagerServices.EventWriter.WriteEventAsync(category, (List)commitEvents); }); return new ProcessSetupCompleted(); } @@ -39,9 +67,13 @@ public async Task Handle(SetupProcess request, Cancellati private void DetermineProccessIdFromEvent(AggregateEvent a, out Guid pid) { if (a.ProcessId == null) + { pid = a.EventId; - - pid = a.ProcessId.Value; + } + else + { + pid = a.ProcessId.Value; + } } } } diff --git a/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Services/ProcessManager.cs b/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Services/ProcessManager.cs index 4102dae..c59e748 100644 --- a/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Services/ProcessManager.cs +++ b/lifebook.core/lifebook.core.processmanager/lifebook.core.processmanager/Services/ProcessManager.cs @@ -5,6 +5,7 @@ using lifebook.core.processmanager.ProcessStates; using lifebook.core.processmanager.ProcessStates.ProcessSetup; using lifebook.core.processmanager.Syntax; +using Newtonsoft.Json.Linq; namespace lifebook.core.processmanager.Services { @@ -13,6 +14,9 @@ public abstract class ProcessManager internal readonly ProcessManagerServices ProcessManagerServices; internal Dictionary EventNameToProcessStepDictionary; internal List Subscriptions = new List(); + protected dynamic ViewBag = new JObject(); + + internal dynamic GetViewBag => ViewBag; public ProcessManager(ProcessManagerServices processManagerServices) {