Skip to content

Commit 4dc0aea

Browse files
committed
chore: unify logging usage
1 parent ec068ce commit 4dc0aea

File tree

13 files changed

+77
-100
lines changed

13 files changed

+77
-100
lines changed

langfuse/_client/observe.py

Lines changed: 2 additions & 4 deletions
Original file line number · Diff line number · Diff line change
@@ -1,7 +1,6 @@
11
import asyncio
22
import contextvars
33
import inspect
4-
import logging
54
import os
65
from functools import wraps
76
from typing import (
@@ -42,6 +41,7 @@
4241
LangfuseSpan,
4342
LangfuseTool,
4443
)
44+
from langfuse.logger import langfuse_logger as logger
4545
from langfuse.types import TraceContext
4646

4747
F = TypeVar("F", bound=Callable[..., Any])
@@ -69,8 +69,6 @@ class LangfuseDecorator:
6969
- Thread-safe client resolution when multiple Langfuse projects are used
7070
"""
7171

72-
_log = logging.getLogger("langfuse")
73-
7472
@overload
7573
def observe(self, func: F) -> F: ...
7674

@@ -166,7 +164,7 @@ def sub_process():
166164
"""
167165
valid_types = set(get_observation_types_list(ObservationTypeLiteralNoEvent))
168166
if as_type is not None and as_type not in valid_types:
169-
self._log.warning(
167+
logger.warning(
170168
f"Invalid as_type '{as_type}'. Valid types are: {', '.join(sorted(valid_types))}. Defaulting to 'span'."
171169
)
172170
as_type = "span"

langfuse/_task_manager/media_manager.py

Lines changed: 10 additions & 12 deletions
Original file line number · Diff line number · Diff line change
@@ -1,4 +1,3 @@
1-
import logging
21
import os
32
import time
43
from queue import Empty, Full, Queue
@@ -12,6 +11,7 @@
1211
from langfuse._utils import _get_timestamp
1312
from langfuse.api import LangfuseAPI, MediaContentType
1413
from langfuse.api.core import ApiError
14+
from langfuse.logger import langfuse_logger as logger
1515
from langfuse.media import LangfuseMedia
1616

1717
from .media_upload_queue import UploadMediaJob
@@ -21,8 +21,6 @@
2121

2222

2323
class MediaManager:
24-
_log = logging.getLogger("langfuse")
25-
2624
def __init__(
2725
self,
2826
*,
@@ -42,7 +40,7 @@ def __init__(
4240
def process_next_media_upload(self) -> None:
4341
try:
4442
upload_job = self._queue.get(block=True, timeout=1)
45-
self._log.debug(
43+
logger.debug(
4644
f"Media: Processing upload for media_id={upload_job['media_id']} in trace_id={upload_job['trace_id']}"
4745
)
4846
self._process_upload_media_job(data=upload_job)
@@ -51,7 +49,7 @@ def process_next_media_upload(self) -> None:
5149
except Empty:
5250
pass
5351
except Exception as e:
54-
self._log.error(
52+
logger.error(
5553
f"Media upload error: Failed to upload media due to unexpected error. Queue item marked as done. Error: {e}"
5654
)
5755
self._queue.task_done()
@@ -179,7 +177,7 @@ def _process_media(
179177
return
180178

181179
if media._media_id is None:
182-
self._log.error("Media ID is None. Skipping upload.")
180+
logger.error("Media ID is None. Skipping upload.")
183181
return
184182

185183
try:
@@ -198,17 +196,17 @@ def _process_media(
198196
item=upload_media_job,
199197
block=False,
200198
)
201-
self._log.debug(
199+
logger.debug(
202200
f"Queue: Enqueued media ID {media._media_id} for upload processing | trace_id={trace_id} | field={field}"
203201
)
204202

205203
except Full:
206-
self._log.warning(
204+
logger.warning(
207205
f"Queue capacity: Media queue is full. Failed to process media_id={media._media_id} for trace_id={trace_id}. Consider increasing queue capacity."
208206
)
209207

210208
except Exception as e:
211-
self._log.error(
209+
logger.error(
212210
f"Media processing error: Failed to process media_id={media._media_id} for trace_id={trace_id}. Error: {str(e)}"
213211
)
214212

@@ -230,14 +228,14 @@ def _process_upload_media_job(
230228
upload_url = upload_url_response.upload_url
231229

232230
if not upload_url:
233-
self._log.debug(
231+
logger.debug(
234232
f"Media status: Media with ID {data['media_id']} already uploaded. Skipping duplicate upload."
235233
)
236234

237235
return
238236

239237
if upload_url_response.media_id != data["media_id"]:
240-
self._log.error(
238+
logger.error(
241239
f"Media integrity error: Media ID mismatch between SDK ({data['media_id']}) and Server ({upload_url_response.media_id}). Upload cancelled. Please check media ID generation logic."
242240
)
243241

@@ -270,7 +268,7 @@ def _process_upload_media_job(
270268
upload_time_ms=upload_time_ms,
271269
)
272270

273-
self._log.debug(
271+
logger.debug(
274272
f"Media upload: Successfully uploaded media_id={data['media_id']} for trace_id={data['trace_id']} | status_code={upload_response.status_code} | duration={upload_time_ms}ms | size={data['content_length']} bytes"
275273
)
276274

langfuse/_task_manager/media_upload_consumer.py

Lines changed: 4 additions & 4 deletions
Original file line number · Diff line number · Diff line change
@@ -1,11 +1,11 @@
1-
import logging
21
import threading
32

3+
from langfuse.logger import langfuse_logger as logger
4+
45
from .media_manager import MediaManager
56

67

78
class MediaUploadConsumer(threading.Thread):
8-
_log = logging.getLogger("langfuse")
99
_identifier: int
1010
_max_retries: int
1111
_media_manager: MediaManager
@@ -30,15 +30,15 @@ def __init__(
3030

3131
def run(self) -> None:
3232
"""Run the media upload consumer."""
33-
self._log.debug(
33+
logger.debug(
3434
f"Thread: Media upload consumer thread #{self._identifier} started and actively processing queue items"
3535
)
3636
while self.running:
3737
self._media_manager.process_next_media_upload()
3838

3939
def pause(self) -> None:
4040
"""Pause the media upload consumer."""
41-
self._log.debug(
41+
logger.debug(
4242
f"Thread: Pausing media upload consumer thread #{self._identifier}"
4343
)
4444
self.running = False

langfuse/_task_manager/score_ingestion_consumer.py

Lines changed: 7 additions & 9 deletions
Original file line number · Diff line number · Diff line change
@@ -1,5 +1,4 @@
11
import json
2-
import logging
32
import os
43
import threading
54
import time
@@ -12,6 +11,7 @@
1211
from langfuse._utils.parse_error import handle_exception
1312
from langfuse._utils.request import APIError, LangfuseClient
1413
from langfuse._utils.serializer import EventSerializer
14+
from langfuse.logger import langfuse_logger as logger
1515

1616
from ..version import __version__ as langfuse_version
1717

@@ -27,8 +27,6 @@ class ScoreIngestionMetadata(BaseModel):
2727

2828

2929
class ScoreIngestionConsumer(threading.Thread):
30-
_log = logging.getLogger("langfuse")
31-
3230
def __init__(
3331
self,
3432
*,
@@ -83,7 +81,7 @@ def _next(self) -> list:
8381
try:
8482
json.dumps(event, cls=EventSerializer)
8583
except Exception as e:
86-
self._log.error(
84+
logger.error(
8785
f"Data error: Failed to serialize score object for ingestion. Score will be dropped. Error: {e}"
8886
)
8987
self._ingestion_queue.task_done()
@@ -94,7 +92,7 @@ def _next(self) -> list:
9492

9593
total_size += item_size
9694
if total_size >= MAX_BATCH_SIZE_BYTES:
97-
self._log.debug(
95+
logger.debug(
9896
f"Batch management: Reached maximum batch size limit ({total_size} bytes). Processing {len(events)} events now."
9997
)
10098
break
@@ -103,7 +101,7 @@ def _next(self) -> list:
103101
break
104102

105103
except Exception as e:
106-
self._log.warning(
104+
logger.warning(
107105
f"Data processing error: Failed to process score event in consumer thread #{self._identifier}. Event will be dropped. Error: {str(e)}",
108106
exc_info=True,
109107
)
@@ -117,7 +115,7 @@ def _get_item_size(self, item: Any) -> int:
117115

118116
def run(self) -> None:
119117
"""Run the consumer."""
120-
self._log.debug(
118+
logger.debug(
121119
f"Startup: Score ingestion consumer thread #{self._identifier} started with batch size {self._flush_at} and interval {self._flush_interval}s"
122120
)
123121
while self.running:
@@ -143,7 +141,7 @@ def pause(self) -> None:
143141
self.running = False
144142

145143
def _upload_batch(self, batch: List[Any]) -> None:
146-
self._log.debug(
144+
logger.debug(
147145
f"API: Uploading batch of {len(batch)} score events to Langfuse API"
148146
)
149147

@@ -171,6 +169,6 @@ def execute_task_with_backoff(batch: List[Any]) -> None:
171169
raise e
172170

173171
execute_task_with_backoff(batch)
174-
self._log.debug(
172+
logger.debug(
175173
f"API: Successfully sent {len(batch)} score events to Langfuse API in batch mode"
176174
)

langfuse/_utils/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
"""@private"""
22

3-
import logging
43
import typing
54
from datetime import datetime, timezone
65

6+
from langfuse.logger import langfuse_logger as logger
77
from langfuse.model import PromptClient
88

9-
log = logging.getLogger("langfuse")
9+
log = logger # Legacy re-export
1010

1111

1212
def _get_timestamp() -> datetime:

langfuse/_utils/error_logging.py

Lines changed: 1 addition & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -1,8 +1,7 @@
11
import functools
2-
import logging
32
from typing import Any, Callable, List, Optional
43

5-
logger = logging.getLogger("langfuse")
4+
from langfuse.logger import langfuse_logger as logger
65

76

87
def catch_and_log_errors(func: Callable[..., Any]) -> Callable[..., Any]:

langfuse/_utils/parse_error.py

Lines changed: 5 additions & 7 deletions
Original file line number · Diff line number · Diff line change
@@ -1,4 +1,3 @@
1-
import logging
21
from typing import Union
32

43
# our own api errors
@@ -14,6 +13,7 @@
1413
UnauthorizedError,
1514
)
1615
from langfuse.api.core import ApiError
16+
from langfuse.logger import langfuse_logger as logger
1717

1818
SUPPORT_URL = "https://langfuse.com/support"
1919
API_DOCS_URL = "https://api.reference.langfuse.com"
@@ -67,10 +67,9 @@ def generate_error_message_fern(error: Error) -> str:
6767

6868

6969
def handle_fern_exception(exception: Error) -> None:
70-
log = logging.getLogger("langfuse")
71-
log.debug(exception)
70+
logger.debug(exception)
7271
error_message = generate_error_message_fern(exception)
73-
log.error(error_message)
72+
logger.error(error_message)
7473

7574

7675
def generate_error_message(exception: Union[APIError, APIErrors, Exception]) -> str:
@@ -95,7 +94,6 @@ def generate_error_message(exception: Union[APIError, APIErrors, Exception]) ->
9594

9695

9796
def handle_exception(exception: Union[APIError, APIErrors, Exception]) -> None:
98-
log = logging.getLogger("langfuse")
99-
log.debug(exception)
97+
logger.debug(exception)
10098
error_message = generate_error_message(exception)
101-
log.error(error_message)
99+
logger.error(error_message)

0 commit comments

Comments (0)