Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: dubbo reference annotation failed to register transaction on tcc mode #1816

Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c9bed94
dubbo注解无法开启事务
zhongfuhua Oct 24, 2019
d83a172
TCC事务针对dubbo注解无法开启问题解决
zhongfuhua Oct 24, 2019
04be0ee
TCC事务针对dubbo注解无法开启问题解决
zhongfuhua Oct 24, 2019
6ff3415
Merge branch 'develop' into feature_tcc_dubbo_annotation_unregister
slievrly Oct 24, 2019
5a15390
Merge branch 'develop' into feature_tcc_dubbo_annotation_unregister
xingfudeshi Oct 28, 2019
7d0a1c2
Merge branch 'develop' into feature_tcc_dubbo_annotation_unregister
xingfudeshi Nov 3, 2019
20a7456
Merge branch 'develop' into feature_tcc_dubbo_annotation_unregister
xingfudeshi Nov 5, 2019
7566e61
Update TransactionPropagationFilter.java
zhongfuhua Nov 12, 2019
c2ee9f0
Update TransactionPropagationFilter.java
zhongfuhua Nov 12, 2019
2e96a9f
Merge branch 'develop' into feature_tcc_dubbo_annotation_unregister
zjinlei Nov 12, 2019
97ea24e
Update TccReferenceAnnotationFilter.java
zhongfuhua Nov 13, 2019
f481cca
Merge branch 'develop' into feature_tcc_dubbo_annotation_unregister
zjinlei Nov 13, 2019
3411f68
optimization
zhongfuhua Nov 13, 2019
55bd5e3
optimization
zhongfuhua Nov 13, 2019
7ef4a9f
Merge branch 'develop' into feature_tcc_dubbo_annotation_unregister
jsbxyyx Nov 14, 2019
12a03d1
optimization exeption throw
zhongfuhua Nov 15, 2019
7b270b8
Merge branch 'feature_tcc_dubbo_annotation_unregister' of https://git…
zhongfuhua Nov 15, 2019
4ec628d
optimization exeption throw
zhongfuhua Nov 15, 2019
a45c463
Update RootContext.java
zhongfuhua Nov 18, 2019
02a6978
Merge branch 'develop' into feature_tcc_dubbo_annotation_unregister
jsbxyyx Nov 18, 2019
74d54e4
Merge branch 'develop' into feature_tcc_dubbo_annotation_unregister
zjinlei Nov 22, 2019
92edbe8
check style change
zhongfuhua Nov 22, 2019
24f5efd
Merge branch 'develop' into feature_tcc_dubbo_annotation_unregister
zjinlei Nov 22, 2019
bedc7fb
优化事务ID传递以及区分是拦截器过来的分支事务还是过滤器产生的分支事务
zhongfuhua Dec 4, 2019
50e6fe1
优化事务ID传递以及区分是拦截器过来的分支事务还是过滤器产生的分支事务
zhongfuhua Dec 4, 2019
6d68082
优化事务ID传递以及区分是拦截器过来的分支事务还是过滤器产生的分支事务
zhongfuhua Dec 4, 2019
b9901a1
优化事务ID传递以及区分是拦截器过来的分支事务还是过滤器产生的分支事务
zhongfuhua Dec 4, 2019
86a17a6
Merge branch 'develop' into feature_tcc_dubbo_annotation_unregister
zjinlei Dec 4, 2019
6c196f0
Merge branch 'develop' into feature_tcc_dubbo_annotation_unregister
zjinlei Dec 11, 2019
30f5a8b
Merge branch 'develop' into feature_tcc_dubbo_annotation_unregister
zjinlei Dec 24, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 61 additions & 3 deletions core/src/main/java/io/seata/core/context/RootContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
package io.seata.core.context;

import io.seata.common.exception.ShouldNeverHappenException;

