Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:rocketmq插件 #115

Open
wants to merge 1 commit into
base: develop20210614
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions .gitignore

This file was deleted.

3 changes: 2 additions & 1 deletion bin/package.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ cp ./repeater-logback.xml ${REPEATER_TARGET_DIR}/cfg/repeater-logback.xml \
&& cp ../repeater-plugins/redis-plugin/target/redis-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/redis-plugin.jar \
&& cp ../repeater-plugins/http-plugin/target/http-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/http-plugin.jar \
&& cp ../repeater-plugins/hibernate-plugin/target/hibernate-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/hibernate-plugin.jar \
&& cp ../repeater-plugins/spring-data-jpa-plugin/target/spring-data-jpa-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/spring-data-jpa-plugin.jar
&& cp ../repeater-plugins/spring-data-jpa-plugin/target/spring-data-jpa-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/spring-data-jpa-plugin.jar \
&& cp ../repeater-plugins/rocketmq-plugin/target/rocketmq-plugin-*-jar-with-dependencies.jar ${REPEATER_TARGET_DIR}/plugins/rocketmq-plugin.jar
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class InvokeType implements java.io.Serializable {
public static InvokeType EH_CACHE = new InvokeType("eh-cache");

public static InvokeType CAFFEINE_CACHE = new InvokeType("caffeine-cache");

public static InvokeType ROCKET_MQ = new InvokeType("rocket_mq");

private String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public class RepeaterConfig implements java.io.Serializable{
*/
private List<String> repeatIdentities = Lists.newArrayList();

/**
* rockMq的Topic,用于回放时替换掉原来mock的topic。key是
*/
private List<ReplaceObject> rockMqTopic = Lists.newArrayList();

public boolean isUseTtl() {
return useTtl;
}
Expand Down Expand Up @@ -157,11 +162,19 @@ public void setRepeatIdentities(List<String> repeatIdentities) {
this.repeatIdentities = repeatIdentities;
}

public List<ReplaceObject> getRockMqTopic() {
return rockMqTopic;
}

public void setRockMqTopic(List<ReplaceObject> rockMqTopic) {
this.rockMqTopic = rockMqTopic;
}

@Override
public String toString() {
return "{" +
"sampleRate=" + sampleRate +
", plugin=" + pluginIdentities +
'}';
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.alibaba.jvm.sandbox.repeater.plugin.domain;

/**
* 替换对象,用于mock参数与回放参数不同时替换。例子:生产录制了topic是xx-prd的mq消息,回放时选的是xx-test的测试环境
*
* @author zzm
* @date 2021年6月7日
*/
public class ReplaceObject {
private Object source;// 源对象(如xx-prd),如果不填则代表通配
private Object target;// 目标对象(如xx-test)

public Object getSource() {
return source;
}

public void setSource(Object source) {
this.source = source;
}

public Object getTarget() {
return target;
}

public void setTarget(Object target) {
this.target = target;
}
}
14 changes: 13 additions & 1 deletion repeater-plugin-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>repeater-plugin-core</artifactId>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
Expand Down Expand Up @@ -62,4 +74,4 @@
</dependency>
</dependencies>

</project>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package com.alibaba.jvm.sandbox.repeater.plugin.core.impl.spi;

import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.kohsuke.MetaInfServices;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.jvm.sandbox.repeater.plugin.core.model.ApplicationModel;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.Identity;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.Invocation;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.ReplaceObject;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.mock.MockRequest;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.mock.MockResponse;
import com.alibaba.jvm.sandbox.repeater.plugin.spi.MockInterceptor;

/**
* rocketMQ Mock策略执行中的拦截器
*
* @author zzm
* @date 2021年6月7日
*/
@MetaInfServices(MockInterceptor.class)
public class RocketMockInterceptor implements MockInterceptor {

private static Logger log = LoggerFactory.getLogger(RocketMockInterceptor.class);

@Override
public void beforeSelect(MockRequest request) {
// 如果一次调用里有多次rocketMq的子调用,会进入这个方法多次。这是必须的,因为要设置ModifiedInvocationIdentity,给ParameterMatchMockStrategy.calcSimilarity的判断使用
List<ReplaceObject> rockMqTopic = ApplicationModel.instance().getConfig().getRockMqTopic();

// DefaultMQProducerImpl#sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final
// SendCallback sendCallback,final long timeout)
Object[] argumentArray = request.getArgumentArray();
Object messageObj = argumentArray[0];
replaceTopic(messageObj, rockMqTopic);
replaceProperties(messageObj);

List<Invocation> subInvocations = request.getRecordModel().getSubInvocations();
if (CollectionUtils.isEmpty(subInvocations)) {
return;
}

Set<Identity> modifySet = new HashSet<>();
for (Invocation subInvocation : subInvocations) {
if (InvokeType.ROCKET_MQ.equals(subInvocation.getType())) {
messageObj = subInvocation.getRequest()[0];
replaceTopic(messageObj, rockMqTopic);
replaceProperties(messageObj);
modifySet.add(subInvocation.getIdentity());
}
}
request.setModifiedInvocationIdentity(modifySet);
}

private void replaceProperties(Object messageObj) {
Field propertiesField = FieldUtils.getDeclaredField(messageObj.getClass(), "properties", true);
if (propertiesField == null) {
return;
}
try {
// {traceid=a38e3769b72178a4, KEYS=rocketMqtest1, id=b74774f8-fc54-03a7-f3d3-3d8ea0aa85b0, WAIT=false,
// contentType=application/json;charset=UTF-8, TAGS=rocketMqtest1, timestamp=1624868276567}
Object properties = propertiesField.get(messageObj);
MethodUtils.invokeMethod(properties, "remove", "traceid");
MethodUtils.invokeMethod(properties, "remove", "id");
MethodUtils.invokeMethod(properties, "remove", "timestamp");
MethodUtils.invokeMethod(properties, "remove", "contentType");
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

private void replaceTopic(Object messageObj, List<ReplaceObject> rockMqTopic) {
if (CollectionUtils.isEmpty(rockMqTopic)) {
return;
}
Field topicField = FieldUtils.getDeclaredField(messageObj.getClass(), "topic", true);
if (topicField == null) {
return;
}

try {
Object sourceTopic = topicField.get(messageObj);
String targetTopic = getTarget(sourceTopic, rockMqTopic);
if (StringUtils.isNotBlank(targetTopic) && !targetTopic.equals(sourceTopic)) {
topicField.setAccessible(true);
topicField.set(messageObj, targetTopic);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

private String getTarget(Object sourceTopic, List<ReplaceObject> rockMqTopic) {
Optional<ReplaceObject> findAny = rockMqTopic.stream()
.filter(r -> sourceTopic != null && r.getSource() != null && sourceTopic.toString().equals(r.getSource()))
.findAny();

if (findAny.isPresent()) {
Object target = findAny.get().getTarget();
if (target != null) {
return target.toString();
}
} else {
findAny = rockMqTopic.stream().filter(r -> r.getSource() == null).findAny();
if (findAny.isPresent()) {
Object target = findAny.get().getTarget();
if (target != null) {
return target.toString();
}
}
}
return null;
}

@Override
public void beforeReturn(MockRequest request, MockResponse response) {}

@Override
public boolean matchingSelect(MockRequest request) {
if (InvokeType.ROCKET_MQ.equals(request.getType())) {
return true;
}
return false;
}

@Override
public boolean matchingReturn(MockRequest request, MockResponse response) {
return false;
}

}
1 change: 1 addition & 0 deletions repeater-plugins/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<module>eh-cache-plugin</module>
<module>guava-cache-plugin</module>
<module>okhttp-plugin</module>
<module>rocketmq-plugin</module>
</modules>

<dependencies>
Expand Down
36 changes: 36 additions & 0 deletions repeater-plugins/rocketmq-plugin/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>repeater-plugins</artifactId>
<groupId>com.alibaba.jvm.sandbox</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>rocketmq-plugin</artifactId>

<build>
<finalName>${project.name}-${project.version}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>attached</goal>
</goals>
<phase>package</phase>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.alibaba.jvm.sandbox.repater.plugin.rocketmq;

import com.alibaba.jvm.sandbox.api.event.Event;
import com.alibaba.jvm.sandbox.repeater.plugin.api.InvocationProcessor;
import com.alibaba.jvm.sandbox.repeater.plugin.core.impl.AbstractInvokePluginAdapter;
import com.alibaba.jvm.sandbox.repeater.plugin.core.model.EnhanceModel;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType;
import com.alibaba.jvm.sandbox.repeater.plugin.spi.InvokePlugin;
import com.google.common.collect.Lists;
import org.kohsuke.MetaInfServices;

import java.util.List;

/**
* @author fengLiang
*/
@MetaInfServices(InvokePlugin.class)
public class RocketMqPlugin extends AbstractInvokePluginAdapter {
@Override
protected List<EnhanceModel> getEnhanceModels() {
EnhanceModel em = EnhanceModel.builder().classPattern("org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl")
.methodPatterns(EnhanceModel.MethodPattern.transform("sendDefaultImpl"))
.watchTypes(Event.Type.BEFORE, Event.Type.RETURN, Event.Type.THROWS)
.build();
return Lists.newArrayList(em);
}

@Override
protected InvocationProcessor getInvocationProcessor() {
return new RocketMqProcessor(getType());
}

@Override
public InvokeType getType() {
return InvokeType.ROCKET_MQ;
}

@Override
public String identity() {
return "rocket_mq";
}

@Override
public boolean isEntrance() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.alibaba.jvm.sandbox.repater.plugin.rocketmq;

import com.alibaba.jvm.sandbox.api.event.BeforeEvent;
import com.alibaba.jvm.sandbox.repeater.plugin.core.impl.api.DefaultInvocationProcessor;
import com.alibaba.jvm.sandbox.repeater.plugin.domain.InvokeType;

/**
* @author fengLiang
*/
public class RocketMqProcessor extends DefaultInvocationProcessor {
public RocketMqProcessor(InvokeType type) {
super(type);
}

@Override
public Object[] assembleRequest(BeforeEvent event) {
// DefaultMQProducerImpl#sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout)
// 只序列化前两个参数
return new Object[]{event.argumentArray[0], event.argumentArray[1]};
}

}