Skip to content
Merged
60 changes: 60 additions & 0 deletions .github/workflows/Kafka-Integration-Tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
name: Kafka Integration Tests

on:
pull_request:
branches:
- 'master'
- 'release-*'
push:
branches:
- 'master'
- 'beta'
- 'release-*'

jobs:
test-kafka:
env:
GIT_REF: ${{ github.ref }}
GIT_SHA: ${{ github.sha }}
Configuration: Release
SolutionFile: dotnet/KafkaTestSolution.sln
SolutionFileName: KafkaTestSolution

runs-on: ubuntu-latest
environment: kafka-integration-tests

steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 0

- name: Install Docker
uses: docker/setup-buildx-action@v2

- name: Start kafka
run: |
docker compose -f dotnet/src/extensions/kafka/test/docker-compose.yml up -d
env:
COMPOSE_DOCKER_CLI_BUILD: 1
DOCKER_BUILDKIT: 1

- name: Install .NET 8
uses: actions/setup-dotnet@v3
with:
dotnet-version: '8.x'
include-prerelease: true

- name: Install libgdiplus
run: sudo apt-get install -y libgdiplus

- name: Create temporary solution with Kafka projects
run: |
dotnet new sln --name $SolutionFileName --output dotnet --force
dotnet sln $SolutionFile add dotnet/src/extensions/kafka/src/Core.Messaging.csproj
dotnet sln $SolutionFile add dotnet/src/extensions/kafka/test/DotNetCoreKafkaTest/DotNetCoreKafkaTest.csproj

