2025-12-25 upload

This commit is contained in:
“shengyudong”
2025-12-25 11:16:59 +08:00
commit 322ac74336
2241 changed files with 639966 additions and 0 deletions

View File

@@ -0,0 +1,193 @@
"""
mitmproxy includes a set of content views which can be used to
format/decode/highlight/reencode data. While they are mostly used for HTTP message
bodies, they may be used in other contexts, e.g. to decode WebSocket messages.
See "Custom Contentviews" in the mitmproxy documentation for examples.
"""
import logging
import sys
import traceback
import warnings
from dataclasses import dataclass
from ..addonmanager import cut_traceback
from ._api import Contentview
from ._api import InteractiveContentview
from ._api import Metadata
from ._api import SyntaxHighlight
from ._compat import get # noqa: F401
from ._compat import LegacyContentview
from ._compat import remove # noqa: F401
from ._registry import ContentviewRegistry
from ._utils import ContentviewMessage
from ._utils import get_data
from ._utils import make_metadata
from ._view_css import css
from ._view_dns import dns
from ._view_graphql import graphql
from ._view_http3 import http3
from ._view_image import image
from ._view_javascript import javascript
from ._view_json import json_view
from ._view_mqtt import mqtt
from ._view_multipart import multipart
from ._view_query import query
from ._view_raw import raw
from ._view_socketio import socket_io
from ._view_urlencoded import urlencoded
from ._view_wbxml import wbxml
from ._view_xml_html import xml_html
from .base import View
import mitmproxy_rs.contentviews
from mitmproxy import flow
from mitmproxy.utils import strutils
logger = logging.getLogger(__name__)
@dataclass
class ContentviewResult:
    """The outcome of prettifying one message, as produced by `prettify_message`."""

    # The prettified text, or an error message when prettifying was not possible.
    text: str
    # The highlighting mode that should be applied to `text`.
    syntax_highlight: SyntaxHighlight
    # Name of the contentview that produced `text`; `None` if content was missing.
    view_name: str | None
    # Additional notes about the content, e.g. how it was decoded.
    description: str
# The default, module-wide contentview registry.
registry = ContentviewRegistry()


def prettify_message(
    message: ContentviewMessage,
    flow: flow.Flow,
    view_name: str = "auto",
    registry: ContentviewRegistry = registry,
) -> ContentviewResult:
    """
    Prettify the content of `message` using a contentview from `registry`.

    If the message has no content, an error result is returned.
    If prettifying raises and `view_name` is "auto", the raw view is used as a
    fallback; if a view was requested explicitly, the (shortened) traceback is
    returned as error text instead. Control characters in the resulting text
    are always escaped.
    """
    data, enc = get_data(message)
    if data is None:
        return ContentviewResult(
            text="Content is missing.",
            syntax_highlight="error",
            view_name=None,
            description="",
        )

    # Pick the contentview that should handle this message.
    metadata = make_metadata(message, flow)
    view = registry.get_view(data, metadata, view_name)

    try:
        result = ContentviewResult(
            text=view.prettify(data, metadata),
            syntax_highlight=view.syntax_highlight,
            view_name=view.name,
            description=enc,
        )
    except Exception as e:
        logger.debug(f"Contentview {view.name!r} failed: {e}", exc_info=True)
        if view_name == "auto":
            # The view was merely our best guess, so degrade gracefully to raw.
            result = ContentviewResult(
                text=raw.prettify(data, metadata),
                syntax_highlight=raw.syntax_highlight,
                view_name=raw.name,
                description=f"{enc}[failed to parse as {view.name}]",
            )
        else:
            # The view was selected explicitly, so surface a hard error.
            exc, value, tb = sys.exc_info()
            tb_cut = cut_traceback(tb, "prettify_message")
            # If cutting removed no frames, do not display a traceback at all.
            shown_tb = None if tb_cut == tb else tb_cut
            err = "".join(traceback.format_exception(exc, value=value, tb=shown_tb))
            result = ContentviewResult(
                text=f"Couldn't parse as {view.name}:\n{err}",
                syntax_highlight="error",
                view_name=view.name,
                description=enc,
            )

    result.text = strutils.escape_control_characters(result.text)
    return result
def reencode_message(
    prettified: str,
    message: ContentviewMessage,
    flow: flow.Flow,
    view_name: str,
) -> bytes:
    """
    Convert edited `prettified` text back into raw bytes using the named view.

    Raises:
        KeyError: if no contentview is registered under `view_name`.
        ValueError: if the contentview is not an `InteractiveContentview`.
    """
    metadata = make_metadata(message, flow)
    view = registry[view_name.lower()]
    if isinstance(view, InteractiveContentview):
        return view.reencode(prettified, metadata)
    raise ValueError(f"Contentview {view.name} is not interactive.")
# Built-in Python contentviews; they are registered in this order.
_views: list[Contentview] = [
    css,
    dns,
    graphql,
    http3,
    image,
    javascript,
    json_view,
    mqtt,
    multipart,
    query,
    raw,
    socket_io,
    urlencoded,
    wbxml,
    xml_html,
]
for view in _views:
    registry.register(view)

# Also register the public contentviews exported by mitmproxy_rs.
for name in mitmproxy_rs.contentviews.__all__:
    if not name.startswith("_"):
        cv = getattr(mitmproxy_rs.contentviews, name)
        # Only contentview instances are registered here; classes are skipped.
        if isinstance(cv, Contentview) and not isinstance(cv, type):
            registry.register(cv)
def add(contentview: Contentview | type[Contentview]) -> None:
    """
    Register a contentview for use in mitmproxy.

    Both `Contentview` instances and `Contentview` classes are accepted;
    a class is instantiated by calling its constructor without arguments.
    Legacy `View` instances are still accepted, but trigger a deprecation
    warning and are wrapped in a `LegacyContentview` before registration.
    """
    if isinstance(contentview, View):
        message = (
            f"`mitmproxy.contentviews.View` is deprecated since mitmproxy 12, "
            f"migrate {contentview.__class__.__name__} to `mitmproxy.contentviews.Contentview` instead."
        )
        # stacklevel=2 attributes the warning to the caller of `add`.
        warnings.warn(message, stacklevel=2)
        contentview = LegacyContentview(contentview)
    registry.register(contentview)
# hack: docstring where pdoc finds it.
# The imported name is rebound in this module so that the string literal that
# follows sits directly after an assignment — presumably that is what makes
# pdoc treat it as the variable's docstring; confirm against the pdoc docs.
SyntaxHighlight = SyntaxHighlight
"""
Syntax highlighting formats currently supported by mitmproxy.
Note that YAML is a superset of JSON; so if you'd like to highlight JSON, pick the YAML highlighter.
*If you have a concrete use case for additional formats, please open an issue.*
"""
# Names exported as this module's public API.
__all__ = [
# Public Contentview API
"Contentview",
"InteractiveContentview",
"SyntaxHighlight",
"add",
"Metadata",
]

View File

