Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion singlestoredb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
DataError, ManagementError,
)
from .management import (
manage_cluster, manage_workspaces, manage_files,
manage_cluster, manage_workspaces, manage_files, manage_regions,
)
from .types import (
Date, Time, Timestamp, DateFromTicks, TimeFromTicks, TimestampFromTicks,
Expand Down
5 changes: 5 additions & 0 deletions singlestoredb/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,11 @@
environ=['SINGLESTOREDB_EXT_FUNC_PORT'],
)

register_option(
'external_function.timeout', 'int', check_int, 24*60*60,
'Specifies the timeout in seconds for processing a batch of rows.',
environ=['SINGLESTOREDB_EXT_FUNC_TIMEOUT'],
)

#
# Debugging options
Expand Down
45 changes: 32 additions & 13 deletions singlestoredb/functions/decorator.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import functools
import inspect
from typing import Any
Expand All @@ -19,6 +20,7 @@
]

ReturnType = ParameterType
UDFType = Callable[..., Any]


def is_valid_type(obj: Any) -> bool:
Expand Down Expand Up @@ -100,38 +102,50 @@ def _func(
name: Optional[str] = None,
args: Optional[ParameterType] = None,
returns: Optional[ReturnType] = None,
) -> Callable[..., Any]:
timeout: Optional[int] = None,
) -> UDFType:
"""Generic wrapper for UDF and TVF decorators."""

_singlestoredb_attrs = { # type: ignore
k: v for k, v in dict(
name=name,
args=expand_types(args),
returns=expand_types(returns),
timeout=timeout,
).items() if v is not None
}

# No func was specified, this is an uncalled decorator that will get
# called later, so the wrapper much be created with the func passed
# in at that time.
if func is None:
def decorate(func: Callable[..., Any]) -> Callable[..., Any]:
def decorate(func: UDFType) -> UDFType:

def wrapper(*args: Any, **kwargs: Any) -> Callable[..., Any]:
return func(*args, **kwargs) # type: ignore
if asyncio.iscoroutinefunction(func):
async def async_wrapper(*args: Any, **kwargs: Any) -> UDFType:
return await func(*args, **kwargs) # type: ignore
async_wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
return functools.wraps(func)(async_wrapper)

wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore

return functools.wraps(func)(wrapper)
else:
def wrapper(*args: Any, **kwargs: Any) -> UDFType:
return func(*args, **kwargs) # type: ignore
wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
return functools.wraps(func)(wrapper)

return decorate

def wrapper(*args: Any, **kwargs: Any) -> Callable[..., Any]:
return func(*args, **kwargs) # type: ignore

wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
if asyncio.iscoroutinefunction(func):
async def async_wrapper(*args: Any, **kwargs: Any) -> UDFType:
return await func(*args, **kwargs) # type: ignore
async_wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
return functools.wraps(func)(async_wrapper)

return functools.wraps(func)(wrapper)
else:
def wrapper(*args: Any, **kwargs: Any) -> UDFType:
return func(*args, **kwargs) # type: ignore
wrapper._singlestoredb_attrs = _singlestoredb_attrs # type: ignore
return functools.wraps(func)(wrapper)


def udf(
Expand All @@ -140,7 +154,8 @@ def udf(
name: Optional[str] = None,
args: Optional[ParameterType] = None,
returns: Optional[ReturnType] = None,
) -> Callable[..., Any]:
timeout: Optional[int] = None,
) -> UDFType:
"""
Define a user-defined function (UDF).

Expand All @@ -167,6 +182,9 @@ def udf(
Specifies the return data type of the function. This parameter
works the same way as `args`. If the function is a table-valued
function, the return type should be a `Table` object.
timeout : int, optional
The timeout in seconds for the UDF execution. If not specified,
the global default timeout is used.

Returns
-------
Expand All @@ -178,4 +196,5 @@ def udf(
name=name,
args=args,
returns=returns,
timeout=timeout,
)
Loading