Skip to content

Commit

Permalink
tune client logger/console.print
Browse files Browse the repository at this point in the history
- use rich.console as the global default logger
- remove useless log file for PipelineHandler
  • Loading branch information
tianweidut committed Apr 28, 2023
1 parent 6b5a881 commit f8dd61d
Show file tree
Hide file tree
Showing 40 changed files with 312 additions and 295 deletions.
8 changes: 5 additions & 3 deletions client/starwhale/api/_impl/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
import requests
import tenacity
import jsonlines
from loguru import logger
from filelock import FileLock
from jsonlines import Writer
from typing_extensions import Protocol

from starwhale.utils import console
from starwhale.consts import STANDALONE_INSTANCE
from starwhale.utils.fs import ensure_dir
from starwhale.consts.env import SWEnv
Expand Down Expand Up @@ -1333,7 +1333,7 @@ def update_table(
)

if resp.status_code != HTTPStatus.OK:
logger.error(
console.error(
f"[update-table]Table:{table_name}, resp code:{resp.status_code}, \n resp text: {resp.text}, \n records: {records}"
)
resp.raise_for_status()
Expand Down Expand Up @@ -1562,7 +1562,9 @@ def run(self) -> None:
self.table_name, last_schema, to_submit
)
except Exception as e:
logger.exception(e)
from starwhale.utils import console

console.print_exception()
self._queue_run_exceptions.append(e)
if len(self._queue_run_exceptions) > self._run_exceptions_limits:
break
Expand Down
7 changes: 3 additions & 4 deletions client/starwhale/api/_impl/dataset/builder/mapping_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
from binascii import crc32
from collections import defaultdict

from loguru import logger

