
Conversation

@haseebmalik18
Contributor

Adds direct GCS export capability to DatabricksSqlOperator with Parquet and Avro format support; see the usage sketch after the change list below.

closes: #55128

Changes

  • Added parquet and avro to supported output_format values
  • Added GCS URI support (gs://bucket/path) in output_path parameter
  • New parameters: gcp_conn_id, gcs_impersonation_chain
  • Added optional [gcs] dependency for Google provider
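
A minimal usage sketch of the new parameters, assuming the names listed above; the DAG wiring, SQL statement, bucket, warehouse name, and connection ids are placeholders, and exact defaults may differ from the merged implementation:

# Hypothetical usage sketch based on the parameters described in this PR;
# exact defaults and required connection settings may differ.
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

export_trips_to_gcs = DatabricksSqlOperator(
    task_id="export_trips_to_gcs",
    databricks_conn_id="databricks_default",
    sql_endpoint_name="my-sql-warehouse",                 # placeholder warehouse name
    sql="SELECT * FROM samples.nyctaxi.trips LIMIT 1000",
    output_path="gs://my-bucket/exports/trips.parquet",   # gs:// URI enables direct GCS upload
    output_format="parquet",                              # newly supported; "avro" also works
    gcp_conn_id="google_cloud_default",                   # new parameter
    gcs_impersonation_chain=None,                         # new parameter (optional service-account chain)
)

Presumably, local output paths with the existing csv/json/jsonl formats keep their previous behavior; only gs:// paths route through the GCS upload.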

@haseebmalik18 changed the title to "Add direct GCS export to DatabricksSqlOperator with Parquet/Avro support" on Jan 14, 2026
@haseebmalik18 force-pushed the feature/databricks-gcs-export-55128 branch from bad5f46 to 2a920d0 on January 14, 2026 at 22:16
@haseebmalik18 force-pushed the feature/databricks-gcs-export-55128 branch from 2a920d0 to 77a6ba5 on January 14, 2026 at 22:42
Member

@jason810496 left a comment

Nice! Thanks for the PR, LGTM overall.

@jason810496
Member

After this PR, the Databricks provider will depend on the GCP provider. Eventually, the Databricks provider will depend on all three cloud providers (AWS, Azure, and GCP), right?

https://www.databricks.com/company/partners/cloud-partners

@jason810496
Member

I'm wondering if we could move this kind of common serialization logic to the common.io package in the long term. There are similar utils for serializing different output file formats in both the GCS and S3 operators:

def _configure_csv_file(self, file_handle, schema):
    """Configure a csv writer with the file_handle and write schema as headers for the new file."""
    csv_writer = csv.writer(file_handle, delimiter=self.field_delimiter)
    csv_writer.writerow(schema)
    return csv_writer

def _configure_parquet_file(self, file_handle, parquet_schema) -> pq.ParquetWriter:
    parquet_writer = pq.ParquetWriter(file_handle.name, parquet_schema)
    return parquet_writer

for group_name, df in self._partition_dataframe(df=data_df):
    buf = io.BytesIO()
    self.log.info("Writing data to in-memory buffer")
    clean_key = self._strip_suffixes(self.s3_key)
    object_key = (
        f"{clean_key}_{group_name}{file_options.suffix}"
        if group_name
        else f"{clean_key}{file_options.suffix}"
    )
    if self.file_format != FILE_FORMAT.PARQUET and self.df_kwargs.get("compression") == "gzip":
        object_key += ".gz"
        df_kwargs = {k: v for k, v in self.df_kwargs.items() if k != "compression"}
        with gzip.GzipFile(fileobj=buf, mode="wb", filename=object_key) as gz:
            getattr(df, file_options.function)(gz, **df_kwargs)
    else:
        if self.file_format == FILE_FORMAT.PARQUET:
            getattr(df, file_options.function)(buf, **self.df_kwargs)
        else:
            text_buf = io.TextIOWrapper(buf, encoding="utf-8", write_through=True)
            getattr(df, file_options.function)(text_buf, **self.df_kwargs)
            text_buf.flush()
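
For illustration, a shared helper in common.io might look roughly like the following; this is a sketch only, the module has no such API today, and the function name serialize_rows_to_parquet is invented for this example:

# Hypothetical sketch of a shared serializer that could live in common.io;
# the name serialize_rows_to_parquet is invented for illustration only.
import io
from collections.abc import Iterable, Sequence

import pyarrow as pa
import pyarrow.parquet as pq


def serialize_rows_to_parquet(rows: Iterable[Sequence], column_names: Sequence[str]) -> bytes:
    """Serialize row tuples into an in-memory Parquet payload."""
    materialized = [list(row) for row in rows]
    # Transpose rows into columns; fall back to empty columns when there are no rows.
    columns = list(zip(*materialized)) if materialized else [[] for _ in column_names]
    table = pa.table({name: list(col) for name, col in zip(column_names, columns)})
    buf = io.BytesIO()
    pq.write_table(table, buf)
    return buf.getvalue()

Each transfer operator could then focus on fetching rows and uploading bytes, instead of re-implementing per-format writers.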

@haseebmalik18 force-pushed the feature/databricks-gcs-export-55128 branch from 77a6ba5 to 27b1e3f on January 15, 2026 at 05:40
@haseebmalik18
Contributor Author

I'm wondering if we could move this kind of common serialization logic to the common.io package in the long term. There are similar utils for serializing different output file formats in both the GCS and S3 operators.

Yes, I agree. I think this could possibly be opened up as another issue?

@haseebmalik18
Contributor Author

After this PR, the Databricks provider will depend on the GCP provider. Eventually, the Databricks provider will depend on all three cloud providers (AWS, Azure, and GCP), right?

https://www.databricks.com/company/partners/cloud-partners

Yep, that's right

@haseebmalik18
Contributor Author

I'm not quite sure why the Docker build test is failing. The errors show Microsoft's apt repository returning 403 Forbidden during apt-get update, which appears unrelated to the Databricks provider changes.

@jscheffl
Contributor

Restarted; it was a problem in the backend. Assuming CI will turn green in a moment.

@jscheffl requested a review from jason810496 on January 15, 2026 at 21:07
Member

@jason810496 left a comment

Yes, I agree. I think this could possibly be opened up as another issue?

Yes, it's non-blocking. We could just create an issue to track it as a follow-up.

@haseebmalik18 force-pushed the feature/databricks-gcs-export-55128 branch from 27b1e3f to 9555bd1 on January 16, 2026 at 20:22
@jason810496 merged commit b100b5e into apache:main on Jan 17, 2026
129 checks passed
@eladkal
Contributor

eladkal commented Jan 17, 2026

I'm getting a build failure on #60719 related to this PR:

  Resolved 940 packages in 4.00s
     Building apache-airflow-providers-apache-hdfs @ file:///opt/airflow/providers/apache/hdfs
     Building fastavro==1.9.4
        Built apache-airflow-providers-apache-hdfs @ file:///opt/airflow/providers/apache/hdfs
    × Failed to build `fastavro==1.9.4`
    
    help: `fastavro` (v1.9.4) was included because
          `apache-airflow-providers-apache-hdfs` (v4.11.2) depends on
          `hdfs[dataframe]` (v2.7.3) which depends on `fastavro`

https://github.com/apache/airflow/actions/runs/21098174150/job/60678759100?pr=60719#step:8:1265

@haseebmalik18
Contributor Author

haseebmalik18 commented Jan 17, 2026

@jason810496 For the build issue Elad mentioned, the error shows that fastavro 1.9.4 uses deprecated C APIs that were removed in Python 3.13. Would bumping fastavro up to >=1.10.0 work?

@jscheffl
Contributor

Attempted fix PR: #60732
