Skip to content

Commit 26e6579

Browse files
Relocate Kafka external object implementation to this repository (#1129)
* Relocate Kafka external object implementation to this repository Issue:201487 * Remove legacy code which was replaced with unit tests. * Remove unneeded Install Docker step. * Start docker service before docker compose * Migrate to ubuntu-latest * Fix docker compose command * Run only .net core tests on ubuntu-latest * Change directory separator * Unify unit test package versions * Add [SkippableFact] * Update README.md
1 parent bf3449b commit 26e6579

20 files changed

+864
-0
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
name: Kafka Integration Tests
2+
3+
on:
4+
pull_request:
5+
branches:
6+
- 'master'
7+
- 'release-*'
8+
push:
9+
branches:
10+
- 'master'
11+
- 'beta'
12+
- 'release-*'
13+
14+
jobs:
15+
test-kafka:
16+
env:
17+
GIT_REF: ${{ github.ref }}
18+
GIT_SHA: ${{ github.sha }}
19+
Configuration: Release
20+
SolutionFile: dotnet/KafkaTestSolution.sln
21+
SolutionFileName: KafkaTestSolution
22+
23+
runs-on: ubuntu-latest
24+
environment: kafka-integration-tests
25+
26+
steps:
27+
- name: Checkout
28+
uses: actions/checkout@v3
29+
with:
30+
fetch-depth: 0
31+
32+
- name: Install Docker
33+
uses: docker/setup-buildx-action@v2
34+
35+
- name: Start kafka
36+
run: |
37+
docker compose -f dotnet/src/extensions/kafka/test/docker-compose.yml up -d
38+
env:
39+
COMPOSE_DOCKER_CLI_BUILD: 1
40+
DOCKER_BUILDKIT: 1
41+
42+
- name: Install .NET 8
43+
uses: actions/setup-dotnet@v3
44+
with:
45+
dotnet-version: '8.x'
46+
include-prerelease: true
47+
48+
- name: Install libgdiplus
49+
run: sudo apt-get install -y libgdiplus
50+
51+
- name: Create temporary solution with Kafka projects
52+
run: |
53+
dotnet new sln --name $SolutionFileName --output dotnet --force
54+
dotnet sln $SolutionFile add dotnet/src/extensions/kafka/src/Core.Messaging.csproj
55+
dotnet sln $SolutionFile add dotnet/src/extensions/kafka/test/DotNetCoreKafkaTest/DotNetCoreKafkaTest.csproj
56+
57+
- name: Test Kafka
58+
env:
59+
KAFKA_TEST_ENABLED: "true"
60+
run: dotnet test $SolutionFile --configuration $Configuration

dotnet/DotNetStandardClasses.sln

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,21 @@ EndProject
268268
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{F8959289-4ED7-430C-97B7-FAAA29829708}"
269269
EndProject
270270
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GxSoapHandler", "src\extensions\ws\src\GxSoapHandler\GxSoapHandler.csproj", "{58C84EC7-A0B3-4C1B-BD78-989AEE87EA32}"
271+
EndProject
271272
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Functions", "Functions", "{E59B3248-4C26-4DB0-96CB-67437319E22B}"
272273
EndProject
274+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "kafka", "kafka", "{7CABA1C5-F531-4DC7-AEFC-A33900D15E9D}"
275+
EndProject
276+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{26132DE8-B551-4A79-9363-696277DDB803}"
277+
EndProject
278+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{57F168E9-6A77-4263-BF61-E1FF7BCE855E}"
279+
EndProject
280+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Core.Messaging", "src\extensions\kafka\src\Core.Messaging.csproj", "{8FE70544-0128-4C22-8C84-28D6842E110A}"
281+
EndProject
282+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCoreKafkaTest", "src\extensions\kafka\test\DotNetCoreKafkaTest\DotNetCoreKafkaTest.csproj", "{C468AC77-A5F1-4455-91FE-D40AC2539FDD}"
283+
EndProject
284+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetKafkaTest", "src\extensions\kafka\test\DotNetKafkaTest\DotNetKafkaTest.csproj", "{07B7EDAE-2A2A-43DE-B5E5-0D5904B323EE}"
285+
EndProject
273286
Global
274287
GlobalSection(SolutionConfigurationPlatforms) = preSolution
275288
Debug|Any CPU = Debug|Any CPU
@@ -652,6 +665,18 @@ Global
652665
{58C84EC7-A0B3-4C1B-BD78-989AEE87EA32}.Debug|Any CPU.Build.0 = Debug|Any CPU
653666
{58C84EC7-A0B3-4C1B-BD78-989AEE87EA32}.Release|Any CPU.ActiveCfg = Release|Any CPU
654667
{58C84EC7-A0B3-4C1B-BD78-989AEE87EA32}.Release|Any CPU.Build.0 = Release|Any CPU
668+
{8FE70544-0128-4C22-8C84-28D6842E110A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
669+
{8FE70544-0128-4C22-8C84-28D6842E110A}.Debug|Any CPU.Build.0 = Debug|Any CPU
670+
{8FE70544-0128-4C22-8C84-28D6842E110A}.Release|Any CPU.ActiveCfg = Release|Any CPU
671+
{8FE70544-0128-4C22-8C84-28D6842E110A}.Release|Any CPU.Build.0 = Release|Any CPU
672+
{C468AC77-A5F1-4455-91FE-D40AC2539FDD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
673+
{C468AC77-A5F1-4455-91FE-D40AC2539FDD}.Debug|Any CPU.Build.0 = Debug|Any CPU
674+
{C468AC77-A5F1-4455-91FE-D40AC2539FDD}.Release|Any CPU.ActiveCfg = Release|Any CPU
675+
{C468AC77-A5F1-4455-91FE-D40AC2539FDD}.Release|Any CPU.Build.0 = Release|Any CPU
676+
{07B7EDAE-2A2A-43DE-B5E5-0D5904B323EE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
677+
{07B7EDAE-2A2A-43DE-B5E5-0D5904B323EE}.Debug|Any CPU.Build.0 = Debug|Any CPU
678+
{07B7EDAE-2A2A-43DE-B5E5-0D5904B323EE}.Release|Any CPU.ActiveCfg = Release|Any CPU
679+
{07B7EDAE-2A2A-43DE-B5E5-0D5904B323EE}.Release|Any CPU.Build.0 = Release|Any CPU
655680
EndGlobalSection
656681
GlobalSection(SolutionProperties) = preSolution
657682
HideSolutionNode = FALSE
@@ -782,6 +807,12 @@ Global
782807
{F8959289-4ED7-430C-97B7-FAAA29829708} = {B5C28D81-BCD9-4B29-9B68-EDD81D1018D5}
783808
{58C84EC7-A0B3-4C1B-BD78-989AEE87EA32} = {F8959289-4ED7-430C-97B7-FAAA29829708}
784809
{E59B3248-4C26-4DB0-96CB-67437319E22B} = {41E1D031-799F-484F-85DE-7A30AF1A6FBA}
810+
{7CABA1C5-F531-4DC7-AEFC-A33900D15E9D} = {C6AFB6A3-FF0B-4970-B1F1-10BCD3D932B2}
811+
{26132DE8-B551-4A79-9363-696277DDB803} = {7CABA1C5-F531-4DC7-AEFC-A33900D15E9D}
812+
{57F168E9-6A77-4263-BF61-E1FF7BCE855E} = {7CABA1C5-F531-4DC7-AEFC-A33900D15E9D}
813+
{8FE70544-0128-4C22-8C84-28D6842E110A} = {26132DE8-B551-4A79-9363-696277DDB803}
814+
{C468AC77-A5F1-4455-91FE-D40AC2539FDD} = {57F168E9-6A77-4263-BF61-E1FF7BCE855E}
815+
{07B7EDAE-2A2A-43DE-B5E5-0D5904B323EE} = {57F168E9-6A77-4263-BF61-E1FF7BCE855E}
785816
EndGlobalSection
786817
GlobalSection(ExtensibilityGlobals) = postSolution
787818
SolutionGuid = {E18684C9-7D76-45CD-BF24-E3944B7F174C}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# GeneXus Messaging EXO for Apache Kafka
2+
3+
GeneXus Messaging EXO for Apache Kafka provides a Producer and Consumer API to integrate GeneXus applications with Apache Kafka, enabling publish-subscribe and stream processing capabilities.
4+
5+
## Features ##
6+
- Producer API: Publish messages to Kafka topics.
7+
- Consumer API: Subscribe to Kafka topics and process messages.
8+
9+
## Prerequisites ##
10+
- Apache Kafka: Ensure you have a running Kafka instance. You can use Confluent Kafka or set up Kafka locally using Docker.
11+
12+
## Setting up Kafka with Docker ##
13+
14+
You can use the docker-compose.yml file included in this repository to quickly set up a Kafka environment. The file is located at:
15+
dotnet/src/extensions/kafka/test/docker-compose.yml
16+
17+
1. Navigate to the directory containing the docker-compose.yml file:
18+
```cd dotnet/src/extensions/kafka/test```
19+
20+
2. Start the Kafka environment using Docker Compose:
21+
22+
```docker-compose up -d```
23+
24+
3. Verify that the Kafka and Zookeeper containers are running:
25+
```docker ps```
26+
27+
4. Once the containers are running, you can use the Producer and Consumer APIs to interact with Kafka.
28+
29+
## Documentation ##
30+
For detailed documentation, visit https://wiki.genexus.com/commwiki/wiki?40593,Kafka+Producer+and+Consumer+External+Objects
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFrameworks>net8.0;net462</TargetFrameworks>
4+
<OutputType>Library</OutputType>
5+
<RootNamespace>GeneXus.Messaging.Core</RootNamespace>
6+
<AssemblyName>GeneXus.Messaging.Kafka</AssemblyName>
7+
<PackageId>GeneXus.Messaging.Kafka</PackageId>
8+
</PropertyGroup>
9+
<ItemGroup>
10+
<PackageReference Include="Confluent.Kafka" Version="2.10.0" />
11+
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
12+
<PackageReference Include="StrongNamer" Version="0.2.5" />
13+
</ItemGroup>
14+
</Project>
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System;
2+
3+
namespace GeneXus.Messaging.Core.Exceptions
4+
{
5+
public class MessageNotDeliveredException : Exception
6+
{
7+
public int ErrCode { set; get; }
8+
9+
public MessageNotDeliveredException(string message) : base(message)
10+
{
11+
}
12+
13+
public MessageNotDeliveredException(int code, string v) : base(v)
14+
{
15+
ErrCode = code;
16+
}
17+
}
18+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System;
2+
3+
namespace GeneXus.Messaging.Core.Exceptions
4+
{
5+
public class MessagingConsumeException: Exception
6+
{
7+
public int ErrCode { set; get; }
8+
9+
public MessagingConsumeException(string message) : base(message)
10+
{
11+
}
12+
13+
public MessagingConsumeException(int code, string v) : base(v)
14+
{
15+
ErrCode = code;
16+
}
17+
}
18+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
namespace GeneXus.Messaging.Core.Exceptions
2+
{
3+
public class MessagingException
4+
{
5+
}
6+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
using System.Collections.Generic;
2+
using GeneXus.Messaging.Core.Exceptions;
3+
using GeneXus.Messaging.Core.Providers.Kafka;
4+
5+
namespace GeneXus.Messaging.Core
6+
{
7+
public class GXMessaging
8+
{
9+
10+
private IConsumer consumer;
11+
private IProducer producer;
12+
13+
public string Configuration { set; get; }
14+
15+
public int ErrCode { get; set; }
16+
17+
public string ErrDescription { get; set; }
18+
19+
public GXMessaging()
20+
{
21+
}
22+
23+
private bool Initialize()
24+
{
25+
bool ok = Configuration != null;
26+
return ok;
27+
}
28+
public List<MessageResponse> Consume(string topic, int timeout)
29+
{
30+
List<MessageResponse> list = new List<MessageResponse>();
31+
32+
bool ok = false;
33+
34+
if (!Initialize())
35+
{
36+
return list;
37+
}
38+
if (consumer == null)
39+
{
40+
41+
consumer = new KafkaConsumer(JsonHelper.Deserialize(Configuration));
42+
}
43+
try
44+
{
45+
var resultList = consumer.Consume(topic, timeout);
46+
list.AddRange(resultList);
47+
ok = resultList.Count > 0;
48+
ErrDescription = "";
49+
ErrCode = 0;
50+
}
51+
catch (MessagingConsumeException e)
52+
{
53+
ErrDescription = e.Message;
54+
ErrCode = e.ErrCode;
55+
}
56+
57+
return list;
58+
}
59+
public bool ProduceAsync(string topic, string key, string value)
60+
{
61+
if (!Initialize())
62+
{
63+
return false;
64+
}
65+
66+
if (producer == null)
67+
{
68+
producer = new KafkaProducer(JsonHelper.Deserialize(Configuration));
69+
}
70+
71+
return producer.ProduceAsync(topic,key, value);
72+
}
73+
74+
public List<MessageResponse> Finish(int timeout)
75+
{
76+
if (!Initialize())
77+
{
78+
return new List<MessageResponse>();
79+
}
80+
return producer.Finish(timeout);
81+
}
82+
83+
}
84+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using Newtonsoft.Json;
4+
5+
namespace GeneXus.Messaging.Core
6+
{
7+
public class JsonHelper
8+
{
9+
public static string Serialize<T>(T tobject) where T : new()
10+
{
11+
try
12+
{
13+
return Newtonsoft.Json.JsonConvert.SerializeObject(tobject);
14+
}
15+
catch (Exception ex)
16+
{
17+
Console.WriteLine(ex.Message);
18+
}
19+
return null;
20+
}
21+
public static Dictionary<string, string> Deserialize(string jsonValue)
22+
{
23+
24+
if (!string.IsNullOrEmpty(jsonValue))
25+
{
26+
try
27+
{
28+
var configDictionary = JsonConvert.DeserializeObject<Dictionary<string, object>>(jsonValue);
29+
30+
var flattenedConfig = new Dictionary<string, string>();
31+
foreach (var kvp in configDictionary)
32+
{
33+
if (kvp.Value is Newtonsoft.Json.Linq.JObject nestedObject)
34+
{
35+
foreach (var nestedKvp in nestedObject.ToObject<Dictionary<string, string>>())
36+
{
37+
flattenedConfig[nestedKvp.Key] = nestedKvp.Value;
38+
}
39+
}
40+
else
41+
{
42+
flattenedConfig[kvp.Key] = kvp.Value.ToString();
43+
}
44+
}
45+
return flattenedConfig;
46+
}
47+
catch (Exception ex)
48+
{
49+
Console.WriteLine(ex.Message);
50+
}
51+
}
52+
return null;
53+
}
54+
55+
}
56+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using System.Collections.Generic;
2+
3+
namespace GeneXus.Messaging.Core
4+
{
5+
public interface IConsumer
6+
{
7+
List<MessageResponse> Consume(string topic, int timeout);
8+
}
9+
}

0 commit comments

Comments
 (0)