Skip to content

Commit

Permalink
update the pm almost there
Browse files Browse the repository at this point in the history
  • Loading branch information
mkcoder committed Feb 7, 2020
1 parent 6564eaf commit 0025253
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 7 deletions.
17 changes: 17 additions & 0 deletions SchoolBook/SchoolBook.Admin/SchoolBook.Admin.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SchoolBook.Admin", "SchoolBook.Admin.csproj", "{CF32E94B-FDAC-46BA-B387-0A3AC1911FCA}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{CF32E94B-FDAC-46BA-B387-0A3AC1911FCA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CF32E94B-FDAC-46BA-B387-0A3AC1911FCA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CF32E94B-FDAC-46BA-B387-0A3AC1911FCA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CF32E94B-FDAC-46BA-B387-0A3AC1911FCA}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Linq;
using lifebook.core.eventstore.domain.models;
using lifebook.core.processmanager.Syntax;
using Newtonsoft.Json.Linq;
using NUnit.Framework;

namespace lifebook.core.processmanager.tests.SyntaxTests
Expand Down Expand Up @@ -49,5 +50,13 @@ public void ProcessManager_Can_Be_Configured_To_Only_Have_Many_Step()
.Configuration;
Assert.AreEqual(3, configuration.GetProcessManagerSteps.Count);
}

