
Commit 3d54bb4

Merge pull request #3 from run-llama/clelia/add-observability
Adding observability dashboard
2 parents f9aad93 + b72424d commit 3d54bb4

File tree

10 files changed: +708 −12 lines changed

.env.example

Lines changed: 3 additions & 0 deletions
@@ -1,3 +1,6 @@
 OPENAI_API_KEY="sk-***"
 LLAMACLOUD_API_KEY="llx-***"
 ELEVENLABS_API_KEY="sk_***"
+pgql_db="postgres"
+pgql_user="localhost"
+pgql_psw="admin"
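These three new variables feed both compose.yaml (via Docker Compose's .env substitution) and the app itself. A minimal sketch of how Home.py reads them with python-dotenv; the `engine_url` variable name is illustrative, but the URL shape is taken verbatim from the Home.py diff below:

```python
# Minimal sketch: how the new Postgres variables are consumed.
# Assumes python-dotenv is installed and a .env copied from .env.example.
import os

from dotenv import load_dotenv

load_dotenv()  # pulls pgql_db / pgql_user / pgql_psw into os.environ

# Same SQLAlchemy URL that Home.py hands to OtelTracesSqlEngine
engine_url = (
    f"postgresql+psycopg2://{os.getenv('pgql_user')}:{os.getenv('pgql_psw')}"
    f"@localhost:5432/{os.getenv('pgql_db')}"
)
print(engine_url)
```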

.github/workflows/release.yaml

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+name: GitHub Release
+
+on:
+  push:
+    tags:
+      - "v[0-9].[0-9]+.[0-9]+*"
+
+jobs:
+  release:
+    runs-on: ubuntu-latest
+    permissions:
+      contents: write
+
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v4
+
+      - name: Create GitHub Release
+        uses: ncipollo/release-action@v1
+        with:
+          generateReleaseNotes: true
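With this workflow in place, pushing a tag that matches the pattern (for example v0.2.0, matching the version bump in pyproject.toml below) creates a GitHub release with auto-generated notes via ncipollo/release-action.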

README.md

Lines changed: 6 additions & 0 deletions
@@ -40,6 +40,12 @@ uv run tools/create_llama_cloud_index.py

 And you're ready to set up the app!

+Launch Postgres and Jaeger:
+
+```bash
+docker compose up -d
+```
+
 Run the **MCP** server:

 ```bash
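Once the stack is up, Jaeger's UI is served at http://localhost:16686 and Adminer at http://localhost:8080 (ports from compose.yaml below).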

compose.yaml

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+name: instrumentation
+
+services:
+  jaeger:
+    image: jaegertracing/all-in-one:latest
+    ports:
+      - 16686:16686
+      - 4317:4317
+      - 4318:4318
+      - 9411:9411
+    environment:
+      - COLLECTOR_ZIPKIN_HOST_PORT=:9411
+
+  postgres:
+    image: postgres
+    ports:
+      - 5432:5432
+    environment:
+      POSTGRES_DB: $pgql_db
+      POSTGRES_USER: $pgql_user
+      POSTGRES_PASSWORD: $pgql_psw
+
+    volumes:
+      - pgdata:/var/lib/postgresql/data
+
+  adminer:
+    image: adminer
+    ports:
+      - "8080:8080"
+
+volumes:
+  pgdata:
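A small smoke test for this stack, as a sketch: it assumes `docker compose up -d` has been run, that .env holds the pgql_* values from .env.example, and it uses requests and SQLAlchemy, both of which the new instrumentation module already imports. The endpoints and credentials mirror the diff; nothing here is part of the committed code.

```python
# Smoke test for the compose stack (sketch, see assumptions above).
import os

import requests
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

load_dotenv()

# Jaeger's query API answers on the same port as the UI (16686)
resp = requests.get("http://localhost:16686/api/services")
print("Jaeger services:", resp.json().get("data"))

# Postgres should accept the .env credentials on 5432
engine = create_engine(
    f"postgresql+psycopg2://{os.getenv('pgql_user')}:{os.getenv('pgql_psw')}"
    f"@localhost:5432/{os.getenv('pgql_db')}"
)
with engine.connect() as conn:
    print(conn.execute(text("select version()")).scalar())
```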

pyproject.toml

Lines changed: 6 additions & 2 deletions
@@ -1,7 +1,7 @@
 [project]
 name = "notebooklm-clone"
-version = "0.1.0"
-description = "Add your description here"
+version = "0.2.0"
+description = "An OSS and LlamaCloud-backed alternative to NotebookLM"
 readme = "README.md"
 requires-python = ">=3.13"
 dependencies = [
@@ -15,10 +15,14 @@ dependencies = [
     "llama-index-embeddings-openai>=0.3.1",
     "llama-index-indices-managed-llama-cloud>=0.6.11",
     "llama-index-llms-openai>=0.4.7",
+    "llama-index-observability-otel>=0.1.1",
     "llama-index-tools-mcp>=0.2.5",
     "llama-index-workflows>=1.0.1",
     "mypy>=1.16.1",
+    "opentelemetry-exporter-otlp-proto-http>=1.34.1",
+    "plotly>=6.2.0",
     "pre-commit>=4.2.0",
+    "psycopg2-binary>=2.9.10",
     "pydub>=0.25.1",
     "pytest>=8.4.1",
     "pytest-asyncio>=1.0.0",

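Each new dependency maps onto the rest of the diff: llama-index-observability-otel provides LlamaIndexOpenTelemetry, opentelemetry-exporter-otlp-proto-http provides the OTLPSpanExporter used in Home.py, psycopg2-binary is the driver behind the postgresql+psycopg2 engine URL, and plotly presumably powers the observability dashboard's charts.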
src/notebooklm_clone/Home.py

Lines changed: 31 additions & 1 deletion
@@ -3,12 +3,36 @@
 import os
 import asyncio
 import tempfile as temp
+from dotenv import load_dotenv
+import time
 import streamlit.components.v1 as components

 from pathlib import Path
 from audio import PODCAST_GEN
 from typing import Tuple
 from workflow import NotebookLMWorkflow, FileInputEvent, NotebookOutputEvent
+from instrumentation import OtelTracesSqlEngine
+from llama_index.observability.otel import LlamaIndexOpenTelemetry
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
+    OTLPSpanExporter,
+)
+
+load_dotenv()
+
+# define a custom span exporter
+span_exporter = OTLPSpanExporter("http://0.0.0.0:4318/v1/traces")
+
+# initialize the instrumentation object
+instrumentor = LlamaIndexOpenTelemetry(
+    service_name_or_resource="agent.traces",
+    span_exporter=span_exporter,
+    debug=True,
+)
+sql_engine = OtelTracesSqlEngine(
+    engine_url=f"postgresql+psycopg2://{os.getenv('pgql_user')}:{os.getenv('pgql_psw')}@localhost:5432/{os.getenv('pgql_db')}",
+    table_name="agent_traces",
+    service_name="agent.traces",
+)

 WF = NotebookLMWorkflow(timeout=600)

@@ -24,6 +48,7 @@ async def run_workflow(file: io.BytesIO) -> Tuple[str, str, str, str, str]:
     content = file.getvalue()
     with open(fl.name, "wb") as f:
         f.write(content)
+    st_time = int(time.time() * 1000000)
     ev = FileInputEvent(file=fl.name)
     result: NotebookOutputEvent = await WF.run(start_event=ev)
     q_and_a = ""
@@ -34,7 +59,9 @@ async def run_workflow(file: io.BytesIO) -> Tuple[str, str, str, str, str]:
     mind_map = result.mind_map
     if Path(mind_map).is_file():
         mind_map = read_html_file(mind_map)
-        os.remove(mind_map)
+        os.remove(result.mind_map)
+    end_time = int(time.time() * 1000000)
+    sql_engine.to_sql_database(start_time=st_time, end_time=end_time)
     return result.md_content, result.summary, q_and_a, bullet_points, mind_map


@@ -138,3 +165,6 @@ def sync_create_podcast(file_content: str):

 else:
     st.info("Please upload a PDF file to get started.")
+
+if __name__ == "__main__":
+    instrumentor.start_registering()
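Note that st_time and end_time are epoch timestamps in microseconds: Jaeger's /api/traces query API expects microsecond precision, and OtelTracesSqlEngine (below) forwards the pair unchanged, so each call to to_sql_database persists exactly the spans emitted during that workflow run.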
src/notebooklm_clone/instrumentation.py

Lines changed: 168 additions & 0 deletions

@@ -0,0 +1,168 @@
+import requests
+import time
+import csv
+import pandas as pd
+import tempfile as temp
+import os
+
+from sqlalchemy import Engine, create_engine, Connection, Result
+from typing import Optional, Dict, Any, List, Literal, Union, cast
+
+
+class OtelTracesSqlEngine:
+    def __init__(
+        self,
+        engine: Optional[Engine] = None,
+        engine_url: Optional[str] = None,
+        table_name: Optional[str] = None,
+        service_name: Optional[str] = None,
+    ):
+        self.service_name: str = service_name or "service"
+        self.table_name: str = table_name or "otel_traces"
+        self._connection: Optional[Connection] = None
+        if engine:
+            self._engine: Engine = engine
+        elif engine_url:
+            self._engine = create_engine(url=engine_url)
+        else:
+            raise ValueError("One of engine or engine_url must be set")
+
+    def _connect(self) -> None:
+        self._connection = self._engine.connect()
+
+    def _export(
+        self,
+        start_time: Optional[int] = None,
+        end_time: Optional[int] = None,
+        limit: Optional[int] = None,
+    ) -> Dict[str, Any]:
+        url = "http://localhost:16686/api/traces"
+        params = {
+            "service": self.service_name,
+            "start": start_time
+            or int(time.time() * 1000000) - (24 * 60 * 60 * 1000000),
+            "end": end_time or int(time.time() * 1000000),
+            "limit": limit or 1000,
+        }
+        response = requests.get(url, params=params)
+        print(response.json())
+        return response.json()
+
+    def _to_pandas(self, data: Dict[str, Any]) -> pd.DataFrame:
+        rows: List[Dict[str, Any]] = []
+        # Loop over each trace
+        for trace in data.get("data", []):
+            trace_id = trace.get("traceID")
+            service_map = {
+                pid: proc.get("serviceName")
+                for pid, proc in trace.get("processes", {}).items()
+            }
+
+            for span in trace.get("spans", []):
+                span_id = span.get("spanID")
+                operation = span.get("operationName")
+                start = span.get("startTime")
+                duration = span.get("duration")
+                process_id = span.get("processID")
+                service = service_map.get(process_id, "")
+                status = next(
+                    (
+                        tag.get("value")
+                        for tag in span.get("tags", [])
+                        if tag.get("key") == "otel.status_code"
+                    ),
+                    "",
+                )
+                parent_span_id = None
+                if span.get("references"):
+                    parent_span_id = span["references"][0].get("spanID")
+
+                rows.append(
+                    {
+                        "trace_id": trace_id,
+                        "span_id": span_id,
+                        "parent_span_id": parent_span_id,
+                        "operation_name": operation,
+                        "start_time": start,
+                        "duration": duration,
+                        "status_code": status,
+                        "service_name": service,
+                    }
+                )
+
+        # Define the CSV header
+        fieldnames = [
+            "trace_id",
+            "span_id",
+            "parent_span_id",
+            "operation_name",
+            "start_time",
+            "duration",
+            "status_code",
+            "service_name",
+        ]
+
+        fl = temp.NamedTemporaryFile(suffix=".csv", delete=False, delete_on_close=False)
+        # Write to CSV
+        with open(fl.name, "w", newline="") as csvfile:
+            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
+            writer.writeheader()
+            writer.writerows(rows)
+
+        df = pd.read_csv(fl)
+        os.remove(fl.name)
+        return df
+
+    def _to_sql(
+        self,
+        dataframe: pd.DataFrame,
+        if_exists_policy: Optional[Literal["fail", "replace", "append"]] = None,
+    ) -> None:
+        if not self._connection:
+            self._connect()
+        dataframe.to_sql(
+            name=self.table_name,
+            con=self._connection,
+            if_exists=if_exists_policy or "append",
+        )
+
+    def to_sql_database(
+        self,
+        start_time: Optional[int] = None,
+        end_time: Optional[int] = None,
+        limit: Optional[int] = None,
+        if_exists_policy: Optional[Literal["fail", "replace", "append"]] = None,
+    ) -> None:
+        data = self._export(start_time=start_time, end_time=end_time, limit=limit)
+        df = self._to_pandas(data=data)
+        self._to_sql(dataframe=df, if_exists_policy=if_exists_policy)
+
+    def execute(
+        self,
+        statement: Any,
+        parameters: Optional[Any] = None,
+        execution_options: Optional[Any] = None,
+        return_pandas: bool = False,
+    ) -> Union[Result, pd.DataFrame]:
+        if not self._connection:
+            self._connect()
+        if not return_pandas:
+            self._connection = cast(Connection, self._connection)
+            return self._connection.execute(
+                statement=statement,
+                parameters=parameters,
+                execution_options=execution_options,
+            )
+        return pd.read_sql(sql=statement, con=self._connection)
+
+    def to_pandas(
+        self,
+    ) -> pd.DataFrame:
+        if not self._connection:
+            self._connect()
+        return pd.read_sql_table(table_name=self.table_name, con=self._connection)
+
+    def disconnect(self) -> None:
+        if not self._connection:
+            raise ValueError("Engine was never connected!")
+        self._engine.dispose(close=True)
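To make the new engine concrete, a minimal usage sketch, assuming the compose stack is running and the app has already emitted spans; the constructor arguments mirror Home.py, while the SQL query is purely illustrative:

```python
# Sketch: pull recent traces from Jaeger into Postgres and query them.
# Assumes `docker compose up -d` has been run and spans were exported.
import os

from dotenv import load_dotenv
from sqlalchemy import text

from instrumentation import OtelTracesSqlEngine

load_dotenv()

engine = OtelTracesSqlEngine(
    engine_url=f"postgresql+psycopg2://{os.getenv('pgql_user')}:{os.getenv('pgql_psw')}@localhost:5432/{os.getenv('pgql_db')}",
    table_name="agent_traces",
    service_name="agent.traces",
)

# With no arguments, _export defaults to the last 24 hours of traces;
# the spans are appended to the agent_traces table.
engine.to_sql_database()

# Slowest operations first; execute() returns a DataFrame when asked to.
df = engine.execute(
    text("SELECT operation_name, duration FROM agent_traces ORDER BY duration DESC"),
    return_pandas=True,
)
print(df.head())

engine.disconnect()
```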