Skip to content

Commit

Permalink
adding Receiver loom based module Implementation. (#3215)
Browse files Browse the repository at this point in the history
* Initial Impl of Loom Producer

* LoomKafkaProducerTest

* Initial work

* bump up the java version for loom Dataplane test

* Test added

* Span implementation

* Do Not stuck in infinit loop.

* Add test dependency

* reverse dataplanetest

* Tracer always

* benchmark core dependency

* jacoco pluging update

* Use BlockingQueue in producer

* Check if Tracer is there

* Refactoring the DataPlaneTest.

* after rebase

* Licence Header and Refrence Header

* Grammer Typo

* Codegen Update

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* method name changed

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* Condition to stop sending thread

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* using Thread.interrupt()

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* Suggested changes.

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* New Test for LoomKafkaProducer

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* Test Factory create

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* Added licence header

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* Make wait 10 sec before check thread

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* Some refactore

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* Test fixed

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* Complete the promise

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* fix the test mock to implemente send with callback

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* wait to join and end of thread

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* Suggested cgenges in test and close to complete

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* change sendFromQueue

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* change code seq and some comments

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

* set the numRecord final

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>

---------

Signed-off-by: debasishbsws <debasishbsws.abc@gmail.com>
  • Loading branch information
Debasish Biswas authored Aug 1, 2023
1 parent acb04ea commit 76e992e
Show file tree
Hide file tree
Showing 17 changed files with 831 additions and 28 deletions.
3 changes: 2 additions & 1 deletion data-plane/THIRD-PARTY.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Lists of 230 third-party dependencies.
Lists of 231 third-party dependencies.
(Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Classic Module (ch.qos.logback:logback-classic:1.2.11 - http://logback.qos.ch/logback-classic)
(Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Core Module (ch.qos.logback:logback-core:1.2.11 - http://logback.qos.ch/logback-core)
(Apache License 2.0) brotli4j (com.aayushatharva.brotli4j:brotli4j:1.12.0 - https://github.com/hyperxpro/Brotli4j/brotli4j)
Expand Down Expand Up @@ -36,6 +36,7 @@ Lists of 230 third-party dependencies.
(Unknown license) dispatcher (dev.knative.eventing.kafka.broker:dispatcher:1.0-SNAPSHOT - no url defined)
(Unknown license) dispatcher-vertx (dev.knative.eventing.kafka.broker:dispatcher-vertx:1.0-SNAPSHOT - no url defined)
(Unknown license) receiver (dev.knative.eventing.kafka.broker:receiver:1.0-SNAPSHOT - no url defined)
(Unknown license) receiver-loom (dev.knative.eventing.kafka.broker:receiver-loom:1.0-SNAPSHOT - no url defined)
(Unknown license) receiver-vertx (dev.knative.eventing.kafka.broker:receiver-vertx:1.0-SNAPSHOT - no url defined)
(The Apache Software License, Version 2.0) CloudEvents - API (io.cloudevents:cloudevents-api:2.5.0 - https://cloudevents.github.io/sdk-java/cloudevents-api/)
(The Apache Software License, Version 2.0) CloudEvents - Core (io.cloudevents:cloudevents-core:2.5.0 - https://cloudevents.github.io/sdk-java/cloudevents-core/)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This piece of code is inspired from vert-x3/vertx-kafka-client project.
* The original source code can be found here: https://github.com/vert-x3/vertx-kafka-client
*/
package dev.knative.eventing.kafka.broker.core.tracing.kafka;

import io.vertx.core.Context;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.tracing.TracingPolicy;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Utils;

/**
* Tracer for Kafka producer, wrapping the generic tracer.
*/
public class ProducerTracer<S> {
private final VertxTracer<Void, S> tracer;
private final String address;
private final String hostname;
private final String port;
private final TracingPolicy policy;

/**
* Creates a ProducerTracer, which provides an opinionated facade for using {@link io.vertx.core.spi.tracing.VertxTracer}
* with a Kafka Producer use case. The method will return {@code null} if Tracing is not setup in Vert.x.
* {@code TracingPolicy} is always set to {@code TracingPolicy.ALWAYS}.
* @param tracer the generic tracer object
* @param <S> the type of spans that is going to be generated, depending on the tracing system (zipkin, opentracing ...)
* @return a new instance of {@code ProducerTracer}, or {@code null}
*/
public static <S> ProducerTracer create(VertxTracer tracer) {
if (tracer == null) {
return null;
}
TracingPolicy policy = TracingPolicy.ALWAYS;
return new ProducerTracer<S>(tracer, policy, "");
}

private ProducerTracer(VertxTracer<Void, S> tracer, TracingPolicy policy, String bootstrapServer) {
this.tracer = tracer;
this.address = bootstrapServer;
this.hostname = Utils.getHost(bootstrapServer);
Integer port = Utils.getPort(bootstrapServer);
this.port = port == null ? null : port.toString();
this.policy = policy;
}

public StartedSpan prepareSendMessage(Context context, ProducerRecord record) {
TraceContext tc = new TraceContext("producer", address, hostname, port, record.topic());
S span = tracer.sendRequest(
context,
SpanKind.MESSAGING,
policy,
tc,
"kafka_send",
(k, v) -> record.headers().add(k, v.getBytes()),
TraceTags.TAG_EXTRACTOR);
return new StartedSpan(span);
}

public class StartedSpan {
private final S span;

private StartedSpan(S span) {
this.span = span;
}

public void finish(Context context) {
// We don't add any new tag to the span here, just stop span timer
tracer.receiveResponse(context, null, span, null, TagExtractor.<TraceContext>empty());
}

public void fail(Context context, Throwable failure) {
tracer.receiveResponse(context, null, span, failure, TagExtractor.empty());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This piece of code is inspired from vert-x3/vertx-kafka-client project.
* The original source code can be found here: https://github.com/vert-x3/vertx-kafka-client
*/
package dev.knative.eventing.kafka.broker.core.tracing.kafka;

/**
* TraceContext holds some context for tracing during a message writing / reading process
*/
class TraceContext {
final String kind;
final String address;
final String hostname;
final String port;
final String topic;

TraceContext(String kind, String address, String hostname, String port, String topic) {
this.kind = kind;
this.address = address;
this.hostname = hostname;
this.port = port;
this.topic = topic;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This piece of code is inspired from vert-x3/vertx-kafka-client project.
* The original source code can be found here: https://github.com/vert-x3/vertx-kafka-client
*/
package dev.knative.eventing.kafka.broker.core.tracing.kafka;

import io.vertx.core.spi.tracing.TagExtractor;
import java.util.function.Function;

/**
* Tags for Kafka Tracing
*/
public enum TraceTags {
// See https://github.com/opentracing/specification/blob/master/semantic_conventions.md
PEER_ADDRESS("peer.address", q -> q.address),
PEER_HOSTNAME("peer.hostname", q -> q.hostname),
PEER_PORT("peer.port", q -> q.port),
PEER_SERVICE("peer.service", q -> "kafka"),
BUS_DESTINATION("message_bus.destination", q -> q.topic);

static final TagExtractor<TraceContext> TAG_EXTRACTOR = new TagExtractor<TraceContext>() {
private final TraceTags[] TAGS = TraceTags.values();

@Override
public int len(TraceContext obj) {
return TAGS.length;
}

@Override
public String name(TraceContext obj, int index) {
return TAGS[index].name;
}

@Override
public String value(TraceContext obj, int index) {
return TAGS[index].fn.apply(obj);
}
};

final String name;
final Function<TraceContext, String> fn;

TraceTags(String name, Function<TraceContext, String> fn) {
this.name = name;
this.fn = fn;
}
}
2 changes: 1 addition & 1 deletion data-plane/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<maven.surefire.plugin.version>2.22.2</maven.surefire.plugin.version>
<maven.shade.plugin.version>3.3.0</maven.shade.plugin.version>
<maven.editorconfig.plugin.version>0.1.1</maven.editorconfig.plugin.version>
<maven.jacoco.plugin.version>0.8.8</maven.jacoco.plugin.version>
<maven.jacoco.plugin.version>0.8.10</maven.jacoco.plugin.version>
<maven.enforcer.plugin.version>3.1.0</maven.enforcer.plugin.version>
<maven.license.plugin.version>2.0.0</maven.license.plugin.version>
<mycila.license.plugin.version>4.1</mycila.license.plugin.version>
Expand Down
44 changes: 44 additions & 0 deletions data-plane/receiver-loom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

<properties>
<java.version>20</java.version>
<argLine>--enable-preview</argLine>
</properties>


Expand All @@ -39,6 +40,49 @@
<artifactId>receiver</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.knative.eventing.kafka.broker</groupId>
<artifactId>receiver</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>dev.knative.eventing.kafka.broker</groupId>
<artifactId>core</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Loading

0 comments on commit 76e992e

Please sign in to comment.