Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion flow360/component/results/base_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,6 @@ def full_name_pattern(word: str) -> re.Pattern:
return rf"^(?:{re.escape(word)}|[^/]+/{re.escape(word)})$"

self.reload_data() # Remove all the imposed filters
print(">> _x_columns =", self._x_columns)
raw_values = {}
for x_column in self._x_columns:
raw_values[x_column] = np.array(self.raw_values[x_column])
Expand Down
171 changes: 78 additions & 93 deletions flow360/component/simulation/framework/entity_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import copy
import hashlib
import uuid
from abc import ABCMeta
Expand Down Expand Up @@ -283,23 +282,70 @@ def _get_valid_entity_types(cls):
@classmethod
def _valid_individual_input(cls, input_data):
"""Validate each individual element in a list or as standalone entity."""
if isinstance(input_data, (str, EntityBase)):
if isinstance(input_data, EntityBase):
return input_data

raise ValueError(
f"Type({type(input_data)}) of input to `entities` ({input_data}) is not valid. "
"Expected entity instance."
)

@classmethod
def _process_selector(cls, selector: EntitySelector, valid_type_names: List[str]) -> dict:
"""Process and validate an EntitySelector object."""
if selector.target_class not in valid_type_names:
raise ValueError(
f"Selector target_class ({selector.target_class}) is incompatible "
f"with EntityList types {valid_type_names}."
)
return selector.model_dump()

@classmethod
def _process_entity(cls, entity: EntityBase, valid_types: tuple) -> Optional[EntityBase]:
"""Process and validate an entity object. Returns None if entity type is invalid."""
cls._valid_individual_input(entity)
if is_exact_instance(entity, valid_types):
return entity
return None

@classmethod
def _build_result(
cls, entities_to_store: List[EntityBase], entity_patterns_to_store: List[dict]
) -> dict:
"""Build the final result dictionary."""
return {
"stored_entities": entities_to_store,
"selectors": entity_patterns_to_store if entity_patterns_to_store else None,
}

@classmethod
# pylint: disable=too-many-arguments
def _process_single_item(
cls,
item: Union[EntityBase, EntitySelector],
valid_types: tuple,
valid_type_names: List[str],
entities_to_store: List[EntityBase],
entity_patterns_to_store: List[dict],
) -> None:
"""Process a single item (entity or selector) and add to appropriate storage lists."""
if isinstance(item, EntitySelector):
entity_patterns_to_store.append(cls._process_selector(item, valid_type_names))
else:
processed_entity = cls._process_entity(item, valid_types)
if processed_entity is not None:
entities_to_store.append(processed_entity)

@pd.model_validator(mode="before")
@classmethod
def deserializer(cls, input_data: Union[dict, list]):
def deserializer(cls, input_data: Union[dict, list, EntityBase, EntitySelector]):
"""
Flatten List[EntityBase] and put into stored_entities.
"""
entities_to_store = []
entity_patterns_to_store = []
valid_types = cls._get_valid_entity_types()
valid_types = tuple(cls._get_valid_entity_types())
valid_type_names = [t.__name__ for t in valid_types]

if isinstance(input_data, list):
# -- User input mode. --
Expand All @@ -308,107 +354,46 @@ def deserializer(cls, input_data: Union[dict, list]):
raise ValueError("Invalid input type to `entities`, list is empty.")
for item in input_data:
if isinstance(item, list): # Nested list comes from assets __getitem__
_ = [cls._valid_individual_input(individual) for individual in item]
processed_entities = [
entity
for entity in (
cls._process_entity(individual, valid_types) for individual in item
)
if entity is not None
]
# pylint: disable=fixme
# TODO: Give notice when some of the entities are not selected due to `valid_types`?
entities_to_store.extend(
[
individual
for individual in item
if is_exact_instance(individual, tuple(valid_types))
]
)
entities_to_store.extend(processed_entities)
else:
cls._valid_individual_input(item)
if is_exact_instance(item, tuple(valid_types)):
entities_to_store.append(item)
# Single entity or selector
cls._process_single_item(
item,
valid_types,
valid_type_names,
entities_to_store,
entity_patterns_to_store,
)
elif isinstance(input_data, dict): # Deserialization
if "stored_entities" not in input_data:
raise KeyError(
f"Invalid input type to `entities`, dict {input_data} is missing the key 'stored_entities'."
)
return {
"stored_entities": input_data["stored_entities"],
"selectors": None if not entity_patterns_to_store else entity_patterns_to_store,
}
# pylint: disable=no-else-return
else: # Single entity
return cls._build_result(input_data["stored_entities"], input_data.get("selectors", []))
else: # Single entity or selector
if input_data is None:
return {
"stored_entities": None,
"selectors": None if not entity_patterns_to_store else entity_patterns_to_store,
}
else:
cls._valid_individual_input(input_data)
if is_exact_instance(input_data, tuple(valid_types)):
entities_to_store.append(input_data)
return cls._build_result(None, [])
cls._process_single_item(
input_data,
valid_types,
valid_type_names,
entities_to_store,
entity_patterns_to_store,
)

