Skip to content
This repository was archived by the owner on Jul 31, 2025. It is now read-only.

Commit 84c3433

Browse files
committed
Initial (simple) lock implementation
just a brain dump of a distributed cassandra lock from (http://www.datastax.com/dev/blog/consensus-on-cassandra) will get some cleanup the other day
1 parent 6c5e8f5 commit 84c3433

File tree

7 files changed

+273
-0
lines changed

7 files changed

+273
-0
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io.datanerds.lack;
2+
3+
import com.datastax.driver.core.ResultSet;
4+
import com.datastax.driver.core.Session;
5+
import io.datanerds.lack.cassandra.CassandraClient;
6+
import io.datanerds.lack.cassandra.LackConfig;
7+
import io.datanerds.lack.cassandra.Statements;
8+
9+
public class Lack {
10+
11+
private final String owner;
12+
private final CassandraClient client;
13+
private final Session session;
14+
private final Statements statements;
15+
16+
public Lack(LackConfig lackConfig, String owner) {
17+
this.owner = owner;
18+
client = new CassandraClient();
19+
session = client.init(lackConfig);
20+
statements = new Statements(session);
21+
statements.create();
22+
statements.setTtl(lackConfig.ttlInSeconds);
23+
}
24+
25+
public void acquire(String resource) {
26+
ResultSet result = statements.acquire(resource, owner);
27+
if (!result.wasApplied()) {
28+
throw new LackException(String.format("Could not acquire lock for '%s'", resource));
29+
}
30+
}
31+
32+
public void renew(String resource) {
33+
ResultSet result = statements.renew(resource, owner);
34+
if (!result.wasApplied()) {
35+
throw new LackException(String.format("Could not renew lock for '%s'", resource));
36+
}
37+
}
38+
39+
public void release(String resource) {
40+
ResultSet result = statements.release(resource, owner);
41+
if (!result.wasApplied()) {
42+
throw new LackException(String.format("Could not release lock for '%s'", resource));
43+
}
44+
}
45+
46+
public void stop() {
47+
session.close();
48+
client.close();
49+
}
50+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.datanerds.lack;
2+
3+
public class LackException extends RuntimeException {
4+
public LackException(String message) {
5+
super(message);
6+
}
7+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.datanerds.lack.cassandra;
2+
3+
import com.datastax.driver.core.Cluster;
4+
import com.datastax.driver.core.Metadata;
5+
import com.datastax.driver.core.Session;
6+
import com.datastax.driver.core.SocketOptions;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
public class CassandraClient {
11+
12+
private static final Logger logger = LoggerFactory.getLogger(CassandraClient.class);
13+
private Cluster cluster;
14+
15+
public Session init(LackConfig config) {
16+
Cluster.Builder clusterBuilder = new Cluster.Builder().addContactPoints(config.nodes)
17+
.withSocketOptions(new SocketOptions().setTcpNoDelay(true).setReuseAddress(true));
18+
if (config.username != null && config.password != null) {
19+
clusterBuilder.withCredentials(config.username, config.password);
20+
}
21+
22+
this.cluster = clusterBuilder.build();
23+
logClusterInfo();
24+
Session session = cluster.connect(config.keyspace);
25+
return session;
26+
}
27+
28+
public void close() {
29+
cluster.close();
30+
}
31+
32+
private void logClusterInfo() {
33+
Metadata metadata = cluster.getMetadata();
34+
logger.info("Connected to cluster: {}; Protocol Version: {}", metadata.getClusterName(),
35+
cluster.getConfiguration().getProtocolOptions().getProtocolVersionEnum());
36+
37+
metadata.getAllHosts()
38+
.forEach(host -> logger.info("Added '{}' from DC '{}' which is running '{}'", host.getAddress(),
39+
host.getDatacenter(), host.getCassandraVersion().toString()));
40+
}
41+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.datanerds.lack.cassandra;
2+
3+
public interface Constants {
4+
String TABLE = "leases";
5+
6+
interface Columns {
7+
String RESOURCE = "resource";
8+
String OWNER = "owner";
9+
}
10+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.datanerds.lack.cassandra;
2+
3+
public class LackConfig {
4+
5+
public final String username;
6+
public final String password;
7+
public final String[] nodes;
8+
public final String keyspace;
9+
public final int ttlInSeconds;
10+
11+
public LackConfig(String username, String password, String[] nodes, String keyspace, int ttlInSeconds) {
12+
this.username = username;
13+
this.password = password;
14+
this.nodes = nodes;
15+
this.keyspace = keyspace;
16+
this.ttlInSeconds = ttlInSeconds;
17+
}
18+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package io.datanerds.lack.cassandra;
2+
3+
import com.datastax.driver.core.BoundStatement;
4+
import com.datastax.driver.core.PreparedStatement;
5+
import com.datastax.driver.core.ResultSet;
6+
import com.datastax.driver.core.Session;
7+
import com.datastax.driver.core.querybuilder.BuiltStatement;
8+
import com.datastax.driver.core.querybuilder.Delete;
9+
import com.datastax.driver.core.querybuilder.Insert;
10+
import com.datastax.driver.core.querybuilder.Update;
11+
import com.datastax.driver.core.schemabuilder.Create;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
import java.util.Map;
16+
import java.util.concurrent.ConcurrentHashMap;
17+
18+
import static com.datastax.driver.core.DataType.text;
19+
import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
20+
import static com.datastax.driver.core.schemabuilder.SchemaBuilder.alterTable;
21+
import static com.datastax.driver.core.schemabuilder.SchemaBuilder.createTable;
22+
import static io.datanerds.lack.cassandra.Constants.Columns.OWNER;
23+
import static io.datanerds.lack.cassandra.Constants.Columns.RESOURCE;
24+
import static io.datanerds.lack.cassandra.Constants.TABLE;
25+
26+
public class Statements {
27+
28+
private static final Logger logger = LoggerFactory.getLogger(Statements.class);
29+
private final Map<BuiltStatement, PreparedStatement> statements = new ConcurrentHashMap<>();
30+
private final Session session;
31+
32+
private final Create create = createTable(TABLE).addPartitionKey(RESOURCE, text())
33+
.addColumn(OWNER, text())
34+
.ifNotExists();
35+
36+
private final Insert acquire = insertInto(TABLE).value(RESOURCE, bindMarker())
37+
.value(OWNER, bindMarker())
38+
.ifNotExists();
39+
40+
private final Update.Conditions renew = update(TABLE).with(set(OWNER, bindMarker()))
41+
.where(eq(RESOURCE, bindMarker()))
42+
.onlyIf(eq(OWNER, bindMarker()));
43+
44+
private final Delete.Conditions release = delete().from(TABLE)
45+
.where(eq(RESOURCE, bindMarker()))
46+
.onlyIf(eq(OWNER, bindMarker()));
47+
48+
public Statements(Session session) {
49+
this.session = session;
50+
}
51+
52+
public void create() {
53+
session.execute(create);
54+
}
55+
56+
public void setTtl(int ttl) {
57+
logger.info("Setting a default TTL of '{}' seconds", ttl);
58+
session.execute(alterTable("leases").withOptions().defaultTimeToLive(ttl));
59+
}
60+
61+
public ResultSet acquire(String resource, String owner) {
62+
BoundStatement boundStatement = statements.computeIfAbsent(acquire, session::prepare).bind(resource, owner);
63+
return session.execute(boundStatement);
64+
}
65+
66+
public ResultSet renew(String resource, String owner) {
67+
BoundStatement boundStatement = statements.computeIfAbsent(renew, session::prepare)
68+
.bind(owner, resource, owner);
69+
return session.execute(boundStatement);
70+
}
71+
72+
public ResultSet release(String resource, String owner) {
73+
BoundStatement boundStatement = statements.computeIfAbsent(release, session::prepare).bind(resource, owner);
74+
return session.execute(boundStatement);
75+
76+
}
77+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package io.datanerds.lack;
2+
3+
import io.datanerds.lack.cassandra.LackConfig;
4+
import org.junit.*;
5+
import org.junit.rules.ExpectedException;
6+
7+
import java.util.UUID;
8+
9+
public class LackTest {
10+
11+
private static LackConfig config = new LackConfig(null, null, new String[]{"127.0.0.1"}, "lack", 1);
12+
private static Lack lack;
13+
private static Lack otherLack;
14+
private String resource;
15+
16+
@Rule
17+
public ExpectedException thrown = ExpectedException.none();
18+
19+
@BeforeClass
20+
public static void setup() {
21+
lack = new Lack(config, "lack");
22+
otherLack = new Lack(config, "otherLack");
23+
}
24+
25+
@AfterClass
26+
public static void tearDown() {
27+
lack.stop();
28+
}
29+
30+
@Before
31+
public void setupResource() {
32+
this.resource = UUID.randomUUID().toString();
33+
}
34+
35+
@Test
36+
public void simpleAcquireRenewAndRelease() {
37+
lack.acquire(resource);
38+
lack.renew(resource);
39+
lack.release(resource);
40+
}
41+
42+
@Test
43+
public void alreadyLocked() {
44+
lack.acquire(resource);
45+
thrown.expect(LackException.class);
46+
lack.acquire(resource);
47+
}
48+
49+
@Test
50+
public void alreadyLockedByOther() {
51+
lack.acquire(resource);
52+
thrown.expect(LackException.class);
53+
otherLack.acquire(resource);
54+
}
55+
56+
@Test
57+
public void alreadyReleased() {
58+
lack.acquire(resource);
59+
lack.release(resource);
60+
thrown.expect(LackException.class);
61+
otherLack.release(resource);
62+
}
63+
64+
@Test
65+
public void testReleaseAfterTtl() throws InterruptedException {
66+
lack.acquire(resource);
67+
Thread.sleep(1200);
68+
lack.acquire(resource);
69+
}
70+
}

0 commit comments

Comments
 (0)