-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgit_awareness.py
More file actions
197 lines (162 loc) · 6.69 KB
/
git_awareness.py
File metadata and controls
197 lines (162 loc) · 6.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""Sovereign — Git Awareness: the organism watches code evolve.
When a commit lands, the organism sees what changed, understands why
from the message and diff, connects it to memory, and flags security-
relevant changes for proactive alerting.
Not a webhook — a sense. The organism scans repos on heartbeat pulses.
"""
from __future__ import annotations
import logging
import os
import subprocess
import time
from dataclasses import dataclass, field
log = logging.getLogger("sovereign.git_awareness")
_SECURITY_PATTERNS = frozenset({
"auth", "login", "password", "token", "secret", "key",
"encrypt", "decrypt", "permission", "cors", "csrf",
"sanitize", "inject", ".env", "credential", "session",
})
@dataclass
class GitCommit:
hash: str
author: str
message: str
date: str
@dataclass
class DiffSummary:
files_changed: list[str] = field(default_factory=list)
lines_added: int = 0
lines_removed: int = 0
areas: list[str] = field(default_factory=list)
security_relevant: bool = False
is_large: bool = False
@dataclass
class GitPerception:
"""A commit the organism perceived."""
commit: GitCommit
diff: DiffSummary
repo: str
alert: str = ""
class GitAwareness:
"""Watches local git repositories for changes."""
SCAN_INTERVAL = 300 # 5 minutes between scans
def __init__(self, store, notification_system=None) -> None:
self._store = store
self._notifs = notification_system
self._watched_repos: list[str] = []
self._last_commits: dict[str, str] = {} # repo → last seen hash
self._last_scan: float = 0.0
self._first_scan: bool = True # don't alert on existing commits at boot
self._detect_repos()
log.info("GitAwareness initialized (watching %d repos)", len(self._watched_repos))
async def on_pulse(self, pulse_count: int, state: str) -> None:
"""Heartbeat phase — scan repos for new commits."""
now = time.time()
if now - self._last_scan < self.SCAN_INTERVAL:
return
self._last_scan = now
for repo in self._watched_repos:
try:
perceptions = self._scan_repo(repo)
for p in perceptions:
self._remember_commit(p)
if p.alert and not self._first_scan:
log.info("Git security alert: %s", p.alert)
if self._notifs:
# Queue as a thought, not an alert to "system"
self._notifs.queue_thought(
p.commit.author, p.alert
)
except Exception as e:
log.debug("Git scan error for %s: %s", repo, e)
self._first_scan = False
def _detect_repos(self) -> None:
"""Auto-detect git repos under the Agent_System directory."""
base = os.path.expanduser("~/Desktop/Agent_System")
if not os.path.isdir(base):
return
for entry in os.listdir(base):
path = os.path.join(base, entry)
if os.path.isdir(os.path.join(path, ".git")):
self._watched_repos.append(path)
def _scan_repo(self, repo: str) -> list[GitPerception]:
"""Check a repo for new commits since last scan."""
last = self._last_commits.get(repo, "")
if last:
cmd = ["git", "-C", repo, "log", f"{last}..HEAD",
"--format=%H|%an|%s|%ci", "--no-merges", "-10"]
else:
cmd = ["git", "-C", repo, "log", "-3",
"--format=%H|%an|%s|%ci", "--no-merges"]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode != 0:
return []
perceptions = []
for line in result.stdout.strip().split("\n"):
if not line:
continue
parts = line.split("|", 3)
if len(parts) < 4:
continue
commit = GitCommit(hash=parts[0], author=parts[1],
message=parts[2], date=parts[3])
diff = self._analyze_commit(repo, commit.hash)
alert = ""
if diff.security_relevant:
alert = (
f"🔒 Security-relevant commit in {os.path.basename(repo)}: "
f"'{commit.message}' ({len(diff.files_changed)} files, "
f"+{diff.lines_added}/-{diff.lines_removed})"
)
perceptions.append(GitPerception(
commit=commit, diff=diff,
repo=os.path.basename(repo), alert=alert,
))
# Update last seen
if perceptions:
self._last_commits[repo] = perceptions[0].commit.hash
return perceptions
def _analyze_commit(self, repo: str, commit_hash: str) -> DiffSummary:
"""Analyze a commit diff for significance."""
result = subprocess.run(
["git", "-C", repo, "diff", "--stat", f"{commit_hash}~1..{commit_hash}"],
capture_output=True, text=True, timeout=10,
)
ds = DiffSummary()
for line in result.stdout.strip().split("\n"):
if "|" in line:
fname = line.split("|")[0].strip()
ds.files_changed.append(fname)
parts = line.split("|")[1].strip().split()
if len(parts) >= 1:
try:
changes = int(parts[0])
ds.lines_added += changes // 2
ds.lines_removed += changes // 2
except ValueError:
pass
# Areas affected
ds.areas = list({f.split("/")[0] for f in ds.files_changed if "/" in f})
# Security check
all_text = " ".join(ds.files_changed).lower()
ds.security_relevant = any(p in all_text for p in _SECURITY_PATTERNS)
# Large change
ds.is_large = (ds.lines_added + ds.lines_removed) > 200
return ds
def _remember_commit(self, p: GitPerception) -> None:
"""Store the perceived commit in memory."""
from .models import MemoryEntry, MemorySource
entry = MemoryEntry(
content=(
f"Git commit in {p.repo}: '{p.commit.message}' by {p.commit.author}. "
f"Changed {len(p.diff.files_changed)} files "
f"(+{p.diff.lines_added}/-{p.diff.lines_removed}). "
f"Areas: {', '.join(p.diff.areas)}"
),
source=MemorySource.AGENT,
provenance_chain=[f"git:{p.repo}"],
)
try:
self._store.save_memory(entry)
except Exception as e:
log.debug("Failed to save git memory: %s", e)