Skip to content

Commit

Permalink
Make an initial pass at a data structure
Browse files Browse the repository at this point in the history
  • Loading branch information
hyanwong committed Sep 29, 2023
1 parent 5894965 commit a8b5e7d
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 0 deletions.
7 changes: 7 additions & 0 deletions GeneticInheritanceGraph/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import sys

# Refuse to import on Python 2; comparing against a one-element tuple checks
# only the major version number.
if sys.version_info < (3,):
    raise Exception("Python 3 only")

from .tables import NULL
from .tables import TableCollection
104 changes: 104 additions & 0 deletions GeneticInheritanceGraph/tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from dataclasses import dataclass
from typing import Optional

NULL = -1

@dataclass
class IntervalTableRow:
    """
    One row of the interval table: a span on a parent paired with a span on
    a child.

    The field names and their order are part of the interface: rows may be
    built positionally, and the table ``append`` methods filter incoming
    dictionaries against ``__annotations__``.
    """

    # The two ends of the relationship. Presumably node IDs, as rows are
    # filled from tskit edge rows -- TODO confirm.
    parent: int
    child: int
    # Left coordinates of the span, in the parent and in the child
    parent_left: float
    child_left: float
    # Right coordinates of the span, in the parent and in the child
    parent_right: float
    child_right: float
    # Optional chromosome identifiers; None when not specified. The
    # annotation is Optional[int] because the default is None, not an int.
    parent_chromosome: Optional[int] = None
    child_chromosome: Optional[int] = None


@dataclass
class NodeTableRow:
    """
    One row of the node table.

    Only ``time`` is required; ``flags`` defaults to 0 and ``individual``
    defaults to the module-level ``NULL`` sentinel.
    """

    time: float
    flags: int = 0
    # Presumably an index into the individuals table, with NULL meaning
    # "no individual" -- TODO confirm.
    individual: int = NULL

@dataclass
class IndividualTableRow:
    """
    One row of the individual table.

    The only field is ``parents``, which defaults to an empty tuple.
    """

    # Presumably the IDs of this individual's parents -- TODO confirm.
    parents: tuple = ()

class BaseTable:
    """
    A minimal list-backed table of row objects.

    Subclasses set ``RowClass`` to the class used to build rows. Rows are
    kept, in insertion order, in the ``data`` list, and a row's ID is its
    position in that list.
    """

    # Row type for this table; must be overridden by subclasses.
    RowClass = None

    def __init__(self):
        self.data = []

    def __str__(self):
        """Return one line per row, using each row's own string form."""
        return "\n".join(f"{row}" for row in self.data)

    def __len__(self):
        """
        Return the number of rows in the table.

        NOTE(review): defining this also makes an empty table falsy.
        """
        return len(self.data)

    def __getitem__(self, index):
        """Return the row (or, for a slice, the list of rows) at ``index``."""
        return self.data[index]

    def add_row(self, *args, **kwargs) -> int:
        """
        Build a new row from the given arguments and add it to the table.

        All arguments are passed straight to ``RowClass``.

        :return: The index of the newly added row.
        """
        self.data.append(self.RowClass(*args, **kwargs))
        return len(self.data) - 1

    def append(self, obj) -> int:
        """
        Add a row built from ``obj``, which can be a dict or an object with
        an ``.asdict()`` method.

        Keys that are not fields of ``RowClass`` are silently dropped, so
        rows from richer tables can be passed in directly. ``obj`` itself
        is not modified.

        :return: The index of the newly added row.
        """
        # Look the method up rather than catching AttributeError around the
        # call, so that an AttributeError raised *inside* asdict() is not
        # mistaken for "this is already a dict".
        as_dict = getattr(obj, "asdict", None)
        mapping = as_dict() if callable(as_dict) else obj
        wanted = self.RowClass.__annotations__
        row_kwargs = {k: v for k, v in mapping.items() if k in wanted}
        self.data.append(self.RowClass(**row_kwargs))
        return len(self.data) - 1

class IntervalTable(BaseTable):
    """
    Table of ``IntervalTableRow`` objects.

    ``append`` additionally understands rows that carry a single ``left``
    and ``right`` coordinate (e.g. tskit edge rows), which are used for both
    the parent and the child span.
    """

    RowClass = IntervalTableRow

    def append(self, obj) -> int:
        """
        Add a row built from ``obj``, which can be a dict or an object with
        an ``.asdict()`` method.

        If ``obj`` has a ``left`` key, its value is used for both
        ``parent_left`` and ``child_left`` (overriding any values already
        present); likewise ``right`` for ``parent_right`` and
        ``child_right``. If those keys are absent, the ``parent_*`` /
        ``child_*`` keys are taken as given. Keys that are not fields of
        the row class are dropped.

        :return: The index of the newly added row.
        """
        as_dict = getattr(obj, "asdict", None)
        # Work on a private copy so that the caller's dict is never modified
        fields = dict(as_dict() if callable(as_dict) else obj)
        if "left" in fields:
            fields["child_left"] = fields["parent_left"] = fields["left"]
        if "right" in fields:
            fields["child_right"] = fields["parent_right"] = fields["right"]
        wanted = self.RowClass.__annotations__
        row_kwargs = {k: v for k, v in fields.items() if k in wanted}
        self.data.append(self.RowClass(**row_kwargs))
        return len(self.data) - 1

class NodeTable(BaseTable):
    """Table whose rows are ``NodeTableRow`` objects."""

    RowClass = NodeTableRow

class IndividualTable(BaseTable):
    """Table whose rows are ``IndividualTableRow`` objects."""

    RowClass = IndividualTableRow

class TableCollection:
    """
    A set of node, interval and individual tables, plus a ``time_units``
    string (initially "unknown").
    """

    def __init__(self):
        self.nodes = NodeTable()
        self.intervals = IntervalTable()
        self.individuals = IndividualTable()
        self.time_units = "unknown"

    def __str__(self):
        """Return the node and interval tables, each under a heading."""
        # NOTE(review): the individuals table is not included here --
        # confirm whether that is intentional.
        return "\n\n".join(
            [
                "== NODES ==\n" + str(self.nodes),
                "== INTERVALS ==\n" + str(self.intervals),
            ]
        )

    @classmethod
    def from_tree_sequence(cls, ts, timedelta=0):
        """
        Create a new collection from the nodes and edges of ``ts``.

        Each node row is copied with ``timedelta`` added to its time, and
        each edge row is appended to the interval table. Other tables are
        not copied.

        NB: timedelta is a hack until we can set entire columns
        like in tskit

        :param ts: An object with a tskit-style ``.tables`` attribute.
        :param timedelta: Value added to the time of every node.
        :return: The newly created collection.
        :raises NotImplementedError: If ``ts`` has any migrations, mutations
            or sites, or more than one population.
        """
        tables = ts.tables
        # Reject unsupported input up front, before building anything, and
        # say which table was the problem.
        for name in ("migrations", "mutations", "sites"):
            if getattr(tables, name).num_rows > 0:
                raise NotImplementedError(
                    f"Tree sequences with {name} are not supported"
                )
        if tables.populations.num_rows > 1:
            # If there is only one population, ignore it
            raise NotImplementedError(
                "Tree sequences with more than one population are not supported"
            )

        gig_tables = cls()
        for row in tables.nodes:
            obj = row.asdict()
            obj["time"] += timedelta
            gig_tables.nodes.append(obj)
        for row in tables.edges:
            gig_tables.intervals.append(row)
        return gig_tables

0 comments on commit a8b5e7d

Please sign in to comment.