Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,19 @@ def _convert_extra_fields(form_widgets: dict[str, ConnectionFormWidgetInfo]) ->
result: dict[str, MutableMapping] = {}
for key, form_widget in form_widgets.items():
hook_key = key.split("__")[1]
if isinstance(form_widget.field, HookMetaService.MockBaseField):
hook_widgets = result.get(hook_key, {})
hook_widgets = result.get(hook_key, {})

if isinstance(form_widget.field, dict):
# yaml path, form widgets read from yaml and already present in SerializedParam.dump() format
hook_widgets[form_widget.field_name] = form_widget.field
elif isinstance(form_widget.field, HookMetaService.MockBaseField):
# legacy path, form widgets created using mocked WTForms fields, need to convert to SerializedParam.dump()
hook_widgets[form_widget.field_name] = form_widget.field.param.dump()
result[hook_key] = hook_widgets
else:
log.error("Unknown form widget in %s: %s", hook_key, form_widget)
continue

result[hook_key] = hook_widgets
return result

@staticmethod
Expand Down
4 changes: 3 additions & 1 deletion airflow-core/src/airflow/cli/commands/provider_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ def connection_form_widget_list(args):
"connection_parameter_name": x[0],
"class": x[1].hook_class_name,
"package_name": x[1].package_name,
"field_type": x[1].field.field_class.__name__,
"field_type": x[1].field.get("schema", {}).get("type", "unknown")
if isinstance(x[1].field, dict)
else x[1].field.field_class.__name__,
},
)

