Skip to content

Commit

Permalink
matteobortolazzo#58 Implements changes feed.
Browse files Browse the repository at this point in the history
  • Loading branch information
matteobortolazzo committed May 31, 2020
1 parent e4c25d2 commit e9e7718
Show file tree
Hide file tree
Showing 6 changed files with 329 additions and 39 deletions.
159 changes: 137 additions & 22 deletions src/CouchDB.Driver/CouchDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class CouchDatabase<TSource> where TSource : CouchDocument
{
private readonly QueryProvider _queryProvider;
private readonly IFlurlClient _flurlClient;
private readonly CouchSettings _settings;
private readonly string _connectionString;
private readonly string _database;

Expand All @@ -43,10 +44,10 @@ public class CouchDatabase<TSource> where TSource : CouchDocument
internal CouchDatabase(IFlurlClient flurlClient, CouchSettings settings, string connectionString, string db)
{
_flurlClient = flurlClient ?? throw new ArgumentNullException(nameof(flurlClient));
CouchSettings settings1 = settings ?? throw new ArgumentNullException(nameof(settings));
_settings = settings ?? throw new ArgumentNullException(nameof(settings));
_connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
_database = db ?? throw new ArgumentNullException(nameof(db));
_queryProvider = new CouchQueryProvider(flurlClient, settings1, connectionString, _database);
_queryProvider = new CouchQueryProvider(flurlClient, _settings, connectionString, _database);

Database = Uri.UnescapeDataString(_database);
Security = new CouchSecurity(NewRequest);
Expand Down Expand Up @@ -290,6 +291,7 @@ private async Task<List<TSource>> SendQueryAsync(Func<IFlurlRequest, Task<HttpRe
return documents;
}

/// <summary>
/// Finds all documents with given IDs.
/// </summary>
/// <param name="docIds">The collection of documents IDs.</param>
Expand Down Expand Up @@ -525,55 +527,168 @@ private async Task UpdateAttachments(TSource document)

#region Feed

public async Task<ChangesFeedResponse> GetChangesAsync(ChangesFeedOptions options = null, bool longPoll = false)
/// <summary>
/// Returns a sorted list of changes made to documents in the database.
/// </summary>
/// <remarks>
/// Only the most recent change for a given document is guaranteed to be provided.
/// </remarks>
/// <param name="options">Options to apply to the request.</param>
/// <param name="filter">A filter to apply to the result.</param>
/// <returns></returns>
public async Task<ChangesFeedResponse<TSource>> GetChangesAsync(ChangesFeedOptions options = null, ChangesFeedFilter filter = null)
{
IFlurlRequest request = NewRequest()
.AppendPathSegment("_changes");

if (longPoll)
if (options?.LongPoll == true)
{
_ = request.SetQueryParam("feed", "longpoll");
}

if (options != null)
SetChangesFeedOptions(request, options);

return filter == null
? await request.GetJsonAsync<ChangesFeedResponse<TSource>>()
.ConfigureAwait(false)
: await QueryWithFilterAsync(request, filter)
.ConfigureAwait(false);
}

private async Task<ChangesFeedResponse<TSource>> QueryWithFilterAsync(IFlurlRequest request, ChangesFeedFilter filter)
{
if (filter is DocumentIdsChangesFeedFilter documentIdsFilter)
{
foreach (var (name, value) in options.ToQueryParameters())
{
_ = request.SetQueryParam(name, value);
}
return await request
.SetQueryParam("filter", "_doc_ids")
.PostJsonAsync(new ChangesFeedFilterDocuments(documentIdsFilter.Value))
.ReceiveJson<ChangesFeedResponse<TSource>>()
.ConfigureAwait(false);
}
if (filter is SelectorChangesFeedFilter<TSource> selectorFilter)
{
MethodCallExpression whereExpression = Expression.Call(typeof(Queryable), nameof(Queryable.Where),
new[] {typeof(TSource)}, Expression.Constant(Array.Empty<TSource>().AsQueryable()), selectorFilter.Value);
var jsonSelector = new QueryTranslator(_settings).Translate(whereExpression);
return await request
.WithHeader("Content-Type", "application/json")
.SetQueryParam("filter", "_selector")
.PostStringAsync(jsonSelector)
.ReceiveJson<ChangesFeedResponse<TSource>>()
.ConfigureAwait(false);
}

return await request.GetJsonAsync<ChangesFeedResponse>().ConfigureAwait(false);
if (filter is DesignChangesFeedFilter)
{
return await request
.SetQueryParam("filter", "_design")
.GetJsonAsync<ChangesFeedResponse<TSource>>()
.ConfigureAwait(false);
}
if (filter is ViewChangesFeedFilter viewFilter)
{
return await request
.SetQueryParam("filter", "_view")
.SetQueryParam("view", viewFilter.Value)
.GetJsonAsync<ChangesFeedResponse<TSource>>()
.ConfigureAwait(false);
}
throw new InvalidOperationException($"Filter of type {filter.GetType().Name} not supported.");
}

public async IAsyncEnumerable<ChangesFeedResponseResult> GetContinuousChangesAsync(ChangesFeedOptions options = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
/// <summary>
/// Returns changes as they happen.
/// </summary>
/// <remarks>
/// A continuous feed stays open and connected to the database until explicitly closed.
/// </remarks>
/// <param name="options">Options to apply to the request.</param>
/// <param name="filter">A filter to apply to the result.</param>
/// <param name="cancellationToken">A cancellation token to stop receiving changes.</param>
/// <returns></returns>
public async IAsyncEnumerable<ChangesFeedResponseResult<TSource>> GetContinuousChangesAsync(ChangesFeedOptions options, ChangesFeedFilter filter,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
var infiniteTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
IFlurlRequest request = NewRequest()
.WithTimeout(infiniteTimeout)
.AppendPathSegment("_changes")
.SetQueryParam("feed", "continuous");

if (options != null)
SetChangesFeedOptions(request, options);

await using Stream stream = filter == null
? await request.GetStreamAsync(cancellationToken, HttpCompletionOption.ResponseHeadersRead)
.ConfigureAwait(false)
: await QueryContinuousWithFilterAsync(request, filter, cancellationToken)
.ConfigureAwait(false);
using var reader = new StreamReader(stream);
while (!cancellationToken.IsCancellationRequested && !reader.EndOfStream)
{
foreach (var (name, value) in options.ToQueryParameters())
if (cancellationToken.IsCancellationRequested)
{
_ = request.SetQueryParam(name, value);
continue;
}
}

request.WithTimeout(TimeSpan.FromMilliseconds(Timeout.Infinite));
Stream stream = await request.GetStreamAsync(cancellationToken, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
using var reader = new StreamReader(stream);
while (!reader.EndOfStream)
{
var line = await reader.ReadLineAsync().ConfigureAwait(false);
if (!string.IsNullOrEmpty(line))
{
yield return JsonConvert.DeserializeObject<ChangesFeedResponseResult>(line);
yield return JsonConvert.DeserializeObject<ChangesFeedResponseResult<TSource>>(line);
}
}
}

private async Task<Stream> QueryContinuousWithFilterAsync(IFlurlRequest request, ChangesFeedFilter filter, CancellationToken cancellationToken)
{
if (filter is DocumentIdsChangesFeedFilter documentIdsFilter)
{
return await request
.SetQueryParam("filter", "_doc_ids")
.PostJsonStreamAsync(new ChangesFeedFilterDocuments(documentIdsFilter.Value), cancellationToken, HttpCompletionOption.ResponseHeadersRead)
.ConfigureAwait(false);
}
if (filter is SelectorChangesFeedFilter<TSource> selectorFilter)
{
MethodCallExpression whereExpression = Expression.Call(typeof(Queryable), nameof(Queryable.Where),
new[] { typeof(TSource) }, Expression.Constant(Array.Empty<TSource>().AsQueryable()), selectorFilter.Value);
var jsonSelector = new QueryTranslator(_settings).Translate(whereExpression);
return await request
.WithHeader("Content-Type", "application/json")
.SetQueryParam("filter", "_selector")
.PostStringStreamAsync(jsonSelector, cancellationToken, HttpCompletionOption.ResponseHeadersRead)
.ConfigureAwait(false);
}
if (filter is DesignChangesFeedFilter)
{
return await request
.SetQueryParam("filter", "_design")
.GetStreamAsync(cancellationToken, HttpCompletionOption.ResponseHeadersRead)
.ConfigureAwait(false);
}
if (filter is ViewChangesFeedFilter viewFilter)
{
return await request
.SetQueryParam("filter", "_view")
.SetQueryParam("view", viewFilter.Value)
.GetStreamAsync(cancellationToken, HttpCompletionOption.ResponseHeadersRead)
.ConfigureAwait(false);
}
throw new InvalidOperationException($"Filter of type {filter.GetType().Name} not supported.");
}

private void SetChangesFeedOptions(IFlurlRequest request, ChangesFeedOptions options)
{
if (options == null)
{
return;
}

foreach (var (name, value) in options.ToQueryParameters())
{
_ = request.SetQueryParam(name, value);
}
}

#endregion

#region Utils
Expand Down
85 changes: 85 additions & 0 deletions src/CouchDB.Driver/DTOs/ChangesFeedFilter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using CouchDB.Driver.Types;

namespace CouchDB.Driver.DTOs
{
/// <summary>
/// Represent a filter for the changes feed.
/// </summary>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Design", "CA1052:Static holder types should be Static or NotInheritable", Justification = "<Pending>")]
public class ChangesFeedFilter
{
/// <summary>
/// Create a filter for document IDs.
/// </summary>
/// <param name="documentIds">The document IDs to use as filters.</param>
/// <returns></returns>
public static ChangesFeedFilter DocumentIds(IList<string> documentIds)
=> new DocumentIdsChangesFeedFilter(documentIds);

/// <summary>
/// Create a filter using the same syntax used to query.
/// </summary>
/// <remarks>
/// This is significantly more efficient than using a JavaScript filter function and is the recommended option if filtering on document attributes only.
/// </remarks>
/// <typeparam name="TSource">The type of database documents.</typeparam>
/// <param name="selector">The function used to filter.</param>
/// <returns></returns>
public static ChangesFeedFilter Selector<TSource>(Expression<Func<TSource, bool>> selector) where TSource : CouchDocument
=> new SelectorChangesFeedFilter<TSource>(selector);

/// <summary>
/// Create a filter that accepts only changes for any design document within the requested database.
/// </summary>
/// <returns></returns>
public static ChangesFeedFilter Design()
=> new DesignChangesFeedFilter();

/// <summary>
/// Create a filter that uses an existing map function to the filter.
/// </summary>
/// <remarks>
/// If the map function emits anything for the processed document it counts as accepted and the changes event emits to the feed.
/// </remarks>
/// <param name="view">The view.</param>
/// <returns></returns>
public static ChangesFeedFilter View(string view)
=> new ViewChangesFeedFilter(view);
}

internal class DocumentIdsChangesFeedFilter : ChangesFeedFilter
{
public IList<string> Value { get; }

public DocumentIdsChangesFeedFilter(IList<string> value)
{
Value = value;
}
}

internal class SelectorChangesFeedFilter<TSource> : ChangesFeedFilter
where TSource : CouchDocument
{
public Expression<Func<TSource, bool>> Value { get; }

public SelectorChangesFeedFilter(Expression<Func<TSource, bool>> value)
{
Value = value;
}
}

internal class DesignChangesFeedFilter : ChangesFeedFilter { }

internal class ViewChangesFeedFilter : ChangesFeedFilter
{
public string Value { get; }

public ViewChangesFeedFilter(string value)
{
Value = value;
}
}
}
29 changes: 27 additions & 2 deletions src/CouchDB.Driver/DTOs/ChangesFeedOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ namespace CouchDB.Driver.DTOs
/// </summary>
public class ChangesFeedOptions
{
/// <summary>
/// Waits until at least one change has occurred, sends the change, then closes the connection.
/// </summary>
/// <remarks>
/// Most commonly used in conjunction with since=now, to wait for the next change. Ignored with continuous feed.
/// </remarks>
public bool LongPoll { get; set; }

/// <summary>
/// Includes conflicts information in response. Ignored if <see cref="IncludeDocs"/> isn’t <c>True</c>.
/// </summary>
Expand Down Expand Up @@ -124,7 +132,8 @@ public class ChangesFeedOptions
[JsonProperty("seq_interval")]
[DefaultValue(null)]
public int? SeqInterval { get; set; }


[System.Diagnostics.CodeAnalysis.SuppressMessage("Globalization", "CA1308:Normalize strings to uppercase", Justification = "<Pending>")]
internal IEnumerable<(string Name, string Value)> ToQueryParameters()
{
TAttribute GetAttribute<TAttribute>(ICustomAttributeProvider propertyInfo)
Expand All @@ -145,9 +154,25 @@ TAttribute GetAttribute<TAttribute>(ICustomAttributeProvider propertyInfo)
string.Equals(propertyValue?.ToString(), propertyDefaultValue?.ToString(), StringComparison.InvariantCultureIgnoreCase);
if (!isDefault)
{
yield return (propertyName, propertyValue?.ToString());
var propertyStringValue = propertyValue?.ToString();
if (propertyInfo.PropertyType == typeof(bool))
{
propertyStringValue = propertyStringValue?.ToLowerInvariant();
}
yield return (propertyName, propertyStringValue);
}
}
}
}

internal class ChangesFeedFilterDocuments
{
public ChangesFeedFilterDocuments(IList<string> documentIds)
{
DocumentIds = documentIds;
}

[JsonProperty("doc_ids")]
public IList<string> DocumentIds { get; set; }
}
}
10 changes: 7 additions & 3 deletions src/CouchDB.Driver/DTOs/ChangesFeedResponse.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
using System.Collections.Generic;
using CouchDB.Driver.Types;
using Newtonsoft.Json;

namespace CouchDB.Driver.DTOs
{
public class ChangesFeedResponse
public class ChangesFeedResponse<TSource> where TSource : CouchDocument
{
[JsonProperty("last_seq")]
public string LastSequence { get; set; }
Expand All @@ -12,10 +13,10 @@ public class ChangesFeedResponse
public int Pending { get; set; }

[JsonProperty("results")]
public IList<ChangesFeedResponseResult> Results { get; internal set; }
public IList<ChangesFeedResponseResult<TSource>> Results { get; internal set; }
}

public class ChangesFeedResponseResult
public class ChangesFeedResponseResult<TSource> where TSource: CouchDocument
{
[JsonProperty("changes")]
public IList<ChangesFeedResponseResultChange> Changes { get; internal set; }
Expand All @@ -28,6 +29,9 @@ public class ChangesFeedResponseResult

[JsonProperty("deleted")]
public bool Deleted { get; set; }

[JsonProperty("doc")]
public TSource Document { get; set; }
}

public class ChangesFeedResponseResultChange
Expand Down
Loading

0 comments on commit e9e7718

Please sign in to comment.