Skip to content

Added HTTP option for the platforms that does not support Web Socket & Handle null aggregate keys. #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Chronological.Tests/GenericFluentAggregateQueryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public async void Test2()
var from = new DateTime(2017, 12, 23, 12, 0, 0, DateTimeKind.Utc);
var to = new DateTime(2017, 12, 30, 12, 0, 0, DateTimeKind.Utc);

var result = await new GenericFluentAggregateQuery<TestType1>("Test", Search.Span(from, to), environment, new AggregateWebSocketRepository(new MockWebSocketRepository(_webSocketResult)))
var result = await new GenericFluentAggregateQuery<TestType1>("Test", Search.Span(from, to), environment, new AggregateApiRepository(new MockWebSocketRepository(_webSocketResult)))
.Select(builder => builder.UniqueValues(x => x.DataType, 10,
builder.DateHistogram(x => x.Date, Breaks.InDays(1),
new
Expand Down
130 changes: 130 additions & 0 deletions Chronological.Tests/HttpRepositoryTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
using System;
using System.Linq;
using Newtonsoft.Json.Linq;
using Xunit;

namespace Chronological.Tests
{
public class HttpRepositoryTests
{
string applicationClientId = "testApplicationId";
string applicationClientSecret = "testApplicationSecret";
string tenant = "testTenant";

[Fact]
public async void GenericFluentEventQueryTest()
{
var environment = new Environment("TestFqdn", "TestAccessToken");

var from = new DateTime(2017, 12, 23, 12, 0, 0, DateTimeKind.Utc);
var to = new DateTime(2018, 10, 23, 12, 0, 0, DateTimeKind.Utc);

var result = await new GenericFluentEventQuery<EmailTest>("Test", Search.Span(from, to), Limit.CreateLimit(Limit.Take, 10), environment, new EventApiRepository(new MockHttpRepository(_eventsResult)))
.Where(x => x.ip == "127.0.0.1")
.ExecuteAsync();

Assert.NotNull(result);
}

[Fact]
public async void AggregateQueryTest()
{
var environment = new Environment("TestFqdn", "TestAccessToken");

var from = new DateTime(2017, 12, 23, 12, 0, 0, DateTimeKind.Utc);
var to = new DateTime(2018, 10, 23, 12, 0, 0, DateTimeKind.Utc);

var result = await new GenericFluentAggregateQuery<EmailTest>("Test", Search.Span(from, to), environment, new AggregateApiRepository(new MockHttpRepository(_aggregateResults)))
.Select(builder => builder.UniqueValues(x => x.ip, 10, new { Count = builder.Count() }))
.Where(x => x.ip == "127.0.0.1")
.ExecuteAsync();

Assert.NotNull(result);
}

private string _eventsResult = @"{
""warnings"": [],
""events"": [
{
""schema"": {
""rid"": 0,
""$esn"": ""source1"",
""properties"": [
{
""name"": ""email"",
""type"": ""String""
},
{
""name"": ""ip"",
""type"": ""String""
},
{
""name"": ""useragent"",
""type"": ""String""
}
]
},
""$ts"": ""2018-10-18T11:30:36Z"",
""values"": [
""test1@mail.com"",
""127.0.0.1"",
""IE""
]
},
{
""schemaRid"": 0,
""$ts"": ""2018-10-18T11:30:36Z"",
""values"": [
""test2@mail.com"",
""127.0.0.1"",
""FF""
]
},
{
""schemaRid"": 0,
""$ts"": ""2018-10-18T11:30:36Z"",
""values"": [
""test3@mail.com"",
""127.0.0.1"",
""IE""
]
}
]
}";

private string _aggregateResults = @"{
""aggregates"": [
{
""dimension"": [
null,
""EmailOpened"",
""EmailSent""
],
""measures"": [
[
10.0
],
[
20.0
],
[
200.0
]
]
}
],
""warnings"": []
}";

}

public class EmailTest
{
[ChronologicalEventField("email")]
public string email { get; set; }
[ChronologicalEventField("ip")]
public string ip { get; set; }
[ChronologicalEventField("useragent")]
public string useragent { get; set; }
}
}
33 changes: 33 additions & 0 deletions Chronological.Tests/MockHttpRepository.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;

namespace Chronological.Tests
{
internal class MockHttpRepository : IWebRequestRepository
{
private readonly List<string> _results;

public MockHttpRepository(string result) : this(new List<string> { result })
{
}

public MockHttpRepository(List<string> results)
{
_results = results;
}

async Task<IReadOnlyList<JToken>> IWebRequestRepository.ExecuteRequestAsync(string query, string resourcePath, CancellationToken cancellationToken)
{
if ("aggregates".Equals(resourcePath, StringComparison.OrdinalIgnoreCase))
{
return new List<JToken>() { JToken.Parse(_results.First())["aggregates"] };
}

return new List<JToken> { JToken.Parse(_results.First()) };
}
}
}
4 changes: 2 additions & 2 deletions Chronological.Tests/MockWebSocketRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace Chronological.Tests
{
internal class MockWebSocketRepository : IWebSocketRepository
internal class MockWebSocketRepository : IWebRequestRepository
{
private readonly List<string> _results;

Expand All @@ -20,7 +20,7 @@ public MockWebSocketRepository(List<string> results)
_results = results;
}

async Task<IReadOnlyList<JToken>> IWebSocketRepository.ReadWebSocketResponseAsync(string query, string resourcePath, CancellationToken cancellationToken)
async Task<IReadOnlyList<JToken>> IWebRequestRepository.ExecuteRequestAsync(string query, string resourcePath, CancellationToken cancellationToken)
{
return new List<JToken> { JToken.Parse(_results.First())["content"] };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,25 @@

namespace Chronological
{
internal class AggregateWebSocketRepository : IAggregateWebSocketRepository
internal interface IAggregateApiRepository
{
private readonly IWebSocketRepository _webSocketRepository;
Task<IEnumerable<T>> Execute<T>(string query, IEnumerable<T> aggregates, CancellationToken cancellationToken = default);
}

internal class AggregateApiRepository : IAggregateApiRepository
{
private readonly IWebRequestRepository _webSocketRepository;

internal AggregateWebSocketRepository(IWebSocketRepository webSocketRepository)
internal AggregateApiRepository(IWebRequestRepository webSocketRepository)
{
_webSocketRepository = webSocketRepository;
}

async Task<IEnumerable<T>> IAggregateWebSocketRepository.Execute<T>(string query, IEnumerable<T> aggregates, CancellationToken cancellationToken)
async Task<IEnumerable<T>> IAggregateApiRepository.Execute<T>(string query, IEnumerable<T> aggregates, CancellationToken cancellationToken)
{
var executionResults = new List<T>();

var results = await _webSocketRepository.ReadWebSocketResponseAsync(query, "aggregates", cancellationToken);
var results = await _webSocketRepository.ExecuteRequestAsync(query, "aggregates", cancellationToken);

// According to samples here: https://github.com/Azure-Samples/Azure-Time-Series-Insights/blob/master/C-%20Hello%20World%20App%20Sample/Program.cs
// Aggregates should only use the final result set
Expand Down
5 changes: 2 additions & 3 deletions Chronological/Aggregate[TX,TY,TZ].cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

namespace Chronological
{

public abstract class Aggregate<TX, TY, TZ> : Dictionary<TY,TZ>, IAggregate, IInternalAggregate
{
internal abstract string AggregateType { get; }
Expand All @@ -17,8 +16,8 @@ public abstract class Aggregate<TX, TY, TZ> : Dictionary<TY,TZ>, IAggregate, IIn
IAggregate IInternalAggregate.GetPopulatedAggregate(JObject jObject, Func<JArray, JArray> measureAccessFunc)
{
var aggregate = Clone();
foreach (var dimension in jObject["dimension"].Select((x, y) => new { x, y }))

foreach (var dimension in jObject["dimension"].Select((x, y) => new { x = x.IsNullOrEmpty() ? "null" : x, y }))
{
JArray MeasureAccessFunc(JArray x)
{
Expand Down
17 changes: 16 additions & 1 deletion Chronological/Environment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ public class Environment
public string EnvironmentId { get; }
public string ResourceId { get; }
public string AccessToken { get; }
public WebRequestChannel WebRequestChannel { get; set; }

public Environment(string fqdn, string accessToken)
public Environment(string fqdn, string accessToken, WebRequestChannel webRequestChannel = WebRequestChannel.WebSocket)
{
EnvironmentFqdn = fqdn;
AccessToken = accessToken;
WebRequestChannel = webRequestChannel;
}

internal Environment(string displayName, string environmentId, string resourceId, string environmentFqdn, string accessToken)
Expand Down Expand Up @@ -95,6 +97,9 @@ public async Task<EnvironmentMetadata> GetMetadataAsync(DateTime from, DateTime

public GenericFluentAggregateQuery<T> AggregateQuery<T>(DateTime fromDate, DateTime toDate, string queryName = "ChronologicalQuery") where T: new()
{
if (WebRequestChannel == WebRequestChannel.Http)
return new GenericFluentAggregateQuery<T>(queryName, Search.Span(fromDate, toDate), this, new AggregateApiRepository(new HttpRepository(this)));

return new GenericFluentAggregateQuery<T>(queryName, Search.Span(fromDate,toDate), this);
}

Expand All @@ -106,6 +111,10 @@ public StringAggregateQuery AggregateQuery(string query, string queryName = "Chr
public GenericFluentEventQuery<T> EventQuery<T>(DateTime fromDate, DateTime toDate, INonSortableLimit limit, int limitCount, string queryName = "ChronologicalQuery") where T:new()
{
var populatedLimit = Limit.CreateLimit(limit, limitCount);

if(WebRequestChannel == WebRequestChannel.Http)
return new GenericFluentEventQuery<T>(queryName, Search.Span(fromDate, toDate), populatedLimit, this, new EventApiRepository(new HttpRepository(this)));

return new GenericFluentEventQuery<T>(queryName, Search.Span(fromDate, toDate), populatedLimit, this);
}

Expand All @@ -129,4 +138,10 @@ public StringEventQuery EventQuery(string query, string queryName = "Chronologic
}

}

public enum WebRequestChannel
{
WebSocket = 1,
Http = 2
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,23 @@

namespace Chronological
{

internal interface IEventWebSocketRepository
internal interface IEventApiRepository
{

Task<IEnumerable<T>> Execute<T>(string query, CancellationToken cancellationToken = default);
}

; internal class EventWebSocketRepository : IEventWebSocketRepository
; internal class EventApiRepository : IEventApiRepository
{
private readonly IWebSocketRepository _webSocketRepository;
private readonly IWebRequestRepository _webRequestRepository;

internal EventWebSocketRepository(IWebSocketRepository webSocketRepository)
internal EventApiRepository(IWebRequestRepository webRequestRepository)
{
_webSocketRepository = webSocketRepository;
_webRequestRepository = webRequestRepository;
}

async Task<IEnumerable<T>> IEventWebSocketRepository.Execute<T>(string query, CancellationToken cancellationToken)
async Task<IEnumerable<T>> IEventApiRepository.Execute<T>(string query, CancellationToken cancellationToken)
{
var results = await _webSocketRepository.ReadWebSocketResponseAsync(query, "events", cancellationToken);
var results = await _webRequestRepository.ExecuteRequestAsync(query, "events", cancellationToken);

// According to samples here: https://github.com/Azure-Samples/Azure-Time-Series-Insights/blob/master/C-%20Hello%20World%20App%20Sample/Program.cs
// Events should combine all results recevied
Expand Down
8 changes: 4 additions & 4 deletions Chronological/EventQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ public abstract class EventQuery
{
private readonly string _queryName;
private readonly Environment _environment;
private readonly IEventWebSocketRepository _eventWebSocketRepository;
private readonly IEventApiRepository _eventApiRepository;

internal EventQuery(string queryName, Environment environment,
IEventWebSocketRepository eventWebSocketRepository)
IEventApiRepository eventApiRepository)
{
_queryName = queryName;
_environment = environment;
_eventWebSocketRepository = eventWebSocketRepository;
_eventApiRepository = eventApiRepository;
}

internal EventQuery(string queryName, Environment environment) : this(queryName, environment, new EventWebSocketRepository(new WebSocketRepository(environment)))
internal EventQuery(string queryName, Environment environment) : this(queryName, environment, new EventApiRepository(new WebSocketRepository(environment)))
{
}

Expand Down
12 changes: 6 additions & 6 deletions Chronological/GenericFluentAggregateQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ namespace Chronological
private readonly string _queryName;
private readonly Search _search;
private readonly Environment _environment;
private readonly IAggregateWebSocketRepository _webSocketRepository;
private readonly IAggregateApiRepository _webSocketRepository;

internal GenericFluentAggregateQuery(string queryName, Search search, Environment environment) : this(queryName,
search, environment, new AggregateWebSocketRepository(new WebSocketRepository(environment)))
search, environment, new AggregateApiRepository(new WebSocketRepository(environment)))
{
}

internal GenericFluentAggregateQuery(string queryName, Search search, Environment environment, IAggregateWebSocketRepository webSocketRepository)
internal GenericFluentAggregateQuery(string queryName, Search search, Environment environment, IAggregateApiRepository webSocketRepository)
{
_queryName = queryName;
_search = search;
Expand All @@ -39,7 +39,7 @@ public GenericFluentAggregatesQuery<T, Aggregate<T, TX, TY>> Select<TX, TY>(IEnu
{
private GenericFluentAggregatesQuery<TX, TY> _multiQuery;

internal GenericFluentAggregateQuery(string queryName, Search search, TY aggregate, Environment environment, IAggregateWebSocketRepository webSocketRepository)
internal GenericFluentAggregateQuery(string queryName, Search search, TY aggregate, Environment environment, IAggregateApiRepository webSocketRepository)
{
_multiQuery = new GenericFluentAggregatesQuery<TX, TY>(queryName, search, new List<TY> {aggregate},
environment, webSocketRepository);
Expand Down Expand Up @@ -77,11 +77,11 @@ public async Task<TY> ExecuteAsync(CancellationToken cancellationToken = default
private readonly string _queryName;
private readonly Search _search;
private readonly Environment _environment;
private readonly IAggregateWebSocketRepository _webSocketRepository;
private readonly IAggregateApiRepository _webSocketRepository;



internal GenericFluentAggregatesQuery(string queryName, Search search, IEnumerable<TY> aggregates, Environment environment, IAggregateWebSocketRepository webSocketRepository)
internal GenericFluentAggregatesQuery(string queryName, Search search, IEnumerable<TY> aggregates, Environment environment, IAggregateApiRepository webSocketRepository)
{
_queryName = queryName;
_search = search;
Expand Down
Loading