
Toposort fails when using transcoded datasets #3799

Closed
Tracked by #3768
ElenaKhaustova opened this issue Apr 10, 2024 · 14 comments

Comments

@ElenaKhaustova
Contributor

ElenaKhaustova commented Apr 10, 2024

Description

The new toposort does not seem to handle transcoded datasets well.

Context

Appeared in kedro-org/kedro-starters#215
Possible change that caused it: #3728

Steps to Reproduce

When you have a node like

node(
    func=load_shuttles_to_csv,
    inputs="test_data@excel",
    outputs="test_data@csv",
    name="load_shuttles_to_csv_node",
),

node_dependencies are resolved as:

{Node(load_shuttles_to_csv, 'test_data@excel', 'test_data@csv', 'load_shuttles_to_csv_node'): {Node(load_shuttles_to_csv, 'test_data@excel', 'test_data@csv', 'load_shuttles_to_csv_node')}} 

If we don't call _strip_transcoding, they are resolved as: {Node(load_shuttles_to_csv, 'test_data@excel', 'test_data@csv', 'load_shuttles_to_csv_node'): set()}

The problem disappears if the datasets are given different names before the @: instead of test_data@csv and test_data@excel, use test_csv_data@csv and test_excel_data@excel.
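A minimal sketch of why renaming helps, assuming that stripping the transcoding keeps only the part of the name before the first "@". The strip_transcoding and shares_base_name helpers below are hypothetical stand-ins, not Kedro's actual _strip_transcoding.

```python
def strip_transcoding(name: str) -> str:
    # Hypothetical stand-in for Kedro's _strip_transcoding:
    # keep only the part of the dataset name before the first "@".
    return name.split("@", 1)[0]


def shares_base_name(inp: str, out: str) -> bool:
    # True when a node's input and output collapse to the same key,
    # which is what makes the node a dependency of itself.
    return strip_transcoding(inp) == strip_transcoding(out)


print(shares_base_name("test_data@excel", "test_data@csv"))            # True
print(shares_base_name("test_excel_data@excel", "test_csv_data@csv"))  # False
```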

Expected Result

The error does not appear when using transcoded datasets.

@noklam
Contributor

noklam commented Apr 10, 2024

Run failed: 2095e24

Run Success: 0fc8089

I can confirm the regression was introduced there.

@noklam
Contributor

noklam commented Apr 10, 2024

Besides this error, I also get an error when I try to print the pipeline:

<repr-error "unsupported operand type(s) for +: 'Pipeline' and 'Pipeline'">


@noklam
Contributor

noklam commented Apr 11, 2024

The error above is flaky; I don't know why it shows up only sometimes. Let's ignore it for now.


I have a new theory that this bug is actually not new: pipeline.node_dependencies looks the same even when the pipeline runs successfully (the pipeline above runs successfully at 0fc8089).

I suspect we did not strip the transcoding consistently, which causes some odd behaviour here.

toposort also accepts those dependencies without complaining.

@noklam
Copy link
Contributor

noklam commented Apr 11, 2024

At least with the old toposort, a self-dependency is not a problem. Technically it shouldn't affect the resolved order either, but it is indeed a bit weird. So I think node_dependencies isn't the problem here.


Here is the code I used to test (it doesn't need a project, so it's easy to set breakpoints etc.):

from kedro.pipeline import node, pipeline
from toposort import toposort


def foo(data):
    return data


def bar(data):
    return data


n1 = node(
    func=foo,
    inputs="shuttles@excel",
    outputs="shuttles@csv",
    name="load_shuttles",
)
n2 = node(
    func=bar,
    inputs="shuttles@spark",
    outputs="preprocessed_shuttles",
    name="preprocess_shuttles",
)

p = pipeline([n1, n2])
# p2 = pipeline([n1])

# toposort() returns a generator, so consume it to actually run the sort
print(list(toposort(p.node_dependencies)))
print("Done")

@noklam
Contributor

noklam commented Apr 11, 2024

On the other hand, self-dependencies are not allowed by TopologicalSorter, which is why we only get this error now.

@ElenaKhaustova
Contributor Author

ElenaKhaustova commented Apr 11, 2024

A short test to confirm @noklam's finding that the problem is not in the topological sort itself. The only difference is that toposort accepts self-dependencies.

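from graphlib import TopologicalSorter
from toposort import toposort


def main():
    # "C" depends on itself
    graph = {"D": {"B", "C"}, "C": {"C"}, "B": {"A"}}

    # toposort silently ignores the self-dependency; pass a copy of each
    # set so the graph given to TopologicalSorter below is left untouched
    old_ts = toposort({k: set(v) for k, v in graph.items()})
    print(list(old_ts))

    # TopologicalSorter treats the self-dependency as a cycle and
    # raises graphlib.CycleError here
    ts = TopologicalSorter(graph)
    print(tuple(ts.static_order()))


if __name__ == "__main__":
    main()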

The actual problem lies in the way we get node_dependencies and _nodes_by_input.

When computing node_dependencies, we iterate through the nodes and, for each node, look through its outputs. If an output of node A is an input to some other node B, we add A as a dependency of B, using the _nodes_by_input dictionary.

dependencies: dict[Node, set[Node]] = {node: set() for node in self._nodes}
for parent in self._nodes:
    for output in parent.outputs:
        for child in self._nodes_by_input[_strip_transcoding(output)]:
            dependencies[child].add(parent)

return dependencies

We always call _strip_transcoding on dataset names: when calculating dependencies, when building _nodes_by_input, and when indexing nodes by name.

self._nodes_by_input: dict[str, set[Node]] = defaultdict(set)
for node in tagged_nodes:
    for input_ in node.inputs:
        self._nodes_by_input[_strip_transcoding(input_)].add(node)

So in the following example, we treat the output of preprocess_shuttles_node as its own input, because after stripping the transcoding from "shuttles@exel" or "shuttles@csv" we get the same string, "shuttles".

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies@csv",
                outputs="preprocessed_companies@csv",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles@exel",
                outputs="shuttles@csv",
                name="preprocess_shuttles_node",
            ),
        ]
    )
#################### self._nodes_by_input ####################
{
'companies': {Node(preprocess_companies, 'companies@csv', 'preprocessed_companies@csv', 'preprocess_companies_node')},
'shuttles': {Node(preprocess_shuttles, 'shuttles@exel', 'shuttles@csv', 'preprocess_shuttles_node')}
}

#################### self.node_dependencies ####################
{
Node(preprocess_companies, 'companies@csv', 'preprocessed_companies@csv', 'preprocess_companies_node'): set(),
Node(preprocess_shuttles, 'shuttles@exel', 'shuttles@csv', 'preprocess_shuttles_node'): {Node(preprocess_shuttles, 'shuttles@exel', 'shuttles@csv', 'preprocess_shuttles_node')}
}
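The same self-dependency can be reproduced without Kedro. The sketch below mirrors the two loops quoted above using a hypothetical SimpleNode tuple instead of kedro.pipeline.node.Node, and assumes stripping keeps the text before the first "@". It keys the result by node name rather than by node object, only to keep the printed output readable.

```python
from collections import defaultdict
from typing import NamedTuple


class SimpleNode(NamedTuple):
    # Hypothetical minimal stand-in for kedro.pipeline.node.Node
    name: str
    inputs: tuple[str, ...]
    outputs: tuple[str, ...]


def strip_transcoding(name: str) -> str:
    # Assumption: stripping keeps the text before the first "@"
    return name.split("@", 1)[0]


def node_dependencies(nodes: list[SimpleNode]) -> dict[str, set[str]]:
    # Index nodes by their stripped input names, like _nodes_by_input
    nodes_by_input: dict[str, set[SimpleNode]] = defaultdict(set)
    for node in nodes:
        for input_ in node.inputs:
            nodes_by_input[strip_transcoding(input_)].add(node)

    # A parent is a dependency of every node that consumes one of its outputs
    deps: dict[str, set[str]] = {node.name: set() for node in nodes}
    for parent in nodes:
        for output in parent.outputs:
            for child in nodes_by_input[strip_transcoding(output)]:
                deps[child.name].add(parent.name)
    return deps


nodes = [
    SimpleNode("preprocess_companies_node", ("companies@csv",), ("preprocessed_companies@csv",)),
    SimpleNode("preprocess_shuttles_node", ("shuttles@exel",), ("shuttles@csv",)),
]
print(node_dependencies(nodes))
# {'preprocess_companies_node': set(), 'preprocess_shuttles_node': {'preprocess_shuttles_node'}}
```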

@ElenaKhaustova
Contributor Author

Just rolling back to the old toposort, which ignores self-dependencies, doesn't seem like the right solution: we could still end up with the second node having a false dependency on the first one, even though it only uses a free input:

node(
    func=preprocess_shuttles,
    inputs="shuttles@exel",
    outputs="shuttles@csv",
    name="preprocess_shuttles_node_1",
),
node(
    func=preprocess_shuttles,
    inputs="shuttles@exel",
    outputs="shuttles_test",
    name="preprocess_shuttles_node_2",
),
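A small sketch of that false dependency, assuming stripping keeps the text before the first "@" and emulating the old toposort by discarding self-dependencies before sorting with graphlib. The node tuples are illustrative; the second node gets its own name because Kedro requires unique node names.

```python
from graphlib import TopologicalSorter


def strip_transcoding(name: str) -> str:
    # Assumption: stripping keeps the text before the first "@"
    return name.split("@", 1)[0]


# (name, inputs, outputs) for the two nodes above
nodes = [
    ("preprocess_shuttles_node_1", ["shuttles@exel"], ["shuttles@csv"]),
    ("preprocess_shuttles_node_2", ["shuttles@exel"], ["shuttles_test"]),
]

# Parent -> child edge whenever a stripped output matches a stripped input
deps = {name: set() for name, _, _ in nodes}
for parent, _, outputs in nodes:
    for child, inputs, _ in nodes:
        stripped_inputs = {strip_transcoding(i) for i in inputs}
        if any(strip_transcoding(o) in stripped_inputs for o in outputs):
            deps[child].add(parent)

# Emulate the old toposort by discarding self-dependencies
without_self = {name: d - {name} for name, d in deps.items()}
print(without_self)
# {'preprocess_shuttles_node_1': set(), 'preprocess_shuttles_node_2': {'preprocess_shuttles_node_1'}}
print(list(TopologicalSorter(without_self).static_order()))
# ['preprocess_shuttles_node_1', 'preprocess_shuttles_node_2']
```

Even with the self-loop removed, node_2 is forced to wait for node_1, although it only reads the free input shuttles@exel.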

@noklam, @astrojuanlu, @merelcht, @idanov, one solution would be to differentiate inputs/outputs like shuttles@exel and shuttles@csv by simply not stripping the transcoding. But I wonder what the logic behind the stripping was and what use cases it is meant to serve?

@noklam
Contributor

noklam commented Apr 11, 2024

"Just rolling back to the old toposort that ignores self-dependency doesn't seem the right solution cause if..."

I agree, though I think it generates a false dependency but still resolves to the correct execution order (it would be much worse if the order were wrong).

@ElenaKhaustova Are you already familiar with transcoded datasets? https://docs.kedro.org/en/0.17.5/05_data/01_data_catalog.html#transcoding-datasets may help. Without stripping the transcoding, the resulting graph would be disconnected; the stripping allows the nodes to be connected properly. It makes more sense if you look at the @spark/@pandas example. In other words, if the nodes are disconnected, the execution order can be wrong because Kedro doesn't know there is a dependency.

Maybe the bug is caused by stripping in the wrong place, or by not stripping consistently?
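A minimal sketch of why stripping is needed for a Spark-to-pandas hand-off. The dataset name "data" and the depends_on helper are hypothetical, and stripping is assumed to keep the text before the first "@".

```python
def strip_transcoding(name: str) -> str:
    # Assumption: stripping keeps the text before the first "@"
    return name.split("@", 1)[0]


def depends_on(child_inputs: list[str], parent_outputs: list[str], strip: bool) -> bool:
    # Does the child consume anything the parent produces?
    norm = strip_transcoding if strip else (lambda name: name)
    return bool({norm(i) for i in child_inputs} & {norm(o) for o in parent_outputs})


# Hypothetical hand-off: a Spark node writes "data@spark",
# a pandas node reads the same underlying dataset as "data@pandas"
spark_outputs = ["data@spark"]
pandas_inputs = ["data@pandas"]

print(depends_on(pandas_inputs, spark_outputs, strip=False))  # False: graph is disconnected
print(depends_on(pandas_inputs, spark_outputs, strip=True))   # True: the edge is recovered
```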

@ElenaKhaustova
Contributor Author

Another problem with the current implementation of transcoded datasets is that we can get circular dependencies where none actually exist.

def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_shuttles,
                inputs="shuttles@exel",
                outputs="shuttles@csv",
                name="preprocess_shuttles_node_1",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles@csv",
                outputs="shuttles@whatether",
                name="preprocess_shuttles_node_2",
            ),
        ]
    )

In this case, _nodes_by_input contains two nodes under the shuttles key, so each node gets two dependencies (itself and the other node).

If we roll back to 0fc8089, we get the following error, which is raised before the topological sort is applied:

    raise OutputNotUniqueError(
kedro.pipeline.pipeline.OutputNotUniqueError: Output(s) ['shuttles'] are returned by more than one nodes. Node outputs must be unique.
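Once transcoding is stripped, both nodes produce "shuttles", which is what this error reports. The sketch below is a hypothetical check in the spirit of OutputNotUniqueError, not Kedro's implementation; it assumes stripping keeps the text before the first "@".

```python
from collections import Counter


def strip_transcoding(name: str) -> str:
    # Assumption: stripping keeps the text before the first "@"
    return name.split("@", 1)[0]


def find_duplicate_outputs(node_outputs: list[list[str]]) -> list[str]:
    # Hypothetical check: count each stripped output name once per node
    # that produces it, and report names produced by more than one node
    counts = Counter(
        name for outputs in node_outputs for name in {strip_transcoding(o) for o in outputs}
    )
    return sorted(name for name, count in counts.items() if count > 1)


# Outputs of preprocess_shuttles_node_1 and preprocess_shuttles_node_2
print(find_duplicate_outputs([["shuttles@csv"], ["shuttles@whatether"]]))  # ['shuttles']
```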

@ElenaKhaustova
Contributor Author

ElenaKhaustova commented Apr 11, 2024

Summary:

  1. Rolling back to toposort doesn't seem right. It solves the original case, but other problems can still appear.
  2. We have to think about transcoded datasets and how to make them work in all cases, or at least update the docs to clarify the constraints.
  3. For now, we can update the starter to avoid the original error, for example by removing transcoding within the same node.

@lrcouto The question is whether we want to make a release before solving item 2.

@lrcouto
Contributor

lrcouto commented Apr 11, 2024

I personally don't think we should release before solving this issue. We could do what @merelcht suggested yesterday and change the node names on the starters just to make it pass the CI, but that would be a band-aid fix.

@astrojuanlu
Member

astrojuanlu commented Apr 11, 2024

We agreed to timebox this to avoid the risk of rushing an improper solution. Targeting a release ~early next week.

@idanov
Member

idanov commented Apr 14, 2024

The issue is indeed that we have uncovered something that shouldn't be allowed in the first place, namely using transcoding as a cheat to add circular dependencies in nodes:

node(
    func=load_shuttles_to_csv,
    inputs="shuttles@excel",
    outputs="shuttles@csv",
    name="load_shuttles_to_csv_node",
),

Transcoding should always be stripped when resolving ordering issues and validating pipelines and nodes for cycles. So the correct action here would be to fix the template. This should raise an error upon node or pipeline creation; which of the two depends on how we want to define the behaviour, and either would be valid. Transcoding was only made to enable hand-offs between pandas and pyspark nodes, or similar use cases. The starter here has gotten it the wrong way around: it's not the format at rest, but the format in-flight that we need to specify here.

As far as I can see, there's no bug in Kedro now; there was one before, due to a peculiarity of toposort allowing self-references. Currently, the starters are the broken ones, and they have been for a long time, because the Kedro bug masked the bugs in the starters.

To make the error reporting more user-friendly, we might decide to add an extra check during node creation, but that doesn't have to happen before the release.
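One possible shape for such a check, as a minimal sketch: reject a node whose input and output refer to the same dataset once transcoding is stripped. The function name, its signature, and the choice of ValueError are all hypothetical, and stripping is assumed to keep the text before the first "@".

```python
def strip_transcoding(name: str) -> str:
    # Assumption: stripping keeps the text before the first "@"
    return name.split("@", 1)[0]


def check_no_transcoded_self_loop(node_name: str, inputs: list[str], outputs: list[str]) -> None:
    # Hypothetical validation: a node must not read and write the same
    # dataset under different transcodings (a hidden self-cycle)
    clashes = {strip_transcoding(i) for i in inputs} & {strip_transcoding(o) for o in outputs}
    if clashes:
        raise ValueError(
            f"Node '{node_name}' reads and writes the same dataset(s) "
            f"{sorted(clashes)} under different transcodings; this creates a cycle."
        )


# A genuine hand-off node passes
check_no_transcoded_self_loop("spark_to_features", ["data@spark"], ["features"])

# The starter's node is rejected
try:
    check_no_transcoded_self_loop("load_shuttles_to_csv_node", ["shuttles@excel"], ["shuttles@csv"])
except ValueError as err:
    print(err)
```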

Well done to @noklam for doing all the research and tracing it back to toposort's bug.

@ElenaKhaustova
Contributor Author

ElenaKhaustova commented Apr 15, 2024

Applied changes:

Further updates required:


5 participants