Skip to content

Commit 44a6101

Browse files
committed
kafka_cdc decorator
1 parent f24db16 commit 44a6101

File tree

3 files changed

+161
-29
lines changed

3 files changed

+161
-29
lines changed

src/mindwm/knfunc/decorators.py

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
from fastapi import FastAPI, Request, Response, status
1313
from fastapi.responses import JSONResponse
1414
from mindwm import logging
15-
from mindwm.model.events import (CloudEvent, IoDocumentEvent, LLMAnswerEvent,
16-
TouchEvent)
15+
from mindwm.model.events import (CloudEvent, IoDocumentEvent, KafkaCdcEvent,
16+
LLMAnswerEvent, TouchEvent)
17+
from mindwm.model.graph import KafkaCdc
1718
from neontology import auto_constrain, init_neontology
1819
from opentelemetry import trace
1920
from opentelemetry._logs import set_logger_provider
@@ -250,3 +251,83 @@ async def inner(**kwargs):
250251
return res
251252

252253
return wrapper
254+
255+
256+
def kafka_cdc(func):
257+
258+
@app.post("/")
259+
async def wrapper(r: Request, response: Response):
260+
func_sig = inspect.signature(func)
261+
xx = [p.annotation for p in func_sig.parameters.values()]
262+
kwargs = dict(func_sig.parameters)
263+
b = await r.body()
264+
logger.debug(f"headers: {r.headers}")
265+
logger.debug(f"body: {b}")
266+
cdc_obj = KafkaCdc.model_validate_json(b)
267+
logger.info(f"cdc_obj: {cdc_obj}")
268+
if 'traceparent' in r.headers.keys():
269+
carrier = r.headers.get('traceparent')
270+
271+
if 'obj' in kwargs:
272+
kwargs['obj'] = cdc_obj
273+
274+
match cdc_obj.payload.type:
275+
case 'relationship':
276+
carrier = cdc_obj.payload.traceparent
277+
case 'node':
278+
if cdc_obj.meta.operation != 'deleted':
279+
carrier = cdc_obj.payload.after.properties.traceparent
280+
else:
281+
carrier = cdc_obj.payload.before.properties.traceparent
282+
283+
@with_trace(carrier=r.headers)
284+
@wraps(func)
285+
async def inner(**kwargs):
286+
287+
value = await func(**kwargs)
288+
289+
logger.debug(f"return value: {value}")
290+
291+
headers = {}
292+
if carrier:
293+
headers = {"traceparent": carrier}
294+
295+
if not value:
296+
return Response(status_code=status.HTTP_200_OK,
297+
headers=headers)
298+
else:
299+
context_name = os.environ.get('CONTEXT_NAME', 'NO_CONTEXT')
300+
301+
obj_ev = CloudEvent.make_obj_event(value)
302+
match cdc_obj.payload.type:
303+
case "node":
304+
if cdc_obj.meta.operation != 'deleted':
305+
event_type = cdc_obj.payload.after.labels[0].lower(
306+
)
307+
else:
308+
event_type = cdc_obj.payload.before.labels[
309+
0].lower()
310+
case "relationship":
311+
event_type = cdc_obj.payload.label.lower()
312+
case _:
313+
event_type = "UNKNOWN"
314+
315+
#logger.info(f"ctx: {span.context}")
316+
ev = CloudEvent(
317+
id=uuid4().hex,
318+
source=
319+
f"mindwm.{context_name}.graph.{ cdc_obj.payload.type.lower() }",
320+
subject=f"{ cdc_obj.meta.operation }",
321+
type=event_type,
322+
data=obj_ev.model_dump(),
323+
traceparent=carrier)
324+
325+
headers['content-type'] = 'application/cloudevents+json'
326+
logger.debug(f"response headers: {headers}")
327+
logger.debug(f"response event: {ev}")
328+
return Response(content=ev.model_dump_json(), headers=headers)
329+
330+
res = await inner(**kwargs)
331+
return res
332+
333+
return wrapper

src/mindwm/model/events.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from pydantic import BaseModel, Field
44

5+
from .graph import KafkaCdc
56
from .objects import IoDocument, LLMAnswer, ShowMessage, Touch, TypeText
67

78

@@ -45,18 +46,24 @@ class TypeTextEvent(BaseEvent):
4546
type: Literal['typetextevent'] = 'typetextevent'
4647

4748

