This document describes how to use the Java client to interact with Riak. See the DEVELOPERS document for a technical overview of the project.
In addition, we are now working on a “cookbook” for using the Riak Java client. See the Cookbook section on the Github Wiki for complete, compilable examples of using the Java client.
Riak is a dynamo style, distributed key-value store that provides a map reduce query interface. It exposes both a REST and protocol buffers API. This is a Java client for talking to Riak via a single interface, regardless of underlying transport. This library also attempts to simplify some of the realities of dealing with a fault-tolerant, eventually consistent database by providing strategies for:
- Conflict resolution
- Retrying requests
- Value mutation
The client provides some lightweight ORM like capability for storing domain objects in Riak and returning domain objects from map reduce queries.
The current riak-java-client is available from maven central. Add the dependency to your pom.xml
<dependency>
<groupId>com.basho.riak</groupId>
<artifactId>riak-client</artifactId>
<version>1.0.6</version>
<type>pom</type>
</dependency>
Maven artifacts are published to the Sonatype OSS Repository.
Assuming you’re running Riak on localhost on the default ports getting started is as simple as:
// create a client (see Configuration below in this README for more details)
IRiakClient riakClient = RiakFactory.pbcClient(); //or RiakFactory.httpClient();
// create a new bucket
Bucket myBucket = riakClient.createBucket("myBucket").execute();
// add data to the bucket
myBucket.store("key1", "value1").execute();
//fetch it back
IRiakObject myData = myBucket.fetch("key1").execute();
// you can specify extra parameters to the store operation using the
// fluent builder style API
myData = myBucket.store("key1", "value2").returnBody(true).execute();
// delete
myBucket.delete("key1").rw(3).execute();
This riak-java-client API is new. Prior to this version there were two separate clients, one for protocol buffers, one for REST, both in the same library and both with quite different APIs.
The REST client (which you can still use directly) has been moved to
com.basho.riak.client.http.RiakClient
Though a deprecated RiakClient still exists at
com.basho.riak.client.RiakClient
for another release or two to ease transition. All the REST client’s HTTP specific classes have been moved to
com.basho.riak.client.http.*
and the originals retained but deprecated. If you want to use the legacy, low-level client directly please use the newly packaged version. The deprecated classes will be deleted in the next or following release to clean up the namespaces.
At that time IRiak* will become Riak* and any I* names will be deprecated then dropped. I’m sorry about the unpleasant naming conventions in the short term.
To avoid the profusion of constructors and setters there is a builder
com.basho.riak.client.builders.RiakObjectBuilder
to simplify creating and updating IRiakObjects.
In fact most classes are as immutable as possible and are created using fluent builders. The builders are not designed to be used across multiple threads but the immutable value objects they create are.
There is a low-level interface, RawClient
com.basho.riak.client.raw.RawClient
and two adapters that adapt the legacy protocol buffers and REST clients to the RawClient interface. RawClient provides access to Riak’s APIs. If you don’t want any of the higher level features to deal with domain objects, eventual consistency and fault tolerance (see below) then at least use RawClient over the underlying legacy clients so your code will not need to change if you decide to move from REST to protocol buffers or back. For example:
RiakClient pbcClient = new RiakClient("127.0.0.1");
// OR
// com.basho.riak.client.http.RiakClient httpClient = new
// com.basho.riak.client.http.RiakClient("http://127.0.0.1:8098/riak");
RawClient rawClient = new PBClientAdapter(pbcClient);
// OR new HTTPClientAdapter(httpClient);
IRiakObject riakObject = RiakObjectBuilder.newBuilder(bucketName, "key1").withValue("value1").build();
rawClient.store(riakObject, new StoreMeta(2, 1, false));
RiakResponse fetched = rawClient.fetch(bucketName, "key1");
IRiakObject result = null;
if(fetched.hasValue()) {
if(fetched.hasSiblings()) {
//do what you must to resolve conflicts
} else {
result = fetched.getRiakObjects()[0];
}
}
result.addLink(new RiakLink("otherBucket", "otherKey", "tag"));
result.setValue("newValue");
RiakResponse stored = rawClient.store(result, new StoreMeta(2, 1, true));
IRiakObject updated = null;
if(stored.hasValue()) {
if(stored.hasSiblings()) {
//do what you must to resolve conflicts
} else {
updated = stored.getRiakObjects()[0];
}
}
rawClient.delete(bucketName, "key1");
If you want to add a client transport to Riak (say you hate Apache HTTP client but love Netty) implementing RawClient is the way to do it.
All the code so far elides somes rather important details:
// handle conflict here
If your bucket allows siblings at some point you may have to deal with conflict. Likewise, if you are running in the real world you may have to deal with temporary failure.
The higher level API (built on top of RawClient) gives you some tools to deal with eventual consistency and temporary failure.
Talking to Riak is modelled as a set of operations. An operation is a fluent builder for setting operation parameters (like the tunable CAP quorum for a read) and an execute method to carry out the operation. EG
Bucket b = client.createBucket(bucketName)
.nVal(1)
.allowSiblings(true)
.execute();
or
b.store("k", "v").w(2).dw(1).returnBody(false).execute();
All the operations implement RiakOperation<T>, which has a single method:
T execute() throws RiakException;
Each operation needs a Retrier. You can specify a default retrier implementation when you create an IRiakClient or you can provide one to each operation when you build it. There is a simple retrier provided with this library that retries the given operation n times before throwing an exception.
b.store("k", "v").withRetrier(DefaultRetrier.attempts(3)).execute();
The DefaultRiakClient implementation provides a 3 times retrier to all it’s operations. You can override this from the constructor or provide your own per operation (or per bucket, see below). The Retrier interface accepts Callable<T> for its “attempt” method. Internally, operations are built around that interface.
public interface Retrier {
<T> T attempt(Callable<T> command) throws RiakRetryFailedException;
}
To simplify the Riak client all value related operations are performed via the Bucket interface. The Bucket also provides access to the set of bucket properties (nval, allow_mult etc).
NOTE: at present not all bucket properties are exposed by either API. This is something that will be addressed very soon.
One thing to note is that you can store more than just IRiakObjects in buckets. Bucket has convenience methods to store byte[] and String values against a key but also type parameterized generic fetch and store methods. This allows you to store your domain objects in Riak. Please see Conversion below for details.
Although it is expensive and somewhat ill advised, you may list a bucket’s keys with:
for(String k : bucket.keys()) {
// do your key thing
}
The keys are streamed, and the stream closed by a reaper thread when the iterator is weakly reachable.
There is a further wrapper to bucket (see DomainBucket below) that simplifies calling operations further.
Conflict happens in Dynamo style systems. It is best to have a strategy in mind to deal with it. The strategy you employ is highly dependant on your domain. One example is a shopping cart. Conflicting shopping carts should be merged by a union of their contents, you might reinstate a deleted toaster but that is better than losing money.
See MergeCartResolver in src/test for an example of a Shopping Cart conflict resolver.
Both fetch and store make use of a ConflictResolver to handle siblings.
The default conflict resolver does not “resolve” conflicts, it blows up with an UnresolvedConflictException (which gives you access to the siblings).
Using the basic bucket interface you can provide a conflict resolver to either a fetch or a store operation. All operations are configured by default with a resolver for which siblings are an exception.
The conflict resolver interface is a single method that accepts a Collection of domain objects and returns the one true value, or throws an exception of conflict cannot be resolved. UnresolvedConflictException contains all the siblings. In cases were logic fails to resolve the conflict you can push the decision to a user:
T resolve(final Collection<T> siblings) throws UnresolvedConflictException;
Since conflict resolution requires domain knowledge it makes sense to convert riak data into domain objects.
Data in riak is made up of the value, its content-type, links and user meta data. There is then some riak meta data along with that (for example, the VClock, last update time etc.)
The data payload can be any type you like, but normally it is a serialized version of some application specific data. It is a lot easier to reason about siblings and conflict with the domain knowledge of your application, and easier still with the actual domain objects.
Each operation provided by Bucket can accept an implementation of
com.basho.riak.client.convert.Converter
Converter has two methods
IRiakObject fromDomain(T domainObject, VClock vclock)
T toDomain(IRiakObject riakObject)
Implement these and pass to a bucket operation to convert riak data into POJOs and back.
This library currently provides a JSONConverter that uses the Jackson JSON library. Jackson requires your classes to be either simple Java Bean types (getter, setter, no arg constructor) or annotated. Please see
com.megacorp.commerce.ShoppingCart
for an example of Jackson annotated domain class and LegacyCart in the same package for an unannotated class.
You can annotate a field of your class with
@RiakKey
and the client will use the value of that field as the key for fetch and store operations. If you do not or cannot annotate a key field then you must use the
bucket.store("key", myObject);
Implementing your own converter is pretty simple, so if you want to store XML, go ahead. Be aware that the converter should write the content-type when serializing and also check the content-type when deserializing.
There is also a pass through converter for IRiakObject.
You may also use the JSONConverter to store Java Collection types (like Map, List or Map<List> and List<Map<String, List<String>>>) as JSON in Riak. Which is pretty cool.
With conflict resolution comes Mutation. When you perform a store you might be creating a new key/value but you may well be updating an existing value and you don’t know in advance. If you model your data to be logically monotonic then you can provide a Mutation<T> that accepts the old value and returns the new value based on some logic.
b.store("k", myObject).withMutation(new Mutation<MyClass>() {
MyClass apply(MyClass original) {
myObject.setCounter(orignal.getCounter() +1 );
return myObject;
}).execute();
The Mutation<T> interface has a single method:
T apply(T original);
Which accepts the conflict resolved value from a fetch and returns it updated.
The default mutation replaces the old value with the new value. (See ClobberMutation.)
When a fetch operation is executed the order of execution is as follows:
- RawClient fetch
- Siblings iterated and converted
- Converted siblings passed to conflict resolver
- Resolved value returned
For a store operation
- Fetch operation performed as above
- The Mutation is applied to the fetched value
- The mutated value is converted to RiakObject
- The store is performed through the RawClient
- if returnBody is true the siblings are iterated, converted and conflict resolved and the value is returned
A DomainBucket is a wrapper around a bucket that simplifies the amount of typing and repetition required to work with that bucket. A DomainBucket is an abstraction that allows you to store and fetch specific types in Riak. BEWARE there is no enforcement of any schema on the Riak side, if you store ShoppingCart in the “carts” bucket and try and retrieve it through a DomainBucket<Account> then you will have a ConversionException.
Chances are, that once you project has stablised you will be working with maybe a few types and a few buckets, so you ShoppingCarts will always require that you use you MergedCartResolver and your CartConverter and your CartMutation.
Creating a DomainBucket is easy:
final DomainBucket<ShoppingCart> carts = DomainBucket.builder(b, ShoppingCart.class)
.withResolver(new MergeCartResolver())
.returnBody(true)
.retrier(new DefaultRetrier(4))
.w(1)
.dw(1)
.r(1)
.rw(1)
.mutationProducer(new CartMutator())
.build();
Thereafter there is less noise when working with your ShoppingCart data:
final ShoppingCart cart = new ShoppingCart(userId);
cart.addItem("coffee");
cart.addItem("fixie");
cart.addItem("moleskine");
final ShoppingCart storedCart = carts.store(cart);
carts.fetch(userId);
cart.addItem("bowtie");
cart.addItem("nail gun");
carts.delete(cart);
(NOTE: by default a DomainBucket is configured with the DefaultResolver, ClobberMutation and JSONConverter)
The Riak-java-client currently supports map reduce and link walking.
Performing map reduce is very much as it was for the legacy RiakClient:
Refer to the Riak Map/Reduce documentation for a detailed explanation of how map/reduce works in Riak. Map/Reduce is just another RiakOperation and so a fluent builder:
MapReduceResult result = client.mapReduce("myBucket")
.addLinkPhase("bucketX", "test", false)
.addMapPhase(new NamedJSFunction("Riak.mapValuesJson"), false)
.addReducePhase(new NamedErlangFunction("riak_kv_mapreduce", "reduce_sort"), true)
.execute();
The Map reduce operation lets you build up a number of phases. The MapReduceResult uses Jackson (again) to provide you query results as either Java Collection types, a raw JSON string or (again) as a Java Bean type that you provide to the getResult method:
Collection<GoogleStockDataItem> stockItems =
result.getResult(GoogleStockDataItem.class);
The inputs to a Map/Reduce are either a bucket, or bucket/key pairs.
A BucketMapReduce will run through all keys in the bucket. This will be time and resource consuming for large datasets. To create a BucketMapReduce operation call
client.mapReduce("myBucket");
BucketMapReduce also allows the addition of Key Filters to limit the results. Adding Key Filters is just like adding phases:
MapReduceResult result = client.mapReduce("myBucket")
.addKeyFilter(new TokenizeFilter("_", 2))
.addKeyFilter(new StringToIntFilter())
.addKeyFilter(new LessThanFilter(50))
.addMapPhase(new NamedJSFunction("Riak.mapValuesJson"))
.addReducePhase(new NamedErlangFunction("riak_kv_mapreduce","reduce_sort"), true)
.execute();
Collection<Integer> items = result.getResult(Integer.class);
Please see the Key Filters documentation for more details about key filters and the
com.basho.riak.client.query.filters.*
package for the available filters.
A BucketKeyMapReduce can be built with many inputs, they’re added just like phases.
MapReduceResult result = client.mapReduce()
.addInput("goog","2010-01-04")
.addInput("goog","2010-01-05")
.addInput("goog","2010-01-06")
.addInput("goog","2010-01-07")
.addInput("goog","2010-01-08")
.addMapPhase(new NamedJSFunction("Riak.mapValuesJson"), true)
.execute();
Riak Search queries can be used as inputs to Map/Reduce. To use a Riak
Search query as input for a Map/Reduce operation with the Riak-java-client create a
SearchMapReduce
operation with the IRiakClient:
MapReduceResult result = client.mapReduce("my_search_bucket", "foo:zero")
.addMapPhase(new NamedJSFunction("Riak.mapValuesJson"))
.execute();
And work with the results as before.
To enable a bucket for Riak Search, there is a new method added to
Bucket
, just call:
client.createBucket("bucket_name") // or updateBucket("bucket_name")
.enableForSearch().execute()
Note: At present, enabling a bucket only works if your client is an HTTP client. These bucket properties will soon be available via PB. Querying via Map/Reduce works for either transport
An IndexMapReduce
can be used if you have the
riak_kv_eleveldb_backend
enabled. A secondary indexes query can then
provide the input for a Map/Reduce job. As before, use the
IRiakClient
as a factory to create an IndexMapReduce
:
IndexQuery iq = new BinValueQuery(BinIndex.named("email"),
"user_bucket", "user@domain.com");
MapReduceResult result = client.mapReduce(iq)
.addReducePhase(NamedErlangFunction.REDUCE_IDENTITY)
.execute();
There are query classes to perform a RangeQuery
or a ValueQuery
.
You can also directly fetch an index using a Bucket
:
// fetch int range
List<String> ageRange = b.fetchIndex(IntIndex.named("age")).from(16).to(64).execute();
Links provide a light weight graph database-like feature to Riak. See the Link Walking documentation for full details.
Adding links to an IRiakObject is done via the builder
IRiakObject o = RiakObjectBuilder.newBuilder("myBucket",
"myKey").withValue("value").addLink("bucketX", "keyY", "tagZ").build();
Link Walking is just another RiakOperation. You start at a IRiakObject and add steps to walk and call execute. Adding a step is matter of specifying the bucket, tag and whether to keep the output for the step. A null, empty string or “_” are treated as the wildcard for either of bucket or tag. Specify keep as either a boolean or the Accumulate enum. Not specifying keep will result in the default for that step being used.
An example link walk:
WalkResult result = client.walk(riakObject)
.addStep(bucketName, fooTag, true)
.addStep(bucketName, fooTag)
.execute();
The result is always a Collection of IRiakObjects. In the next version conversion and conflict resolution will also be available to link walking. We also plan to add Link mapping so that links can be used to build graphs of domain objects.
NOTE: Link walking is a REST only operation as far as Riak’s interfaces are concerned. Link Walking in the protocol buffers Java client is a hack that issues two m/r jobs to the protocol buffers interface (the first constructs the inputs to the second by walking the links, the second returns the data). It is included to provide parity between the interfaces but should perform similarly to the REST link walking interface.
The riak-java-client takes a layered approach to architecture. There
are the 2 legacy clients (http.RiakClient
and pbc.RiakClient
)
which are adapted to the RawClient
interface, which itself is
wrapped in an IRiakClient implementation when exposing the high-level
API. In order to configure the transport at the lowest level, from the
high-level there is the RiakFactory
and the Configuration
interface.
To create a client for the transport of your choice, create a
Configuration
for that transport and pass it
RiakFactory.newClient(Configuration);
PBClientConfig conf = new PBClientConfig.Builder()
.withHost("127.0.0.1")
.withPort(8097)
.build();
IRiakClient client = RiakFactory.newClient(conf);
There are Configuration
implementations for HTTP, Protocal Buffers
and a new Configuration
for ClusterClient
.
A ClusterClient
is a set of homogenous RawClients that are
configured for many nodes in a Riak cluster and round robined between
for requests and retries. This means that you can create your
DefaultRetrier
to retry as many times as you have nodes in your
cluster to exhaustively try each node if the previous one failed to
satisfy a request. To create a ClusterClient
create a
ClusterClientConfig
and pass it to
RiakFactory.newClient(Configuration)
.
PBClientConfig node1 = new PBClientConfig.defaults();
PBClientConfig node2 = PBClientConfig.Builder.from(node1).withHost("mysecond-node.com").build();
PBClusterConfig clusterConf = new PBClusterConfig(200);
clusterConf.addClient(node1);
clusterConf.addClient(node2);
RiakFactory.newClient(clusterConf);
At the moment only cluster clients of homogenous transports are supported.
The Riak HTTP Client uses Apache HTTP Client 4.1. The Apache HTTP
Client provides connection pooling and you configure maximum
connections per host or cluster (if you are using a ClusterClient
)
The riak-java-client provides a connection pool for the protocol buffers client, too. Again you configure per node and over all for the cluster.
Both transport’s pools are configured with a connection wait timeout. If you see a lot of exceptions timing out acquiring a connection then the chances are you have sized your pool too small. The API docs have more details on this. See the DEVELOPERS document for instructions on building the API docs.
Have a look at the
com.basho.riak.client.itest
package for examples of all the features described above.
Start storing data in Riak using IRiakObject and anonymous inner classes for Mutation, ConflictResolution and Retrier. As your use case and application firm you can create concrete, testable, reusable implementations to act on your own domain objects.
Please start with the DEVELOPERS document.