Skip to content
This repository was archived by the owner on Apr 12, 2020. It is now read-only.

Commit a36d926

Browse files
committed
Implemented H2 based SubscriptionsRepository
1 parent 1ba9eb3 commit a36d926

18 files changed

+151
-46
lines changed

broker/build.gradle

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ dependencies {
1313
compile group: 'io.netty', name: 'netty-transport-native-epoll', version: nettyVersion, classifier: 'linux-x86_64'
1414
// <classifier>${os.detected.name}-${os.detected.arch}</classifier>
1515

16+
compile group: 'com.h2database', name: 'h2-mvstore', version: '1.4.192'
17+
1618
compile group: 'com.zaxxer', name: 'HikariCP', version:'2.4.7'
1719

1820
compile group: 'io.dropwizard.metrics', name: 'metrics-core', version:'3.2.2'
@@ -26,12 +28,11 @@ dependencies {
2628

2729
compile group: 'org.slf4j', name: 'slf4j-api', version:'1.7.5'
2830
runtime group: 'org.slf4j', name: 'slf4j-log4j12', version:'1.7.5' // only for local tests
29-
testCompile group: 'org.slf4j', name: 'slf4j-log4j12', version:'1.7.5'
3031

32+
testCompile group: 'org.slf4j', name: 'slf4j-log4j12', version:'1.7.5'
3133
testCompile group: 'org.fusesource.mqtt-client', name: 'mqtt-client', version:'1.12'
3234
testCompile group: 'org.eclipse.paho', name: 'org.eclipse.paho.client.mqttv3', version:'1.2.0'
3335
testCompile group: 'org.eclipse.jetty.websocket', name: 'websocket-client', version:'9.2.0.M1'
3436

35-
testCompile group: 'com.h2database', name: 'h2', version:'1.4.191'
3637
testRuntime group: 'io.netty', name: 'netty-tcnative', version: '2.0.10.Final', classifier: 'linux-x86_64'
3738
}

broker/src/main/java/io/moquette/broker/ISubscriptionsRepository.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,6 @@ public interface ISubscriptionsRepository {
2424
List<Subscription> listAllSubscriptions();
2525

2626
void addNewSubscription(Subscription subscription);
27+
28+
void removeSubscription(String topic, String clientID);
2729
}

broker/src/main/java/io/moquette/broker/Server.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
import io.moquette.BrokerConstants;
1919
import io.moquette.broker.config.*;
2020
import io.moquette.interception.InterceptHandler;
21-
import io.moquette.persistence.MemoryStorageService;
22-
import io.moquette.spi.ISessionsStore;
21+
import io.moquette.persistence.H2Builder;
22+
import io.moquette.persistence.MemorySubscriptionsRepository;
2323
import io.moquette.interception.BrokerInterceptor;
2424
import io.moquette.broker.security.*;
2525
import io.moquette.broker.subscriptions.CTrieSubscriptionDirectory;
@@ -159,11 +159,14 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
159159
authenticator = initializeAuthenticator(authenticator, config);
160160
authorizatorPolicy = initializeAuthorizatorPolicy(authorizatorPolicy, config);
161161

162-
// TODO user real implementation DBG
163-
MemoryStorageService memStorage = new MemoryStorageService(null, null);
164-
ISessionsStore sessionStore = memStorage.sessionsStore();
165-
ISubscriptionsRepository subscriptionsRepository = new MemorySubscriptionsRepository();
166-
// DBG
162+
final ISubscriptionsRepository subscriptionsRepository;
163+
if (persistencePath != null && !persistencePath.isEmpty()) {
164+
LOG.trace("Configuring H2 subscriptions store to {}", persistencePath);
165+
subscriptionsRepository = new H2Builder(config, scheduler).initStore().subscriptionsRepository();
166+
} else {
167+
LOG.trace("Configuring in-memory subscriptions store");
168+
subscriptionsRepository = new MemorySubscriptionsRepository();
169+
}
167170

168171
ISubscriptionsDirectory subscriptions = new CTrieSubscriptionDirectory();
169172
subscriptions.init(subscriptionsRepository);

broker/src/main/java/io/moquette/broker/subscriptions/CTrieSubscriptionDirectory.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ public class CTrieSubscriptionDirectory implements ISubscriptionsDirectory {
2828
private static final Token ROOT = new Token("root");
2929
private static final INode NO_PARENT = null;
3030

31-
INode root;
32-
private volatile ISubscriptionsRepository sessionsRepository;
31+
private INode root;
32+
private volatile ISubscriptionsRepository subscriptionsRepository;
3333

3434
interface IVisitor<T> {
3535

@@ -43,23 +43,22 @@ private enum Action {
4343
}
4444

4545
@Override
46-
public void init(ISubscriptionsRepository sessionsRepository) {
46+
public void init(ISubscriptionsRepository subscriptionsRepository) {
4747
LOG.info("Initializing CTrie");
4848
final CNode mainNode = new CNode();
4949
mainNode.token = ROOT;
5050
this.root = new INode(mainNode);
5151

5252
LOG.info("Initializing subscriptions store...");
53-
this.sessionsRepository = sessionsRepository;
53+
this.subscriptionsRepository = subscriptionsRepository;
5454
// reload any subscriptions persisted
5555
if (LOG.isTraceEnabled()) {
5656
LOG.trace("Reloading all stored subscriptions. SubscriptionTree = {}", dumpTree());
5757
}
5858

59-
for (Subscription subscription : this.sessionsRepository.listAllSubscriptions()) {
60-
LOG.info("Re-subscribing client to topic CId={}, topicFilter={}", subscription.clientId,
61-
subscription.topicFilter);
62-
add(subscription);
59+
for (Subscription subscription : this.subscriptionsRepository.listAllSubscriptions()) {
60+
LOG.debug("Re-subscribing {}", subscription);
61+
addToTree(subscription);
6362
}
6463
if (LOG.isTraceEnabled()) {
6564
LOG.trace("Stored subscriptions have been reloaded. SubscriptionTree = {}", dumpTree());
@@ -160,6 +159,11 @@ public Action cleanTomb(INode inode, INode iParent) {
160159

161160
@Override
162161
public void add(Subscription newSubscription) {
162+
addToTree(newSubscription);
163+
subscriptionsRepository.addNewSubscription(newSubscription);
164+
}
165+
166+
private void addToTree(Subscription newSubscription) {
163167
Action res;
164168
do {
165169
res = insert(newSubscription.topicFilter, this.root, newSubscription);
@@ -230,6 +234,11 @@ private INode createPathRec(Topic topic, Subscription newSubscription) {
230234
*/
231235
@Override
232236
public void removeSubscription(Topic topic, String clientID) {
237+
removeFromTree(topic, clientID);
238+
this.subscriptionsRepository.removeSubscription(topic.toString(), clientID);
239+
}
240+
241+
private void removeFromTree(Topic topic, String clientID) {
233242
Action res;
234243
do {
235244
res = remove(clientID, topic, this.root, NO_PARENT);

broker/src/main/java/io/moquette/broker/subscriptions/Subscription.java

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -29,33 +29,17 @@ public final class Subscription implements Serializable {
2929
private final MqttQoS requestedQos; // max QoS acceptable
3030
final String clientId;
3131
final Topic topicFilter;
32-
private final boolean active;
3332

3433
public Subscription(String clientId, Topic topicFilter, MqttQoS requestedQos) {
3534
this.requestedQos = requestedQos;
3635
this.clientId = clientId;
3736
this.topicFilter = topicFilter;
38-
this.active = true;
3937
}
4038

4139
public Subscription(Subscription orig) {
4240
this.requestedQos = orig.requestedQos;
4341
this.clientId = orig.clientId;
4442
this.topicFilter = orig.topicFilter;
45-
this.active = orig.active;
46-
}
47-
48-
/**
49-
* Constructor with undefined maximum QoS
50-
*
51-
* @param clientId id of client owning this subscription.
52-
* @param topicFilter the topic to subscribe to.
53-
* */
54-
public Subscription(String clientId, Topic topicFilter) {
55-
this.requestedQos = null;
56-
this.clientId = clientId;
57-
this.topicFilter = topicFilter;
58-
this.active = true;
5943
}
6044

6145
public String getClientId() {
@@ -70,10 +54,6 @@ public Topic getTopicFilter() {
7054
return topicFilter;
7155
}
7256

73-
public boolean isActive() {
74-
return active;
75-
}
76-
7757
public boolean qosLessThan(Subscription sub) {
7858
return requestedQos.value() < sub.requestedQos.value();
7959
}
@@ -101,12 +81,7 @@ public int hashCode() {
10181

10282
@Override
10383
public String toString() {
104-
return String.format(
105-
"[filter:%s, cliID: %s, qos: %s, active: %s]",
106-
this.topicFilter,
107-
this.clientId,
108-
this.requestedQos,
109-
this.active);
84+
return String.format("[filter:%s, clientID: %s, qos: %s]", topicFilter, clientId, requestedQos);
11085
}
11186

11287
@Override
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package io.moquette.persistence;
2+
3+
import io.moquette.BrokerConstants;
4+
import io.moquette.broker.ISubscriptionsRepository;
5+
import io.moquette.broker.config.IConfig;
6+
import org.h2.mvstore.MVStore;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
import java.util.concurrent.ScheduledExecutorService;
11+
import java.util.concurrent.TimeUnit;
12+
13+
public class H2Builder {
14+
15+
private static final Logger LOG = LoggerFactory.getLogger(H2Builder.class);
16+
17+
private final String storePath;
18+
private final int autosaveInterval; // in seconds
19+
private final ScheduledExecutorService scheduler;
20+
private MVStore mvStore;
21+
22+
public H2Builder(IConfig props, ScheduledExecutorService scheduler) {
23+
this.storePath = props.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, "");
24+
final String autosaveProp = props.getProperty(BrokerConstants.AUTOSAVE_INTERVAL_PROPERTY_NAME, "30");
25+
this.autosaveInterval = Integer.parseInt(autosaveProp);
26+
this.scheduler = scheduler;
27+
}
28+
29+
@SuppressWarnings("FutureReturnValueIgnored")
30+
public H2Builder initStore() {
31+
LOG.info("Initializing H2 store");
32+
if (storePath == null || storePath.isEmpty()) {
33+
throw new IllegalArgumentException("H2 store path can't be null or empty");
34+
}
35+
mvStore = new MVStore.Builder()
36+
.fileName(storePath)
37+
.autoCommitDisabled()
38+
.open();
39+
40+
LOG.trace("Scheduling H2 commit task");
41+
scheduler.scheduleWithFixedDelay(() -> {
42+
LOG.trace("Committing to H2");
43+
mvStore.commit();
44+
}, autosaveInterval, autosaveInterval, TimeUnit.SECONDS);
45+
return this;
46+
}
47+
48+
public ISubscriptionsRepository subscriptionsRepository() {
49+
return new H2SubscriptionsRepository(mvStore);
50+
}
51+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.moquette.persistence;
2+
3+
import io.moquette.broker.ISubscriptionsRepository;
4+
import io.moquette.broker.subscriptions.Subscription;
5+
import org.h2.mvstore.Cursor;
6+
import org.h2.mvstore.MVMap;
7+
import org.h2.mvstore.MVStore;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
14+
public class H2SubscriptionsRepository implements ISubscriptionsRepository {
15+
16+
private static final Logger LOG = LoggerFactory.getLogger(H2SubscriptionsRepository.class);
17+
private static final String SUBSCRIPTIONS_MAP = "subscriptions";
18+
19+
private MVMap<String, Subscription> subscriptions;
20+
21+
H2SubscriptionsRepository(MVStore mvStore) {
22+
this.subscriptions = mvStore.openMap(SUBSCRIPTIONS_MAP);
23+
}
24+
25+
@Override
26+
public List<Subscription> listAllSubscriptions() {
27+
LOG.debug("Retrieving existing subscriptions");
28+
29+
List<Subscription> results = new ArrayList<>();
30+
Cursor<String, Subscription> mapCursor = subscriptions.cursor(null);
31+
while (mapCursor.hasNext()) {
32+
String subscriptionStr = mapCursor.next();
33+
results.add(mapCursor.getValue());
34+
}
35+
LOG.debug("Loaded {} subscriptions", results.size());
36+
return results;
37+
}
38+
39+
@Override
40+
public void addNewSubscription(Subscription subscription) {
41+
subscriptions.put(subscription.getTopicFilter() + "-" + subscription.getClientId(), subscription);
42+
}
43+
44+
@Override
45+
public void removeSubscription(String topic, String clientID) {
46+
47+
}
48+
}

broker/src/main/java/io/moquette/persistence/MemoryStorageService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.util.concurrent.ScheduledExecutorService;
2525

26+
@Deprecated
2627
public class MemoryStorageService implements IStore {
2728

2829
private MemorySessionStore m_sessionsStore;

broker/src/main/java/io/moquette/broker/MemorySubscriptionsRepository.java renamed to broker/src/main/java/io/moquette/persistence/MemorySubscriptionsRepository.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@
1313
*
1414
* You may elect to redistribute this code under either of these licenses.
1515
*/
16-
package io.moquette.broker;
16+
package io.moquette.persistence;
1717

18+
import io.moquette.broker.ISubscriptionsRepository;
1819
import io.moquette.broker.subscriptions.Subscription;
1920

2021
import java.util.ArrayList;
@@ -34,4 +35,12 @@ public List<Subscription> listAllSubscriptions() {
3435
public void addNewSubscription(Subscription subscription) {
3536
subscriptions.add(subscription);
3637
}
38+
39+
@Override
40+
public void removeSubscription(String topic, String clientID) {
41+
subscriptions.stream()
42+
.filter(s -> s.getTopicFilter().toString().equals(topic) && s.getClientId().equals(clientID))
43+
.findFirst()
44+
.ifPresent(subscriptions::remove);
45+
}
3746
}

broker/src/main/java/io/moquette/spi/IStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.moquette.spi;
1818

19+
@Deprecated
1920
public interface IStore {
2021

2122
void initStore();

0 commit comments

Comments
 (0)