Skip to content

Commit

Permalink
Initial support for Python UDFs (#736)
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored Sep 9, 2024
1 parent 4741624 commit 099f851
Show file tree
Hide file tree
Showing 35 changed files with 1,433 additions and 141 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ jobs:
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
override: true
- name: Check Formatting
run: cargo fmt -- --check
- uses: actions/setup-python@v5
name: Setup Python
with:
python-version: '3.12'
- name: Setup pnpm
uses: pnpm/action-setup@v4
with:
Expand Down
106 changes: 106 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ members = [
"crates/arroyo-udf/arroyo-udf-plugin",
"crates/arroyo-udf/arroyo-udf-host",
"crates/arroyo-udf/arroyo-udf-macros",
"crates/arroyo-udf/arroyo-udf-python",
"crates/arroyo-worker",
"crates/copy-artifacts",
"crates/integ",
Expand Down Expand Up @@ -76,4 +77,4 @@ datafusion-physical-expr = {git = 'https://github.com/ArroyoSystems/arrow-datafu
datafusion-physical-plan = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
datafusion-proto = {git = 'https://github.com/ArroyoSystems/arrow-datafusion', branch = '40.0.0/arroyo'}
cornucopia_async = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
cornucopia = { git = "https://github.com/ArroyoSystems/cornucopia", branch = "sqlite" }
1 change: 1 addition & 0 deletions crates/arroyo-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ arroyo-datastream = { path = "../arroyo-datastream" }
arroyo-state = { path = "../arroyo-state" }
arroyo-formats = { path = "../arroyo-formats" }
arroyo-udf-host = { path = "../arroyo-udf/arroyo-udf-host" }
arroyo-udf-python = { path = "../arroyo-udf/arroyo-udf-python" }

tonic = { workspace = true }
tonic-reflection = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions crates/arroyo-api/migrations/V24__add_udf_language.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Extend udfs for multi-language UDFs in a single ALTER TABLE:
--   * add a required `language` column; the 'rust' default also populates
--     every row that already exists
--   * make dylib_url optional by removing both its NOT NULL constraint and
--     its default value
ALTER TABLE udfs
    ADD COLUMN language VARCHAR(15) NOT NULL DEFAULT 'rust',
    ALTER COLUMN dylib_url DROP NOT NULL,
    ALTER COLUMN dylib_url DROP DEFAULT;
17 changes: 9 additions & 8 deletions crates/arroyo-api/queries/api_queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -281,26 +281,27 @@ WHERE job_configs.organization_id = :organization_id AND job_configs.id = :job_i
ORDER BY jlm.created_at DESC
LIMIT cast(:limit as integer);

----------- udfs -----------------------

--: DbUdf (description?)
----------- udfs -----------------------

--! create_udf
INSERT INTO udfs (pub_id, organization_id, created_by, prefix, name, definition, description, dylib_url)
VALUES (:pub_id, :organization_id, :created_by, :prefix, :name, :definition, :description, :dylib_url);
--: DbUdf (description?, dylib_url?)

--! create_udf (dylib_url?)
INSERT INTO udfs (pub_id, organization_id, created_by, prefix, name, language, definition, description, dylib_url)
VALUES (:pub_id, :organization_id, :created_by, :prefix, :name, :language, :definition, :description, :dylib_url);

--! get_udf: DbUdf
SELECT pub_id, prefix, name, definition, created_at, updated_at, description, dylib_url
SELECT pub_id, prefix, name, language, definition, created_at, updated_at, description, dylib_url
FROM udfs
WHERE organization_id = :organization_id AND pub_id = :pub_id;

--! get_udf_by_name: DbUdf
SELECT pub_id, prefix, name, definition, created_at, updated_at, description, dylib_url
SELECT pub_id, prefix, name, language, definition, created_at, updated_at, description, dylib_url
FROM udfs
WHERE organization_id = :organization_id AND name = :name;

--! get_udfs: DbUdf
SELECT pub_id, prefix, name, definition, created_at, updated_at, description, dylib_url
SELECT pub_id, prefix, name, language, definition, created_at, updated_at, description, dylib_url
FROM udfs
WHERE organization_id = :organization_id;

Expand Down
27 changes: 27 additions & 0 deletions crates/arroyo-api/sqlite_migrations/V2__add_udf_language.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- Rebuild the udfs table so that it carries a `language` column and a
-- nullable dylib_url. The replacement table is created under a temporary
-- name, populated from the existing table, and then swapped into place.
-- NOTE(review): presumably a rebuild is used because SQLite cannot change
-- column constraints in place -- confirm against the V1 schema.

-- Step 1: create the replacement table with the new shape.
CREATE TABLE udfs_new (
    pub_id          TEXT PRIMARY KEY,
    organization_id TEXT NOT NULL,
    created_by      TEXT NOT NULL,
    created_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
    updated_at      TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
    prefix          TEXT,
    name            TEXT NOT NULL,
    definition      TEXT NOT NULL,
    description     TEXT,
    dylib_url       TEXT,
    language        VARCHAR(15) NOT NULL DEFAULT 'rust',
    UNIQUE (organization_id, name)
);

-- Step 2: copy every existing row into the new table, marking each one
-- as a 'rust' UDF.
INSERT INTO udfs_new (
    pub_id,
    organization_id,
    created_by,
    created_at,
    updated_at,
    prefix,
    name,
    definition,
    description,
    dylib_url,
    language
)
SELECT
    pub_id,
    organization_id,
    created_by,
    created_at,
    updated_at,
    prefix,
    name,
    definition,
    description,
    dylib_url,
    'rust'
FROM udfs;

-- Step 3: remove the old table now that its rows have been copied.
DROP TABLE udfs;

-- Step 4: give the new table the original name.
ALTER TABLE udfs_new RENAME TO udfs;
1 change: 1 addition & 0 deletions crates/arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ impl IntoResponse for HttpError {
ValidateUdfPost,
UdfValidationResult,
Udf,
UdfLanguage,
UdfPost,
GlobalUdf,
GlobalUdfCollection,
Expand Down
Loading

0 comments on commit 099f851

Please sign in to comment.