Skip to content

Commit

Permalink
Design and Implement Consumer for EventGrid (Azure#14048)
Browse files Browse the repository at this point in the history
* Added new EventGridConsumer and EventGridConsumerOptions classes
* Added unit tests for consuming events
* Renamed publish events methods to SendEvents and SendEventsAsync (with overloads for EventGridEvent, CloudEvent, and object)
  • Loading branch information
kerri-lee authored Aug 10, 2020
1 parent aeb9e80 commit 473753f
Show file tree
Hide file tree
Showing 26 changed files with 2,215 additions and 167 deletions.
Original file line number Diff line number Diff line change
@@ -1,37 +1,4 @@
namespace Azure.Messaging.EventGrid
{
public partial class EventGridPublisherClient
{
protected EventGridPublisherClient() { }
public EventGridPublisherClient(System.Uri endpoint, Azure.AzureKeyCredential credential) { }
public EventGridPublisherClient(System.Uri endpoint, Azure.AzureKeyCredential credential, Azure.Messaging.EventGrid.EventGridPublisherClientOptions options) { }
public EventGridPublisherClient(System.Uri endpoint, Azure.Messaging.EventGrid.EventGridSharedAccessSignatureCredential credential) { }
public EventGridPublisherClient(System.Uri endpoint, Azure.Messaging.EventGrid.EventGridSharedAccessSignatureCredential credential, Azure.Messaging.EventGrid.EventGridPublisherClientOptions options) { }
public static string BuildSharedAccessSignature(System.Uri endpoint, System.DateTimeOffset expirationUtc, Azure.AzureKeyCredential key, string apiVersion = "2018-01-01") { throw null; }
public virtual Azure.Response PublishCloudEvents(System.Collections.Generic.IEnumerable<Azure.Messaging.EventGrid.Models.CloudEvent> events, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response> PublishCloudEventsAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventGrid.Models.CloudEvent> events, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response PublishCustomEvents(System.Collections.Generic.IEnumerable<object> events, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response> PublishCustomEventsAsync(System.Collections.Generic.IEnumerable<object> events, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response PublishEvents(System.Collections.Generic.IEnumerable<Azure.Messaging.EventGrid.Models.EventGridEvent> events, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response> PublishEventsAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventGrid.Models.EventGridEvent> events, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
public partial class EventGridPublisherClientOptions : Azure.Core.ClientOptions
{
public EventGridPublisherClientOptions(Azure.Messaging.EventGrid.EventGridPublisherClientOptions.ServiceVersion version = Azure.Messaging.EventGrid.EventGridPublisherClientOptions.ServiceVersion.V2018_01_01) { }
public Azure.Core.Serialization.ObjectSerializer Serializer { get { throw null; } set { } }
public enum ServiceVersion
{
V2018_01_01 = 1,
}
}
public partial class EventGridSharedAccessSignatureCredential
{
public EventGridSharedAccessSignatureCredential(string signature) { }
public string Signature { get { throw null; } }
public void Update(string signature) { }
}
}
namespace Azure.Messaging.EventGrid.Models
{
public partial class CloudEvent
{
Expand All @@ -42,11 +9,25 @@ public CloudEvent(string source, string type) { }
public System.Collections.Generic.Dictionary<string, object> ExtensionAttributes { get { throw null; } }
public string Id { get { throw null; } set { } }
public string Source { get { throw null; } set { } }
public string SpecVersion { get { throw null; } set { } }
public string Subject { get { throw null; } set { } }
public System.DateTimeOffset? Time { get { throw null; } set { } }
public string Type { get { throw null; } set { } }
}
public partial class EventGridConsumer
{
public EventGridConsumer() { }
public EventGridConsumer(Azure.Messaging.EventGrid.EventGridConsumerOptions options) { }
public virtual Azure.Messaging.EventGrid.CloudEvent[] DeserializeCloudEvents(string requestContent, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Messaging.EventGrid.CloudEvent[]> DeserializeCloudEventsAsync(string requestContent, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Messaging.EventGrid.EventGridEvent[] DeserializeEventGridEvents(string requestContent, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Messaging.EventGrid.EventGridEvent[]> DeserializeEventGridEventsAsync(string requestContent, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
public partial class EventGridConsumerOptions
{
public EventGridConsumerOptions() { }
public System.Collections.Generic.IDictionary<string, System.Type> CustomEventTypeMappings { get { throw null; } }
public Azure.Core.Serialization.ObjectSerializer DataSerializer { get { throw null; } set { } }
}
public partial class EventGridEvent
{
public EventGridEvent(string subject, object data, string eventType, string dataVersion) { }
Expand All @@ -55,10 +36,39 @@ public EventGridEvent(string subject, object data, string eventType, string data
public System.DateTimeOffset EventTime { get { throw null; } set { } }
public string EventType { get { throw null; } set { } }
public string Id { get { throw null; } set { } }
public string MetadataVersion { get { throw null; } set { } }
public string Subject { get { throw null; } set { } }
public string Topic { get { throw null; } set { } }
}
public partial class EventGridPublisherClient
{
protected EventGridPublisherClient() { }
public EventGridPublisherClient(System.Uri endpoint, Azure.AzureKeyCredential credential) { }
public EventGridPublisherClient(System.Uri endpoint, Azure.AzureKeyCredential credential, Azure.Messaging.EventGrid.EventGridPublisherClientOptions options) { }
public EventGridPublisherClient(System.Uri endpoint, Azure.Messaging.EventGrid.EventGridSharedAccessSignatureCredential credential) { }
public EventGridPublisherClient(System.Uri endpoint, Azure.Messaging.EventGrid.EventGridSharedAccessSignatureCredential credential, Azure.Messaging.EventGrid.EventGridPublisherClientOptions options) { }
public static string BuildSharedAccessSignature(System.Uri endpoint, System.DateTimeOffset expirationUtc, Azure.AzureKeyCredential key, Azure.Messaging.EventGrid.EventGridPublisherClientOptions.ServiceVersion apiVersion = Azure.Messaging.EventGrid.EventGridPublisherClientOptions.ServiceVersion.V2018_01_01) { throw null; }
public virtual Azure.Response SendEvents(System.Collections.Generic.IEnumerable<Azure.Messaging.EventGrid.CloudEvent> events, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response SendEvents(System.Collections.Generic.IEnumerable<Azure.Messaging.EventGrid.EventGridEvent> events, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response SendEvents(System.Collections.Generic.IEnumerable<object> events, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response> SendEventsAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventGrid.CloudEvent> events, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response> SendEventsAsync(System.Collections.Generic.IEnumerable<Azure.Messaging.EventGrid.EventGridEvent> events, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response> SendEventsAsync(System.Collections.Generic.IEnumerable<object> events, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
public partial class EventGridPublisherClientOptions : Azure.Core.ClientOptions
{
public EventGridPublisherClientOptions(Azure.Messaging.EventGrid.EventGridPublisherClientOptions.ServiceVersion version = Azure.Messaging.EventGrid.EventGridPublisherClientOptions.ServiceVersion.V2018_01_01) { }
public Azure.Core.Serialization.ObjectSerializer DataSerializer { get { throw null; } set { } }
public enum ServiceVersion
{
V2018_01_01 = 1,
}
}
public partial class EventGridSharedAccessSignatureCredential
{
public EventGridSharedAccessSignatureCredential(string signature) { }
public string Signature { get { throw null; } }
public void Update(string signature) { }
}
}
namespace Azure.Messaging.EventGrid.SystemEvents
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using System.Collections.Generic;
using Azure.Core;

namespace Azure.Messaging.EventGrid.Models
namespace Azure.Messaging.EventGrid
{
/// <summary> Properties of an event published to an Event Grid topic using the CloudEvent 1.0 Schema. </summary>
public class CloudEvent
Expand Down Expand Up @@ -39,9 +39,6 @@ public CloudEvent(string source, string type)
/// <summary> The time (in UTC) the event was generated, in RFC3339 format. </summary>
public DateTimeOffset? Time { get; set; } = DateTimeOffset.UtcNow;

/// <summary> The version of the CloudEvents specification which the event uses. </summary>
public string SpecVersion { get; set; } = "1.0";

/// <summary> Identifies the schema that data adheres to. </summary>
public string DataSchema { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using Azure.Core;

namespace Azure.Messaging.EventGrid.Models
{
[CodeGenModel("CloudEvent")]
internal partial class CloudEventInternal
{
public JsonElement? Data { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading;
using Azure.Core;
using Azure.Core.Serialization;

namespace Azure.Messaging.EventGrid
{
/// <summary>
/// UTF-8 JSON-serializable wrapper for objects such as custom schema events.
/// Takes a custom ObjectSerializer to use when writing the object as JSON text.
/// </summary>
internal class CustomModelSerializer : IUtf8JsonSerializable
{
public object _payload;
public CancellationToken _cancellationToken;
public ObjectSerializer _serializer;

/// <summary>
/// Initializes an instance of the CustomModelSerializer class.
/// </summary>
/// <param name="payload">
/// Object that can represent an event with a custom schema, or additional properties
/// added to the event envelope.
/// </param>
/// <param name="serializer">
/// Custom ObjectSerializer to use when writing the object as JSON text.
/// </param>
/// <param name="cancellationToken"> The cancellation token to use. </param>
public CustomModelSerializer(object payload, ObjectSerializer serializer, CancellationToken cancellationToken)
{
_payload = payload;
_serializer = serializer;
_cancellationToken = cancellationToken;
}
public void Write(Utf8JsonWriter writer)
{
var stream = new MemoryStream();
_serializer.Serialize(stream, _payload, _payload.GetType(), _cancellationToken);
stream.Seek(0, SeekOrigin.Begin);
JsonDocument.Parse(stream).WriteTo(writer);
}
}
}
Loading

0 comments on commit 473753f

Please sign in to comment.