Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/claude-code.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ jobs:
steps:
- name: Check if user is allowed
run: |
HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" \
HTTP_STATUS=$(curl -L -s -o /dev/null -w "%{http_code}" \
-H "Authorization: Bearer ${{ github.token }}" \
-H "Accept: application/vnd.github.v3+json" \
"https://api.github.com/orgs/${{ github.repository_owner }}/members/${{ github.actor }}")
if [ "$HTTP_STATUS" -ne 200 ]; then
if [ "$HTTP_STATUS" -lt 200 ] || [ "$HTTP_STATUS" -ge 300 ]; then
echo "User ${{ github.actor }} is not allowed to trigger this workflow."
exit 1
fi
Expand Down
19 changes: 10 additions & 9 deletions examples/DatastreamTestServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,27 @@
string apiKey = Environment.GetEnvironmentVariable("SCHEMATIC_API_KEY") ??
throw new InvalidOperationException("SCHEMATIC_API_KEY environment variable is not set");

var redisConfig = new SchematicHQ.Client.Datastream.RedisCacheConfig
{
Endpoints = new List<string> { "localhost:6379" }, // Replace with your Redis connection string
KeyPrefix = "schematic-test:", // Optional key prefix
};

var options = new ClientOptions
{
BaseUrl = "http://localhost:8080",
BaseUrl = "https://datastream.schematichq.com",
UseDatastream = true,
DatastreamOptions = new SchematicHQ.Client.Datastream.DatastreamOptions
{
CacheTTL = TimeSpan.FromHours(24)
CacheTTL = TimeSpan.FromMilliseconds(5000),
RedisConfig = redisConfig
}
};

options.WithRedisCache(
new List<string> { "localhost:6379" }, // Redis connection string
keyPrefix: "schematic-test:", // Optional key prefix
cacheTtl: TimeSpan.FromHours(24)// Optional cache TTL
redisConfig
);

options.WithHttpClient(new HttpClient
{
BaseAddress = new Uri("http://localhost:8080")
});

