Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions cloudsplaining/scan/authorization_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import logging
from typing import Any

from cloudsplaining.shared.aws_principal import AWSPrincipal
from cloudsplaining.scan.policy_document import PolicyDocument

from policy_sentry.querying.actions import get_all_action_links
from policy_sentry.querying.all import get_all_service_prefixes

Expand All @@ -19,6 +22,8 @@
from cloudsplaining.scan.user_details import UserDetailList
from cloudsplaining.shared.exclusions import DEFAULT_EXCLUSIONS, Exclusions



all_service_prefixes = get_all_service_prefixes()
logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -98,10 +103,26 @@ def __init__(
"roles": self.role_detail_list.json,
}


self.policies.set_iam_data(iam_data)
self.group_detail_list.set_iam_data(iam_data)
self.user_detail_list.set_iam_data(iam_data)
self.role_detail_list.set_iam_data(iam_data)
from cloudsplaining.scan.policy_document import PolicyDocument

for role_detail in self.role_details:
# Collect all policies (attached + inline)
policy_docs = [p.policy_document for p in role_detail.policies]

# Merge them
merged_doc = PolicyDocument.merge(policy_docs)

if merged_doc:
composite_escalations = merged_doc.allows_privilege_escalation()
role_detail.composite_privilege_escalation_paths = composite_escalations




@property
def inline_policies(self) -> dict[str, dict[str, Any]]:
Expand Down
23 changes: 23 additions & 0 deletions cloudsplaining/scan/policy_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
# Licensed under the BSD 3-Clause license.
# For full license text, see the LICENSE file in the repo root
# or https://opensource.org/licenses/BSD-3-Clause
# cloudsplaining/scan/policy_document.py
from copy import deepcopy
from __future__ import annotations

import logging
Expand Down Expand Up @@ -63,6 +65,27 @@ def __init__(
flag_resource_arn_statements=self.flag_resource_arn_statements,
)
)

@staticmethod
def merge_policy_documents(policy_documents):
"""
Merge multiple PolicyDocument objects into a single composite PolicyDocument.
Consolidates all 'Allow' and 'Deny' statements.
"""
if not policy_documents:
return None

merged_data = {"Version": "2012-10-17", "Statement": []}

for policy in policy_documents:
doc = policy.document # already parsed JSON dict
statements = doc.get("Statement", [])
if isinstance(statements, dict):
statements = [statements]
merged_data["Statement"].extend(deepcopy(statements))

return PolicyDocument(merged_data)


@property
def json(self) -> dict[str, Any]:
Expand Down
37 changes: 27 additions & 10 deletions cloudsplaining/scan/role_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import json
import logging
from typing import TYPE_CHECKING, Any
from cloudsplaining.scan.policy_document import PolicyDocument

from policy_sentry.util.arns import get_account_from_arn

Expand Down Expand Up @@ -72,18 +73,34 @@ def __init__(
this_role_path,
)
else:
self.roles.append(
RoleDetail(
role_detail,
policy_details,
exclusions=exclusions,
flag_conditional_statements=self.flag_conditional_statements,
flag_resource_arn_statements=self.flag_resource_arn_statements,
flag_trust_policies=flag_trust_policies,
severity=self.severity,
)
# ✅ Define role_obj first
role_obj = RoleDetail(
role_detail,
policy_details,
exclusions=exclusions,
flag_conditional_statements=self.flag_conditional_statements,
flag_resource_arn_statements=self.flag_resource_arn_statements,
flag_trust_policies=flag_trust_policies,
severity=self.severity,
)

# Append the role to the list
self.roles.append(role_obj)

# -----------------------------
# Composite Privilege Escalation Logic
# -----------------------------
policy_documents = [
p.policy_document
for p in getattr(role_obj, "attached_policies", []) + getattr(role_obj, "inline_policies", [])
]

merged_doc = PolicyDocument.merge(policy_documents)
if merged_doc:
composite_escalations = merged_doc.allows_privilege_escalation()
role_obj.add_composite_escalations(composite_escalations)


def set_iam_data(self, iam_data: dict[str, dict[Any, Any]]) -> None:
self.iam_data = iam_data
for role in self.roles:
Expand Down
42 changes: 42 additions & 0 deletions cloudsplaining/shared/aws_principal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# cloudsplaining/shared/aws_principal.py
"""
AWS Principal Data Model
Holds IAM principal information (User, Role, Group) and privilege escalation findings.
"""

from typing import List


class AWSPrincipal:
"""Base class representing a generic AWS IAM Principal."""

def __init__(self, name: str, arn: str, policies: list):
self.name = name
self.arn = arn
self.policies = policies

# Existing per-policy privilege escalation findings
self.privilege_escalation: List[str] = []

# New composite findings discovered from merged policies
self.composite_privilege_escalation_paths: List[str] = []

def add_composite_escalations(self, escalation_paths: list[str]):
"""Add findings from merged policy analysis."""
if escalation_paths:
self.composite_privilege_escalation_paths.extend(escalation_paths)


class AWSUser(AWSPrincipal):
"""IAM User Principal"""
pass


class AWSRole(AWSPrincipal):
"""IAM Role Principal"""
pass


class AWSGroup(AWSPrincipal):
"""IAM Group Principal"""
pass
8 changes: 8 additions & 0 deletions docs/report/cloudsplaining_limitation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
### Multi-Policy Privilege Escalation Detection

**Previous Limitation:**
Cloudsplaining analyzed each IAM policy independently, missing privilege escalation risks that arise only from the combination of multiple policies attached to a single principal.

**New Capability:**
Cloudsplaining now supports *principal-centric* privilege escalation analysis.
All attached and inline policies are merged before evaluation, enabling detection of composite escalation paths.
9 changes: 8 additions & 1 deletion test/scanning/test_policy_document.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import unittest
import json
from cloudsplaining.scan.policy_document import PolicyDocument
from cloudsplaining.scan.policy_document import PolicyDocument, merge_policy_documents
from cloudsplaining.shared.exclusions import is_name_excluded, Exclusions



class TestPolicyDocument(unittest.TestCase):
def test_policy_document_return_json(self):
test_policy = {
Expand All @@ -22,6 +23,12 @@ def test_policy_document_return_json(self):
result = policy_document.json
# That function returns the Policy as JSON
self.assertEqual(result, test_policy)
def test_merge_policy_documents_combines_allow_and_deny(self):
p1 = PolicyDocument({"Statement": [{"Effect": "Allow", "Action": "iam:PassRole"}]})
p2 = PolicyDocument({"Statement": [{"Effect": "Allow", "Action": "ec2:RunInstances"}]})
merged = merge_policy_documents([p1, p2])
self.assertEqual(len(merged.document["Statement"]), 2)


def test_policy_document_return_statement_results(self):
test_policy = {
Expand Down