from starwhale.utils import console
from starwhale.consts import STANDALONE_INSTANCE
from starwhale.base.uri import URI
from starwhale.utils.fs import (
Expand Down Expand Up @@ -105,7 +104,7 @@ def __exit__(
trace: TracebackType,
) -> None:
if value: # pragma: no cover
logger.warning(f"type:{type}, exception:{value}, traceback:{trace}")
console.warning(f"type:{type}, exception:{value}, traceback:{trace}")

self.close()

Expand Down Expand Up @@ -254,7 +253,7 @@ def __exit__(
trace: TracebackType,
) -> None:
if value: # pragma: no cover
logger.warning(f"type:{type}, exception:{value}, traceback:{trace}")
console.warning(f"type:{type}, exception:{value}, traceback:{trace}")

self.close()

Expand Down
11 changes: 2 additions & 9 deletions client/starwhale/api/_impl/dataset/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
import threading
from functools import total_ordering

import loguru
from loguru import logger as _logger

from starwhale.utils import console
from starwhale.consts import HTTPMethod
from starwhale.base.uri import URI
from starwhale.base.type import URIType, InstanceType
Expand Down Expand Up @@ -145,15 +143,13 @@ def __init__(
dataset_uri: URI,
start: t.Optional[t.Any] = None,
end: t.Optional[t.Any] = None,
logger: t.Optional[loguru.Logger] = None,
session_consumption: t.Optional[TabularDatasetSessionConsumption] = None,
cache_size: int = _DEFAULT_LOADER_CACHE_SIZE,
num_workers: int = 2,
dataset_scan_revision: str = "",
field_transformer: t.Optional[t.Dict] = None,
):
self.dataset_uri = dataset_uri
self.logger = logger or _logger
self.start = start
self.end = end
self.dataset_scan_revision = dataset_scan_revision
Expand Down Expand Up @@ -329,7 +325,7 @@ def __iter__(
else:
yield row

self.logger.debug(
console.debug(
"queue details:"
f"meta fetcher(qsize:{meta_fetched_queue.qsize()}, alive: {meta_fetcher.is_alive()}), "
f"row unpackers(qsize:{row_unpacked_queue.qsize()}, alive: {[t.is_alive() for t in rows_unpackers]})"
Expand All @@ -349,13 +345,11 @@ def get_data_loader(
start: t.Optional[t.Any] = None,
end: t.Optional[t.Any] = None,
session_consumption: t.Optional[TabularDatasetSessionConsumption] = None,
logger: t.Optional[loguru.Logger] = None,
cache_size: int = _DEFAULT_LOADER_CACHE_SIZE,
num_workers: int = 2,
dataset_scan_revision: str = "",
field_transformer: t.Optional[t.Dict] = None,
) -> DataLoader:

if session_consumption:
sc_start = session_consumption.session_start # type: ignore
sc_end = session_consumption.session_end # type: ignore
Expand All @@ -372,7 +366,6 @@ def get_data_loader(
start=start,
end=end,
session_consumption=session_consumption,
logger=logger or _logger,
cache_size=cache_size,
num_workers=num_workers,
dataset_scan_revision=dataset_scan_revision,
Expand Down
7 changes: 3 additions & 4 deletions client/starwhale/api/_impl/dataset/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
from itertools import islice

import yaml
from loguru import logger

from starwhale.utils import now_str, convert_to_bytes, gen_uniq_version
from starwhale.utils import console, now_str, convert_to_bytes, gen_uniq_version
from starwhale.consts import (
FileDesc,
HTTPMethod,
Expand Down Expand Up @@ -250,7 +249,7 @@ def __exit__(
trace: TracebackType,
) -> None:
if value: # pragma: no cover
logger.warning(f"type:{type}, exception:{value}, traceback:{trace}")
console.warning(f"type:{type}, exception:{value}, traceback:{trace}")

self.close()

Expand Down Expand Up @@ -919,7 +918,7 @@ def _submit_cloud_version(manifest_path: Path) -> None:
info_revision = _save_info()
manifest_path = _dump_manifest(dataset_revision, info_revision)

logger.debug(
console.debug(
f"dataset commit: revision-{dataset_revision}, info revision-{info_revision}"
)
if self.project_uri.instance == STANDALONE_INSTANCE:
Expand Down
85 changes: 11 additions & 74 deletions client/starwhale/api/_impl/evaluation.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
from __future__ import annotations

import io
import sys
import time
import typing as t
import logging
import threading
from abc import ABCMeta, abstractmethod
from types import TracebackType
Expand All @@ -13,29 +10,19 @@

import jsonlines

from starwhale.utils import now_str
from starwhale.utils import console, now_str
from starwhale.consts import RunStatus, CURRENT_FNAME, DecoratorInjectAttr
from starwhale.base.uri import URI
from starwhale.utils.fs import ensure_dir, ensure_file
from starwhale.api._impl import wrapper
from starwhale.base.type import URIType, RunSubDirType
from starwhale.utils.log import StreamWrapper
from starwhale.api.service import Input, Output, Service
from starwhale.utils.error import ParameterError, FieldTypeOrValueError
from starwhale.base.context import Context
from starwhale.core.job.store import JobStorage
from starwhale.api._impl.dataset import Dataset
from starwhale.core.dataset.tabular import TabularDatasetRow

if t.TYPE_CHECKING:
import loguru


class _LogType:
SW = "starwhale"
USER = "user"


_jl_writer: t.Callable[[Path], jsonlines.Writer] = lambda p: jsonlines.open(
str((p).resolve()), mode="w"
)
Expand Down Expand Up @@ -68,63 +55,19 @@ def __init__(
_logdir / RunSubDirType.RUNLOG / self.context.step / str(self.context.index)
)
self.status_dir = _run_dir / RunSubDirType.STATUS
self.log_dir = _run_dir / RunSubDirType.LOG
ensure_dir(self.status_dir)
ensure_dir(self.log_dir)

self.logger, self._sw_logger = self._init_logger(self.log_dir)
self._stdout_changed = False
self._stderr_changed = False
self._orig_stdout = sys.stdout
self._orig_stderr = sys.stderr
# TODO: split status/result files
self._timeline_writer = _jl_writer(self.status_dir / "timeline")

# TODO: use EvaluationLogStore to refactor this?
self.evaluation_store = wrapper.Evaluation(
eval_id=self.context.version, project=self.context.project
)
self._monkey_patch()
self._update_status(RunStatus.START)

def _init_logger(
self, log_dir: Path, rotation: str = "500MB"
) -> t.Tuple[loguru.Logger, loguru.Logger]:
# TODO: remove logger first?
# TODO: add custom log format, include daemonset pod name
from loguru import logger as _logger

# TODO: configure log rotation size
_logger.add(
log_dir / "{time}.log",
rotation=rotation,
backtrace=True,
diagnose=True,
serialize=True,
)
_logger.bind(
type=_LogType.USER,
task_id=self.context.index,
job_id=self.context.version,
)
_sw_logger = _logger.bind(type=_LogType.SW)
return _logger, _sw_logger

def _monkey_patch(self) -> None:
if not isinstance(sys.stdout, StreamWrapper) and isinstance(
sys.stdout, io.TextIOWrapper
):
sys.stdout = StreamWrapper(sys.stdout, self.logger, logging.INFO) # type: ignore
self._stdout_changed = True

if not isinstance(sys.stderr, StreamWrapper) and isinstance(
sys.stderr, io.TextIOWrapper
):
sys.stderr = StreamWrapper(sys.stderr, self.logger, logging.WARN) # type: ignore
self._stderr_changed = True

def __str__(self) -> str:
return f"PipelineHandler status@{self.status_dir}, " f"log@{self.log_dir}"
return f"PipelineHandler status@{self.status_dir}"

def __enter__(self) -> PipelineHandler:
return self
Expand All @@ -135,16 +78,10 @@ def __exit__(
value: t.Optional[BaseException],
trace: TracebackType,
) -> None:
self._sw_logger.debug(
f"execute {self.context.step}-{self.context.index} exit func..."
)
console.debug(f"execute {self.context.step}-{self.context.index} exit func...")
if value: # pragma: no cover
print(f"type:{type}, exception:{value}, traceback:{trace}")
console.warning(f"type:{type}, exception:{value}, traceback:{trace}")

if self._stdout_changed:
sys.stdout = self._orig_stdout
if self._stderr_changed:
sys.stderr = self._orig_stderr
self._timeline_writer.close()

@abstractmethod
Expand All @@ -160,15 +97,15 @@ def _record_status(func): # type: ignore
@wraps(func) # type: ignore
def _wrapper(*args: t.Any, **kwargs: t.Any) -> None:
self: PipelineHandler = args[0]
self._sw_logger.info(
console.info(
f"start to run {func.__name__} function@{self.context.step}-{self.context.index} ..." # type: ignore
)
self._update_status(RunStatus.RUNNING)
try:
func(*args, **kwargs) # type: ignore
except Exception as e:
except Exception:
self._update_status(RunStatus.FAILED)
self._sw_logger.exception(f"{func} abort, exception: {e}")
console.print_exception()
raise
else:
self._update_status(RunStatus.SUCCESS)
Expand All @@ -184,7 +121,7 @@ def _starwhale_internal_run_cmp(self) -> None:
else:
self.cmp()
except Exception as e:
self._sw_logger.exception(f"cmp exception: {e}")
console.exception(f"cmp exception: {e}")
self._timeline_writer.write(
{"time": now, "status": False, "exception": str(e)}
)
Expand Down Expand Up @@ -233,7 +170,7 @@ def _starwhale_internal_run_ppl(self) -> None:
]
except Exception as e:
_exception = e
self._sw_logger.exception(
console.exception(
f"[{[r.index for r in rows]}] data handle -> failed"
)
if not self.ignore_error:
Expand All @@ -246,7 +183,7 @@ def _starwhale_internal_run_ppl(self) -> None:
cnt += 1
_idx_with_ds = f"{_uri.object}{join_str}{_idx}"

self._sw_logger.debug(
console.debug(
f"[{_idx_with_ds}] use {time.time() - _start:.3f}s, session-id:{self.context.version} @{self.context.step}-{self.context.index}"
)

Expand Down Expand Up @@ -278,7 +215,7 @@ def _starwhale_internal_run_ppl(self) -> None:
if self.flush_result and self.ppl_auto_log:
self.evaluation_store.flush_result()

self._sw_logger.info(
console.info(
f"{self.context.step}-{self.context.index} handled {cnt} data items for dataset {self.dataset_uris}"
)

Expand Down
3 changes: 0 additions & 3 deletions client/starwhale/api/_impl/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from collections import defaultdict

import yaml
from loguru import logger

from starwhale.consts import DecoratorInjectAttr
from starwhale.utils.fs import ensure_file
Expand Down Expand Up @@ -270,8 +269,6 @@ def generate_jobs_yaml(
package_dir: t.Union[Path, str],
yaml_path: t.Union[Path, str],
) -> None:
logger.debug(f"ingest run_handlers {search_modules} at {package_dir}")

expanded_handlers = Handler.get_registered_handlers_with_expanded_needs(
search_modules, Path(package_dir)
)
Expand Down
9 changes: 5 additions & 4 deletions client/starwhale/api/_impl/track/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from collections import defaultdict

import psutil
from loguru import logger

from starwhale.utils import console
from starwhale.consts import FMT_DATETIME
from starwhale.utils.venv import guess_current_py_env
from starwhale.utils.error import NotFoundError
Expand Down Expand Up @@ -84,8 +84,9 @@ def run(self) -> None:
try:
ret = _action()
self.tracker._log_params(ret, source=_TrackSource.SYSTEM)
except Exception as e:
logger.exception(f"failed to inspect {_info}: {e}")
except Exception:
console.print(f":warning: [red]{_info}[/red] inspect failed")
console.print_exception()

# TODO: tune the accuracy of inspect and report interval
last_inspect_time = last_report_time = time.monotonic()
Expand All @@ -106,7 +107,7 @@ def run(self) -> None:
self._report_metrics()
last_report_time = time.monotonic()
except Exception as e:
logger.exception(e)
console.print_exception()
self._run_exceptions.append(e)
self._raise_run_exceptions()

Expand Down
5 changes: 2 additions & 3 deletions client/starwhale/api/_impl/track/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
from pathlib import Path

import yaml
from loguru import logger

from starwhale.utils import now_str, random_str, gen_uniq_version
from starwhale.utils import console, now_str, random_str, gen_uniq_version
from starwhale.consts import SW_AUTO_DIRNAME, DEFAULT_MANIFEST_NAME
from starwhale.base.uri import URI
from starwhale.utils.fs import ensure_dir, ensure_file
Expand Down Expand Up @@ -133,7 +132,7 @@ def __exit__(
trace: TracebackType,
) -> None:
if value: # pragma: no cover
logger.warning(f"type:{type}, exception:{value}, traceback:{trace}")
console.warning(f"type:{type}, exception:{value}, traceback:{trace}")

self.end()

Expand Down
Loading

0 comments on commit f8dd61d

Please sign in to comment.