157 changes: 83 additions & 74 deletions complexity/generator.py
from abc import ABC, abstractmethod
from typing import Any, List, Tuple
import random
import numpy as np
import networkx as nx
# Abstract Base Class for Data Generators
class SyntheticDataGenerator(ABC):
    @abstractmethod
    def create(self, size: int) -> Any:
        """Create synthetic data of a specified size."""
        pass

# Linear Data Generator
class SequentialData(SyntheticDataGenerator):
    def create(self, size: int) -> List[int]:
        return list(range(1, size + 1))

# Random Number Generator
class UniformRandomData(SyntheticDataGenerator):
    def __init__(self, min_val: int = 0, max_val: int = 100):
        self.min_val = min_val
        self.max_val = max_val

    def create(self, size: int) -> List[int]:
        return [random.randint(self.min_val, self.max_val) for _ in range(size)]

# Gaussian Data Generator
class NormalDistributedData(SyntheticDataGenerator):
    def __init__(self, mean: float = 0.0, std_dev: float = 1.0):
        self.mean = mean
        self.std_dev = std_dev

    def create(self, size: int) -> np.ndarray:
        return np.random.normal(self.mean, self.std_dev, size)

# Factory for Data Generators
class GeneratorFactory:
    def __init__(self):
        self.registry = {}

    def add_generator(self, key: str, generator: SyntheticDataGenerator):
        self.registry[key] = generator

    def get_generator(self, key: str) -> SyntheticDataGenerator:
        if key not in self.registry:
            raise ValueError(f"Generator with key '{key}' is not registered.")
        return self.registry[key]

# String Generator
class RandomStringGenerator(SyntheticDataGenerator):
    def __init__(self, charset: List[str] = None):
        # Avoid a mutable default argument; fall back to a small demo alphabet.
        self.charset = charset if charset is not None else ['A', 'B', 'C']

    def create(self, size: int) -> str:
        return ''.join(random.choice(self.charset) for _ in range(size))

    def create_pair(self, len1: int, len2: int, match_ratio: float = 0.0) -> Tuple[str, str]:
        if not (0.0 <= match_ratio <= 1.0):
            raise ValueError("Match ratio must be between 0.0 and 1.0")

        overlap = min(len1, len2)
        shared_count = int(overlap * match_ratio)
        shared_indices = set(random.sample(range(overlap), shared_count))

        # Positions in shared_indices receive the same character in both
        # strings; the remaining overlap positions are drawn independently.
        chars1, chars2 = [], []
        for idx in range(overlap):
            ch = random.choice(self.charset)
            chars1.append(ch)
            chars2.append(ch if idx in shared_indices else random.choice(self.charset))

        # Pad the longer string with extra random characters.
        str1 = ''.join(chars1) + ''.join(random.choices(self.charset, k=len1 - overlap))
        str2 = ''.join(chars2) + ''.join(random.choices(self.charset, k=len2 - overlap))

        return str1, str2

# Graph Generator
class RandomGraphGenerator(SyntheticDataGenerator):
    def __init__(self, directed: bool = False, weighted: bool = True):
        self.directed = directed
        self.weighted = weighted

    def create(self, size: int) -> nx.Graph:
        graph = nx.DiGraph() if self.directed else nx.Graph()

        # Create nodes
        for node in range(size):
            graph.add_node(node)

        # Create edges with random weights; the 0.3 threshold controls sparsity
        for node1 in range(size):
            for node2 in range(node1 + 1, size):
                if random.random() < 0.3:
                    weight = random.randint(1, 10) if self.weighted else 1
                    graph.add_edge(node1, node2, weight=weight)

        return graph
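
# Note: with edge probability 0.3, a size-n graph carries roughly
# 0.3 * n * (n - 1) / 2 edges in expectation. Edges are only added for
# node1 < node2, so directed output never contains both directions of a pair.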



def main():
    # Setting up the factory
    factory = GeneratorFactory()
    factory.add_generator("linear", SequentialData())
    factory.add_generator("random", UniformRandomData(0, 50))
    factory.add_generator("gaussian", NormalDistributedData(0, 1))
    factory.add_generator("graph", RandomGraphGenerator(directed=True, weighted=True))

    # Generate linear data
    linear_gen = factory.get_generator("linear")
    print("Linear Data:", linear_gen.create(10))

    # Generate random data
    random_gen = factory.get_generator("random")
    print("Random Data:", random_gen.create(10))

    # Generate Gaussian data
    gaussian_gen = factory.get_generator("gaussian")
    print("Gaussian Data:", gaussian_gen.create(10))

    # Generate a graph and print its weighted edges
    graph_gen = factory.get_generator("graph")
    graph = graph_gen.create(5)
    print("Graph Edges:", graph.edges(data=True))


if __name__ == "__main__":
    main()
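
A quick usage sketch for the new create_pair helper, which main() above does not exercise; the lengths and match_ratio value here are illustrative:

    gen = RandomStringGenerator(['A', 'C', 'G', 'T'])
    s1, s2 = gen.create_pair(12, 8, match_ratio=0.5)  # about half of the 8 overlapping positions forced equal
    print(s1, s2)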

125 changes: 123 additions & 2 deletions examples/common_string.py
import random
from typing import List, Tuple
import pandas as pd
from tqdm import tqdm
import sys
from pathlib import Path
from collections import namedtuple

# Add the parent directory to sys.path so the local complexity package is importable
sys.path.append(str(Path(__file__).resolve().parent.parent))

from complexity import (
    TimeAndSpaceProfiler,
    RandomStringGenerator
)

# Define a named tuple for capturing metrics
Metrics = namedtuple('Metrics', ['length', 'comparisons', 'move_count'])

# Plain recursive approach for LCS (exponential time)
def lcs_recursive(str1: str, str2: str, i: int, j: int) -> Metrics:
    if i == 0 or j == 0:
        return Metrics(0, 0, 0)

    if str1[i - 1] == str2[j - 1]:
        # A match extends the LCS by one character.
        sub = lcs_recursive(str1, str2, i - 1, j - 1)
        return Metrics(sub.length + 1, sub.comparisons + 1, sub.move_count + 1)
    else:
        left = lcs_recursive(str1, str2, i - 1, j)
        right = lcs_recursive(str1, str2, i, j - 1)
        return Metrics(max(left.length, right.length),
                       left.comparisons + right.comparisons + 1,
                       left.move_count + right.move_count)

# Memoized (top-down) version of LCS
def lcs_memoized(str1: str, str2: str, memo=None) -> Metrics:
    if memo is None:
        memo = {}

    # Subproblems only ever strip characters from the ends, so the pair of
    # remaining lengths uniquely identifies a subproblem.
    key = (len(str1), len(str2))
    if key in memo:
        return memo[key]

    if not str1 or not str2:
        return Metrics(0, 0, 0)

    if str1[-1] == str2[-1]:
        sub = lcs_memoized(str1[:-1], str2[:-1], memo)
        memo[key] = Metrics(sub.length + 1, sub.comparisons + 1, sub.move_count + 1)
    else:
        res1 = lcs_memoized(str1[:-1], str2, memo)
        res2 = lcs_memoized(str1, str2[:-1], memo)
        memo[key] = Metrics(max(res1.length, res2.length),
                            res1.comparisons + res2.comparisons + 1,
                            res1.move_count + res2.move_count)

    return memo[key]

# Bottom-up dynamic programming approach for LCS
def lcs_bottom_up(str1: str, str2: str) -> Metrics:
m, n = len(str1), len(str2)
dp = [[0] * (n + 1) for _ in range(m + 1)]
comparisons, moves = 0, 0

for i in range(1, m + 1):
for j in range(1, n + 1):
comparisons += 1
if str1[i - 1] == str2[j - 1]:
dp[i][j] = dp[i - 1][j - 1] + 1
moves += 1
else:
dp[i][j] = max(dp[i - 1][j], dp[i][j - 1])

return Metrics(dp[m][n], comparisons, moves)
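
# Sanity check: "ABCBDAB" vs "BDCABA" has an LCS of length 4 (e.g. "BCBA"),
# so lcs_bottom_up("ABCBDAB", "BDCABA").length should equal 4.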

# Initialize utility classes
string_gen = RandomStringGenerator(['A', 'C', 'G', 'T'])
profiler = TimeAndSpaceProfiler()

# Function to profile LCS algorithms
def profile_lcs(method, *args):
"""Runs the profiling of a given LCS function."""
return profiler.profile(method, *args)

# Experiment parameters
sizes = [10, 15, 20]
tests_per_size = 3
results_data = []
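
# Seed the shared RNG so benchmark inputs are reproducible across runs
# (editor's suggestion; the seed value is arbitrary):
random.seed(42)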

# Benchmark LCS implementations
with tqdm(total=len(sizes) * tests_per_size * 3, desc="Running LCS Benchmarks", unit="test") as progress:
for string_size in sizes:
for test_num in range(tests_per_size):
# Generate input strings
str_a = string_gen.create(string_size)
            str_b = string_gen.create(string_size)

# Recursive profiling
recursive_log = profile_lcs(lcs_recursive, str_a, str_b, len(str_a), len(str_b))
recursive_log.update({"method": "Recursive", "size": string_size, "test_id": test_num + 1})
results_data.append(recursive_log)

# Memoized profiling
memoized_log = profile_lcs(lcs_memoized, str_a, str_b)
memoized_log.update({"method": "Memoized", "size": string_size, "test_id": test_num + 1})
results_data.append(memoized_log)

# Bottom-up profiling
bottom_up_log = profile_lcs(lcs_bottom_up, str_a, str_b)
bottom_up_log.update({"method": "Bottom-Up", "size": string_size, "test_id": test_num + 1})
results_data.append(bottom_up_log)

# Update progress
progress.update(1)

# Save raw results to CSV
results_df = pd.DataFrame(results_data)
results_df.to_csv("lcs_raw_results.csv", index=False)

# Post-process and aggregate results
results_df['time'] = pd.to_numeric(results_df['time'], errors='coerce')
results_df['memory'] = pd.to_numeric(results_df['memory'], errors='coerce')
summary = results_df.groupby(['method', 'size']).agg({
'time': 'mean',
'memory': 'mean',
}).reset_index()

# Save aggregated results
summary.to_csv("lcs_summary_results.csv", index=False)

print("Benchmarking complete. Results saved in 'lcs_raw_results.csv' and 'lcs_summary_results.csv'.")