Schematic schematic = new Schematic(apiKey, options);

Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public static (DatastreamClient Client, MockWebSocket WebSocket, MockSchematicLo
var mockWebSocket = new MockWebSocket();
mockWebSocket.SetState(WebSocketState.Open);

var monitorSource = new TaskCompletionSource<bool>();
var monitorSource = new Action<bool>(isConnected =>{});
var client = new DatastreamClient("wss://test.example.com", logger, apiKey, monitorSource, cacheTtl, mockWebSocket);

return (client, mockWebSocket, logger);
Expand Down
163 changes: 112 additions & 51 deletions src/SchematicHQ.Client/Datastream/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ public class DatastreamClient : IDisposable
private readonly string _apiKey;
private readonly Uri _baseUrl;
private readonly TimeSpan _cacheTtl;
private readonly TaskCompletionSource<bool> _monitorSource;

private readonly Action<bool> _connectionStateCallback;
private IWebSocketClient _webSocket;
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly SemaphoreSlim _reconnectSemaphore = new SemaphoreSlim(1, 1);
private CancellationTokenSource _readCancellationSource = new CancellationTokenSource();

// Cache providers
private readonly ICacheProvider<Flag> _flagsCache;
Expand Down Expand Up @@ -58,16 +58,16 @@ public DatastreamClient(
string baseUrl,
ISchematicLogger logger,
string apiKey,
TaskCompletionSource<bool> monitorSource,
Action<bool> connectionStateCallback,
TimeSpan? cacheTtl = null,
IWebSocketClient? webSocket = null,
DatastreamOptions? options = null
)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_apiKey = apiKey ?? throw new ArgumentNullException(nameof(apiKey));
_monitorSource = monitorSource ?? throw new ArgumentNullException(nameof(monitorSource));
_connectionStateCallback = connectionStateCallback ?? throw new ArgumentNullException(nameof(connectionStateCallback));

// Use options if provided, otherwise use default values
options ??= new DatastreamOptions();
_cacheTtl = cacheTtl ?? options.CacheTTL ?? TimeSpan.FromHours(24);
Expand All @@ -80,12 +80,12 @@ public DatastreamClient(
_baseUrl = GetBaseUrl(baseUrl);

// Initialize cache providers

// Flags always use LocalCache with unlimited TTL regardless of configuration
_flagsCache = new LocalCache<Flag>(options.LocalCacheCapacity, TimeSpan.MaxValue); // Flags don't expire

// Company and User caches use the configured provider type
if (options.CacheProviderType == DatastreamCacheProviderType.Redis &&
if (options.CacheProviderType == DatastreamCacheProviderType.Redis &&
options.RedisConfig != null)
{
try
Expand Down Expand Up @@ -141,17 +141,23 @@ private async Task ConnectAndReadAsync()
try
{
await _reconnectSemaphore.WaitAsync();
Copy link

Copilot AI Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semaphore is acquired but the corresponding Release() call is moved to a catch block far below. This makes the resource management pattern hard to follow and error-prone. Consider using a using statement or try-finally block immediately after acquisition.

Copilot uses AI. Check for mistakes.
if (_readCancellationSource.IsCancellationRequested)
{
_readCancellationSource.Dispose();
_readCancellationSource = new CancellationTokenSource();
}
_webSocket.Options.SetRequestHeader("X-Schematic-Api-Key", _apiKey);
_webSocket.Options.KeepAliveInterval = PingPeriod; // Set keep-alive interval

try
{
_webSocket.Options.SetRequestHeader("X-Schematic-Api-Key", _apiKey);
_webSocket.Options.KeepAliveInterval = PingPeriod; // Set keep-alive interval

await _webSocket.ConnectAsync(_baseUrl, _cancellationTokenSource.Token);
_logger.Info("Connected to Schematic WebSocket");
attempts = 0;

// Signal monitor that we're connected
_monitorSource.TrySetResult(true);
// Signal connection state
_connectionStateCallback(true);

// Start reading messages
var readTask = ReadMessagesAsync();
Expand All @@ -169,21 +175,53 @@ private async Task ConnectAndReadAsync()

// Wait for the read task to complete, which happens on disconnection
await readTask;

_readCancellationSource.Token.ThrowIfCancellationRequested();
}
catch (Exception connectEx)
{
// Handle connection errors specifically
_logger.Error("Failed to connect to WebSocket: {0}", connectEx.Message);
// Don't rethrow - allow the outer exception handler to handle retries
throw;
}
finally
{
_reconnectSemaphore.Release();
// Ensure connection is closed before reconnecting
if (_webSocket.State == WebSocketState.Open)
{
try
{
await _webSocket.CloseAsync(
WebSocketCloseStatus.NormalClosure,
"Reconnecting",
CancellationToken.None
);
}
catch (Exception ex)
{
_logger.Error("Error closing WebSocket: {0}", ex.Message);
_webSocket.Abort();
}
}
}
}
catch (Exception ex)
catch (Exception connectionEx)
{
_logger.Error("WebSocket connection error: {Message}", ex.Message);
_reconnectSemaphore.Release();
_logger.Error("WebSocket connection error: {0}", connectionEx.Message);
attempts++;
_monitorSource.TrySetResult(false);
_connectionStateCallback(false);

if (_webSocket != null)
{
try { _webSocket.Dispose(); } catch { /* ignore */ }
Copy link

Copilot AI Jul 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empty catch blocks that silently ignore exceptions make debugging difficult. Consider logging the exception or at minimum adding a comment explaining why the exception can be safely ignored.

Suggested change
try { _webSocket.Dispose(); } catch { /* ignore */ }
try { _webSocket.Dispose(); } catch (Exception disposeEx) { _logger.Error("Error disposing WebSocket: {0}", disposeEx.Message); }

Copilot uses AI. Check for mistakes.
}
_webSocket = new StandardWebSocketClient();

if (attempts >= MaxReconnectAttempts)
{
_logger.Error("Unable to connect to server after {Attempts} attempts", MaxReconnectAttempts);
_logger.Error("Unable to connect to server after {0} attempts", MaxReconnectAttempts);
return;
}

Expand Down Expand Up @@ -224,48 +262,70 @@ private async Task ReadMessagesAsync()
while (_webSocket.State == WebSocketState.Open && !_cancellationTokenSource.Token.IsCancellationRequested)
{
WebSocketReceiveResult result;
do
try
{
result = await _webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer),
_cancellationTokenSource.Token);

if (result.MessageType == WebSocketMessageType.Close)
do
{
return;
}
result = await _webSocket.ReceiveAsync(
new ArraySegment<byte>(buffer),
_cancellationTokenSource.Token);

receiveBuffer.AddRange(new ArraySegment<byte>(buffer, 0, result.Count));
}
while (!result.EndOfMessage);
if (result.MessageType == WebSocketMessageType.Close)
{
return;
}

if (result.MessageType == WebSocketMessageType.Text)
{
var buffArray = receiveBuffer.ToArray();
var message = Encoding.UTF8.GetString(buffArray);
receiveBuffer.Clear();
receiveBuffer.AddRange(new ArraySegment<byte>(buffer, 0, result.Count));
}
while (!result.EndOfMessage);

if (string.IsNullOrEmpty(message))
if (result.MessageType == WebSocketMessageType.Text)
{
_logger.Debug("Received empty message from WebSocket");
return; // Trigger reconnection
var buffArray = receiveBuffer.ToArray();
var message = Encoding.UTF8.GetString(buffArray);
receiveBuffer.Clear();

if (string.IsNullOrEmpty(message))
{
_logger.Debug("Received empty message from WebSocket");
return; // Trigger reconnection
}

try
{
var response = JsonSerializer.Deserialize<DataStreamResponse>(message);
HandleMessageResponse(response);
}
catch (Exception ex)
{
_logger.Error("Failed to process WebSocket message: {0}", ex.Message);
}
}

try
}
catch (OperationCanceledException)
{
var response = JsonSerializer.Deserialize<DataStreamResponse>(message);
HandleMessageResponse(response);
_logger.Info("WebSocket read operation was cancelled");
return;
}
catch (Exception ex)
{
_logger.Error("Failed to process WebSocket message: {0}", ex.Message);
_logger.Error("Error reading from WebSocket: {0}", ex.Message);
return; // Exit and trigger reconnection
}
}
}
}
catch (Exception ex)
{
_logger.Error("Error reading WebSocket messages: {0}", ex.Message);
_logger.Error("Fatal error in ReadMessagesAsync: {0}", ex.Message);
}
finally
{
// Signal that reconnection should happen
if (!_cancellationTokenSource.IsCancellationRequested)
{
_logger.Info("Signaling for WebSocket reconnection");
_readCancellationSource.Cancel();
}
}
}

