Files

115 lines
3.5 KiB
Python
Raw Permalink Normal View History

2025-12-25 11:16:59 +08:00
import json
import os
from collections.abc import Iterable
from io import BufferedReader
from typing import Any
from typing import BinaryIO
from typing import cast
from typing import Union
from mitmproxy import exceptions
from mitmproxy import flow
from mitmproxy import flowfilter
from mitmproxy.io import compat
from mitmproxy.io import tnetstring
from mitmproxy.io.har import request_to_flow
class FlowWriter:
    """
    Serializes flows onto a writable binary file object, one tnetstring record per flow.
    """

    def __init__(self, fo):
        # Output file object; every add() call appends one record to it.
        self.fo = fo

    def add(self, f: flow.Flow) -> None:
        """
        Dump the state of flow `f` to the underlying file object as a tnetstring.
        """
        tnetstring.dump(f.get_state(), self.fo)
class FlowReader:
    """
    Reads flows from a binary file object.

    Two input formats are recognised by `stream()`: a JSON document
    (treated as a HAR file, optionally preceded by a UTF-8 BOM) and a
    sequence of tnetstring-encoded flow states.
    """

    fo: BinaryIO

    def __init__(self, fo: BinaryIO):
        self.fo = fo

    def peek(self, n: int) -> bytes:
        """
        Return upcoming bytes from the file object without consuming them.

        Uses the file object's own `peek()` when it has one; otherwise falls
        back to read-and-seek-back, which requires a seekable file object.
        Callers should only rely on the returned prefix: a buffered reader's
        `peek()` is not guaranteed to return exactly `n` bytes.
        """
        try:
            return cast(BufferedReader, self.fo).peek(n)
        except AttributeError:
            # https://github.com/python/cpython/issues/90533: io.BytesIO does not have peek()
            pos = self.fo.tell()
            ret = self.fo.read(n)
            self.fo.seek(pos)
            return ret

    def stream(self) -> Iterable[flow.Flow]:
        """
        Yields Flow objects from the dump.

        Input whose first byte (after an optional UTF-8 BOM) is `{` is parsed
        as a HAR file; anything else is read as consecutive tnetstring records
        until the end of the input.

        Raises:
            FlowReadException, if the HAR document cannot be parsed or
            converted, or if the tnetstring data is malformed or does not
            describe a valid flow.
        """
        if self.peek(4).startswith(
            b"\xef\xbb\xbf{"
        ):  # skip BOM, usually added by Fiddler
            self.fo.read(3)
        if self.peek(1).startswith(b"{"):
            try:
                har_file = json.loads(self.fo.read().decode("utf-8"))
                for request_json in har_file["log"]["entries"]:
                    yield request_to_flow(request_json)
            except Exception as e:
                # Chain the underlying error (bad JSON, missing keys, conversion
                # failure) so the real cause is kept, as the tnetstring branch does.
                raise exceptions.FlowReadException(
                    "Unable to read HAR file. Please provide a valid HAR file"
                ) from e
        else:
            try:
                while True:
                    # FIXME: This cast hides a lack of dynamic type checking
                    loaded = cast(
                        dict[Union[bytes, str], Any],
                        tnetstring.load(self.fo),
                    )
                    try:
                        if not isinstance(loaded, dict):
                            raise ValueError(f"Invalid flow: {loaded=}")
                        yield flow.Flow.from_state(compat.migrate_flow(loaded))
                    except ValueError as e:
                        # Raised as FlowReadException so the outer handler below
                        # (which only catches parser-level errors) lets it through.
                        raise exceptions.FlowReadException(e) from e
            except (ValueError, TypeError, IndexError) as e:
                if str(e) == "not a tnetstring: empty file":
                    return  # Error is due to EOF
                raise exceptions.FlowReadException("Invalid data format.") from e
class FilteredFlowWriter:
    """
    Writes flows to a binary file object, optionally restricted by a flow filter.

    Each written flow is flushed to the file object immediately.
    """

    def __init__(self, fo: BinaryIO, flt: flowfilter.TFilter | None):
        self.fo = fo
        self.flt = flt

    def add(self, f: flow.Flow) -> None:
        """
        Write flow `f` as a tnetstring unless a filter is set and `f` does not match it.
        """
        # No (truthy) filter means every flow is accepted; otherwise the filter decides.
        if not self.flt or flowfilter.match(self.flt, f):
            tnetstring.dump(f.get_state(), self.fo)
            self.fo.flush()
def read_flows_from_paths(paths) -> list[flow.Flow]:
    """
    Given a list of filepaths, read all flows and return a list of them.
    From a performance perspective, streaming would be advisable -
    however, if there's an error with one of the files, we want it to be raised immediately.

    A leading `~` in each path is expanded before the file is opened.

    Raises:
        FlowReadException, if any error occurs.
    """
    flows: list[flow.Flow] = []
    try:
        for path in paths:
            with open(os.path.expanduser(path), "rb") as f:
                # Consume the whole stream now so read errors surface here.
                flows.extend(FlowReader(f).stream())
    except OSError as e:
        # strerror is None for OSErrors built without an errno; fall back to
        # str(e) so the message is never None, and chain the original error.
        raise exceptions.FlowReadException(e.strerror or str(e)) from e
    return flows