Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions Cortex.sln
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.Tests", "src\Cortex.
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.Streams.Files", "src\Cortex.Streams.Files\Cortex.Streams.Files.csproj", "{D376D6CA-3192-4EDC-B840-31F58B6457DD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.States.Cassandra", "src\Cortex.States.Cassandra\Cortex.States.Cassandra.csproj", "{447970B9-C5AA-41D9-A07F-330A251597D0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.States.MongoDb", "src\Cortex.States.MongoDb\Cortex.States.MongoDb.csproj", "{00358701-D117-4953-A673-D60625D38466}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.States.MSSqlServer", "src\Cortex.States.MSSqlServer\Cortex.States.MSSqlServer.csproj", "{77AD462F-A248-43AF-9212-43031F22F23D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.States.PostgreSQL", "src\Cortex.States.PostgreSQL\Cortex.States.PostgreSQL.csproj", "{980EDBFE-40C2-4EFD-96C2-FED1032FB5E6}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.States.ClickHouse", "src\Cortex.States.ClickHouse\Cortex.States.ClickHouse.csproj", "{0F9FCB99-D00F-4396-8E2B-6E627076ADA0}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cortex.Streams.Http", "src\Cortex.Streams.Http\Cortex.Streams.Http.csproj", "{20BD7107-8199-4CA8-815B-4D156B522B82}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -98,6 +110,30 @@ Global
{D376D6CA-3192-4EDC-B840-31F58B6457DD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D376D6CA-3192-4EDC-B840-31F58B6457DD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D376D6CA-3192-4EDC-B840-31F58B6457DD}.Release|Any CPU.Build.0 = Release|Any CPU
{447970B9-C5AA-41D9-A07F-330A251597D0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{447970B9-C5AA-41D9-A07F-330A251597D0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{447970B9-C5AA-41D9-A07F-330A251597D0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{447970B9-C5AA-41D9-A07F-330A251597D0}.Release|Any CPU.Build.0 = Release|Any CPU
{00358701-D117-4953-A673-D60625D38466}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{00358701-D117-4953-A673-D60625D38466}.Debug|Any CPU.Build.0 = Debug|Any CPU
{00358701-D117-4953-A673-D60625D38466}.Release|Any CPU.ActiveCfg = Release|Any CPU
{00358701-D117-4953-A673-D60625D38466}.Release|Any CPU.Build.0 = Release|Any CPU
{77AD462F-A248-43AF-9212-43031F22F23D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{77AD462F-A248-43AF-9212-43031F22F23D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{77AD462F-A248-43AF-9212-43031F22F23D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{77AD462F-A248-43AF-9212-43031F22F23D}.Release|Any CPU.Build.0 = Release|Any CPU
{980EDBFE-40C2-4EFD-96C2-FED1032FB5E6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{980EDBFE-40C2-4EFD-96C2-FED1032FB5E6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{980EDBFE-40C2-4EFD-96C2-FED1032FB5E6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{980EDBFE-40C2-4EFD-96C2-FED1032FB5E6}.Release|Any CPU.Build.0 = Release|Any CPU
{0F9FCB99-D00F-4396-8E2B-6E627076ADA0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0F9FCB99-D00F-4396-8E2B-6E627076ADA0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0F9FCB99-D00F-4396-8E2B-6E627076ADA0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0F9FCB99-D00F-4396-8E2B-6E627076ADA0}.Release|Any CPU.Build.0 = Release|Any CPU
{20BD7107-8199-4CA8-815B-4D156B522B82}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{20BD7107-8199-4CA8-815B-4D156B522B82}.Debug|Any CPU.Build.0 = Debug|Any CPU
{20BD7107-8199-4CA8-815B-4D156B522B82}.Release|Any CPU.ActiveCfg = Release|Any CPU
{20BD7107-8199-4CA8-815B-4D156B522B82}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,30 @@
- **Cortex.Streams.Files:** Implementation of File Source and Sink operators.
[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.Files?label=Cortex.Streams.Files)](https://www.nuget.org/packages/Cortex.Streams.Files)

- **Cortex.Streams.Http:** Implementation of Http Source and Sink operators.
[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Streams.Http?label=Cortex.Streams.Http)](https://www.nuget.org/packages/Cortex.Streams.Http)

- **Cortex.States:** Core state management functionalities.
[![NuGet Version](https://img.shields.io/nuget/v/Cortex.States?label=Cortex.States)](https://www.nuget.org/packages/Cortex.States)

- **Cortex.States.RocksDb:** Persistent state storage using RocksDB.
[![NuGet Version](https://img.shields.io/nuget/v/Cortex.States.RocksDb?label=Cortex.States.RocksDb)](https://www.nuget.org/packages/Cortex.States.RocksDb)

- **Cortex.States.Cassandra:** Persistent state storage using Cassandra.
[![NuGet Version](https://img.shields.io/nuget/v/Cortex.States.Cassandra?label=Cortex.States.Cassandra)](https://www.nuget.org/packages/Cortex.States.Cassandra)

- **Cortex.States.MongoDb:** Persistent state storage using MongoDb.
[![NuGet Version](https://img.shields.io/nuget/v/Cortex.States.MongoDb?label=Cortex.States.MongoDb)](https://www.nuget.org/packages/Cortex.States.MongoDb)

- **Cortex.States.MSSqlServer:** Persistent state storage using Microsoft Sql Server.
[![NuGet Version](https://img.shields.io/nuget/v/Cortex.States.MSSqlServer?label=Cortex.States.MSSqlServer)](https://www.nuget.org/packages/Cortex.States.MSSqlServer)

- **Cortex.States.PostgreSQL:** Persistent state storage using PostgreSQL.
[![NuGet Version](https://img.shields.io/nuget/v/Cortex.States.PostgreSQL?label=Cortex.States.PostgreSQL)](https://www.nuget.org/packages/Cortex.States.PostgreSQL)

- **Cortex.States.ClickHouse:** Persistent state storage using Clickhouse.
[![NuGet Version](https://img.shields.io/nuget/v/Cortex.States.ClickHouse?label=Cortex.States.ClickHouse)](https://www.nuget.org/packages/Cortex.States.ClickHouse)

- **Cortex.Telemetry:** Core library to add support for Tracing and Matrics.
[![NuGet Version](https://img.shields.io/nuget/v/Cortex.Telemetry?label=Cortex.Telemetry)](https://www.nuget.org/packages/Cortex.Telemetry)

Expand Down
Binary file added src/Cortex.States.Cassandra/Assets/cortex.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
20 changes: 20 additions & 0 deletions src/Cortex.States.Cassandra/Assets/license.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
The MIT License (MIT)

Copyright (c) 2024 Buildersoft

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
220 changes: 220 additions & 0 deletions src/Cortex.States.Cassandra/CassandraStateStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
using Cassandra;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace Cortex.States.Cassandra
{
public class CassandraStateStore<TKey, TValue> : IStateStore<TKey, TValue>
{
private readonly ISession _session;
private readonly string _keyspace;
private readonly string _tableName;
private readonly PreparedStatement _getStatement;
private readonly PreparedStatement _putStatement;
private readonly PreparedStatement _removeStatement;
private readonly PreparedStatement _getAllStatement;
private readonly PreparedStatement _getKeysStatement;
private readonly Func<TKey, string> _keySerializer;
private readonly Func<TValue, string> _valueSerializer;
private readonly Func<string, TKey> _keyDeserializer;
private readonly Func<string, TValue> _valueDeserializer;


// SemaphoreSlim for initialization synchronization
private static readonly SemaphoreSlim _initializationLock = new SemaphoreSlim(1, 1);

// Flag to track initialization status
private volatile bool _isInitialized;

// Cancellation token source for cleanup
private readonly CancellationTokenSource _cancellationTokenSource;

public string Name { get; }


/// <summary>
/// Initializes a new instance of the CassandraStateStore.
/// </summary>
/// <param name="name">Name of the state store</param>
/// <param name="keyspace">Keyspace name</param>
/// <param name="tableName">Table name</param>
/// <param name="session">Cassandra session</param>
/// <param name="keyspaceConfig">Optional keyspace configuration</param>
/// <param name="readConsistency">Read consistency level</param>
/// <param name="writeConsistency">Write consistency level</param>
public CassandraStateStore(
string name,
string keyspace,
string tableName,
ISession session,
KeyspaceConfiguration keyspaceConfig = null,
ConsistencyLevel? readConsistency = null,
ConsistencyLevel? writeConsistency = null)
{
Name = name ?? throw new ArgumentNullException(nameof(name));
_session = session ?? throw new ArgumentNullException(nameof(session));
_keyspace = keyspace ?? throw new ArgumentNullException(nameof(keyspace));
_tableName = tableName ?? throw new ArgumentNullException(nameof(tableName));
_cancellationTokenSource = new CancellationTokenSource();

// Initialize with the provided configuration or default
InitializeAsync(keyspaceConfig ?? new KeyspaceConfiguration()).GetAwaiter().GetResult();

// Prepare statements with specified consistency levels
_getStatement = _session.Prepare(
$"SELECT value FROM {_keyspace}.{_tableName} WHERE key = ?")
.SetConsistencyLevel(readConsistency ?? ConsistencyLevel.Quorum);

_putStatement = _session.Prepare(
$"INSERT INTO {_keyspace}.{_tableName} (key, value) VALUES (?, ?)")
.SetConsistencyLevel(writeConsistency ?? ConsistencyLevel.Quorum);

_removeStatement = _session.Prepare(
$"DELETE FROM {_keyspace}.{_tableName} WHERE key = ?")
.SetConsistencyLevel(writeConsistency ?? ConsistencyLevel.Quorum);

_getAllStatement = _session.Prepare(
$"SELECT key, value FROM {_keyspace}.{_tableName}")
.SetConsistencyLevel(readConsistency ?? ConsistencyLevel.Quorum);

_getKeysStatement = _session.Prepare(
$"SELECT key FROM {_keyspace}.{_tableName}")
.SetConsistencyLevel(readConsistency ?? ConsistencyLevel.Quorum);

_keySerializer = key => JsonSerializer.Serialize(key);
_valueSerializer = value => JsonSerializer.Serialize(value);
_keyDeserializer = str => JsonSerializer.Deserialize<TKey>(str);
_valueDeserializer = str => JsonSerializer.Deserialize<TValue>(str);
}

private async Task InitializeAsync(KeyspaceConfiguration config)
{
if (_isInitialized) return;

await _initializationLock.WaitAsync();
try
{
if (_isInitialized) return;

// Create keyspace using the provided configuration
var createKeyspaceQuery = config.GenerateCreateKeyspaceCql(_keyspace);
await _session.ExecuteAsync(new SimpleStatement(createKeyspaceQuery));

// Create table if it doesn't exist
await _session.ExecuteAsync(new SimpleStatement(
$@"CREATE TABLE IF NOT EXISTS {_keyspace}.{_tableName} (
key text PRIMARY KEY,
value text
)"));

_isInitialized = true;
}
finally
{
_initializationLock.Release();
}
}

public TValue Get(TKey key)
{
EnsureInitialized();

var serializedKey = _keySerializer(key);
var boundStatement = _getStatement.Bind(serializedKey);

// Cassandra driver handles thread safety for execute operations
var row = _session.Execute(boundStatement).FirstOrDefault();

if (row == null)
return default;

var serializedValue = row.GetValue<string>("value");
return _valueDeserializer(serializedValue);
}

public void Put(TKey key, TValue value)
{
EnsureInitialized();

var serializedKey = _keySerializer(key);
var serializedValue = _valueSerializer(value);
var boundStatement = _putStatement.Bind(serializedKey, serializedValue);

_session.Execute(boundStatement);
}

public bool ContainsKey(TKey key)
{
EnsureInitialized();

var serializedKey = _keySerializer(key);
var boundStatement = _getStatement.Bind(serializedKey);
var row = _session.Execute(boundStatement).FirstOrDefault();
return row != null;
}

public void Remove(TKey key)
{
EnsureInitialized();

var serializedKey = _keySerializer(key);
var boundStatement = _removeStatement.Bind(serializedKey);
_session.Execute(boundStatement);
}

public IEnumerable<KeyValuePair<TKey, TValue>> GetAll()
{
EnsureInitialized();

var boundStatement = _getAllStatement.Bind();

// Execute and materialize results to avoid timeout issues during enumeration
var rows = _session.Execute(boundStatement).ToList();

foreach (var row in rows)
{
var serializedKey = row.GetValue<string>("key");
var serializedValue = row.GetValue<string>("value");
var key = _keyDeserializer(serializedKey);
var value = _valueDeserializer(serializedValue);
yield return new KeyValuePair<TKey, TValue>(key, value);
}
}

public IEnumerable<TKey> GetKeys()
{
EnsureInitialized();

var boundStatement = _getKeysStatement.Bind();

// Execute and materialize results to avoid timeout issues during enumeration
var rows = _session.Execute(boundStatement).ToList();

foreach (var row in rows)
{
var serializedKey = row.GetValue<string>("key");
yield return _keyDeserializer(serializedKey);
}
}

private void EnsureInitialized()
{
if (!_isInitialized)
{
throw new InvalidOperationException("CassandraStateStore is not properly initialized.");
}
}

public void Dispose()
{
_cancellationTokenSource.Cancel();
_cancellationTokenSource.Dispose();
_initializationLock.Dispose();
}
}
}
58 changes: 58 additions & 0 deletions src/Cortex.States.Cassandra/Cortex.States.Cassandra.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net9.0;net8.0;net7.0</TargetFrameworks>

<AssemblyVersion>1.0.1</AssemblyVersion>
<FileVersion>1.0.1</FileVersion>
<Product>Buildersoft Cortex Framework</Product>
<Company>Buildersoft</Company>
<Authors>Buildersoft,EnesHoxha</Authors>
<Copyright>Copyright © Buildersoft 2024</Copyright>

<Description>Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications. </Description>


<RepositoryUrl>https://github.com/buildersoftio/cortex</RepositoryUrl>
<PackageTags>cortex mediator eda streaming distributed streams states cassandra</PackageTags>

<Version>1.0.1</Version>
<PackageLicenseFile>license.md</PackageLicenseFile>
<PackageIcon>cortex.png</PackageIcon>
<PackageId>Cortex.States.Cassandra</PackageId>
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
<IsPublishable>True</IsPublishable>
<PackageRequireLicenseAcceptance>True</PackageRequireLicenseAcceptance>
<RepositoryType></RepositoryType>
<PackageReleaseNotes>Just as the Cortex in our brains handles complex processing efficiently, Cortex Data Framework brings brainpower to your data management! </PackageReleaseNotes>
<PackageProjectUrl>https://buildersoft.io/</PackageProjectUrl>
<PackageReadmeFile>README.md</PackageReadmeFile>

</PropertyGroup>

<ItemGroup>
<PackageReference Include="CassandraCSharpDriver" Version="3.22.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Cortex.States\Cortex.States.csproj" />
</ItemGroup>


<ItemGroup>
<None Include="..\..\README.md">
<Pack>True</Pack>
<PackagePath>\</PackagePath>
</None>
<None Include="Assets\cortex.png">
<Pack>True</Pack>
<PackagePath></PackagePath>
</None>
<None Include="Assets\license.md">
<Pack>True</Pack>
<PackagePath></PackagePath>
</None>
</ItemGroup>

</Project>
Loading
Loading