Expand Down
100 changes: 100 additions & 0 deletions airflow-core/src/airflow/provider.yaml.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,106 @@
"hook-class-name": {
"description": "Hook class name that implements the connection type",
"type": "string"
},
"ui-field-behaviour": {
"description": "Customizations for standard connection form fields",
"type": "object",
"properties": {
"hidden-fields": {
"description": "List of standard fields to hide in the UI",
"type": "array",
"items": {
"type": "string",
"enum": ["description", "host", "port", "login", "password", "schema", "extra"]
},
"default": []
},
"relabeling": {
"description": "Map of field names to custom labels",
"type": "object",
"additionalProperties": {
"type": "string"
},
"default": {}
},
"placeholders": {
"description": "Map of field names to placeholder text",
"type": "object",
"additionalProperties": {
"type": "string"
},
"default": {}
}
},
"additionalProperties": false
},
"conn-fields": {
"description": "Custom connection fields stored in Connection.extra JSON",
"type": "object",
"additionalProperties": {
"type": "object",
"properties": {
"label": {
"type": "string",
"description": "Display label for the field"
},
"description": {
"type": "string",
"description": "Help text for the field"
},
"schema": {
"description": "JSON Schema definition for this field",
"type": "object",
"properties": {
"type": {
"description": "Field data type",
"oneOf": [
{"type": "string", "enum": ["string", "integer", "boolean", "number", "object", "array"]},
{"type": "array", "items": {"type": "string"}}
]
},
"default": {
"description": "Default value for the field"
},
"format": {
"type": "string",
"enum": ["password", "multiline", "email", "url", "json", "date", "date-time", "time"],
"description": "Format hint for rendering"
},
"enum": {
"type": "array",
"description": "List of allowed values (creates dropdown)"
},
"minimum": {
"type": "number",
"description": "Minimum value for numbers"
},
"maximum": {
"type": "number",
"description": "Maximum value for numbers"
},
"pattern": {
"type": "string",
"description": "Regex pattern for validation"
},
"minLength": {
"type": "integer",
"minimum": 0,
"description": "Minimum string length"
},
"maxLength": {
"type": "integer",
"minimum": 1,
"description": "Maximum string length"
}
},
"required": ["type"],
"additionalProperties": true
}
},
"required": ["label", "schema"],
"additionalProperties": false
}
}
},
"required": [
Expand Down
188 changes: 146 additions & 42 deletions airflow-core/src/airflow/providers_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ def initialize_providers_hooks(self):
self._init_airflow_core_hooks()
self.initialize_providers_list()
self._discover_hooks()
self._load_ui_metadata()
self._hook_provider_dict = dict(sorted(self._hook_provider_dict.items()))

@provider_info_cache("filesystems")
Expand Down Expand Up @@ -902,6 +903,98 @@ def _get_attr(obj: Any, attr_name: str):
return None
return getattr(obj, attr_name)

def _get_connection_type_config(self, provider_info: ProviderInfo, connection_type: str) -> dict | None:
"""Get connection type config from provider.yaml if it exists."""
connection_types = provider_info.data.get("connection-types", [])
for conn_config in connection_types:
if conn_config.get("connection-type") == connection_type:
return conn_config
return None

def _to_api_format(self, field_name: str, field_def: dict) -> dict:
"""Convert conn-fields definition to format expected by the API."""
schema_def = field_def.get("schema", {})

# build schema dict with label moved to `title` per jsonschema convention
schema = schema_def.copy()
if "label" in field_def:
schema["title"] = field_def.get("label")

return {
"value": schema_def.get("default"),
"schema": schema,
"description": field_def.get("description"),
"source": None,
}

def _add_widgets(
self, package_name: str, hook_class_name: str, connection_type: str, conn_fields: dict
) -> None:
"""Parse conn-fields from provider info and add to connection_form_widgets."""
for field_name, field_def in conn_fields.items():
field_data = self._to_api_format(field_name, field_def)

prefixed_name = f"extra__{connection_type}__{field_name}"
if prefixed_name in self._connection_form_widgets:
log.warning(
"Field %s for connection type %s already added, skipping",
field_name,
connection_type,
)
continue

schema_def = field_def.get("schema", {})
self._connection_form_widgets[prefixed_name] = ConnectionFormWidgetInfo(
hook_class_name=hook_class_name,
package_name=package_name,
field=field_data,
field_name=field_name,
is_sensitive=schema_def.get("format") == "password",
)

def _add_customized_fields(self, package_name: str, connection_type: str, behaviour: dict) -> None:
"""Process ui-field-behaviour from provider info and add to field_behaviours."""
if connection_type in self._field_behaviours:
log.warning(
"Field behaviour for connection type %s already exists, skipping",
connection_type,
)
return

# convert kebab-case keys to python style
customized_fields = {
"hidden_fields": behaviour.get("hidden-fields", []),
"relabeling": behaviour.get("relabeling", {}),
"placeholders": behaviour.get("placeholders", {}),
}

try:
self._customized_form_fields_schema_validator.validate(customized_fields)
customized_fields = _ensure_prefix_for_placeholders(customized_fields, connection_type)
self._field_behaviours[connection_type] = customized_fields
except Exception as e:
log.warning(
"Failed to add field behaviour for %s in package %s: %s",
connection_type,
package_name,
e,
)

def _load_ui_metadata(self) -> None:
"""Load connection form UI metadata from provider info without importing hooks."""
for package_name, provider in self._provider_dict.items():
for conn_config in provider.data.get("connection-types", []):
connection_type = conn_config.get("connection-type")
hook_class_name = conn_config.get("hook-class-name")
if not connection_type or not hook_class_name:
continue

if conn_fields := conn_config.get("conn-fields"):
self._add_widgets(package_name, hook_class_name, connection_type, conn_fields)

if behaviour := conn_config.get("ui-field-behaviour"):
self._add_customized_fields(package_name, connection_type, behaviour)

def _import_hook(
self,
connection_type: str | None,
Expand Down Expand Up @@ -942,48 +1035,59 @@ def _import_hook(
hook_class: type[BaseHook] | None = _correctness_check(package_name, hook_class_name, provider_info)
if hook_class is None:
return None
try:
from wtforms import BooleanField, IntegerField, PasswordField, StringField

allowed_field_classes = [IntegerField, PasswordField, StringField, BooleanField]
module, class_name = hook_class_name.rsplit(".", maxsplit=1)
# Do not use attr here. We want to check only direct class fields not those
# inherited from parent hook. This way we add form fields only once for the whole
# hierarchy and we add it only from the parent hook that provides those!
if "get_connection_form_widgets" in hook_class.__dict__:
widgets = hook_class.get_connection_form_widgets()
if widgets:
for widget in widgets.values():
if widget.field_class not in allowed_field_classes:
log.warning(
"The hook_class '%s' uses field of unsupported class '%s'. "
"Only '%s' field classes are supported",
hook_class_name,
widget.field_class,
allowed_field_classes,
)
return None
self._add_widgets(package_name, hook_class, widgets)
if "get_ui_field_behaviour" in hook_class.__dict__:
field_behaviours = hook_class.get_ui_field_behaviour()
if field_behaviours:
self._add_customized_fields(package_name, hook_class, field_behaviours)
except ImportError as e:
if e.name in ["flask_appbuilder", "wtforms"]:
log.info(
"The hook_class '%s' is not fully initialized (UI widgets will be missing), because "
"the 'flask_appbuilder' package is not installed, however it is not required for "
"Airflow components to work",

# Check if provider info already has UI metadata and skip Python hook methods
# to avoid duplicate initialization and unnecessary wtforms imports
ui_metadata_loaded = False
if provider_info and connection_type:
conn_config = self._get_connection_type_config(provider_info, connection_type)
ui_metadata_loaded = conn_config is not None and bool(
conn_config.get("conn-fields") or conn_config.get("ui-field-behaviour")
)

if not ui_metadata_loaded:
try:
from wtforms import BooleanField, IntegerField, PasswordField, StringField

allowed_field_classes = [IntegerField, PasswordField, StringField, BooleanField]
# Do not use attr here. We want to check only direct class fields not those
# inherited from parent hook. This way we add form fields only once for the whole
# hierarchy and we add it only from the parent hook that provides those!
if "get_connection_form_widgets" in hook_class.__dict__:
widgets = hook_class.get_connection_form_widgets()
if widgets:
for widget in widgets.values():
if widget.field_class not in allowed_field_classes:
log.warning(
"The hook_class '%s' uses field of unsupported class '%s'. "
"Only '%s' field classes are supported",
hook_class_name,
widget.field_class,
allowed_field_classes,
)
return None
self._add_widgets_from_hook(package_name, hook_class, widgets)
if "get_ui_field_behaviour" in hook_class.__dict__:
field_behaviours = hook_class.get_ui_field_behaviour()
if field_behaviours:
self._add_customized_fields_from_hook(package_name, hook_class, field_behaviours)
except ImportError as e:
if e.name in ["flask_appbuilder", "wtforms"]:
log.info(
"The hook_class '%s' is not fully initialized (UI widgets will be missing), because "
"the 'flask_appbuilder' package is not installed, however it is not required for "
"Airflow components to work",
hook_class_name,
)
except Exception as e:
log.warning(
"Exception when importing '%s' from '%s' package: %s",
hook_class_name,
package_name,
e,
)
except Exception as e:
log.warning(
"Exception when importing '%s' from '%s' package: %s",
hook_class_name,
package_name,
e,
)
return None
return None

hook_connection_type = self._get_attr(hook_class, "conn_type")
if connection_type:
if hook_connection_type != connection_type:
Expand Down Expand Up @@ -1019,7 +1123,7 @@ def _import_hook(
connection_testable=hasattr(hook_class, "test_connection"),
)

def _add_widgets(self, package_name: str, hook_class: type, widgets: dict[str, Any]):
def _add_widgets_from_hook(self, package_name: str, hook_class: type, widgets: dict[str, Any]):
conn_type = hook_class.conn_type # type: ignore
for field_identifier, field in widgets.items():
if field_identifier.startswith("extra__"):
Expand All @@ -1043,7 +1147,7 @@ def _add_widgets(self, package_name: str, hook_class: type, widgets: dict[str, A
and field.field_class.widget.input_type == "password",
)

def _add_customized_fields(self, package_name: str, hook_class: type, customized_fields: dict):
def _add_customized_fields_from_hook(self, package_name: str, hook_class: type, customized_fields: dict):
try:
connection_type = getattr(hook_class, "conn_type")

Expand Down
Loading
Loading