85 lines
2.3 KiB
Python
85 lines
2.3 KiB
Python
from collections.abc import Callable
|
|
from collections.abc import Iterator
|
|
from typing import Any
|
|
|
|
from mitmproxy import dns
|
|
from mitmproxy import flow
|
|
from mitmproxy import hooks
|
|
from mitmproxy import http
|
|
from mitmproxy import tcp
|
|
from mitmproxy import udp
|
|
from mitmproxy.proxy import layers
|
|
|
|
TEventGenerator = Iterator[hooks.Hook]
|
|
|
|
|
|
def _iterate_http(f: http.HTTPFlow) -> TEventGenerator:
|
|
if f.request:
|
|
yield layers.http.HttpRequestHeadersHook(f)
|
|
yield layers.http.HttpRequestHook(f)
|
|
if f.response:
|
|
yield layers.http.HttpResponseHeadersHook(f)
|
|
yield layers.http.HttpResponseHook(f)
|
|
if f.websocket:
|
|
message_queue = f.websocket.messages
|
|
f.websocket.messages = []
|
|
yield layers.websocket.WebsocketStartHook(f)
|
|
for m in message_queue:
|
|
f.websocket.messages.append(m)
|
|
yield layers.websocket.WebsocketMessageHook(f)
|
|
yield layers.websocket.WebsocketEndHook(f)
|
|
elif f.error:
|
|
yield layers.http.HttpErrorHook(f)
|
|
|
|
|
|
def _iterate_tcp(f: tcp.TCPFlow) -> TEventGenerator:
|
|
messages = f.messages
|
|
f.messages = []
|
|
yield layers.tcp.TcpStartHook(f)
|
|
while messages:
|
|
f.messages.append(messages.pop(0))
|
|
yield layers.tcp.TcpMessageHook(f)
|
|
if f.error:
|
|
yield layers.tcp.TcpErrorHook(f)
|
|
else:
|
|
yield layers.tcp.TcpEndHook(f)
|
|
|
|
|
|
def _iterate_udp(f: udp.UDPFlow) -> TEventGenerator:
|
|
messages = f.messages
|
|
f.messages = []
|
|
yield layers.udp.UdpStartHook(f)
|
|
while messages:
|
|
f.messages.append(messages.pop(0))
|
|
yield layers.udp.UdpMessageHook(f)
|
|
if f.error:
|
|
yield layers.udp.UdpErrorHook(f)
|
|
else:
|
|
yield layers.udp.UdpEndHook(f)
|
|
|
|
|
|
def _iterate_dns(f: dns.DNSFlow) -> TEventGenerator:
|
|
if f.request:
|
|
yield layers.dns.DnsRequestHook(f)
|
|
if f.response:
|
|
yield layers.dns.DnsResponseHook(f)
|
|
if f.error:
|
|
yield layers.dns.DnsErrorHook(f)
|
|
|
|
|
|
_iterate_map: dict[type[flow.Flow], Callable[[Any], TEventGenerator]] = {
|
|
http.HTTPFlow: _iterate_http,
|
|
tcp.TCPFlow: _iterate_tcp,
|
|
udp.UDPFlow: _iterate_udp,
|
|
dns.DNSFlow: _iterate_dns,
|
|
}
|
|
|
|
|
|
def iterate(f: flow.Flow) -> TEventGenerator:
|
|
try:
|
|
e = _iterate_map[type(f)]
|
|
except KeyError as err:
|
|
raise TypeError(f"Unknown flow type: {f.__class__.__name__}") from err
|
|
else:
|
|
yield from e(f)
|