Files

118 lines
3.8 KiB
Python
Raw Permalink Normal View History

2025-12-25 11:16:59 +08:00
import logging
import re
from collections.abc import Sequence
from pathlib import Path
from typing import NamedTuple
from mitmproxy import ctx
from mitmproxy import exceptions
from mitmproxy import flowfilter
from mitmproxy import http
from mitmproxy.http import Headers
from mitmproxy.utils import strutils
from mitmproxy.utils.spec import parse_spec
class ModifySpec(NamedTuple):
    """
    One parsed modify rule: a flow filter, the subject to act on, and the raw replacement.
    """

    # Flow filter; ModifyHeaders.run calls it with the flow to decide whether the rule applies.
    matches: flowfilter.TFilter
    # What the rule targets (ModifyHeaders uses it as the header name).
    subject: bytes
    # Replacement exactly as given in the option: either a literal value or "@<file path>".
    replacement_str: str

    def read_replacement(self) -> bytes:
        """
        Resolve the replacement string into the bytes that should be used.

        A leading `@` marks the remainder as a file path (with `~` expanded) whose
        contents are returned. Anything else is converted to bytes via
        `strutils.escaped_str_to_bytes`.

        Raises:
        - IOError if the file cannot be read.
        """
        raw = self.replacement_str
        if not raw.startswith("@"):
            # Converted on every call; caching is possible but unlikely to matter.
            return strutils.escaped_str_to_bytes(raw)
        # Strip the "@" marker and read the referenced file from disk each time.
        return Path(raw[1:]).expanduser().read_bytes()
def parse_modify_spec(option: str, subject_is_regex: bool) -> ModifySpec:
    """
    Parse a modify option string into a ModifySpec and validate it eagerly.

    Args:
        option: The raw option string; it is split by `parse_spec` into a flow
            filter, a subject and a replacement.
        subject_is_regex: If true, the subject must compile as a regular expression.

    Returns:
        The ModifySpec built from the three parsed parts.

    Raises:
        ValueError: If the subject is not a valid regular expression (only checked
            when `subject_is_regex` is set), or if the replacement refers to a file
            that cannot be read. The underlying error is attached as `__cause__`.
            Errors raised by `parse_spec` or `strutils.escaped_str_to_bytes`
            themselves are not caught here.
    """
    flow_filter, subject_str, replacement = parse_spec(option)
    subject = strutils.escaped_str_to_bytes(subject_str)

    if subject_is_regex:
        # Compile only to validate; the compiled pattern is discarded.
        try:
            re.compile(subject)
        except re.error as e:
            raise ValueError(
                f"Invalid regular expression {subject!r} ({e})"
            ) from e

    spec = ModifySpec(flow_filter, subject, replacement)

    # Do a trial read so that an unreadable "@file" replacement is reported now
    # rather than when the spec is first applied.
    try:
        spec.read_replacement()
    except OSError as e:
        # read_replacement only touches the filesystem for "@<path>" replacements,
        # so dropping the first character yields the path for the message.
        raise ValueError(f"Invalid file path: {replacement[1:]} ({e})") from e

    return spec
class ModifyHeaders:
    """
    Addon that removes and sets headers on flows according to the `modify_headers` option.
    """

    def __init__(self) -> None:
        # Parsed rules, rebuilt by configure() whenever the option changes.
        self.replacements: list[ModifySpec] = []

    def load(self, loader):
        """
        Register the `modify_headers` option: a sequence of strings, empty by default.
        """
        # The help text is reproduced verbatim, so its lines stay flush left in the literal.
        loader.add_option(
            "modify_headers",
            Sequence[str],
            [],
            """
Header modify pattern of the form "[/flow-filter]/header-name/[@]header-value", where the
separator can be any character. The @ allows to provide a file path that is used to read
the header value string. An empty header-value removes existing header-name headers.
""",
        )

    def configure(self, updated):
        """
        Re-parse all `modify_headers` entries when that option is among the updated ones.

        Raises:
        - exceptions.OptionsError if an entry cannot be parsed.
        """
        if "modify_headers" not in updated:
            return

        self.replacements = []
        for option in ctx.options.modify_headers:
            try:
                parsed = parse_modify_spec(option, False)
            except ValueError as e:
                raise exceptions.OptionsError(
                    f"Cannot parse modify_headers option {option}: {e}"
                ) from e
            self.replacements.append(parsed)

    def requestheaders(self, flow):
        """
        Apply the rules to the request headers.

        Nothing happens if the flow already has a response or an error, or is not live.
        """
        if not flow.response and not flow.error and flow.live:
            self.run(flow, flow.request.headers)

    def responseheaders(self, flow):
        """
        Apply the rules to the response headers.

        Nothing happens if the flow has an error or is not live.
        """
        if not flow.error and flow.live:
            self.run(flow, flow.response.headers)

    def run(self, flow: http.HTTPFlow, hdrs: Headers) -> None:
        """
        Apply every matching rule to `hdrs` in three passes: match, remove, add.
        """
        # Pass 1: evaluate all filters against the original flow before touching anything,
        # so that earlier modifications cannot influence later filter results.
        active = [spec for spec in self.replacements if spec.matches(flow)]

        # Pass 2: drop every targeted header; a missing header is not an error.
        for spec in active:
            hdrs.pop(spec.subject, None)

        # Pass 3: add the new values. Removal is already complete at this point, so
        # several rules for the same header name each contribute a value.
        for spec in active:
            try:
                value = spec.read_replacement()
            except OSError as e:
                logging.warning(f"Could not read replacement file: {e}")
                continue
            # An empty replacement means "remove only".
            if value:
                hdrs.add(spec.subject, value)