Skip to content

Commit

Permalink
Support smallint, tinyint, date type of Cassandra
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr authored and highker committed Sep 16, 2020
1 parent 5bdb41d commit 06d5f49
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 72 deletions.
5 changes: 5 additions & 0 deletions presto-cassandra/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@
<groupId>com.facebook.airlift</groupId>
<artifactId>security</artifactId>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ public CassandraClientConfig setSpeculativeExecutionDelay(Duration speculativeEx
return this;
}

@NotNull
public ProtocolVersion getProtocolVersion()
{
return protocolVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.cassandra;

import com.datastax.driver.core.ProtocolVersion;
import com.facebook.airlift.json.JsonCodec;
import com.facebook.presto.cassandra.util.CassandraCqlUtils;
import com.facebook.presto.common.predicate.TupleDomain;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class CassandraMetadata
private final CassandraSession cassandraSession;
private final CassandraPartitionManager partitionManager;
private final boolean allowDropTable;
private final ProtocolVersion protocolVersion;

private final JsonCodec<List<ExtraColumnMetadata>> extraColumnMetadataCodec;

Expand All @@ -86,6 +88,7 @@ public CassandraMetadata(
this.cassandraSession = requireNonNull(cassandraSession, "cassandraSession is null");
this.allowDropTable = requireNonNull(config, "config is null").getAllowDropTable();
this.extraColumnMetadataCodec = requireNonNull(extraColumnMetadataCodec, "extraColumnMetadataCodec is null");
this.protocolVersion = requireNonNull(config, "config is null").getProtocolVersion();
}

@Override
Expand Down Expand Up @@ -294,7 +297,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
queryBuilder.append(", ")
.append(name)
.append(" ")
.append(toCassandraType(type).name().toLowerCase(ENGLISH));
.append(toCassandraType(type, protocolVersion).name().toLowerCase(ENGLISH));
}
queryBuilder.append(") ");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
*/
package com.facebook.presto.cassandra;

import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.querybuilder.Insert;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
Expand All @@ -22,17 +24,17 @@
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
Expand All @@ -42,11 +44,14 @@
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.Varchars.isVarcharType;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.primitives.Shorts.checkedCast;
import static java.lang.Float.intBitsToFloat;
import static java.lang.Math.toIntExact;
import static java.util.Objects.requireNonNull;
Expand All @@ -55,15 +60,17 @@
public class CassandraPageSink
implements ConnectorPageSink
{
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ISO_LOCAL_DATE.withZone(ZoneId.of("UTC"));
private static final DateTimeFormatter DATE_FORMATTER = ISODateTimeFormat.date().withZoneUTC();

private final CassandraSession cassandraSession;
private final PreparedStatement insert;
private final List<Type> columnTypes;
private final boolean generateUUID;
private final Function<Long, Object> toCassandraDate;

public CassandraPageSink(
CassandraSession cassandraSession,
ProtocolVersion protocolVersion,
String schemaName,
String tableName,
List<String> columnNames,
Expand All @@ -77,6 +84,13 @@ public CassandraPageSink(
this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null"));
this.generateUUID = generateUUID;

if (protocolVersion.toInt() <= ProtocolVersion.V3.toInt()) {
toCassandraDate = value -> DATE_FORMATTER.print(TimeUnit.DAYS.toMillis(value));
}
else {
toCassandraDate = value -> LocalDate.fromDaysSinceEpoch(toIntExact(value));
}

Insert insert = insertInto(schemaName, tableName);
if (generateUUID) {
insert.value("id", bindMarker());
Expand Down Expand Up @@ -123,14 +137,20 @@ else if (BIGINT.equals(type)) {
else if (INTEGER.equals(type)) {
values.add(toIntExact(type.getLong(block, position)));
}
else if (SMALLINT.equals(type)) {
values.add(checkedCast(type.getLong(block, position)));
}
else if (TINYINT.equals(type)) {
values.add((byte) type.getLong(block, position));
}
else if (DOUBLE.equals(type)) {
values.add(type.getDouble(block, position));
}
else if (REAL.equals(type)) {
values.add(intBitsToFloat(toIntExact(type.getLong(block, position))));
}
else if (DATE.equals(type)) {
values.add(DATE_FORMATTER.format(Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(type.getLong(block, position)))));
values.add(toCassandraDate.apply(type.getLong(block, position)));
}
else if (TIMESTAMP.equals(type)) {
values.add(new Timestamp(type.getLong(block, position)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.cassandra;

import com.datastax.driver.core.ProtocolVersion;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorPageSink;
Expand All @@ -30,11 +31,13 @@ public class CassandraPageSinkProvider
implements ConnectorPageSinkProvider
{
private final CassandraSession cassandraSession;
private final ProtocolVersion protocolVersion;

@Inject
public CassandraPageSinkProvider(CassandraSession cassandraSession)
public CassandraPageSinkProvider(CassandraSession cassandraSession, CassandraClientConfig cassandraClientConfig)
{
this.cassandraSession = requireNonNull(cassandraSession, "cassandraSession is null");
this.protocolVersion = requireNonNull(cassandraClientConfig, "cassandraClientConfig is null").getProtocolVersion();
}

@Override
Expand All @@ -47,6 +50,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa

return new CassandraPageSink(
cassandraSession,
protocolVersion,
handle.getSchemaName(),
handle.getTableName(),
handle.getColumnNames(),
Expand All @@ -64,6 +68,7 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa

return new CassandraPageSink(
cassandraSession,
protocolVersion,
handle.getSchemaName(),
handle.getTableName(),
handle.getColumnNames(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,17 @@ public long getLong(int i)
switch (getCassandraType(i)) {
case INT:
return currentRow.getInt(i);
case SMALLINT:
return currentRow.getShort(i);
case TINYINT:
return currentRow.getByte(i);
case BIGINT:
case COUNTER:
return currentRow.getLong(i);
case TIMESTAMP:
return currentRow.getTimestamp(i).getTime();
case DATE:
return currentRow.getDate(i).getDaysSinceEpoch();
case FLOAT:
return floatToRawIntBits(currentRow.getFloat(i));
default:
Expand Down
Loading

0 comments on commit 06d5f49

Please sign in to comment.