Skip to content

Commit

Permalink
return exceptions when gathering (flyteorg#2657)
Browse files Browse the repository at this point in the history
* return exceptions when gathering

Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>

* pr comment

Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>

---------

Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: mao3267 <chenvincent610@gmail.com>
  • Loading branch information
wild-endeavor authored and mao3267 committed Aug 9, 2024
1 parent 9e0dbf5 commit 140b58f
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ def raw_register(
raise RegistrationSkipped(f"Remote task/Workflow {cp_entity.name} is not registrable.")
else:
logger.debug(f"Skipping registration of remote entity: {cp_entity.name}")
raise RegistrationSkipped(f"Remote task/Workflow {cp_entity.name} is not registrable.")
raise RegistrationSkipped(f"Remote entity {cp_entity.name} is not registrable.")

if isinstance(
cp_entity,
Expand Down Expand Up @@ -768,13 +768,23 @@ async def _serialize_and_register(
functools.partial(self.raw_register, cp_entity, serialization_settings, version, og_entity=entity),
)
)
ident = []
ident.extend(await asyncio.gather(*tasks))

identifiers_or_exceptions = []
identifiers_or_exceptions.extend(await asyncio.gather(*tasks, return_exceptions=True))
# Check to make sure any exceptions are just registration skipped exceptions
for ie in identifiers_or_exceptions:
if isinstance(ie, RegistrationSkipped):
logger.info(f"Skipping registration... {ie}")
continue
if isinstance(ie, Exception):
raise ie
# serial register
cp_other_entities = OrderedDict(filter(lambda x: not isinstance(x[1], task_models.TaskSpec), m.items()))
for entity, cp_entity in cp_other_entities.items():
ident.append(self.raw_register(cp_entity, serialization_settings, version, og_entity=entity))
return ident[-1]
identifiers_or_exceptions.append(
self.raw_register(cp_entity, serialization_settings, version, og_entity=entity)
)
return identifiers_or_exceptions[-1]

def register_task(
self,
Expand Down

0 comments on commit 140b58f

Please sign in to comment.