Skip to content

Commit

Permalink
server side json variables (#13500)
Browse files Browse the repository at this point in the history
  • Loading branch information
jakekaplan authored May 22, 2024
1 parent aad7f63 commit 7c37407
Show file tree
Hide file tree
Showing 15 changed files with 700 additions and 246 deletions.
48 changes: 44 additions & 4 deletions src/prefect/client/schemas/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@
from uuid import UUID, uuid4

import jsonschema
from pydantic.v1 import Field, root_validator, validator
import orjson
from pydantic.v1 import (
Field,
StrictBool,
StrictFloat,
StrictInt,
StrictStr,
root_validator,
validator,
)

import prefect.client.schemas.objects as objects
from prefect._internal.compatibility.deprecated import DeprecatedInfraOverridesField
Expand Down Expand Up @@ -708,14 +717,29 @@ class VariableCreate(ActionBaseModel):
examples=["my_variable"],
max_length=objects.MAX_VARIABLE_NAME_LENGTH,
)
value: str = Field(
value: Union[
StrictStr, StrictFloat, StrictBool, StrictInt, None, Dict[str, Any], List[Any]
] = Field(
default=...,
description="The value of the variable",
examples=["my-value"],
max_length=objects.MAX_VARIABLE_VALUE_LENGTH,
)
tags: Optional[List[str]] = Field(default=None)

@validator("value")
def validate_value(cls, v):
try:
json_string = orjson.dumps(v)
except orjson.JSONDecodeError:
raise ValueError("Variable value must be serializable to JSON.")

if len(json_string) > objects.MAX_VARIABLE_VALUE_LENGTH:
raise ValueError(
f"value must less than {objects.MAX_VARIABLE_VALUE_LENGTH} characters when serialized."
)

return v

# validators
_validate_name_format = validator("name", allow_reuse=True)(validate_variable_name)

Expand All @@ -729,14 +753,30 @@ class VariableUpdate(ActionBaseModel):
examples=["my_variable"],
max_length=objects.MAX_VARIABLE_NAME_LENGTH,
)
value: Optional[str] = Field(
value: Union[
StrictStr, StrictFloat, StrictBool, StrictInt, None, Dict[str, Any], List[Any]
] = Field(
default=None,
description="The value of the variable",
examples=["my-value"],
max_length=objects.MAX_VARIABLE_NAME_LENGTH,
)
tags: Optional[List[str]] = Field(default=None)

@validator("value")
def validate_value(cls, v):
try:
json_string = orjson.dumps(v)
except orjson.JSONDecodeError:
raise ValueError("Variable value must be serializable to JSON.")

if len(json_string) > objects.MAX_VARIABLE_VALUE_LENGTH:
raise ValueError(
f"value must less than {objects.MAX_VARIABLE_VALUE_LENGTH} characters when serialized."
)

return v

# validators
_validate_name_format = validator("name", allow_reuse=True)(validate_variable_name)

Expand Down
19 changes: 0 additions & 19 deletions src/prefect/client/schemas/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -1088,22 +1088,6 @@ class VariableFilterName(PrefectBaseModel):
)


class VariableFilterValue(PrefectBaseModel):
"""Filter by `Variable.value`."""

any_: Optional[List[str]] = Field(
default=None, description="A list of variables value to include"
)
like_: Optional[str] = Field(
default=None,
description=(
"A string to match variable value against. This can include "
"SQL wildcard characters like `%` and `_`."
),
examples=["my-value-%"],
)


class VariableFilterTags(PrefectBaseModel, OperatorMixin):
"""Filter by `Variable.tags`."""

Expand All @@ -1129,9 +1113,6 @@ class VariableFilter(PrefectBaseModel, OperatorMixin):
name: Optional[VariableFilterName] = Field(
default=None, description="Filter criteria for `Variable.name`"
)
value: Optional[VariableFilterValue] = Field(
default=None, description="Filter criteria for `Variable.value`"
)
tags: Optional[VariableFilterTags] = Field(
default=None, description="Filter criteria for `Variable.tags`"
)
16 changes: 13 additions & 3 deletions src/prefect/client/schemas/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,16 @@

