Skip to content

Commit

Permalink
Fixes the error that caused the side not being scrollable (ChilliCrea…
Browse files Browse the repository at this point in the history
…m#3415)

Co-authored-by: Rafael Staib <me@rafaelstaib.com>

Fixed also a ton of bugs.
  • Loading branch information
fredericbirke authored May 4, 2021
1 parent 8a7c4b9 commit c59403c
Show file tree
Hide file tree
Showing 192 changed files with 4,807 additions and 1,428 deletions.
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/config.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
blank_issues_enabled: false
contact_links:
- name: ChilliCream Community Support
url: https://bit.ly/joinchilli
url: https://bit.ly/join-chillicream-slack
about: Join us on slack! We have a vibrant community that is helping each other.
- name: ChilliCream Enterprise Support
url: https://chillicream.com/support
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[![License](https://img.shields.io/github/license/ChilliCream/hotchocolate.svg)](https://github.com/ChilliCream/hotchocolate/blob/main/LICENSE)
[![Azure DevOps tests](https://img.shields.io/azure-devops/tests/chillicream/HotChocolate/70.svg)](https://chillicream.visualstudio.com/HotChocolate/_build?definitionId=70)
[![Coverage Status](https://sonarcloud.io/api/project_badges/measure?project=HotChocolate&metric=coverage)](https://sonarcloud.io/dashboard?id=HotChocolate)
[![Slack channel](https://img.shields.io/badge/join%20the%20community-on%20slack-blue.svg)](https://bit.ly/joinchilli)
[![Slack channel](https://img.shields.io/badge/join%20the%20community-on%20slack-blue.svg)](https://bit.ly/join-chillicream-slack)
[![Twitter](https://img.shields.io/badge/join%20us-on%20twitter-green.svg)](https://twitter.com/chilli_cream)

---
Expand Down
14 changes: 14 additions & 0 deletions src/HotChocolate/AspNetCore/src/AspNetCore/ErrorHelper.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using HotChocolate.AspNetCore.Properties;
using HotChocolate.Execution;

Expand Down Expand Up @@ -25,5 +26,18 @@ public static IQueryResult ResponseTypeNotSupported() =>
ErrorBuilder.New()
.SetMessage(AspNetCoreResources.ErrorHelper_ResponseTypeNotSupported)
.Build());

public static IQueryResult UnknownSubscriptionError(Exception ex)
{
IError error =
ErrorBuilder
.New()
.SetException(ex)
.SetCode(ErrorCodes.Execution.TaskProcessingError)
.SetMessage(AspNetCoreResources.Subscription_SendResultsAsync)
.Build();

return QueryResultBuilder.CreateError(error);
}
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,13 @@
<data name="ErrorHelper_ResponseTypeNotSupported" xml:space="preserve">
<value>The response type is not supported.</value>
</data>
<data name="Subscription_SendResultsAsync" xml:space="preserve">
<value>Unexpected Execution Error</value>
</data>
<data name="WebSocketSession_SessionEnded" xml:space="preserve">
<value>Session ended.</value>
</data>
<data name="DataStartMessageHandler_Not_A_SubscriptionResult" xml:space="preserve">
<value>The specified result object is not a valid subscription result.</value>
</data>
</root>

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
namespace HotChocolate.AspNetCore.Subscriptions
{
public interface ISubscriptionManager
: IEnumerable<ISubscription>
: IEnumerable<ISubscriptionSession>
, IDisposable
{
void Register(ISubscription subscription);
void Register(ISubscriptionSession subscriptionSession);

void Unregister(string subscriptionId);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using HotChocolate.Execution.Processing;

namespace HotChocolate.AspNetCore.Subscriptions
{
/// <summary>
/// Represents a session with an execution engine subscription.
/// A subscription session is created within a <see cref="ISocketSession"/>.
/// Each socket session can have multiple subscription sessions open.
/// </summary>
public interface ISubscriptionSession : IDisposable
{
/// <summary>
/// An event that indicates that the underlying subscription has completed.
/// </summary>
event EventHandler? Completed;

/// <summary>
/// Gets the subscription id that the client has provided.
/// </summary>
string Id { get; }

/// <summary>
/// Gets the underlying subscription.
/// </summary>
ISubscription Subscription { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,19 @@ private async Task KeepConnectionAliveAsync(
{
try
{
while (!_connection.Closed
&& !cancellationToken.IsCancellationRequested)
while (!_connection.Closed && !cancellationToken.IsCancellationRequested)
{
await Task.Delay(_timeout, cancellationToken)
;
await Task.Delay(_timeout, cancellationToken);

if (!_connection.Closed)
{
await _connection.SendAsync(
KeepConnectionAliveMessage.Default.Serialize(),
cancellationToken)
;
cancellationToken);
}
}
}
catch (TaskCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// the message processing was canceled.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,41 +31,48 @@ public void Begin(CancellationToken cancellationToken)
TaskScheduler.Default);
}

private async Task ProcessMessagesAsync(
CancellationToken cancellationToken)
private async Task ProcessMessagesAsync(CancellationToken cancellationToken)
{
while (true)
try
{
SequencePosition? position;
ReadResult result = await _reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;

do
while (true)
{
position = buffer.PositionOf(Subscription.Delimiter);
SequencePosition? position;
ReadResult result = await _reader.ReadAsync(cancellationToken);
ReadOnlySequence<byte> buffer = result.Buffer;

if (position is not null)
do
{
await _pipeline.ProcessAsync(
_connection,
buffer.Slice(0, position.Value),
cancellationToken);
position = buffer.PositionOf(SubscriptionSession.Delimiter);

if (position is not null)
{
await _pipeline.ProcessAsync(
_connection,
buffer.Slice(0, position.Value),
cancellationToken);

// Skip the message which was read.
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
// Skip the message which was read.
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
}
}
}
while (position != null);
while (position != null);

_reader.AdvanceTo(buffer.Start, buffer.End);
_reader.AdvanceTo(buffer.Start, buffer.End);

if (result.IsCompleted)
{
break;
if (result.IsCompleted)
{
break;
}
}
}

await _reader.CompleteAsync();
catch(OperationCanceledException) when (cancellationToken.IsCancellationRequested) { }
finally
{
// reader should be completed always, so that related pipe writer can
// stop write new messages
await _reader.CompleteAsync();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,26 @@ public MessageReceiver(ISocketConnection connection, PipeWriter writer)

public async Task ReceiveAsync(CancellationToken cancellationToken)
{
while (!_connection.Closed &&
!cancellationToken.IsCancellationRequested)
try
{
await _connection.ReceiveAsync(_writer, cancellationToken);
await WriteMessageDelimiterAsync(cancellationToken);
while (!_connection.Closed && !cancellationToken.IsCancellationRequested)
{
await _connection.ReceiveAsync(_writer, cancellationToken);
await WriteMessageDelimiterAsync(cancellationToken);
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { }
finally
{
// writer should be always completed
await _writer.CompleteAsync();
}
await _writer.CompleteAsync();
}

private async Task WriteMessageDelimiterAsync(
CancellationToken cancellationToken)
private async Task WriteMessageDelimiterAsync(CancellationToken cancellationToken)
{
Memory<byte> memory = _writer.GetMemory(1);
memory.Span[0] = Subscription.Delimiter;
memory.Span[0] = SubscriptionSession.Delimiter;
_writer.Advance(1);
await _writer.FlushAsync(cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,21 @@ namespace HotChocolate.AspNetCore.Subscriptions
internal static class MessageSerialization
{
private static readonly JsonSerializerSettings _jsonSettings =
new JsonSerializerSettings
new()
{
ContractResolver = new CamelCasePropertyNamesContractResolver(),
NullValueHandling = NullValueHandling.Ignore
};

private static readonly UTF8Encoding _encoding = new UTF8Encoding();
private static readonly UTF8Encoding _encoding = new();

private static readonly byte[] _keepConnectionAliveMessage =
SerializeInternal(KeepConnectionAliveMessage.Default);

private static readonly byte[] _acceptConnectionMessage =
SerializeInternal(AcceptConnectionMessage.Default);

public static byte[] Serialize(
this OperationMessage message)
public static byte[] Serialize(this OperationMessage message)
{
if (message is KeepConnectionAliveMessage)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using HotChocolate.AspNetCore.Properties;
using HotChocolate.Execution;
using HotChocolate.Execution.Instrumentation;
using HotChocolate.Execution.Processing;
using static HotChocolate.AspNetCore.ThrowHelper;
using static HotChocolate.AspNetCore.Properties.AspNetCoreResources;

namespace HotChocolate.AspNetCore.Subscriptions.Messages
{
Expand All @@ -11,15 +15,19 @@ public sealed class DataStartMessageHandler
{
private readonly IRequestExecutor _requestExecutor;
private readonly ISocketSessionInterceptor _socketSessionInterceptor;
private readonly IDiagnosticEvents _diagnosticEvents;

public DataStartMessageHandler(
IRequestExecutor requestExecutor,
ISocketSessionInterceptor socketSessionInterceptor)
ISocketSessionInterceptor socketSessionInterceptor,
IDiagnosticEvents diagnosticEvents)
{
_requestExecutor = requestExecutor ??
throw new ArgumentNullException(nameof(requestExecutor));
_socketSessionInterceptor = socketSessionInterceptor ??
throw new ArgumentNullException(nameof(socketSessionInterceptor));
_diagnosticEvents = diagnosticEvents ??
throw new ArgumentNullException(nameof(diagnosticEvents));
}

protected override async Task HandleAsync(
Expand All @@ -39,22 +47,72 @@ await _socketSessionInterceptor.OnRequestAsync(

switch (result)
{
case IResponseStream responseStream:
var subscription = new Subscription(connection, responseStream, message.Id);
connection.Subscriptions.Register(subscription);
case ISubscriptionResult subscriptionResult:
// while a subscription result must be disposed we are not handling it here
// and leave this responsibility to the subscription session.
ISubscription subscription = GetSubscription(result);

var subscriptionSession = new SubscriptionSession(
connection,
subscriptionResult,
subscription,
_diagnosticEvents,
message.Id);

connection.Subscriptions.Register(subscriptionSession);
break;

case IResponseStream streamResult:
// stream results represent deferred execution streams that use execution
// resources. We need to ensure that these are disposed when we are
// finished.
await using (streamResult)
{
await HandleStreamResultAsync(
connection,
message,
streamResult,
cancellationToken);
}
break;

case IQueryResult queryResult:
// query results use pooled memory an need to be disposed after we have
// used them.
using (queryResult)
{
await HandleQueryResultAsync(
connection, message, queryResult, cancellationToken);
connection,
message,
queryResult,
cancellationToken);
}
break;

default:
throw DataStartMessageHandler_RequestTypeNotSupported();
}
}

private static async Task HandleStreamResultAsync(
ISocketConnection connection,
DataStartMessage message,
IResponseStream responseStream,
CancellationToken cancellationToken)
{
await foreach (IQueryResult queryResult in responseStream.ReadResultsAsync()
.WithCancellation(cancellationToken))
{
await connection.SendAsync(
new DataResultMessage(message.Id, queryResult).Serialize(),
cancellationToken);
}

await connection.SendAsync(
new DataCompleteMessage(message.Id).Serialize(),
cancellationToken);
}

private static async Task HandleQueryResultAsync(
ISocketConnection connection,
DataStartMessage message,
Expand All @@ -69,5 +127,17 @@ await connection.SendAsync(
new DataCompleteMessage(message.Id).Serialize(),
cancellationToken);
}

private ISubscription GetSubscription(IExecutionResult result)
{
if (result.ContextData is not null &&
result.ContextData.TryGetValue(WellKnownContextData.Subscription, out var value) &&
value is ISubscription subscription)
{
return subscription;
}

throw new InvalidOperationException(DataStartMessageHandler_Not_A_SubscriptionResult);
}
}
}
Loading

0 comments on commit c59403c

Please sign in to comment.