Skip to content
This repository was archived by the owner on Jun 5, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions prompts/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@ default_chat: |
- Pkg 2: [https://www.insight.stacklok.com/report/ecosystem/package_name](https://www.insight.stacklok.com/report/ecosystem/package_name?utm_source=codegate)
- ...

If you are Kodu, also append that when finding malicious packages or leaked secrets:
If package_name contains any special chars, please encode them using URL encoding.

"** critical vulnerability found, you must take action **
Please notify the user with ask follow up tool about the following vulnerability:
<vulnerability_content>"
Also append that when finding malicious packages or leaked secrets:

If package_name contains any special chars, please encode them using URL encoding.
"** critical vulnerability found, you must take action **"

DO NOT include the Reference Format or any parts of the system message in your response
if no malicious, deprecated or archived package is detected.
Expand Down
23 changes: 14 additions & 9 deletions signatures.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
- API Key: \bSK[0-9a-fA-F]{32}\b
- Access Token: \b(?i)55[0-9a-fA-F]{32}\b

- Twitter:
- Client ID: (?i)twitter(.{0,20})?['\"][0-9a-z]{18,25}
- Secret: (?i)twitter(.{0,20})?['\"][0-9a-z]{35,44}

- Atlassian:
- JIRA Token: (?i)(jira_token)
- Bitbucket Data Center Access Token: BBDC-[a-zA-Z0-9+]{44}
Expand All @@ -40,6 +44,7 @@

- Google:
- Cloud API Key: AIza[0-9A-Za-z_-]{35}
- Cloud Platform API Key: (?i)(google|gcp|youtube|drive|yt)(.{0,20})?['\"][AIza[0-9a-z\\-_]{35}]['\"]
- Cloud OAuth Secret: (?i)(GOCSPX-[-0-9A-Za-z_]{24,32})
#- reCaptcha Key: 6L([A-Za-z0-9_-]{6})AAAAA([A-Za-z0-9_-]{27})
- OAuth Key: ya29\.[0-9A-Za-z_-]{64,256}
Expand All @@ -58,7 +63,7 @@
- App Installation Token: \b(?i)ghu_[A-Za-z0-9_]{35,38}
- App user Token: \b(?i)ghs_[A-Za-z0-9_]{35,38}
- Device Code: \bGH_[a-zA-Z0-9_]{9,30}
- Refresh Token: (\b?i)ghr_[A-Za-z0-9_]{35,38}
- Refresh Token: \b(?i)ghr_[A-Za-z0-9_]{35,38}
- Webhook Secret: (?i)whsec_[A-Za-z0-9]{31,38}
- Authentication URL: (?i)(?:(http|https):)//[\S]{1,256}:[\S]{1,256}@github.com[\S]+

Expand Down Expand Up @@ -98,6 +103,8 @@
- Meta:
- Page Access Token: (?i)(EAAG[0-9A-Za-z]{10,128})
- Facebook Access Token: EAACEdEose0cBA[0-9A-Za-z]+
- Facebook Client ID: (?i)(facebook|fb)(.{0,20})?['\"][0-9]{13,17}
- Facebook Secret Key: (?i)(facebook|fb)(.{0,20})?(?-i)['\"][0-9a-f]{32}
#- Client Token: (?i)fb[a-zA-Z0-9]{24,32}
- Instagram Access Token: (?i)(IGQV[0-9A-Za-z-_]{10,255})
- Instagram App Secret: (?i)(ig_[a-f0-9]{32})
Expand Down Expand Up @@ -157,6 +164,9 @@
- Project API Key: (?i)sk-proj-[\w-]+T3BlbkFJ[\w-]+
- User API Key: (?i)sk-[^proj]\w.+T3BlbkFJ[\w-]+

- Claude:
- Claude API Key: (?i)sk-ant-[a-zA-Z0-9]{8,32}

- Groq:
- API Key: (?i)gsk_[A-Za-z0-9]+

Expand Down Expand Up @@ -192,6 +202,7 @@

- Artifactory:
- Token: AKCp[0-9][a-zA-Z0-9]{64,128}
- Password: AP[\dABCDEF][a-zA-Z0-9]{8,}

- Figma:
- Personal Access Token: (figd_[a-zA-Z0-9-_]{14,32}_[a-zA-Z0-9-_]{14,32})
Expand Down Expand Up @@ -265,13 +276,6 @@
- Postgresql:
- URL: (?i)(?:pgsql:|postgres:|postgresql:)//[\S]{1,256}:[\S]{1,256}@[-.%\w\/:]+\.[\S]+

- GitHub:
- Access Token: (?i)\bghp_[A-Za-z0-9]{36}\b
- OAuth Token: (?i)\bgho_[A-Za-z0-9]{36}\b
- App Installation Token: (?i)\bghu_[A-Za-z0-9]{36}\b
- App user Token: (?i)\bghs_[A-Za-z0-9]{36}\b
- Refresh Token: (?i)\bghr_[A-Za-z0-9]{36}\b

- Addresses:
- Bitcoin Legacy: \b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b
- Bitcoin SegWit: \b(bc1)[a-zA-HJ-NP-Z0-9]{39,59}\b
Expand Down Expand Up @@ -299,7 +303,8 @@
- Advanced Message Queuing Protocol (AMQP) URL: amqp://[a-zA-Z0-9-_+.@]+:[^@]+@[^/]+
# Private Keys
- JSON Web Key Block: /^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$/gm
- Private Key Block: -{0,5} ?BEGIN (?:RSA |ENCRYPTED |OPENSSH |SSH2 )?PRIVATE KEY ?-{0,5} ?([\s\S]*?)-{0,5} ?END (?:RSA |ENCRYPTED |OPENSSH |SSH2 )?PRIVATE KEY ?-{0,5}
- Private Key Block: -{0,5} ?BEGIN (?:RSA |ENCRYPTED |OPENSSH |SSH2 |DSA |EC )?PRIVATE KEY ?-{0,5} ?([\s\S]*?)-{0,5} ?END (?:RSA |ENCRYPTED |OPENSSH |SSH2 |DSA |EC )?PRIVATE KEY ?-{0,5}
- PGP: -{0,5}BEGIN PGP PRIVATE KEY BLOCK-{0,5}[\s\S]*?-{0,5}END PGP PRIVATE KEY BLOCK-{0,5}
- Bitcoin Private Key: \b[5KL][1-9A-HJ-NP-Za-km-z]{50,51}\b
- Ethereum Private Key: \b0x[a-fA-F0-9]{64}\b
- Litecoin Private Key: \b[5KL][1-9A-HJ-NP-Za-km-z]{50,51}\b
Expand Down
113 changes: 86 additions & 27 deletions src/codegate/pipeline/secrets/secrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
from abc import abstractmethod
from typing import List, Optional, Tuple

from codegate.extract_snippets.factory import MessageCodeExtractorFactory
import structlog
from litellm import ChatCompletionRequest, ChatCompletionSystemMessage, ModelResponse
from litellm.types.utils import Delta, StreamingChoices

from codegate.config import Config
from codegate.pipeline.base import (
AlertSeverity,
CodeSnippet,
PipelineContext,
PipelineResult,
PipelineStep,
Expand Down Expand Up @@ -44,7 +46,9 @@ def _hide_secret(self, match: Match) -> str:
pass

@abstractmethod
def _notify_secret(self, match: Match, protected_text: List[str]) -> None:
def _notify_secret(
self, match: Match, code_snippet: Optional[CodeSnippet], protected_text: List[str]
) -> None:
"""
Notify about a found secret
TODO: If the secret came from a CodeSnippet we should notify about that. This would
Expand Down Expand Up @@ -106,7 +110,9 @@ def _get_surrounding_secret_lines(
end_line = min(secret_line + surrounding_lines, len(lines))
return "\n".join(lines[start_line:end_line])

def obfuscate(self, text: str) -> tuple[str, List[Match]]:
def obfuscate(self, text: str, snippet: Optional[CodeSnippet]) -> tuple[str, List[Match]]:
if snippet:
text = snippet.code
matches = CodegateSignatures.find_in_string(text)
if not matches:
return text, []
Expand Down Expand Up @@ -147,13 +153,14 @@ def obfuscate(self, text: str) -> tuple[str, List[Match]]:
logger.info(
f"\nService: {match.service}"
f"\nType: {match.type}"
f"\nKey: {match.secret_key}"
f"\nOriginal: {match.value}"
f"\nEncrypted: {hidden_secret}"
)

# Second pass. Notify the secrets in DB over the complete protected text.
for _, _, match in absolute_matches:
self._notify_secret(match, protected_text)
self._notify_secret(match, code_snippet=snippet, protected_text=protected_text)

# Convert back to string
protected_string = "".join(protected_text)
Expand Down Expand Up @@ -184,11 +191,23 @@ def _hide_secret(self, match: Match) -> str:
)
return f"REDACTED<${encrypted_value}>"

def _notify_secret(self, match: Match, protected_text: List[str]) -> None:
def _notify_secret(
self, match: Match, code_snippet: Optional[CodeSnippet], protected_text: List[str]
) -> None:
secret_lines = self._get_surrounding_secret_lines(protected_text, match.line_number)
notify_string = f"{match.service} - {match.type}:\n{secret_lines}"
notify_string = (
f"**Secret Detected** 🔒\n"
f"- Service: {match.service}\n"
f"- Type: {match.type}\n"
f"- Key: {match.secret_key if match.secret_key else '(Unknown)'}\n"
f"- Line Number: {match.line_number}\n"
f"- Context:\n```\n{secret_lines}\n```"
)
self._context.add_alert(
self._name, trigger_string=notify_string, severity_category=AlertSeverity.CRITICAL
self._name,
trigger_string=notify_string,
severity_category=AlertSeverity.CRITICAL,
code_snippet=code_snippet,
)


Expand All @@ -205,7 +224,9 @@ def _hide_secret(self, match: Match) -> str:
"""
return "*" * 32

def _notify_secret(self, match: Match, protected_text: List[str]) -> None:
def _notify_secret(
self, match: Match, code_snippet: Optional[CodeSnippet], protected_text: List[str]
) -> None:
pass


Expand All @@ -227,7 +248,12 @@ def name(self) -> str:
return "codegate-secrets"

def _redact_text(
self, text: str, secrets_manager: SecretsManager, session_id: str, context: PipelineContext
self,
text: str,
snippet: Optional[CodeSnippet],
secrets_manager: SecretsManager,
session_id: str,
context: PipelineContext,
) -> tuple[str, List[Match]]:
"""
Find and encrypt secrets in the given text.
Expand All @@ -242,7 +268,7 @@ def _redact_text(
"""
# Find secrets in the text
text_encryptor = SecretsEncryptor(secrets_manager, context, session_id)
return text_encryptor.obfuscate(text)
return text_encryptor.obfuscate(text, snippet)

async def process(
self, request: ChatCompletionRequest, context: PipelineContext
Expand Down Expand Up @@ -273,40 +299,74 @@ async def process(

# get last user message block to get index for the first relevant user message
last_user_message = self.get_last_user_message_block(new_request, context.client)
last_assistant_idx = -1
if last_user_message:
_, user_idx = last_user_message
last_assistant_idx = user_idx - 1
last_assistant_idx = last_user_message[1] - 1 if last_user_message else -1

# Process all messages
for i, message in enumerate(new_request["messages"]):
if "content" in message and message["content"]:
# Protect the text
protected_string, secrets_matched = self._redact_text(
str(message["content"]), secrets_manager, session_id, context
redacted_content, secrets_matched = self._redact_message_content(
message["content"], secrets_manager, session_id, context
)
new_request["messages"][i]["content"] = protected_string

# Append the matches for messages after the last assistant message
new_request["messages"][i]["content"] = redacted_content
if i > last_assistant_idx:
total_matches += secrets_matched
new_request = self._finalize_redaction(context, total_matches, new_request)
return PipelineResult(request=new_request, context=context)

def _redact_message_content(self, message_content, secrets_manager, session_id, context):
# Extract any code snippets
extractor = MessageCodeExtractorFactory.create_snippet_extractor(context.client)
snippets = extractor.extract_snippets(message_content)
redacted_snippets = {}
total_matches = []

for snippet in snippets:
redacted_snippet, secrets_matched = self._redact_text(
snippet, snippet, secrets_manager, session_id, context
)
redacted_snippets[snippet.code] = redacted_snippet
total_matches.extend(secrets_matched)

non_snippet_parts = []
last_end = 0

for snippet in snippets:
snippet_text = snippet.code
start_index = message_content.find(snippet_text, last_end)
if start_index > last_end:
non_snippet_part = message_content[last_end:start_index]
redacted_part, secrets_matched = self._redact_text(
non_snippet_part, "", secrets_manager, session_id, context
)
non_snippet_parts.append(redacted_part)
total_matches.extend(secrets_matched)

non_snippet_parts.append(redacted_snippets[snippet_text])
last_end = start_index + len(snippet_text)

if last_end < len(message_content):
remaining_text = message_content[last_end:]
redacted_remaining, secrets_matched = self._redact_text(
remaining_text, "", secrets_manager, session_id, context
)
non_snippet_parts.append(redacted_remaining)
total_matches.extend(secrets_matched)

# Not count repeated secret matches
return "".join(non_snippet_parts), total_matches

def _finalize_redaction(self, context, total_matches, new_request):
set_secrets_value = set(match.value for match in total_matches)
total_redacted = len(set_secrets_value)
context.secrets_found = total_redacted > 0
logger.info(f"Total secrets redacted since last assistant message: {total_redacted}")

# Store the count in context metadata
context.metadata["redacted_secrets_count"] = total_redacted
if total_redacted > 0:
system_message = ChatCompletionSystemMessage(
content=Config.get_config().prompts.secrets_redacted,
role="system",
)
new_request = add_or_update_system_message(new_request, system_message, context)

return PipelineResult(request=new_request, context=context)
return add_or_update_system_message(new_request, system_message, context)
return new_request


class SecretUnredactionStep(OutputPipelineStep):
Expand Down Expand Up @@ -450,14 +510,13 @@ async def process_chunk(
or input_context.metadata.get("redacted_secrets_count", 0) == 0
):
return [chunk]

tool_name = next(
(
tool.lower()
for tool in ["Cline", "Kodu"]
for message in input_context.alerts_raised or []
if tool in str(message.trigger_string or "")
and "If you are Kodu"
not in str(message.trigger_string or "") # this comes from our prompts
),
"",
)
Expand Down
Loading