Files

85 lines
2.3 KiB
Python
Raw Permalink Normal View History

2025-12-25 11:16:59 +08:00
from collections.abc import Callable
from collections.abc import Iterator
from typing import Any
from mitmproxy import dns
from mitmproxy import flow
from mitmproxy import hooks
from mitmproxy import http
from mitmproxy import tcp
from mitmproxy import udp
from mitmproxy.proxy import layers
TEventGenerator = Iterator[hooks.Hook]
def _iterate_http(f: http.HTTPFlow) -> TEventGenerator:
if f.request:
yield layers.http.HttpRequestHeadersHook(f)
yield layers.http.HttpRequestHook(f)
if f.response:
yield layers.http.HttpResponseHeadersHook(f)
yield layers.http.HttpResponseHook(f)
if f.websocket:
message_queue = f.websocket.messages
f.websocket.messages = []
yield layers.websocket.WebsocketStartHook(f)
for m in message_queue:
f.websocket.messages.append(m)
yield layers.websocket.WebsocketMessageHook(f)
yield layers.websocket.WebsocketEndHook(f)
elif f.error:
yield layers.http.HttpErrorHook(f)
def _iterate_tcp(f: tcp.TCPFlow) -> TEventGenerator:
messages = f.messages
f.messages = []
yield layers.tcp.TcpStartHook(f)
while messages:
f.messages.append(messages.pop(0))
yield layers.tcp.TcpMessageHook(f)
if f.error:
yield layers.tcp.TcpErrorHook(f)
else:
yield layers.tcp.TcpEndHook(f)
def _iterate_udp(f: udp.UDPFlow) -> TEventGenerator:
messages = f.messages
f.messages = []
yield layers.udp.UdpStartHook(f)
while messages:
f.messages.append(messages.pop(0))
yield layers.udp.UdpMessageHook(f)
if f.error:
yield layers.udp.UdpErrorHook(f)
else:
yield layers.udp.UdpEndHook(f)
def _iterate_dns(f: dns.DNSFlow) -> TEventGenerator:
if f.request:
yield layers.dns.DnsRequestHook(f)
if f.response:
yield layers.dns.DnsResponseHook(f)
if f.error:
yield layers.dns.DnsErrorHook(f)
_iterate_map: dict[type[flow.Flow], Callable[[Any], TEventGenerator]] = {
http.HTTPFlow: _iterate_http,
tcp.TCPFlow: _iterate_tcp,
udp.UDPFlow: _iterate_udp,
dns.DNSFlow: _iterate_dns,
}
def iterate(f: flow.Flow) -> TEventGenerator:
try:
e = _iterate_map[type(f)]
except KeyError as err:
raise TypeError(f"Unknown flow type: {f.__class__.__name__}") from err
else:
yield from e(f)