Rich pydantic-type return values support #68
I didn't realize that, but you're right 😄. It works as a side effect of our internal Pydantic usage; however, type-checking should fail if a
You mean by using:

```python
def get_user() -> User:
    return User(id=1, name="John Doe")

user = await step.run("do-thing", get_user)
```

But reflection can't know about an implicit return type:

```python
def get_user():
    return User(id=1, name="John Doe")

user = await step.run("do-thing", get_user)
```

And what would we do if it returns a union of Pydantic models?

```python
def get_user() -> User | Admin:
    return User(id=1, name="John Doe")

user = await step.run("do-thing", get_user)
```

We absolutely want to make Pydantic-based validation a first-class citizen in the SDK (Pydantic is amazing), but there are a lot of questions about how to do that effectively. We've had similar discussions with Zod in our TypeScript SDK.
Yes, I think that could be a solution, but as I see it, only if we define a step function statically with a special wrapper, so it knows how to deserialize the primitive output of step.run back into Pydantic models (or even other custom classes that have

From the wrapper concept that I proposed in #63, we have access to the initial function we're wrapping, so we could store return types and then deserialize. If none is present, fall back to the default primitive-type behaviour.

```python
@staticmethod
def task(name: Optional[str] = None):
    """Wraps a function to be executed as an Inngest step."""
    def decorator(func):
        async def wrapped(self, *args, **kwargs):
            task_name = name or func.__name__
            return_type = typing.get_type_hints(func).get("return")  # <---
            primitive_result = await self.step.run(
                task_name, lambda: func(self, *args, **kwargs)
            )
            result = maybe_convert_to_rich_object(primitive_result, return_type)  # <---
            return result
        return wrapped
    return decorator
```

Otherwise (in dynamic step calls) I think it will be really hard to restore an object during the memo-retrieval stage.
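`maybe_convert_to_rich_object` is left undefined in the snippet above. A minimal sketch of what it could do, assuming Pydantic v2 and handling only the plain-model case (the name and logic are hypothetical, not part of any SDK):

```python
import pydantic


class User(pydantic.BaseModel):
    id: int
    name: str


def maybe_convert_to_rich_object(primitive_result, return_type):
    # Hypothetical helper: if the return annotation names a Pydantic model,
    # validate the memoized primitives back into it; otherwise pass through.
    if isinstance(return_type, type) and issubclass(return_type, pydantic.BaseModel):
        return return_type.model_validate(primitive_result)
    return primitive_result


restored = maybe_convert_to_rich_object({"id": 1, "name": "Alice"}, User)
untouched = maybe_convert_to_rich_object({"id": 1}, None)
```

Unions, `Optional`, and generics would all need extra handling, which is part of why this is harder than it first looks.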
We could definitely do this with:

```python
if not isinstance(memo, types.EmptySentinel):
    model = typing.get_type_hints(handler).get("return")
    if issubclass(model, pydantic.BaseModel):
        return model.model_validate(memo.data)
```

This would work well when

As far as I can tell, we can't express the "return type changes based on explicit return types" behavior in types. That leads to a major footgun where
But will this work in case handlers are passed as closures using lambdas? In this "dynamically-defined step" case the lambda is not typed, so you won't be able to extract a return type from it. And at least for me, when I build pipelines, it is very common that steps take input values from previously executed steps.
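The reflection gap described here is easy to verify: `typing.get_type_hints` recovers an explicit annotation, but a lambda has an empty `__annotations__` dict, so there is nothing to recover (a small standalone check, not SDK code):

```python
import typing


def get_user() -> dict:
    return {"id": 1, "name": "Alice"}


# An explicit annotation is recoverable at runtime...
explicit = typing.get_type_hints(get_user).get("return")

# ...but a lambda carries no annotations, so reflection comes back empty.
from_lambda = typing.get_type_hints(lambda: {"id": 1}).get("return")
```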
It won't work with lambdas since they're implicitly typed. That said, the downside for lambdas is the same downside for implicitly typed functions. It's a really big footgun that would apply to both the object-oriented approach you proposed and the existing functional approach.

A separate issue is union return types. Let's say that your function returned one of these models:

```python
class Admin(pydantic.BaseModel):
    id: int
    name: str
    role: typing.Literal["admin"] = "admin"

class User(pydantic.BaseModel):
    id: int
    name: str
```

We could iterate over the union members and attempt to instantiate each Pydantic model, returning the first model that passes validation. But that can lead to unexpected behavior if you don't properly discriminate the union. For example, let's say you had this:

```python
def step_handler() -> Admin | User:
    return User(id=123, name="Alice")
```

If we iterated over the union members, we'd actually instantiate an `Admin` instead of the `User` that was returned.
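The mis-instantiation is reproducible in isolation. This sketch (Pydantic v2 assumed) tries the union members in declaration order, the naive strategy described above:

```python
import typing

import pydantic


class Admin(pydantic.BaseModel):
    id: int
    name: str
    role: typing.Literal["admin"] = "admin"


class User(pydantic.BaseModel):
    id: int
    name: str


# The memoized primitives for the User returned by the step handler.
data = User(id=123, name="Alice").model_dump()

# Trying union members in order: Admin validates first because `role`
# has a default, so the value comes back as the wrong model.
for model in (Admin, User):
    try:
        restored = model.model_validate(data)
        break
    except pydantic.ValidationError:
        continue
```

A discriminated union (e.g. Pydantic's `Field(discriminator="role")`) avoids this, but only if users remember to declare one.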
I also wanna mention that runtime model validation can be tricky if your schema changes over time. Like if you add a new required field to your model and then deploy, active runs (whose memoized values adhere to the old model) will start failing. We want to make validation more first-class in Inngest (Pydantic in Python and Zod in TypeScript), but we need to think about it more before implementing something. In the meantime, the current approach of roll-your-own model validation is flexible enough to let you transform the data however you need before validating. It's more verbose, but we want to avoid adding a bad abstraction -- we want to get it right.
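The schema-drift failure mode can be sketched concretely (the `email` field and the patch value are hypothetical examples, not from the thread):

```python
import pydantic


class User(pydantic.BaseModel):
    id: int
    name: str
    email: str  # hypothetical required field added in a later deploy


# A memoized value produced by a run that started before `email` existed.
old_memo = {"id": 1, "name": "Alice"}

try:
    User.model_validate(old_memo)
    failed = False
except pydantic.ValidationError:
    failed = True  # active runs replaying old memos would start failing

# A manual transform between reading the memo and validating bridges versions,
# which is exactly what the roll-your-own approach leaves room for.
patched = {**old_memo, "email": "unknown@example.com"}
user = User.model_validate(patched)
```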
I think the gotchas of union usage should be carefully avoided by users and be out of your scope, as this is the default behavior of Pydantic itself:

```python
from pydantic import BaseModel

class A(BaseModel):
    x: int

class B(BaseModel):
    x: int

class C(BaseModel):
    inner_attr: A | B

d = {"inner_attr": {"x": 2}}
C(**d)  # >>> C(inner_attr=A(x=2))
```

Returning to the initial-type-restoration topic: yes, you are right that lambdas are usually rare. But when defining flows with your package, they appear in each step:

```python
def foo(*args):
    return ...

def bar(*args):
    return ...

init_args = ...
foo_result = step.run("id1", lambda: foo(*init_args))
bar_result = step.run("id2", lambda: bar(*foo_result))
```

The code is very clear and follows a pretty common (as far as I know) approach: you define the functions that process your data first (in the same or another file) and then use them. Without lambdas, you must define each handler as a closure that acts on variables in the enclosing scope, not on input arguments, after the previous step has executed:

```python
init_args = ...

def foo():
    # access `init_args` defined in the same scope as `foo`
    return ...

foo_result = step.run("id1", foo)

def bar():
    # access `foo_result` defined in the same scope as `bar`
    return ...

bar_result = step.run("id2", bar)
```

For me this makes any non-trivial pipeline way harder to read and maintain. I can't speak for others, but as I see it, it would be hard for people who previously used Airflow/Prefect, wrote custom pipeline flow logic, or come from ML to transition to Inngest if they had to stick to option #2.
Oh, I totally agree that users shouldn't be required to use your second example. Your first example is 100% fine -- no issues with that. I think I removed the "lambdas aren't common" part of my message after you started responding 😄.

What are your thoughts on a utility function that handles validation? Something like this:

```python
ModelT = typing.TypeVar("ModelT", bound=pydantic.BaseModel)

def validate_output(
    output: typing.Mapping, model: type[ModelT]
) -> ModelT:
    return model.model_validate(output)
```

Then you could do this:

```python
class User(pydantic.BaseModel):
    id: int
    name: str

def _get_user(user_id: int) -> User:
    return User(id=user_id, name="Alice")

@client.create_function(
    fn_id="my-fn",
    trigger=inngest.TriggerEvent(event="my-event"),
)
async def fn(
    ctx: inngest.Context,
    step: inngest.Step,
) -> str:
    user = validate_output(
        await step.run("get-user", lambda: _get_user(1).model_dump()),
        User,
    )
    return user.name
```

It's more verbose but it's also more type safe. It also has the advantage of being less rigid, allowing you to add custom transformations between the "get memoized output" and "validate output into model" calls, like this:

```python
user = validate_output(
    convert_user_v1_to_v2(
        await step.run("get-user", lambda: _get_user(1).model_dump())
    ),
    User,
)
```
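Stripped of the Inngest scaffolding, the round trip that `validate_output` enables runs standalone (a sketch assuming Pydantic v2; `fake_memo` is a stand-in for the memoized `step.run` output, not a real SDK value):

```python
import typing

import pydantic

ModelT = typing.TypeVar("ModelT", bound=pydantic.BaseModel)


def validate_output(output: typing.Mapping, model: type[ModelT]) -> ModelT:
    return model.model_validate(output)


class User(pydantic.BaseModel):
    id: int
    name: str


# What `step.run` would memoize: the primitives from `.model_dump()`.
fake_memo = User(id=1, name="Alice").model_dump()

user = validate_output(fake_memo, User)  # back to a typed User instance
```

Because `model` is typed as `type[ModelT]`, a type checker infers `user: User` at the call site, which is the type-safety gain described above.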
This could work, but I think this may be over-engineering and "responsibility intertwining", as the original topic is the "unexpected return behaviour of

What I'm trying to address is the issue of "I explicitly stated in my code that my function returns

I think that cases like

should not be covered, because ideally the user should be agnostic of the package's inner structure to be able to work with it. In the context of a pipeline (or task flow), if we want to act on a task result (

But providing intuitive execution logic is the package's responsibility. And it almost is, except for this typing thing :) For the current design, I think it will be enough to pass an additional
The ability to return Pydantic models is a bug, since we don't officially support that right now. There should be a type error for it, but we mistakenly allowed it at runtime. We might release a fix that raises an error when a Pydantic model is returned, but we'll have to decide whether the fix is worth breaking code.
That's a possibility (though it'll require 2 additional function overloads here). Adding

```python
user = step.run("my-step", lambda: User(id=123), output_model=User)
```

Instead of this:

```python
user = User.model_validate(step.run("my-step", lambda: User(id=123).model_dump()))
```

While the latter is a little more verbose, it's also more flexible. One of our north stars is "build primitives and let usage dictate abstractions". In other words, we like to enable advanced usage with flexible (but sometimes verbose) primitives and gather a lot of user feedback before making the advanced usage first-class. A good example of this approach is our recent

We'd love to observe how our users combine Inngest with Pydantic. Once we're able to gather a variety of nuanced usage patterns, we'll be able to effectively make Pydantic a first-class paradigm in our SDK.
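For illustration only, the proposed `output_model` keyword can be prototyped outside the SDK; `run_step` below is a toy stand-in for `step.run` (memoizing via `.model_dump()`), not Inngest's actual API:

```python
import typing

import pydantic

ModelT = typing.TypeVar("ModelT", bound=pydantic.BaseModel)


class User(pydantic.BaseModel):
    id: int


def run_step(step_id: str, handler: typing.Callable[[], pydantic.BaseModel]) -> dict:
    # Toy stand-in for `step.run`: memoization stores JSON-safe primitives.
    return handler().model_dump()


def run_step_with_model(
    step_id: str,
    handler: typing.Callable[[], ModelT],
    output_model: type[ModelT],
) -> ModelT:
    # The proposed keyword, layered on the primitive: validate on the way out.
    return output_model.model_validate(run_step(step_id, handler))


user = run_step_with_model("my-step", lambda: User(id=123), User)
```

Building it as a user-side wrapper like this is one way to "let usage dictate abstractions" before the keyword becomes first-class.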
We added a Pydantic guide to our docs:
Hello! Currently, functions that return Pydantic models (and, I guess, any JSON-serializable object) get silently dumped/serialized into primitives. I wonder if it is possible to implement the logic with correct deserialization, so that you still use JSON for step memoization, but `step.run` returns the original type. Having to manually serialize an object before returning it (otherwise there is confusion when a step call with return type `-> SomePydanticModel` silently returns a `dict`) and then deserialize results in between the steps adds redundant code. To do that, we could try to access function annotations via the `@task` decorator wrapping proposed in #63.