Skip to content

Commit d1818b6

Browse files
committed
working prototype
1 parent 481f079 commit d1818b6

32 files changed

+1799
-9
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.sources.v2;
19+
20+
/**
21+
* A mix in interface for `DataSourceV2Reader` and `DataSourceV2Writer`. Users can implement this
22+
* interface to specify the bucket/sort columns of the reader/writer, to improve performance.
23+
*/
24+
public interface BucketingSupport {

  /**
   * Specifies the bucket columns and the number of buckets for this reader/writer.
   *
   * @param bucketColumns the names of the columns used to compute the bucket id.
   * @param numBuckets the total number of buckets.
   */
  void setBucketColumns(String[] bucketColumns, int numBuckets);

  /**
   * Specifies the sort columns for this reader/writer.
   *
   * @param sortColumns the names of the columns the data is sorted by.
   */
  void setSortColumns(String[] sortColumns);
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.sources.v2;
19+
20+
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
21+
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
22+
import org.apache.spark.sql.types.StructType;
23+
24+
/**
25+
* The main interface and minimal requirement for data source v2 implementations. Users can mix in
26+
* more interfaces to implement more functions other than just scan.
27+
*/
28+
public interface DataSourceV2 {

  /**
   * The main entry point for the read interface.
   *
   * @param schema the full schema of this data source reader. Full schema usually maps to the
   *               physical schema of the underlying storage of this data source reader, e.g.
   *               parquet files, JDBC tables, etc, while this reader may not read data with full
   *               schema, as column pruning or other optimizations may happen.
   * @param options the options for this data source reader, which is case insensitive.
   * @return a reader that implements the actual read logic.
   */
  DataSourceV2Reader createReader(
      StructType schema,
      CaseInsensitiveMap<String> options);
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.sources.v2;
19+
20+
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
21+
import org.apache.spark.sql.types.StructType;
22+
23+
/**
24+
* A mix in interface for `DataSourceV2`. Users can implement this interface to provide schema
25+
* inference ability when scanning data.
26+
*/
27+
public interface DataSourceV2SchemaProvider {

  /**
   * Returns the inferred schema of this data source given these options.
   */
  StructType inferSchema(CaseInsensitiveMap<String> options);

  /**
   * Whether or not this data source can accept a user-specified schema. When Spark scans a data
   * source, users can specify the schema to avoid expensive schema inference. However, some data
   * sources may have to infer the schema and reject any user-specified schemas; they can override
   * this method to achieve this.
   */
  default boolean acceptsUserDefinedSchema() {
    return true;
  }
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.sources.v2;
19+
20+
/**
21+
* A mix in interface for `DataSourceV2Reader` and `DataSourceV2Writer`. Users can implement this
22+
* interface to specify the partition columns of the reader/writer, to improve performance.
23+
*/
24+
public interface PartitioningSupport {

  /**
   * Specifies the partition columns for this reader/writer.
   *
   * @param partitionColumns the names of the columns the data is partitioned by.
   */
  void setPartitionColumns(String[] partitionColumns);
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.sources.v2;
19+
20+
import org.apache.spark.sql.SaveMode;
21+
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
22+
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
23+
24+
/**
25+
* A mix in interface for `DataSourceV2`. Users can implement this interface to provide data writing
26+
* ability with job-level transaction.
27+
*/
28+
public interface WritableDataSourceV2 extends DataSourceV2 {

  /**
   * The main entry point for the write interface.
   *
   * @param mode the save mode, can be append, overwrite, etc.
   * @param options the options for this data source writer.
   * @return a writer that implements the actual write logic.
   */
  DataSourceV2Writer createWriter(SaveMode mode, CaseInsensitiveMap<String> options);
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.sources.v2.reader;
19+
20+
import org.apache.spark.annotation.Experimental;
21+
import org.apache.spark.annotation.InterfaceStability;
22+
import org.apache.spark.sql.catalyst.expressions.Expression;
23+
24+
/**
25+
* A mix in interface for `DataSourceV2Reader`. Users can implement this interface to push down
26+
* arbitrary expressions as predicates to the data source.
27+
*/
28+
@Experimental
@InterfaceStability.Unstable
public interface CatalystFilterPushDownSupport {

