Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test-python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ on:
branches:
- 'main'
- 'feature/**'
- 'dev'

jobs:
build:
Expand Down
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ repos:
networkx>=2.5.1,
typing-extensions>=3.10.0.2,
HLL>=2.0.3,
datasketches>=4.1.0,

# requirements-dev.txt
check-manifest>=0.48,
Expand Down Expand Up @@ -109,7 +110,7 @@ repos:
additional_dependencies: ['h5py', 'wheel', 'future', 'numpy', 'pandas',
'python-dateutil', 'pytz', 'pyarrow', 'chardet', 'fastavro',
'python-snappy', 'charset-normalizer', 'psutil', 'scipy', 'requests',
'networkx','typing-extensions', 'HLL']
'networkx','typing-extensions', 'HLL', 'datasketches']
# Pyupgrade - standardize and modernize Python syntax for newer versions of the language
- repo: https://github.com/asottile/pyupgrade
rev: v3.3.0
Expand Down
12 changes: 12 additions & 0 deletions dataprofiler/data_readers/csv_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def __init__(
self._checked_header: bool = "header" in options and self._header != "auto"
self._default_delimiter: str = ","
self._default_quotechar: str = '"'
self._sample_nrows: Optional[int] = options.get("sample_nrows", None)

if data is not None:
self._load_data(data)
Expand Down Expand Up @@ -115,6 +116,11 @@ def header(self) -> Optional[Union[str, int]]:
"""Return header."""
return self._header

@property
def sample_nrows(self) -> Optional[int]:
"""Return sample_nrows."""
return self._sample_nrows

@property
def is_structured(self) -> bool:
"""Determine compatibility with StructuredProfiler."""
Expand Down Expand Up @@ -168,6 +174,10 @@ def _check_and_return_options(options: Optional[Dict]) -> Dict:
raise ValueError(
"'record_samples_per_line' must be an int " "more than 0"
)
if "sample_nrows" in options:
value = options["sample_nrows"]
if not isinstance(value, int) or value < 0:
raise ValueError("'sample_nrows' must be an int more than 0")
return options

@staticmethod
Expand Down Expand Up @@ -549,6 +559,7 @@ def _load_data_from_str(self, data_as_str: str) -> pd.DataFrame:
data_buffered,
self.delimiter,
cast(Optional[int], self.header),
self.sample_nrows,
self.selected_columns,
read_in_string=True,
)
Expand Down Expand Up @@ -595,6 +606,7 @@ def _load_data_from_file(self, input_file_path: str) -> pd.DataFrame:
input_file_path,
self.delimiter,
cast(Optional[int], self.header),
self.sample_nrows,
self.selected_columns,
read_in_string=True,
encoding=self.file_encoding,
Expand Down
115 changes: 112 additions & 3 deletions dataprofiler/data_readers/data_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
"""Contains functions for data readers."""
import json
import os
import random
import re
import urllib
from collections import OrderedDict
from io import BytesIO, StringIO, TextIOWrapper
from itertools import islice
from math import floor, log, log1p
from typing import (
Any,
Dict,
Expand All @@ -24,7 +28,7 @@
from chardet.universaldetector import UniversalDetector
from typing_extensions import TypeGuard

from .. import dp_logging
from .. import dp_logging, settings
from .._typing import JSONType, Url
from .filepath_or_buffer import FileOrBufferHandler, is_stream_buffer # NOQA

Expand Down Expand Up @@ -268,10 +272,106 @@ def read_json(
return lines


def reservoir(file: TextIOWrapper, sample_nrows: int) -> list:
"""
Implement the mathematical logic of Reservoir sampling.

:param file: wrapper of the opened csv file
:type file: TextIOWrapper
:param sample_nrows: number of rows to sample
:type sample_nrows: int

:raises: ValueError()

:return: sampled values
:rtype: list
"""
# Copyright 2021 Oscar Benjamin
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# https://gist.github.com/oscarbenjamin/4c1b977181f34414a425f68589e895d1

iterator = iter(file)
values = list(islice(iterator, sample_nrows))

irange = range(len(values))
indices = dict(zip(irange, irange))

kinv = 1 / sample_nrows
W = 1.0
rng = random.Random(x=settings._seed)
if "DATAPROFILER_SEED" in os.environ and settings._seed is None:
seed = os.environ.get("DATAPROFILER_SEED")
if seed:
rng = random.Random(int(seed))

while True:
W *= rng.random() ** kinv
# random() < 1.0 but random() ** kinv might not be
# W == 1.0 implies "infinite" skips
if W == 1.0:
break
# skip is geometrically distributed with parameter W
skip = floor(log(rng.random()) / log1p(-W))
try:
newval = next(islice(iterator, skip, skip + 1))
except StopIteration:
break
# Append new, replace old with dummy, and keep track of order
remove_index = rng.randrange(sample_nrows)
values[indices[remove_index]] = str(None)
indices[remove_index] = len(values)
values.append(newval)

values = [values[indices[i]] for i in irange]
return values


def rsample(file_path: TextIOWrapper, sample_nrows: int, args: dict) -> StringIO:
"""
Implement Reservoir Sampling to sample n rows out of a total of M rows.

:param file_path: path of the csv file to be read in
:type file_path: TextIOWrapper
:param sample_nrows: number of rows being sampled
:type sample_nrows: int
:param args: options to read the csv file
:type args: dict
"""
header = args["header"]
result = []

if header is not None:
result = [[next(file_path) for i in range(header + 1)][-1]]
args["header"] = 0

result += reservoir(file_path, sample_nrows)

fo = StringIO("".join([i if (i[-1] == "\n") else i + "\n" for i in result]))
return fo


def read_csv_df(
file_path: Union[str, BytesIO, TextIOWrapper],
delimiter: Optional[str],
header: Optional[int],
sample_nrows: Optional[int] = None,
selected_columns: List[str] = [],
read_in_string: bool = False,
encoding: Optional[str] = "utf-8",
Expand Down Expand Up @@ -314,19 +414,28 @@ def read_csv_df(

# account for py3.6 requirement for pandas, can remove if >= py3.7
is_buf_wrapped = False
is_file_open = False
if isinstance(file_path, BytesIO):
# a BytesIO stream has to be wrapped in order to properly be detached
# in 3.6 this avoids read_csv wrapping the stream and closing too early
file_path = TextIOWrapper(file_path, encoding=encoding)
is_buf_wrapped = True

fo = pd.read_csv(file_path, **args)
elif isinstance(file_path, str):
file_path = open(file_path, encoding=encoding)
is_file_open = True

file_data = file_path
if sample_nrows:
file_data = rsample(file_path, sample_nrows, args)
fo = pd.read_csv(file_data, **args)
data = fo.read()

# if the buffer was wrapped, detach it before returning
if is_buf_wrapped:
file_path = cast(TextIOWrapper, file_path)
file_path.detach()
elif is_file_open:
file_path.close()
fo.close()

return data
Expand Down
2 changes: 1 addition & 1 deletion dataprofiler/data_readers/graph_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def _format_data_networkx(self) -> nx.Graph:
self.input_file_path,
self._delimiter,
cast(Optional[int], self._header),
[],
selected_columns=[],
read_in_string=True,
encoding=self.file_encoding,
)
Expand Down
4 changes: 3 additions & 1 deletion dataprofiler/labelers/base_data_labeler.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,9 @@ def load_from_library(cls, name: str) -> BaseDataLabeler:
:return: DataLabeler class
:rtype: BaseDataLabeler
"""
return cls(os.path.join(default_labeler_dir, name))
labeler = cls(os.path.join(default_labeler_dir, name))
labeler._default_model_loc = name
return labeler

@classmethod
def load_from_disk(cls, dirpath: str, load_options: dict = None) -> BaseDataLabeler:
Expand Down
5 changes: 4 additions & 1 deletion dataprofiler/labelers/data_labelers.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def __new__( # type: ignore
trainable: bool = False,
) -> BaseDataLabeler:
"""
Create structured and unstructred data labeler objects.
Create structured and unstructured data labeler objects.

:param dirpath: Path to load data labeler
:type dirpath: str
Expand Down Expand Up @@ -143,6 +143,9 @@ def load_from_library(cls, name: str, trainable: bool = False) -> BaseDataLabele
"""
if trainable:
return TrainableDataLabeler.load_from_library(name)
for _, labeler_class_obj in cls.labeler_classes.items():
if name in labeler_class_obj._default_model_loc:
return labeler_class_obj()
return BaseDataLabeler.load_from_library(name)

@classmethod
Expand Down
88 changes: 87 additions & 1 deletion dataprofiler/profilers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,98 @@
"""Package for providing statistics and predictions for a given dataset."""
from . import json_decoder
from .base_column_profilers import BaseColumnProfiler
from .categorical_column_profile import CategoricalColumn
from .column_profile_compilers import (
BaseCompiler,
ColumnDataLabelerCompiler,
ColumnPrimitiveTypeProfileCompiler,
ColumnStatsProfileCompiler,
)
from .data_labeler_column_profile import DataLabelerColumn
from .datetime_column_profile import DateTimeColumn
from .float_column_profile import FloatColumn
from .int_column_profile import IntColumn
from .numerical_column_stats import NumericStatsMixin
from .order_column_profile import OrderColumn
from .profile_builder import Profiler, StructuredProfiler, UnstructuredProfiler
from .profile_builder import (
Profiler,
StructuredColProfiler,
StructuredProfiler,
UnstructuredProfiler,
)
from .profiler_options import (
BaseInspectorOptions,
BooleanOption,
CategoricalOptions,
CorrelationOptions,
DataLabelerOptions,
DateTimeOptions,
FloatOptions,
HistogramOption,
HyperLogLogOptions,
IntOptions,
ModeOption,
NumericalOptions,
OrderOptions,
PrecisionOptions,
ProfilerOptions,
RowStatisticsOptions,
StructuredOptions,
TextOptions,
TextProfilerOptions,
UniqueCountOptions,
UnstructuredOptions,
)
from .text_column_profile import TextColumn
from .unstructured_labeler_profile import UnstructuredLabelerProfile

# set here to avoid circular imports
json_decoder._profiles = {
CategoricalColumn.__name__: CategoricalColumn,
FloatColumn.__name__: FloatColumn,
IntColumn.__name__: IntColumn,
DateTimeColumn.__name__: DateTimeColumn,
OrderColumn.__name__: OrderColumn,
DataLabelerColumn.__name__: DataLabelerColumn,
TextColumn.__name__: TextColumn,
}


json_decoder._compilers = {
ColumnDataLabelerCompiler.__name__: ColumnDataLabelerCompiler,
ColumnPrimitiveTypeProfileCompiler.__name__: ColumnPrimitiveTypeProfileCompiler,
ColumnStatsProfileCompiler.__name__: ColumnStatsProfileCompiler,
}

json_decoder._options = {
BooleanOption.__name__: BooleanOption,
HistogramOption.__name__: HistogramOption,
ModeOption.__name__: ModeOption,
BaseInspectorOptions.__name__: BaseInspectorOptions,
NumericalOptions.__name__: NumericalOptions,
IntOptions.__name__: IntOptions,
PrecisionOptions.__name__: PrecisionOptions,
FloatOptions.__name__: FloatOptions,
TextOptions.__name__: TextOptions,
DateTimeOptions.__name__: DateTimeOptions,
OrderOptions.__name__: OrderOptions,
CategoricalOptions.__name__: CategoricalOptions,
CorrelationOptions.__name__: CorrelationOptions,
UniqueCountOptions.__name__: UniqueCountOptions,
HyperLogLogOptions.__name__: HyperLogLogOptions,
RowStatisticsOptions.__name__: RowStatisticsOptions,
DataLabelerOptions.__name__: DataLabelerOptions,
TextProfilerOptions.__name__: TextProfilerOptions,
StructuredOptions.__name__: StructuredOptions,
UnstructuredOptions.__name__: UnstructuredOptions,
ProfilerOptions.__name__: ProfilerOptions,
}


json_decoder._profilers = {
StructuredProfiler.__name__: StructuredProfiler,
}

json_decoder._structured_col_profiler = {
StructuredColProfiler.__name__: StructuredColProfiler,
}
Loading