Expand Down Expand Up @@ -510,7 +570,7 @@ private void HandleErrorMessage(DataStreamResponse response)
var jsonString = response.Data.ToString() ?? string.Empty;
var error = JsonSerializer.Deserialize<DataStreamError>(jsonString);
if (error != null && !string.IsNullOrEmpty(error.Error))
_logger.Error("Received error from server: {Error}", error.Error);
_logger.Error("Received error from server: {0}", error.Error);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -556,12 +616,12 @@ public async Task<CheckFlagResult> CheckFlagAsync(CheckFlagRequestBody request,
{
_logger.Error("Error checking flag {0}: {1}", flagKey, ex.Message);
return new CheckFlagResult
{
Reason = "Error",
FlagKey = flagKey,
Error = ex,
Value = false,
};
{
Reason = "Error",
FlagKey = flagKey,
Error = ex,
Value = false,
};
}
}

Expand Down Expand Up @@ -904,7 +964,7 @@ public void Dispose()
}
catch (Exception ex)
{
_logger.Error("Error closing WebSocket connection: {Message}", ex.Message);
_logger.Error("Error closing WebSocket connection: {0}", ex.Message);

// Ensure we abort in case of errors
try { _webSocket.Abort(); } catch { /* Ignore any errors during abort */ }
Expand All @@ -914,6 +974,7 @@ public void Dispose()
_webSocket.Dispose();
_reconnectSemaphore.Dispose();
_cancellationTokenSource.Dispose();
_readCancellationSource.Dispose();
}
}
}
Expand Down
Loading
Loading