Skip to content

[ISSUE #4621] Implement TransformerEngine for EventMesh Transformer #4622

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class EventMeshHTTPServer extends AbstractHTTPServer {

private FilterEngine filterEngine;

private TransformerEngine transformerEngine;

private HttpRetryer httpRetryer;

private transient RateLimiter msgRateLimiter;
Expand Down Expand Up @@ -137,6 +139,8 @@ public void init() throws Exception {

filterEngine = new FilterEngine(metaStorage, producerManager, consumerManager);

transformerEngine = new TransformerEngine(metaStorage, producerManager, consumerManager);

super.setHandlerService(new HandlerService());
super.getHandlerService().setMetrics(this.getMetrics());

Expand Down Expand Up @@ -180,6 +184,8 @@ public void shutdown() throws Exception {

filterEngine.shutdown();

transformerEngine.shutdown();

consumerManager.shutdown();

httpClientPool.shutdown();
Expand Down Expand Up @@ -354,6 +360,10 @@ public FilterEngine getFilterEngine() {
return filterEngine;
}

public TransformerEngine getTransformerEngine() {
return transformerEngine;
}

public MetaStorage getMetaStorage() {
return metaStorage;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.runtime.boot;

import org.apache.eventmesh.api.meta.MetaServiceListener;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerGroupManager;
import org.apache.eventmesh.runtime.core.protocol.http.consumer.ConsumerManager;
import org.apache.eventmesh.runtime.core.protocol.producer.EventMeshProducer;
import org.apache.eventmesh.runtime.core.protocol.producer.ProducerManager;
import org.apache.eventmesh.runtime.meta.MetaStorage;
import org.apache.eventmesh.transformer.Transformer;
import org.apache.eventmesh.transformer.TransformerBuilder;
import org.apache.eventmesh.transformer.TransformerParam;

import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.JsonNode;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TransformerEngine {

/**
* key:group-topic
**/
private final Map<String, Transformer> transformerMap = new HashMap<>();

private final String transformerPrefix = "transformer-";

private final MetaStorage metaStorage;

private MetaServiceListener metaServiceListener;

private final ProducerManager producerManager;

private final ConsumerManager consumerManager;

private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

public TransformerEngine(MetaStorage metaStorage, ProducerManager producerManager, ConsumerManager consumerManager) {
this.metaStorage = metaStorage;
this.producerManager = producerManager;
this.consumerManager = consumerManager;
}

public void start() {
Map<String, String> transformerMetaData = metaStorage.getMetaData(transformerPrefix, true);
for (Entry<String, String> transformerDataEntry : transformerMetaData.entrySet()) {
// transformer-group
String key = transformerDataEntry.getKey();
// topic-transformerParam list
String value = transformerDataEntry.getValue();
updateTransformerMap(key, value);
}
metaServiceListener = this::updateTransformerMap;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this step be placed before for( ), otherwise, is it possible for metaServiceListener to be null when addTransformerListener() in updateTransformerMap()?

这一步是不是应该放到for( )前面,否则的话,updateTransformerMap()addTransformerListener()时,metaServiceListener是不是为null?


// addListeners for producerManager & consumerManager
scheduledExecutorService.scheduleAtFixedRate(() -> {
ConcurrentHashMap<String, EventMeshProducer> producerMap = producerManager.getProducerTable();
for (String producerGroup : producerMap.keySet()) {
for (String transformerKey : transformerMap.keySet()) {
if (!StringUtils.contains(transformerKey, producerGroup)) {
addTransformerListener(producerGroup);
LogUtils.info(log, "addTransformerListener for producer group: " + producerGroup);
}
}
}
ConcurrentHashMap<String, ConsumerGroupManager> consumerMap = consumerManager.getClientTable();
for (String consumerGroup : consumerMap.keySet()) {
for (String transformerKey : transformerMap.keySet()) {
if (!StringUtils.contains(transformerKey, consumerGroup)) {
addTransformerListener(consumerGroup);
LogUtils.info(log, "addTransformerListener for consumer group: " + consumerGroup);
}
}
}
}, 10_000, 5_000, TimeUnit.MILLISECONDS);
}

private void updateTransformerMap(String key, String value) {
String group = StringUtils.substringAfter(key, transformerPrefix);

JsonNode transformerJsonNodeArray = JsonUtils.getJsonNode(value);

if (transformerJsonNodeArray != null) {
for (JsonNode transformerJsonNode : transformerJsonNodeArray) {
String topic = transformerJsonNode.get("topic").asText();
String transformerParam = transformerJsonNode.get("transformerParam").toString();
TransformerParam tfp = JsonUtils.parseObject(transformerParam, TransformerParam.class);
Transformer transformer = TransformerBuilder.buildTransformer(tfp);
transformerMap.put(group + "-" + topic, transformer);
}
}
addTransformerListener(group);
}

public void addTransformerListener(String group) {
String transformerKey = transformerPrefix + group;
try {
metaStorage.getMetaDataWithListener(metaServiceListener, transformerKey);
} catch (Exception e) {
throw new RuntimeException("addTransformerListener exception", e);
}
}

public void shutdown() {
scheduledExecutorService.shutdown();
}

public Transformer getTransformer(String key) {
return transformerMap.get(key);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.trace.api.common.EventMeshTraceConstants;
import org.apache.eventmesh.transformer.Transformer;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -161,6 +162,7 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final
final String topic = event.getSubject();

Pattern filterPattern = eventMeshHTTPServer.getFilterEngine().getFilterPattern(producerGroup + "-" + topic);
Transformer transformer = eventMeshHTTPServer.getTransformerEngine().getTransformer(producerGroup + "-" + topic);

// validate body
if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic)
Expand Down Expand Up @@ -252,6 +254,14 @@ public void handler(final HandlerService.HandlerSpecific handlerSpecific, final
isFiltered = filterPattern.filter(JsonUtils.toJSONString(event));
}

// apply transformer
if (isFiltered && transformer != null) {
String data = transformer.transform(JsonUtils.toJSONString(event));
event = CloudEventBuilder.from(event).withData(Objects.requireNonNull(JsonUtils.toJSONString(data))
.getBytes(StandardCharsets.UTF_8)).build();
sendMessageContext.setEvent(event);
}

if (isFiltered) {
eventMeshProducer.send(sendMessageContext, new SendCallback() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.eventmesh.runtime.core.protocol.http.consumer.HandleMsgContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.WebhookUtil;
import org.apache.eventmesh.transformer.Transformer;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
Expand All @@ -53,6 +54,7 @@

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -138,6 +140,20 @@ public void tryHTTPRequest() {
return;
}
}
Transformer transformer = eventMeshHTTPServer.getTransformerEngine()
.getTransformer(handleMsgContext.getConsumerGroup() + "-" + handleMsgContext.getTopic());
if (transformer != null) {
try {
String data = transformer.transform(JsonUtils.toJSONString(event));
event = CloudEventBuilder.from(event).withData(Objects.requireNonNull(JsonUtils.toJSONString(data))
.getBytes(StandardCharsets.UTF_8)).build();
} catch (Exception exception) {
LOGGER.warn("apply transformer to cloudevents error, group:{}, topic:{}, bizSeqNo={}, uniqueId={}",
this.handleMsgContext.getConsumerGroup(),
this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId(), exception);
return;
}
}
handleMsgContext.setEvent(event);
super.setEvent(event);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public List<Variable> match(String json) throws JsonProcessingException {

List<Variable> variableList = new ArrayList<>(variablesList.size());
for (Variable element : variablesList) {
if (JsonPathUtils.isValidAndDefinite(element.getJsonPath())) {
String res = JsonPathUtils.matchJsonPathValueWithString(json, element.getJsonPath());
if (JsonPathUtils.isValidAndDefinite(element.getValue())) {
String res = JsonPathUtils.matchJsonPathValueWithString(json, element.getValue());
Variable variable = new Variable(element.getName(), res);
variableList.add(variable);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public Template(String template) {
public String substitute(List<Variable> variables) throws TransformException {

Map<String, String> valuesMap = variables.stream()
.filter(variable -> variable.getJsonPath() != null)
.collect(Collectors.toMap(Variable::getName, Variable::getJsonPath));
.filter(variable -> variable.getValue() != null)
.collect(Collectors.toMap(Variable::getName, Variable::getValue));
StringSubstitutor sub = new StringSubstitutor(valuesMap);

return sub.replace(template);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ public String transform(String json) throws JsonProcessingException {
// 1: get variable match results
List<Variable> variableList = jsonPathParser.match(json);
// 2: use results replace template
String res = template.substitute(variableList);
return res;
return template.substitute(variableList);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,20 @@

public class TransformerBuilder {

public static final class Builder {

private final TransformerType transformerType;
private String template;
private String content;

public Builder(TransformerType transformerType) {
this.transformerType = transformerType;
}

public Builder setContent(String content) {
this.content = content;
return this;
}

public Builder setTemplate(String template) {
this.template = template;
return this;
}

public Transformer build() {
switch (this.transformerType) {
case CONSTANT:
return buildConstantTransformer(this.content);
case ORIGINAL:
return buildOriginalTransformer();
case TEMPLATE:
return buildTemplateTransFormer(this.content, this.template);
default:
throw new TransformException("invalid config");
}
public static Transformer buildTransformer(TransformerParam transformerParam) {
switch (transformerParam.getTransformerType()) {
case ORIGINAL:
return buildOriginalTransformer();
case CONSTANT:
return buildConstantTransformer(transformerParam.getValue());
case TEMPLATE:
return buildTemplateTransFormer(transformerParam.getValue(), transformerParam.getTemplate());
default:
throw new TransformException("invalid config");
}

}


public static Transformer buildTemplateTransFormer(String jsonContent, String template) {
JsonPathParser jsonPathParser = new JsonPathParser(jsonContent);
Template templateEntry = new Template(template);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.transformer;

public class TransformerParam {

private TransformerType transformerType;
private String value;
private String template;

public TransformerParam() {
}

public TransformerParam(TransformerType transformerType, String value, String template) {
this.transformerType = transformerType;
this.value = value;
this.template = template;
}

public TransformerParam(TransformerType transformerType, String value) {
this(transformerType, value, null);
}

public TransformerType getTransformerType() {
return transformerType;
}

public void setTransformerType(TransformerType transformerType) {
this.transformerType = transformerType;
}

public String getValue() {
return value;
}

public void setValue(String value) {
this.value = value;
}

public String getTemplate() {
return template;
}

public void setTemplate(String template) {
this.template = template;
}

}
Loading