Skip to content

Commit f9fa70b

Browse files
committed
Merge pull request #602 from basho/feature/create-table-riakts-command
RiakTs CreateTable command
2 parents 2d9ee21 + 040cd21 commit f9fa70b

File tree

10 files changed

+631
-183
lines changed

10 files changed

+631
-183
lines changed
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2013-2016 Basho Technologies Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.basho.riak.client.api.commands.timeseries;
17+
18+
import com.basho.riak.client.api.RiakCommand;
19+
import com.basho.riak.client.core.RiakCluster;
20+
import com.basho.riak.client.core.RiakFuture;
21+
import com.basho.riak.client.core.operations.ts.CreateTableOperation;
22+
import com.basho.riak.client.core.query.timeseries.TableDefinition;
23+
24+
/**
25+
* Time Series Create Command
26+
* Allows you to create a Riak Time Series table according to the provided definition.
27+
*
28+
* @author Sergey Galkin <srggal at gmail dot com>
29+
* @since 2.0.6
30+
*/
31+
public class CreateTable extends RiakCommand<Void, String>
32+
{
33+
private final Builder builder;
34+
35+
private CreateTable(Builder builder)
36+
{
37+
this.builder = builder;
38+
}
39+
40+
@Override
41+
protected RiakFuture<Void, String> executeAsync(RiakCluster cluster)
42+
{
43+
final RiakFuture<Void, String> future =
44+
cluster.execute(builder.buildOperation());
45+
46+
return future;
47+
}
48+
49+
public static class Builder extends CreateTableOperation.AbstractBuilder<CreateTable, Builder>
50+
{
51+
public Builder(TableDefinition tableDefinition)
52+
{
53+
super(tableDefinition);
54+
}
55+
56+
public CreateTable build()
57+
{
58+
return new CreateTable(this);
59+
}
60+
}
61+
}
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Copyright 2013-2016 Basho Technologies Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.basho.riak.client.core.operations.ts;
17+
18+
import com.basho.riak.client.core.operations.PBFutureOperation;
19+
import com.basho.riak.client.core.query.timeseries.ColumnDescription;
20+
import com.basho.riak.client.core.query.timeseries.FullColumnDescription;
21+
import com.basho.riak.client.core.query.timeseries.TableDefinition;
22+
import com.basho.riak.protobuf.RiakMessageCodes;
23+
import com.basho.riak.protobuf.RiakTsPB;
24+
import com.google.protobuf.ByteString;
25+
26+
import java.util.Collection;
27+
import java.util.List;
28+
import java.util.concurrent.TimeUnit;
29+
30+
/**
31+
* An operation to create a Riak Time Series table according to the provided definition.
32+
*
33+
* @author Sergey Galkin <srggal at gmail dot com>
34+
* @since 2.0.6
35+
*/
36+
public class CreateTableOperation extends PBFutureOperation<Void, RiakTsPB.TsQueryResp, String>
37+
{
38+
private final RiakTsPB.TsQueryReq.Builder reqBuilder;
39+
private final String queryText;
40+
41+
private CreateTableOperation(AbstractBuilder builder)
42+
{
43+
super(RiakMessageCodes.MSG_TsQueryReq,
44+
RiakMessageCodes.MSG_TsQueryResp,
45+
builder.reqBuilder,
46+
RiakTsPB.TsQueryResp.PARSER);
47+
48+
this.reqBuilder = builder.reqBuilder;
49+
this.queryText = builder.queryText;
50+
}
51+
52+
@Override
53+
protected Void convert(List<RiakTsPB.TsQueryResp> rawResponse)
54+
{
55+
return null;
56+
}
57+
58+
@Override
59+
public String getQueryInfo()
60+
{
61+
return this.queryText;
62+
}
63+
64+
public static abstract class AbstractBuilder<R, THIS extends AbstractBuilder>
65+
{
66+
private RiakTsPB.TsQueryReq.Builder reqBuilder;
67+
private String queryText;
68+
private int quantum;
69+
private char quantumUnit;
70+
private final TableDefinition tableDefinition;
71+
72+
public AbstractBuilder(TableDefinition tableDefinition)
73+
{
74+
if (tableDefinition == null)
75+
{
76+
throw new IllegalArgumentException("TableDefinition cannot be null.");
77+
}
78+
79+
final String tableName = tableDefinition.getTableName();
80+
if (tableName == null || tableName.length() == 0)
81+
{
82+
throw new IllegalArgumentException("Table Name cannot be null or empty");
83+
}
84+
85+
this.tableDefinition = tableDefinition;
86+
}
87+
88+
public abstract R build();
89+
90+
91+
public CreateTableOperation buildOperation()
92+
{
93+
final String keys = generateKeys(tableDefinition, quantum, quantumUnit).toString();
94+
95+
queryText = String.format("CREATE TABLE %s (%s,\n\n PRIMARY KEY (%s))",
96+
tableDefinition.getTableName(), generateColumns(tableDefinition),
97+
keys);
98+
99+
reqBuilder = RiakTsPB.TsQueryReq.newBuilder()
100+
.setQuery(RiakTsPB.TsInterpolation.newBuilder().setBase(
101+
ByteString.copyFromUtf8(queryText)
102+
));
103+
104+
105+
return new CreateTableOperation(this);
106+
}
107+
108+
109+
@SuppressWarnings("unchecked")
110+
public THIS withQuantum(int quantum, TimeUnit tu)
111+
{
112+
switch (tu)
113+
{
114+
case SECONDS:
115+
quantumUnit = 's';
116+
break;
117+
118+
case MINUTES:
119+
quantumUnit = 'm';
120+
break;
121+
122+
case HOURS:
123+
quantumUnit = 'h';
124+
break;
125+
126+
case DAYS:
127+
quantumUnit = 'd';
128+
break;
129+
130+
default:
131+
throw new IllegalArgumentException("Unsupported quantum unit '"+ tu.name() +"', at the moment the only:" +
132+
" seconds, minutes, hours and days are supported.");
133+
}
134+
135+
this.quantum = quantum;
136+
return (THIS)this;
137+
}
138+
139+
private static StringBuilder generateColumns(TableDefinition tableDefinition)
140+
{
141+
final StringBuilder sb = new StringBuilder();
142+
143+
for (FullColumnDescription fd: tableDefinition.getFullColumnDescriptions())
144+
{
145+
if (sb.length() > 0)
146+
{
147+
sb.append(",\n ");
148+
}
149+
150+
sb.append(fd.getName())
151+
.append(' ')
152+
.append(fd.getType().name());
153+
154+
if (!fd.isNullable()) {
155+
sb.append(" not null");
156+
}
157+
}
158+
159+
return sb;
160+
}
161+
162+
private static StringBuilder generateKeys(TableDefinition tableDefinition, int quantum, char quantumUnit)
163+
{
164+
165+
final Collection<FullColumnDescription> pks = tableDefinition.getPartitionKeyColumnDescriptions();
166+
if (pks == null || pks.isEmpty())
167+
{
168+
throw new IllegalArgumentException("No defined primary keys, at least one primary key should be defined.");
169+
}
170+
171+
boolean quantumGenerated = false;
172+
final StringBuilder sb = new StringBuilder();
173+
for (FullColumnDescription k: pks)
174+
{
175+
if (sb.length() > 0)
176+
{
177+
sb.append(", ");
178+
} else {
179+
sb.append('(');
180+
}
181+
182+
if (!quantumGenerated && ColumnDescription.ColumnType.TIMESTAMP.equals(k.getType()))
183+
{
184+
// handle quantum
185+
sb.append("quantum(")
186+
.append(k.getName())
187+
.append(',')
188+
.append(quantum)
189+
.append(',')
190+
.append(quantumUnit)
191+
.append(')');
192+
193+
quantumGenerated = true;
194+
}
195+
else
196+
{
197+
sb.append(k.getName());
198+
}
199+
}
200+
sb.append(')');
201+
202+
for (FullColumnDescription lk: tableDefinition.getLocalKeyColumnDescriptions())
203+
{
204+
sb.append(", ")
205+
.append(lk.getName());
206+
}
207+
208+
return sb;
209+
}
210+
}
211+
}

0 commit comments

Comments
 (0)