Skip to content

Commit 9ba2a90

Browse files
CopilotmaniSbindra
andcommitted
Enhance MCP server with improved image analysis prompts and database integration
Co-authored-by: maniSbindra <6338721+maniSbindra@users.noreply.github.com>
1 parent bf4529a commit 9ba2a90

File tree

1 file changed

+222
-29
lines changed

1 file changed

+222
-29
lines changed

mcp_server.py

Lines changed: 222 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,13 @@ async def _handle_tools_list(self, request_id: str) -> Dict[str, Any]:
142142
},
143143
{
144144
"name": "analyze_image",
145-
"description": "Analyze a specific container image and get recommendations for alternatives",
145+
"description": "Analyze a container image and get secure alternatives. If the image exists in our database, provides detailed security analysis. If not found, extracts language information and provides secure recommendations.",
146146
"inputSchema": {
147147
"type": "object",
148148
"properties": {
149149
"image_name": {
150150
"type": "string",
151-
"description": "Full container image name (e.g., docker.io/library/python:3.12-slim)",
151+
"description": "Full container image name (e.g., docker.io/library/python:3.12-slim, mcr.microsoft.com/azurelinux/base/python:3.12). The tool will check if this image exists in our security database and provide detailed analysis or fallback to language-based recommendations.",
152152
},
153153
"limit": {
154154
"type": "integer",
@@ -312,44 +312,217 @@ async def _analyze_image(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
312312
image_name = arguments.get("image_name")
313313
limit = arguments.get("limit", 5)
314314

315-
# For now, we'll extract requirements from the image name and get recommendations
316-
# In a full implementation, this would actually analyze the image
315+
# Check if the image exists in our database
316+
db = ImageDatabase(self.db_path)
317+
try:
318+
existing_image = db.get_image_by_exact_name(image_name)
319+
320+
if existing_image:
321+
# Image exists in database - provide detailed analysis
322+
return await self._analyze_existing_image(existing_image, limit)
323+
else:
324+
# Image not in database - extract language and provide fallback recommendations
325+
return await self._analyze_unknown_image(image_name, limit)
326+
327+
finally:
328+
db.close()
329+
330+
async def _analyze_existing_image(
331+
self, image_data: Dict[str, Any], limit: int
332+
) -> Dict[str, Any]:
333+
"""Analyze an image that exists in our database"""
334+
image_name = image_data.get("image_name") or image_data.get("name")
335+
languages = (
336+
image_data.get("languages", "").split(",")
337+
if image_data.get("languages")
338+
else []
339+
)
340+
primary_language = languages[0] if languages else None
341+
342+
# Get current vulnerability status
343+
current_vulns = {
344+
"total": image_data.get("total_vulnerabilities", 0),
345+
"critical": image_data.get("critical_vulnerabilities", 0),
346+
"high": image_data.get("high_vulnerabilities", 0),
347+
"medium": image_data.get("medium_vulnerabilities", 0),
348+
"low": image_data.get("low_vulnerabilities", 0),
349+
}
350+
351+
# Determine security level for recommendations
352+
security_level = "high"
353+
if current_vulns["critical"] > 0 or current_vulns["high"] > 5:
354+
security_level = "maximum"
355+
elif current_vulns["total"] > 20:
356+
security_level = "high"
357+
358+
# Get alternative recommendations
359+
alternatives = []
360+
if primary_language:
361+
requirement = UserRequirement(
362+
language=primary_language, security_level=security_level
363+
)
364+
recommendations = self.recommendation_engine.recommend(requirement)[:limit]
365+
366+
for rec in recommendations:
367+
rec_vulns = rec.analysis_data.get("vulnerabilities", {})
368+
security_improvement = self._calculate_security_improvement(
369+
current_vulns, rec_vulns
370+
)
371+
372+
alternatives.append(
373+
{
374+
"image_name": rec.image_name,
375+
"score": rec.score,
376+
"reasoning": rec.reasoning,
377+
"security_improvement": security_improvement,
378+
"vulnerabilities": rec_vulns,
379+
"size_mb": round(
380+
rec.analysis_data.get("manifest", {}).get("size", 0)
381+
/ (1024 * 1024),
382+
1,
383+
),
384+
}
385+
)
386+
387+
return {
388+
"analyzed_image": image_name,
389+
"image_found_in_database": True,
390+
"current_image_analysis": {
391+
"languages": languages,
392+
"vulnerabilities": current_vulns,
393+
"size_mb": round(image_data.get("size_bytes", 0) / (1024 * 1024), 1),
394+
"base_os": {
395+
"name": image_data.get("base_os_name"),
396+
"version": image_data.get("base_os_version"),
397+
},
398+
"scan_date": image_data.get("scan_timestamp"),
399+
},
400+
"security_assessment": self._assess_security_level(current_vulns),
401+
"alternatives": alternatives,
402+
"recommendation_note": (
403+
f"Found {len(alternatives)} alternative images with better security profiles"
404+
if alternatives
405+
else "No alternative recommendations available for this language"
406+
),
407+
}
317408

318-
# Simple extraction of language from image name
409+
async def _analyze_unknown_image(
410+
self, image_name: str, limit: int
411+
) -> Dict[str, Any]:
412+
"""Analyze an image that's not in our database"""
413+
# Extract language from image name
319414
language = self._extract_language_from_image(image_name)
415+
version = self._extract_version_from_image(image_name)
416+
320417
if not language:
321418
return {
322-
"error": "Could not determine language from image name",
323-
"image_name": image_name,
324-
"suggestions": "Please provide language explicitly using recommend_images tool",
419+
"analyzed_image": image_name,
420+
"image_found_in_database": False,
421+
"error": "Could not determine programming language from image name",
422+
"message": "This image is not in our security database and we couldn't automatically detect the programming language.",
423+
"suggestions": [
424+
"Use the 'recommend_images' tool with explicit language parameter",
425+
"Provide a more specific image name that includes the language (e.g., python:3.12, node:18, etc.)",
426+
"Check if you meant a similar image name that might be in our database",
427+
],
428+
"fallback_action": "Please specify the programming language to get secure image recommendations",
325429
}
326430

327-
# Create a requirement based on detected language
328-
requirement = UserRequirement(language=language, security_level="high")
431+
# Create requirement based on detected language
432+
requirement = UserRequirement(
433+
language=language, version=version, security_level="high"
434+
)
329435

330-
recommendations = self.recommendation_engine.recommend(requirement)
331-
recommendations = recommendations[:limit]
436+
recommendations = self.recommendation_engine.recommend(requirement)[:limit]
332437

333438
return {
334439
"analyzed_image": image_name,
440+
"image_found_in_database": False,
335441
"detected_language": language,
442+
"detected_version": version,
443+
"message": f"This image is not in our security database, but we detected it as a {language} image.",
444+
"security_recommendation": f"Consider switching to one of these verified secure {language} base images:",
336445
"alternatives": [
337446
{
338447
"image_name": rec.image_name,
339448
"score": rec.score,
340449
"reasoning": rec.reasoning,
341-
"security_improvement": (
342-
"Lower vulnerability count"
343-
if rec.analysis_data.get("vulnerabilities", {}).get("total", 0)
344-
< 50
345-
else "Similar security profile"
346-
),
450+
"security_benefit": "Pre-analyzed and verified secure image",
347451
"vulnerabilities": rec.analysis_data.get("vulnerabilities", {}),
452+
"size_mb": round(
453+
rec.analysis_data.get("manifest", {}).get("size", 0)
454+
/ (1024 * 1024),
455+
1,
456+
),
348457
}
349458
for rec in recommendations
350459
],
460+
"note": "These recommendations are based on language detection. For more specific recommendations, use the 'recommend_images' tool with your exact requirements.",
351461
}
352462

463+
def _calculate_security_improvement(
464+
self, current_vulns: Dict[str, int], alternative_vulns: Dict[str, int]
465+
) -> str:
466+
"""Calculate and describe security improvement between images"""
467+
current_total = current_vulns.get("total", 0)
468+
alt_total = alternative_vulns.get("total", 0)
469+
current_critical = current_vulns.get("critical", 0)
470+
alt_critical = alternative_vulns.get("critical", 0)
471+
current_high = current_vulns.get("high", 0)
472+
alt_high = alternative_vulns.get("high", 0)
473+
474+
if alt_critical == 0 and current_critical > 0:
475+
return f"Eliminates {current_critical} critical vulnerabilities"
476+
elif alt_critical < current_critical:
477+
return f"Reduces critical vulnerabilities from {current_critical} to {alt_critical}"
478+
elif alt_high == 0 and current_high > 0:
479+
return f"Eliminates {current_high} high-severity vulnerabilities"
480+
elif alt_total < current_total:
481+
reduction = current_total - alt_total
482+
return f"Reduces total vulnerabilities by {reduction} ({current_total}{alt_total})"
483+
elif alt_total == current_total:
484+
return "Similar security profile"
485+
else:
486+
return "May have different security characteristics"
487+
488+
def _assess_security_level(self, vulnerabilities: Dict[str, int]) -> str:
489+
"""Assess the security level of an image based on vulnerabilities"""
490+
critical = vulnerabilities.get("critical", 0)
491+
high = vulnerabilities.get("high", 0)
492+
total = vulnerabilities.get("total", 0)
493+
494+
if critical > 0:
495+
return f"HIGH RISK: {critical} critical vulnerabilities found"
496+
elif high > 5:
497+
return f"MEDIUM-HIGH RISK: {high} high-severity vulnerabilities"
498+
elif high > 0:
499+
return f"MEDIUM RISK: {high} high-severity vulnerabilities"
500+
elif total > 20:
501+
return f"LOW-MEDIUM RISK: {total} total vulnerabilities (no critical/high)"
502+
elif total > 0:
503+
return f"LOW RISK: {total} low-medium vulnerabilities"
504+
else:
505+
return "EXCELLENT: No known vulnerabilities"
506+
507+
def _extract_version_from_image(self, image_name: str) -> Optional[str]:
508+
"""Extract version from image name"""
509+
import re
510+
511+
# Common version patterns
512+
patterns = [
513+
r":(\d+\.\d+(?:\.\d+)?)", # :3.12, :18.19.0
514+
r":(\d+)", # :3, :18
515+
r"-(\d+\.\d+)", # -3.12
516+
r"(\d+\.\d+)$", # ending with version
517+
]
518+
519+
for pattern in patterns:
520+
match = re.search(pattern, image_name)
521+
if match:
522+
return match.group(1)
523+
524+
return None
525+
353526
async def _search_images(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
354527
"""Search for images based on criteria"""
355528
language = arguments.get("language")
@@ -434,19 +607,39 @@ async def _search_images(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
434607
db.close()
435608

436609
def _extract_language_from_image(self, image_name: str) -> Optional[str]:
437-
"""Extract language from image name"""
610+
"""Extract language from image name with improved detection"""
438611
image_lower = image_name.lower()
439612

440-
if "python" in image_lower:
441-
return "python"
442-
elif "node" in image_lower or "nodejs" in image_lower:
443-
return "nodejs"
444-
elif "java" in image_lower:
445-
return "java"
446-
elif "golang" in image_lower or "go:" in image_lower:
447-
return "go"
448-
elif "dotnet" in image_lower or ".net" in image_lower:
449-
return "dotnet"
613+
# More comprehensive language detection patterns
614+
language_patterns = {
615+
"python": ["python", "py"],
616+
"nodejs": ["node", "nodejs", "npm"],
617+
"java": ["java", "openjdk", "oracle-java", "adoptopenjdk"],
618+
"go": ["golang", "go:", "/go:", "-go:", "go-"],
619+
"dotnet": ["dotnet", ".net", "aspnet", "mcr.microsoft.com/dotnet"],
620+
"php": ["php", "php-fpm"],
621+
"ruby": ["ruby", "rails"],
622+
"rust": ["rust", "cargo"],
623+
"perl": ["perl"],
624+
}
625+
626+
# Check each language pattern
627+
for language, patterns in language_patterns.items():
628+
for pattern in patterns:
629+
if pattern in image_lower:
630+
return language
631+
632+
# Check for language-specific registry patterns
633+
if "mcr.microsoft.com" in image_lower:
634+
# Microsoft Container Registry specific mappings
635+
if "/python" in image_lower:
636+
return "python"
637+
elif "/nodejs" in image_lower or "/node" in image_lower:
638+
return "nodejs"
639+
elif "/java" in image_lower:
640+
return "java"
641+
elif "/dotnet" in image_lower:
642+
return "dotnet"
450643

451644
return None
452645

0 commit comments

Comments
 (0)