import orjson
import pendulum
from pydantic.v1 import Field, HttpUrl, root_validator, validator
from pydantic.v1 import (
Field,
HttpUrl,
StrictBool,
StrictFloat,
StrictInt,
StrictStr,
root_validator,
validator,
)
from typing_extensions import Literal

from prefect._internal.compatibility.deprecated import (
Expand Down Expand Up @@ -1478,11 +1487,12 @@ class Variable(ObjectBaseModel):
examples=["my_variable"],
max_length=MAX_VARIABLE_NAME_LENGTH,
)
value: str = Field(
value: Union[
StrictStr, StrictFloat, StrictBool, StrictInt, None, Dict[str, Any], List[Any]
] = Field(
default=...,
description="The value of the variable",
examples=["my_value"],
max_length=MAX_VARIABLE_VALUE_LENGTH,
)
tags: List[str] = Field(
default_factory=list,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""json_variables
Revision ID: 94622c1663e8
Revises: b23c83a12cb4
Create Date: 2024-05-21 10:14:57.246286
"""
import json

import sqlalchemy as sa
from alembic import op

from prefect.server.utilities.database import JSON

# revision identifiers, used by Alembic.
revision = "94622c1663e8"
down_revision = "b23c83a12cb4"
branch_labels = None
depends_on = None


def upgrade():
op.add_column("variable", sa.Column("json_value", JSON, nullable=True))

conn = op.get_bind()

result = conn.execute(sa.text("SELECT id, value FROM variable"))
rows = result.fetchall()

for variable_id, value in rows:
# these values need to be json compatible strings
json_value = json.dumps(value)
conn.execute(
sa.text("UPDATE variable SET json_value = :json_value WHERE id = :id"),
{"json_value": json_value, "id": variable_id},
)

op.drop_column("variable", "value")
op.alter_column("variable", "json_value", new_column_name="value")


def downgrade():
op.add_column("variable", sa.Column("string_value", sa.String, nullable=True))

conn = op.get_bind()

result = conn.execute(sa.text("SELECT id, value FROM variable"))
rows = result.fetchall()

for variable_id, value in rows:
string_value = str(value)
conn.execute(
sa.text("UPDATE variable SET string_value = :string_value WHERE id = :id"),
{"string_value": string_value, "id": variable_id},
)

op.drop_column("variable", "value")
op.alter_column("variable", "string_value", new_column_name="value", nullable=False)
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""json_variables
Revision ID: 2ac65f1758c2
Revises: 20fbd53b3cef
Create Date: 2024-05-21 12:29:43.948758
"""
import json

import sqlalchemy as sa
from alembic import op

from prefect.server.utilities.database import JSON

# revision identifiers, used by Alembic.
revision = "2ac65f1758c2"
down_revision = "20fbd53b3cef"
branch_labels = None
depends_on = None


def upgrade():
op.add_column("variable", sa.Column("json_value", JSON, nullable=True))

conn = op.get_bind()

result = conn.execute(sa.text("SELECT id, value FROM variable"))
rows = result.fetchall()

for variable_id, value in rows:
# these values need to be json compatible strings
json_value = json.dumps(value)
conn.execute(
sa.text("UPDATE variable SET json_value = :json_value WHERE id = :id"),
{"json_value": json_value, "id": variable_id},
)

with op.batch_alter_table("variable") as batch_op:
batch_op.drop_column("value")
batch_op.alter_column("json_value", new_column_name="value")


def downgrade():
op.add_column("variable", sa.Column("string_value", sa.String, nullable=True))

conn = op.get_bind()

result = conn.execute(sa.text("SELECT id, value FROM variable"))
rows = result.fetchall()

for variable_id, value in rows:
string_value = json.loads(str(value))
conn.execute(
sa.text("UPDATE variable SET string_value = :string_value WHERE id = :id"),
{"string_value": string_value, "id": variable_id},
)

with op.batch_alter_table("variable") as batch_op:
batch_op.drop_column("value")
batch_op.alter_column("string_value", new_column_name="value", nullable=False)
2 changes: 1 addition & 1 deletion src/prefect/server/database/orm_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1275,7 +1275,7 @@ class FlowRunNotificationQueue(Base):

class Variable(Base):
name = sa.Column(sa.String, nullable=False)
value = sa.Column(sa.String, nullable=False)
value = sa.Column(sa.JSON, nullable=False)
tags = sa.Column(JSON, server_default="[]", default=list, nullable=False)

__table_args__ = (sa.UniqueConstraint("name"),)
Expand Down
50 changes: 45 additions & 5 deletions src/prefect/server/schemas/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,17 @@
from typing import Any, Dict, Generator, List, Optional, Union
from uuid import UUID, uuid4

from pydantic.v1 import Field, HttpUrl, root_validator, validator
import orjson
from pydantic.v1 import (
Field,
HttpUrl,
StrictBool,
StrictFloat,
StrictInt,
StrictStr,
root_validator,
validator,
)

import prefect.server.schemas as schemas
from prefect._internal.compatibility.deprecated import DeprecatedInfraOverridesField
Expand Down Expand Up @@ -1032,11 +1042,12 @@ class VariableCreate(ActionBaseModel):
examples=["my-variable"],
max_length=schemas.core.MAX_VARIABLE_NAME_LENGTH,
)
value: str = Field(
value: Union[
StrictStr, StrictFloat, StrictBool, StrictInt, None, Dict[str, Any], List[Any]
] = Field(
default=...,
description="The value of the variable",
examples=["my-value"],
max_length=schemas.core.MAX_VARIABLE_VALUE_LENGTH,
)
tags: List[str] = Field(
default_factory=list,
Expand All @@ -1047,6 +1058,20 @@ class VariableCreate(ActionBaseModel):
# validators
_validate_name_format = validator("name", allow_reuse=True)(validate_variable_name)

@validator("value")
def validate_value(cls, v):
try:
json_string = orjson.dumps(v)
except orjson.JSONDecodeError:
raise ValueError("Variable value must be serializable to JSON.")

if len(json_string) > schemas.core.MAX_VARIABLE_VALUE_LENGTH:
raise ValueError(
f"value must less than {schemas.core.MAX_VARIABLE_VALUE_LENGTH} characters when serialized."
)

return v


class VariableUpdate(ActionBaseModel):
"""Data used by the Prefect REST API to update a Variable."""
Expand All @@ -1057,17 +1082,32 @@ class VariableUpdate(ActionBaseModel):
examples=["my-variable"],
max_length=schemas.core.MAX_VARIABLE_NAME_LENGTH,
)
value: Optional[str] = Field(
value: Union[
StrictStr, StrictInt, StrictFloat, StrictBool, None, Dict[str, Any], List[Any]
] = Field(
default=None,
description="The value of the variable",
examples=["my-value"],
max_length=schemas.core.MAX_VARIABLE_VALUE_LENGTH,
)
tags: Optional[List[str]] = Field(
default=None,
description="A list of variable tags",
examples=[["tag-1", "tag-2"]],
)

@validator("value")
def validate_value(cls, v):
try:
json_string = orjson.dumps(v)
except orjson.JSONDecodeError:
raise ValueError("Variable value must be serializable to JSON.")

if len(json_string) > schemas.core.MAX_VARIABLE_VALUE_LENGTH:
raise ValueError(
f"value must less than {schemas.core.MAX_VARIABLE_VALUE_LENGTH} characters when serialized."
)

return v

# validators
_validate_name_format = validator("name", allow_reuse=True)(validate_variable_name)
Loading

0 comments on commit 7c37407

Please sign in to comment.