Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
protected final BrokerController brokerController;
protected Set<String> configBlackList = new HashSet<>();
private final ExecutorService asyncExecuteWorker = new ThreadPoolExecutor(0, 4, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
private final ExecutorService asyncExecuteWorker = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>());


public AdminBrokerProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
Expand Down Expand Up @@ -778,6 +780,25 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
}
}

final Boolean syncDelete = requestHeader.getSyncDelete();
if (Boolean.TRUE.equals(syncDelete)) {
return doDeleteTopic(topic, true);
} else {
asyncExecuteWorker.execute(() -> {
try {
doDeleteTopic(topic, syncDelete);
} catch (Exception e) {
LOGGER.error(String.format("delete topic %s failed for ", topic), e);
}
});
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
}

private RemotingCommand doDeleteTopic(String topic,boolean isSyncDelete) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
List<String> topicsToClean = new ArrayList<>();
topicsToClean.add(topic);

Expand All @@ -798,7 +819,10 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
try {
for (String topicToClean : topicsToClean) {
// delete topic
deleteTopicInBroker(topicToClean);
deleteTopicInBroker(topicToClean,isSyncDelete);
}
if (!isSyncDelete) {
batchSyncMetaData();
}
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
Expand All @@ -808,15 +832,21 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
return response;
}

private void deleteTopicInBroker(String topic) {
this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
this.brokerController.getTopicQueueMappingManager().delete(topic);
private void deleteTopicInBroker(String topic, boolean isSyncDelete) {
this.brokerController.getTopicConfigManager().deleteTopicConfig(topic, isSyncDelete);
this.brokerController.getTopicQueueMappingManager().delete(topic, isSyncDelete);
this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic);
this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic);
this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic));
this.brokerController.getMessageStore().getTimerMessageStore().getTimerMetrics().removeTimingCount(topic);
}

private void batchSyncMetaData() {
this.brokerController.getTopicConfigManager().persist();
this.brokerController.getTopicQueueMappingManager().persist();
this.brokerController.getConsumerOffsetManager().persist();
}

private RemotingCommand getUnknownCmdResponse(ChannelHandlerContext ctx, RemotingCommand request) {
String error = " request type " + request.getCode() + " not supported";
final RemotingCommand response =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,17 +595,23 @@ public boolean isOrderTopic(final String topic) {
}
}

public void deleteTopicConfig(final String topic) {
public void deleteTopicConfig(final String topic, boolean isSync) {
TopicConfig old = removeTopicConfig(topic);
if (old != null) {
log.info("delete topic config OK, topic: {}", old);
updateDataVersion();
this.persist();
if (isSync) {
this.persist();
}
} else {
log.warn("delete topic config failed, topic: {} not exists", topic);
}
}

public void deleteTopicConfig(final String topic) {
deleteTopicConfig(topic, true);
}

public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,23 @@ public void updateTopicQueueMapping(TopicQueueMappingDetail newDetail, boolean f

}

public void delete(final String topic) {
public void delete(final String topic, boolean isSync) {
TopicQueueMappingDetail old = this.topicQueueMappingTable.remove(topic);
if (old != null) {
log.info("delete topic queue mapping OK, static topic queue mapping: {}", old);
this.dataVersion.nextVersion();
this.persist();
if (isSync) {
this.persist();
}
} else {
log.warn("delete topic queue mapping failed, static topic: {} not exists", topic);
}
}

public void delete(final String topic) {
delete(topic,true);
}

public TopicQueueMappingDetail getTopicQueueMapping(String topic) {
return topicQueueMappingTable.get(topic);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class DeleteTopicRequestHeader extends TopicRequestHeader {
@CFNotNull
@RocketMQResource(ResourceType.TOPIC)
private String topic;
private Boolean syncDelete = true;

@Override
public void checkFields() throws RemotingCommandException {
Expand All @@ -46,4 +47,12 @@ public String getTopic() {
public void setTopic(String topic) {
this.topic = topic;
}

public Boolean getSyncDelete() {
return syncDelete;
}

public void setSyncDelete(Boolean syncDelete) {
this.syncDelete = syncDelete;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,7 @@ public long now() {
* dispatched to consume queue.
*/
@Override
public int deleteTopics(final Set<String> deleteTopics) {
public synchronized int deleteTopics(final Set<String> deleteTopics) {
if (deleteTopics == null || deleteTopics.isEmpty()) {
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
*/
package org.apache.rocketmq.store.stats;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -140,7 +141,7 @@ public class BrokerStatsManager {
private ScheduledExecutorService accountExecutor;
private ScheduledExecutorService cleanResourceExecutor;

private final HashMap<String, StatsItemSet> statsTable = new HashMap<>();
private final Map<String, StatsItemSet> statsTable = new ConcurrentHashMap<>();
private final String clusterName;
private final boolean enableQueueStat;
private MomentStatsItemSet momentStatsItemSetFallSize;
Expand Down
Loading