Skip to content

Commit

Permalink
Add new python feature to make Python support conditional
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Sep 24, 2024
1 parent 45672f7 commit 677f67c
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 184 deletions.
40 changes: 30 additions & 10 deletions .github/workflows/binaries.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ jobs:
matrix:
# see https://docs.github.com/en/actions/using-github-hosted-runners/using-github-hosted-runners/about-github-hosted-runners#standard-github-hosted-runners-for-public-repositories
config:
- { runner: buildjet-32vcpu-ubuntu-2204, protoc: linux-x86_64, artifact: linux-x86_64 }
- { runner: buildjet-32vcpu-ubuntu-2204-arm, protoc: linux-aarch_64, artifact: linux-arm64 }
- { runner: buildjet-32vcpu-ubuntu-2204, protoc: linux-x86_64, pyarch: x86_64, artifact: linux-x86_64 }
- { runner: buildjet-32vcpu-ubuntu-2204-arm, protoc: linux-aarch_64, pyarch: aarch64, artifact: linux-arm64 }
runs-on: ${{ matrix.config.runner }}
services:
postgres:
Expand Down Expand Up @@ -49,28 +49,42 @@ jobs:
- name: Update rust
run: |
rustup update
- name: Install Python 3.12
run: |
curl -OL https://github.com/indygreg/python-build-standalone/releases/download/20240814/cpython-3.12.5+20240814-${{ matrix.config.pyarch }}-unknown-linux-gnu-install_only.tar.gz
tar xvfz cpython*.tar.gz
sudo cp -r python/bin/* /usr/local/bin/
sudo cp -r python/include/* /usr/local/include/
sudo cp -r python/lib/* /usr/local/lib/
sudo cp -r python/share/* /usr/local/share/
sudo ldconfig
- name: Run DB migrations
run: |
cargo install --debug refinery_cli --version $REFINERY_VERSION
refinery migrate -e REFINERY_CONFIG -p crates/arroyo-api/migrations
- name: Run frontend build
run: cd webui && pnpm install && pnpm build
- name: Build Arroyo
run: cargo build --release --package arroyo && strip target/release/arroyo
- name: Create output directory
run: mkdir artifacts
- name: Build Arroyo with Python
run: cargo build --features python --release --package arroyo && strip target/release/arroyo && mv target/release/arroyo artifacts/arroyo-python
- name: Build Arroyo without Python
run: cargo build --release --package arroyo && strip target/release/arroyo && mv target/release/arroyo artifacts/arroyo
- uses: actions/upload-artifact@v4
with:
name: arroyo-${{ matrix.config.artifact }}
path: target/release/arroyo
path: artifacts/*
if-no-files-found: error

macos:
strategy:
fail-fast: true
matrix:
# see https://docs.github.com/en/actions/using-github-hosted-runners/using-github-hosted-runners/about-github-hosted-runners#standard-github-hosted-runners-for-public-repositories
config:
- { runner: macos-13, protoc: osx-x86_64, artifact: macos-x86_64 }
- { runner: macos-14, protoc: osx-aarch_64, artifact: macos-m1 }
- { runner: macos-14-xlarge, protoc: osx-aarch_64, artifact: macos-m1 }
runs-on: ${{ matrix.config.runner }}
steps:
- name: Check out
Expand All @@ -79,6 +93,8 @@ jobs:
uses: pnpm/action-setup@v4
with:
version: 9.7.1
- name: Install Python 3.12 via homebrew
run: brew install python@3.12
- name: Install protoc compiler
run: |
wget https://github.com/protocolbuffers/protobuf/releases/download/v$PROTOC_VERSION/protoc-$PROTOC_VERSION-${{ matrix.config.protoc }}.zip
Expand All @@ -94,10 +110,14 @@ jobs:
refinery migrate -e REFINERY_CONFIG -p crates/arroyo-api/migrations
- name: Run frontend build
run: cd webui && pnpm install && pnpm build
- name: Build Arroyo
run: cargo build --release --package arroyo && strip target/release/arroyo
- name: Create output directory
run: mkdir artifacts
- name: Build Arroyo with Python
run: PYO3_PYTHON=/opt/homebrew/opt/python@3.12/Frameworks/Python.framework/Versions/3.12/Python cargo build --features python --release --package arroyo && strip target/release/arroyo && mv target/release/arroyo artifacts/arroyo-python
- name: Build Arroyo without Python
run: cargo build --release --package arroyo && strip target/release/arroyo && mv target/release/arroyo artifacts/arroyo
- uses: actions/upload-artifact@v4
with:
name: arroyo-${{ matrix.config.artifact }}
path: target/release/arroyo
path: artifacts/*
if-no-files-found: error
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion crates/arroyo-udf/arroyo-udf-python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ name = "arroyo-udf-python"
version = "0.2.0"
edition = "2021"

[features]
python-enabled = ["pyo3"]

[dependencies]
arroyo-udf-common = { path = "../arroyo-udf-common" }
arrow = { workspace = true, features = ["ffi"] }
datafusion = { workspace = true }
pyo3 = { version = "0.21"}
pyo3 = { version = "0.21", optional = true}
anyhow = "1"
tokio = { version = "1", features = ["full"] }
itertools = "0.13.0"
190 changes: 19 additions & 171 deletions crates/arroyo-udf/arroyo-udf-python/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
#[cfg(feature = "python-enabled")]
mod interpreter;
#[cfg(feature = "python-enabled")]
mod pyarrow;
#[cfg(feature = "python-enabled")]
mod threaded;
#[cfg(feature = "python-enabled")]
mod types;

use crate::threaded::ThreadedUdfInterpreter;
use anyhow::{anyhow, bail};
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::DataType;
use arroyo_udf_common::parse::NullableType;
use datafusion::common::Result as DFResult;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature};
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyString, PyTuple};
use pyo3::{Bound, PyAny};
use std::any::Any;
use std::fmt::Debug;
use std::sync::mpsc::{Receiver, SyncSender};
use std::sync::{Arc, Mutex};

#[cfg(not(feature = "python-enabled"))]
const NOT_ENABLED_ERROR: &str =
"Python is not enabled in this build of Arroyo. See https://doc.arroyo.dev/udfs/python/udfs \
for more information on how to obtain a Python-enabled build.";

#[cfg(feature = "python-enabled")]
const UDF_PY_LIB: &str = include_str!("../python/arroyo_udf.py");

#[derive(Debug)]
pub struct PythonUDF {
pub name: Arc<String>,
pub task_tx: SyncSender<Vec<ArrayRef>>,
pub result_rx: Arc<Mutex<Receiver<anyhow::Result<ArrayRef>>>>,
pub(crate) task_tx: SyncSender<Vec<ArrayRef>>,
pub(crate) result_rx: Arc<Mutex<Receiver<anyhow::Result<ArrayRef>>>>,
pub definition: Arc<String>,
pub signature: Arc<Signature>,
pub arg_types: Arc<Vec<NullableType>>,
Expand Down Expand Up @@ -88,175 +94,17 @@ impl ScalarUDFImpl for PythonUDF {
}
}

fn extract_type_info(udf: &Bound<PyAny>) -> anyhow::Result<(Vec<NullableType>, NullableType)> {
let attr = udf.getattr("__annotations__")?;
let annotations: &Bound<PyDict> = attr.downcast().map_err(|e| {
anyhow!(
"__annotations__ object is not a dictionary: {}",
e.to_string()
)
})?;

// Iterate over annotations dictionary
let (ok, err): (Vec<_>, Vec<_>) = annotations
.iter()
.map(|(k, v)| {
python_type_to_arrow(
k.downcast::<PyString>().unwrap().to_str().unwrap(),
&v,
false,
)
})
.partition(|e| e.is_ok());

if !err.is_empty() {
bail!(
"Could not register Python UDF: {}",
err.into_iter()
.map(|t| t.unwrap_err().to_string())
.collect::<Vec<_>>()
.join(", ")
);
}

let mut result: Vec<_> = ok.into_iter().map(|t| t.unwrap()).collect();

let ret = result
.pop()
.ok_or_else(|| anyhow!("No return type defined for function"))?;

Ok((result, ret))
}

impl PythonUDF {
#[allow(unused)]
pub async fn parse(body: impl Into<String>) -> anyhow::Result<Self> {
ThreadedUdfInterpreter::new(Arc::new(body.into())).await
}
}

fn python_type_to_arrow(
var_name: &str,
py_type: &Bound<PyAny>,
nullable: bool,
) -> anyhow::Result<NullableType> {
let name = py_type
.getattr("__name__")
.map_err(|e| anyhow!("Could not get name of type for argument {var_name}: {e}"))?
.downcast::<PyString>()
.map_err(|_| anyhow!("Argument type was not a string"))?
.to_string();

if name == "Optional" {
return python_type_to_arrow(
var_name,
&py_type
.getattr("__args__")
.map_err(|_| anyhow!("Optional type does not have arguments"))?
.downcast::<PyTuple>()
.map_err(|e| anyhow!("__args__ is not a tuple: {e}"))?
.get_item(0)?,
true,
);
}

let data_type = match name.as_str() {
"int" => DataType::Int64,
"float" => DataType::Float64,
"str" => DataType::Utf8,
"bool" => DataType::Boolean,
"list" => bail!("lists are not yet supported"),
other => bail!("Unsupported Python type: {}", other),
};

Ok(NullableType::new(data_type, nullable))
}

#[cfg(test)]
mod test {
use crate::PythonUDF;
use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, TypeSignature};
use std::sync::Arc;

#[tokio::test]
async fn test() {
let udf = r#"
from arroyo_udf import udf
@udf
def my_add(x: int, y: float) -> float:
return float(x) + y
"#;

let udf = PythonUDF::parse(udf).await.unwrap();
assert_eq!(udf.name.as_str(), "my_add");
if let datafusion::logical_expr::TypeSignature::OneOf(args) = &udf.signature.type_signature
#[cfg(feature = "python-enabled")]
{
let ts: Vec<_> = args
.iter()
.map(|e| {
if let TypeSignature::Exact(v) = e {
v
} else {
panic!(
"expected inner typesignature sto be exact, but found {:?}",
e
)
}
})
.collect();

use arrow::datatypes::DataType::*;

assert_eq!(
ts,
vec![
&vec![Int8, Float32],
&vec![Int8, Float64],
&vec![Int16, Float32],
&vec![Int16, Float64],
&vec![Int32, Float32],
&vec![Int32, Float64],
&vec![Int64, Float32],
&vec![Int64, Float64],
&vec![UInt8, Float32],
&vec![UInt8, Float64],
&vec![UInt16, Float32],
&vec![UInt16, Float64],
&vec![UInt32, Float32],
&vec![UInt32, Float64],
&vec![UInt64, Float32],
&vec![UInt64, Float64]
]
);
} else {
panic!("Expected oneof type signature");
crate::threaded::ThreadedUdfInterpreter::new(Arc::new(body.into())).await
}

assert_eq!(
udf.return_type.data_type,
arrow::datatypes::DataType::Float64
);
assert!(!udf.return_type.nullable);

let data = vec![
ColumnarValue::Array(Arc::new(arrow::array::Int64Array::from(vec![1, 2, 3]))),
ColumnarValue::Array(Arc::new(arrow::array::Float64Array::from(vec![
1.0, 2.0, 3.0,
]))),
];

let result = udf.invoke(&data).unwrap();
if let ColumnarValue::Array(a) = result {
let a = a
.as_any()
.downcast_ref::<arrow::array::Float64Array>()
.unwrap();
assert_eq!(a.len(), 3);
assert_eq!(a.value(0), 2.0);
assert_eq!(a.value(1), 4.0);
assert_eq!(a.value(2), 6.0);
} else {
panic!("Expected array result");
#[cfg(not(feature = "python-enabled"))]
{
anyhow::bail!(NOT_ENABLED_ERROR)
}
}
}
3 changes: 2 additions & 1 deletion crates/arroyo-udf/arroyo-udf-python/src/threaded.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::interpreter::SubInterpreter;
use crate::pyarrow::Converter;
use crate::{extract_type_info, PythonUDF, UDF_PY_LIB};
use crate::types::extract_type_info;
use crate::{PythonUDF, UDF_PY_LIB};
use anyhow::anyhow;
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::DataType;
Expand Down
Loading

0 comments on commit 677f67c

Please sign in to comment.