- name: Test Kafka
env:
KAFKA_TEST_ENABLED: "true"
run: dotnet test $SolutionFile --configuration $Configuration
31 changes: 31 additions & 0 deletions dotnet/DotNetStandardClasses.sln
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,21 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{F8959289-4ED7-430C-97B7-FAAA29829708}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GxSoapHandler", "src\extensions\ws\src\GxSoapHandler\GxSoapHandler.csproj", "{58C84EC7-A0B3-4C1B-BD78-989AEE87EA32}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Functions", "Functions", "{E59B3248-4C26-4DB0-96CB-67437319E22B}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "kafka", "kafka", "{7CABA1C5-F531-4DC7-AEFC-A33900D15E9D}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{26132DE8-B551-4A79-9363-696277DDB803}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{57F168E9-6A77-4263-BF61-E1FF7BCE855E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Core.Messaging", "src\extensions\kafka\src\Core.Messaging.csproj", "{8FE70544-0128-4C22-8C84-28D6842E110A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCoreKafkaTest", "src\extensions\kafka\test\DotNetCoreKafkaTest\DotNetCoreKafkaTest.csproj", "{C468AC77-A5F1-4455-91FE-D40AC2539FDD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetKafkaTest", "src\extensions\kafka\test\DotNetKafkaTest\DotNetKafkaTest.csproj", "{07B7EDAE-2A2A-43DE-B5E5-0D5904B323EE}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -652,6 +665,18 @@ Global
{58C84EC7-A0B3-4C1B-BD78-989AEE87EA32}.Debug|Any CPU.Build.0 = Debug|Any CPU
{58C84EC7-A0B3-4C1B-BD78-989AEE87EA32}.Release|Any CPU.ActiveCfg = Release|Any CPU
{58C84EC7-A0B3-4C1B-BD78-989AEE87EA32}.Release|Any CPU.Build.0 = Release|Any CPU
{8FE70544-0128-4C22-8C84-28D6842E110A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8FE70544-0128-4C22-8C84-28D6842E110A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8FE70544-0128-4C22-8C84-28D6842E110A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8FE70544-0128-4C22-8C84-28D6842E110A}.Release|Any CPU.Build.0 = Release|Any CPU
{C468AC77-A5F1-4455-91FE-D40AC2539FDD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C468AC77-A5F1-4455-91FE-D40AC2539FDD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C468AC77-A5F1-4455-91FE-D40AC2539FDD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C468AC77-A5F1-4455-91FE-D40AC2539FDD}.Release|Any CPU.Build.0 = Release|Any CPU
{07B7EDAE-2A2A-43DE-B5E5-0D5904B323EE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{07B7EDAE-2A2A-43DE-B5E5-0D5904B323EE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{07B7EDAE-2A2A-43DE-B5E5-0D5904B323EE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{07B7EDAE-2A2A-43DE-B5E5-0D5904B323EE}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -782,6 +807,12 @@ Global
{F8959289-4ED7-430C-97B7-FAAA29829708} = {B5C28D81-BCD9-4B29-9B68-EDD81D1018D5}
{58C84EC7-A0B3-4C1B-BD78-989AEE87EA32} = {F8959289-4ED7-430C-97B7-FAAA29829708}
{E59B3248-4C26-4DB0-96CB-67437319E22B} = {41E1D031-799F-484F-85DE-7A30AF1A6FBA}
{7CABA1C5-F531-4DC7-AEFC-A33900D15E9D} = {C6AFB6A3-FF0B-4970-B1F1-10BCD3D932B2}
{26132DE8-B551-4A79-9363-696277DDB803} = {7CABA1C5-F531-4DC7-AEFC-A33900D15E9D}
{57F168E9-6A77-4263-BF61-E1FF7BCE855E} = {7CABA1C5-F531-4DC7-AEFC-A33900D15E9D}
{8FE70544-0128-4C22-8C84-28D6842E110A} = {26132DE8-B551-4A79-9363-696277DDB803}
{C468AC77-A5F1-4455-91FE-D40AC2539FDD} = {57F168E9-6A77-4263-BF61-E1FF7BCE855E}
{07B7EDAE-2A2A-43DE-B5E5-0D5904B323EE} = {57F168E9-6A77-4263-BF61-E1FF7BCE855E}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {E18684C9-7D76-45CD-BF24-E3944B7F174C}
Expand Down
30 changes: 30 additions & 0 deletions dotnet/src/extensions/kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# GeneXus Messaging EXO for Apache Kafka

GeneXus Messaging EXO for Apache Kafka provides a Producer and Consumer API to integrate GeneXus applications with Apache Kafka, enabling publish-subscribe and stream processing capabilities.

## Features ##
- Producer API: Publish messages to Kafka topics.
- Consumer API: Subscribe to Kafka topics and process messages.

## Prerequisites ##
- Apache Kafka: Ensure you have a running Kafka instance. You can use Confluent Kafka or set up Kafka locally using Docker.

## Setting up Kafka with Docker ##

You can use the docker-compose.yml file included in this repository to quickly set up a Kafka environment. The file is located at:
dotnet/src/extensions/kafka/test/docker-compose.yml

1. Navigate to the directory containing the docker-compose.yml file:
```cd dotnet/src/extensions/kafka/test```

2. Start the Kafka environment using Docker Compose:

```docker-compose up -d```

3. Verify that the Kafka and Zookeeper containers are running:
```docker ps```

4. Once the containers are running, you can use the Producer and Consumer APIs to interact with Kafka.

## Documentation ##
For detailed documentation, visit https://wiki.genexus.com/commwiki/wiki?40593,Kafka+Producer+and+Consumer+External+Objects
14 changes: 14 additions & 0 deletions dotnet/src/extensions/kafka/src/Core.Messaging.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>net8.0;net462</TargetFrameworks>
<OutputType>Library</OutputType>
<RootNamespace>GeneXus.Messaging.Core</RootNamespace>
<AssemblyName>GeneXus.Messaging.Kafka</AssemblyName>
<PackageId>GeneXus.Messaging.Kafka</PackageId>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.10.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="StrongNamer" Version="0.2.5" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;

namespace GeneXus.Messaging.Core.Exceptions
{
public class MessageNotDeliveredException : Exception
{
public int ErrCode { set; get; }

public MessageNotDeliveredException(string message) : base(message)
{
}

public MessageNotDeliveredException(int code, string v) : base(v)
{
ErrCode = code;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;

namespace GeneXus.Messaging.Core.Exceptions
{
public class MessagingConsumeException: Exception
{
public int ErrCode { set; get; }

public MessagingConsumeException(string message) : base(message)
{
}

public MessagingConsumeException(int code, string v) : base(v)
{
ErrCode = code;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace GeneXus.Messaging.Core.Exceptions
{
public class MessagingException
{
}
}
84 changes: 84 additions & 0 deletions dotnet/src/extensions/kafka/src/GXMessaging.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using System.Collections.Generic;
using GeneXus.Messaging.Core.Exceptions;
using GeneXus.Messaging.Core.Providers.Kafka;

namespace GeneXus.Messaging.Core
{
public class GXMessaging
{

private IConsumer consumer;
private IProducer producer;

public string Configuration { set; get; }

public int ErrCode { get; set; }

public string ErrDescription { get; set; }

public GXMessaging()
{
}

private bool Initialize()
{
bool ok = Configuration != null;
return ok;
}
public List<MessageResponse> Consume(string topic, int timeout)
{
List<MessageResponse> list = new List<MessageResponse>();

bool ok = false;

if (!Initialize())
{
return list;
}
if (consumer == null)
{

consumer = new KafkaConsumer(JsonHelper.Deserialize(Configuration));
}
try
{
var resultList = consumer.Consume(topic, timeout);
list.AddRange(resultList);
ok = resultList.Count > 0;
ErrDescription = "";
ErrCode = 0;
}
catch (MessagingConsumeException e)
{
ErrDescription = e.Message;
ErrCode = e.ErrCode;
}

return list;
}
public bool ProduceAsync(string topic, string key, string value)
{
if (!Initialize())
{
return false;
}

if (producer == null)
{
producer = new KafkaProducer(JsonHelper.Deserialize(Configuration));
}

return producer.ProduceAsync(topic,key, value);
}

public List<MessageResponse> Finish(int timeout)
{
if (!Initialize())
{
return new List<MessageResponse>();
}
return producer.Finish(timeout);
}

}
}
56 changes: 56 additions & 0 deletions dotnet/src/extensions/kafka/src/Helpers/JsonHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using System.Collections.Generic;
using Newtonsoft.Json;

namespace GeneXus.Messaging.Core
{
public class JsonHelper
{
public static string Serialize<T>(T tobject) where T : new()
{
try
{
return Newtonsoft.Json.JsonConvert.SerializeObject(tobject);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
return null;
}
public static Dictionary<string, string> Deserialize(string jsonValue)
{

if (!string.IsNullOrEmpty(jsonValue))
{
try
{
var configDictionary = JsonConvert.DeserializeObject<Dictionary<string, object>>(jsonValue);

var flattenedConfig = new Dictionary<string, string>();
foreach (var kvp in configDictionary)
{
if (kvp.Value is Newtonsoft.Json.Linq.JObject nestedObject)
{
foreach (var nestedKvp in nestedObject.ToObject<Dictionary<string, string>>())
{
flattenedConfig[nestedKvp.Key] = nestedKvp.Value;
}
}
else
{
flattenedConfig[kvp.Key] = kvp.Value.ToString();
}
}
return flattenedConfig;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
return null;
}

}
}
9 changes: 9 additions & 0 deletions dotnet/src/extensions/kafka/src/IConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using System.Collections.Generic;

namespace GeneXus.Messaging.Core
{
public interface IConsumer
{
List<MessageResponse> Consume(string topic, int timeout);
}
}
10 changes: 10 additions & 0 deletions dotnet/src/extensions/kafka/src/IProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System.Collections.Generic;

namespace GeneXus.Messaging.Core
{
public interface IProducer
{
bool ProduceAsync(string topic, string key, string value);
List<MessageResponse> Finish(int miliseconds);
}
}
Loading
Loading