Skip to content

[WIP] Enable External SDK #794

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
36 changes: 36 additions & 0 deletions src/DurableSDK/Commands/GetDurableTaskResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

#pragma warning disable 1591 // Missing XML comment for publicly visible type or member 'member'

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable.Commands
{
using System.Collections;
using System.Management.Automation;
using Microsoft.Azure.Functions.PowerShellWorker.Durable.Tasks;

[Cmdlet("Get", "DurableTaskResult")]
public class GetDurableTaskResultCommand : PSCmdlet
{
[Parameter(Mandatory = true)]
[ValidateNotNull]
public DurableTask[] Task { get; set; }

private readonly DurableTaskHandler _durableTaskHandler = new DurableTaskHandler();

protected override void EndProcessing()
{
var privateData = (Hashtable)MyInvocation.MyCommand.Module.PrivateData;
var context = (OrchestrationContext)privateData[SetFunctionInvocationContextCommand.ContextKey];

_durableTaskHandler.GetTaskResult(Task, context, WriteObject);
}

protected override void StopProcessing()
{
_durableTaskHandler.Stop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ protected override void EndProcessing()
{
var privateData = (Hashtable)MyInvocation.MyCommand.Module.PrivateData;
var context = (OrchestrationContext)privateData[SetFunctionInvocationContextCommand.ContextKey];
var loadedFunctions = FunctionLoader.GetLoadedFunctions();

var loadedFunctions = FunctionLoader.GetLoadedFunctions();
var task = new ActivityInvocationTask(FunctionName, Input, RetryOptions);
ActivityInvocationTask.ValidateTask(task, loadedFunctions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable.Commands
{
using System.Collections;
using System.Management.Automation;
using Microsoft.PowerShell.Commands;

/// <summary>
/// Set the orchestration context.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System;
using System.Collections;
using System.Collections.Generic;
using System.Management.Automation;
using System.Threading;
using Microsoft.Azure.Functions.PowerShellWorker.Durable.Tasks;
using Utility;
using Microsoft.PowerShell.Commands;

internal class DurableTaskHandler
{
Expand Down Expand Up @@ -47,6 +49,7 @@ public void StopAndInitiateDurableTaskOrReplay(
}

completedHistoryEvent.IsProcessed = true;
context.IsReplaying = completedHistoryEvent.IsPlayed;

switch (completedHistoryEvent.EventType)
{
Expand All @@ -57,6 +60,13 @@ public void StopAndInitiateDurableTaskOrReplay(
output(eventResult);
}
break;
case HistoryEventType.EventRaised:
var eventRaisedResult = GetEventResult(completedHistoryEvent);
if (eventRaisedResult != null)
{
output(eventRaisedResult);
}
break;

case HistoryEventType.TaskFailed:
if (retryOptions == null)
Expand All @@ -76,7 +86,7 @@ public void StopAndInitiateDurableTaskOrReplay(
retryOptions.MaxNumberOfAttempts,
onSuccess:
result => {
output(TypeExtensions.ConvertFromJson(result));
output(ConvertFromJson(result));
},
onFailure);

Expand Down Expand Up @@ -126,6 +136,7 @@ public void WaitAll(
var allTasksCompleted = completedEvents.Count == tasksToWaitFor.Count;
if (allTasksCompleted)
{
context.IsReplaying = completedEvents.Count == 0 ? false : completedEvents[0].IsPlayed;
CurrentUtcDateTimeUpdater.UpdateCurrentUtcDateTime(context);

foreach (var completedHistoryEvent in completedEvents)
Expand Down Expand Up @@ -164,6 +175,7 @@ public void WaitAny(
if (scheduledHistoryEvent != null)
{
scheduledHistoryEvent.IsProcessed = true;
scheduledHistoryEvent.IsPlayed = true;
}

if (completedHistoryEvent != null)
Expand All @@ -179,12 +191,14 @@ public void WaitAny(
}

completedHistoryEvent.IsProcessed = true;
completedHistoryEvent.IsPlayed = true;
}
}

var anyTaskCompleted = completedTasks.Count > 0;
if (anyTaskCompleted)
{
context.IsReplaying = context.History[firstCompletedHistoryEventIndex].IsPlayed;
CurrentUtcDateTimeUpdater.UpdateCurrentUtcDateTime(context);
// Return a reference to the first completed task
output(firstCompletedTask);
Expand All @@ -195,6 +209,21 @@ public void WaitAny(
}
}

public void GetTaskResult(
IReadOnlyCollection<DurableTask> tasksToQueryResultFor,
OrchestrationContext context,
Action<object> output)
{
foreach (var task in tasksToQueryResultFor) {
var scheduledHistoryEvent = task.GetScheduledHistoryEvent(context, true);
var processedHistoryEvent = task.GetCompletedHistoryEvent(context, scheduledHistoryEvent, true);
if (processedHistoryEvent != null)
{
output(GetEventResult(processedHistoryEvent));
}
}
}

public void Stop()
{
_waitForStop.Set();
Expand All @@ -206,15 +235,41 @@ private static object GetEventResult(HistoryEvent historyEvent)

if (historyEvent.EventType == HistoryEventType.TaskCompleted)
{
return TypeExtensions.ConvertFromJson(historyEvent.Result);
return ConvertFromJson(historyEvent.Result);
}
else if (historyEvent.EventType == HistoryEventType.EventRaised)
{
return TypeExtensions.ConvertFromJson(historyEvent.Input);
return ConvertFromJson(historyEvent.Input);
}
return null;
}

public static object ConvertFromJson(string json)
{
object retObj = JsonObject.ConvertFromJson(json, returnHashtable: true, error: out _);

if (retObj is PSObject psObj)
{
retObj = psObj.BaseObject;
}

if (retObj is Hashtable hashtable)
{
try
{
// ConvertFromJson returns case-sensitive Hashtable by design -- JSON may contain keys that only differ in case.
// We try casting the Hashtable to a case-insensitive one, but if that fails, we keep using the original one.
retObj = new Hashtable(hashtable, StringComparer.OrdinalIgnoreCase);
}
catch
{
retObj = hashtable;
}
}

return retObj;
}

private void InitiateAndWaitForStop(OrchestrationContext context)
{
context.OrchestrationActionCollector.Stop();
Expand Down
26 changes: 26 additions & 0 deletions src/DurableSDK/ExternalInvoker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System;
using System.Collections;
using System.Management.Automation;

internal class ExternalInvoker : IExternalInvoker
{
private readonly Func<PowerShell, object> _externalSDKInvokerFunction;

public ExternalInvoker(Func<PowerShell, object> invokerFunction)
{
_externalSDKInvokerFunction = invokerFunction;
}

public Hashtable Invoke(IPowerShellServices powerShellServices)
{
return (Hashtable)_externalSDKInvokerFunction.Invoke(powerShellServices.GetPowerShell());
}
}
}
File renamed without changes.
File renamed without changes.
16 changes: 16 additions & 0 deletions src/DurableSDK/IExternalInvoker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System.Collections;

// Represents a contract for the
internal interface IExternalInvoker
{
// Method to invoke an orchestration using the external Durable SDK
Hashtable Invoke(IPowerShellServices powerShellServices);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
internal interface IOrchestrationInvoker
{
Hashtable Invoke(OrchestrationBindingInfo orchestrationBindingInfo, IPowerShellServices pwsh);
void SetExternalInvoker(IExternalInvoker externalInvoker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,26 @@

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;
using System;
using System.Management.Automation;

internal interface IPowerShellServices
{
PowerShell GetPowerShell();

bool UseExternalDurableSDK();

void SetDurableClient(object durableClient);

void SetOrchestrationContext(OrchestrationContext orchestrationContext);
OrchestrationBindingInfo SetOrchestrationContext(ParameterBinding context, out IExternalInvoker externalInvoker);

void ClearOrchestrationContext();

void TracePipelineObject();

void AddParameter(string name, object value);

IAsyncResult BeginInvoke(PSDataCollection<object> output);

void EndInvoke(IAsyncResult asyncResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
using System.Threading;

using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions;
using Newtonsoft.Json;

internal class OrchestrationActionCollector
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ public class OrchestrationContext
public object Input { get; internal set; }

[DataMember]
internal string InstanceId { get; set; }
public string InstanceId { get; set; }

[DataMember]
internal string ParentInstanceId { get; set; }

[DataMember]
internal bool IsReplaying { get; set; }
public bool IsReplaying { get; set; }

[DataMember]
internal HistoryEvent[] History { get; set; }
Expand Down
69 changes: 69 additions & 0 deletions src/DurableSDK/PowerShellExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

using System.Collections;
using System.Collections.ObjectModel;

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System.Management.Automation;

internal static class PowerShellExtensions
{
public static void InvokeAndClearCommands(this PowerShell pwsh)
{
try
{
pwsh.Invoke();
}
finally
{
pwsh.Streams.ClearStreams();
pwsh.Commands.Clear();
}
}

public static void InvokeAndClearCommands(this PowerShell pwsh, IEnumerable input)
{
try
{
pwsh.Invoke(input);
}
finally
{
pwsh.Streams.ClearStreams();
pwsh.Commands.Clear();
}
}

public static Collection<T> InvokeAndClearCommands<T>(this PowerShell pwsh)
{
try
{
var result = pwsh.Invoke<T>();
return result;
}
finally
{
pwsh.Streams.ClearStreams();
pwsh.Commands.Clear();
}
}

public static Collection<T> InvokeAndClearCommands<T>(this PowerShell pwsh, IEnumerable input)
{
try
{
var result = pwsh.Invoke<T>(input);
return result;
}
finally
{
pwsh.Streams.ClearStreams();
pwsh.Commands.Clear();
}
}
}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
Loading