Skip to content

Commit 3bd7200

Browse files
authored
expose table name in proto extension codec (apache#11139)
1 parent 838e0f7 commit 3bd7200

File tree

3 files changed

+33
-6
lines changed

3 files changed

+33
-6
lines changed

datafusion/proto/src/logical_plan/file_formats.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use datafusion::{
2424
},
2525
prelude::SessionContext,
2626
};
27-
use datafusion_common::not_impl_err;
27+
use datafusion_common::{not_impl_err, TableReference};
2828

2929
use super::LogicalExtensionCodec;
3030

@@ -53,6 +53,7 @@ impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
5353
fn try_decode_table_provider(
5454
&self,
5555
_buf: &[u8],
56+
_table_ref: &TableReference,
5657
_schema: arrow::datatypes::SchemaRef,
5758
_ctx: &datafusion::prelude::SessionContext,
5859
) -> datafusion_common::Result<
@@ -63,6 +64,7 @@ impl LogicalExtensionCodec for CsvLogicalExtensionCodec {
6364

6465
fn try_encode_table_provider(
6566
&self,
67+
_table_ref: &TableReference,
6668
_node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
6769
_buf: &mut Vec<u8>,
6870
) -> datafusion_common::Result<()> {
@@ -127,6 +129,7 @@ impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
127129
fn try_decode_table_provider(
128130
&self,
129131
_buf: &[u8],
132+
_table_ref: &TableReference,
130133
_schema: arrow::datatypes::SchemaRef,
131134
_ctx: &datafusion::prelude::SessionContext,
132135
) -> datafusion_common::Result<
@@ -137,6 +140,7 @@ impl LogicalExtensionCodec for JsonLogicalExtensionCodec {
137140

138141
fn try_encode_table_provider(
139142
&self,
143+
_table_ref: &TableReference,
140144
_node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
141145
_buf: &mut Vec<u8>,
142146
) -> datafusion_common::Result<()> {
@@ -201,6 +205,7 @@ impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
201205
fn try_decode_table_provider(
202206
&self,
203207
_buf: &[u8],
208+
_table_ref: &TableReference,
204209
_schema: arrow::datatypes::SchemaRef,
205210
_ctx: &datafusion::prelude::SessionContext,
206211
) -> datafusion_common::Result<
@@ -211,6 +216,7 @@ impl LogicalExtensionCodec for ParquetLogicalExtensionCodec {
211216

212217
fn try_encode_table_provider(
213218
&self,
219+
_table_ref: &TableReference,
214220
_node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
215221
_buf: &mut Vec<u8>,
216222
) -> datafusion_common::Result<()> {
@@ -275,6 +281,7 @@ impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
275281
fn try_decode_table_provider(
276282
&self,
277283
_buf: &[u8],
284+
_table_ref: &TableReference,
278285
_schema: arrow::datatypes::SchemaRef,
279286
_ctx: &datafusion::prelude::SessionContext,
280287
) -> datafusion_common::Result<
@@ -285,6 +292,7 @@ impl LogicalExtensionCodec for ArrowLogicalExtensionCodec {
285292

286293
fn try_encode_table_provider(
287294
&self,
295+
_table_ref: &TableReference,
288296
_node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
289297
_buf: &mut Vec<u8>,
290298
) -> datafusion_common::Result<()> {
@@ -349,6 +357,7 @@ impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
349357
fn try_decode_table_provider(
350358
&self,
351359
_buf: &[u8],
360+
_table_ref: &TableReference,
352361
_schema: arrow::datatypes::SchemaRef,
353362
_cts: &datafusion::prelude::SessionContext,
354363
) -> datafusion_common::Result<
@@ -359,6 +368,7 @@ impl LogicalExtensionCodec for AvroLogicalExtensionCodec {
359368

360369
fn try_encode_table_provider(
361370
&self,
371+
_table_ref: &TableReference,
362372
_node: std::sync::Arc<dyn datafusion::datasource::TableProvider>,
363373
_buf: &mut Vec<u8>,
364374
) -> datafusion_common::Result<()> {

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,14 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync {
109109
fn try_decode_table_provider(
110110
&self,
111111
buf: &[u8],
112+
table_ref: &TableReference,
112113
schema: SchemaRef,
113114
ctx: &SessionContext,
114115
) -> Result<Arc<dyn TableProvider>>;
115116

116117
fn try_encode_table_provider(
117118
&self,
119+
table_ref: &TableReference,
118120
node: Arc<dyn TableProvider>,
119121
buf: &mut Vec<u8>,
120122
) -> Result<()>;
@@ -164,6 +166,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
164166
fn try_decode_table_provider(
165167
&self,
166168
_buf: &[u8],
169+
_table_ref: &TableReference,
167170
_schema: SchemaRef,
168171
_ctx: &SessionContext,
169172
) -> Result<Arc<dyn TableProvider>> {
@@ -172,6 +175,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
172175

173176
fn try_encode_table_provider(
174177
&self,
178+
_table_ref: &TableReference,
175179
_node: Arc<dyn TableProvider>,
176180
_buf: &mut Vec<u8>,
177181
) -> Result<()> {
@@ -445,15 +449,17 @@ impl AsLogicalPlan for LogicalPlanNode {
445449
.iter()
446450
.map(|expr| from_proto::parse_expr(expr, ctx, extension_codec))
447451
.collect::<Result<Vec<_>, _>>()?;
452+
453+
let table_name =
454+
from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
455+
448456
let provider = extension_codec.try_decode_table_provider(
449457
&scan.custom_table_data,
458+
&table_name,
450459
schema,
451460
ctx,
452461
)?;
453462

454-
let table_name =
455-
from_table_reference(scan.table_name.as_ref(), "CustomScan")?;
456-
457463
LogicalPlanBuilder::scan_with_filters(
458464
table_name,
459465
provider_as_source(provider),
@@ -1048,7 +1054,7 @@ impl AsLogicalPlan for LogicalPlanNode {
10481054
} else {
10491055
let mut bytes = vec![];
10501056
extension_codec
1051-
.try_encode_table_provider(provider, &mut bytes)
1057+
.try_encode_table_provider(table_name, provider, &mut bytes)
10521058
.map_err(|e| context!("Error serializing custom table", e))?;
10531059
let scan = CustomScan(CustomTableScanNode {
10541060
table_name: Some(table_name.clone().into()),

datafusion/proto/tests/cases/roundtrip_logical_plan.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use datafusion_common::config::TableOptions;
5252
use datafusion_common::scalar::ScalarStructBuilder;
5353
use datafusion_common::{
5454
internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, DFSchemaRef,
55-
DataFusionError, Result, ScalarValue,
55+
DataFusionError, Result, ScalarValue, TableReference,
5656
};
5757
use datafusion_expr::dml::CopyTo;
5858
use datafusion_expr::expr::{
@@ -134,6 +134,9 @@ pub struct TestTableProto {
134134
/// URL of the table root
135135
#[prost(string, tag = "1")]
136136
pub url: String,
137+
/// Qualified table name
138+
#[prost(string, tag = "2")]
139+
pub table_name: String,
137140
}
138141

139142
#[derive(Debug)]
@@ -156,12 +159,14 @@ impl LogicalExtensionCodec for TestTableProviderCodec {
156159
fn try_decode_table_provider(
157160
&self,
158161
buf: &[u8],
162+
table_ref: &TableReference,
159163
schema: SchemaRef,
160164
_ctx: &SessionContext,
161165
) -> Result<Arc<dyn TableProvider>> {
162166
let msg = TestTableProto::decode(buf).map_err(|_| {
163167
DataFusionError::Internal("Error decoding test table".to_string())
164168
})?;
169+
assert_eq!(msg.table_name, table_ref.to_string());
165170
let provider = TestTableProvider {
166171
url: msg.url,
167172
schema,
@@ -171,6 +176,7 @@ impl LogicalExtensionCodec for TestTableProviderCodec {
171176

172177
fn try_encode_table_provider(
173178
&self,
179+
table_ref: &TableReference,
174180
node: Arc<dyn TableProvider>,
175181
buf: &mut Vec<u8>,
176182
) -> Result<()> {
@@ -181,6 +187,7 @@ impl LogicalExtensionCodec for TestTableProviderCodec {
181187
.expect("Can't encode non-test tables");
182188
let msg = TestTableProto {
183189
url: table.url.clone(),
190+
table_name: table_ref.to_string(),
184191
};
185192
msg.encode(buf).map_err(|_| {
186193
DataFusionError::Internal("Error encoding test table".to_string())
@@ -866,6 +873,7 @@ impl LogicalExtensionCodec for TopKExtensionCodec {
866873
fn try_decode_table_provider(
867874
&self,
868875
_buf: &[u8],
876+
_table_ref: &TableReference,
869877
_schema: SchemaRef,
870878
_ctx: &SessionContext,
871879
) -> Result<Arc<dyn TableProvider>> {
@@ -874,6 +882,7 @@ impl LogicalExtensionCodec for TopKExtensionCodec {
874882

875883
fn try_encode_table_provider(
876884
&self,
885+
_table_ref: &TableReference,
877886
_node: Arc<dyn TableProvider>,
878887
_buf: &mut Vec<u8>,
879888
) -> Result<()> {
@@ -943,6 +952,7 @@ impl LogicalExtensionCodec for ScalarUDFExtensionCodec {
943952
fn try_decode_table_provider(
944953
&self,
945954
_buf: &[u8],
955+
_table_ref: &TableReference,
946956
_schema: SchemaRef,
947957
_ctx: &SessionContext,
948958
) -> Result<Arc<dyn TableProvider>> {
@@ -951,6 +961,7 @@ impl LogicalExtensionCodec for ScalarUDFExtensionCodec {
951961

952962
fn try_encode_table_provider(
953963
&self,
964+
_table_ref: &TableReference,
954965
_node: Arc<dyn TableProvider>,
955966
_buf: &mut Vec<u8>,
956967
) -> Result<()> {

0 commit comments

Comments
 (0)