101 changes: 85 additions & 16 deletions pcap2har/main.py
@@ -27,6 +27,8 @@

logger = logging.getLogger(__name__)

MAX_BODY_SIZE = 10 * 1024 * 1024


def check_tshark_version():
"""Check tshark version and log warning if <= 4.4.10."""
@@ -107,7 +109,7 @@ class HttpResponse:
body: bytes = b""
compressionSaved: int = 0

def to_har_response(self):
def to_har_response(self, max_body_size=MAX_BODY_SIZE):
"""Convert this HTTP response to HAR format."""
return {
"status": self.status,
@@ -124,6 +126,7 @@ def to_har_response(self):
**content_to_json(
first(self.headers.get("content-type", [])),
self.body,
max_body_size=max_body_size,
),
},
}
@@ -170,14 +173,14 @@ def __str__(self):
s += ")"
return s

def to_har_entry(self, cid):
def to_har_entry(self, cid, max_body_size=MAX_BODY_SIZE):
"""Convert this HTTP session to a HAR entry."""
return {
"startedDateTime": unix_ts_to8601(self.request.startTimestamp),
"time": (self.maxPacketTs - self.request.startTimestamp) * 1000.0,
"serverIPAddress": self.remoteAddress.rsplit(":", 1)[0],
"request": self.request.to_har_request(),
"response": self.response.to_har_response(),
"response": self.response.to_har_response(max_body_size=max_body_size),
"_resourceType": "websocket" if self.websocketMessages else None,
"_webSocketMessages": (
[m.to_har_message() for m in self.websocketMessages]
@@ -229,8 +232,19 @@ def to_har_timings(self):
type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]),
help="Set the logging level.",
)
@click.option(
"--max-body-size",
default=MAX_BODY_SIZE,
type=int,
help="Maximum response body size to include in HAR (in bytes).",
)
def main(
pcap_file: Path, output: str = None, pretty=False, log_level="INFO", check="warning"
pcap_file: Path,
output: str = None,
pretty=False,
log_level="INFO",
check="warning",
max_body_size=MAX_BODY_SIZE,
):
"""Convert PCAP file to HAR format"""

@@ -253,7 +267,12 @@ def main(
if not run_consistency_checks(conv_details, fatal=check == "error"):
sys.exit(-1)

js = to_har_json(conv_details, comment=f"From {pcap_file}", fatal=check == "error")
js = to_har_json(
conv_details,
comment=f"From {pcap_file}",
fatal=check == "error",
max_body_size=max_body_size,
)

logger.info(f"Writing {len(conv_details)} conversations to {output_path}")
with click.open_file(output_path, "w") as fp:
@@ -278,9 +297,10 @@ def log_fn(*args, **kwargs):
if content_length and int(content_length[0]) > 0 and not conv.request.body:
log_fn(f"{conv!s}: Missing request body")

content_length = conv.response.headers.get("content-length")
if content_length and int(content_length[0]) > 0 and not conv.response.body:
log_fn(f"{conv!s}: Missing response body")
if conv.request.method != "HEAD":
content_length = conv.response.headers.get("content-length")
if content_length and int(content_length[0]) > 0 and not conv.response.body:
log_fn(f"{conv!s}: Missing response body")
Comment on lines +300 to +303
Copilot AI Nov 2, 2025

[nitpick] The indentation change moves the response body check inside the HEAD method guard, but the request body check above (lines 296-298) is not similarly guarded. Consider documenting why HEAD responses specifically need this special handling, or extract this logic into a helper function for clarity.

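A hedged sketch of the extraction the comment suggests: the helper name _check_missing_body is hypothetical, and the note about HEAD reflects RFC 9110 semantics (a HEAD response may advertise a Content-Length without carrying a body) rather than anything stated in the PR.

def _check_missing_body(conv, log_fn):
    """Warn when a Content-Length header promises a body that was never captured."""
    content_length = conv.request.headers.get("content-length")
    if content_length and int(content_length[0]) > 0 and not conv.request.body:
        log_fn(f"{conv!s}: Missing request body")

    # HEAD responses legitimately carry a Content-Length but no body, so skip them.
    if conv.request.method != "HEAD":
        content_length = conv.response.headers.get("content-length")
        if content_length and int(content_length[0]) > 0 and not conv.response.body:
            log_fn(f"{conv!s}: Missing response body")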
content_type = conv.response.headers.get("content-type")
if (
@@ -308,6 +328,8 @@ def read_pcap_file(pcap_file):
)

conv_details = defaultdict(HttpSession)
http1_sequence_counters = defaultdict(int)
http1_last_request_direction = {}

def unnest(packet):
return ((layer, packet) for layer in packet.layers)
@@ -329,11 +351,21 @@ def unnest(packet):
port = packet.tcp.dstport
http_version = "HTTP/2"
elif layer.layer_name == "http":
full_stream_id = ("1", packet.tcp.stream)
tcp_stream = packet.tcp.stream

if layer.get_field("request_line"):
if conv_details[
(1, tcp_stream, http1_sequence_counters[tcp_stream])
].request.url:
# This is a new request on the same connection, increment sequence
http1_sequence_counters[tcp_stream] += 1

full_stream_id = ("1", tcp_stream, http1_sequence_counters[tcp_stream])
Comment on lines +362 to +363
Copilot AI Nov 2, 2025

The variable current_session_id is assigned but never used. This appears to be dead code that should either be removed or the logic should use current_session_id instead of full_stream_id on line 368 when processing request packets.

Suggested change
full_stream_id = ("1", tcp_stream, http1_sequence_counters[tcp_stream])
full_stream_id = current_session_id
else:
full_stream_id = ("1", tcp_stream, http1_sequence_counters[tcp_stream])

port = packet.tcp.dstport
http_version = "HTTP/1"
elif layer.layer_name == "websocket":
full_stream_id = ("1", packet.tcp.stream)
tcp_stream = packet.tcp.stream
full_stream_id = ("1", tcp_stream, http1_sequence_counters[tcp_stream])
port = packet.tcp.dstport
else:
continue
@@ -353,6 +385,11 @@ def unnest(packet):
)
else "recv"
)
elif (
layer.layer_name == "http"
and packet.tcp.stream in http1_last_request_direction
):
direction = http1_last_request_direction[packet.tcp.stream]
else:
direction = "send"

@@ -400,6 +437,9 @@ def unnest(packet):
if method := layer.get_field("request_method"):
my_conv_details.method = method

if layer.layer_name == "http":
http1_last_request_direction[packet.tcp.stream] = direction

if header := layer.get_field("response_line"):
has_something = True

@@ -500,12 +540,12 @@ def unnest(packet):
return conv_details


def to_har_json(conv_details, comment=None, fatal=False):
def to_har_json(conv_details, comment=None, fatal=False, max_body_size=MAX_BODY_SIZE):
har_entries = []
for cid, conv in conv_details.items():
if conv.request.method != "CONNECT" and conv.maxPacketTs > 0:
try:
har_entries.append(conv.to_har_entry(cid))
har_entries.append(conv.to_har_entry(cid, max_body_size=max_body_size))
except Exception:
logger.exception(f"Failed to convert {conv!r} to HAR")
if fatal:
@@ -526,9 +566,20 @@ def to_har_json(conv_details, comment=None, fatal=False):
return output


def content_to_json(content_type, body):
def content_to_json(content_type, body, max_body_size=MAX_BODY_SIZE):
if not body:
return {"mimeType": "", "text": ""}

original_size = len(body)
truncated = False
if original_size > max_body_size:
logger.warning(
f"Response body size ({original_size} bytes) exceeds maximum "
f"({max_body_size} bytes). Truncating body in HAR output."
)
body = body[:max_body_size]
truncated = True

if content_type and content_type.split(";", 1)[0].strip() in (
"application/x-www-form-urlencoded",
"application/json",
@@ -538,22 +589,40 @@ def content_to_json(content_type, body):
"application/json+protobuf",
):
try:
return {"mimeType": content_type, "text": body.decode("utf-8")}
result = {"mimeType": content_type, "text": body.decode("utf-8")}
if truncated:
result["comment"] = (
f"Body truncated: original size ({original_size} bytes) "
f"exceeds {max_body_size} byte limit"
)
return result
except UnicodeDecodeError:
logger.warning(
f"Could not convert {body!r} to {content_type}", exc_info=True
)
return {
result = {
"mimeType": content_type,
"text": base64.b64encode(body).decode("ascii"),
"encoding": "base64",
}
if truncated:
result["comment"] = (
f"Body truncated: original size ({original_size} bytes) "
f"exceeds {max_body_size} byte limit"
)
return result
else:
return {
result = {
"mimeType": content_type,
"text": base64.b64encode(body).decode("ascii"),
"encoding": "base64",
}
if truncated:
result["comment"] = (
f"Body truncated: original size ({original_size} bytes) "
f"exceeds {max_body_size} byte limit"
)
return result


def first(it, default=None):
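For reviewers trying the new flag locally, a minimal sketch using the same CliRunner pattern as tests/test_main.py; the capture file name and the 5 MB cap are placeholders, not values from the PR.

from click.testing import CliRunner
from pcap2har.main import main

runner = CliRunner()
# Cap stored response bodies at 5 MB instead of the 10 MB MAX_BODY_SIZE default.
result = runner.invoke(main, ["capture.pcap", "--max-body-size", str(5 * 1024 * 1024)])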
2 changes: 1 addition & 1 deletion tests/goldens/test_http.py-test_chunked_gzip
@@ -10,7 +10,7 @@
"_resourceType": null,
"_webSocketMessages": null,
"cache": {},
"connection": "1-0",
"connection": "1-0-0",
"request": {
"bodySize": 0,
"headers": [
2 changes: 1 addition & 1 deletion tests/goldens/test_websocket.py-test_websocket_parse
@@ -41,7 +41,7 @@
}
],
"cache": {},
"connection": "1-0",
"connection": "1-0-0",
"request": {
"bodySize": 0,
"headers": [
2 changes: 1 addition & 1 deletion tests/goldens/test_websocket.py-test_websocket_segmented
@@ -23,7 +23,7 @@
}
],
"cache": {},
"connection": "1-1",
"connection": "1-1-0",
"request": {
"bodySize": 0,
"headers": [
Binary file added tests/resources/http-keep-alive.pcap
7 changes: 7 additions & 0 deletions tests/test_http.py
@@ -9,3 +9,10 @@ def test_chunked_gzip(golden):

har_data = parse_pcap_to_har(str(pcap_file))
golden.test(har_data)


def test_keep_alive(golden):
pcap_file = Path(__file__).parent / "resources" / "http-keep-alive.pcap"

har_data = parse_pcap_to_har(str(pcap_file))
golden.test(har_data)
14 changes: 13 additions & 1 deletion tests/test_main.py
@@ -1,7 +1,7 @@
"""Tests for main module."""

from click.testing import CliRunner
from pcap2har.main import main
from pcap2har.main import main, content_to_json


class TestMain:
Expand All @@ -19,3 +19,15 @@ def test_missing_file(self):
runner = CliRunner()
result = runner.invoke(main, ["nonexistent.pcap"])
assert result.exit_code != 0


def test_body_truncation():
body = b"x" * 200
max_size = 100

result = content_to_json("text/plain", body, max_body_size=max_size)

assert result["text"] == "x" * 100
assert len(result["text"]) == max_size
assert "comment" in result
assert "truncated" in result["comment"].lower()