|  | 
| 20 | 20 | import psutil | 
| 21 | 21 | import base64, imghdr | 
| 22 | 22 | from pathlib import Path | 
|  | 23 | +from urllib.parse import urlparse | 
| 23 | 24 | sys.path.append("..") | 
| 24 | 25 | from selenium import webdriver | 
| 25 | 26 | if "linux" in platform.system().lower(): | 
| @@ -1093,39 +1094,161 @@ def Change_Attribute_Value(step_data): | 
| 1093 | 1094 |         errMsg = "Could not find your element." | 
| 1094 | 1095 |         return CommonUtil.Exception_Handler(sys.exc_info(), None, errMsg) | 
| 1095 | 1096 | 
 | 
|  | 1097 | + | 
| 1096 | 1098 | @logger | 
| 1097 | 1099 | def capture_network_log(step_data): | 
| 1098 |  | -    sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME | 
| 1099 |  | -    try: | 
| 1100 |  | -        global selenium_driver | 
|  | 1100 | +    """ | 
|  | 1101 | +    This action captures network activity (API requests, responses, status codes, etc.) from a Chromium-based browser using Selenium. The logs can be filtered and saved to a variable for further validation or analysis. | 
| 1101 | 1102 | 
 | 
| 1102 |  | -        def process_browser_log_entry(entry): | 
| 1103 |  | -            response = json.loads(entry["message"])["message"] | 
| 1104 |  | -            return response | 
|  | 1103 | +    Example 1: | 
|  | 1104 | +    Field	                    Sub Field	            Value | 
|  | 1105 | +    capture network log         selenium action 	    start | 
| 1105 | 1106 | 
 | 
| 1106 |  | -        variable_name = None | 
| 1107 |  | -        mode = None | 
| 1108 |  | -        for left, _, right in step_data: | 
| 1109 |  | -            if left.lower().strip() == "capture network log": | 
| 1110 |  | -                mode = right.lower().strip() | 
| 1111 |  | -            if left.lower().strip() == "save": | 
| 1112 |  | -                variable_name = right.lower().strip() | 
| 1113 |  | -        if not mode or ( mode == 'stop' and variable_name == None): | 
| 1114 |  | -            CommonUtil.ExecLog(sModuleInfo, "Wrong data set provided.", 3) | 
| 1115 |  | -            return "zeuz_failed" | 
|  | 1107 | +    Example 2: | 
|  | 1108 | +    Field	                    Sub Field	            Value | 
|  | 1109 | +    save                        input parameter         variable_name | 
|  | 1110 | +    filter domain               input parameter         zeuz.ai | 
|  | 1111 | +    include status code         input parameter         201, 400-504 | 
|  | 1112 | +    include request method      input parameter         GET, POST | 
|  | 1113 | +    include response body       input parameter         false | 
|  | 1114 | +    capture network log         selenium action 	    stop | 
|  | 1115 | +    """ | 
|  | 1116 | +    sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME | 
|  | 1117 | +    global selenium_driver | 
| 1116 | 1118 | 
 | 
