Skip to content

Commit

Permalink
fix #67
Browse files Browse the repository at this point in the history
  • Loading branch information
haocao committed May 11, 2016
1 parent 3e6d9f2 commit 01b9e1c
Show file tree
Hide file tree
Showing 13 changed files with 265 additions and 180 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.rdb.sharding.executor;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import java.util.HashMap;
import java.util.Map;

/**
* 执行器执行时数据处理类.
*
* @author caohao
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ExecutorDataMap {

private static ThreadLocal<Map<String, Object>> dataMap = new ThreadLocal<Map<String, Object>>() {

@Override
protected Map<String, Object> initialValue() {
return new HashMap<>();
}
};

/**
* 设置数据Map.
*
* @param dataMap 数据Map
*/
public static void setDataMap(final Map<String, Object> dataMap) {
ExecutorDataMap.dataMap.set(dataMap);
}

/**
* 获取数据Map.
*
* @return 数据Map
*/
public static Map<String, Object> getDataMap() {
return dataMap.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* 多线程执行预编译语句对象请求的执行器.
Expand Down Expand Up @@ -80,14 +81,15 @@ public int executeUpdate() throws SQLException {
Context context = MetricsContext.start("ShardingPreparedStatement-executeUpdate");
postDMLExecutionEvents();
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
if (1 == preparedStatementExecutorWrappers.size()) {
return executeUpdateInternal(preparedStatementExecutorWrappers.iterator().next(), isExceptionThrown, Optional.fromNullable(context));
return executeUpdateInternal(preparedStatementExecutorWrappers.iterator().next(), isExceptionThrown, dataMap, Optional.fromNullable(context));
}
int result = executorEngine.execute(preparedStatementExecutorWrappers, new ExecuteUnit<PreparedStatementExecutorWrapper, Integer>() {

@Override
public Integer execute(final PreparedStatementExecutorWrapper input) throws Exception {
return executeUpdateInternal(input, isExceptionThrown, Optional.<Context>absent());
return executeUpdateInternal(input, isExceptionThrown, dataMap, Optional.<Context>absent());
}
}, new MergeUnit<Integer, Integer>() {

Expand All @@ -107,9 +109,11 @@ public Integer merge(final List<Integer> results) {
return result;
}

private int executeUpdateInternal(final PreparedStatementExecutorWrapper preparedStatementExecutorWrapper, final boolean isExceptionThrown, final Optional<Context> context) throws SQLException {
private int executeUpdateInternal(final PreparedStatementExecutorWrapper preparedStatementExecutorWrapper,
final boolean isExceptionThrown, final Map<String, Object> dataMap, final Optional<Context> context) throws SQLException {
int result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
try {
result = preparedStatementExecutorWrapper.getPreparedStatement().executeUpdate();
} catch (final SQLException ex) {
Expand All @@ -135,24 +139,27 @@ public boolean execute() throws SQLException {
Context context = MetricsContext.start("ShardingPreparedStatement-execute");
postDMLExecutionEvents();
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
if (1 == preparedStatementExecutorWrappers.size()) {
PreparedStatementExecutorWrapper preparedStatementExecutorWrapper = preparedStatementExecutorWrappers.iterator().next();
return executeInternal(preparedStatementExecutorWrapper, isExceptionThrown, Optional.fromNullable(context));
return executeInternal(preparedStatementExecutorWrapper, isExceptionThrown, dataMap, Optional.fromNullable(context));
}
List<Boolean> result = executorEngine.execute(preparedStatementExecutorWrappers, new ExecuteUnit<PreparedStatementExecutorWrapper, Boolean>() {

@Override
public Boolean execute(final PreparedStatementExecutorWrapper input) throws Exception {
return executeInternal(input, isExceptionThrown, Optional.<Context>absent());
return executeInternal(input, isExceptionThrown, dataMap, Optional.<Context>absent());
}
});
MetricsContext.stop(context);
return result.get(0);
}

private boolean executeInternal(final PreparedStatementExecutorWrapper preparedStatementExecutorWrapper, final boolean isExceptionThrown, final Optional<Context> context) throws SQLException {
private boolean executeInternal(final PreparedStatementExecutorWrapper preparedStatementExecutorWrapper,
final boolean isExceptionThrown, final Map<String, Object> dataMap, final Optional<Context> context) throws SQLException {
boolean result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
try {
result = preparedStatementExecutorWrapper.getPreparedStatement().execute();
} catch (final SQLException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* 多线程执行静态语句对象请求的执行器.
Expand Down Expand Up @@ -132,15 +133,16 @@ private int executeUpdate(final Updater updater) throws SQLException {
Context context = MetricsContext.start("ShardingStatement-executeUpdate");
postDMLExecutionEvents();
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
int result;
if (1 == statementExecutorWrappers.size()) {
return executeUpdateInternal(updater, statementExecutorWrappers.iterator().next(), isExceptionThrown, Optional.fromNullable(context));
return executeUpdateInternal(updater, statementExecutorWrappers.iterator().next(), isExceptionThrown, dataMap, Optional.fromNullable(context));
}
result = executorEngine.execute(statementExecutorWrappers, new ExecuteUnit<StatementExecutorWrapper, Integer>() {

@Override
public Integer execute(final StatementExecutorWrapper input) throws Exception {
return executeUpdateInternal(updater, input, isExceptionThrown, Optional.<Context>absent());
return executeUpdateInternal(updater, input, isExceptionThrown, dataMap, Optional.<Context>absent());
}
}, new MergeUnit<Integer, Integer>() {

Expand All @@ -160,9 +162,11 @@ public Integer merge(final List<Integer> results) {
return result;
}

private int executeUpdateInternal(final Updater updater, final StatementExecutorWrapper statementExecutorWrapper, final boolean isExceptionThrown, final Optional<Context> context) {
private int executeUpdateInternal(final Updater updater, final StatementExecutorWrapper statementExecutorWrapper,
final boolean isExceptionThrown, final Map<String, Object> dataMap, final Optional<Context> context) {
int result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
try {
result = updater.executeUpdate(statementExecutorWrapper.getStatement(), statementExecutorWrapper.getSqlExecutionUnit().getSql());
} catch (final SQLException ex) {
Expand Down Expand Up @@ -228,23 +232,26 @@ private boolean execute(final Executor executor) throws SQLException {
Context context = MetricsContext.start("ShardingStatement-execute");
postDMLExecutionEvents();
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
if (1 == statementExecutorWrappers.size()) {
return executeInternal(executor, statementExecutorWrappers.iterator().next(), isExceptionThrown, Optional.fromNullable(context));
return executeInternal(executor, statementExecutorWrappers.iterator().next(), isExceptionThrown, dataMap, Optional.fromNullable(context));
}
List<Boolean> result = executorEngine.execute(statementExecutorWrappers, new ExecuteUnit<StatementExecutorWrapper, Boolean>() {

@Override
public Boolean execute(final StatementExecutorWrapper input) throws Exception {
return executeInternal(executor, input, isExceptionThrown, Optional.<Context>absent());
return executeInternal(executor, input, isExceptionThrown, dataMap, Optional.<Context>absent());
}
});
MetricsContext.stop(context);
return null == result ? false : result.get(0);
}

private boolean executeInternal(final Executor executor, final StatementExecutorWrapper statementExecutorWrapper, final boolean isExceptionThrown, final Optional<Context> context) {
private boolean executeInternal(final Executor executor, final StatementExecutorWrapper statementExecutorWrapper,
final boolean isExceptionThrown, final Map<String, Object> dataMap, final Optional<Context> context) {
boolean result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
try {
result = executor.execute(statementExecutorWrapper.getStatement(), statementExecutorWrapper.getSqlExecutionUnit().getSql());
} catch (final SQLException ex) {
Expand Down
Loading

0 comments on commit 01b9e1c

Please sign in to comment.