🐛 Refactor data loaders to be lazy and use generators to prevent memory problems #103

Open · wants to merge 1 commit into base: main
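
For context, a minimal sketch of the eager-versus-lazy pattern this refactor moves toward. The function names below are illustrative only, not the actual pystreamapi API:

```python
from csv import reader
from typing import Iterator, List


def load_rows_eager(path: str) -> List[list]:
    """Eager: every row is parsed into a list before anything is returned."""
    with open(path, newline='', encoding='utf-8') as f:
        return list(reader(f))


def load_rows_lazy(path: str) -> Iterator[list]:
    """Lazy: rows are parsed and yielded one at a time, keeping memory flat."""
    with open(path, newline='', encoding='utf-8') as f:
        yield from reader(f)


# Only the current row is held in memory with the lazy variant:
# for row in load_rows_lazy("data.csv"):
#     handle(row)
```
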
65 changes: 43 additions & 22 deletions pystreamapi/loaders/__csv/__csv_loader.py
@@ -1,41 +1,62 @@
from collections import namedtuple
from csv import reader
from io import StringIO
from typing import Any, Iterator

from pystreamapi.loaders.__loader_utils import LoaderUtils
from pystreamapi.loaders.__lazy_file_iterable import LazyFileIterable


def csv(file_path: str, cast_types=True, delimiter=',', encoding="utf-8") -> LazyFileIterable:
def csv(
src: str, read_from_src=False, cast_types=True, delimiter=',', encoding="utf-8"
) -> Iterator[Any]:
"""
Loads a CSV file and converts it into a list of namedtuples.

Returns:
list: A list of namedtuples, where each namedtuple represents a row in the CSV.
:param cast_types: Set as False to disable casting of values to int, bool or float.
:param encoding: The encoding of the CSV file.
:param file_path: The path to the CSV file.
:param delimiter: The delimiter used in the CSV file.
Lazily loads CSV data from either a path or a string and yields namedtuples.

Args:
src (str): Either the path to a CSV file or a CSV string.
read_from_src (bool): If True, src is treated as a CSV string.
If False, src is treated as a path to a CSV file.
cast_types (bool): Set as False to disable casting of values to int, bool or float.
delimiter (str): The delimiter used in the CSV data.
encoding (str): The encoding of the CSV file (only used when reading from file).

Yields:
namedtuple: Each row in the CSV as a namedtuple.
"""
file_path = LoaderUtils.validate_path(file_path)
return LazyFileIterable(lambda: __load_csv(file_path, cast_types, delimiter, encoding))
if not read_from_src:
src = LoaderUtils.validate_path(src)
return __load_csv_from_file(src, cast_types, delimiter, encoding)
return __load_csv_from_string(src, cast_types, delimiter)


def __load_csv(file_path, cast, delimiter, encoding):
"""Load a CSV file and convert it into a list of namedtuples"""
def __load_csv_from_file(file_path, cast, delimiter, encoding):
"""Load a CSV file and convert it into a generator of namedtuples"""
# skipcq: PTC-W6004
with open(file_path, mode='r', newline='', encoding=encoding) as csvfile:
csvreader = reader(csvfile, delimiter=delimiter)
yield from __process_csv(csvfile, cast, delimiter)


def __load_csv_from_string(csv_string, cast, delimiter):
"""Load a CSV from string and convert it into a generator of namedtuples"""
with StringIO(csv_string) as csvfile:
yield from __process_csv(csvfile, cast, delimiter)


# Create a namedtuple type, casting the header values to int or float if possible
header = __get_csv_header(csvreader)
def __process_csv(csvfile, cast, delimiter):
"""Process CSV data and yield namedtuples"""
csvreader = reader(csvfile, delimiter=delimiter)

Row = namedtuple('Row', list(header))
# Create a namedtuple type, casting the header values to int or float if possible
header = __get_csv_header(csvreader)
if not header:
return

mapper = LoaderUtils.try_cast if cast else lambda x: x
Row = namedtuple('Row', list(header))
mapper = LoaderUtils.try_cast if cast else lambda x: x

# Process the data, casting values to int or float if possible
data = [Row(*[mapper(value) for value in row]) for row in csvreader]
return data
# Yield the data row by row, casting values to int or float if possible
for row in csvreader:
yield Row(*[mapper(value) for value in row])


def __get_csv_header(csvreader):
55 changes: 33 additions & 22 deletions pystreamapi/loaders/__json/__json_loader.py
@@ -1,40 +1,51 @@
import json as jsonlib
from collections import namedtuple
from typing import Any, Iterator

from pystreamapi.loaders.__lazy_file_iterable import LazyFileIterable
from pystreamapi.loaders.__loader_utils import LoaderUtils


def json(src: str, read_from_src=False) -> LazyFileIterable:
def json(src: str, read_from_src=False) -> Iterator[Any]:
"""
Loads JSON data from either a path or a string and converts it into a list of namedtuples.
Lazily loads JSON data from either a path or a string and yields namedtuples.

Returns:
list: A list of namedtuples, where each namedtuple represents an object in the JSON.
:param src: Either the path to a JSON file or a JSON string.
:param read_from_src: If True, src is treated as a JSON string. If False, src is treated as
a path to a JSON file.
Args:
src (str): Either the path to a JSON file or a JSON string.
read_from_src (bool): If True, src is treated as a JSON string.
If False, src is treated as a path to a JSON file.

Yields:
namedtuple: Each object in the JSON as a namedtuple.
"""
if read_from_src:
return LazyFileIterable(lambda: __load_json_string(src))
return __lazy_load_json_string(src)
path = LoaderUtils.validate_path(src)
return LazyFileIterable(lambda: __load_json_file(path))
return __lazy_load_json_file(path)


def __lazy_load_json_file(file_path: str) -> Iterator[Any]:
"""Lazily read and parse a JSON file, yielding namedtuples."""

def generator():
# skipcq: PTC-W6004
with open(file_path, mode='r', encoding='utf-8') as jsonfile:
src = jsonfile.read()
if src == '':
return
Comment on lines +33 to +34
suggestion (bug_risk): Whitespace-only JSON files not skipped

src == '' does not handle files with only whitespace. Use if not src.strip(): to skip such files and prevent JSON decode errors.

Suggested change
if src == '':
return
if not src.strip():
return

yield from jsonlib.loads(src, object_hook=__dict_to_namedtuple)

issue (bug_risk): Incorrect iteration over JSON file data for non-array top-level objects

Instead of using yield from directly, assign the loaded JSON to a variable and yield from it if it's a list, or yield it directly if not. This prevents iterating over the fields of a namedtuple when the top-level object is not a list.
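
A minimal sketch of the branching described above (illustrative helper names, not the exact patch; `_dict_to_namedtuple` stands in for the loader's private hook):

```python
import json as jsonlib
from collections import namedtuple


def _dict_to_namedtuple(d, name='Item'):
    # Stand-in for the loader's private __dict_to_namedtuple helper.
    return namedtuple(name, list(d.keys()))(*d.values())


def _yield_json(src: str):
    """Yield each element of a top-level array, or a single top-level object."""
    data = jsonlib.loads(src, object_hook=_dict_to_namedtuple)
    if isinstance(data, list):
        yield from data   # top-level array: one namedtuple per element
    else:
        yield data        # top-level object: yield it whole, not its fields
```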


return generator()


def __load_json_file(file_path):
"""Load a JSON file and convert it into a list of namedtuples"""
# skipcq: PTC-W6004
with open(file_path, mode='r', encoding='utf-8') as jsonfile:
src = jsonfile.read()
if src == '':
return []
data = jsonlib.loads(src, object_hook=__dict_to_namedtuple)
return data
def __lazy_load_json_string(json_string: str) -> Iterator[Any]:
"""Lazily parse a JSON string, yielding namedtuples."""

def generator():
if not json_string.strip():
return
yield from jsonlib.loads(json_string, object_hook=__dict_to_namedtuple)

issue (bug_risk): Incorrect iteration over JSON string data for non-array top-level objects

Handle lists and single objects separately instead of always using yield from, to avoid incorrect unpacking when the top-level object is not an array.
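
The same list-versus-object branching sketched for the file loader above would cover this case as well.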


def __load_json_string(json_string):
"""Load JSON data from a string and convert it into a list of namedtuples"""
return jsonlib.loads(json_string, object_hook=__dict_to_namedtuple)
return generator()


def __dict_to_namedtuple(d, name='Item'):
54 changes: 29 additions & 25 deletions pystreamapi/loaders/__xml/__xml_loader.py
@@ -1,11 +1,12 @@
from typing import Iterator, Any

try:
from defusedxml import ElementTree
except ImportError as exc:
raise ImportError(
"Please install the xml_loader extra dependency to use the xml loader."
) from exc
from collections import namedtuple
from pystreamapi.loaders.__lazy_file_iterable import LazyFileIterable
from pystreamapi.loaders.__loader_utils import LoaderUtils


@@ -21,14 +22,14 @@ def __init__(self):


def xml(src: str, read_from_src=False, retrieve_children=True, cast_types=True,
encoding="utf-8") -> LazyFileIterable:
encoding="utf-8") -> Iterator[Any]:
"""
Loads XML data from either a path or a string and converts it into a list of namedtuples.
Warning: This method isn't safe against malicious XML trees. Parse only safe XML from sources
you trust.

Returns:
LazyFileIterable: A list of namedtuples, where each namedtuple represents an XML element.
An iterator with namedtuples, where each namedtuple represents an XML element.
:param retrieve_children: If true, the children of the root element are used as stream
elements.
:param encoding: The encoding of the XML file.
@@ -39,32 +40,37 @@ def xml(src: str, read_from_src=False, retrieve_children=True, cast_types=True,
"""
config.cast_types = cast_types
config.retrieve_children = retrieve_children
Comment on lines 41 to 42
issue (bug_risk): Module-level config mutation causes global side-effects

Passing these options as function arguments or using a per-call config object would prevent unintended side effects from shared state.
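
One possible shape of the per-call configuration mentioned above; a hedged sketch that assumes the private parse helpers could take the config as a parameter, not the loader's current API:

```python
from dataclasses import dataclass
from typing import Any, Iterator


@dataclass(frozen=True)
class XmlLoaderConfig:
    """Options scoped to a single call instead of shared module state."""
    cast_types: bool = True
    retrieve_children: bool = True


def _lazy_parse_xml_string(xml_string: str, cfg: XmlLoaderConfig) -> Iterator[Any]:
    ...  # hypothetical: parse and yield namedtuples, reading options from cfg


def xml(src: str, retrieve_children=True, cast_types=True) -> Iterator[Any]:
    # Build the config per call and thread it through the helpers, so
    # concurrent or nested calls with different options cannot interfere.
    cfg = XmlLoaderConfig(cast_types=cast_types, retrieve_children=retrieve_children)
    return _lazy_parse_xml_string(src, cfg)
```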


if read_from_src:
return LazyFileIterable(lambda: __load_xml_string(src))
return _lazy_parse_xml_string(src)

path = LoaderUtils.validate_path(src)
return LazyFileIterable(lambda: __load_xml_file(path, encoding))
return _lazy_parse_xml_file(path, encoding)


def _lazy_parse_xml_file(file_path: str, encoding: str) -> Iterator[Any]:
def generator():
with open(file_path, mode='r', encoding=encoding) as xmlfile:
xml_string = xmlfile.read()
yield from _parse_xml_string_lazy(xml_string)
Comment on lines +51 to +55
suggestion (performance): Reads entire XML file into memory, may lead to high memory usage

Consider using a streaming parser like ElementTree.iterparse to handle large files more efficiently.

Suggested change
def _lazy_parse_xml_file(file_path: str, encoding: str) -> Iterator[Any]:
def generator():
with open(file_path, mode='r', encoding=encoding) as xmlfile:
xml_string = xmlfile.read()
yield from _parse_xml_string_lazy(xml_string)
def _lazy_parse_xml_file(file_path: str, encoding: str) -> Iterator[Any]:
# Use ElementTree.iterparse for efficient streaming parsing
# Note: ElementTree does not support encoding argument in iterparse, so we open the file accordingly
with open(file_path, mode='r', encoding=encoding) as xmlfile:
# iterparse expects a file-like object opened in text mode for XML
context = ElementTree.iterparse(xmlfile, events=("end",))
for event, elem in context:
yield elem
# Optionally clear the element to free memory
elem.clear()


return generator()

def __load_xml_file(file_path, encoding):
"""Load an XML file and convert it into a list of namedtuples."""
# skipcq: PTC-W6004
with open(file_path, mode='r', encoding=encoding) as xmlfile:
src = xmlfile.read()
if src:
return __parse_xml_string(src)
return []

def _lazy_parse_xml_string(xml_string: str) -> Iterator[Any]:
def generator():
yield from _parse_xml_string_lazy(xml_string)

def __load_xml_string(xml_string):
"""Load XML data from a string and convert it into a list of namedtuples."""
return __parse_xml_string(xml_string)
return generator()


def __parse_xml_string(xml_string):
"""Parse XML string and convert it into a list of namedtuples."""
def _parse_xml_string_lazy(xml_string: str) -> Iterator[Any]:
root = ElementTree.fromstring(xml_string)
parsed_xml = __parse_xml(root)
return __flatten(parsed_xml) if config.retrieve_children else [parsed_xml]
parsed = __parse_xml(root)
if config.retrieve_children:
yield from __flatten(parsed)
else:
yield parsed


def __parse_xml(element):
@@ -107,11 +113,9 @@ def __filter_single_items(tag_dict):


def __flatten(data):
"""Flatten a list of lists."""
res = []
"""Yield flattened elements from a possibly nested structure."""
for item in data:
if isinstance(item, list):
res.extend(item)
yield from item
else:
res.append(item)
return res
yield item
25 changes: 13 additions & 12 deletions pystreamapi/loaders/__yaml/__yaml_loader.py
@@ -1,3 +1,5 @@
from typing import Any, Iterator

try:
import yaml as yaml_lib
except ImportError as exc:
@@ -6,11 +8,10 @@
) from exc
from collections import namedtuple

from pystreamapi.loaders.__lazy_file_iterable import LazyFileIterable
from pystreamapi.loaders.__loader_utils import LoaderUtils


def yaml(src: str, read_from_src=False) -> LazyFileIterable:
def yaml(src: str, read_from_src=False) -> Iterator[Any]:
"""
Loads YAML data from either a path or a string and converts it into a list of namedtuples.

@@ -23,26 +24,26 @@ def yaml(src: str, read_from_src=False) -> LazyFileIterable:
list: A list of namedtuples, where each namedtuple represents an object in the YAML.
"""
if read_from_src:
return LazyFileIterable(lambda: __load_yaml_string(src))
return __load_yaml_string(src)
path = LoaderUtils.validate_path(src)
return LazyFileIterable(lambda: __load_yaml_file(path))
return __load_yaml_file(path)


def __load_yaml_file(file_path):
"""Load a YAML file and convert it into a list of namedtuples"""
# skipcq: PTC-W6004
with open(file_path, mode='r', encoding='utf-8') as yamlfile:
src = yamlfile.read()
if src == '':
return []
data = yaml_lib.safe_load(src)
return __convert_to_namedtuples(data)
with open(file_path, 'r', encoding='utf-8') as yamlfile:
# Supports both single and multiple documents
for document in yaml_lib.safe_load_all(yamlfile):
if document:
yield from __convert_to_namedtuples(document)


def __load_yaml_string(yaml_string):
"""Load YAML data from a string and convert it into a list of namedtuples"""
data = yaml_lib.safe_load(yaml_string)
return [] if data is None else __convert_to_namedtuples(data)
for document in yaml_lib.safe_load_all(yaml_string):
if document:
yield from __convert_to_namedtuples(document)


def __convert_to_namedtuples(data, name='Item'):