if not entities_to_store:
if not entities_to_store and not entity_patterns_to_store:
raise ValueError(
f"Can not find any valid entity of type {[valid_type.__name__ for valid_type in valid_types]}"
f" from the input."
)

return {
"stored_entities": entities_to_store,
"selectors": None if not entity_patterns_to_store else entity_patterns_to_store,
}

def _get_expanded_entities(
self,
*,
create_hard_copy: bool,
) -> List[EntityBase]:
"""
Processes `stored_entities` to remove duplicate entities and raise error if conflicting entities are found.

Possible future upgrade includes expanding `TokenEntity` (naming pattern, enabling compact data storage
like MatrixType and also templating SimulationParams which is planned when importing JSON as setting template)

Raises:
TypeError: If an entity does not match the expected type.
Returns:
Expanded entities list.
"""

entities = getattr(self, "stored_entities", [])

expanded_entities = []
# Note: Points need to skip deduplication bc:
# 1. Performance of deduplication is slow when Point count is high.
not_merged_entity_types_name = [
"Point"
] # Entity types that need skipping deduplication (hacky)
not_merged_entities = []

# pylint: disable=not-an-iterable
for entity in entities:
if entity.private_attribute_entity_type_name in not_merged_entity_types_name:
not_merged_entities.append(entity)
continue
# if entity not in expanded_entities:
expanded_entities.append(entity)

expanded_entities = _remove_duplicate_entities(expanded_entities)
expanded_entities += not_merged_entities

if not expanded_entities:
raise ValueError(
f"Failed to find any matching entity with {entities}. Please check the input to entities."
)
# pylint: disable=fixme
# TODO: As suggested by Runda. We better prompt user what entities are actually used/expanded to
# TODO: avoid user input error. We need a switch to turn it on or off.
if create_hard_copy is True:
return copy.deepcopy(expanded_entities)
return expanded_entities

# pylint: disable=arguments-differ
def preprocess(self, **kwargs):
"""
Expand and overwrite self.stored_entities in preparation for submission/serialization.
Should only be called as late as possible to incorporate all possible changes.
"""
# WARNING: this is very expensive all for long lists as it is quadratic
self.stored_entities = self._get_expanded_entities(create_hard_copy=False)
return super().preprocess(**kwargs)
return cls._build_result(entities_to_store, entity_patterns_to_store)
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Scoped context for entity materialization and reuse.

This module provides a context-managed cache and an injectable builder
for converting entity dictionaries to model instances, avoiding global
state and enabling high-performance reuse during validation.
"""

from __future__ import annotations

import contextvars
from typing import Any, Callable, Optional

_entity_cache_ctx: contextvars.ContextVar[Optional[dict]] = contextvars.ContextVar(
"entity_cache", default=None
)
_entity_builder_ctx: contextvars.ContextVar[Optional[Callable[[dict], Any]]] = (
contextvars.ContextVar("entity_builder", default=None)
)


class EntityMaterializationContext:
"""Context manager providing a per-validation scoped cache and builder.

Use this to avoid global state when materializing entity dictionaries
into model instances while reusing objects across the validation pass.
"""

def __init__(self, *, builder: Callable[[dict], Any]):
self._token_cache = None
self._token_builder = None
self._builder = builder

def __enter__(self):
self._token_cache = _entity_cache_ctx.set({})
self._token_builder = _entity_builder_ctx.set(self._builder)
return self

def __exit__(self, exc_type, exc, tb):
_entity_cache_ctx.reset(self._token_cache)
_entity_builder_ctx.reset(self._token_builder)


def get_entity_cache() -> Optional[dict]:
"""Return the current cache dict for entity reuse, or None if not active."""

return _entity_cache_ctx.get()


def get_entity_builder() -> Optional[Callable[[dict], Any]]:
"""Return the current dict->entity builder, or None if not active."""

return _entity_builder_ctx.get()
Loading