  /**
   * Pushes down one filter. Returns true if this filter can be pushed down to this data source,
   * false otherwise. This method might be called many times if more than one filter needs to be
   * pushed down.
   *
   * TODO: we can also make it `Expression[] pushDownCatalystFilters(Expression[] filters)` which
   * returns unsupported filters.
   */
  boolean pushDownCatalystFilter(Expression filter);
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.sources.v2.reader;
19+
20+
import org.apache.spark.sql.types.StructType;
21+
22+
/**
23+
* A mix in interface for `DataSourceV2Reader`. Users can implement this interface to only read
24+
* required columns/nested fields during scan.
25+
*/
26+
public interface ColumnPruningSupport {

  /**
   * Returns true if the implementation can apply this column pruning optimization, so that we can
   * reduce the data size to be read at the very beginning.
   */
  boolean pruneColumns(StructType requiredSchema);
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.sources.v2.reader;
19+
20+
import java.util.List;
21+
22+
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
23+
24+
/**
25+
* A mix in interface for `DataSourceV2Reader`. Users can implement this interface to provide
26+
* columnar read ability for better performance.
27+
*/
28+
public interface ColumnarReadSupport {

  /**
   * Similar to `DataSourceV2Reader.createReadTasks`, but returns data in columnar format.
   */
  List<ReadTask<ColumnarBatch>> createColumnarReadTasks();

  /**
   * A safety door for the columnar reader. It's possible that the implementation can only support
   * columnar reads for some certain columns; implementations can override this method to fall back
   * to the normal read path under some conditions.
   */
  default boolean supportsColumnarReads() {
    return true;
  }
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.sources.v2.reader;
19+
20+
import java.util.List;
21+
import java.util.stream.Collectors;
22+
23+
import org.apache.spark.annotation.Experimental;
24+
import org.apache.spark.annotation.InterfaceStability;
25+
import org.apache.spark.sql.Row;
26+
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
27+
import org.apache.spark.sql.types.StructType;
28+
29+
/**
30+
* The main interface and minimal requirement for a data source reader. The implementations should
31+
* at least implement the full scan logic, users can mix in more interfaces to implement scan
32+
* optimizations like column pruning, filter push down, etc.
33+
*/
34+
public abstract class DataSourceV2Reader {
35+
36+
/**
37+
* The actual schema of this data source reader, which may be different from the physical schema
38+
* of the underlying storage, as column pruning or other optimizations may happen.
39+
*/
40+
public abstract StructType readSchema();
41+
42+
/**
43+
* The actual read logic should be implemented here. This may not be a full scan as optimizations
44+
* may have already been applied on this reader. Implementations should return a list of
45+
* read tasks, each task is responsible to output data for one RDD partition, which means
46+
* the number of tasks returned here will be same as the number of RDD partitions this scan
47+
* output.
48+
*/
49+
// TODO: maybe we should support arbitrary type and work with Dataset, instead of only Row.
50+
public abstract List<ReadTask<Row>> createReadTasks();
51+
52+
/**
53+
* Inside Spark, the input rows will be converted to `UnsafeRow`s before processing. To avoid
54+
* this conversion, implementations can overwrite this method and output `UnsafeRow`s directly.
55+
* Note that, this is an experimental and unstable interface, as `UnsafeRow` is not public and
56+
* may get changed in future Spark versions.
57+
*/
58+
@Experimental
59+
@InterfaceStability.Unstable
60+
public List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks() {
61+
StructType schema = readSchema();
62+
return createReadTasks().stream()
63+
.map(rowGenerator -> new RowToUnsafeRowReadTask(rowGenerator, schema))
64+
.collect(Collectors.toList());
65+
}
66+
}

0 commit comments

Comments
 (0)