
⚡️ Speed up function find_last_node by 19,011% #9


Closed


@codeflash-ai codeflash-ai bot commented May 15, 2025

📄 19,011% (190.11x) speedup for find_last_node in src/dsa/nodes.py

⏱️ Runtime : 175 milliseconds → 916 microseconds (best of 308 runs)
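
As a rough check on how the headline figure relates to the two runtimes, the arithmetic can be sketched as follows. This assumes the first figure is the original runtime, the second is the optimized runtime, and the speedup is computed as (old - new) / new; the variable names are illustrative. The displayed runtimes are rounded, so the result only approximates the reported 190.11x.

```python
# Rounded runtimes as displayed above; the unrounded values are not shown.
original_runtime_s = 175e-3    # 175 milliseconds
optimized_runtime_s = 916e-6   # 916 microseconds

# Assumed definition: time saved, measured relative to the new runtime.
speedup = (original_runtime_s - optimized_runtime_s) / optimized_runtime_s

print(f"{speedup:.2f}x, i.e. about {speedup * 100:,.0f}%")
```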

📝 Explanation and details

Here’s an optimized version of your program. The main inefficiency comes from repeatedly scanning all edges for each node. A much faster approach is to build a set of source node IDs up front (O(E)), then scan nodes for the first one whose id is not in that set (O(N)), for overall linear time instead of quadratic.

Explanation of the optimization:

  • Before: For each node (O(N)), checked if it was the source in any edge (O(E)), for total O(N*E).
  • Now: Build set of all source IDs (O(E)). For each node, check set membership (O(1)) for total O(N+E).

The return value and comments are unchanged. The logic is identical but much faster for larger graphs and flows.
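
The page does not show the diff itself, so the following is only a minimal sketch of the two approaches described above, not the actual contents of src/dsa/nodes.py. The function names `find_last_node_quadratic` and `find_last_node_linear` are hypothetical; the node and edge shapes (`{"id": ...}` and `{"source": ..., "target": ...}`) are taken from the generated tests.

```python
def find_last_node_quadratic(nodes, edges):
    """Return the first node that is not the source of any edge, else None.

    Every node triggers a scan over all edges: O(N * E).
    """
    for node in nodes:
        if all(edge["source"] != node["id"] for edge in edges):
            return node
    return None


def find_last_node_linear(nodes, edges):
    """Same result with one pass over edges and one over nodes: O(N + E)."""
    # Collect every ID that appears as an edge source (IDs must be hashable).
    source_ids = {edge["source"] for edge in edges}
    # The first node whose ID is never a source has no outgoing edges.
    for node in nodes:
        if node["id"] not in source_ids:
            return node
    return None


nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}]
print(find_last_node_quadratic(nodes, edges))
print(find_last_node_linear(nodes, edges))
```

One difference worth noting in this sketch: the set-based version requires hashable IDs, whereas the comparison-based version only needs `==` to work on them.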

Correctness verification report:

| Test | Status |
|---|---|
| ⚙️ Existing Unit Tests | 🔘 None Found |
| 🌀 Generated Regression Tests | 40 Passed |
| ⏪ Replay Tests | 🔘 None Found |
| 🔎 Concolic Coverage Tests | 🔘 None Found |
| 📊 Tests Coverage | 100.0% |

🌀 Generated Regression Tests Details
import pytest  # used for our unit tests
from src.dsa.nodes import find_last_node

# unit tests

# ------------------------
# Basic Test Cases
# ------------------------

def test_single_node_no_edges():
    # Only one node, no edges: should return the node itself
    nodes = [{"id": "A"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_two_nodes_one_edge():
    # A -> B, B has no outgoing edges, so B is the last node
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_three_nodes_linear_chain():
    # A -> B -> C, C is the last node
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_multiple_sinks_returns_first():
    # A -> B, A -> C, both B and C are sinks, should return B (first in nodes)
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}, {"source": "A", "target": "C"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_no_edges_multiple_nodes():
    # No edges, all nodes are sinks, should return first node
    nodes = [{"id": "X"}, {"id": "Y"}, {"id": "Z"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

# ------------------------
# Edge Test Cases
# ------------------------

def test_empty_nodes_and_edges():
    # No nodes or edges, should return None
    nodes = []
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_edges_but_no_nodes():
    # Edges exist, but nodes list is empty, should return None
    nodes = []
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_nodes_with_self_loop():
    # Node with a self-loop is not a sink
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "A"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_cycle_graph():
    # A -> B -> C -> A (cycle), all nodes have outgoing edges, so return None
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [
        {"source": "A", "target": "B"},
        {"source": "B", "target": "C"},
        {"source": "C", "target": "A"},
    ]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_disconnected_graph():
    # A -> B, C (disconnected): B and C are both sinks; B comes first in nodes, so B should be returned
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_node_with_multiple_outgoing_edges():
    # A -> B, A -> C, B and C are both sinks, should return B
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}, {"source": "A", "target": "C"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_node_with_multiple_incoming_edges():
    # A -> C, B -> C, C has no outgoing edges, should return C
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "C"}, {"source": "B", "target": "C"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_edge_with_nonexistent_node():
    # Edge references a node not in nodes list; should ignore and return the sink in nodes
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}, {"source": "C", "target": "A"}]  # "C" not in nodes
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_duplicate_node_ids():
    # Duplicate node ids, should return the first occurrence that is a sink
    nodes = [{"id": "A"}, {"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_node_with_empty_id():
    # Node with empty string id, edge from A to '', should recognize sink correctly
    nodes = [{"id": "A"}, {"id": ""}]
    edges = [{"source": "A", "target": ""}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output


# ------------------------
# Large Scale Test Cases
# ------------------------

def test_large_linear_chain():
    # 1000 nodes in a chain: n0 -> n1 -> ... -> n999, last node should be n999
    N = 1000
    nodes = [{"id": f"n{i}"} for i in range(N)]
    edges = [{"source": f"n{i}", "target": f"n{i+1}"} for i in range(N-1)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_large_star_graph():
    # One central node with 999 outgoing edges to 999 leaf nodes
    N = 1000
    nodes = [{"id": "center"}] + [{"id": f"leaf{i}"} for i in range(1, N)]
    edges = [{"source": "center", "target": f"leaf{i}"} for i in range(1, N)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_large_fully_disconnected():
    # 1000 nodes, no edges: first node should be returned
    N = 1000
    nodes = [{"id": f"n{i}"} for i in range(N)]
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_large_all_connected_cycle():
    # 1000 nodes in a cycle: n0->n1->...->n999->n0, no sinks, should return None
    N = 1000
    nodes = [{"id": f"n{i}"} for i in range(N)]
    edges = [{"source": f"n{i}", "target": f"n{(i+1)%N}"} for i in range(N)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_large_multiple_sinks():
    # 1000 nodes, first 500 have outgoing edges to next 500, last 500 are sinks
    N = 1000
    nodes = [{"id": f"n{i}"} for i in range(N)]
    edges = [{"source": f"n{i}", "target": f"n{i+500}"} for i in range(500)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

import pytest  # used for our unit tests
from src.dsa.nodes import find_last_node

# unit tests

# ---------------------------
# Basic Test Cases
# ---------------------------

def test_single_node_no_edges():
    # Only one node, no edges. Should return the node itself.
    nodes = [{"id": "A"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_two_nodes_one_edge():
    # Two nodes, one edge from A to B. Last node is B (no outgoing edges).
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_three_nodes_linear_chain():
    # A -> B -> C, so C is the last node.
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "C"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_multiple_last_nodes():
    # A -> B, C is disconnected, so both B and C have no outgoing edges.
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_last_node_not_at_end_of_list():
    # A -> C, B is disconnected. B is before C in nodes, so B should be returned.
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "C"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

# ---------------------------
# Edge Test Cases
# ---------------------------

def test_empty_nodes_and_edges():
    # No nodes or edges, should return None.
    nodes = []
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_nodes_with_no_edges():
    # Multiple nodes, no edges. Should return the first node.
    nodes = [{"id": "X"}, {"id": "Y"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_all_nodes_have_outgoing_edges():
    # All nodes have outgoing edges, so none are last. Should return None.
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}, {"source": "B", "target": "A"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_cycle_graph():
    # Cycle: A -> B -> C -> A, so all nodes have outgoing edges. Should return None.
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [
        {"source": "A", "target": "B"},
        {"source": "B", "target": "C"},
        {"source": "C", "target": "A"},
    ]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_node_with_self_loop():
    # Node with self-loop, so it has outgoing edge. Should not be last node.
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "A"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_edge_with_nonexistent_node():
    # Edge refers to node not in nodes. Should ignore and return correct last node.
    nodes = [{"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "C"}]  # C not in nodes
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_duplicate_node_ids():
    # Duplicate IDs: Only the first with no outgoing edge should be returned.
    nodes = [{"id": "A"}, {"id": "A"}, {"id": "B"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_node_with_multiple_outgoing_edges():
    # Node A has two outgoing edges, B and C have none.
    nodes = [{"id": "A"}, {"id": "B"}, {"id": "C"}]
    edges = [{"source": "A", "target": "B"}, {"source": "A", "target": "C"}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_nodes_with_non_string_ids():
    # IDs are integers
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_nodes_with_complex_ids():
    # IDs are tuples
    nodes = [{"id": (1, 2)}, {"id": (2, 3)}]
    edges = [{"source": (1, 2), "target": (2, 3)}]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

# ---------------------------
# Large Scale Test Cases
# ---------------------------

def test_large_linear_chain():
    # Linear chain of 1000 nodes: 0 -> 1 -> 2 -> ... -> 999
    num_nodes = 1000
    nodes = [{"id": i} for i in range(num_nodes)]
    edges = [{"source": i, "target": i+1} for i in range(num_nodes-1)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_large_star_topology():
    # Node 0 connects to all others, so nodes 1..999 have no outgoing edges.
    num_nodes = 1000
    nodes = [{"id": i} for i in range(num_nodes)]
    edges = [{"source": 0, "target": i} for i in range(1, num_nodes)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_large_disconnected_nodes():
    # 1000 nodes, no edges. Should return the first node.
    num_nodes = 1000
    nodes = [{"id": i} for i in range(num_nodes)]
    edges = []
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_large_complete_graph():
    # Every node connects to every other node (no last node).
    num_nodes = 100
    nodes = [{"id": i} for i in range(num_nodes)]
    edges = [
        {"source": i, "target": j}
        for i in range(num_nodes)
        for j in range(num_nodes)
        if i != j
    ]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output

def test_large_graph_with_isolated_node():
    # 999 nodes in a chain, 1 isolated node at the end.
    nodes = [{"id": i} for i in range(999)] + [{"id": "iso"}]
    edges = [{"source": i, "target": i+1} for i in range(998)]
    codeflash_output = find_last_node(nodes, edges); result = codeflash_output
# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
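
The closing comment describes comparing the output of the original code with that of the optimized code. A minimal sketch of that kind of equivalence check on random graphs is shown below; it is not Codeflash's own harness. `original` and `optimized` are hypothetical stand-ins written from the PR description, and `random_graph` and `check_equivalence` are illustrative helpers.

```python
import random


def original(nodes, edges):
    # Stand-in for the pre-optimization behavior: scan all edges per node.
    return next(
        (n for n in nodes if all(e["source"] != n["id"] for e in edges)), None
    )


def optimized(nodes, edges):
    # Stand-in for the optimized behavior: precompute the set of source IDs.
    sources = {e["source"] for e in edges}
    return next((n for n in nodes if n["id"] not in sources), None)


def random_graph(rng, max_nodes=20, max_edges=40):
    # Build a small random graph; edges may include self-loops and duplicates.
    node_count = rng.randint(0, max_nodes)
    nodes = [{"id": i} for i in range(node_count)]
    edges = []
    if node_count:
        for _ in range(rng.randint(0, max_edges)):
            edges.append(
                {
                    "source": rng.randrange(node_count),
                    "target": rng.randrange(node_count),
                }
            )
    return nodes, edges


def check_equivalence(trials=500, seed=0):
    rng = random.Random(seed)
    for _ in range(trials):
        nodes, edges = random_graph(rng)
        # Both versions should agree on every input, including the None case.
        assert original(nodes, edges) == optimized(nodes, edges)
    return trials


print(check_equivalence(), "random graphs compared")
```

Seeding the generator keeps any disagreement reproducible, which matters more here than the particular graph sizes chosen.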

To edit these changes, run `git checkout codeflash/optimize-find_last_node-map1geqi` and push.

@codeflash-ai codeflash-ai bot added the ⚡️ codeflash Optimization PR opened by Codeflash AI label May 15, 2025
@codeflash-ai codeflash-ai bot requested a review from KRRT7 May 15, 2025 07:19
@codeflash-ai codeflash-ai bot deleted the codeflash/optimize-find_last_node-map1geqi branch May 20, 2025 05:34