48-
Obj = TypeVar("Obj", IoDocument, Touch, LLMAnswer, ShowMessage, TypeText)
49+
class KafkaCdcEvent(BaseEvent):
50+
data: KafkaCdc
51+
type: Literal['kafkacdcevent'] = 'kafkacdcevent'
52+
53+
54+
Obj = TypeVar("Obj", IoDocument, Touch, LLMAnswer, ShowMessage, TypeText,
55+
KafkaCdc)
4956

5057
ObjEvent = TypeVar("ObjEvent", IoDocumentEvent, TouchEvent, LLMAnswerEvent,
51-
ShowMessageEvent, TypeTextEvent)
58+
ShowMessageEvent, TypeTextEvent, KafkaCdcEvent)
5259

5360

5461
class CloudEvent(BaseEvent):
5562
id: str
5663
source: str
5764
specversion: str = "1.0"
5865
data: Annotated[Union[IoDocumentEvent, TouchEvent, LLMAnswerEvent,
59-
ShowMessageEvent, TypeTextEvent],
66+
ShowMessageEvent, TypeTextEvent, KafkaCdcEvent],
6067
Field(discriminator='type')]
6168
type: Optional[str] = None
6269
datacontenttype: Optional[str] = None
@@ -76,6 +83,10 @@ class CloudEvent(BaseEvent):
7683
Field(min_length=1,
7784
description="a comma-delimited list of key-value pairs")]] = None
7885
knativearrivaltime: Optional[str] = None
86+
key: Optional[str] = None
87+
knativekafkaoffset: Optional[int] = None
88+
knativekafkapartition: Optional[int] = None
89+
partitionkey: Optional[str] = None
7990

8091
@classmethod
8192
def make_obj_event(cls, obj: Type[Obj]) -> Type[ObjEvent]:
@@ -90,6 +101,8 @@ def make_obj_event(cls, obj: Type[Obj]) -> Type[ObjEvent]:
90101
return ShowMessageEvent(data=obj)
91102
case TypeText():
92103
return TypeTextEvent(data=obj)
104+
case KafkaCdc():
105+
return KafkaCdcEvent(data=obj)
93106
case _:
94107
msg = f"unknown object type: {obj}"
95108
raise TypeError(msg)

src/mindwm/model/graph.py

Lines changed: 62 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
1-
from typing import ClassVar, Dict, List, TypeVar, Union
1+
from datetime import datetime
2+
from typing import (Annotated, Any, ClassVar, Dict, List, Literal, Optional,
3+
TypeVar, Union)
24

35
from neontology import BaseNode, BaseRelationship
4-
from pydantic import BaseModel
6+
from pydantic import BaseModel, Field
57

68
from .objects import IoDocument
79

810

911
class MindwmNode(BaseNode):
10-
atime: int = 0
12+
atime: Optional[int] = 0
13+
created: Optional[datetime] = None
14+
merged: Optional[datetime] = None
15+
traceparent: Optional[str] = None
1116
# atime: datetime = Field(
1217
# default_factory=datetime.now
1318
#)
@@ -66,79 +71,85 @@ class Parameter(MindwmNode):
6671

6772

6873
# Relations
69-
class UserHasHost(BaseRelationship):
74+
class MindwmRelationship(BaseRelationship):
75+
traceparent: Optional[str] = None
76+
created: Optional[datetime] = None
77+
merged: Optional[datetime] = None
78+
79+
80+
class UserHasHost(MindwmRelationship):
7081
__relationshiptype__: ClassVar[str] = "HAS_HOST"
7182
source: User
7283
target: Host
7384

7485

75-
class HostHasTmux(BaseRelationship):
86+
class HostHasTmux(MindwmRelationship):
7687
__relationshiptype__: ClassVar[str] = "HAS_TMUX"
7788
source: Host
7889
target: Tmux
7990

8091

81-
class TmuxHasTmuxSession(BaseRelationship):
92+
class TmuxHasTmuxSession(MindwmRelationship):
8293
__relationshiptype__: ClassVar[str] = "HAS_TMUX_SESSION"
8394
source: Tmux
8495
target: TmuxSession
8596

8697

87-
class TmuxSessionHasTmuxPane(BaseRelationship):
98+
class TmuxSessionHasTmuxPane(MindwmRelationship):
8899
__relationshiptype__: ClassVar[str] = "HAS_TMUX_PANE"
89100
source: TmuxSession
90101
target: TmuxPane
91102

92103

93-
class TmuxPaneHasIoDocument(BaseRelationship):
104+
class TmuxPaneHasIoDocument(MindwmRelationship):
94105
__relationshiptype__: ClassVar[str] = "HAS_IO_DOCUMENT"
95106
source: TmuxPane
96107
target: IoDocument
97108

98109

