Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Receiver loom based module Implementation #3215

Merged
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
47aaa7a
Initial Impl of Loom Producer
Jul 14, 2023
09db442
LoomKafkaProducerTest
Jul 15, 2023
4995d87
Initial work
Jul 17, 2023
9835ce8
bump up the java version for loom Dataplane test
Jul 17, 2023
b35d7fa
Test added
Jul 17, 2023
5dbc5fb
Span implementation
Jul 18, 2023
10ef6c6
Do Not stuck in infinit loop.
Jul 18, 2023
40c039c
Add test dependency
Jul 19, 2023
d804228
reverse dataplanetest
Jul 20, 2023
7d3181d
Tracer always
Jul 25, 2023
efdce4a
benchmark core dependency
Jul 25, 2023
3760489
jacoco pluging update
Jul 25, 2023
9ce362c
Use BlockingQueue in producer
Jul 26, 2023
6e4ff62
Check if Tracer is there
Jul 26, 2023
893e4ee
Refactoring the DataPlaneTest.
Jul 26, 2023
7e8c26f
after rebase
Jul 26, 2023
64c1dcd
Licence Header and Refrence Header
Jul 26, 2023
c767c50
Grammer Typo
Jul 26, 2023
a269321
Codegen Update
Jul 27, 2023
15e73e6
method name changed
Jul 31, 2023
1d2d166
Condition to stop sending thread
Jul 31, 2023
c59d60c
using Thread.interrupt()
Jul 31, 2023
78172b8
Suggested changes.
Jul 31, 2023
f874986
New Test for LoomKafkaProducer
Jul 31, 2023
d9eb5dc
Test Factory create
Jul 31, 2023
bb5219a
Added licence header
Jul 31, 2023
6c24bee
Make wait 10 sec before check thread
Jul 31, 2023
6c77c83
Some refactore
Jul 31, 2023
ac6792d
Test fixed
Jul 31, 2023
27e95fd
Complete the promise
Jul 31, 2023
221b512
fix the test mock to implemente send with callback
Jul 31, 2023
8192ddb
wait to join and end of thread
Jul 31, 2023
2be5c02
Suggested cgenges in test and close to complete
Aug 1, 2023
dbb47e3
change sendFromQueue
Aug 1, 2023
4aa9d86
change code seq and some comments
Aug 1, 2023
eab64fa
set the numRecord final
Aug 1, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion data-plane/THIRD-PARTY.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

Lists of 230 third-party dependencies.
Lists of 231 third-party dependencies.
(Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Classic Module (ch.qos.logback:logback-classic:1.2.11 - http://logback.qos.ch/logback-classic)
(Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Core Module (ch.qos.logback:logback-core:1.2.11 - http://logback.qos.ch/logback-core)
(Apache License 2.0) brotli4j (com.aayushatharva.brotli4j:brotli4j:1.12.0 - https://github.com/hyperxpro/Brotli4j/brotli4j)
Expand Down Expand Up @@ -36,6 +36,7 @@ Lists of 230 third-party dependencies.
(Unknown license) dispatcher (dev.knative.eventing.kafka.broker:dispatcher:1.0-SNAPSHOT - no url defined)
(Unknown license) dispatcher-vertx (dev.knative.eventing.kafka.broker:dispatcher-vertx:1.0-SNAPSHOT - no url defined)
(Unknown license) receiver (dev.knative.eventing.kafka.broker:receiver:1.0-SNAPSHOT - no url defined)
(Unknown license) receiver-loom (dev.knative.eventing.kafka.broker:receiver-loom:1.0-SNAPSHOT - no url defined)
(Unknown license) receiver-vertx (dev.knative.eventing.kafka.broker:receiver-vertx:1.0-SNAPSHOT - no url defined)
(The Apache Software License, Version 2.0) CloudEvents - API (io.cloudevents:cloudevents-api:2.5.0 - https://cloudevents.github.io/sdk-java/cloudevents-api/)
(The Apache Software License, Version 2.0) CloudEvents - Core (io.cloudevents:cloudevents-core:2.5.0 - https://cloudevents.github.io/sdk-java/cloudevents-core/)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This piece of code is inspired from vert-x3/vertx-kafka-client project.
* The original source code can be found here: https://github.com/vert-x3/vertx-kafka-client
*/
package dev.knative.eventing.kafka.broker.core.tracing.kafka;

import io.vertx.core.Context;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.tracing.TracingPolicy;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Utils;

/**
* Tracer for Kafka producer, wrapping the generic tracer.
*/
public class ProducerTracer<S> {
private final VertxTracer<Void, S> tracer;
private final String address;
private final String hostname;
private final String port;
private final TracingPolicy policy;

/**
* Creates a ProducerTracer, which provides an opinionated facade for using {@link io.vertx.core.spi.tracing.VertxTracer}
* with a Kafka Producer use case. The method will return {@code null} if Tracing is not setup in Vert.x.
* {@code TracingPolicy} is always set to {@code TracingPolicy.ALWAYS}.
* @param tracer the generic tracer object
* @param <S> the type of spans that is going to be generated, depending on the tracing system (zipkin, opentracing ...)
* @return a new instance of {@code ProducerTracer}, or {@code null}
*/
public static <S> ProducerTracer create(VertxTracer tracer) {
if (tracer == null) {
return null;

Check warning on line 51 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java#L51

Added line #L51 was not covered by tests
}
TracingPolicy policy = TracingPolicy.ALWAYS;
return new ProducerTracer<S>(tracer, policy, "");

Check warning on line 54 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java#L53-L54

Added lines #L53 - L54 were not covered by tests
}

private ProducerTracer(VertxTracer<Void, S> tracer, TracingPolicy policy, String bootstrapServer) {
this.tracer = tracer;
this.address = bootstrapServer;
this.hostname = Utils.getHost(bootstrapServer);
Integer port = Utils.getPort(bootstrapServer);

Check warning on line 61 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java#L57-L61

Added lines #L57 - L61 were not covered by tests
this.port = port == null ? null : port.toString();
this.policy = policy;
}

Check warning on line 64 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java#L63-L64

Added lines #L63 - L64 were not covered by tests

public StartedSpan prepareSendMessage(Context context, ProducerRecord record) {
TraceContext tc = new TraceContext("producer", address, hostname, port, record.topic());
S span = tracer.sendRequest(

Check warning on line 68 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java#L67-L68

Added lines #L67 - L68 were not covered by tests
context,
SpanKind.MESSAGING,
policy,
tc,
"kafka_send",
(k, v) -> record.headers().add(k, v.getBytes()),

Check warning on line 74 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java#L74

Added line #L74 was not covered by tests
TraceTags.TAG_EXTRACTOR);
return new StartedSpan(span);

Check warning on line 76 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java#L76

Added line #L76 was not covered by tests
}

public class StartedSpan {
private final S span;

private StartedSpan(S span) {
this.span = span;
}

Check warning on line 84 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java#L82-L84

Added lines #L82 - L84 were not covered by tests

public void finish(Context context) {
// We don't add any new tag to the span here, just stop span timer
tracer.receiveResponse(context, null, span, null, TagExtractor.<TraceContext>empty());
}

Check warning on line 89 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java#L88-L89

Added lines #L88 - L89 were not covered by tests

public void fail(Context context, Throwable failure) {
tracer.receiveResponse(context, null, span, failure, TagExtractor.empty());
}

Check warning on line 93 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/ProducerTracer.java#L92-L93

Added lines #L92 - L93 were not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This piece of code is inspired from vert-x3/vertx-kafka-client project.
* The original source code can be found here: https://github.com/vert-x3/vertx-kafka-client
*/
package dev.knative.eventing.kafka.broker.core.tracing.kafka;

/**
* TraceContext holds some context for tracing during a message writing / reading process
*/
class TraceContext {
final String kind;
final String address;
final String hostname;
final String port;
final String topic;

TraceContext(String kind, String address, String hostname, String port, String topic) {
this.kind = kind;
this.address = address;
this.hostname = hostname;
this.port = port;
this.topic = topic;
}

Check warning on line 39 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceContext.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceContext.java#L33-L39

Added lines #L33 - L39 were not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* This piece of code is inspired from vert-x3/vertx-kafka-client project.
* The original source code can be found here: https://github.com/vert-x3/vertx-kafka-client
*/
package dev.knative.eventing.kafka.broker.core.tracing.kafka;

import io.vertx.core.spi.tracing.TagExtractor;
import java.util.function.Function;

/**
* Tags for Kafka Tracing
*/
public enum TraceTags {

Check warning on line 29 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceTags.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceTags.java#L29

Added line #L29 was not covered by tests
// See https://github.com/opentracing/specification/blob/master/semantic_conventions.md
PEER_ADDRESS("peer.address", q -> q.address),
PEER_HOSTNAME("peer.hostname", q -> q.hostname),
PEER_PORT("peer.port", q -> q.port),
PEER_SERVICE("peer.service", q -> "kafka"),
BUS_DESTINATION("message_bus.destination", q -> q.topic);

Check warning on line 35 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceTags.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceTags.java#L31-L35

Added lines #L31 - L35 were not covered by tests

static final TagExtractor<TraceContext> TAG_EXTRACTOR = new TagExtractor<TraceContext>() {
private final TraceTags[] TAGS = TraceTags.values();

Check warning on line 38 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceTags.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceTags.java#L37-L38

Added lines #L37 - L38 were not covered by tests

@Override
public int len(TraceContext obj) {
return TAGS.length;

Check warning on line 42 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceTags.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceTags.java#L42

Added line #L42 was not covered by tests
}

@Override
public String name(TraceContext obj, int index) {
return TAGS[index].name;

Check warning on line 47 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceTags.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceTags.java#L47

Added line #L47 was not covered by tests
}

@Override
public String value(TraceContext obj, int index) {
return TAGS[index].fn.apply(obj);

Check warning on line 52 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceTags.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceTags.java#L52

Added line #L52 was not covered by tests
}
};

final String name;
final Function<TraceContext, String> fn;
debasishbsws marked this conversation as resolved.
Show resolved Hide resolved

TraceTags(String name, Function<TraceContext, String> fn) {
this.name = name;
this.fn = fn;
}

Check warning on line 62 in data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceTags.java

View check run for this annotation

Codecov / codecov/patch

data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/tracing/kafka/TraceTags.java#L59-L62

Added lines #L59 - L62 were not covered by tests
}
2 changes: 1 addition & 1 deletion data-plane/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<maven.surefire.plugin.version>2.22.2</maven.surefire.plugin.version>
<maven.shade.plugin.version>3.3.0</maven.shade.plugin.version>
<maven.editorconfig.plugin.version>0.1.1</maven.editorconfig.plugin.version>
<maven.jacoco.plugin.version>0.8.8</maven.jacoco.plugin.version>
<maven.jacoco.plugin.version>0.8.10</maven.jacoco.plugin.version>
<maven.enforcer.plugin.version>3.1.0</maven.enforcer.plugin.version>
<maven.license.plugin.version>2.0.0</maven.license.plugin.version>
<mycila.license.plugin.version>4.1</mycila.license.plugin.version>
Expand Down
44 changes: 44 additions & 0 deletions data-plane/receiver-loom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

<properties>
<java.version>20</java.version>
<argLine>--enable-preview</argLine>
</properties>


Expand All @@ -39,6 +40,49 @@
<artifactId>receiver</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>dev.knative.eventing.kafka.broker</groupId>
<artifactId>receiver</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>dev.knative.eventing.kafka.broker</groupId>
<artifactId>core</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Loading
Loading