Skip to content

Commit fa299d6

Browse files
committed
ctk info table-traffic
1 parent 3816479 commit fa299d6

File tree

3 files changed

+206
-1
lines changed

3 files changed

+206
-1
lines changed

cratedb_toolkit/info/cli.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from cratedb_toolkit import DatabaseCluster
88
from cratedb_toolkit.info.core import InfoContainer, JobInfoContainer, LogContainer
9+
from cratedb_toolkit.info.job import TableTraffic
910
from cratedb_toolkit.util.app import make_cli
1011
from cratedb_toolkit.util.cli import make_command
1112
from cratedb_toolkit.util.data import jd
@@ -85,6 +86,18 @@ def job_information(ctx: click.Context):
8586
jd(sample.to_dict())
8687

8788

89+
@make_command(cli, "table-traffic", "Display information about table use.")
@click.pass_context
def table_traffic(ctx: click.Context):
    """
    Display information about table use.

    Creates a `TableTraffic` instance on the database adapter of the cluster
    addressed by `ctx.meta["address"]`, and invokes its `render()` method.
    """
    dc = DatabaseCluster.from_options(ctx.meta["address"])
    traffic = TableTraffic(adapter=dc.adapter)
    traffic.render()
99+
100+
88101
@make_command(cli, "serve", help_serve)
89102
@click.option("--listen", type=click.STRING, default=None, help="HTTP server listen address")
90103
@click.option("--reload", is_flag=True, help="Dynamically reload changed files")

cratedb_toolkit/info/job.py

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
import dataclasses
2+
import logging
3+
import time
4+
from typing import List, Any
5+
import polars as pl
6+
import attr
7+
import sqlparse
8+
from boltons.iterutils import flatten
9+
from sqlparse.tokens import Keyword
10+
11+
from cratedb_toolkit.util.database import DatabaseAdapter, get_table_names
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
@attr.define
class Operation:
    """
    Record describing a single analyzed SQL statement.
    """

    # Operation keyword of the statement, as reported by the classifier.
    op: str
    # The SQL statement text.
    stmt: str
    # Table names the classifier extracted from the statement.
    tables_symbols: List[str] = attr.Factory(list)
    # tables_effective: List[str] = attr.Factory(list)
22+
23+
@attr.define
class Operations:
    """
    Container for a list of `Operation` records.
    """

    data: List[Operation]

    def foo(self):
        """
        Print all operations as a data frame, then print the number of
        operations per distinct `tables_symbols` value.
        """
        records = list(map(attr.asdict, self.data))
        frame = pl.from_records(records)
        print(frame)
        # Alternative using the expression API, kept for reference:
        # frame.group_by("tables_symbols").agg([pl.sum("tables_symbols"), pl.sum("op")])
        counts = frame.sql("SELECT tables_symbols, COUNT(op) FROM self GROUP BY tables_symbols")
        print(counts)
34+
35+
36+
class TableTraffic:
    """
    Analyze which tables are used by the statements recorded in `sys.jobs_log`.
    """

    def __init__(self, adapter: DatabaseAdapter):
        """
        :param adapter: Database adapter used to run the SQL query.
        """
        self.adapter = adapter

    def read_jobs_database(self, begin: int = 0, end: int = 0):
        """
        Read job records from `sys.jobs_log` that ended within a time window.

        Statements containing `sys.` or `information_schema.` are excluded.

        :param begin: Start of the window, in epoch milliseconds.
                      When falsy, defaults to 600 minutes before now.
        :param end: End of the window, in epoch milliseconds.
                    When falsy, defaults to now.
        :return: Whatever `adapter.run_sql(..., records=True)` returns;
                 `render` treats it as a sized collection of mappings
                 with a `stmt` key.
        """
        logger.info("Reading sys.jobs_log")
        now = int(time.time() * 1000)
        end = end or now
        begin = begin or now - 600 * 60 * 1000
        stmt = (
            "SELECT "
            "started, ended, classification, stmt, username, node "
            "FROM sys.jobs_log "
            "WHERE "
            "stmt NOT LIKE '%sys.%' AND "
            "stmt NOT LIKE '%information_schema.%' "
            f"AND ended BETWEEN {begin} AND {end} "
            "ORDER BY ended ASC"
        )
        return self.adapter.run_sql(stmt, records=True)

    def read_jobs(self, jobs) -> List[Operation]:
        """
        Convert job records into `Operation` items, one per record.

        :param jobs: Iterable of mappings providing the SQL text at key `stmt`.
        """
        return [self.parse_expression(job["stmt"]) for job in jobs]

    @staticmethod
    def parse_expression(sql: str) -> Operation:
        """
        Classify a single SQL statement into an `Operation`.

        Logs a warning when either the operation or the table names
        cannot be determined.
        """
        logger.debug("Analyzing SQL: %s", sql)
        classifier = SqlStatementClassifier(expression=sql)
        # Read each property once: both are computed on access, and
        # `table_names` re-parses the expression every time.
        operation = classifier.operation
        table_names = classifier.table_names
        # `sqlparse` reports statements it cannot classify as "UNKNOWN",
        # which is truthy, so a plain falsiness check would never trigger.
        if not operation or operation == "UNKNOWN":
            logger.warning("Unable to determine operation: %s", sql)
        if not table_names:
            logger.warning("Unable to determine table names: %s", sql)
        return Operation(
            op=operation,
            stmt=sql,
            tables_symbols=table_names,
        )

    def analyze_jobs(self, ops: Operations):
        """
        Run the analysis on the given operations.

        :return: Whatever `ops.foo()` returns, so callers can report it.
        """
        return ops.foo()

    def render(self):
        """
        Read jobs from the database, analyze them, and log the result.
        """
        jobs = self.read_jobs_database()
        logger.info("Analyzing %s jobs", len(jobs))
        ops = Operations(self.read_jobs(jobs))
        jobsa = self.analyze_jobs(ops)
        logger.info("Result: %s", jobsa)
