Skip to content

Commit

Permalink
[CALCITE-4128] Remove dependency of File adapter on Example CSV adapter
Browse files Browse the repository at this point in the history
Move some classes from 'example/csv' to 'file', and make CSV adapter
depend on File adapter. Copy some other classes (so that we can keep the
'example/csv' versions simple, and add functionality to the 'file'
adapter).

Rename SqlTest to FileAdapterTest, factor utility methods into
FileAdapterTests, and copy in a bunch of tests from example/csv/.../CsvTest.
  • Loading branch information
julianhyde committed Jul 28, 2020
1 parent 03c76a7 commit 9b678f1
Show file tree
Hide file tree
Showing 39 changed files with 1,950 additions and 517 deletions.
1 change: 1 addition & 0 deletions example/csv/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ val sqllineClasspath by configurations.creating {

dependencies {
api(project(":core"))
api(project(":file"))
api(project(":linq4j"))

implementation("com.fasterxml.jackson.core:jackson-core")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.calcite.adapter.csv;

import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.file.CsvEnumerator;
import org.apache.calcite.adapter.file.CsvFieldType;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
Expand All @@ -27,6 +29,7 @@
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.FilterableTable;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Source;

import java.util.List;
Expand All @@ -53,12 +56,12 @@ public Enumerable<Object[]> scan(DataContext root, List<RexNode> filters) {
final List<CsvFieldType> fieldTypes = getFieldTypes(root.getTypeFactory());
final String[] filterValues = new String[fieldTypes.size()];
filters.removeIf(filter -> addFilter(filter, filterValues));
final int[] fields = CsvEnumerator.identityList(fieldTypes.size());
final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());
final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
return new AbstractEnumerable<Object[]>() {
public Enumerator<Object[]> enumerator() {
return new CsvEnumerator<>(source, cancelFlag, false, filterValues,
new CsvEnumerator.ArrayRowConverter(fieldTypes, fields));
CsvEnumerator.arrayConverter(fieldTypes, fields, false));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package org.apache.calcite.adapter.csv;

import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.file.CsvEnumerator;
import org.apache.calcite.adapter.file.CsvFieldType;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Source;

import java.util.List;
Expand All @@ -46,12 +49,12 @@ public String toString() {

public Enumerable<Object[]> scan(DataContext root) {
final List<CsvFieldType> fieldTypes = getFieldTypes(root.getTypeFactory());
final int[] fields = CsvEnumerator.identityList(fieldTypes.size());
final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());
final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
return new AbstractEnumerable<Object[]>() {
public Enumerator<Object[]> enumerator() {
return new CsvEnumerator<>(source, cancelFlag, false, null,
new CsvEnumerator.ArrayRowConverter(fieldTypes, fields));
CsvEnumerator.arrayConverter(fieldTypes, fields, false));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.calcite.adapter.csv;

import org.apache.calcite.adapter.file.JsonScannableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.util.Source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@
*/
@SuppressWarnings("UnusedDeclaration")
public class CsvSchemaFactory implements SchemaFactory {
/** Name of the column that is implicitly created in a CSV stream table
* to hold the data arrival time. */
static final String ROWTIME_COLUMN_NAME = "ROWTIME";

/** Public singleton, per factory contract. */
public static final CsvSchemaFactory INSTANCE = new CsvSchemaFactory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
package org.apache.calcite.adapter.csv;

import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.file.CsvEnumerator;
import org.apache.calcite.adapter.file.CsvFieldType;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.StreamableTable;
import org.apache.calcite.schema.Table;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Source;

import java.util.List;
Expand Down Expand Up @@ -52,12 +55,12 @@ public String toString() {

public Enumerable<Object[]> scan(DataContext root) {
final List<CsvFieldType> fieldTypes = getFieldTypes(root.getTypeFactory());
final int[] fields = CsvEnumerator.identityList(fieldTypes.size());
final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());
final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
return new AbstractEnumerable<Object[]>() {
public Enumerator<Object[]> enumerator() {
return new CsvEnumerator<>(source, cancelFlag, true, null,
new CsvEnumerator.ArrayRowConverter(fieldTypes, fields, true));
CsvEnumerator.arrayConverter(fieldTypes, fields, true));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.calcite.adapter.csv;

import org.apache.calcite.adapter.file.CsvEnumerator;
import org.apache.calcite.adapter.file.CsvFieldType;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.calcite.adapter.file.JsonTable;
import org.apache.calcite.linq4j.tree.Blocks;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.Primitive;
Expand All @@ -44,7 +45,7 @@
/**
* Relational expression representing a scan of a CSV file.
*
* <p>Like any table scan, it serves as a leaf node of a query tree.</p>
* <p>Like any table scan, it serves as a leaf node of a query tree.
*/
public class CsvTableScan extends TableScan implements EnumerableRel {
final CsvTranslatableTable csvTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.calcite.adapter.csv;

import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.file.CsvEnumerator;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
Expand All @@ -30,6 +31,7 @@
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Source;

import java.lang.reflect.Type;
Expand All @@ -49,9 +51,8 @@ public String toString() {
return "CsvTranslatableTable";
}

/** Returns an enumerable over a given projection of the fields.
*
* <p>Called from generated code. */
/** Returns an enumerable over a given projection of the fields. */
@SuppressWarnings("unused") // called from generated code
public Enumerable<Object> project(final DataContext root,
final int[] fields) {
final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
Expand All @@ -61,7 +62,7 @@ public Enumerator<Object> enumerator() {
source,
cancelFlag,
getFieldTypes(root.getTypeFactory()),
fields);
ImmutableIntList.of(fields));
}
};
}
Expand Down
4 changes: 3 additions & 1 deletion file/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
*/
dependencies {
api(project(":core"))
api(project(":example:csv"))
api(project(":linq4j"))

implementation("com.google.guava:guava")
implementation("com.joestelmach:natty")
implementation("net.sf.opencsv:opencsv")
implementation("org.apache.calcite.avatica:avatica-core")
implementation("commons-io:commons-io")
implementation("org.apache.commons:commons-lang3")
implementation("org.jsoup:jsoup")

testImplementation(project(":core", "testClasses"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.csv;
package org.apache.calcite.adapter.file;

import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.ImmutableNullableList;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Source;

Expand All @@ -41,9 +43,9 @@
*
* @param <E> Row type
*/
class CsvEnumerator<E> implements Enumerator<E> {
public class CsvEnumerator<E> implements Enumerator<E> {
private final CSVReader reader;
private final String[] filterValues;
private final List<String> filterValues;
private final AtomicBoolean cancelFlag;
private final RowConverter<E> rowConverter;
private E current;
Expand All @@ -60,23 +62,19 @@ class CsvEnumerator<E> implements Enumerator<E> {
FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);
}

CsvEnumerator(Source source, AtomicBoolean cancelFlag,
List<CsvFieldType> fieldTypes) {
this(source, cancelFlag, fieldTypes, identityList(fieldTypes.size()));
}

CsvEnumerator(Source source, AtomicBoolean cancelFlag,
List<CsvFieldType> fieldTypes, int[] fields) {
public CsvEnumerator(Source source, AtomicBoolean cancelFlag,
List<CsvFieldType> fieldTypes, List<Integer> fields) {
//noinspection unchecked
this(source, cancelFlag, false, null,
(RowConverter<E>) converter(fieldTypes, fields));
}

CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,
public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,
String[] filterValues, RowConverter<E> rowConverter) {
this.cancelFlag = cancelFlag;
this.rowConverter = rowConverter;
this.filterValues = filterValues;
this.filterValues = filterValues == null ? null
: ImmutableNullableList.copyOf(filterValues);
try {
if (stream) {
this.reader = new CsvStreamReader(source);
Expand All @@ -90,15 +88,20 @@ class CsvEnumerator<E> implements Enumerator<E> {
}

private static RowConverter<?> converter(List<CsvFieldType> fieldTypes,
int[] fields) {
if (fields.length == 1) {
final int field = fields[0];
List<Integer> fields) {
if (fields.size() == 1) {
final int field = fields.get(0);
return new SingleColumnRowConverter(fieldTypes.get(field), field);
} else {
return new ArrayRowConverter(fieldTypes, fields);
return arrayConverter(fieldTypes, fields, false);
}
}

public static RowConverter<Object[]> arrayConverter(
List<CsvFieldType> fieldTypes, List<Integer> fields, boolean stream) {
return new ArrayRowConverter(fieldTypes, fields, stream);
}

/** Deduces the names and types of a table's columns by reading the first line
* of a CSV file. */
static RelDataType deduceRowType(JavaTypeFactory typeFactory, Source source,
Expand All @@ -108,12 +111,12 @@ static RelDataType deduceRowType(JavaTypeFactory typeFactory, Source source,

/** Deduces the names and types of a table's columns by reading the first line
* of a CSV file. */
static RelDataType deduceRowType(JavaTypeFactory typeFactory, Source source,
List<CsvFieldType> fieldTypes, Boolean stream) {
public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
Source source, List<CsvFieldType> fieldTypes, Boolean stream) {
final List<RelDataType> types = new ArrayList<>();
final List<String> names = new ArrayList<>();
if (stream) {
names.add(CsvSchemaFactory.ROWTIME_COLUMN_NAME);
names.add(FileSchemaFactory.ROWTIME_COLUMN_NAME);
types.add(typeFactory.createSqlType(SqlTypeName.TIMESTAMP));
}
try (CSVReader reader = openCsv(source)) {
Expand Down Expand Up @@ -161,7 +164,7 @@ static RelDataType deduceRowType(JavaTypeFactory typeFactory, Source source,
return typeFactory.createStructType(Pair.zip(names, types));
}

public static CSVReader openCsv(Source source) throws IOException {
static CSVReader openCsv(Source source) throws IOException {
Objects.requireNonNull(source, "source");
return new CSVReader(source.reader());
}
Expand Down Expand Up @@ -193,7 +196,7 @@ public boolean moveNext() {
}
if (filterValues != null) {
for (int i = 0; i < strings.length; i++) {
String filterValue = filterValues[i];
String filterValue = filterValues.get(i);
if (filterValue != null) {
if (!filterValue.equals(strings[i])) {
continue outer;
Expand Down Expand Up @@ -222,7 +225,7 @@ public void close() {
}

/** Returns an array of integers {0, ..., n - 1}. */
static int[] identityList(int n) {
public static int[] identityList(int n) {
int[] integers = new int[n];
for (int i = 0; i < n; i++) {
integers[i] = i;
Expand Down Expand Up @@ -315,20 +318,16 @@ protected Object convert(CsvFieldType fieldType, String string) {

/** Array row converter. */
static class ArrayRowConverter extends RowConverter<Object[]> {
private final CsvFieldType[] fieldTypes;
private final int[] fields;
// whether the row to convert is from a stream
/** Field types. List must not be null, but any element may be null. */
private final List<CsvFieldType> fieldTypes;
private final ImmutableIntList fields;
/** Whether the row to convert is from a stream. */
private final boolean stream;

ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields) {
this.fieldTypes = fieldTypes.toArray(new CsvFieldType[0]);
this.fields = fields;
this.stream = false;
}

ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields, boolean stream) {
this.fieldTypes = fieldTypes.toArray(new CsvFieldType[0]);
this.fields = fields;
ArrayRowConverter(List<CsvFieldType> fieldTypes, List<Integer> fields,
boolean stream) {
this.fieldTypes = ImmutableNullableList.copyOf(fieldTypes);
this.fields = ImmutableIntList.copyOf(fields);
this.stream = stream;
}

Expand All @@ -341,27 +340,27 @@ public Object[] convertRow(String[] strings) {
}

public Object[] convertNormalRow(String[] strings) {
final Object[] objects = new Object[fields.length];
for (int i = 0; i < fields.length; i++) {
int field = fields[i];
objects[i] = convert(fieldTypes[field], strings[field]);
final Object[] objects = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
int field = fields.get(i);
objects[i] = convert(fieldTypes.get(field), strings[field]);
}
return objects;
}

public Object[] convertStreamRow(String[] strings) {
final Object[] objects = new Object[fields.length + 1];
final Object[] objects = new Object[fields.size() + 1];
objects[0] = System.currentTimeMillis();
for (int i = 0; i < fields.length; i++) {
int field = fields[i];
objects[i + 1] = convert(fieldTypes[field], strings[field]);
for (int i = 0; i < fields.size(); i++) {
int field = fields.get(i);
objects[i + 1] = convert(fieldTypes.get(field), strings[field]);
}
return objects;
}
}

/** Single column row converter. */
private static class SingleColumnRowConverter extends RowConverter {
private static class SingleColumnRowConverter extends RowConverter<Object> {
private final CsvFieldType fieldType;
private final int fieldIndex;

Expand Down
Loading

0 comments on commit 9b678f1

Please sign in to comment.