Skip to content

Commit fba608e

Browse files
author
taochen
committed
optimize datasource switch listener
1 parent 30a86cc commit fba608e

File tree

9 files changed

+124
-109
lines changed

9 files changed

+124
-109
lines changed

dal-client/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<modelVersion>4.0.0</modelVersion>
55
<groupId>com.ctrip.platform</groupId>
66
<artifactId>dal-client</artifactId>
7-
<version>1.17.9</version>
7+
<version>1.18.1-SNAPSHOT</version>
88
<properties>
99
<ver.version>${project.version}</ver.version>
1010
<javax-persistence-version>1.0.2</javax-persistence-version>

dal-client/src/main/java/com/ctrip/platform/dal/dao/datasource/DataSourceCreator.java

-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ public SingleDataSource createSingleDataSource(String name, DataSourceConfigure
3434
else
3535
task = new DefaultDataSourceCreateTask(name, configure, singleDataSource);
3636
singleDataSource.setTask(task);
37-
singleDataSource.setSwitching(true);
3837
return singleDataSource;
3938
}
4039
}

dal-client/src/main/java/com/ctrip/platform/dal/dao/datasource/DataSourceSwitchListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@
44
* Created by taochen on 2019/8/7.
55
*/
66
public interface DataSourceSwitchListener {
7-
void preHandle();
7+
void execute();
88
}

dal-client/src/main/java/com/ctrip/platform/dal/dao/datasource/RefreshableDataSource.java

+69-46
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,21 @@
66
import java.sql.SQLFeatureNotSupportedException;
77
import java.util.LinkedList;
88
import java.util.List;
9-
import java.util.Queue;
109
import java.util.concurrent.*;
10+
import java.util.concurrent.atomic.AtomicBoolean;
1111
import java.util.concurrent.atomic.AtomicReference;
12-
import java.util.concurrent.locks.LockSupport;
1312

1413
import javax.sql.DataSource;
14+
import javax.sql.PooledConnection;
1515

1616
import com.ctrip.platform.dal.dao.configure.DataSourceConfigureChangeEvent;
1717
import com.ctrip.platform.dal.dao.configure.DataSourceConfigure;
1818
import com.ctrip.platform.dal.dao.configure.DataSourceConfigureChangeListener;
1919
import com.ctrip.platform.dal.dao.helper.CustomThreadFactory;
2020
import com.ctrip.platform.dal.dao.helper.DalElementFactory;
2121
import com.ctrip.platform.dal.dao.log.ILogger;
22+
import com.mysql.jdbc.MySQLConnection;
23+
import org.apache.commons.lang.StringUtils;
2224

