⚡️ Speed up function find_last_node by 15,745% #45

Open
wants to merge 1 commit into
base: main
from

Conversation

codeflash-ai[bot]

@codeflash-ai codeflash-ai bot commented Jun 27, 2025

📄 15,745% (157.45x) speedup for find_last_node in src/dsa/nodes.py

⏱️ Runtime : 37.7 milliseconds → 238 microseconds (best of 556 runs)

📝 Explanation and details

This is a faster version of find_last_node. The main optimization is to avoid rescanning edges for every node. Instead, we build a set of all edge sources once and then return the first node whose id is not in that set.

Explanation:

  • The code now collects every source id in edges into a set in a single pass (O(N) time, where N is the number of edges).
  • It then checks each node's id against that set. Each lookup is O(1) on average, so the whole function runs in O(M+N) time, where M is the number of nodes.
  • The original ran an all() scan over the edges for each node inside the generator, which is O(N*M) in the worst case.
  • The function signature and existing comments are unchanged; only the function body was rewritten for speed.
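The change described above can be sketched as follows. The PR text does not show the function bodies, so both versions here are reconstructions from the explanation, and the names `find_last_node_scan` and `find_last_node_set` are hypothetical, used only to keep the two variants side by side.

```python
def find_last_node_scan(nodes, edges):
    # Assumed shape of the original: for each node, scan the edges
    # until one with a matching source is found.
    # Worst case O(N*M) for N edges and M nodes.
    return next(
        (n for n in nodes if all(e["source"] != n["id"] for e in edges)),
        None,
    )


def find_last_node_set(nodes, edges):
    # Assumed shape of the optimized version: collect source ids once,
    # then do an average O(1) membership test per node.
    sources = {e["source"] for e in edges}
    return next((n for n in nodes if n["id"] not in sources), None)


# Linear chain A -> B -> C: C is the only node with no outgoing edge.
nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}]
print(find_last_node_set(nodes, edges))  # {'id': 'C'}
```

In this sketch the set-based form requires source ids to be hashable, and it always builds the set even when nodes is empty. That fixed cost is consistent with the slightly slower timings reported for the empty-input tests.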

Correctness verification report:

| Test | Status |
|---|---|
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | 20 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |
🌀 Generated Regression Tests and Runtime
import pytest  # used for our unit tests
from src.dsa.nodes import find_last_node

# unit tests

# -------------------- BASIC TEST CASES --------------------

def test_single_node_no_edges():
    # One node, no edges: node is last node
    nodes = [{"id": "A"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 1.17μs -> 1.00μs (16.6% faster)

def test_two_nodes_one_edge():
    # Two nodes, one edge from A to B: B is last node
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 1.75μs -> 1.04μs (68.1% faster)

def test_three_nodes_linear_chain():
    # Linear chain: A -> B -> C, C is last node
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 2.17μs -> 1.12μs (92.6% faster)

def test_multiple_last_nodes_returns_first():
    # Two nodes with no outgoing edges: should return the first one in nodes
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 1.08μs -> 917ns (18.1% faster)

def test_branching_graph():
    # A -> B, A -> C; B and C are both last nodes, should return B (first in nodes)
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}, {"source": "A", "target": "C"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 1.67μs -> 1.04μs (60.1% faster)

# -------------------- EDGE TEST CASES --------------------

def test_empty_nodes_and_edges():
    # No nodes, no edges: should return None
    nodes = []
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 500ns -> 583ns (14.2% slower)

def test_edges_but_no_nodes():
    # Edges present, but no nodes: should return None
    nodes = []
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 458ns -> 625ns (26.7% slower)

def test_isolated_and_connected_nodes():
    # One isolated node, one connected: isolated node is last node
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "B", "target": "A"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 1.21μs -> 1.00μs (20.8% faster)

def test_cycle_graph():
    # Cycle: A -> B -> C -> A, no last node (all have outgoing edges)
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [
        {"source": "A", "target": "B"},
        {"source": "B", "target": "C"},
        {"source": "C", "target": "A"}
    ]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 2.17μs -> 875ns (148% faster)

def test_node_with_multiple_outgoing_edges():
    # A -> B, A -> C, C -> D; B and D have no outgoing edges, should return B (first in nodes)
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}, {"id": "D"}]
    edges = [
        {"source": "A", "target": "B"},
        {"source": "A", "target": "C"},
        {"source": "C", "target": "D"}
    ]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 1.71μs -> 1.08μs (57.7% faster)

def test_edge_with_nonexistent_nodes():
    # Edge references nodes not in list: should ignore and return the node in nodes
    nodes = [{"id": "A"}]
    edges = [{"source": "B", "target": "C"}]  # B and C not in nodes
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 1.17μs -> 958ns (21.8% faster)

def test_duplicate_node_ids():
    # Duplicate node IDs: function should return the first one with no outgoing edges
    nodes = [{"id": "A"}, {"id": "A"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 1.04μs -> 834ns (24.9% faster)

def test_node_with_self_loop():
    # Node with self-loop: should not be last node
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "A"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 1.50μs -> 958ns (56.6% faster)

def test_all_nodes_with_outgoing_edges():
    # All nodes have outgoing edges: should return None
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "A"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 1.62μs -> 791ns (105% faster)


def test_missing_source_key_in_edge():
    # Edge missing 'source' key: should raise KeyError
    nodes = [{"id": "A"}]
    edges = [{}]
    with pytest.raises(KeyError):
        find_last_node(nodes, edges)

# -------------------- LARGE SCALE TEST CASES --------------------

def test_large_linear_chain():
    # Large linear chain: 1000 nodes, last node is last in list
    N = 1000
    nodes = [{"id": str(i)} for i in range(N)]
    edges = [{"source": str(i), "target": str(i+1)} for i in range(N-1)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 18.8ms -> 83.2μs (22499% faster)

def test_large_branching_graph():
    # Large branching: root connects to 999 leaves, all leaves are last nodes, should return first leaf
    N = 1000
    nodes = [{"id": "root"}] + [{"id": f"leaf_{i}"} for i in range(N-1)]
    edges = [{"source": "root", "target": f"leaf_{i}"} for i in range(N-1)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 35.6μs -> 17.7μs (101% faster)

def test_large_graph_all_nodes_with_outgoing_edges():
    # All nodes have outgoing edges: should return None
    N = 1000
    nodes = [{"id": str(i)} for i in range(N)]
    # Each node i has an edge to node (i+1)%N, forming a cycle
    edges = [{"source": str(i), "target": str((i+1)%N)} for i in range(N)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 18.8ms -> 80.1μs (23404% faster)

def test_large_graph_with_isolated_node():
    # Chain over nodes "1".."998", with isolated node "iso" first in the list (node "0" also has no edges)
    N = 1000
    nodes = [{"id": "iso"}] + [{"id": str(i)} for i in range(N-1)]
    edges = [{"source": str(i), "target": str(i+1)} for i in range(1, N-2)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 38.4μs -> 42.6μs (9.88% slower)

def test_large_graph_with_duplicate_ids():
    # 500 nodes with id "A", 500 with id "B", no edges
    N = 500
    nodes = [{"id": "A"} for _ in range(N)] + [{"id": "B"} for _ in range(N)]
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output # 1.21μs -> 1.08μs (11.5% faster)
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
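A minimal sketch of that kind of check is to run both versions on the same input, compare the results, and time each with `timeit`. The two function bodies below are reconstructed from the PR description rather than copied from the repository, and the names `original` and `optimized` are hypothetical. Absolute timings will vary by machine.

```python
import timeit


def original(nodes, edges):
    # Assumed original: scan the edges for each node.
    return next(
        (n for n in nodes if all(e["source"] != n["id"] for e in edges)),
        None,
    )


def optimized(nodes, edges):
    # Assumed optimized version: one pass to build the set of sources.
    sources = {e["source"] for e in edges}
    return next((n for n in nodes if n["id"] not in sources), None)


# Same input shape as test_large_linear_chain.
N = 1000
nodes = [{"id": str(i)} for i in range(N)]
edges = [{"source": str(i), "target": str(i + 1)} for i in range(N - 1)]

# Behavior check: both versions must return the same node.
same = original(nodes, edges) == optimized(nodes, edges)

# Best-of-5 wall-clock time for a single call of each version.
t_original = min(timeit.repeat(lambda: original(nodes, edges), number=1, repeat=5))
t_optimized = min(timeit.repeat(lambda: optimized(nodes, edges), number=1, repeat=5))

print(f"same result: {same}")
print(f"original {t_original * 1e6:.1f} us, optimized {t_optimized * 1e6:.1f} us")
```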

To edit these changes, run `git checkout codeflash/optimize-find_last_node-mce8nlg9` and push.

@codeflash-ai codeflash-ai bot added the ⚡️ codeflash Optimization PR opened by Codeflash AI label Jun 27, 2025
@codeflash-ai codeflash-ai bot requested a review from misrasaurabh1 June 27, 2025 03:14