Skip to content

Commit

Permalink
Binary data binding (Azure#17728)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLove-msft authored and annelo-msft committed Feb 17, 2021
1 parent 4ff3601 commit 3580b31
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public void Initialize(ExtensionConfigContext context)
.AddConverter<EventData, string>(ConvertEventDataToString)
.AddConverter<byte[], EventData>(ConvertBytes2EventData)
.AddConverter<EventData, byte[]>(ConvertEventDataToBytes)
.AddConverter<BinaryData, EventData>(ConvertBinaryDataToEventData)
.AddConverter<EventData, BinaryData>(ConvertEventDataToBinaryData)
.AddOpenConverter<OpenType.Poco, EventData>(ConvertPocoToEventData);

// register our trigger binding provider
Expand Down Expand Up @@ -114,5 +116,11 @@ private static Task<object> ConvertPocoToEventData(object arg, Attribute attrRes
{
return Task.FromResult<object>(ConvertStringToEventData(JsonConvert.SerializeObject(arg)));
}

private static EventData ConvertBinaryDataToEventData(BinaryData input)
=> new EventData(input);

private static BinaryData ConvertEventDataToBinaryData(EventData input)
=> input.EventBody;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Microsoft.Azure.WebJobs.Host.EndToEndTests
{
[NonParallelizable]
[LiveOnly]
public class EventHubEndToEndTests: WebJobsEventHubTestBase
public class EventHubEndToEndTests : WebJobsEventHubTestBase
{
private static EventWaitHandle _eventWait;
private static List<string> _results;
Expand All @@ -48,7 +48,7 @@ public async Task EventHub_PocoBinding()
using (jobHost)
{
var method = typeof(EventHubTestBindToPocoJobs).GetMethod(nameof(EventHubTestBindToPocoJobs.SendEvent_TestHub), BindingFlags.Static | BindingFlags.Public);
await jobHost.CallAsync(method, new { input = "{ Name: 'foo', Value: '" + _testId +"' }" });
await jobHost.CallAsync(method, new { input = "{ Name: 'foo', Value: '" + _testId + "' }" });

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
Expand Down Expand Up @@ -90,6 +90,26 @@ public async Task EventHub_SingleDispatch()
Assert.True(result);
}

AssertSingleDispatchLogs(host);
}

[Test]
public async Task EventHub_SingleDispatch_BinaryData()
{
var (jobHost, host) = BuildHost<EventHubTestSingleDispatchJobsBinaryData>();
using (jobHost)
{
await jobHost.CallAsync(nameof(EventHubTestSingleDispatchJobsBinaryData.SendEvent_TestHub), new { input = _testId });

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
}

AssertSingleDispatchLogs(host);
}

private static void AssertSingleDispatchLogs(IHost host)
{
IEnumerable<LogMessage> logMessages = host.GetTestLoggerProvider()
.GetAllLogMessages();

Expand Down Expand Up @@ -165,14 +185,35 @@ public async Task EventHub_MultipleDispatch()
var (jobHost, host) = BuildHost<EventHubTestMultipleDispatchJobs>();
using (jobHost)
{
var method = typeof(EventHubTestMultipleDispatchJobs).GetMethod("SendEvents_TestHub", BindingFlags.Static | BindingFlags.Public);
var method = typeof(EventHubTestMultipleDispatchJobs).GetMethod(nameof(EventHubTestMultipleDispatchJobs.SendEvents_TestHub), BindingFlags.Static | BindingFlags.Public);
int numEvents = 5;
await jobHost.CallAsync(method, new { numEvents = numEvents, input = _testId });

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
}

AssertMultipleDispatchLogs(host);
}

[Test]
public async Task EventHub_MultipleDispatch_BinaryData()
{
var (jobHost, host) = BuildHost<EventHubTestMultipleDispatchJobsBinaryData>();
using (jobHost)
{
int numEvents = 5;
await jobHost.CallAsync(nameof(EventHubTestMultipleDispatchJobsBinaryData.SendEvents_TestHub), new { numEvents = numEvents, input = _testId });

bool result = _eventWait.WaitOne(Timeout);
Assert.True(result);
}

AssertMultipleDispatchLogs(host);
}

private static void AssertMultipleDispatchLogs(IHost host)
{
IEnumerable<LogMessage> logMessages = host.GetTestLoggerProvider()
.GetAllLogMessages();

Expand Down Expand Up @@ -235,6 +276,26 @@ public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] string evt,
}
}

public class EventHubTestSingleDispatchJobsBinaryData
{
public static void SendEvent_TestHub(string input, [EventHub(TestHubName)] out BinaryData evt)
{
evt = new BinaryData(input);
}

public static void ProcessSingleEvent([EventHubTrigger(TestHubName)] BinaryData evt,
string partitionKey, DateTime enqueuedTimeUtc, IDictionary<string, object> properties,
IDictionary<string, object> systemProperties)
{
// filter for the ID the current test is using
if (evt.ToString() == _testId)
{
Assert.True((DateTime.Now - enqueuedTimeUtc).TotalSeconds < 30);
_eventWait.Set();
}
}
}

public class EventHubTestBindToPocoJobs
{
public static void SendEvent_TestHub(string input, [EventHub(TestHubName)] out EventData evt)
Expand Down Expand Up @@ -312,6 +373,39 @@ public static void ProcessMultipleEvents([EventHubTrigger(TestHubName)] string[]
}
}

public class EventHubTestMultipleDispatchJobsBinaryData
{
private static int s_eventCount;
private static int s_processedEventCount;
public static void SendEvents_TestHub(int numEvents, string input, [EventHub(TestHubName)] out BinaryData[] events)
{
s_eventCount = numEvents;
events = new BinaryData[numEvents];
for (int i = 0; i < numEvents; i++)
{
events[i] = new BinaryData(input);
}
}

public static void ProcessMultipleEventsBinaryData([EventHubTrigger(TestHubName)] BinaryData[] events,
string[] partitionKeyArray, DateTime[] enqueuedTimeUtcArray, IDictionary<string, object>[] propertiesArray,
IDictionary<string, object>[] systemPropertiesArray)
{
Assert.AreEqual(events.Length, partitionKeyArray.Length);
Assert.AreEqual(events.Length, enqueuedTimeUtcArray.Length);
Assert.AreEqual(events.Length, propertiesArray.Length);
Assert.AreEqual(events.Length, systemPropertiesArray.Length);

s_processedEventCount += events.Length;

// filter for the ID the current test is using
if (events[0].ToString() == _testId && s_processedEventCount == s_eventCount)
{
_eventWait.Set();
}
}
}

public class EventHubPartitionKeyTestJobs
{
// send more events per partition than the EventHubsOptions.MaxBatchSize
Expand Down

0 comments on commit 3580b31

Please sign in to comment.