2325
public class RefreshableDataSource implements DataSource, DataSourceConfigureChangeListener {
2426
private static ILogger LOGGER = DalElementFactory.DEFAULT.getILogger();
@@ -27,14 +29,16 @@ public class RefreshableDataSource implements DataSource, DataSourceConfigureCha
2729

2830
private AtomicReference<String> connectionIpReference = new AtomicReference<>();
2931

30-
private List<DataSourceSwitchListener> dataSourceSwitchListeners = new LinkedList<>();
32+
private CopyOnWriteArraySet<DataSourceSwitchListener> dataSourceSwitchListeners = new CopyOnWriteArraySet<>();
33+
34+
private AtomicBoolean isListenerExecuted = new AtomicBoolean(false);
3135

3236
private ScheduledExecutorService service =
3337
Executors.newScheduledThreadPool(POOL_SIZE, new CustomThreadFactory(THREAD_NAME));
3438

3539
private static ThreadPoolExecutor executor;
3640

37-
private Queue<Thread> waiters = new ConcurrentLinkedQueue<>();
41+
private static ExecutorService getConnExecutor = Executors.newSingleThreadExecutor();
3842

3943
private static final int INIT_DELAY = 0;
4044
private static final int POOL_SIZE = 1;
@@ -47,6 +51,7 @@ public class RefreshableDataSource implements DataSource, DataSourceConfigureCha
4751
public RefreshableDataSource(String name, DataSourceConfigure config) throws SQLException {
4852
SingleDataSource dataSource = new SingleDataSource(name, config);
4953
dataSourceReference.set(dataSource);
54+
asyncGetConnection();
5055
executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
5156
new LinkedBlockingQueue<Runnable>(),
5257
new CustomThreadFactory("DataSourceSwitchListener"));
@@ -62,14 +67,10 @@ public synchronized void configChanged(DataSourceConfigureChangeEvent event) thr
6267

6368
public void refreshDataSource(String name, DataSourceConfigure configure) throws SQLException {
6469
SingleDataSource newDataSource = createSingleDataSource(name, configure, null);
65-
SingleDataSource oldDataSource = dataSourceReference.getAndSet(newDataSource);
66-
executeDataSourceListener(name);
67-
newDataSource.setSwitching(false);
68-
for (Thread waiter : waiters) {
69-
LockSupport.unpark(waiter);
70-
}
71-
waiters.clear();
7270
getExecutorService().schedule(newDataSource.getTask(), INIT_DELAY, TimeUnit.MILLISECONDS);
71+
isListenerExecuted.set(false);
72+
SingleDataSource oldDataSource = dataSourceReference.getAndSet(newDataSource);
73+
asyncGetConnection();
7374
close(oldDataSource);
7475
DataSourceCreateTask oldTask = oldDataSource.getTask();
7576
if (oldTask != null)
@@ -83,7 +84,9 @@ public void run() {
8384
SingleDataSource newDataSource = createSingleDataSource(name, configure, listener);
8485
boolean isSuccess = newDataSource.createPool(name, configure);
8586
if (isSuccess) {
87+
isListenerExecuted.set(false);
8688
SingleDataSource oldDataSource = dataSourceReference.getAndSet(newDataSource);
89+
asyncGetConnection();
8790
listener.onCreatePoolSuccess();
8891
close(oldDataSource);
8992
DataSourceCreateTask oldTask = oldDataSource.getTask();
@@ -115,19 +118,7 @@ public DataSource getDataSource() {
115118
SingleDataSource singleDataSource = getSingleDataSource();
116119
if (singleDataSource == null)
117120
throw new IllegalStateException("SingleDataSource can't be null.");
118-
for(;;) {
119-
if (!singleDataSource.getSwitching()) {
120-
break;
121-
}
122-
else {
123-
if (singleDataSource.getSwitching()) {
124-
waiters.add(Thread.currentThread());
125-
}
126-
if (singleDataSource.getSwitching()) {
127-
LockSupport.park(this);
128-
}
129-
}
130-
}
121+
131122
DataSource dataSource = singleDataSource.getDataSource();
132123
if (dataSource == null)
133124
throw new IllegalStateException("DataSource can't be null.");
@@ -147,35 +138,67 @@ private ScheduledExecutorService getExecutorService() {
147138
}
148139

149140
private void executeDataSourceListener(final String keyName) {
150-
if (dataSourceSwitchListeners.size() == 0) {
151-
return;
152-
}
153-
final CountDownLatch latch = new CountDownLatch(dataSourceSwitchListeners.size());
154-
for (final DataSourceSwitchListener dataSourceSwitchListener : dataSourceSwitchListeners) {
155-
if (dataSourceSwitchListener != null) {
156-
executor.submit(new Runnable() {
157-
@Override
158-
public void run() {
159-
try {
160-
dataSourceSwitchListener.preHandle();
161-
latch.countDown();
162-
} catch (Throwable e) {
163-
LOGGER.error(String.format("execute datasource switch listener fail for %s", keyName), e);
141+
synchronized (this) {
142+
if (isListenerExecuted.get()) {
143+
return;
144+
}
145+
final CountDownLatch latch = new CountDownLatch(dataSourceSwitchListeners.size());
146+
for (final DataSourceSwitchListener dataSourceSwitchListener : dataSourceSwitchListeners) {
147+
if (dataSourceSwitchListener != null) {
148+
executor.submit(new Runnable() {
149+
@Override
150+
public void run() {
151+
try {
152+
dataSourceSwitchListener.execute();
153+
latch.countDown();
154+
} catch (Throwable e) {
155+
LOGGER.error(String.format("execute datasource switch listener fail for %s", keyName), e);
156+
}
164157
}
165-
}
166-
});
158+
});
159+
}
160+
}
161+
try {
162+
latch.await(TIME_OUT, TimeUnit.MILLISECONDS);
163+
} catch (InterruptedException e) {
164+
LOGGER.error(String.format("timeout,execute datasource switch listener is interrupted for %s", keyName), e);
165+
} finally {
166+
isListenerExecuted.set(true);
167167
}
168-
}
169-
try {
170-
latch.await(TIME_OUT, TimeUnit.MILLISECONDS);
171-
} catch (InterruptedException e) {
172-
LOGGER.error(String.format("timeout,execute datasource switch listener is interrupted for %s", keyName), e);
173168
}
174169
}
175170

171+
private void asyncGetConnection() {
172+
getConnExecutor.submit(new Runnable() {
173+
@Override
174+
public void run() {
175+
try {
176+
getConnection();
177+
} catch (SQLException e) {
178+
//ignore
179+
}
180+
}
181+
});
182+
}
183+
176184
@Override
177185
public Connection getConnection() throws SQLException {
178-
return getDataSource().getConnection();
186+
Connection connection = getDataSource().getConnection();
187+
if (dataSourceSwitchListeners.size() > 0) {
188+
String currentIp = ((MySQLConnection)(((PooledConnection)connection).getConnection())).getIO().mysqlConnection.getInetAddress().getHostAddress();
189+
String oldIp = connectionIpReference.get();
190+
if (StringUtils.isEmpty(oldIp)) {
191+
connectionIpReference.set(currentIp);
192+
}
193+
else {
194+
if (!oldIp.equalsIgnoreCase(currentIp)) {
195+
String keyName = getSingleDataSource().getName();
196+
executeDataSourceListener(keyName);
197+
connectionIpReference.set(currentIp);
198+
}
199+
}
200+
}
201+
return connection;
179202
}
180203

181204
@Override

dal-client/src/main/java/com/ctrip/platform/dal/dao/datasource/SingleDataSource.java

-14
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,6 @@
1111
import org.apache.tomcat.jdbc.pool.Validator;
1212

1313
import javax.sql.DataSource;
14-
import java.util.Queue;
15-
import java.util.concurrent.ConcurrentLinkedQueue;
16-
import java.util.concurrent.locks.LockSupport;
1714

1815
public class SingleDataSource implements DataSourceConfigureConstants {
1916
private static ILogger LOGGER = DalElementFactory.DEFAULT.getILogger();
@@ -23,7 +20,6 @@ public class SingleDataSource implements DataSourceConfigureConstants {
2320
private DataSource dataSource;
2421
private DataSourceCreateTask task;
2522
private DataSourceCreatePoolListener listener;
26-
private volatile boolean isSwitching = false;
2723

2824
private static final String DATASOURCE_CREATE_DATASOURCE = "DataSource::createDataSource:%s";
2925

@@ -47,14 +43,6 @@ public void setTask(DataSourceCreateTask task) {
4743
this.task = task;
4844
}
4945

50-
public boolean getSwitching() {
51-
return isSwitching;
52-
}
53-
54-
public void setSwitching(boolean switching) {
55-
isSwitching = switching;
56-
}
57-
5846
public SingleDataSource(String name, DataSourceConfigure dataSourceConfigure) {
5947
this(name, dataSourceConfigure, null);
6048
createPool(name, dataSourceConfigure);
@@ -97,8 +85,6 @@ public boolean createPool(String name, DataSourceConfigure dataSourceConfigure)
9785
if (listener != null)
9886
listener.onCreatePoolFail(e);
9987
isSuccess = false;
100-
} finally {
101-
isSwitching = false;
10288
}
10389
return isSuccess;
10490
}

dal-client/src/test/java/com/ctrip/platform/dal/dao/datasource/MockDataSourceSwitchListenerOne.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ public class MockDataSourceSwitchListenerOne implements DataSourceSwitchListener
88

99
private int sleep = 100;
1010
@Override
11-
public void preHandle() {
11+
public void execute() {
1212
try {
1313
Thread.sleep(sleep);
1414
} catch (InterruptedException e) {

dal-client/src/test/java/com/ctrip/platform/dal/dao/datasource/MockDataSourceSwitchListenerThree.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
*/
66
public class MockDataSourceSwitchListenerThree implements DataSourceSwitchListener {
77
@Override
8-
public void preHandle() {
8+
public void execute() {
99

1010
}
1111
}

dal-client/src/test/java/com/ctrip/platform/dal/dao/datasource/MockDataSourceSwitchListenerTwo.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ public class MockDataSourceSwitchListenerTwo implements DataSourceSwitchListener
88

99
private int sleep = 150;
1010
@Override
11-
public void preHandle() {
11+
public void execute() {
1212
try {
1313
Thread.sleep(sleep);
1414
} catch (InterruptedException e) {

0 commit comments

Comments
 (0)