1
+ """Code to identify if a principal in an AWS account can use access to ECS to access other principals."""
2
+
3
+
4
+ # Copyright (c) NCC Group and Erik Steringer 2022. This file is part of Principal Mapper.
5
+ #
6
+ # Principal Mapper is free software: you can redistribute it and/or modify
7
+ # it under the terms of the GNU Affero General Public License as published by
8
+ # the Free Software Foundation, either version 3 of the License, or
9
+ # (at your option) any later version.
10
+ #
11
+ # Principal Mapper is distributed in the hope that it will be useful,
12
+ # but WITHOUT ANY WARRANTY; without even the implied warranty of
13
+ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
+ # GNU Affero General Public License for more details.
15
+ #
16
+ # You should have received a copy of the GNU Affero General Public License
17
+ # along with Principal Mapper. If not, see <https://www.gnu.org/licenses/>.
18
+
19
+ import io
20
+ import logging
21
+ import os
22
+ from typing import List , Optional
23
+
24
+ from principalmapper .common import Edge , Node
25
+ from principalmapper .graphing .edge_checker import EdgeChecker
26
+ from principalmapper .querying import query_interface
27
+ from principalmapper .querying .local_policy_simulation import resource_policy_authorization , ResourcePolicyEvalResult
28
+ from principalmapper .util import arns
29
+
30
+
31
+ logger = logging .getLogger (__name__ )
32
+
33
+
34
+ class ECSEdgeChecker (EdgeChecker ):
35
+ """Class for identifying if ECS can be used by IAM principals to gain access to other IAM principals."""
36
+
37
+ def return_edges (self , nodes : List [Node ], region_allow_list : Optional [List [str ]] = None ,
38
+ region_deny_list : Optional [List [str ]] = None , scps : Optional [List [List [dict ]]] = None ,
39
+ client_args_map : Optional [dict ] = None , partition : str = 'aws' ) -> List [Edge ]:
40
+ """Fulfills expected method return_edges."""
41
+
42
+ logger .info ('Generating Edges based on ECS.' )
43
+ result = generate_edges_locally (nodes , scps , partition )
44
+
45
+ for edge in result :
46
+ logger .info ("Found new edge: {}" .format (edge .describe_edge ()))
47
+
48
+ return result
49
+
50
+
51
+ def generate_edges_locally (nodes : List [Node ], scps : Optional [List [List [dict ]]] = None , partition : str = 'aws' ) -> List [Edge ]:
52
+ """Generates and returns Edge objects. It is possible to use this method if you are operating offline (infra-as-code).
53
+ """
54
+
55
+ # TODO: pull and include existing clusters, tasks, services
56
+
57
+ result = []
58
+
59
+ service_linked_role_exists = False
60
+ for node in nodes :
61
+ if ':role/aws-service-role/ecs.amazonaws.com/AWSServiceRoleForECS' in node .arn :
62
+ service_linked_role_exists = True # can update to point to node if we need to do intermediate checks
63
+ break
64
+
65
+ for node_destination in nodes :
66
+ if ':role/' not in node_destination .arn :
67
+ continue
68
+
69
+ sim_result = resource_policy_authorization (
70
+ 'ecs-tasks.amazonaws.com' ,
71
+ arns .get_account_id (node_destination .arn ),
72
+ node_destination .trust_policy ,
73
+ 'sts:AssumeRole' ,
74
+ node_destination .arn ,
75
+ {}
76
+ )
77
+ if sim_result is not ResourcePolicyEvalResult .SERVICE_MATCH :
78
+ continue
79
+
80
+ for node_source in nodes :
81
+ if node_source == node_destination :
82
+ continue
83
+
84
+ if node_source .is_admin :
85
+ continue
86
+
87
+ # check that either the service-linked role exists or needs to be created
88
+ create_slr_auth = False
89
+ create_slr_mfa = False
90
+ if not service_linked_role_exists :
91
+ # using auth/mfa var, since the control flow continues to the next loop if we cannot make the SLR
92
+ create_slr_auth , create_slr_mfa = query_interface .local_check_authorization_handling_mfa (
93
+ node_source ,
94
+ 'iam:CreateServiceLinkedRole' ,
95
+ f'arn:aws:iam::{ arns .get_account_id (node_source .arn )} :role/aws-service-role/ecs.amazonaws.com/AWSServiceRoleForECS' ,
96
+ {'iam:AWSServiceName' : 'ecs.amazonaws.com' },
97
+ service_control_policy_groups = scps
98
+ )
99
+ if not create_slr_auth :
100
+ continue # can't make the service-linked role -> can't use ECS (?)
101
+
102
+ # check if someone can pass this role as an ECS Task Role
103
+ pass_role_auth , pass_role_mfa = query_interface .local_check_authorization_handling_mfa (
104
+ node_source ,
105
+ 'iam:PassRole' ,
106
+ node_destination .arn ,
107
+ {'iam:PassedToService' : 'ecs-tasks.amazonaws.com' }, # verified via managed policies,
108
+ service_control_policy_groups = scps
109
+ )
110
+
111
+ if not pass_role_auth :
112
+ continue
113
+
114
+ # check if someone can start/run a task
115
+ run_task_auth , run_task_mfa = query_interface .local_check_authorization_handling_mfa (
116
+ node_source ,
117
+ 'ecs:RunTask' ,
118
+ '*' ,
119
+ {},
120
+ service_control_policy_groups = scps
121
+ )
122
+
123
+ if not run_task_auth :
124
+ continue
125
+
126
+ reason = f'{ "(requires MFA) " if create_slr_mfa or pass_role_mfa or run_task_mfa else "" } can ' \
127
+ f'{ "use the existing ECS Service-Linked Role" if service_linked_role_exists else "create the ECS Service-Linked Role" } ' \
128
+ f'to run a task in ECS and access '
129
+
130
+ result .append (Edge (
131
+ node_source , node_destination , reason , 'ECS'
132
+ ))
133
+
134
+ return result
0 commit comments