import io.seata.core.model.BranchType;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +37,8 @@ public class RootContext {
*/
public static final String KEY_XID = "TX_XID";

public static final String KEY_XID_FILTER_TYPE = "TX_XID_FILTER_TYPE";

public static final String KEY_GLOBAL_LOCK_FLAG = "TX_LOCK";

private static ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();
Expand All @@ -46,7 +49,25 @@ public class RootContext {
* @return the xid
*/
public static String getXID() {
return CONTEXT_HOLDER.get(KEY_XID);
String xid = CONTEXT_HOLDER.get(KEY_XID);
if (StringUtils.isNotBlank(xid)) {
return xid;
}

String xidType = CONTEXT_HOLDER.get(KEY_XID_FILTER_TYPE);
if (StringUtils.isNotBlank(xidType)) {
return xidType.split("_")[0];
}
return null;
}

/**
* Gets xid.
*
* @return the xid
*/
public static String getXIDFilterType() {
return CONTEXT_HOLDER.get(KEY_XID_FILTER_TYPE);
}

/**
Expand All @@ -61,6 +82,30 @@ public static void bind(String xid) {
CONTEXT_HOLDER.put(KEY_XID, xid);
}

/**
* Bind filter type
*
* @param xidType
*/
public static void bindFilterType(String xidType) {
String[] xidTypes = xidType.split("_");
bindFilterType(xidTypes[0], BranchType.valueOf(xidTypes[1]));
}

/**
* Bind filter type
*
* @param xid
* @param branchType
*/
public static void bindFilterType(String xid, BranchType branchType) {
String xidType = String.format("%s_%s", xid, branchType.name());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind filter type xid={} branchType={}", xid, branchType);
}
CONTEXT_HOLDER.put(KEY_XID_FILTER_TYPE, xidType);
}

/**
* declare local transactions will use global lock check for update/delete/insert/selectForUpdate SQL
*/
Expand All @@ -82,11 +127,24 @@ public static void bindGlobalLockFlag() {
public static String unbind() {
String xid = CONTEXT_HOLDER.remove(KEY_XID);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind " + xid);
LOGGER.debug("unbind {} ", xid);
}
return xid;
}

/**
* Unbind temporary string
*
* @return the string
*/
public static String unbindFilterType() {
String xidType = CONTEXT_HOLDER.remove(KEY_XID_FILTER_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind filter type {}", xidType);
}
return xidType;
}

public static void unbindGlobalLockFlag() {
String lockFlag = CONTEXT_HOLDER.remove(KEY_GLOBAL_LOCK_FLAG);
if (LOGGER.isDebugEnabled() && lockFlag != null) {
Expand Down
5 changes: 5 additions & 0 deletions integration/dubbo-alibaba/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
<artifactId>seata-tm</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-tcc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.integration.dubbo.alibaba;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;
import io.seata.core.context.RootContext;
import io.seata.core.model.BranchType;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import io.seata.rm.tcc.interceptor.ActionInterceptorHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.Map;

@Activate(group = {Constants.CONSUMER}, order = 95)
public class TccReferenceAnnotationFilter implements Filter {

private static final Logger LOGGER = LoggerFactory.getLogger(TccReferenceAnnotationFilter.class);

private ActionInterceptorHandler actionInterceptorHandler = new ActionInterceptorHandler();

private final static String DUBBO_GENERIC_SERVICE_INVOKE = "$invoke";

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
try {
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
String methodName = rpcInvocation.getMethodName();
Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
Object[] arguments = rpcInvocation.getArguments();
Class interfaceClass = invoker.getInterface();
if (DUBBO_GENERIC_SERVICE_INVOKE.equals(methodName)) {
return invoker.invoke(invocation);
}

Method method = interfaceClass.getMethod(methodName, parameterTypes);
//not in transaction
if (!RootContext.inGlobalTransaction()) {
return invoker.invoke(invocation);
}

TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
//try method
if (businessAction != null) {
//save the xid
String xid = RootContext.getXID();
//clear the context
RootContext.unbind();
RootContext.bindFilterType(xid, BranchType.TCC);
try {
Map<String, Object> ret = actionInterceptorHandler.proceed(method, arguments, xid, businessAction,
() -> invoker.invoke(invocation));
return (Result) ret.get(io.seata.common.Constants.TCC_METHOD_RESULT);
} catch (Throwable throwable) {
throw throwable;
} finally {
//recovery the context
RootContext.unbindFilterType();
RootContext.bind(xid);
}
}
} catch (Throwable e) {
LOGGER.error("Tcc dubbo invokes service to register branch transaction exception:{}", e.getMessage(), e);
}
return invoker.invoke(invocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import io.seata.core.context.RootContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,19 +40,25 @@ public class TransactionPropagationFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String xid = RootContext.getXID();
String xidFilterType = RootContext.getXIDFilterType();

String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
String rpcXidFilterType = RpcContext.getContext().getAttachment(RootContext.KEY_XID_FILTER_TYPE);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
}
boolean bind = false;
if (xid != null) {
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
RpcContext.getContext().setAttachment(RootContext.KEY_XID_FILTER_TYPE, xidFilterType);
} else {
if (rpcXid != null) {
RootContext.bind(rpcXid);
RootContext.bindFilterType(rpcXidFilterType);
bind = true;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("bind[" + rpcXid + "] to RootContext");
LOGGER.debug("bind[{}] to RootContext", rpcXid);
LOGGER.debug("bind filterType[{}] to RootContext", rpcXidFilterType);
zhongfuhua marked this conversation as resolved.
Show resolved Hide resolved
zhongfuhua marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand All @@ -63,14 +68,19 @@ public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcExcept
} finally {
if (bind) {
String unbindXid = RootContext.unbind();
String unbindFilterType = RootContext.unbindFilterType();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
LOGGER.debug("unbind[{}] from RootContext", unbindXid);
LOGGER.debug("unbind filterType[{}] from RootContext", unbindFilterType);
zhongfuhua marked this conversation as resolved.
Show resolved Hide resolved
}
if (!rpcXid.equalsIgnoreCase(unbindXid)) {
LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
LOGGER.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
LOGGER.warn("xidFilterType in change during RPC from {} to {}", rpcXidFilterType, unbindFilterType);
zhongfuhua marked this conversation as resolved.
Show resolved Hide resolved
if (unbindXid != null) {
RootContext.bind(unbindXid);
LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
RootContext.bindFilterType(unbindFilterType);
LOGGER.warn("bind [{}] back to RootContext", unbindXid);
LOGGER.warn("bind filterType [{}] back to RootContext", unbindFilterType);
zhongfuhua marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
io.seata.integration.dubbo.alibaba.TransactionPropagationFilter
io.seata.integration.dubbo.alibaba.TransactionPropagationFilter
io.seata.integration.dubbo.alibaba.TccReferenceAnnotationFilter
5 changes: 5 additions & 0 deletions integration/dubbo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
<artifactId>seata-tm</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>seata-tcc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.integration.dubbo;

import io.seata.core.context.RootContext;
import io.seata.core.model.BranchType;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import io.seata.rm.tcc.interceptor.ActionInterceptorHandler;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.Map;

zhongfuhua marked this conversation as resolved.
Show resolved Hide resolved
@Activate(group = {Constants.CONSUMER}, order = 95)
public class TccReferenceAnnotationFilter implements Filter {

private static final Logger LOGGER = LoggerFactory.getLogger(TccReferenceAnnotationFilter.class);

private ActionInterceptorHandler actionInterceptorHandler = new ActionInterceptorHandler();

private final static String DUBBO_GENERIC_SERVICE_INVOKE = "$invoke";

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
try {
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
String methodName = rpcInvocation.getMethodName();
Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
Object[] arguments = rpcInvocation.getArguments();
Class interfaceClass = invoker.getInterface();
if (DUBBO_GENERIC_SERVICE_INVOKE.equals(methodName)) {
return invoker.invoke(invocation);
}

Method method = interfaceClass.getMethod(methodName, parameterTypes);
//not in transaction
if (!RootContext.inGlobalTransaction()) {
return invoker.invoke(invocation);
}

TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
//try method
if (businessAction != null) {
//save the xid
String xid = RootContext.getXID();
//clear the context
RootContext.unbind();
RootContext.bindFilterType(xid, BranchType.TCC);
try {
Map<String, Object> ret = actionInterceptorHandler.proceed(method, arguments, xid, businessAction,
() -> invoker.invoke(invocation));
return (Result) ret.get(io.seata.common.Constants.TCC_METHOD_RESULT);
} catch (Throwable throwable) {
throw throwable;
} finally {
//recovery the context
RootContext.unbindFilterType();
RootContext.bind(xid);
}
}
} catch (Throwable e) {
LOGGER.error("Tcc dubbo invokes service to register branch transaction exception:{}", e.getMessage(), e);
}
return invoker.invoke(invocation);
}
}
Loading