88+
89+
90+
@dataclasses.dataclass
class SqlStatementClassifier:
    """
    Helper to classify an SQL statement.

    Here, most importantly: Provide the `is_dql` property that
    signals truthfulness for read-only SQL SELECT statements only.
    """

    # The SQL text to classify. `None` is normalized to an empty string.
    expression: str
    # When set, `is_dql` accepts any non-empty expression.
    permit_all: bool = False

    # Cache for the result of `sqlparse.parse()`, populated on first use.
    _parsed_sqlparse: Any = dataclasses.field(init=False, default=None)

    def __post_init__(self) -> None:
        # Normalize input: treat `None` as empty, strip surrounding whitespace.
        if self.expression is None:
            self.expression = ""
        if self.expression:
            self.expression = self.expression.strip()

    def parse_sqlparse(self) -> List[sqlparse.sql.Statement]:
        """
        Parse expression using traditional `sqlparse` library.

        The result is cached, so the expression is parsed only once.
        """
        if self._parsed_sqlparse is None:
            self._parsed_sqlparse = sqlparse.parse(self.expression)
        return self._parsed_sqlparse

    @property
    def is_dql(self) -> bool:
        """
        Is it a DQL statement, which effectively invokes read-only operations only?
        """

        if not self.expression:
            return False

        if self.permit_all:
            return True

        # Check if the expression is valid and if it's a DQL/SELECT statement,
        # also trying to consider `SELECT ... INTO ...` and evasive
        # `SELECT * FROM users; \uff1b DROP TABLE users` statements.
        return self.is_select and not self.is_camouflage

    @property
    def is_select(self) -> bool:
        """
        Whether the expression is an SQL SELECT statement.
        """
        return self.operation == "SELECT"

    @property
    def operation(self) -> str:
        """
        The SQL operation: SELECT, INSERT, UPDATE, DELETE, CREATE, etc.

        Returns "UNKNOWN" when the expression yields no statement at all,
        the same marker `sqlparse` uses for statements it cannot classify.
        """
        parsed = self.parse_sqlparse()
        # An empty expression parses into no statements; avoid indexing into it.
        if not parsed:
            return "UNKNOWN"
        return parsed[0].get_type().upper()

    @property
    def table_names(self) -> List[str]:
        """
        The table names referenced by the expression, as a flat list.

        `get_table_names` returns one list of names per statement;
        the per-statement lists are flattened into a single list.
        """
        return flatten(get_table_names(self.expression))

    @property
    def is_camouflage(self) -> bool:
        """
        Innocent-looking `SELECT` statements can evade filters.
        """
        return self.is_select_into or self.is_evasive

    @property
    def is_select_into(self) -> bool:
        """
        Use traditional `sqlparse` for catching `SELECT ... INTO ...` statements.
        Examples:
        SELECT * INTO foobar FROM bazqux
        SELECT * FROM bazqux INTO foobar
        """
        parsed = self.parse_sqlparse()
        # Nothing parsed means there is no `INTO` keyword to find.
        if not parsed:
            return False
        # Flatten all tokens (including nested ones) and match on type+value.
        return any(
            token.ttype is Keyword and token.value.upper() == "INTO"
            for token in parsed[0].flatten()
        )

    @property
    def is_evasive(self) -> bool:
        """
        Use traditional `sqlparse` for catching evasive SQL statements.

        A practice picked up from CodeRabbit was to reject multiple statements
        to prevent potential SQL injections. Is it a viable suggestion?

        Examples:

        SELECT * FROM users; \uff1b DROP TABLE users
        """
        parsed = self.parse_sqlparse()
        return len(parsed) > 1

cratedb_toolkit/util/database.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,6 @@ def get_table_names(sql: str) -> t.List[t.List[str]]:
483483
for statement in statements:
484484
local_names = []
485485
for table in statement.metadata.tables:
486-
local_names.append(table.name)
486+
local_names.append(table.fqn)
487487
names.append(local_names)
488488
return names

0 commit comments

Comments
 (0)