3 changes: 3 additions & 0 deletions qlib/data/cache.py
@@ -65,8 +65,11 @@ def __setitem__(self, key, value):
                 self.popitem(last=False)

     def __getitem__(self, key):
+        from qlib.utils.telemetry import metrics
+
         v = self.od.__getitem__(key)
         self.od.move_to_end(key)
+        metrics.counter("cache.mem.hit")
         return v

     def __contains__(self, key):
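Both files import from `qlib.utils.telemetry`, which this excerpt of the PR does not show. A minimal in-memory sketch consistent with the calls used in the diff (`metrics.counter`, `metrics.gauge` with optional tags, and `tracer.span` as a context manager) might look like the following; everything beyond those three entry points is an assumption, not the PR's actual implementation.

```python
# Hypothetical sketch of qlib/utils/telemetry as implied by this diff.
import threading
from contextlib import contextmanager


class _Metrics:
    """In-memory metric store: counters accumulate, gauges keep the last value."""

    def __init__(self):
        self._lock = threading.Lock()
        self.counters = {}
        self.gauges = {}

    @staticmethod
    def _key(name, tags):
        # Normalize optional tags into a hashable, order-independent key.
        return (name, tuple(sorted(tags.items())) if tags else ())

    def counter(self, name, value=1, tags=None):
        with self._lock:
            key = self._key(name, tags)
            self.counters[key] = self.counters.get(key, 0) + value

    def gauge(self, name, value, tags=None):
        with self._lock:
            self.gauges[self._key(name, tags)] = value


class _Tracer:
    """Minimal tracer whose span() is a no-op context manager."""

    @contextmanager
    def span(self, name):
        # A real tracer would record start/end timestamps and parent spans here.
        yield


metrics = _Metrics()
tracer = _Tracer()
```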
15 changes: 13 additions & 2 deletions qlib/data/dataset/handler.py
@@ -188,11 +188,15 @@ def setup_data(self, enable_cache: bool = False):
             the processed data will be saved on disk, and handler will load the cached data from the disk directly
             when we call `init` next time
         """
+        from qlib.utils.telemetry import metrics, tracer
+
         # Setup data.
         # _data may be with multiple column index level. The outer level indicates the feature set name
-        with TimeInspector.logt("Loading data"):
+        with TimeInspector.logt("Loading data"), tracer.span("data_handler.setup_data"):
             # make sure the fetch method is based on an index-sorted pd.DataFrame
             self._data = lazy_sort_index(self.data_loader.load(self.instruments, self.start_time, self.end_time))
+            metrics.gauge("data_handler.rows_loaded", len(self._data))
+            metrics.gauge("data_handler.columns_loaded", len(self._data.columns))
         # TODO: cache

     def fetch(
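With the in-memory sketch above in place, the gauges recorded by `setup_data` can be read back directly. A hypothetical usage, not part of the PR; `handler` stands for any initialized `DataHandler` instance:

```python
from qlib.utils.telemetry import metrics

handler.setup_data()  # loads data and records the gauges shown in the diff
# Untagged gauges are keyed as (name, ()) in the sketch above.
rows = metrics.gauges[("data_handler.rows_loaded", ())]
cols = metrics.gauges[("data_handler.columns_loaded", ())]
print(f"loaded {rows} rows x {cols} columns")
```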
@@ -531,13 +535,20 @@ def fit_process_data(self):
     def _run_proc_l(
         df: pd.DataFrame, proc_l: List[processor_module.Processor], with_fit: bool, check_for_infer: bool
    ) -> pd.DataFrame:
+        from qlib.utils.telemetry import metrics, tracer
+
         for proc in proc_l:
             if check_for_infer and not proc.is_for_infer():
                 raise TypeError("Only processors usable for inference can be used in `infer_processors` ")
-            with TimeInspector.logt(f"{proc.__class__.__name__}"):
+            proc_name = proc.__class__.__name__
+            with TimeInspector.logt(proc_name), tracer.span(f"processor.{proc_name}"):
+                rows_before = len(df)
                 if with_fit:
                     proc.fit(df)
                 df = proc(df)
+                rows_after = len(df)
+                metrics.gauge("processor.rows_in", rows_before, tags={"processor": proc_name})
+                metrics.gauge("processor.rows_out", rows_after, tags={"processor": proc_name})
         return df

     @staticmethod
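Because the processor gauges carry a `processor` tag, per-step row counts stay distinguishable across the processor chain. Under the in-memory sketch above, a tagged gauge is keyed by the metric name plus sorted tag pairs; `DropnaProcessor` below is just an example processor name, not something this diff references:

```python
from qlib.utils.telemetry import metrics

# After _run_proc_l has executed, look up how many rows survived one processor.
key = ("processor.rows_out", (("processor", "DropnaProcessor"),))
print(metrics.gauges.get(key))
```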