99-
class HostHasClipboard(BaseRelationship):
110+
class HostHasClipboard(MindwmRelationship):
100111
__relationshiptype__: ClassVar[str] = "HAS_CLIPBOARD"
101112
source: Host
102113
target: Clipboard
103114

104115

105-
class UserHasTmux(BaseRelationship):
116+
class UserHasTmux(MindwmRelationship):
106117
__relationshiptype__: ClassVar[str] = "HAS_TMUX"
107118
source: User
108119
target: Tmux
109120

110121

111-
class IoDocumentHasUser(BaseRelationship):
122+
class IoDocumentHasUser(MindwmRelationship):
112123
__relationshiptype__: ClassVar[str] = "HAS_USER"
113124
source: IoDocument
114125
target: User
115126

116127

117-
class UserHasParameter(BaseRelationship):
128+
class UserHasParameter(MindwmRelationship):
118129
__relationshiptype__: ClassVar[str] = "HAS_PARAMETER"
119130
source: User
120131
target: Parameter
121132

122133

123-
class HostHasParameter(BaseRelationship):
134+
class HostHasParameter(MindwmRelationship):
124135
__relationshiptype__: ClassVar[str] = "HAS_PARAMETER"
125136
source: Host
126137
target: Parameter
127138

128139

129-
class TmuxHasParameter(BaseRelationship):
140+
class TmuxHasParameter(MindwmRelationship):
130141
__relationshiptype__: ClassVar[str] = "HAS_PARAMETER"
131142
source: Tmux
132143
target: Parameter
133144

134145

135-
class TmuxSessionHasParameter(BaseRelationship):
146+
class TmuxSessionHasParameter(MindwmRelationship):
136147
__relationshiptype__: ClassVar[str] = "HAS_PARAMETER"
137148
source: TmuxSession
138149
target: Parameter
139150

140151

141-
class TmuxPaneHasParameter(BaseRelationship):
152+
class TmuxPaneHasParameter(MindwmRelationship):
142153
__relationshiptype__: ClassVar[str] = "HAS_PARAMETER"
143154
source: TmuxPane
144155
target: Parameter
@@ -155,19 +166,42 @@ class KafkaCdcMeta(BaseModel):
155166
source: Dict[str, str]
156167

157168

158-
Prop = TypeVar("Prop", User, Host)
169+
Prop = TypeVar("Prop", User, Host, Tmux, TmuxSession, TmuxPane, IoDocument,
170+
Clipboard, Parameter)
171+
172+
173+
class KafkaCdcRelNode(BaseModel):
174+
id: str
175+
labels: List[str]
176+
ids: Dict[str, str]
177+
178+
179+
class KafkaCdcRelProp(BaseModel):
180+
# FIX: more pricise parametrization if needed
181+
properties: Any
182+
183+
184+
class KafkaCdcRelation(BaseModel):
185+
id: int
186+
start: KafkaCdcRelNode
187+
end: KafkaCdcRelNode
188+
before: Optional[KafkaCdcRelProp] = None
189+
after: Optional[KafkaCdcRelProp] = None
190+
label: str
191+
type: Literal['relationship'] = 'relationship'
192+
traceparent: Optional[str] = None
159193

160194

161-
class KafkaCdcPayloadData(BaseModel):
195+
class KafkaCdcNodeData(BaseModel):
162196
properties: Prop
163197
labels: List[str]
164198

165199

166-
class KafkaCdcPayload(BaseModel):
200+
class KafkaCdcNode(BaseModel):
167201
id: int
168-
type: str
169-
before: KafkaCdcPayloadData
170-
after: KafkaCdcPayloadData
202+
type: Literal['node'] = 'node'
203+
before: Optional[KafkaCdcNodeData] = None
204+
after: Optional[KafkaCdcNodeData] = None
171205

172206

173207
class KafkaCdcSchema(BaseModel):
@@ -177,7 +211,8 @@ class KafkaCdcSchema(BaseModel):
177211

178212
class KafkaCdc(BaseModel):
179213
meta: KafkaCdcMeta
180-
payload: KafkaCdcPayload
214+
payload: Annotated[Union[KafkaCdcNode, KafkaCdcRelation],
215+
Field(discriminator='type')]
181216
schema: KafkaCdcSchema
182217

183218
def get_object_before(self):
@@ -187,4 +222,7 @@ def get_object_after(self):
187222
return self.payload.after.properties
188223

189224
def get_object(self):
190-
return self.get_object_after()
225+
if self.payload.meta.operation != 'deleted':
226+
return self.get_object_after()
227+
else:
228+
return self.get_object_before()

0 commit comments

Comments
 (0)