| 1117 |  | -        if mode == 'start': | 
|  | 1119 | +    try: | 
|  | 1120 | +        # Helper function to parse status code | 
|  | 1121 | +        def parse_status_codes(code_str): | 
|  | 1122 | +            result = [] | 
|  | 1123 | +            for part in code_str.split(','): | 
|  | 1124 | +                part = part.strip() | 
|  | 1125 | +                if '-' in part: | 
|  | 1126 | +                    start, end = map(int, part.split('-')) | 
|  | 1127 | +                    result.extend(range(start, end + 1)) | 
|  | 1128 | +                else: | 
|  | 1129 | +                    result.append(int(part)) | 
|  | 1130 | +            return result | 
|  | 1131 | + | 
|  | 1132 | +        params = { | 
|  | 1133 | +            'variable_name': None, | 
|  | 1134 | +            'mode': None, | 
|  | 1135 | +            'filter_domains': [], | 
|  | 1136 | +            'status_filter': [], | 
|  | 1137 | +            'method_filter': [], | 
|  | 1138 | +            'include_body': False | 
|  | 1139 | +        } | 
|  | 1140 | +         | 
|  | 1141 | +        # Parse | 
|  | 1142 | +        for left, mid, right in step_data: | 
|  | 1143 | +            left = left.lower().strip() | 
|  | 1144 | +            if left == "capture network log": | 
|  | 1145 | +                params['mode'] = right.lower().strip() | 
|  | 1146 | +            elif left == "save": | 
|  | 1147 | +                params['variable_name'] = right.strip() | 
|  | 1148 | +            elif left == "filter domain": | 
|  | 1149 | +                params['filter_domains'] = [d.strip() for d in right.split(',')] | 
|  | 1150 | +            elif left == "include status code": | 
|  | 1151 | +                params['status_filter'] = parse_status_codes(right.strip()) | 
|  | 1152 | +            elif left == "include request method": | 
|  | 1153 | +                params['method_filter'] = [m.strip().upper() for m in right.split(',')] | 
|  | 1154 | +            elif left == "include response body": | 
|  | 1155 | +                params['include_body'] = right.strip().lower() == "true" | 
|  | 1156 | + | 
|  | 1157 | +        # Start/stop handling | 
|  | 1158 | +        if params['mode'] == 'start': | 
| 1118 | 1159 |             selenium_driver.get_log("performance") | 
| 1119 |  | -            CommonUtil.ExecLog(sModuleInfo, "Started collecting network logs", 1    ) | 
| 1120 |  | -        if mode == 'stop': | 
|  | 1160 | +            CommonUtil.ExecLog(sModuleInfo, "Started collecting network logs...", 1) | 
|  | 1161 | +            return "passed" | 
|  | 1162 | + | 
|  | 1163 | +        if params['mode'] == 'stop': | 
|  | 1164 | +            EXCLUDED_EXTENSIONS = {'.js', '.css', '.png', '.jpg', '.jpeg', '.gif', '.svg', '.ico',  | 
|  | 1165 | +                                  '.woff', '.woff2', '.ttf', '.eot', '.webp', '.map', '.txt'} | 
|  | 1166 | +            EXCLUDED_MIME_PREFIXES = {'image/', 'font/', 'text/css', 'application/javascript',  | 
|  | 1167 | +                                     'text/javascript', 'application/font-', 'application/x-font-'} | 
|  | 1168 | +             | 
| 1121 | 1169 |             browser_log = selenium_driver.get_log("performance") | 
| 1122 |  | -            events = [process_browser_log_entry(entry) for entry in browser_log] | 
| 1123 |  | -            Shared_Resources.Set_Shared_Variables(variable_name, events) | 
|  | 1170 | +            api_logs = [] | 
|  | 1171 | +            requests = {} | 
|  | 1172 | +             | 
|  | 1173 | +            for entry in browser_log: | 
|  | 1174 | +                try: | 
|  | 1175 | +                    outer = json.loads(entry['message']) | 
|  | 1176 | +                    message_data = outer.get('message', {}) | 
|  | 1177 | +                    method = message_data.get('method') | 
|  | 1178 | +                    log_params = message_data.get('params', {}) | 
|  | 1179 | +                    request_id = log_params.get('requestId') | 
|  | 1180 | +                     | 
|  | 1181 | +                    if method == 'Network.requestWillBeSent': | 
|  | 1182 | +                        requests[request_id] = log_params.get('request', {}) | 
|  | 1183 | +                         | 
|  | 1184 | +                    elif method == 'Network.responseReceived': | 
|  | 1185 | +                        response = log_params.get('response', {}) | 
|  | 1186 | +                        url = response.get('url', '') | 
|  | 1187 | +                        mime_type = response.get('mimeType', '') | 
|  | 1188 | +                        status = response.get('status', 0) | 
|  | 1189 | +                         | 
|  | 1190 | +                        # Skip static resources | 
|  | 1191 | +                        if any(url.endswith(ext) for ext in EXCLUDED_EXTENSIONS): | 
|  | 1192 | +                            continue | 
|  | 1193 | +                        if any(mime_type.startswith(prefix) for prefix in EXCLUDED_MIME_PREFIXES): | 
|  | 1194 | +                            continue | 
|  | 1195 | +                         | 
|  | 1196 | +                        request = requests.get(request_id, {}) | 
|  | 1197 | + | 
|  | 1198 | +                        # Apply filters | 
|  | 1199 | +                        if params['filter_domains']: | 
|  | 1200 | +                            domain = urlparse(url).netloc | 
|  | 1201 | +                            if not any(d in domain for d in params['filter_domains']): | 
|  | 1202 | +                                continue | 
|  | 1203 | +                         | 
|  | 1204 | +                        if params['method_filter']: | 
|  | 1205 | +                            request_method = request.get('method', '').upper() | 
|  | 1206 | +                            if request_method not in params['method_filter']: | 
|  | 1207 | +                                continue | 
|  | 1208 | +                         | 
|  | 1209 | +                        if params['status_filter']: | 
|  | 1210 | +                            if status not in params['status_filter']: | 
|  | 1211 | +                                continue | 
|  | 1212 | +                         | 
|  | 1213 | +                        # Build log entry | 
|  | 1214 | +                        log_entry = { | 
|  | 1215 | +                            'url': url, | 
|  | 1216 | +                            'status': status, | 
|  | 1217 | +                            'method': request.get('method', ''), | 
|  | 1218 | +                            'mimeType': mime_type, | 
|  | 1219 | +                            'type': log_params.get('type', ''), | 
|  | 1220 | +                            'timestamp': entry.get('timestamp') | 
|  | 1221 | +                        } | 
|  | 1222 | +                         | 
|  | 1223 | +                        # Add response body if requested | 
|  | 1224 | +                        if params['include_body']: | 
|  | 1225 | +                            try: | 
|  | 1226 | +                                body = selenium_driver.execute_cdp_cmd( | 
|  | 1227 | +                                    'Network.getResponseBody',  | 
|  | 1228 | +                                    {'requestId': request_id} | 
|  | 1229 | +                                ) | 
|  | 1230 | +                                log_entry['body'] = body.get('body', 'Unavailable') | 
|  | 1231 | +                            except: | 
|  | 1232 | +                                log_entry['body'] = 'Unavailable' | 
|  | 1233 | +                         | 
|  | 1234 | +                        api_logs.append(log_entry) | 
|  | 1235 | + | 
|  | 1236 | +                except Exception as e: | 
|  | 1237 | +                    err = f"Error processing log entry: {str(e)}" | 
|  | 1238 | +                    CommonUtil.ExecLog(sModuleInfo, err, 3) | 
|  | 1239 | +                    continue | 
|  | 1240 | +             | 
|  | 1241 | +            # Save results | 
|  | 1242 | +            if params['variable_name']: | 
|  | 1243 | +                Shared_Resources.Set_Shared_Variables(params['variable_name'], api_logs) | 
|  | 1244 | +                CommonUtil.ExecLog(sModuleInfo, f"Saved {len(api_logs)} network events to '{params['variable_name']}'", 1) | 
| 1124 | 1245 |             return "passed" | 
|  | 1246 | +             | 
| 1125 | 1247 |     except Exception: | 
| 1126 |  | -        errMsg = "Could not collect network logs. Make sure logging is enabled at browser startup" | 
|  | 1248 | +        errMsg = "Could not collect network logs. Ensure performance logging is enabled in browser options" | 
| 1127 | 1249 |         return CommonUtil.Exception_Handler(sys.exc_info(), None, errMsg) | 
| 1128 |  | -     | 
|  | 1250 | + | 
|  | 1251 | + | 
| 1129 | 1252 | # Method to enter texts in a text box; step data passed on by the user | 
| 1130 | 1253 | @logger | 
| 1131 | 1254 | def Enter_Text_In_Text_Box(step_data): | 
|  | 
0 commit comments