Prototype: Data Source V2 #10
DataSourceV2.java
@@ -0,0 +1,43 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.types.StructType;

/**
 * The main interface and minimal requirement for data source v2 implementations. Users can mix in
 * more interfaces to implement functionality beyond a plain scan.
 */
public interface DataSourceV2 {

  /**
   * The main entrance of the read interface.
   *
   * @param schema the full schema of this data source reader. The full schema usually maps to the
   *               physical schema of the underlying storage of this data source reader, e.g.
   *               parquet files, JDBC tables, etc., while this reader may not read data with the
   *               full schema, as column pruning or other optimizations may happen.
   * @param options the options for this data source reader, which are case-insensitive.
   * @return a reader that implements the actual read logic.
   */
  DataSourceV2Reader createReader(
      StructType schema,
      CaseInsensitiveMap<String> options);
Inline review comments:

- Maybe not a big deal at this stage, but I wonder if this should be just ...
- Please don't use a Map. It's a huge disaster in v1.
- There might also be data sources that expect case-sensitive key names. I think it would be a good idea to pass the options exactly as the user specified them and let the data source implementation handle them as appropriate for that source.
- This has bitten me in design, not realizing all the options come through lowercased.
- Please don't do that (let the source handle case sensitivity). It will be the most confusing thing to the end user.
- @rxin, what about using an option map was a disaster in v1? For case sensitivity, what happens elsewhere? I didn't think that options were case insensitive in Spark.
- Some sources interpret the options as case sensitive, and some as case insensitive. It's really confusing to the end users. On top of that, Scala Map has really bad binary compatibility across Scala versions.
- Thanks, that makes sense. What are you proposing instead of a map? A configuration object that handles resolution internally?
}
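The thread above ends with an open question about replacing the plain map with a configuration object. As a rough, non-authoritative sketch of what such an object could look like (the DataSourceOptions name and its methods are hypothetical, not part of this prototype), keys can be resolved case-insensitively while the original spelling is kept around for sources that insist on exact matching:

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical options holder: resolves keys case-insensitively but keeps the
// user's original key spelling, so a source can still opt into strict matching.
public class DataSourceOptions {
  private final Map<String, String> original = new HashMap<>();
  private final Map<String, String> lowerCased = new HashMap<>();

  public DataSourceOptions(Map<String, String> userOptions) {
    for (Map.Entry<String, String> e : userOptions.entrySet()) {
      original.put(e.getKey(), e.getValue());
      lowerCased.put(e.getKey().toLowerCase(Locale.ROOT), e.getValue());
    }
  }

  // Case-insensitive lookup, the common path.
  public Optional<String> get(String key) {
    return Optional.ofNullable(lowerCased.get(key.toLowerCase(Locale.ROOT)));
  }

  // Exact lookup for sources that really need case-sensitive keys.
  public Optional<String> getExact(String key) {
    return Optional.ofNullable(original.get(key));
  }
}
```

A plain Java holder like this would also sidestep the Scala Map binary-compatibility concern raised above.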
DataSourceV2SchemaProvider.java
@@ -0,0 +1,42 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
import org.apache.spark.sql.types.StructType;

/**
 * A mix-in interface for `DataSourceV2`. Users can implement this interface to provide schema
 * inference ability when scanning data.
 */
public interface DataSourceV2SchemaProvider {
  /**
   * Returns the inferred schema of this data source given these options.
   */
  StructType inferSchema(CaseInsensitiveMap<String> options);
Inline review comments:

- I think this should have a better name. Data sources with a metadata store won't infer anything; this should come from the table's metadata.
- How about ...
- +1 for ...
- I'd use a different name other than "get", since get is usually very cheap, whereas here it can potentially be very expensive. Maybe computeSchema?
- Good point about get. If we add an interface to signal schema-on-read behavior, we could also move ...

  /**
   * Whether or not this data source can accept a user-specified schema. When Spark scans a data
   * source, users can specify the schema to avoid expensive schema inference. However, some data
   * sources may have to infer the schema and reject any user-specified schema; they can override
   * this method to achieve this.
   */
  default boolean acceptsUserDefinedSchema() {
    return true;
Inline review comments:

- Why a boolean method instead of a UserDefinedSchema interface like the other traits? Also, doesn't the ...
- I explained in the doc; we have 3 requirements for the schema inference feature: ...
- I left a longer comment above. I didn't get what you were doing with these interfaces at first, since I think of most data sources as having a schema instead of being schema-on-read.
  }
}
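To make the mix-in concrete, here is a hedged sketch of a source with a fixed, catalog-style schema; the CatalogBackedSource class and its hard-coded columns are invented for illustration, and a complete source would of course also implement DataSourceV2 itself:

```java
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical source backed by a catalog/metastore: the schema is fixed, so
// "inference" just reports it, and user-specified schemas are rejected.
public class CatalogBackedSource implements DataSourceV2SchemaProvider {

  @Override
  public StructType inferSchema(CaseInsensitiveMap<String> options) {
    // In a real source this would be looked up from the table's metadata;
    // it is hard-coded here to keep the sketch self-contained.
    return new StructType()
        .add("id", DataTypes.LongType)
        .add("value", DataTypes.StringType);
  }

  @Override
  public boolean acceptsUserDefinedSchema() {
    // A fixed-schema source cannot honor a user-provided schema.
    return false;
  }
}
```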
WritableDataSourceV2.java
@@ -0,0 +1,38 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;

/**
 * A mix-in interface for `DataSourceV2`. Users can implement this interface to provide data
 * writing ability with job-level transactions.
 */
public interface WritableDataSourceV2 extends DataSourceV2 {

  /**
   * The main entrance of the write interface.
   *
   * @param mode the save mode, which can be append, overwrite, etc.
   * @param options the options for this data source writer.
   * @return a writer that implements the actual write logic.
   */
  DataSourceV2Writer createWriter(SaveMode mode, CaseInsensitiveMap<String> options);
}
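As a minimal, non-authoritative sketch of how a writable source might dispatch on the save mode: AppendOnlySource and its newWriter factory are hypothetical, and the concrete writer is left abstract because the DataSourceV2Writer contract is not part of this file.

```java
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;

// Hypothetical writable source: only Append and Overwrite are supported, and
// other modes fail fast before any job-level transaction is started.
public abstract class AppendOnlySource implements WritableDataSourceV2 {

  @Override
  public DataSourceV2Writer createWriter(SaveMode mode, CaseInsensitiveMap<String> options) {
    switch (mode) {
      case Append:
        return newWriter(options, false);
      case Overwrite:
        return newWriter(options, true);
      default:
        throw new UnsupportedOperationException("Unsupported save mode: " + mode);
    }
  }

  // Left abstract: building a concrete DataSourceV2Writer depends on the
  // writer API, which is not shown in this diff.
  protected abstract DataSourceV2Writer newWriter(
      CaseInsensitiveMap<String> options, boolean truncateFirst);
}
```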
CatalystFilterPushDownSupport.java
@@ -0,0 +1,35 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.catalyst.expressions.Expression;

/**
 * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to push down
 * arbitrary expressions as predicates to the data source.
 */
@Experimental
@InterfaceStability.Unstable
public interface CatalystFilterPushDownSupport {
Inline review comments:

- boolean pushDownCatalystFilter(Expression filter);
- This interface is very nice. Just wondering: for a data source to implement this method, is it OK for the implementation to access subtypes of Expression in Spark, for example functions like Abs?
- Implementations can pattern match the given expression and access any subtype you like.

  /**
   * Pushes down filters and returns the unsupported filters.
   */
  Expression[] pushDownCatalystFilters(Expression[] filters);
}
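As a rough illustration of the intended usage (not the prototype's own code), a reader mixing this in could keep the expressions it understands and hand the rest back to Spark. The FilteringReaderSketch class and its canEvaluate placeholder are assumptions; a real reader would also extend DataSourceV2Reader.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.catalyst.expressions.Expression;

// Hypothetical mix-in usage: accepted filters are remembered for use when
// planning read tasks; everything else is returned so Spark evaluates it.
public class FilteringReaderSketch implements CatalystFilterPushDownSupport {
  private final List<Expression> pushedFilters = new ArrayList<>();

  @Override
  public Expression[] pushDownCatalystFilters(Expression[] filters) {
    List<Expression> unsupported = new ArrayList<>();
    for (Expression filter : filters) {
      if (canEvaluate(filter)) {
        pushedFilters.add(filter);
      } else {
        unsupported.add(filter);
      }
    }
    return unsupported.toArray(new Expression[0]);
  }

  // Placeholder: a real source would pattern match on the Expression subtypes
  // it understands (comparisons, IsNotNull, etc.), as noted in the thread above.
  private boolean canEvaluate(Expression filter) {
    return false;
  }
}
```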
ColumnPruningSupport.java
@@ -0,0 +1,32 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.sql.types.StructType;

/**
 * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to only read
 * required columns/nested fields during scan.
 */
public interface ColumnPruningSupport {
  /**
   * Returns true if the implementation can apple this column pruning optimization, so that we can
Inline review comment:

- Nit: should be "apply", not "apple".
   * reduce the data size to be read at the very beginning.
   */
  boolean pruneColumns(StructType requiredSchema);
Inline review comments:

- Why allow the implementation to reject the required schema? If the data source supports column pruning, then it should throw an exception if the columns passed by this method are invalid (with more context). Otherwise, it should do the pruning. The only case where I think this might be valid is if there are some columns that can't be pruned, in which case it should return the actual schema that will be produced instead of rejecting by returning false. But then, the actual read schema is accessible from ...
- Agreed here!
- Actually I don't have a use case for rejecting the required schema; I just wanted to follow the other push-down interfaces and return a boolean. I'm fine to return ...
}
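A short, hypothetical sketch of how a reader might implement pruneColumns and surface the result through its read schema, following the resolution in the thread above (PruningReaderSketch is illustrative only):

```java
import org.apache.spark.sql.types.StructType;

// Hypothetical reader state: the pruned schema is remembered and later
// reported via DataSourceV2Reader.readSchema(), as discussed in the thread.
public class PruningReaderSketch implements ColumnPruningSupport {
  private StructType readSchema;

  public PruningReaderSketch(StructType fullSchema) {
    // Start with the full schema; pruning may narrow it later.
    this.readSchema = fullSchema;
  }

  @Override
  public boolean pruneColumns(StructType requiredSchema) {
    // Accept the pruning unconditionally; an invalid column would be a planner
    // bug, so a real source might throw instead of returning false.
    this.readSchema = requiredSchema;
    return true;
  }

  public StructType readSchema() {
    return readSchema;
  }
}
```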
ColumnarReadSupport.java
@@ -0,0 +1,45 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.reader;

import java.util.List;

import org.apache.spark.sql.execution.vectorized.ColumnarBatch;

/**
 * A mix-in interface for `DataSourceV2Reader`. Users can implement this interface to provide
 * columnar read ability for better performance.
 */
public interface ColumnarReadSupport {
  /**
   * Similar to `DataSourceV2Reader.createReadTasks`, but returns data in columnar format.
   */
  List<ReadTask<ColumnarBatch>> createColumnarReadTasks();
Inline review comments:

- This means that I'd have to implement the ... I suppose this is a mixin because of the method below. It's unclear to me that you actually want to implement both of them as a user (even if you can support columnar in some cases); I'd prefer to implement one and wrap it.
- A data source may not be able to support columnar reads for all columns, so I think it should always implement the row-based interface as a fallback.
- But if I can support columnar reads for all columns, then why make me implement the row-based interface?
- Then you can throw an exception in the row-based interface; I'll put it in the Javadoc.

  /**
   * A safety door for the columnar reader. It's possible that the implementation can only support
   * columnar reads for certain columns, so users can override this method to fall back to the
   * normal read path under some conditions.
   *
   * Note that, if the implementation always returns true here, it can throw an exception in the
   * row-based `DataSourceV2Reader.createReadTasks`, as it will never be called.
   */
  default boolean supportsColumnarReads() {
Inline review comments:

- This is a bit strange; shouldn't this return value sometimes depend on the schema?
- The schema is a state of the reader, so when a reader mixes in this interface, it should know what the current schema is, after column pruning or anything else.
- OK, so this is only called after all push-downs are done? We should specify that.
    return true;
  }
}
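Following the point in the thread that columnar support depends on the post-pruning schema, here is a hedged sketch of the kind of check a reader could run inside supportsColumnarReads(); the helper class and its list of supported types are assumptions, not the prototype's behavior:

```java
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical helper: whether a columnar read is possible depends on the
// schema that remains after column pruning and other push-downs.
public final class ColumnarSupportCheck {

  private ColumnarSupportCheck() {}

  // Returns true only if every remaining column has a type the (assumed)
  // vectorized path can decode; otherwise the reader should fall back to rows.
  public static boolean allColumnsVectorizable(StructType readSchema) {
    for (StructField field : readSchema.fields()) {
      DataType type = field.dataType();
      boolean supported =
          type.equals(DataTypes.IntegerType)
              || type.equals(DataTypes.LongType)
              || type.equals(DataTypes.DoubleType)
              || type.equals(DataTypes.StringType);
      if (!supported) {
        return false;
      }
    }
    return true;
  }
}
```

A reader mixing in ColumnarReadSupport could then implement supportsColumnarReads() as ColumnarSupportCheck.allColumnsVectorizable(readSchema()).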
DataReader.java
@@ -0,0 +1,27 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.reader;

import java.io.Closeable;
import java.util.Iterator;

/**
 * A data reader returned by a read task, responsible for outputting data for one RDD partition.
 */
public interface DataReader<T> extends Iterator<T>, Closeable {}
Inline review comments:

- Why is this an "Iterator"? Don't do this ... use an explicit next() with close().
- I think it makes sense to use something like ...
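For illustration, a minimal DataReader over in-memory data might look like the sketch below (InMemoryDataReader is hypothetical); it shows the Iterator-plus-Closeable shape the comment above is questioning:

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

// Hypothetical DataReader over an in-memory list: hasNext/next come from
// Iterator, and close() releases whatever resource backs the scan.
public class InMemoryDataReader implements DataReader<Row> {
  private final Iterator<String> values;

  public InMemoryDataReader(List<String> values) {
    this.values = values.iterator();
  }

  @Override
  public boolean hasNext() {
    return values.hasNext();
  }

  @Override
  public Row next() {
    // Wrap each value in a single-column Row.
    return RowFactory.create(values.next());
  }

  @Override
  public void close() throws IOException {
    // Nothing to release for in-memory data; a file- or network-backed reader
    // would close its underlying stream or connection here.
  }
}
```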
DataSourceV2Reader.java
@@ -0,0 +1,117 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.reader;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.StructType;

/**
 * The main interface and minimal requirement for a data source reader. Implementations should
 * at least implement the full scan logic; users can mix in more interfaces to implement scan
 * optimizations like column pruning, filter push-down, etc.
 */
public abstract class DataSourceV2Reader {

  /**
   * The actual schema of this data source reader, which may be different from the physical schema
   * of the underlying storage, as column pruning or other optimizations may happen.
   */
  public abstract StructType readSchema();

  /**
   * The actual read logic should be implemented here. This may not be a full scan, as
   * optimizations may have already been applied on this reader. Implementations should return a
   * list of read tasks; each task is responsible for outputting data for one RDD partition, which
   * means the number of tasks returned here will be the same as the number of RDD partitions this
   * scan outputs.
   */
  // TODO: maybe we should support arbitrary types and work with Dataset, instead of only Row.
  protected abstract List<ReadTask<Row>> createReadTasks();

  /**
   * Inside Spark, the input rows will be converted to `UnsafeRow`s before processing. To avoid
   * this conversion, implementations can override this method and output `UnsafeRow`s directly.
   * Note that this is an experimental and unstable interface, as `UnsafeRow` is not public and
   * may change in future Spark versions.
   *
   * Note that, if an implementation overrides this method, it should also override
   * `createReadTasks` to throw an exception, as it will never be called.
   */
  @Experimental
  @InterfaceStability.Unstable
  public List<ReadTask<UnsafeRow>> createUnsafeRowReadTasks() {
    StructType schema = readSchema();
    return createReadTasks().stream()
        .map(rowGenerator -> new RowToUnsafeRowReadTask(rowGenerator, schema))
        .collect(Collectors.toList());
  }
}

class RowToUnsafeRowReadTask implements ReadTask<UnsafeRow> {
  private final ReadTask<Row> rowReadTask;
  private final StructType schema;

  RowToUnsafeRowReadTask(ReadTask<Row> rowReadTask, StructType schema) {
    this.rowReadTask = rowReadTask;
    this.schema = schema;
  }

  @Override
  public String[] preferredLocations() {
    return rowReadTask.preferredLocations();
  }

  @Override
  public DataReader<UnsafeRow> getReader() {
    return new RowToUnsafeDataReader(rowReadTask.getReader(), RowEncoder.apply(schema));
  }
}

class RowToUnsafeDataReader implements DataReader<UnsafeRow> {
  private final DataReader<Row> rowReader;
  private final ExpressionEncoder<Row> encoder;

  RowToUnsafeDataReader(DataReader<Row> rowReader, ExpressionEncoder<Row> encoder) {
    this.rowReader = rowReader;
    this.encoder = encoder;
  }

  @Override
  public boolean hasNext() {
    return rowReader.hasNext();
  }

  @Override
  public UnsafeRow next() {
    return (UnsafeRow) encoder.toRow(rowReader.next());
  }

  @Override
  public void close() throws IOException {
    rowReader.close();
  }
}
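To see how the pieces fit together, here is a hedged end-to-end sketch of a trivial reader. The ReadTask contract (preferredLocations and getReader) is assumed from the wrapper classes above, and InMemoryReader/InMemoryReadTask are invented for illustration:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical reader: one read task per in-memory partition of strings,
// each task producing single-column Rows.
public class InMemoryReader extends DataSourceV2Reader {
  private final List<List<String>> partitions;

  public InMemoryReader(List<List<String>> partitions) {
    this.partitions = partitions;
  }

  @Override
  public StructType readSchema() {
    return new StructType().add("value", DataTypes.StringType);
  }

  @Override
  protected List<ReadTask<Row>> createReadTasks() {
    List<ReadTask<Row>> tasks = new ArrayList<>();
    for (List<String> partition : partitions) {
      tasks.add(new InMemoryReadTask(partition));
    }
    return tasks;
  }
}

// One task per partition; getReader() is expected to run on the executor side.
class InMemoryReadTask implements ReadTask<Row> {
  private final List<String> data;

  InMemoryReadTask(List<String> data) {
    this.data = data;
  }

  @Override
  public String[] preferredLocations() {
    return new String[0]; // no locality preference for in-memory data
  }

  @Override
  public DataReader<Row> getReader() {
    Iterator<String> it = data.iterator();
    return new DataReader<Row>() {
      @Override
      public boolean hasNext() {
        return it.hasNext();
      }

      @Override
      public Row next() {
        return RowFactory.create(it.next());
      }

      @Override
      public void close() throws IOException {
        // nothing to release
      }
    };
  }
}
```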
Review discussion on the overall schema handling:

I think the schema should be handled a little differently. We should assume that a data source has a schema that applies to all of its data, instead of passing one in. That's the case with most datasets, like Hive tables, relational tables, and Cassandra tables. I'd also argue that it is a best practice so we don't have expensive operations to infer the schema, like we do for non-Metastore tables.

So instead of passing the full schema here and having an interface to infer a schema, the data source should be expected to report its schema while analyzing the query (which may require inference). Then the schema passed to create a reader should be the expected schema, or projection schema, after the query is optimized.

Also, I'd like to understand how column IDs should be handled. I'm building a data source that uses column IDs to implement schema evolution that supports add, delete, and rename. The column IDs for a table are only unique within that table (another table can reuse them), so it doesn't necessarily make sense to expose them to Spark. Spark will still need its own unique attribute IDs. But I'd prefer to have Spark request columns by ID rather than by name, so that Spark handles name resolution between a query and the data source.

I think what makes sense is for the data source to create a Spark schema with attributes that Spark will use in the projection schema. So if my data source has columns 0:x and 1:y, I create a Spark schema with those columns and they are assigned Spark attribute IDs in the process, say x#67 and y#68. When Spark requests a reader, the projection schema uses those attributes, which I can then map back to internal IDs. Spark should call createReader with a schema containing x#67, not x#104. Sound reasonable?

I'm +1 on it if we can rewrite the data source part from scratch... But now, since users can specify a schema in DataFrameReader, the data source should be able to take a user-specified schema. For the column id stuff, I don't totally understand it. Attribute id is internal to Spark; Spark only uses column names when interacting with data sources, which is why we only use StructType in the various APIs.

@rdblue: "We should assume that a data source has a schema that applies to all of its data, instead of passing one in. That's the case with most datasets, like Hive tables, relational tables, and Cassandra tables."

This is not true though. For a Hive table, for example, the schema is specified in the catalog, which means it has to be passed into the underlying file-based data source. The file-based data source itself is not actually dictating the schema.

I think what I didn't understand is that the data source API sits between the metastore and the files in a table. So the metastore is queried to get a schema, then the data files are treated as schema-on-read. But why should that be the case? I would rather be able to have Hive tables entirely behind the DataSource API, so they can be handled the same way as a JDBC table or another source with a pre-determined schema for its data.

With that missing context, I can see how the interface proposed here looks designed for schema-on-read, rather than for tables that already have a fixed schema: it assumes that the data source will commonly use an externally provided schema, can't necessarily infer one (e.g. JSON), and the projection is separate from the data source schema. I think it's reasonable to consider whether this API should be based on schema-on-read or should assume that the data source has a schema, along the lines of what I was suggesting before.
The proposal doc lists 3 cases: an external schema is required, schema is fixed and a user schema is not allowed, and a user schema is optional because it can be inferred.
For sources with fixed schemas, DataSourceV2SchemaProvider#inferSchema is required, but that also pulls in acceptsUserDefinedSchema, which defaults to true. A fixed-schema source also has to implement createReader with a "full schema of the data source" passed in. Those two imply that fixed-schema sources should support schema-on-read, but I think that's a bad idea. The problem is similar to why we don't want to pass a map of options: it makes the source implementation decide how to reconcile the user-defined schema with the source schema, and that probably won't be consistent across sources. I think that's a good reason to separate out the methods needed for schema-on-read to a different interface.

What about a SchemaOnRead interface that can be used to set the schema instead of passing it to the createReader method? Then acceptsUserDefinedSchema can be dropped from the provider interface because it is signalled by SchemaOnRead. (I'd also rename inferSchema to getSchema.) Then createReader here would drop the schema argument. We could also have a createReader with a schema argument that passes the requested projection; that's where I would expect to pass the requested schema.

For the other two cases, the proposed API works okay. It's a little strange that those that require a user schema won't implement
DataSourceV2SchemaProvider
, which is what signals that a user-defined schema is accepted even though one can be passed to any data source. ASchemaOnRead
interface would be slightly better for the required case as well by cleaning this up.
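To make that suggestion concrete, a SchemaOnRead mixin along the lines proposed above might look like the sketch below; this interface does not exist in the prototype, and the setReadSchema method name is an assumption:

```java
import org.apache.spark.sql.types.StructType;

// Hypothetical mix-in sketched from the discussion: implementing it signals
// that the source accepts an externally supplied (schema-on-read) schema, so
// acceptsUserDefinedSchema() and the schema argument of createReader() would
// no longer be needed.
public interface SchemaOnRead {

  /** Sets the user-specified schema that this source should apply when reading. */
  void setReadSchema(StructType schema);
}
```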