Skip to content

⚡️ Speed up function find_last_node by 23,284% #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

codeflash-ai[bot]
Copy link

@codeflash-ai codeflash-ai bot commented May 15, 2025

📄 23,284% (232.84x) speedup for find_last_node in src/dsa/nodes.py

⏱️ Runtime : 210 milliseconds 899 microseconds (best of 125 runs)

📝 Explanation and details

Here’s a much faster version. The original code checks each node against every edge in a nested loop (O(N*M)), which is expensive.
The optimization is to first build a set of all sources in edges, then simply find the first node whose ID is not in this set (O(N+M)).

This reduces the complexity from O(N*M) to O(N+M) and yields the exact same results. All comments are preserved as requested.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 43 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
🌀 Generated Regression Tests Details
import pytest  # used for our unit tests
from src.dsa.nodes import find_last_node

# unit tests

# -------------------
# Basic Test Cases
# -------------------

def test_single_node_no_edges():
    # One node, no edges: should return the node itself
    nodes = [{"id": "A", "value": 1}]
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_two_nodes_one_edge():
    # Two nodes, one edge from A to B: should return B (not a source)
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_three_nodes_chain():
    # A -> B -> C: should return C
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_multiple_end_nodes():
    # A -> B, C is isolated: should return C (since B and C are not sources, but C comes first in nodes)
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_multiple_end_nodes_order():
    # A -> B, D is isolated and appears before B in nodes: should return B (since B comes before D)
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "D"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

# -------------------
# Edge Test Cases
# -------------------

def test_empty_nodes_and_edges():
    # No nodes, no edges: should return None
    nodes = []
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_edges_but_no_nodes():
    # Edges exist but no nodes: should return None
    nodes = []
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_nodes_no_edges_multiple():
    # Multiple nodes, no edges: should return the first node
    nodes = [{"id": "X"}, {"id": "Y"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_cycle_graph():
    # A -> B -> C -> A (cycle): all nodes are sources; should return None
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [
        {"source": "A", "target": "B"},
        {"source": "B", "target": "C"},
        {"source": "C", "target": "A"},
    ]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_disconnected_graph():
    # A -> B, C -> D, E isolated: should return B (first node not a source)
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}, {"id": "D"}, {"id": "E"}]
    edges = [
        {"source": "A", "target": "B"},
        {"source": "C", "target": "D"},
    ]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output


def test_non_string_ids():
    # Non-string node IDs
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_duplicate_node_ids():
    # Duplicate node IDs: should still return the first node not a source
    nodes = [{"id": "A"}, {"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_edge_with_nonexistent_source():
    # Edge with source not in nodes: should not affect result
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "X", "target": "A"}]
    # Both A and B are not sources in any edge, so should return A (first in list)
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_edge_with_nonexistent_target():
    # Edge with target not in nodes: should not affect result
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "Z"}]
    # B is not a source, so should return B
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_node_with_extra_fields():
    # Node with extra fields should be returned as-is
    nodes = [{"id": "A", "meta": 123}, {"id": "B", "meta": 456}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

# -------------------
# Large Scale Test Cases
# -------------------

def test_large_linear_chain():
    # 1000 node chain: 0->1->2->...->999, should return node 999
    N = 1000
    nodes = [{"id": str(i)} for i in range(N)]
    edges = [{"source": str(i), "target": str(i+1)} for i in range(N-1)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_large_fan_in():
    # 999 nodes point to node 'sink'
    N = 1000
    nodes = [{"id": str(i)} for i in range(N-1)] + [{"id": "sink"}]
    edges = [{"source": str(i), "target": "sink"} for i in range(N-1)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_large_fan_out():
    # Node 'root' points to 999 other nodes
    N = 1000
    nodes = [{"id": "root"}] + [{"id": str(i)} for i in range(1, N)]
    edges = [{"source": "root", "target": str(i)} for i in range(1, N)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_large_no_edges():
    # 1000 nodes, no edges: should return the first node
    N = 1000
    nodes = [{"id": str(i)} for i in range(N)]
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_large_cycle():
    # 1000 node cycle: all nodes are sources, should return None
    N = 1000
    nodes = [{"id": str(i)} for i in range(N)]
    edges = [{"source": str(i), "target": str((i+1)%N)} for i in range(N)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

# -------------------
# Mutation Testing Guards
# -------------------

def test_returns_first_non_source_node():
    # If multiple nodes are not sources, returns the first in nodes order
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = []
    # All are not sources; should return A
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_returns_none_when_all_are_sources():
    # All nodes are sources in at least one edge
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "A"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

import pytest  # used for our unit tests
from src.dsa.nodes import find_last_node

# unit tests

# -------------------------
# Basic Test Cases
# -------------------------

def test_single_node_no_edges():
    # One node, no edges: should return the node itself
    nodes = [{"id": "A"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)

def test_two_nodes_one_edge():
    # Two nodes, one edge from A to B: last node is B (no outgoing edges)
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges)

def test_three_nodes_linear_chain():
    # A -> B -> C, last node is C
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}]
    codeflash_output = find_last_node(nodes, edges)

def test_multiple_sinks_returns_first():
    # Two sink nodes (no outgoing edges): returns the first one in nodes order
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}]
    # Both B and C have no outgoing edges, so B is returned (first in nodes)
    codeflash_output = find_last_node(nodes, edges)

# -------------------------
# Edge Test Cases
# -------------------------

def test_empty_nodes_and_edges():
    # Both nodes and edges are empty: should return None
    nodes = []
    edges = []
    codeflash_output = find_last_node(nodes, edges)

def test_nodes_but_no_edges():
    # Multiple nodes, no edges: should return the first node (all are sinks)
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)