@@ -0,0 +1,116 @@
from __future__ import annotations
import logging
import typing
from abc import abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Literal
from mitmproxy import http
from mitmproxy import tcp
from mitmproxy import udp
from mitmproxy.dns import DNSMessage
from mitmproxy.flow import Flow
from mitmproxy.websocket import WebSocketMessage
logger = logging.getLogger(__name__)
# The highlighting modes a contentview may report via its `syntax_highlight` property.
type SyntaxHighlight = Literal["css", "javascript", "xml", "yaml", "none", "error"]
@typing.runtime_checkable
class Contentview(typing.Protocol):
    """
    Base class for all contentviews.

    Subclasses must implement `prettify`; every other member has a default.
    """

    @property
    def name(self) -> str:
        """
        The name of this contentview, e.g. "XML/HTML".

        By default this is the class name with a trailing "Contentview" removed.
        """
        class_name = type(self).__name__
        return class_name.removesuffix("Contentview")

    @property
    def syntax_highlight(self) -> SyntaxHighlight:
        """Optional syntax highlighting that should be applied to the prettified output."""
        return "none"

    @abstractmethod
    def prettify(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> str:
        """
        Transform raw data into human-readable output.

        May raise an exception (e.g. `ValueError`) if data cannot be prettified.
        """

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """
        Return the priority of this view for rendering `data`.

        If no particular view is chosen by the user, the view with the highest priority is selected.
        If this view does not support the given data, return a float < 0.
        """
        return 0

    def __lt__(self, other):
        # Contentviews order by name.
        return self.name.__lt__(other.name)
@typing.runtime_checkable
class InteractiveContentview(Contentview, typing.Protocol):
    """A contentview whose prettified output can be edited and converted back to raw data."""

    @abstractmethod
    def reencode(
        self,
        prettified: str,
        metadata: Metadata,
    ) -> bytes:
        """
        Convert the given (possibly modified) `prettified` text back into the original data format.

        May raise an exception (e.g. `ValueError`) if reencoding failed.
        """
@dataclass
class Metadata:
    """
    Context about the data that is being prettified.

    Every attribute defaults to `None`, so do not rely on any of them being set.
    """

    flow: Flow | None = None
    """The flow the data belongs to, if any."""
    content_type: str | None = None
    """The HTTP content type of the data, if any."""
    http_message: http.Message | None = None
    """The HTTP message the data belongs to, if any."""
    tcp_message: tcp.TCPMessage | None = None
    """The TCP message the data belongs to, if any."""
    udp_message: udp.UDPMessage | None = None
    """The UDP message the data belongs to, if any."""
    websocket_message: WebSocketMessage | None = None
    """The websocket message the data belongs to, if any."""
    dns_message: DNSMessage | None = None
    """The DNS message the data belongs to, if any."""
    protobuf_definitions: Path | None = None
    """Path to a .proto file that's used to resolve Protobuf field names."""
    original_data: bytes | None = None
    """When reencoding: the original data that was prettified."""


# Mark the generated constructor's docstring as private.
Metadata.__init__.__doc__ = "@private"

View File

@@ -0,0 +1,74 @@
from __future__ import annotations
import sys
import typing
from typing import Iterator
from mitmproxy import contentviews
from mitmproxy.contentviews import SyntaxHighlight
from mitmproxy.contentviews._api import Contentview
from mitmproxy.contentviews._api import Metadata
from mitmproxy.utils.strutils import always_str
if sys.version_info < (3, 13): # pragma: no cover
from typing_extensions import deprecated
else:
from warnings import deprecated
if typing.TYPE_CHECKING:
from mitmproxy.contentviews.base import TViewLine
from mitmproxy.contentviews.base import View
class LegacyContentview(Contentview):
    """Adapter that exposes a legacy `View` through the `Contentview` interface."""

    def __init__(self, contentview: View):
        # The wrapped legacy view; every call is delegated to it.
        self.contentview = contentview

    @property
    def name(self) -> str:
        """The name of the wrapped legacy view."""
        return self.contentview.name

    @property
    def syntax_highlight(self) -> SyntaxHighlight:
        """The wrapped view's `syntax_highlight` attribute, or "none" if it has none."""
        return getattr(self.contentview, "syntax_highlight", "none")

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """Delegate to the legacy view's `render_priority`; a falsy result becomes 0.0."""
        return (
            self.contentview.render_priority(
                data=data,
                content_type=metadata.content_type,
                flow=metadata.flow,
                http_message=metadata.http_message,
            )
            or 0.0
        )

    def prettify(self, data: bytes, metadata: Metadata) -> str:
        """
        Call the legacy view and flatten its output into a single string.

        The legacy view returns a description and an iterable of lines, where
        each line is a sequence of (tag, text) pairs. The tags are dropped, the
        texts of each line are concatenated, and lines are joined with newlines.
        """
        lines: Iterator[TViewLine]
        desc_, lines = self.contentview(
            data,
            content_type=metadata.content_type,
            flow=metadata.flow,
            http_message=metadata.http_message,
        )
        # "backslashreplace" is the registered codec error handler; the former
        # "backslashescape" does not exist and made undecodable bytes raise
        # LookupError instead of being escaped.
        return "\n".join(
            "".join(always_str(text, "utf8", "backslashreplace") for tag, text in line)
            for line in lines
        )
@deprecated("Use `mitmproxy.contentviews.registry` instead.")
def get(name: str) -> Contentview | None:
    """Look up a contentview by (case-insensitive) name; return `None` if it is unknown."""
    # Mapping.get() returns None when the lookup raises KeyError.
    return contentviews.registry.get(name.lower())
@deprecated("Use `mitmproxy.contentviews.Contentview` instead.")
def remove(view: View):
    """Deprecated no-op: the given view is ignored and nothing is removed."""
    return None

View File

@@ -0,0 +1,73 @@
from __future__ import annotations
import logging
import typing
from collections.abc import Mapping
from ..utils import signals
from ._api import Contentview
from ._api import Metadata
logger = logging.getLogger(__name__)
def _on_change(view: Contentview) -> None: ...
class ContentviewRegistry(Mapping[str, Contentview]):
    """A read-only mapping from lowercased contentview names to contentview instances."""

    def __init__(self):
        self._by_name: dict[str, Contentview] = {}
        # Signal sent with the registered instance on every `register` call.
        self.on_change = signals.SyncSignal(_on_change)

    def register(self, instance: Contentview | type[Contentview]) -> None:
        """
        Register a contentview under its lowercased name.

        Classes are instantiated without arguments first. A view that is
        already registered under the same name is replaced.
        """
        if isinstance(instance, type):
            instance = instance()
        name = instance.name.lower()
        if name in self._by_name:
            logger.info(f"Replacing existing {name} contentview.")
        self._by_name[name] = instance
        self.on_change.send(instance)

    def available_views(self) -> list[str]:
        """Return "auto" followed by all registered view names in sorted order."""
        return ["auto", *sorted(self._by_name.keys())]

    def get_view(
        self, data: bytes, metadata: Metadata, view_name: str = "auto"
    ) -> Contentview:
        """
        Get the best contentview for the given data and metadata.

        If `view_name` is "auto" or the provided view not found,
        the best matching contentview based on `render_priority` will be returned.
        """
        if view_name != "auto":
            try:
                return self[view_name.lower()]
            except KeyError:
                logger.warning(
                    f"Unknown contentview {view_name!r}, selecting best match instead."
                )

        best: tuple[float, Contentview] | None = None
        for view in self._by_name.values():
            try:
                priority = view.render_priority(data, metadata)
                assert isinstance(priority, (int, float)), (
                    f"render_priority for {view.name} did not return a number."
                )
            except Exception:
                # A misbehaving view must not prevent the others from being considered.
                logger.exception(f"Error in {view.name}.render_priority")
                continue
            # Strict comparison: on ties, the view seen first wins.
            if best is None or best[0] < priority:
                best = (priority, view)
        assert best, "At least one view needs to have a working `render_priority`."
        return best[1]

    def __iter__(self) -> typing.Iterator[str]:
        return iter(self._by_name)

    def __getitem__(self, item: str) -> Contentview:
        # Lookups are case-insensitive.
        return self._by_name[item.lower()]

    def __len__(self):
        return len(self._by_name)

View File

@@ -0,0 +1,106 @@
import io
import typing
from collections.abc import Iterable
from pathlib import Path
from typing import Any
from ruamel.yaml import YAML
from .. import ctx
from .. import http
from ..dns import DNSMessage
from ..flow import Flow
from ..tcp import TCPMessage
from ..udp import UDPMessage
from ..utils import strutils
from ..websocket import WebSocketMessage
from ._api import Metadata
# The message types accepted by `make_metadata` and `get_data` below.
type ContentviewMessage = (
http.Message | TCPMessage | UDPMessage | WebSocketMessage | DNSMessage
)
def make_metadata(
message: ContentviewMessage,
flow: Flow,
) -> Metadata:
metadata = Metadata(
flow=flow,
protobuf_definitions=Path(ctx.options.protobuf_definitions).expanduser()
if ctx.options.protobuf_definitions
else None,
)
match message:
case http.Message():
metadata.http_message = message
if ctype := message.headers.get("content-type"):
if ct := http.parse_content_type(ctype):
metadata.content_type = f"{ct[0]}/{ct[1]}"
case TCPMessage():
metadata.tcp_message = message
case UDPMessage():
metadata.udp_message = message
case WebSocketMessage():
metadata.websocket_message = message
case DNSMessage():
metadata.dns_message = message
case other: # pragma: no cover
typing.assert_never(other)
return metadata
def get_data(
    message: ContentviewMessage,
) -> tuple[bytes | None, str]:
    """
    Return the content of `message` together with a note on how it was obtained.

    The note is "[cannot decode]" if reading `content` raised `ValueError` (the
    raw content is returned in that case), "[decoded <content-encoding>]" if an
    HTTP message's content differs from its raw content, and "" otherwise.
    """
    try:
        content: bytes | None = message.content
    except ValueError:
        assert isinstance(message, http.Message)
        return message.raw_content, "[cannot decode]"

    if isinstance(message, http.Message) and content != message.raw_content:
        return content, "[decoded {}]".format(message.headers.get("content-encoding"))
    return content, ""
def yaml_dumps(d: Any) -> str:
    """Serialize `d` to a YAML string; any falsy value yields an empty string."""
    if d:
        buffer = io.StringIO()
        YAML(typ="rt", pure=True).dump(d, buffer)
        return buffer.getvalue()
    return ""
def yaml_loads(yaml: str) -> Any:
    """Parse a YAML string with ruamel's "safe" loader and return the resulting object."""
    loader = YAML(typ="safe", pure=True)
    return loader.load(yaml)
def merge_repeated_keys(items: Iterable[tuple[str, str]]) -> dict[str, str | list[str]]:
"""
Helper function that takes a list of pairs and merges repeated keys.
"""
ret: dict[str, str | list[str]] = {}
for key, value in items:
if existing := ret.get(key):
if isinstance(existing, list):
existing.append(value)
else:
ret[key] = [existing, value]
else:
ret[key] = value
return ret
def byte_pairs_to_str_pairs(
    items: Iterable[tuple[bytes, bytes]],
) -> Iterable[tuple[str, str]]:
    """Lazily yield each (key, value) pair with both parts passed through `strutils.bytes_to_escaped_str`."""
    yield from (
        (strutils.bytes_to_escaped_str(k), strutils.bytes_to_escaped_str(v))
        for k, v in items
    )

View File

@@ -0,0 +1,76 @@
import re
import time
from mitmproxy.contentviews._api import Contentview
from mitmproxy.contentviews._api import Metadata
from mitmproxy.utils import strutils
"""
A custom CSS prettifier. Compared to other prettifiers, its main features are:
- Implemented in pure Python.
- Modifies whitespace only.
- Works with any input.
- Considerably faster than e.g. cssutils.
"""
# Regex patterns handed to `strutils.escape_special_areas` by `beautify`.
# In order: single-quoted strings, double-quoted strings, /* ... */ comments
# and // comments that run to the end of the line.
CSS_SPECIAL_AREAS = (
"'" + strutils.SINGLELINE_CONTENT + strutils.NO_ESCAPE + "'",
'"' + strutils.SINGLELINE_CONTENT + strutils.NO_ESCAPE + '"',
r"/\*" + strutils.MULTILINE_CONTENT + r"\*/",
"//" + strutils.SINGLELINE_CONTENT + "$",
)
# Characters handed to `strutils.escape_special_areas` together with the areas
# above — presumably the ones escaped inside those areas; confirm in strutils.
CSS_SPECIAL_CHARS = "{};:"
def beautify(data: str, indent: str = " "):
    """
    Beautify a string containing CSS code.

    Special areas (see `CSS_SPECIAL_AREAS`) are escaped first, then a fixed
    sequence of regex substitutions rewrites the whitespace, and finally the
    special areas are unescaped again. The result ends in exactly one newline.
    """
    escaped = strutils.escape_special_areas(
        data.strip(),
        CSS_SPECIAL_AREAS,
        CSS_SPECIAL_CHARS,
    )
    # The substitutions are order-dependent and applied top to bottom.
    rewrites = (
        # Add newlines
        (r"\s*;\s*", ";\n"),
        (r"\s*{\s*", " {\n"),
        (r"\s*}\s*", "\n}\n\n"),
        # Fix incorrect ":" placement
        (r"\s*:\s*(?=[^{]+})", ": "),
        # Fix no space after ","
        (r"\s*,\s*", ", "),
        # indent
        ("\n[ \t]+", "\n"),
        ("\n(?![}\n])(?=[^{]*})", "\n" + indent),
    )
    for pattern, replacement in rewrites:
        escaped = re.sub(pattern, replacement, escaped)
    return strutils.unescape_special_areas(escaped).rstrip("\n") + "\n"
class ViewCSS(Contentview):
    """Contentview that re-indents CSS using `beautify`."""

    syntax_highlight = "css"

    def prettify(self, data: bytes, metadata: Metadata) -> str:
        """Decode `data` as UTF-8 (with surrogateescape) and beautify it."""
        return beautify(data.decode("utf8", "surrogateescape"))

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """Return 1.0 for non-empty data with content type "text/css", else 0.0."""
        is_css = bool(data) and metadata.content_type == "text/css"
        return float(is_css)


css = ViewCSS()
if __name__ == "__main__":  # pragma: no cover
    # Ad-hoc timing: beautify the vendored stylesheet once and print the wall-clock time.
    with open("../tools/web/static/vendor.css") as f:
        data = f.read()

    t = time.time()
    x = beautify(data)
    print(f"Beautifying vendor.css took {time.time() - t:.2}s")

View File

@@ -0,0 +1,53 @@
from mitmproxy.contentviews._api import InteractiveContentview
from mitmproxy.contentviews._api import Metadata
from mitmproxy.contentviews._utils import yaml_dumps
from mitmproxy.contentviews._utils import yaml_loads
from mitmproxy.dns import DNSMessage as DNSMessage
from mitmproxy.proxy.layers.dns import pack_message
def _is_dns_tcp(metadata: Metadata) -> bool:
return bool(metadata.tcp_message or metadata.http_message)
class DNSContentview(InteractiveContentview):
    """Interactive contentview that renders DNS messages as YAML."""

    syntax_highlight = "yaml"

    def prettify(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> str:
        """Unpack `data` as a DNS message and dump it as YAML without status code and timestamp."""
        # hack: cut off length label and hope for the best
        payload = data[2:] if _is_dns_tcp(metadata) else data
        message = DNSMessage.unpack(payload).to_json()
        del message["status_code"]
        message.pop("timestamp", None)
        return yaml_dumps(message)

    def reencode(
        self,
        prettified: str,
        metadata: Metadata,
    ) -> bytes:
        """Parse the YAML in `prettified` and pack it as a "tcp" or "udp" DNS message."""
        message = DNSMessage.from_json(yaml_loads(prettified))
        transport = "tcp" if _is_dns_tcp(metadata) else "udp"
        return pack_message(message, transport)

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """Return 1.0 for the DNS content type or a server address on port 53/5353, else 0.0."""
        if metadata.content_type == "application/dns-message":
            return 1.0
        flow = metadata.flow
        on_dns_port = bool(
            flow
            and flow.server_conn
            and flow.server_conn.address
            and flow.server_conn.address[1] in (53, 5353)
        )
        return float(on_dns_port)


dns = DNSContentview()

View File

@@ -0,0 +1,72 @@
import json
from typing import Any
from mitmproxy.contentviews._api import Contentview
from mitmproxy.contentviews._api import Metadata
def format_graphql(data):
    """
    Render a single GraphQL operation.

    The output is the operation as indented JSON with the query replaced by
    "...", then a "---" separator line, then the query text and a final newline.
    `data` itself is not modified.
    """
    query = data["query"]
    # Work on a copy so that the caller's dict keeps its original query.
    header_data = data.copy()
    header_data["query"] = "..."
    header = json.dumps(header_data, indent=2)
    return f"{header}\n---\n{query}\n"
def format_query_list(data: list[Any]):
    """Render a batch of GraphQL operations, each preceded by a "--- index/last-index" line."""
    num_queries = len(data) - 1
    return "".join(
        f"--- {i}/{num_queries}\n" + format_graphql(op) for i, op in enumerate(data)
    )
def is_graphql_query(data):
    """
    Return True if `data` looks like a single GraphQL operation.

    That is the case for a dict whose "query" entry is a string containing at
    least one newline. Non-string queries (e.g. JSON `null`) yield False
    instead of raising `TypeError`.
    """
    if not isinstance(data, dict):
        return False
    query = data.get("query")
    return isinstance(query, str) and "\n" in query
def is_graphql_batch_query(data):
    """Return True if `data` is a non-empty list whose first element is a dict with a "query" key."""
    if not isinstance(data, list) or len(data) == 0:
        return False
    first = data[0]
    return isinstance(first, dict) and "query" in first
class GraphQLContentview(Contentview):
    """Contentview for JSON bodies that carry a GraphQL query or a batch of queries."""

    def prettify(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> str:
        """
        Format a single GraphQL query or a batch of them.

        Raises:
            ValueError: if `data` is not valid JSON or not a GraphQL message.
        """
        gql = json.loads(data)
        if is_graphql_query(gql):
            return format_graphql(gql)
        if is_graphql_batch_query(gql):
            return format_query_list(gql)
        raise ValueError("Not a GraphQL message.")

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """Return 2 for non-empty "application/json" data that parses as a GraphQL message, else 0."""
        if not data or metadata.content_type != "application/json":
            return 0
        try:
            parsed = json.loads(data)
            if is_graphql_query(parsed) or is_graphql_batch_query(parsed):
                return 2
        except ValueError:
            # Not valid JSON: this view does not apply.
            pass
        return 0


graphql = GraphQLContentview()

View File

@@ -0,0 +1,153 @@
from collections import defaultdict
from dataclasses import dataclass
from dataclasses import field
import pylsqpack
from aioquic.buffer import Buffer
from aioquic.buffer import BufferReadError
from aioquic.h3.connection import parse_settings
from aioquic.h3.connection import Setting
from ..proxy.layers.http import is_h3_alpn
from mitmproxy import tcp
from mitmproxy.contentviews._api import Contentview
from mitmproxy.contentviews._api import Metadata
from mitmproxy_rs.contentviews import hex_dump
@dataclass(frozen=True)
class Frame:
    """An HTTP/3 frame, made up of its numeric type and its raw payload."""

    type: int
    data: bytes

    def pretty(self) -> str:
        """
        Render this frame as text.

        Type 1 (HEADERS) and type 4 (SETTINGS) frames are decoded into
        "key: value" lines. All other frames, and frames that fail to decode,
        are shown as a title line followed by a hex dump of the payload.
        """
        label = f"0x{self.type:x} Frame"
        if self.type == 0:
            label = "DATA Frame"
        elif self.type == 1:
            try:
                headers = pylsqpack.Decoder(4096, 16).feed_header(0, self.data)[1]
                rendered = "\n".join(
                    f"{k.decode(errors='backslashreplace')}: {v.decode(errors='backslashreplace')}"
                    for k, v in headers
                )
                return f"HEADERS Frame\n" + rendered
            except Exception as e:
                # Fall through to the hex dump, mentioning the error in the title.
                label = f"HEADERS Frame (error: {e})"
        elif self.type == 4:
            try:
                parsed = parse_settings(self.data)
            except Exception as e:
                label = f"SETTINGS Frame (error: {e})"
            else:
                settings = []
                for k, v in parsed.items():
                    try:
                        key = Setting(k).name
                    except ValueError:
                        # Not a known setting: show the numeric identifier.
                        key = f"0x{k:x}"
                    settings.append(f"{key}: 0x{v:x}")
                return "SETTINGS Frame\n" + "\n".join(settings)
        return f"{label}\n" + hex_dump.prettify(self.data, Metadata())
@dataclass(frozen=True)
class StreamType:
"""Representation of an HTTP/3 stream types."""
type: int
def pretty(self) -> str:
stream_type = {
0x00: "Control Stream",
0x01: "Push Stream",
0x02: "QPACK Encoder Stream",
0x03: "QPACK Decoder Stream",
}.get(self.type, f"0x{self.type:x} Stream")
return stream_type
@dataclass
class ConnectionState:
    """Parsing state that `Http3Contentview` keeps for a single flow."""

    # Number of flow messages that have been consumed so far.
    message_count: int = 0
    # Parsed items, keyed by the index of the message that completed them.
    frames: dict[int, list[Frame | StreamType]] = field(default_factory=dict)
    # Bytes received from the client that have not been parsed yet.
    client_buf: bytearray = field(default_factory=bytearray)
    # Bytes received from the server that have not been parsed yet.
    server_buf: bytearray = field(default_factory=bytearray)
class Http3Contentview(Contentview):
    """Contentview that shows the HTTP/3 frames carried by the messages of a TCP flow."""

    def __init__(self) -> None:
        # Per-flow parsing state, created on first access.
        self.connections: defaultdict[tcp.TCPFlow, ConnectionState] = defaultdict(
            ConnectionState
        )

    @property
    def name(self) -> str:
        return "HTTP/3 Frames"

    def prettify(self, data: bytes, metadata: Metadata) -> str:
        """
        Return the frames that were completed by `metadata.tcp_message`.

        All flow messages that have not been consumed yet are appended to the
        per-direction buffers and parsed into frames. The result is the pretty
        form of the frames recorded for the index of the requested message,
        or "" if there are none.
        """
        flow = metadata.flow
        tcp_message = metadata.tcp_message
        assert isinstance(flow, tcp.TCPFlow)
        assert tcp_message

        state = self.connections[flow]

        # Only messages that arrived since the previous call need processing.
        for message in flow.messages[state.message_count :]:
            buf = state.client_buf if message.from_client else state.server_buf
            buf += message.content

            if state.message_count == 0 and flow.metadata["quic_is_unidirectional"]:
                # A unidirectional stream begins with a varint stream type.
                h3_buf = Buffer(data=bytes(buf[:8]))
                stream_type = h3_buf.pull_uint_var()
                consumed = h3_buf.tell()
                del buf[:consumed]
                state.frames[0] = [StreamType(stream_type)]

            # Extract as many complete frames as the buffer currently holds.
            while True:
                h3_buf = Buffer(data=bytes(buf[:16]))
                try:
                    frame_type = h3_buf.pull_uint_var()
                    frame_size = h3_buf.pull_uint_var()
                except BufferReadError:
                    # The frame header is still incomplete.
                    break

                consumed = h3_buf.tell()
                frame_end = consumed + frame_size
                if len(buf) < frame_end:
                    # The frame payload is still incomplete.
                    break

                frame = Frame(frame_type, bytes(buf[consumed:frame_end]))
                state.frames.setdefault(state.message_count, []).append(frame)
                del buf[:frame_end]

            state.message_count += 1

        frames = state.frames.get(flow.messages.index(tcp_message), [])
        if frames:
            return "\n\n".join(frame.pretty() for frame in frames)
        return ""

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """Return 2.0 for TCP flows whose client connection negotiated an HTTP/3 ALPN, else 0.0."""
        flow = metadata.flow
        return (
            2
            * float(bool(flow and is_h3_alpn(flow.client_conn.alpn)))
            * float(isinstance(flow, tcp.TCPFlow))
        )


http3 = Http3Contentview()

View File

@@ -0,0 +1,3 @@
from .view import image
__all__ = ["image"]

View File

@@ -0,0 +1,119 @@
import io
from kaitaistruct import KaitaiStream
from mitmproxy.contrib.kaitaistruct import gif
from mitmproxy.contrib.kaitaistruct import ico
from mitmproxy.contrib.kaitaistruct import jpeg
from mitmproxy.contrib.kaitaistruct import png
# A list of (label, value) string pairs, as returned by the `parse_*` functions below.
type ImageMetadata = list[tuple[str, str]]
def parse_png(data: bytes) -> ImageMetadata:
    """Extract format, size, gamma, aspect and text chunk entries from PNG data."""
    img = png.Png(KaitaiStream(io.BytesIO(data)))
    parts = [
        ("Format", "Portable network graphics"),
        ("Size", f"{img.ihdr.width} x {img.ihdr.height} px"),
    ]
    for chunk in img.chunks:
        kind = chunk.type
        body = chunk.body
        if kind == "gAMA":
            parts.append(("gamma", str(body.gamma_int / 100000)))
        elif kind == "pHYs":
            aspectx = body.pixels_per_unit_x
            aspecty = body.pixels_per_unit_y
            parts.append(("aspect", f"{aspectx} x {aspecty}"))
        elif kind in ("tEXt", "iTXt"):
            parts.append((body.keyword, body.text))
        elif kind == "zTXt":
            parts.append((body.keyword, body.text_datastream.decode("iso8859-1")))
    return parts
def parse_gif(data: bytes) -> ImageMetadata:
    """
    Extract format, version, size, background index and comments from GIF data.

    Each non-empty entry of a comment extension block contributes one
    ("comment", text) pair.
    """
    img = gif.Gif(KaitaiStream(io.BytesIO(data)))
    descriptor = img.logical_screen_descriptor
    parts = [
        ("Format", "Compuserve GIF"),
        ("Version", f"GIF{img.hdr.version}"),
        ("Size", f"{descriptor.screen_width} x {descriptor.screen_height} px"),
        ("background", str(descriptor.bg_color_index)),
    ]
    for block in img.blocks:
        # Only comment extension blocks carry metadata we display.
        if block.block_type.name != "extension":
            continue
        if block.body.label._name_ != "comment":
            continue
        for entry in block.body.body.entries:
            comment = entry.bytes
            if comment != b"":
                # Decode the bytes rather than calling str() on them, which
                # would put the Python repr (b'...') into the displayed value.
                # This matches how parse_jpeg decodes comments.
                parts.append(("comment", comment.decode("utf8", "backslashreplace")))
    return parts
def parse_jpeg(data: bytes) -> ImageMetadata:
    """Extract format, size, JFIF details, comments and IFD0 fields from JPEG data."""
    img = jpeg.Jpeg(KaitaiStream(io.BytesIO(data)))
    parts = [("Format", "JPEG (ISO 10918)")]
    for segment in img.segments:
        marker = segment.marker._name_
        payload = segment.data
        if marker == "sof0":
            parts.append(("Size", f"{payload.image_width} x {payload.image_height} px"))
        elif marker == "app0":
            parts.append(
                ("jfif_version", f"({payload.version_major}, {payload.version_minor})")
            )
            parts.append(("jfif_density", f"({payload.density_x}, {payload.density_y})"))
            parts.append(("jfif_unit", str(payload.density_units._value_)))
        elif marker == "com":
            parts.append(("comment", payload.decode("utf8", "backslashreplace")))
        elif marker == "app1":
            # Only app1 segments that were parsed into a body are inspected.
            if hasattr(payload, "body"):
                for field in payload.body.data.body.ifd0.fields:
                    if field.data is not None:
                        parts.append(
                            (field.tag._name_, field.data.decode("UTF-8").strip("\x00"))
                        )
    return parts
def parse_ico(data: bytes) -> ImageMetadata:
    """Extract the image count and per-image size, bit depth and PNG flag from ICO data."""
    img = ico.Ico(KaitaiStream(io.BytesIO(data)))
    parts = [
        ("Format", "ICO"),
        ("Number of images", str(img.num_images)),
    ]
    for i, image in enumerate(img.images):
        # A falsy width or height is reported as 256.
        width = image.width or 256
        height = image.height or 256
        details = "Size: {} x {}\n{: >18}Bits per pixel: {}\n{: >18}PNG: {}".format(
            width,
            height,
            "",
            image.bpp,
            "",
            image.is_png,
        )
        parts.append((f"Image {i + 1}", details))
    return parts

View File

@@ -0,0 +1,57 @@
from .._utils import merge_repeated_keys
from .._utils import yaml_dumps
from . import image_parser
from mitmproxy.contentviews._api import Contentview
from mitmproxy.contentviews._api import Metadata
from mitmproxy.contrib import imghdr
def test_ico(h, f):
    """
    Recognizer added to `imghdr.tests`: return "ico" if `h` starts with the ICO signature.

    `f` is accepted for the recognizer signature but not used.
    """
    return "ico" if h.startswith(b"\x00\x00\x01\x00") else None


imghdr.tests.append(test_ico)
class ImageContentview(Contentview):
    """Contentview that shows metadata of PNG, GIF, JPEG and ICO images as YAML."""

    syntax_highlight = "yaml"

    def prettify(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> str:
        """Detect the image type and return a title comment followed by the parsed metadata."""
        image_type = imghdr.what("", h=data)
        parsers = {
            "png": image_parser.parse_png,
            "gif": image_parser.parse_gif,
            "jpeg": image_parser.parse_jpeg,
            "ico": image_parser.parse_ico,
        }
        parser = parsers.get(image_type)
        # Image types without a parser yield no metadata entries.
        image_metadata = parser(data) if parser else []
        view_name = f"{image_type.upper()} Image" if image_type else "Unknown Image"
        return f"# {view_name}\n" + yaml_dumps(merge_repeated_keys(image_metadata))

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """Return 1.0 for "image/*" content types that do not end in "+xml", else 0.0."""
        ctype = metadata.content_type
        is_image = bool(
            ctype and ctype.startswith("image/") and not ctype.endswith("+xml")
        )
        return float(is_image)


image = ImageContentview()

View File

@@ -0,0 +1,68 @@
import io
import re
from mitmproxy.contentviews._api import Contentview
from mitmproxy.contentviews._api import Metadata
from mitmproxy.utils import strutils
# Characters handed to `strutils.escape_special_areas` together with SPECIAL_AREAS
# — presumably the ones escaped inside those areas; confirm in strutils.
DELIMITERS = "{};\n"
# Regex patterns handed to `strutils.escape_special_areas` by `beautify`.
# By the look of the patterns: a /.../ regex literal, single- and double-quoted
# strings, backtick strings, /* ... */ comments, // comments and the
# parenthesised head of a `for(` statement.
SPECIAL_AREAS = (
r"(?<=[^\w\s)])\s*/(?:[^\n/]|(?<!\\)(?:\\\\)*\\/)+?/(?=[gimsuy]{0,6}\s*(?:[;,).\n]|$))",
r"'" + strutils.MULTILINE_CONTENT_LINE_CONTINUATION + strutils.NO_ESCAPE + "'",
r'"' + strutils.MULTILINE_CONTENT_LINE_CONTINUATION + strutils.NO_ESCAPE + '"',
r"`" + strutils.MULTILINE_CONTENT + strutils.NO_ESCAPE + "`",
r"/\*" + strutils.MULTILINE_CONTENT + r"\*/",
r"//" + strutils.SINGLELINE_CONTENT + "$",
r"for\(" + strutils.SINGLELINE_CONTENT + r"\)",
)
def beautify(data):
    """
    Beautify a string containing JavaScript code.

    Special areas (see `SPECIAL_AREAS`) are escaped, line breaks are inserted
    around braces and after semicolons, each line is indented according to the
    current brace depth, and the special areas are unescaped again.
    """
    data = strutils.escape_special_areas(data, SPECIAL_AREAS, DELIMITERS)

    data = re.sub(r"\s*{\s*(?!};)", " {\n", data)
    data = re.sub(r"\s*;\s*", ";\n", data)
    data = re.sub(r"(?<!{)\s*}(;)?\s*", r"\n}\1\n", data)

    pieces = []
    depth = 0
    for line in data.splitlines(True):
        if line.endswith("{\n"):
            # A line that opens a block is written first, then the depth grows.
            pieces.append(" " * 2 * depth + line)
            depth += 1
            continue
        if line.startswith("}"):
            # A closing brace is written at the outer depth.
            depth -= 1
        pieces.append(" " * 2 * depth + line)

    return strutils.unescape_special_areas("".join(pieces))
class JavaScriptContentview(Contentview):
    """Contentview that re-indents JavaScript using `beautify`."""

    syntax_highlight = "javascript"
    __content_types = (
        "application/x-javascript",
        "application/javascript",
        "text/javascript",
    )

    def prettify(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> str:
        """Decode `data` as UTF-8 (replacing invalid bytes) and beautify it."""
        return beautify(data.decode("utf-8", "replace"))

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """Return 1.0 for non-empty data with a JavaScript content type, else 0.0."""
        is_javascript = bool(data) and metadata.content_type in self.__content_types
        return float(is_javascript)


javascript = JavaScriptContentview()

View File

@@ -0,0 +1,31 @@
import json
from mitmproxy.contentviews._api import Contentview
from mitmproxy.contentviews._api import Metadata
class JSONContentview(Contentview):
    """Contentview that pretty-prints JSON with four-space indentation."""

    syntax_highlight = "yaml"

    def prettify(self, data: bytes, metadata: Metadata) -> str:
        """Parse `data` as JSON and dump it indented, keeping non-ASCII characters as they are."""
        return json.dumps(json.loads(data), indent=4, ensure_ascii=False)

    def render_priority(self, data: bytes, metadata: Metadata) -> float:
        """Return 1 for non-empty data with a JSON content type, else 0."""
        if not data:
            return 0
        ctype = metadata.content_type
        if ctype in ("application/json", "application/json-rpc"):
            return 1
        # Also accept any other "application/..." type that ends in "json".
        if ctype and ctype.startswith("application/") and ctype.endswith("json"):
            return 1
        return 0


json_view = JSONContentview()

View File

@@ -0,0 +1,277 @@
import struct
from mitmproxy.contentviews._api import Contentview
from mitmproxy.contentviews._api import Metadata
from mitmproxy.utils import strutils
# from https://github.com/nikitastupin/mitmproxy-mqtt-script
class MQTTControlPacket:
# Packet types
(
CONNECT,
CONNACK,
PUBLISH,
PUBACK,
PUBREC,
PUBREL,
PUBCOMP,
SUBSCRIBE,
SUBACK,
UNSUBSCRIBE,
UNSUBACK,
PINGREQ,
PINGRESP,
DISCONNECT,
) = range(1, 15)
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.1_-
Names = [
"reserved",
"CONNECT",
"CONNACK",
"PUBLISH",
"PUBACK",
"PUBREC",
"PUBREL",
"PUBCOMP",
"SUBSCRIBE",
"SUBACK",
"UNSUBSCRIBE",
"UNSUBACK",
"PINGREQ",
"PINGRESP",
"DISCONNECT",
"reserved",
]
PACKETS_WITH_IDENTIFIER = [
PUBACK,
PUBREC,
PUBREL,
PUBCOMP,
SUBSCRIBE,
SUBACK,
UNSUBSCRIBE,
UNSUBACK,
]
def __init__(self, packet):
self._packet = packet
# Fixed header
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718020
self.packet_type = self._parse_packet_type()
self.packet_type_human = self.Names[self.packet_type]
self.dup, self.qos, self.retain = self._parse_flags()
self.remaining_length = self._parse_remaining_length()
# Variable header & Payload
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718024
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718026
if self.packet_type == self.CONNECT:
self._parse_connect_variable_headers()
self._parse_connect_payload()
elif self.packet_type == self.PUBLISH:
self._parse_publish_variable_headers()
self._parse_publish_payload()
elif self.packet_type == self.SUBSCRIBE:
self._parse_subscribe_variable_headers()
self._parse_subscribe_payload()
elif self.packet_type == self.SUBACK:
pass
elif self.packet_type == self.UNSUBSCRIBE:
pass
else:
self.payload = None
def pprint(self):
s = f"[{self.Names[self.packet_type]}]"
if self.packet_type == self.CONNECT:
assert self.payload
s += f"""
Client Id: {self.payload["ClientId"]}
Will Topic: {self.payload.get("WillTopic")}
Will Message: {strutils.bytes_to_escaped_str(self.payload.get("WillMessage", b"None"))}
User Name: {self.payload.get("UserName")}
Password: {strutils.bytes_to_escaped_str(self.payload.get("Password", b"None"))}
"""
elif self.packet_type == self.SUBSCRIBE:
s += " sent topic filters: "
s += ", ".join([f"'{tf}'" for tf in self.topic_filters])
elif self.packet_type == self.PUBLISH:
assert self.payload
topic_name = strutils.bytes_to_escaped_str(self.topic_name)
payload = strutils.bytes_to_escaped_str(self.payload)
s += f" '{payload}' to topic '{topic_name}'"
elif self.packet_type in [self.PINGREQ, self.PINGRESP]:
pass
else:
s = f"Packet type {self.Names[self.packet_type]} is not supported yet!"
return s
def _parse_length_prefixed_bytes(self, offset):
field_length_bytes = self._packet[offset : offset + 2]
field_length = struct.unpack("!H", field_length_bytes)[0]
field_content_bytes = self._packet[offset + 2 : offset + 2 + field_length]
return field_length + 2, field_content_bytes
def _parse_publish_variable_headers(self):
offset = len(self._packet) - self.remaining_length
field_length, field_content_bytes = self._parse_length_prefixed_bytes(offset)
self.topic_name = field_content_bytes
if self.qos in [0x01, 0x02]:
offset += field_length
self.packet_identifier = self._packet[offset : offset + 2]
def _parse_publish_payload(self):
fixed_header_length = len(self._packet) - self.remaining_length
variable_header_length = 2 + len(self.topic_name)
if self.qos in [0x01, 0x02]:
variable_header_length += 2
offset = fixed_header_length + variable_header_length
self.payload = self._packet[offset:]
def _parse_subscribe_variable_headers(self):
self._parse_packet_identifier()
def _parse_subscribe_payload(self):
offset = len(self._packet) - self.remaining_length + 2
self.topic_filters = {}
while len(self._packet) - offset > 0:
field_length, topic_filter_bytes = self._parse_length_prefixed_bytes(offset)
offset += field_length
qos = self._packet[offset : offset + 1]
offset += 1
topic_filter = topic_filter_bytes.decode("utf-8")
self.topic_filters[topic_filter] = {"qos": qos}
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718030
def _parse_connect_variable_headers(self):
offset = len(self._packet) - self.remaining_length
self.variable_headers = {}
self.connect_flags = {}
self.variable_headers["ProtocolName"] = self._packet[offset : offset + 6]
self.variable_headers["ProtocolLevel"] = self._packet[offset + 6 : offset + 7]
self.variable_headers["ConnectFlags"] = self._packet[offset + 7 : offset + 8]
self.variable_headers["KeepAlive"] = self._packet[offset + 8 : offset + 10]
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349229
self.connect_flags["CleanSession"] = bool(
self.variable_headers["ConnectFlags"][0] & 0x02
)
self.connect_flags["Will"] = bool(
self.variable_headers["ConnectFlags"][0] & 0x04
)
self.will_qos = (self.variable_headers["ConnectFlags"][0] >> 3) & 0x03
self.connect_flags["WillRetain"] = bool(
self.variable_headers["ConnectFlags"][0] & 0x20
)
self.connect_flags["Password"] = bool(
self.variable_headers["ConnectFlags"][0] & 0x40
)
self.connect_flags["UserName"] = bool(
self.variable_headers["ConnectFlags"][0] & 0x80
)
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718031
def _parse_connect_payload(self):
fields = []
offset = len(self._packet) - self.remaining_length + 10
while len(self._packet) - offset > 0:
field_length, field_content = self._parse_length_prefixed_bytes(offset)
fields.append(field_content)
offset += field_length
self.payload = {}
for f in fields:
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349242
if "ClientId" not in self.payload:
self.payload["ClientId"] = f.decode("utf-8")
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349243
elif self.connect_flags["Will"] and "WillTopic" not in self.payload:
self.payload["WillTopic"] = f.decode("utf-8")
elif self.connect_flags["Will"] and "WillMessage" not in self.payload:
self.payload["WillMessage"] = f
elif (
self.connect_flags["UserName"] and "UserName" not in self.payload
): # pragma: no cover
self.payload["UserName"] = f.decode("utf-8")
elif (
self.connect_flags["Password"] and "Password" not in self.payload
): # pragma: no cover
self.payload["Password"] = f
else:
raise AssertionError(f"Unknown field in CONNECT payload: {f}")
def _parse_packet_type(self):
return self._packet[0] >> 4
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718022
def _parse_flags(self):
dup = None
qos = None
retain = None
if self.packet_type == self.PUBLISH:
dup = (self._packet[0] >> 3) & 0x01
qos = (self._packet[0] >> 1) & 0x03
retain = self._packet[0] & 0x01
return dup, qos, retain
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.4_Size
def _parse_remaining_length(self):
multiplier = 1
value = 0
i = 1
while True:
encodedByte = self._packet[i]
value += (encodedByte & 127) * multiplier
multiplier *= 128
if multiplier > 128 * 128 * 128:
raise ValueError("Malformed Remaining Length")
if encodedByte & 128 == 0:
break
i += 1
return value
# http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Table_2.5_-
def _parse_packet_identifier(self):
offset = len(self._packet) - self.remaining_length
self.packet_identifier = self._packet[offset : offset + 2]
class MQTTContentview(Contentview):
    """Content view that renders a raw MQTT control packet as text."""

    def prettify(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> str:
        """Parse ``data`` as one MQTT control packet and return its description."""
        return MQTTControlPacket(data).pprint()


mqtt = MQTTContentview()

View File

@@ -0,0 +1,32 @@
from ._utils import byte_pairs_to_str_pairs
from ._utils import merge_repeated_keys
from ._utils import yaml_dumps
from mitmproxy.contentviews._api import Contentview
from mitmproxy.contentviews._api import Metadata
from mitmproxy.net.http.multipart import decode_multipart
class MultipartContentview(Contentview):
    """Content view that shows a multipart form body as YAML."""

    name = "Multipart Form"
    syntax_highlight = "yaml"

    def prettify(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> str:
        """Decode the multipart body using the message's content-type header.

        Raises:
            ValueError: if ``metadata`` carries no HTTP message.
        """
        if not metadata.http_message:
            raise ValueError("Not an HTTP message")
        header_value = metadata.http_message.headers["content-type"]
        parts = decode_multipart(header_value, data)
        text_parts = byte_pairs_to_str_pairs(parts)
        return yaml_dumps(merge_repeated_keys(text_parts))

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """Return 1.0 for non-empty ``multipart/form-data`` bodies, else 0.0."""
        applies = bool(data) and metadata.content_type == "multipart/form-data"
        return float(applies)


multipart = MultipartContentview()

View File

@@ -0,0 +1,31 @@
from .. import http
from ._utils import merge_repeated_keys
from ._utils import yaml_dumps
from mitmproxy.contentviews._api import Contentview
from mitmproxy.contentviews._api import Metadata
class QueryContentview(Contentview):
    """Content view that shows the query parameters of an HTTP request as YAML."""

    syntax_highlight = "yaml"

    def prettify(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> str:
        """Render the request's query items; ``data`` itself is not used.

        Raises:
            ValueError: if the message in ``metadata`` is not an HTTP request.
        """
        request = metadata.http_message
        if not isinstance(request, http.Request):
            raise ValueError("Not an HTTP request.")
        pairs = request.query.items(multi=True)
        return yaml_dumps(merge_repeated_keys(pairs))

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """Return 0.3 when the body is empty and the message has a query, else 0.0."""
        body_is_empty = not data
        has_query = body_is_empty and bool(
            getattr(metadata.http_message, "query", False)
        )
        return 0.3 * float(has_query)


query = QueryContentview()

View File

@@ -0,0 +1,17 @@
from ._api import Contentview
from ._api import Metadata
class RawContentview(Contentview):
    """Content view that shows the body as plain text without reformatting."""

    def prettify(self, data: bytes, metadata: Metadata) -> str:
        """Decode ``data`` as UTF-8, backslash-escaping undecodable bytes."""
        text = data.decode("utf-8", "backslashreplace")
        return text

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """Return a constant priority of 0.1, regardless of the input."""
        return 0.1


raw = RawContentview()

View File

@@ -0,0 +1,98 @@
from abc import abstractmethod
from enum import Enum
from mitmproxy.contentviews._api import Contentview
from mitmproxy.contentviews._api import Metadata
from mitmproxy.http import HTTPFlow
from mitmproxy.utils import strutils
class PacketType(Enum):
    """Common base for the packet type enums in this module."""

    @property
    @abstractmethod
    def visible(self) -> bool:
        """Whether packets of this type should be shown; subclasses override this."""
        raise RuntimeError()  # pragma: no cover

    def __str__(self):
        enum_name = type(self).__name__
        return f"{enum_name}.{self.name}"
class EngineIO(PacketType):
    """Engine.IO packet types, keyed by the byte value of their ASCII digit."""

    # https://github.com/socketio/engine.io-protocol?tab=readme-ov-file#protocol
    OPEN = ord("0")
    CLOSE = ord("1")
    PING = ord("2")
    PONG = ord("3")
    MESSAGE = ord("4")
    UPGRADE = ord("5")
    NOOP = ord("6")

    @property
    def visible(self):
        """True for every packet type except PING and PONG."""
        hidden = (self.PING, self.PONG)
        return self not in hidden
class SocketIO(PacketType):
    """Socket.IO packet types, keyed by the byte value of their ASCII digit."""

    # https://github.com/socketio/socket.io-protocol?tab=readme-ov-file#exchange-protocol
    CONNECT = ord("0")
    DISCONNECT = ord("1")
    EVENT = ord("2")
    ACK = ord("3")
    CONNECT_ERROR = ord("4")
    BINARY_EVENT = ord("5")
    BINARY_ACK = ord("6")

    @property
    def visible(self):
        """True for every packet type except ACK and BINARY_ACK."""
        hidden = (self.ACK, self.BINARY_ACK)
        return self not in hidden
def parse_packet(data: bytes) -> tuple[PacketType, bytes]:
    """Split ``data`` into its packet type and the remaining bytes.

    The first byte selects the Engine.IO type. For Engine.IO MESSAGE
    packets the second byte selects the Socket.IO type, which is returned
    instead, together with the bytes after it.
    """
    # throws IndexError/ValueError if invalid packet
    outer_type = EngineIO(data[0])
    remainder = data[1:]
    if outer_type is EngineIO.MESSAGE:
        inner_type = SocketIO(remainder[0])
        return inner_type, remainder[1:]
    return outer_type, remainder
class SocketIOContentview(Contentview):
    """Content view for Socket.IO / Engine.IO WebSocket messages."""

    name = "Socket.IO"

    def prettify(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> str:
        """Return ``"<packet type> <escaped body>"``, or ``""`` for hidden packet types."""
        packet_type, msg = parse_packet(data)
        if packet_type.visible:
            return f"{packet_type} {strutils.bytes_to_escaped_str(msg)}"
        return ""

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """Return 1.0 for non-empty messages on a WebSocket flow whose request
        path contains ``/socket.io/?``, otherwise 0.0."""
        if not data:
            return 0.0
        is_socket_io = (
            isinstance(metadata.flow, HTTPFlow)
            and metadata.flow.websocket is not None
            and "/socket.io/?" in metadata.flow.request.path
        )
        return float(is_socket_io)


socket_io = SocketIOContentview()

View File

@@ -0,0 +1,33 @@
import urllib
import urllib.parse
from ._utils import byte_pairs_to_str_pairs
from ._utils import merge_repeated_keys
from ._utils import yaml_dumps
from mitmproxy.contentviews._api import Contentview
from mitmproxy.contentviews._api import Metadata
class URLEncodedContentview(Contentview):
    """Content view that shows a URL-encoded form body as YAML."""

    name = "URL-encoded"
    syntax_highlight = "yaml"

    def prettify(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> str:
        """Parse ``data`` as a query string, keeping fields with blank values."""
        raw_pairs = urllib.parse.parse_qsl(data, keep_blank_values=True)
        text_pairs = byte_pairs_to_str_pairs(raw_pairs)
        return yaml_dumps(merge_repeated_keys(text_pairs))

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """Return 1.0 for non-empty ``application/x-www-form-urlencoded`` bodies, else 0.0."""
        applies = (
            bool(data) and metadata.content_type == "application/x-www-form-urlencoded"
        )
        return float(applies)


urlencoded = URLEncodedContentview()

View File

@@ -0,0 +1,25 @@
from mitmproxy.contentviews._api import Contentview
from mitmproxy.contentviews._api import Metadata
from mitmproxy.contrib.wbxml import ASCommandResponse
class WBXMLContentview(Contentview):
    """Content view that renders WBXML bodies as XML text."""

    __content_types = ("application/vnd.wap.wbxml", "application/vnd.ms-sync.wbxml")
    syntax_highlight = "xml"

    def prettify(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> str:
        """Return the ``xmlString`` produced by ``ASCommandResponse`` for ``data``."""
        response = ASCommandResponse.ASCommandResponse(data)
        return response.xmlString

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """Return 1.0 for non-empty bodies with a WBXML content type, else 0.0."""
        applies = bool(data) and metadata.content_type in self.__content_types
        return float(applies)


wbxml = WBXMLContentview()

View File

@@ -0,0 +1,278 @@
import io
import re
import textwrap
from collections.abc import Iterable
from mitmproxy.contentviews._api import Contentview
from mitmproxy.contentviews._api import Metadata
from mitmproxy.utils import sliding_window
from mitmproxy.utils import strutils
"""
A custom XML/HTML prettifier. Compared to other prettifiers, its main features are:
- Implemented in pure Python.
- Modifies whitespace only.
- Works with any input.
- Lazy evaluation.
The implementation is split into two main parts: tokenization and formatting of tokens.
"""
# http://www.xml.com/pub/a/2001/07/25/namingparts.html - this is close enough for what we do.
# Used by Tag.tag to pull the element name out of the raw tag text.
REGEX_TAG = re.compile(r"[a-zA-Z0-9._:\-]+(?!=)")
# https://www.w3.org/TR/html5/syntax.html#void-elements
# Tag names that Tag.is_self_closing treats as self-closing even without "/>".
HTML_VOID_ELEMENTS = {
"area",
"base",
"br",
"col",
"embed",
"hr",
"img",
"input",
"keygen",
"link",
"meta",
"param",
"source",
"track",
"wbr",
}
# Tags that ElementStack tracks without changing the indentation level.
NO_INDENT_TAGS = {"xml", "doctype", "html"}
# Number of spaces added per nesting level by ElementStack.
INDENT = 2
class Token:
    """Base class for tokenizer output; ``data`` holds the raw source text."""

    def __init__(self, data):
        self.data = data

    def __repr__(self):
        class_name = type(self).__name__
        return f"{class_name}({self.data})"
class Text(Token):
    """A run of character data between two tags."""

    @property
    def text(self):
        """The raw data with leading and trailing whitespace removed."""
        raw = self.data
        return raw.strip()
class Tag(Token):
    """A markup token starting with ``<``: element tag, comment or CDATA section."""

    @property
    def tag(self):
        """The lower-cased first ``REGEX_TAG`` match in the data, or ``"<empty>"``."""
        match = REGEX_TAG.search(self.data)
        if match is None:
            return "<empty>"
        return match.group(0).lower()

    @property
    def is_comment(self) -> bool:
        return self.data.startswith("<!--")

    @property
    def is_cdata(self) -> bool:
        return self.data.startswith("<![CDATA[")

    @property
    def is_closing(self):
        return self.data.startswith("</")

    @property
    def is_self_closing(self):
        """True for comments, CDATA, ``.../>`` tags and HTML void elements."""
        if self.is_comment or self.is_cdata:
            return True
        return self.data.endswith("/>") or self.tag in HTML_VOID_ELEMENTS

    @property
    def is_opening(self):
        return not (self.is_closing or self.is_self_closing)

    @property
    def done(self):
        """Whether the data collected so far ends with this token's terminator."""
        if self.is_comment:
            terminator = "-->"
        elif self.is_cdata:
            terminator = "]]>"
        else:
            # This fails for attributes that contain an unescaped ">"
            terminator = ">"
        return self.data.endswith(terminator)
def tokenize(data: str) -> Iterable[Token]:
    """Lazily split ``data`` into alternating ``Text`` and ``Tag`` tokens.

    Text tokens that are empty after stripping are skipped. A tag keeps
    absorbing input up to each following ``>`` until it is ``done``, which
    lets comments and CDATA sections contain ``>``. A trailing token with
    non-whitespace data is yielded even if it is an unterminated tag.
    """
    token: Token = Text("")
    pos = 0
    size = len(data)

    def take(delimiter: str, keep: int) -> str:
        # Consume from the cursor up to the next ``delimiter`` (or the end of
        # input), including ``keep`` characters past the delimiter's start.
        nonlocal pos
        stop = data.find(delimiter, pos)
        if stop == -1:
            stop = size
        stop += keep
        chunk = data[pos:stop]
        pos = stop
        return chunk

    while pos < size:
        if isinstance(token, Text):
            token.data = take("<", 0)
            if token.text:
                yield token
            # After text (even empty text) a tag always follows.
            token = Tag("")
        elif isinstance(token, Tag):
            token.data += take(">", 1)
            if token.done:
                yield token
                token = Text("")

    if token.data.strip():
        yield token
def indent_text(data: str, prefix: str) -> str:
    """Dedent ``data`` and re-indent every line with ``prefix``.

    Only the first 32 characters of ``prefix`` are used.
    """
    # Add spacing to first line so that we dedent in cases like this:
    # <li>This is
    #   example text
    #   over multiple lines
    # </li>
    padding = " " * 32
    normalized = textwrap.dedent(padding + data).strip()
    capped_prefix = prefix[:32]
    return textwrap.indent(normalized, capped_prefix)
def is_inline_text(a: Token | None, b: Token | None, c: Token | None) -> bool:
    """Return True if ``a b c`` is ``<tag>single-line text</tag>`` with matching tags."""
    if not (isinstance(a, Tag) and isinstance(b, Text) and isinstance(c, Tag)):
        return False
    if not a.is_opening:
        return False
    if "\n" in b.data:
        return False
    if not c.is_closing:
        return False
    return a.tag == c.tag
def is_inline(
    prev2: Token | None,
    prev1: Token | None,
    t: Token | None,
    next1: Token | None,
    next2: Token | None,
) -> bool:
    """Decide whether token ``t`` is part of an inline element.

    This holds for the text of ``<tag>text</tag>``, for both tags around
    such a text, and for both tags of an empty pair such as ``<div></div>``.
    """
    if isinstance(t, Text):
        return is_inline_text(prev1, t, next1)
    if not isinstance(t, Tag):
        return False

    # ``t`` closes or opens a <tag>text</tag> triple.
    if is_inline_text(prev2, prev1, t) or is_inline_text(t, next1, next2):
        return True

    starts_empty_pair = (
        isinstance(next1, Tag)
        and t.is_opening
        and next1.is_closing
        and t.tag == next1.tag
    )
    if starts_empty_pair:
        return True  # <div></div> (start tag)

    ends_empty_pair = (
        isinstance(prev1, Tag)
        and prev1.is_opening
        and t.is_closing
        and prev1.tag == t.tag
    )
    if ends_empty_pair:
        return True  # <div></div> (end tag)
    return False
class ElementStack:
    """
    Keep track of how deeply nested our document is.

    ``open_tags`` is the stack of currently open tag names and ``indent`` is
    the whitespace prefix for the current nesting level.
    """

    def __init__(self):
        self.open_tags = []
        self.indent = ""

    def push_tag(self, tag: str):
        """Record an opening tag, indenting unless it is in ``NO_INDENT_TAGS``.

        Once more than 16 tags are open, further tags are ignored.
        """
        if len(self.open_tags) > 16:
            return
        self.open_tags.append(tag)
        if tag not in NO_INDENT_TAGS:
            self.indent += " " * INDENT

    def pop_tag(self, tag: str):
        """Close ``tag`` together with every tag opened after it.

        A closing tag without a matching open tag leaves the stack and the
        indentation unchanged.
        """
        if tag not in self.open_tags:
            return  # this closing tag has no start tag. let's keep indentation as-is.

        remove_indent = 0
        while True:
            t = self.open_tags.pop()
            if t not in NO_INDENT_TAGS:
                remove_indent += INDENT
            if t == tag:
                break
        # ``indent[:-0]`` is ``indent[:0]``, i.e. the empty string, so only
        # slice when there is something to remove. Otherwise closing a
        # non-indenting tag would reset the indentation of its parents.
        if remove_indent:
            self.indent = self.indent[:-remove_indent]
def format_xml(tokens: Iterable[Token]) -> str:
    """Render ``tokens`` as indented markup.

    Each token is examined together with its two neighbours on either side.
    Inline elements stay on a single line; everything else is written on
    its own line, indented to the current nesting depth.
    """
    pieces: list[str] = []
    stack = ElementStack()

    for prev2, prev1, token, next1, next2 in sliding_window.window(tokens, 2, 2):
        if isinstance(token, Text):
            if is_inline(prev2, prev1, token, next1, next2):
                pieces.append(token.text)
            else:
                pieces.append(indent_text(token.data, stack.indent))
                pieces.append("\n")
        elif not isinstance(token, Tag):  # pragma: no cover
            raise RuntimeError()
        elif token.is_opening:
            pieces.append(indent_text(token.data, stack.indent))
            if not is_inline(prev2, prev1, token, next1, next2):
                pieces.append("\n")
            stack.push_tag(token.tag)
        elif token.is_closing:
            # Pop first so the closing tag lines up with its opening tag.
            stack.pop_tag(token.tag)
            if is_inline(prev2, prev1, token, next1, next2):
                pieces.append(token.data)
            else:
                pieces.append(indent_text(token.data, stack.indent))
            pieces.append("\n")
        else:  # self-closing
            pieces.append(indent_text(token.data, stack.indent))
            pieces.append("\n")

    return "".join(pieces)
class XmlHtmlContentview(Contentview):
    """Content view that re-indents XML and HTML documents."""

    __content_types = ("text/xml", "text/html")
    name = "XML/HTML"
    syntax_highlight = "xml"

    def prettify(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> str:
        """Tokenize and re-format the document.

        The text comes from the HTTP message when one is available;
        otherwise ``data`` is decoded as UTF-8 with backslash escapes.
        """
        message = metadata.http_message
        if message:
            document = message.get_text(strict=False) or ""
        else:
            document = data.decode("utf8", "backslashreplace")
        return format_xml(tokenize(document))

    def render_priority(
        self,
        data: bytes,
        metadata: Metadata,
    ) -> float:
        """Return 1 for XML/HTML content types, 0.4 if ``strutils.is_xml``
        accepts the data, and 0 otherwise or when ``data`` is empty."""
        if not data:
            return 0
        if metadata.content_type in self.__content_types:
            return 1
        if strutils.is_xml(data):
            return 0.4
        return 0


xml_html = XmlHtmlContentview()

View File

@@ -0,0 +1,129 @@
# Default view cutoff *in lines*
import sys
from abc import ABC
from abc import abstractmethod
from collections.abc import Iterable
from collections.abc import Iterator
from collections.abc import Mapping
from typing import ClassVar
from typing import Union
from mitmproxy import flow
from mitmproxy import http
if sys.version_info < (3, 13): # pragma: no cover
from typing_extensions import deprecated
else:
from warnings import deprecated
# Upper bound on the key width that format_pairs pads keys to.
KEY_MAX = 30
TTextType = Union[str, bytes] # FIXME: This should be either bytes or str ultimately.
# One output line: a list of (style, text) tuples.
TViewLine = list[tuple[str, TTextType]]
# (description, content generator), as returned by View.__call__.
TViewResult = tuple[str, Iterator[TViewLine]]
@deprecated("Use `mitmproxy.contentviews.Contentview` instead.")
class View(ABC):
    """
    Deprecated, do not use.
    """

    name: ClassVar[str]

    @abstractmethod
    def __call__(
        self,
        data: bytes,
        *,
        content_type: str | None = None,
        flow: flow.Flow | None = None,
        http_message: http.Message | None = None,
        **unknown_metadata,
    ) -> TViewResult:
        """
        Turn raw data into human-readable output.

        The result is a ``(description, content generator)`` tuple. The
        generator yields one list of ``(style, text)`` tuples per line.
        ``text`` is an unfiltered string: it may contain terminal control
        sequences or raw HTML, so consumers must escape it as needed for
        their output.

        Implementations must not rely on any argument other than `data`
        being present, and should ignore unknown keyword arguments to stay
        compatible with future mitmproxy versions.

        Each line must be a *list* of tuples, not a tuple of tuples,
        because urwid cannot process the latter.
        """
        raise NotImplementedError()  # pragma: no cover

    def render_priority(
        self,
        data: bytes,
        *,
        content_type: str | None = None,
        flow: flow.Flow | None = None,
        http_message: http.Message | None = None,
        **unknown_metadata,
    ) -> float:
        """
        Return how suitable this view is for rendering `data`.

        When the user has not picked a view, the one with the highest
        priority is used. This default implementation returns 0.

        Implementations must not rely on any argument other than `data`
        being present, and should ignore unknown keyword arguments to stay
        compatible with future mitmproxy versions.
        """
        return 0

    def __lt__(self, other):
        # Views are ordered by their name.
        assert isinstance(other, View)
        own_name = self.name
        return own_name.__lt__(other.name)
@deprecated("Use `mitmproxy.contentviews.Contentview` instead.")
def format_pairs(items: Iterable[tuple[TTextType, TTextType]]) -> Iterator[TViewLine]:
    """
    Helper function that turns (key, value) pairs into view lines of the form
    [
        ("header", key  )
        ("text",   value)
    ]
    where ":" is appended to each key and the key is then padded to a
    uniform width. The width follows the longest key, capped at KEY_MAX.

    `items` may be any iterable, including a one-shot iterator.
    """
    # `items` is traversed twice (once to measure the keys, once to emit the
    # lines). Materialize it first so that generators and other single-pass
    # iterators do not come up empty on the second pass.
    pairs = list(items)
    max_key_len = max((len(pair[0]) for pair in pairs), default=0)
    max_key_len = min(max_key_len, KEY_MAX)
    for key, value in pairs:
        if isinstance(key, bytes):
            key += b":"
        else:
            key += ":"
        # +2 accounts for the colon and one separating space.
        key = key.ljust(max_key_len + 2)
        yield [("header", key), ("text", value)]
@deprecated("Use `mitmproxy.contentviews.Contentview` instead.")
def format_dict(d: Mapping[TTextType, TTextType]) -> Iterator[TViewLine]:
    """
    Helper function that formats the entries of a mapping as view lines by
    passing its (key, value) items to format_pairs.
    """
    entries = d.items()
    return format_pairs(entries)
@deprecated("Use `mitmproxy.contentviews.Contentview` instead.")
def format_text(text: TTextType) -> Iterator[TViewLine]:
    """
    Helper function that yields one view line, styled "text", for every line
    of the given str or bytes.
    """
    yield from ([("text", row)] for row in text.splitlines())