Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #5069] Enhancement for http source/sink connector #5070

Merged
merged 3 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,7 @@ public class SourceConnectorConfig {

// extra config, e.g. GitHub secret
private Map<String, String> extraConfig = new HashMap<>();

// data consistency enabled, default true
private boolean dataConsistencyEnabled = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.eventmesh.connector.http.sink.config.SinkConnectorConfig;
import org.apache.eventmesh.connector.http.sink.data.HttpConnectRecord;
import org.apache.eventmesh.connector.http.util.HttpUtils;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import java.net.URI;
Expand Down Expand Up @@ -111,14 +113,70 @@ public void handle(ConnectRecord record) {
// convert ConnectRecord to HttpConnectRecord
String type = String.format("%s.%s.%s", connectorConfig.getConnectorName(), url.getScheme(), "common");
HttpConnectRecord httpConnectRecord = HttpConnectRecord.convertConnectRecord(record, type);
deliver(url, httpConnectRecord);
// get timestamp and offset
Long timestamp = httpConnectRecord.getData().getTimestamp();
Map<String, ?> offset = null;
try {
// May throw NullPointerException.
offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
} catch (NullPointerException e) {
// ignore null pointer exception
}
final Map<String, ?> finalOffset = offset;
Future<HttpResponse<Buffer>> responseFuture = deliver(url, httpConnectRecord);
responseFuture.onSuccess(res -> {
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset);
// log the response
if (HttpUtils.is2xxSuccessful(res.statusCode())) {
if (log.isDebugEnabled()) {
log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
} else {
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
record.getCallback().onSuccess(convertToSendResult(record));
} else {
if (log.isDebugEnabled()) {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
} else {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
record.getCallback()
.onException(buildSendExceptionContext(record, new RuntimeException("HTTP response code: " + res.statusCode())));
}
}).onFailure(err -> {
log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err);
record.getCallback().onException(buildSendExceptionContext(record, err));
});
}
}

private SendResult convertToSendResult(ConnectRecord record) {
SendResult result = new SendResult();
result.setMessageId(record.getRecordId());
if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
result.setTopic(record.getExtension("topic"));
}
return result;
}

private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) {
SendExceptionContext sendExceptionContext = new SendExceptionContext();
sendExceptionContext.setMessageId(record.getRecordId());
sendExceptionContext.setCause(e);
if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
sendExceptionContext.setTopic(record.getExtension("topic"));
}
return sendExceptionContext;
}


/**
* Processes HttpConnectRecord on specified URL while returning its own processing logic.
* This method sends the HttpConnectRecord to the specified URL using the WebClient.
* Processes HttpConnectRecord on specified URL while returning its own processing logic. This method sends the HttpConnectRecord to the specified
* URL using the WebClient.
*
* @param url URI to which the HttpConnectRecord should be sent
* @param httpConnectRecord HttpConnectRecord to process
Expand All @@ -130,48 +188,13 @@ public Future<HttpResponse<Buffer>> deliver(URI url, HttpConnectRecord httpConne
MultiMap headers = HttpHeaders.headers()
.set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=utf-8")
.set(HttpHeaderNames.ACCEPT, "application/json; charset=utf-8");

// get timestamp and offset
Long timestamp = httpConnectRecord.getData().getTimestamp();
Map<String, ?> offset = null;
try {
// May throw NullPointerException.
offset = ((HttpRecordOffset) httpConnectRecord.getData().getPosition().getRecordOffset()).getOffsetMap();
} catch (NullPointerException e) {
// ignore null pointer exception
}
final Map<String, ?> finalOffset = offset;

// send the request
return this.webClient.post(url.getPath())
.host(url.getHost())
.port(url.getPort() == -1 ? (Objects.equals(url.getScheme(), "https") ? 443 : 80) : url.getPort())
.putHeaders(headers)
.ssl(Objects.equals(url.getScheme(), "https"))
.sendJson(httpConnectRecord)
.onSuccess(res -> {
log.info("Request sent successfully. Record: timestamp={}, offset={}", timestamp, finalOffset);
// log the response
if (HttpUtils.is2xxSuccessful(res.statusCode())) {
if (log.isDebugEnabled()) {
log.debug("Received successful response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
} else {
log.info("Received successful response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
} else {
if (log.isDebugEnabled()) {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}, responseBody={}",
res.statusCode(), timestamp, finalOffset, res.bodyAsString());
} else {
log.warn("Received non-2xx response: statusCode={}. Record: timestamp={}, offset={}", res.statusCode(), timestamp,
finalOffset);
}
}

})
.onFailure(err -> log.error("Request failed to send. Record: timestamp={}, offset={}", timestamp, finalOffset, err));
.sendJson(httpConnectRecord);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.LoggerHandler;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -52,22 +54,18 @@ public class HttpSourceConnector implements Source, ConnectorCreateService<Sourc

private int batchSize;

private Route route;

private Protocol protocol;

private HttpServer server;

@Getter
private volatile boolean started = false;

@Getter
private volatile boolean destroyed = false;

public boolean isStarted() {
return started;
}

public boolean isDestroyed() {
return destroyed;
}


@Override
public Class<? extends Config> configClass() {
Expand Down Expand Up @@ -106,7 +104,7 @@ private void doInit() {

final Vertx vertx = Vertx.vertx();
final Router router = Router.router(vertx);
final Route route = router.route()
route = router.route()
.path(this.sourceConfig.connectorConfig.getPath())
.handler(LoggerHandler.create());

Expand Down Expand Up @@ -136,7 +134,15 @@ public void start() {

@Override
public void commit(ConnectRecord record) {

if (this.route != null && sourceConfig.getConnectorConfig().isDataConsistencyEnabled()) {
this.route.handler(ctx -> {
// Return 200 OK
ctx.response()
.putHeader("content-type", "application/json")
.setStatusCode(HttpResponseStatus.OK.code())
.end("{\"status\":\"success\",\"recordId\":\"" + record.getRecordId() + "\"}");
});
}
}

@Override
Expand All @@ -146,7 +152,15 @@ public String name() {

@Override
public void onException(ConnectRecord record) {

if (this.route != null) {
this.route.failureHandler(ctx -> {
log.error("Failed to handle the request, recordId {}. ", record.getRecordId(), ctx.failure());
// Return Bad Response
ctx.response()
.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())
.end("{\"status\":\"failed\",\"recordId\":\"" + record.getRecordId() + "\"}");
});
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,16 @@ public class CommonProtocol implements Protocol {

public static final String PROTOCOL_NAME = "Common";

private SourceConnectorConfig sourceConnectorConfig;

/**
* Initialize the protocol
*
* @param sourceConnectorConfig source connector config
*/
@Override
public void initialize(SourceConnectorConfig sourceConnectorConfig) {

this.sourceConnectorConfig = sourceConnectorConfig;
}

/**
Expand All @@ -77,10 +79,13 @@ public void setHandler(Route route, SynchronizedCircularFifoQueue<Object> queue)
throw new IllegalStateException("Failed to store the request.");
}

// Return 200 OK
ctx.response()
.setStatusCode(HttpResponseStatus.OK.code())
.end(CommonResponse.success().toJsonStr());
if (!sourceConnectorConfig.isDataConsistencyEnabled()) {
// Return 200 OK
ctx.response()
.setStatusCode(HttpResponseStatus.OK.code())
.end(CommonResponse.success().toJsonStr());
}

})
.failureHandler(ctx -> {
log.error("Failed to handle the request. ", ctx.failure());
Expand Down
Loading