def test_all_nodes_have_outgoing_edges():
    # All nodes have outgoing edges: should return None
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "A"}]
    codeflash_output = find_last_node(nodes, edges)

def test_self_loop_node():
    # Node with a self-loop: should not be a sink
    nodes = [{"id": "A"}]
    edges = [{"source": "A", "target": "A"}]
    codeflash_output = find_last_node(nodes, edges)

def test_disconnected_nodes():
    # Some nodes not connected at all: should return the first disconnected node
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}]
    # C is disconnected, so it's a sink (no outgoing edges)
    codeflash_output = find_last_node(nodes, edges)

def test_multiple_edges_from_one_node():
    # One node with multiple outgoing edges, others are sinks
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}, {"source": "A", "target": "C"}]
    # B and C are both sinks, B is first in nodes order
    codeflash_output = find_last_node(nodes, edges)

def test_edge_with_nonexistent_node():
    # Edge references a node not in nodes list: should ignore and return correct sink
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}]  # C not in nodes
    # B has outgoing edge, A has outgoing edge, but C is not in nodes, so no sinks
    codeflash_output = find_last_node(nodes, edges)

def test_node_with_none_id():
    # Node with None as id: should handle gracefully
    nodes = [{"id": None}, {"id": "A"}]
    edges = [{"source": "A", "target": None}]
    # None node has no outgoing edges, so should be returned
    codeflash_output = find_last_node(nodes, edges)

def test_node_with_non_string_id():
    # Node with integer id
    nodes = [{"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)

def test_duplicate_node_ids():
    # Duplicate node ids; should return the first sink (by order)
    nodes = [{"id": "A"}, {"id": "A"}]
    edges = []
    # Both are sinks, first one is returned
    codeflash_output = find_last_node(nodes, edges)

# -------------------------
# Large Scale Test Cases
# -------------------------

def test_large_linear_chain():
    # 1000 nodes in a linear chain: last node is the last in the chain
    N = 1000
    nodes = [{"id": str(i)} for i in range(N)]
    edges = [{"source": str(i), "target": str(i+1)} for i in range(N-1)]
    codeflash_output = find_last_node(nodes, edges)

def test_large_fan_out():
    # One node with 999 outgoing edges to 999 sink nodes
    nodes = [{"id": "root"}] + [{"id": f"leaf{i}"} for i in range(999)]
    edges = [{"source": "root", "target": f"leaf{i}"} for i in range(999)]
    # All leaf nodes are sinks, first one in nodes is "leaf0"
    codeflash_output = find_last_node(nodes, edges)

def test_large_fan_in():
    # 999 nodes point to a single sink node
    nodes = [{"id": f"n{i}"} for i in range(999)] + [{"id": "sink"}]
    edges = [{"source": f"n{i}", "target": "sink"} for i in range(999)]
    # Only "sink" is a sink node
    codeflash_output = find_last_node(nodes, edges)

def test_large_disconnected_graph():
    # 500 disconnected nodes, should return the first one
    nodes = [{"id": f"n{i}"} for i in range(500)]
    edges = []
    codeflash_output = find_last_node(nodes, edges)

def test_large_cycle_graph():
    # 1000 nodes in a cycle: no sinks, should return None
    N = 1000
    nodes = [{"id": str(i)} for i in range(N)]
    edges = [{"source": str(i), "target": str((i+1)%N)} for i in range(N)]
    codeflash_output = find_last_node(nodes, edges)

# -------------------------
# Additional Robustness Cases
# -------------------------

def test_nodes_with_extra_fields():
    # Nodes may have extra fields, should still work
    nodes = [{"id": "A", "label": "alpha"}, {"id": "B", "label": "beta"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges)

def test_edges_with_extra_fields():
    # Edges may have extra fields, should still work
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B", "weight": 1}]
    codeflash_output = find_last_node(nodes, edges)

To edit these changes git checkout codeflash/optimize-find_last_node-map1nt8f and push.

Codeflash

Here’s a much faster version. The original code checks each node against every edge in a nested loop (O(N*M)), which is expensive.  
The optimization is to **first build a set of all sources** in `edges`, then simply find the first node whose ID is not in this set (O(N+M)).



**This reduces the complexity from O(N*M) to O(N+M) and yields the exact same results. All comments are preserved as requested.**
@codeflash-ai codeflash-ai bot added the ⚡️ codeflash Optimization PR opened by Codeflash AI label May 15, 2025
@codeflash-ai codeflash-ai bot requested a review from KRRT7 May 15, 2025 07:24
@codeflash-ai codeflash-ai bot deleted the codeflash/optimize-find_last_node-map1nt8f branch May 20, 2025 05:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
⚡️ codeflash Optimization PR opened by Codeflash AI
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant