import logging import re from collections.abc import Sequence from pathlib import Path from typing import NamedTuple from mitmproxy import ctx from mitmproxy import exceptions from mitmproxy import flowfilter from mitmproxy import http from mitmproxy.http import Headers from mitmproxy.utils import strutils from mitmproxy.utils.spec import parse_spec class ModifySpec(NamedTuple): matches: flowfilter.TFilter subject: bytes replacement_str: str def read_replacement(self) -> bytes: """ Process the replacement str. This usually just involves converting it to bytes. However, if it starts with `@`, we interpret the rest as a file path to read from. Raises: - IOError if the file cannot be read. """ if self.replacement_str.startswith("@"): return Path(self.replacement_str[1:]).expanduser().read_bytes() else: # We could cache this at some point, but unlikely to be a problem. return strutils.escaped_str_to_bytes(self.replacement_str) def parse_modify_spec(option: str, subject_is_regex: bool) -> ModifySpec: flow_filter, subject_str, replacement = parse_spec(option) subject = strutils.escaped_str_to_bytes(subject_str) if subject_is_regex: try: re.compile(subject) except re.error as e: raise ValueError(f"Invalid regular expression {subject!r} ({e})") spec = ModifySpec(flow_filter, subject, replacement) try: spec.read_replacement() except OSError as e: raise ValueError(f"Invalid file path: {replacement[1:]} ({e})") return spec class ModifyHeaders: def __init__(self) -> None: self.replacements: list[ModifySpec] = [] def load(self, loader): loader.add_option( "modify_headers", Sequence[str], [], """ Header modify pattern of the form "[/flow-filter]/header-name/[@]header-value", where the separator can be any character. The @ allows to provide a file path that is used to read the header value string. An empty header-value removes existing header-name headers. """, ) def configure(self, updated): if "modify_headers" in updated: self.replacements = [] for option in ctx.options.modify_headers: try: spec = parse_modify_spec(option, False) except ValueError as e: raise exceptions.OptionsError( f"Cannot parse modify_headers option {option}: {e}" ) from e self.replacements.append(spec) def requestheaders(self, flow): if flow.response or flow.error or not flow.live: return self.run(flow, flow.request.headers) def responseheaders(self, flow): if flow.error or not flow.live: return self.run(flow, flow.response.headers) def run(self, flow: http.HTTPFlow, hdrs: Headers) -> None: matches = [] # first check all the filters against the original, unmodified flow for spec in self.replacements: matches.append(spec.matches(flow)) # unset all specified headers for i, spec in enumerate(self.replacements): if matches[i]: hdrs.pop(spec.subject, None) # set all specified headers if the replacement string is not empty for i, spec in enumerate(self.replacements): if matches[i]: try: replacement = spec.read_replacement() except OSError as e: logging.warning(f"Could not read replacement file: {e}") continue else: if replacement: hdrs.add(spec.subject, replacement)