170 changes: 52 additions & 118 deletions bbot/modules/postman.py
@@ -1,18 +1,16 @@
from bbot.modules.templates.subdomain_enum import subdomain_enum
from bbot.modules.templates.postman import postman


class postman(subdomain_enum):
watched_events = ["DNS_NAME"]
produced_events = ["URL_UNVERIFIED"]
class postman(postman):
watched_events = ["ORG_STUB", "SOCIAL"]
produced_events = ["CODE_REPOSITORY"]
flags = ["passive", "subdomain-enum", "safe", "code-enum"]
meta = {
"description": "Query Postman's API for related workspaces, collections, requests",
"created_date": "2023-12-23",
"description": "Query Postman's API for related workspaces, collections, requests and download them",
"created_date": "2024-09-07",
"author": "@domwhewell-sage",
}

base_url = "https://www.postman.com/_api"

headers = {
"Content-Type": "application/json",
"X-App-Version": "10.18.8-230926-0808",
@@ -24,13 +22,51 @@ class postman(subdomain_enum):
reject_wildcards = False

async def handle_event(self, event):
query = self.make_query(event)
self.verbose(f"Searching for any postman workspaces, collections, requests belonging to {query}")
for url, context in await self.query(query):
await self.emit_event(url, "URL_UNVERIFIED", parent=event, tags="httpx-safe", context=context)
# Handle postman profile
if event.type == "SOCIAL":
await self.handle_profile(event)
elif event.type == "ORG_STUB":
await self.handle_org_stub(event)

async def handle_profile(self, event):
profile_name = event.data.get("profile_name", "")
self.verbose(f"Searching for postman workspaces, collections, requests belonging to {profile_name}")
for item in await self.query(profile_name):
workspace = item["document"]
name = workspace["slug"]
profile = workspace["publisherHandle"]
if profile_name.lower() == profile.lower():
self.verbose(f"Got {name}")
workspace_url = f"{self.html_url}/{profile}/{name}"
await self.emit_event(
{"url": workspace_url},
"CODE_REPOSITORY",
tags="postman",
parent=event,
context=f'{{module}} searched postman.com for workspaces belonging to "{profile_name}" and found "{name}" at {{event.type}}: {workspace_url}',
)

async def handle_org_stub(self, event):
org_name = event.data
self.verbose(f"Searching for any postman workspaces, collections, requests for {org_name}")
for item in await self.query(org_name):
workspace = item["document"]
human_readable_name = workspace["name"]
name = workspace["slug"]
profile = workspace["publisherHandle"]
if org_name.lower() in human_readable_name.lower():
self.verbose(f"Got {name}")
workspace_url = f"{self.html_url}/{profile}/{name}"
await self.emit_event(
{"url": workspace_url},
"CODE_REPOSITORY",
tags="postman",
parent=event,
context=f'{{module}} searched postman.com for "{org_name}" and found matching workspace "{name}" at {{event.type}}: {workspace_url}',
)

async def query(self, query):
interesting_urls = []
data = []
url = f"{self.base_url}/ws/proxy"
json = {
"service": "search",
@@ -39,11 +75,6 @@ async def query(self, query):
"body": {
"queryIndices": [
"collaboration.workspace",
"runtime.collection",
"runtime.request",
"adp.api",
"flow.flow",
"apinetwork.team",
],
"queryText": self.helpers.quote(query),
"size": 100,
@@ -57,108 +88,11 @@ async def query(self, query):
}
r = await self.helpers.request(url, method="POST", json=json, headers=self.headers)
if r is None:
return interesting_urls
status_code = getattr(r, "status_code", 0)
try:
json = r.json()
except Exception as e:
self.warning(f"Failed to decode JSON for {r.url} (HTTP status: {status_code}): {e}")
return interesting_urls
workspaces = []
for item in json.get("data", {}):
for workspace in item.get("document", {}).get("workspaces", []):
if workspace not in workspaces:
workspaces.append(workspace)
for item in workspaces:
id = item.get("id", "")
name = item.get("name", "")
tldextract = self.helpers.tldextract(query)
if tldextract.domain.lower() in name.lower():
self.verbose(f"Discovered workspace {name} ({id})")
workspace_url = f"{self.base_url}/workspace/{id}"
interesting_urls.append(
(
workspace_url,
f'{{module}} searched postman.com for "{query}" and found matching workspace "{name}" at {{event.type}}: {workspace_url}',
)
)
environments, collections = await self.search_workspace(id)
globals_url = f"{self.base_url}/workspace/{id}/globals"
interesting_urls.append(
(
globals_url,
f'{{module}} searched postman.com for "{query}", found matching workspace "{name}" at {workspace_url}, and found globals at {{event.type}}: {globals_url}',
)
)
for e_id in environments:
env_url = f"{self.base_url}/environment/{e_id}"
interesting_urls.append(
(
env_url,
f'{{module}} searched postman.com for "{query}", found matching workspace "{name}" at {workspace_url}, enumerated environments, and found {{event.type}}: {env_url}',
)
)
for c_id in collections:
collection_url = f"{self.base_url}/collection/{c_id}"
interesting_urls.append(
(
collection_url,
f'{{module}} searched postman.com for "{query}", found matching workspace "{name}" at {workspace_url}, enumerated collections, and found {{event.type}}: {collection_url}',
)
)
requests = await self.search_collections(id)
for r_id in requests:
request_url = f"{self.base_url}/request/{r_id}"
interesting_urls.append(
(
request_url,
f'{{module}} searched postman.com for "{query}", found matching workspace "{name}" at {workspace_url}, enumerated requests, and found {{event.type}}: {request_url}',
)
)
else:
self.verbose(f"Skipping workspace {name} ({id}) as it does not appear to be in scope")
return interesting_urls

async def search_workspace(self, id):
url = f"{self.base_url}/workspace/{id}"
r = await self.helpers.request(url)
if r is None:
return [], []
return data
status_code = getattr(r, "status_code", 0)
try:
json = r.json()
if not isinstance(json, dict):
raise ValueError(f"Got unexpected value for JSON: {json}")
except Exception as e:
self.warning(f"Failed to decode JSON for {r.url} (HTTP status: {status_code}): {e}")
return [], []
environments = json.get("data", {}).get("dependencies", {}).get("environments", [])
collections = json.get("data", {}).get("dependencies", {}).get("collections", [])
return environments, collections

async def search_collections(self, id):
request_ids = []
url = f"{self.base_url}/list/collection?workspace={id}"
r = await self.helpers.request(url, method="POST")
if r is None:
return request_ids
status_code = getattr(r, "status_code", 0)
try:
json = r.json()
except Exception as e:
self.warning(f"Failed to decode JSON for {r.url} (HTTP status: {status_code}): {e}")
return request_ids
for item in json.get("data", {}):
request_ids.extend(await self.parse_collection(item))
return request_ids

async def parse_collection(self, json):
request_ids = []
folders = json.get("folders", [])
requests = json.get("requests", [])
for folder in folders:
request_ids.extend(await self.parse_collection(folder))
for request in requests:
r_id = request.get("id", "")
request_ids.append(r_id)
return request_ids
return None
return json.get("data", [])
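
For context, here is a minimal standalone sketch of what the rewritten `query()` plus `handle_org_stub()` boil down to. It uses `httpx` instead of BBOT's request helpers, keeps only the payload fields visible in this diff (the proxied `method`/`path` fields are hidden behind the collapsed lines, so they are marked as assumptions), and reuses the module's hard-coded `X-App-Version` header.

```python
# Standalone sketch of the Postman workspace search performed by the module.
# Uses httpx rather than BBOT's helpers; field names mirror this diff.
import httpx

BASE_URL = "https://www.postman.com/_api"
HTML_URL = "https://www.postman.com"
HEADERS = {
    "Content-Type": "application/json",
    "X-App-Version": "10.18.8-230926-0808",
}


def search_workspaces(query: str) -> list[dict]:
    """POST the proxied search request and return the raw result documents."""
    payload = {
        "service": "search",
        "method": "POST",       # assumption: proxied method, hidden in the collapsed hunk
        "path": "/search-all",  # assumption: proxied path, hidden in the collapsed hunk
        "body": {
            "queryIndices": ["collaboration.workspace"],
            "queryText": query,
            "size": 100,
        },
    }
    r = httpx.post(f"{BASE_URL}/ws/proxy", json=payload, headers=HEADERS)
    if r.status_code != 200:
        return []
    return r.json().get("data", [])


def workspace_urls(query: str, org_name: str) -> list[str]:
    """Mirror handle_org_stub(): keep workspaces whose display name contains the org."""
    urls = []
    for item in search_workspaces(query):
        doc = item["document"]
        if org_name.lower() in doc["name"].lower():
            urls.append(f"{HTML_URL}/{doc['publisherHandle']}/{doc['slug']}")
    return urls
```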
11 changes: 11 additions & 0 deletions bbot/modules/templates/postman.py
@@ -0,0 +1,11 @@
from bbot.modules.base import BaseModule


class postman(BaseModule):
"""
A template module for use of the GitHub API
Inherited by several other github modules.
"""

base_url = "https://www.postman.com/_api"
html_url = "https://www.postman.com"