Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

残留的synchronized #825

Merged
merged 1 commit into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,24 @@ public AbstractValueDecoder getDecoder(int identityNumber) {
return decoderMap.get(identityNumber);
}

public synchronized void register(int identityNumber, AbstractValueDecoder decoder) {
decoderMap.put(identityNumber, decoder);
inited = true;
public void register(int identityNumber, AbstractValueDecoder decoder) {
reentrantLock.lock();
try {
decoderMap.put(identityNumber, decoder);
inited = true;
}finally {
reentrantLock.unlock();
}
}

public synchronized void clear() {
decoderMap.clear();
inited = true;
public void clear() {
reentrantLock.lock();
try {
decoderMap.clear();
inited = true;
}finally {
reentrantLock.unlock();
}
}

public void initDefaultDecoder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created on 2016/10/27.
Expand All @@ -28,6 +29,7 @@ public class DefaultCacheMonitor implements CacheMonitor {

private static final Logger logger = LoggerFactory.getLogger(DefaultCacheMonitor.class);

private final ReentrantLock reentrantLock = new ReentrantLock();
protected CacheStat cacheStat;
private String cacheName;

Expand All @@ -43,44 +45,59 @@ public String getCacheName() {
return cacheName;
}

public synchronized void resetStat() {
cacheStat = new CacheStat();
cacheStat.setStatStartTime(System.currentTimeMillis());
cacheStat.setCacheName(cacheName);
public void resetStat() {
reentrantLock.lock();
try {
cacheStat = new CacheStat();
cacheStat.setStatStartTime(System.currentTimeMillis());
cacheStat.setCacheName(cacheName);
}finally {
reentrantLock.unlock();
}
}

public synchronized CacheStat getCacheStat() {
CacheStat stat = cacheStat.clone();
stat.setStatEndTime(System.currentTimeMillis());
return stat;
public CacheStat getCacheStat() {
reentrantLock.lock();
try {
CacheStat stat = cacheStat.clone();
stat.setStatEndTime(System.currentTimeMillis());
return stat;
}finally {
reentrantLock.unlock();
}
}

@Override
public synchronized void afterOperation(CacheEvent event) {
if (event instanceof CacheGetEvent) {
CacheGetEvent e = (CacheGetEvent) event;
afterGet(e.getMillis(), e.getKey(), e.getResult());
} else if (event instanceof CachePutEvent) {
CachePutEvent e = (CachePutEvent) event;
afterPut(e.getMillis(), e.getKey(), e.getValue(), e.getResult());
} else if (event instanceof CacheRemoveEvent) {
CacheRemoveEvent e = (CacheRemoveEvent) event;
afterRemove(e.getMillis(), e.getKey(), e.getResult());
} else if (event instanceof CacheLoadEvent) {
CacheLoadEvent e = (CacheLoadEvent) event;
afterLoad(e.getMillis(), e.getKey(), e.getLoadedValue(), e.isSuccess());
} else if (event instanceof CacheGetAllEvent) {
CacheGetAllEvent e = (CacheGetAllEvent) event;
afterGetAll(e.getMillis(), e.getKeys(), e.getResult());
} else if (event instanceof CacheLoadAllEvent) {
CacheLoadAllEvent e = (CacheLoadAllEvent) event;
afterLoadAll(e.getMillis(), e.getKeys(), e.getLoadedValue(), e.isSuccess());
} else if (event instanceof CachePutAllEvent) {
CachePutAllEvent e = (CachePutAllEvent) event;
afterPutAll(e.getMillis(), e.getMap(), e.getResult());
} else if (event instanceof CacheRemoveAllEvent) {
CacheRemoveAllEvent e = (CacheRemoveAllEvent) event;
afterRemoveAll(e.getMillis(), e.getKeys(), e.getResult());
public void afterOperation(CacheEvent event) {
reentrantLock.lock();
try {
if (event instanceof CacheGetEvent) {
CacheGetEvent e = (CacheGetEvent) event;
afterGet(e.getMillis(), e.getKey(), e.getResult());
} else if (event instanceof CachePutEvent) {
CachePutEvent e = (CachePutEvent) event;
afterPut(e.getMillis(), e.getKey(), e.getValue(), e.getResult());
} else if (event instanceof CacheRemoveEvent) {
CacheRemoveEvent e = (CacheRemoveEvent) event;
afterRemove(e.getMillis(), e.getKey(), e.getResult());
} else if (event instanceof CacheLoadEvent) {
CacheLoadEvent e = (CacheLoadEvent) event;
afterLoad(e.getMillis(), e.getKey(), e.getLoadedValue(), e.isSuccess());
} else if (event instanceof CacheGetAllEvent) {
CacheGetAllEvent e = (CacheGetAllEvent) event;
afterGetAll(e.getMillis(), e.getKeys(), e.getResult());
} else if (event instanceof CacheLoadAllEvent) {
CacheLoadAllEvent e = (CacheLoadAllEvent) event;
afterLoadAll(e.getMillis(), e.getKeys(), e.getLoadedValue(), e.isSuccess());
} else if (event instanceof CachePutAllEvent) {
CachePutAllEvent e = (CachePutAllEvent) event;
afterPutAll(e.getMillis(), e.getMap(), e.getResult());
} else if (event instanceof CacheRemoveAllEvent) {
CacheRemoveAllEvent e = (CacheRemoveAllEvent) event;
afterRemoveAll(e.getMillis(), e.getKeys(), e.getResult());
}
}finally {
reentrantLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author <a href="mailto:areyouok@gmail.com">huangli</a>
*/
Expand All @@ -35,6 +37,8 @@ public class LettuceBroadcastManager extends BroadcastManager {
private final LettuceConnectionManager lettuceConnectionManager;
private final BaseRedisAsyncCommands<byte[], byte[]> stringAsyncCommands;

private final ReentrantLock reentrantLock = new ReentrantLock();


public LettuceBroadcastManager(CacheManager cacheManager, RedisLettuceCacheConfig<Object, Object> config) {
super(cacheManager);
Expand Down Expand Up @@ -72,20 +76,25 @@ public CacheResult publish(CacheMessage cacheMessage) {
}

@Override
public synchronized void startSubscribe() {
if (subscribeThreadStart) {
throw new IllegalStateException("startSubscribe has invoked");
}
this.pubSubAdapter = new RedisPubSubAdapter<byte[], byte[]>() {
@Override
public void message(byte[] channel, byte[] message) {
processNotification(message, config.getValueDecoder());
public void startSubscribe() {
reentrantLock.lock();
try {
if (subscribeThreadStart) {
throw new IllegalStateException("startSubscribe has invoked");
}
};
config.getPubSubConnection().addListener(this.pubSubAdapter);
RedisPubSubAsyncCommands<byte[], byte[]> asyncCommands = config.getPubSubConnection().async();
asyncCommands.subscribe(channel);
this.subscribeThreadStart = true;
this.pubSubAdapter = new RedisPubSubAdapter<byte[], byte[]>() {
@Override
public void message(byte[] channel, byte[] message) {
processNotification(message, config.getValueDecoder());
}
};
config.getPubSubConnection().addListener(this.pubSubAdapter);
RedisPubSubAsyncCommands<byte[], byte[]> asyncCommands = config.getPubSubConnection().async();
asyncCommands.subscribe(channel);
this.subscribeThreadStart = true;
}finally {
reentrantLock.unlock();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.springframework.data.redis.listener.Topic;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author <a href="mailto:areyouok@gmail.com">huangli</a>
Expand All @@ -32,6 +33,8 @@ public class SpringDataBroadcastManager extends BroadcastManager {
private final byte[] channel;
private volatile RedisMessageListenerContainer listenerContainer;

private final ReentrantLock reentrantLock = new ReentrantLock();

public SpringDataBroadcastManager(CacheManager cacheManager, RedisSpringDataCacheConfig config) {
super(cacheManager);
this.config = config;
Expand Down Expand Up @@ -65,37 +68,47 @@ public CacheResult publish(CacheMessage cacheMessage) {
}

@Override
public synchronized void startSubscribe() {
if (this.listenerContainer != null) {
throw new IllegalStateException("subscribe thread is started");
}
Topic topic = new ChannelTopic(config.getBroadcastChannel());
if (config.getListenerContainer() == null) {
RedisMessageListenerContainer c = new RedisMessageListenerContainer();
c.setConnectionFactory(config.getConnectionFactory());
c.afterPropertiesSet();
c.start();
this.listenerContainer = c;
logger.info("create RedisMessageListenerContainer instance");
} else {
this.listenerContainer = config.getListenerContainer();
public void startSubscribe() {
reentrantLock.lock();
try {
if (this.listenerContainer != null) {
throw new IllegalStateException("subscribe thread is started");
}
Topic topic = new ChannelTopic(config.getBroadcastChannel());
if (config.getListenerContainer() == null) {
RedisMessageListenerContainer c = new RedisMessageListenerContainer();
c.setConnectionFactory(config.getConnectionFactory());
c.afterPropertiesSet();
c.start();
this.listenerContainer = c;
logger.info("create RedisMessageListenerContainer instance");
} else {
this.listenerContainer = config.getListenerContainer();
}
this.listenerContainer.addMessageListener(listener, topic);
logger.info("subscribe jetcache invalidate notification. channel={}", config.getBroadcastChannel());
}finally {
reentrantLock.unlock();
}
this.listenerContainer.addMessageListener(listener, topic);
logger.info("subscribe jetcache invalidate notification. channel={}", config.getBroadcastChannel());
}

private void onMessage(Message message, byte[] pattern) {
processNotification(message.getBody(), config.getValueDecoder());
}

@Override
public synchronized void close() throws Exception {
if (this.listenerContainer != null) {
this.listenerContainer.removeMessageListener(listener);
if (this.config.getListenerContainer() == null) {
this.listenerContainer.destroy();
public void close() throws Exception {
reentrantLock.lock();
try {
if (this.listenerContainer != null) {
this.listenerContainer.removeMessageListener(listener);
if (this.config.getListenerContainer() == null) {
this.listenerContainer.destroy();
}
}
this.listenerContainer = null;
}finally {
reentrantLock.unlock();
}
this.listenerContainer = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import redis.clients.jedis.UnifiedJedis;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.ReentrantLock;

/**
* Created on 2022-05-03
Expand All @@ -32,6 +33,8 @@ public class RedisBroadcastManager extends BroadcastManager {
private volatile boolean subscribe;
private boolean subscribeThreadStart;

private final ReentrantLock reentrantLock = new ReentrantLock();

public RedisBroadcastManager(CacheManager cacheManager, RedisCacheConfig<Object, Object> config) {
super(cacheManager);
this.channelStr = config.getBroadcastChannel();
Expand All @@ -48,16 +51,21 @@ public RedisBroadcastManager(CacheManager cacheManager, RedisCacheConfig<Object,
}

@Override
public synchronized void startSubscribe() {
if (subscribeThreadStart) {
throw new IllegalStateException("subscribe thread is started");
public void startSubscribe() {
reentrantLock.lock();
try {
if (subscribeThreadStart) {
throw new IllegalStateException("subscribe thread is started");
}
this.cacheMessagePubSub = new CacheMessagePubSub();
Thread subThread;
subThread = new Thread(this::runSubThread, "Sub_" + channelStr);
subThread.setDaemon(true);
subThread.start();
this.subscribeThreadStart = true;
}finally {
reentrantLock.unlock();
}
this.cacheMessagePubSub = new CacheMessagePubSub();
Thread subThread;
subThread = new Thread(this::runSubThread, "Sub_" + channelStr);
subThread.setDaemon(true);
subThread.start();
this.subscribeThreadStart = true;
}

private void runSubThread() {
Expand Down Expand Up @@ -116,17 +124,22 @@ public CacheResult publish(CacheMessage message) {


@Override
public synchronized void close() {
if (this.closed) {
return;
}
this.closed = true;
if (subscribe) {
try {
this.cacheMessagePubSub.unsubscribe(channel);
} catch (Exception e) {
logger.warn("unsubscribe {} fail", channelStr, e);
public void close() {
reentrantLock.lock();
try {
if (this.closed) {
return;
}
this.closed = true;
if (subscribe) {
try {
this.cacheMessagePubSub.unsubscribe(channel);
} catch (Exception e) {
logger.warn("unsubscribe {} fail", channelStr, e);
}
}
}finally {
reentrantLock.unlock();
}
}

Expand Down
Loading