Skip to content

Commit

Permalink
Local support delta (#347)
Browse files Browse the repository at this point in the history
* Added support for Delta tables when the server type is local

* Update datacontract.yaml
  • Loading branch information
Happycipher authored Jul 26, 2024
1 parent cb0a9dc commit c74c9aa
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 12 deletions.
28 changes: 16 additions & 12 deletions datacontract/engines/soda/connections/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,28 @@ def get_duckdb_connection(data_contract, server, run: Run):
f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1, columns={columns});"""
)
elif server.format == "delta":
if server.type == "local":
delta_table_arrow = DeltaTable(model_path).to_pyarrow_dataset()
con.register(model_name, delta_table_arrow)

if server.type == "azure":
# After switching to native delta table support
# in https://github.com/datacontract/datacontract-cli/issues/258,
# azure storage should also work
# https://github.com/duckdb/duckdb_delta/issues/21
raise NotImplementedError("Support for Delta Tables on Azure Storage is not implemented yet")

storage_options = {
"AWS_ENDPOINT_URL": server.endpointUrl,
"AWS_ACCESS_KEY_ID": os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID"),
"AWS_SECRET_ACCESS_KEY": os.getenv("DATACONTRACT_S3_SECRET_ACCESS_KEY"),
"AWS_REGION": os.getenv("DATACONTRACT_S3_REGION", "us-east-1"),
"AWS_ALLOW_HTTP": "True" if server.endpointUrl.startswith("http://") else "False",
}

delta_table_arrow = DeltaTable(model_path, storage_options=storage_options).to_pyarrow_dataset()

con.register(model_name, delta_table_arrow)
if server.type == "s3":
storage_options = {
"AWS_ENDPOINT_URL": server.endpointUrl,
"AWS_ACCESS_KEY_ID": os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID"),
"AWS_SECRET_ACCESS_KEY": os.getenv("DATACONTRACT_S3_SECRET_ACCESS_KEY"),
"AWS_REGION": os.getenv("DATACONTRACT_S3_REGION", "us-east-1"),
"AWS_ALLOW_HTTP": "True" if server.endpointUrl.startswith("http://") else "False",
}

delta_table_arrow = DeltaTable(model_path, storage_options=storage_options).to_pyarrow_dataset()

con.register(model_name, delta_table_arrow)
return con


Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"4df5ab31-bc35-478a-a175-bf27fc05d3a4","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"line_item_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"order_id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"sku\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1721891765448,"configuration":{}}}
{"add":{"path":"0-7b7ac87a-16b4-43be-b019-de661a3180cf-0.parquet","partitionValues":{},"size":1414,"modificationTime":1721891765441,"dataChange":true,"stats":"{\"numRecords\": 12, \"minValues\": {\"line_item_id\": \"LI-001\", \"order_id\": 1001, \"sku\": \"SKU-12345\"}, \"maxValues\": {\"line_item_id\": \"LI-012\", \"order_id\": 1008, \"sku\": \"SKU-12356\"}, \"nullCount\": {\"line_item_id\": 0, \"order_id\": 0, \"sku\": 0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1721891765448,"operation":"CREATE TABLE","operationParameters":{"mode":"ErrorIfExists","metadata":"{\"configuration\":{},\"createdTime\":1721891765448,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"4df5ab31-bc35-478a-a175-bf27fc05d3a4\",\"name\":null,\"partitionColumns\":[],\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"line_item_id\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"order_id\\\",\\\"type\\\":\\\"long\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"sku\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\"}","location":"file:///C:/Users/harsh/OneDrive/Desktop/New%2520folder/data/line_items","protocol":"{\"minReaderVersion\":1,\"minWriterVersion\":2}"},"clientVersion":"delta-rs.0.18.1"}}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}
{"metaData":{"id":"6ebebefc-604c-4498-939f-58eb7c631c7c","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"order_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"order_timestamp\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}},{\"name\":\"order_total\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1721891765294,"configuration":{}}}
{"add":{"path":"0-5014bd96-6666-482e-bec9-d02a43a78cfb-0.parquet","partitionValues":{},"size":1481,"modificationTime":1721891765240,"dataChange":true,"stats":"{\"numRecords\": 8, \"minValues\": {\"order_id\": \"1001\", \"order_timestamp\": \"2024-01-01T10:00:00\", \"order_total\": 2000}, \"maxValues\": {\"order_id\": \"1008\", \"order_timestamp\": \"2024-01-02T11:30:00\", \"order_total\": 12000}, \"nullCount\": {\"order_id\": 0, \"order_timestamp\": 0, \"order_total\": 0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1721891765357,"operation":"CREATE TABLE","operationParameters":{"protocol":"{\"minReaderVersion\":3,\"minWriterVersion\":7,\"readerFeatures\":[\"timestampNtz\"],\"writerFeatures\":[\"timestampNtz\"]}","metadata":"{\"configuration\":{},\"createdTime\":1721891765294,\"description\":null,\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"6ebebefc-604c-4498-939f-58eb7c631c7c\",\"name\":null,\"partitionColumns\":[],\"schemaString\":\"{\\\"type\\\":\\\"struct\\\",\\\"fields\\\":[{\\\"name\\\":\\\"order_id\\\",\\\"type\\\":\\\"string\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"order_timestamp\\\",\\\"type\\\":\\\"timestamp_ntz\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}},{\\\"name\\\":\\\"order_total\\\",\\\"type\\\":\\\"long\\\",\\\"nullable\\\":true,\\\"metadata\\\":{}}]}\"}","location":"file:///C:/Users/harsh/OneDrive/Desktop/New%2520folder/data/orders","mode":"ErrorIfExists"},"clientVersion":"delta-rs.0.18.1"}}
23 changes: 23 additions & 0 deletions tests/fixtures/local-delta/datacontract.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
dataContractSpecification: 0.9.2
id: orders-unit-test
info:
title: Orders Unit Test
version: 1.0.0
servers:
production:
type: local
path: ./fixtures/local-delta/data/orders
format: delta
dataProductId: orders
models:
orders:
fields:
order_id:
type: varchar
unique: true
required: true
order_timestamp:
required: true
order_total:
type: bigint
required: true
67 changes: 67 additions & 0 deletions tests/fixtures/local-delta/helper/create_delta_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import os
from deltalake import write_deltalake

import pandas as pd

# Builds two small sample Delta tables ("orders" and "line_items") from
# in-memory pandas DataFrames and writes them under a data/ directory.

# Anchor the output directory to this script's own location (the data/ folder
# one level above it) so the tables land in the same place regardless of the
# directory the script is launched from.
output_dir = os.path.normpath(os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "data"))
os.makedirs(output_dir, exist_ok=True)

# Sample data for Orders table
orders_data = {
    "order_id": ["1001", "1002", "1003", "1004", "1005", "1006", "1007", "1008"],
    "order_timestamp": [
        "2024-01-01T10:00:00.000Z",
        "2024-01-01T11:30:00.000Z",
        "2024-01-01T12:45:00.000Z",
        "2024-01-02T08:20:00.000Z",
        "2024-01-02T09:15:00.000Z",
        "2024-01-02T10:05:00.000Z",
        "2024-01-02T10:45:00.000Z",
        "2024-01-02T11:30:00.000Z",
    ],
    "order_total": [5000, 7500, 3000, 2000, 6500, 12000, 4500, 8000],
}

orders_df = pd.DataFrame(orders_data)
# The trailing "Z" in the format is matched as literal text (not as %z), so the
# parsed column holds timezone-naive timestamps.
orders_df["order_timestamp"] = pd.to_datetime(orders_df["order_timestamp"], format="%Y-%m-%dT%H:%M:%S.%fZ")

# Sample data for Line Items table
line_items_data = {
    "line_item_id": [
        "LI-001",
        "LI-002",
        "LI-003",
        "LI-004",
        "LI-005",
        "LI-006",
        "LI-007",
        "LI-008",
        "LI-009",
        "LI-010",
        "LI-011",
        "LI-012",
    ],
    "order_id": [1001, 1001, 1002, 1004, 1004, 1005, 1005, 1006, 1006, 1007, 1008, 1008],
    "sku": [
        "SKU-12345",
        "SKU-12346",
        "SKU-12347",
        "SKU-12348",
        "SKU-12349",
        "SKU-12350",
        "SKU-12351",
        "SKU-12352",
        "SKU-12353",
        "SKU-12354",
        "SKU-12355",
        "SKU-12356",
    ],
}
line_items_df = pd.DataFrame(line_items_data)

# Write both tables into output_dir (previously they were written to a
# cwd-relative "data/" path that ignored the directory created above).
# NOTE(review): write_deltalake presumably refuses to overwrite an existing
# table with its default mode - remove the old table directories before
# regenerating; confirm against the deltalake documentation.
write_deltalake(os.path.join(output_dir, "orders"), orders_df)
write_deltalake(os.path.join(output_dir, "line_items"), line_items_df)
29 changes: 29 additions & 0 deletions tests/test_test_delta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import os

from typer.testing import CliRunner

from datacontract.cli import app
from datacontract.data_contract import DataContract

runner = CliRunner()


def test_valid_cli():
    """Invoke the CLI `test` command on the local Delta contract and expect a clean exit."""
    this_file = os.path.abspath(__file__)
    print("DEBUG Current file path:" + this_file)

    contract_path = "./fixtures/local-delta/datacontract.yaml"
    outcome = runner.invoke(app, ["test", contract_path])

    # A zero exit code means the command finished without reporting a failure.
    assert outcome.exit_code == 0
    assert "Testing " + contract_path in outcome.stdout


def test_valid():
    """Run the local Delta contract through the DataContract API; all nine checks must pass."""
    contract = DataContract(data_contract_file="fixtures/local-delta/datacontract.yaml")
    outcome = contract.test()
    print(outcome.pretty())

    assert outcome.result == "passed"
    assert len(outcome.checks) == 9
    # Collect anything that did not pass; the list must be empty.
    not_passed = [check for check in outcome.checks if check.result != "passed"]
    assert not not_passed

0 comments on commit c74c9aa

Please sign in to comment.