[Test]
public void BasicTest()
{
dynamic viewbag = new JObject();
viewbag.Quote = JObject.FromObject(new object());
System.Console.WriteLine(viewbag.ToString());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using lifebook.core.cqrses.Domains;
using Newtonsoft.Json.Linq;

namespace lifebook.core.processmanager.Aggregates
{
internal class ModelEvent
{
public string EventName { get; internal set; }
public int EventVersion { get; internal set; }
public JObject Data { get; internal set; }

internal AggregateEvent Merge(AggregateEvent aggregateEvent)
{
var deepCopy = JObject.FromObject(aggregateEvent).DeepClone().ToObject<AggregateEvent>();
deepCopy.Data = new eventstore.domain.models.Data(ObjectToByteArray(Data));
deepCopy.EventName = EventName;
deepCopy.EventVersion = EventVersion;
return deepCopy;
}

private static byte[] ObjectToByteArray(Object obj)
{
BinaryFormatter bf = new BinaryFormatter();
using (var ms = new MemoryStream())
{
bf.Serialize(ms, obj);
return ms.ToArray();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,40 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Reflection;
using lifebook.core.cqrses.Domains;
using lifebook.core.messagebus.Models;
using lifebook.core.processmanager.Attributes;
using lifebook.core.processmanager.Models;
using lifebook.core.processmanager.ProcessStates;
using lifebook.core.processmanager.ProcessStates.ProcessSetup;
using lifebook.core.processmanager.Syntax;
using Newtonsoft.Json.Linq;

namespace lifebook.core.processmanager.Aggregates
{
// TODO: initalizing the ProcessManager
internal class ProcessManager
{
private static Dictionary<string, MethodInfo> eventNameToMethods = new Dictionary<string, MethodInfo>();
private static object _lock = new object();
private List<ModelEvent> _uncommitedEvents = new List<ModelEvent>();
internal ReadOnlyCollection<ModelEvent> GetUncommitedEvents => _uncommitedEvents.AsReadOnly();
private readonly Guid processId;
private List<ProcessStep> _steps = new List<ProcessStep>();
private ProcessStep.ProcessStepBuilder _currentStep;

public void Handle(List<AggregateEvent> events)
public ProcessManager(Guid processId)
{
this.processId = processId;
}
// aggregate properties
private bool _initalized = false;
private ProcessIdentity _processIdentity;
private dynamic _queueInfo;

internal void Handle(List<AggregateEvent> events)
{
BuildEventNameToMethodInfoDictionary();
foreach (var evt in events)
Expand All @@ -24,6 +46,97 @@ public void Handle(List<AggregateEvent> events)
}
}

internal bool AmIFirstStep()
{
return _initalized;
}

internal void InitalizeProcess(EventBusMessage<ProcessStateMessageDto> a, SetupProcess pRequest)
{
_initalized = true;
_processIdentity = pRequest.ProcessIdentity;
_queueInfo = new
{
a.ExchangeName,
a.RoutingKey,
a.MessageName
};
CreateUncommitedEvent("ProcessInitalized", 1, new JObject()
{
["Initalized"] = true,
["ProcessIdentity"] = JObject.FromObject(_processIdentity),
["QueueInfo"] = JObject.FromObject(_queueInfo),
["Steps"] = CreateStepsForInitalProcess(a, pRequest),
["Key"] = processId,
});
}

internal void ChangeProcessData(dynamic getViewBag)
{
CreateUncommitedEvent("ProcessDataChanged", 1, new JObject()
{
["Data"] = JObject.FromObject(getViewBag)
});
}

private JToken CreateStepsForInitalProcess(EventBusMessage<ProcessStateMessageDto> a, SetupProcess pRequest)
{
var stepNames = pRequest.ProcessManager.EventNameToProcessStepDictionary.Select(p => new { StepName = p.Key, Step = p.Value});

return JArray.FromObject(
stepNames.Select(kv => ProcessStep.ProcessStepBuilder
.WithRequiredProperties(a.Data.AggregateEvent.CorrelationId, kv.StepName, kv.Step.EventSpecifier))
);
}

internal void InitalizeProcessStep(ProcessManagerStep action, AggregateEvent a)
{
_currentStep = ProcessStep.ProcessStepBuilder.WithRequiredProperties(a.CorrelationId, action.StepDescription, action.EventSpecifier);
_currentStep
.Initalized()
.CausedBy(a)
.ChangeProcessStatus(ProcessStepStatus.Started);
CreateUncommitedEvent("ProcessStepInitalized", 1, JObject.FromObject(_currentStep.Build()));
}

internal void CompleteProcessStep()
{
_currentStep.ChangeProcessStatus(ProcessStepStatus.Completed);
var processStep = _currentStep.Build();

CreateUncommitedEvent("ProcessStepCompleted", 1, new JObject()
{
["StepName"] = processStep.StepName,
["Status"] = processStep.Status.ToString(),
["StatusInt"] = (int)processStep.Status
});
}

internal void FailProcessStep(Exception exception)
{
_currentStep.AddException(exception);
_currentStep.ChangeProcessStatus(ProcessStepStatus.Failed);
var processStep = _currentStep.Build();
CreateUncommitedEvent("ProcessStepFailed", 1, new JObject()
{
["StepName"] = processStep.StepName,
["Status"] = processStep.Status.ToString(),
["StatusInt"] = (int)processStep.Status
});
}

private void CreateUncommitedEvent(string eventName, int eventVersion, JObject data)
{
_uncommitedEvents.Add(
new ModelEvent()
{
EventName = eventName,
EventVersion = eventVersion,
Data = data
}
);
}

private void BuildEventNameToMethodInfoDictionary()
{
if (eventNameToMethods.Count == 0)
Expand All @@ -32,7 +145,7 @@ private void BuildEventNameToMethodInfoDictionary()
{
if (eventNameToMethods.Count == 0)
{
var mi = GetType().GetMethods(System.Reflection.BindingFlags.NonPublic)
var mi = GetType().GetMethods(BindingFlags.NonPublic)
.Where(m => m.GetCustomAttributes(typeof(UponProcessEvent), false).Count() > 0);

foreach (var m in mi)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System;
using System.Collections.Generic;
using lifebook.core.cqrses.Domains;
using lifebook.core.eventstore.domain.models;

namespace lifebook.core.processmanager.Aggregates
{
internal class ProcessStep
{
public bool Initiated { get; internal set; }
public Guid CorrelationId { get; internal set; }
public Guid? CausationId { get; internal set; }
public string StepName { get; internal set; }
public EventSpecifier[] EventSpecifier { get; internal set; }
public AggregateEvent CausedBy { get; internal set; }
public List<Exception> Exceptions { get; internal set; } = new List<Exception>();
public ProcessStepStatus Status { get; internal set; }
public int Hash { get => HashCode.ToHashCode(); }
public HashCode HashCode
{
get
{
var hashCode = new HashCode();
hashCode.Add($"{DateTime.Now.ToLongTimeString()}");
hashCode.Add($"{CorrelationId}");
hashCode.Add($"{Initiated}");
return HashCode;
}
}

private ProcessStep() { }

public class ProcessStepBuilder
{
private readonly ProcessStep _processStep;

private ProcessStepBuilder(ProcessStep processStep)
{
_processStep = processStep;
}

public static ProcessStepBuilder UsingProcessStep(ProcessStep processStep)
{
return new ProcessStepBuilder(processStep);
}

public static ProcessStepBuilder WithRequiredProperties(Guid correlationId, string stepName, EventSpecifier[] eventSpecifier)
{
var processStep = new ProcessStep();
processStep.CorrelationId = correlationId;
processStep.StepName = stepName;
processStep.EventSpecifier = eventSpecifier;
return new ProcessStepBuilder(processStep);
}

public ProcessStepBuilder Initalized()
{
_processStep.Initiated = true;
return this;
}

public ProcessStepBuilder CausedBy(AggregateEvent aggregateEvent)
{
_processStep.CausedBy = aggregateEvent;
_processStep.CausationId = aggregateEvent.CausationId;
return this;
}


public ProcessStepBuilder AddException(Exception exception)
{
_processStep.Exceptions.Add(exception);
return this;
}

public ProcessStepBuilder ChangeProcessStatus(ProcessStepStatus status)
{
_processStep.Status = status;
return this;
}

internal ProcessStep Build()
{
return _processStep;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
public enum ProcessStepStatus
{
Failed,
Completed,
Started
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Castle.MicroKernel;
Expand All @@ -25,23 +27,53 @@ public async Task<ProcessSetupCompleted> Handle(SetupProcess request, Cancellati
var bus = request.ProcessManager.ProcessManagerServices.Messagebus.TryConnectingDirectlyToQueue(request.ProcessManager.ProcessManagerServices.MessageQueueInformation);
bus.Subscribe<ProcessStateMessageDto>(request.ProcessManager.ProcessManagerServices.MessageQueueInformation, async a =>
{
var aggregate = new ProcessManager();
DetermineProccessIdFromEvent(a.Data.AggregateEvent, out Guid pid);
var aggregate = new ProcessManager(pid);
var category =
new StreamCategorySpecifier(request.ProcessManager.
ProcessManagerServices.ServiceName, request.ProcessManager.ProcessManagerServices.Instance, "Process", pid);
var events = await request.ProcessManager.ProcessManagerServices.EventReader
.ReadAllEventsFromSingleStreamCategoryAsync<AggregateEventCreator, AggregateEvent>(
new StreamCategorySpecifier(request.ProcessManager.ProcessManagerServices.ServiceName, request.ProcessManager.ProcessManagerServices.Instance, "Process", pid)
.ReadAllEventsFromSingleStreamCategoryAsync<AggregateEventCreator, AggregateEvent>(category
);
aggregate.Handle(events);
if(aggregate.AmIFirstStep())
{
aggregate.InitalizeProcess(a, request);
}

if (request.ProcessManager.EventNameToProcessStepDictionary.ContainsKey(a.Data.AggregateEvent.EventName))
{
var action = request.ProcessManager.EventNameToProcessStepDictionary[a.Data.AggregateEvent.EventName];
try
{
aggregate.InitalizeProcessStep(action, a.Data.AggregateEvent);
await action.StepAction(a.Data.AggregateEvent);
aggregate.CompleteProcessStep();
aggregate.ChangeProcessData(request.ProcessManager.GetViewBag);
}
catch (Exception ex)
{
// TODO: build in resilit logic here
aggregate.FailProcessStep();
}
}

var commitEvents = aggregate.GetUncommitedEvents.Select(me => me.Merge(a.Data.AggregateEvent));
await request.ProcessManager.ProcessManagerServices.EventWriter.WriteEventAsync(category, (List<Event>)commitEvents);
});
return new ProcessSetupCompleted();
}

private void DetermineProccessIdFromEvent(AggregateEvent a, out Guid pid)
{
if (a.ProcessId == null)
{
pid = a.EventId;

pid = a.ProcessId.Value;
}
else
{
pid = a.ProcessId.Value;
}
}
}
}
Loading

0 comments on commit 0025253

Please sign in to comment.