341 lines
13 KiB
Python
341 lines
13 KiB
Python
|
|
"""
|
||
|
|
Base class for protocol layers.
|
||
|
|
"""
|
||
|
|
|
||
|
|
import collections
|
||
|
|
import textwrap
|
||
|
|
from abc import abstractmethod
|
||
|
|
from collections.abc import Callable
|
||
|
|
from collections.abc import Generator
|
||
|
|
from dataclasses import dataclass
|
||
|
|
from logging import DEBUG
|
||
|
|
from typing import Any
|
||
|
|
from typing import ClassVar
|
||
|
|
from typing import NamedTuple
|
||
|
|
from typing import TypeVar
|
||
|
|
|
||
|
|
from mitmproxy.connection import Connection
|
||
|
|
from mitmproxy.proxy import commands
|
||
|
|
from mitmproxy.proxy import events
|
||
|
|
from mitmproxy.proxy.commands import Command
|
||
|
|
from mitmproxy.proxy.commands import StartHook
|
||
|
|
from mitmproxy.proxy.context import Context
|
||
|
|
|
||
|
|
# Return type variable for CommandGenerator.
T = TypeVar("T")

CommandGenerator = Generator[Command, Any, T]
"""
A function annotated with CommandGenerator[bool] may yield commands and ultimately return a boolean value.
"""


MAX_LOG_STATEMENT_SIZE = 2048
"""Maximum size of individual log statements before they will be truncated."""
|
||
|
|
|
||
|
|
|
||
|
|
class Paused(NamedTuple):
    """
    State of a layer that's paused because it is waiting for a command reply.
    """

    command: commands.Command  # the blocking command whose reply we are waiting for
    generator: CommandGenerator  # the suspended ._handle_event generator to resume
|
||
|
|
|
||
|
|
|
||
|
|
class Layer:
    """
    The base class for all protocol layers.

    Layers interface with their child layer(s) by calling .handle_event(event),
    which returns a list (more precisely: a generator) of commands.
    Most layers do not implement .handle_event directly, but instead implement ._handle_event, which
    is called by the default implementation of .handle_event.
    The default implementation of .handle_event allows layers to emulate blocking code:
    When ._handle_event yields a command that has its blocking attribute set to True, .handle_event pauses
    the execution of ._handle_event and waits until it is called with the corresponding CommandCompleted event.
    All events encountered in the meantime are buffered and replayed after execution is resumed.

    The result is code that looks like blocking code, but is not blocking:

        def _handle_event(self, event):
            err = yield OpenConnection(server)  # execution continues here after a connection has been established.

    Technically this is very similar to how coroutines are implemented.
    """

    # Class-level deduplication state for __debug: the last debug message emitted
    # by any layer, used to shorten immediately repeated messages.
    __last_debug_message: ClassVar[str] = ""
    # The proxy context (connections, options, layer stack) this layer belongs to.
    context: Context
    _paused: Paused | None
    """
    If execution is currently paused, this attribute stores the paused coroutine
    and the command for which we are expecting a reply.
    """
    _paused_event_queue: collections.deque[events.Event]
    """
    All events that have occurred since execution was paused.
    These will be replayed to ._child_layer once we resume.
    """
    debug: str | None = None
    """
    Enable debug logging by assigning a prefix string for log messages.
    Different amounts of whitespace for different layers work well.
    """

    def __init__(self, context: Context) -> None:
        self.context = context
        self.context.layers.append(self)
        self._paused = None
        self._paused_event_queue = collections.deque()

        # Enable debug output for this layer if the proxy_debug option is set
        # (getattr: the option may not exist on all option holders).
        show_debug_output = getattr(context.options, "proxy_debug", False)
        if show_debug_output:  # pragma: no cover
            # Indent debug output according to the layer's depth in the stack.
            self.debug = " " * len(context.layers)

    def __repr__(self):
        # Layers may keep their current state function in a `state` attribute
        # (named with a "state_" prefix by convention); include it in the repr.
        statefun = getattr(self, "state", self._handle_event)
        state = getattr(statefun, "__name__", "")
        state = state.replace("state_", "")
        if state == "_handle_event":
            state = ""
        else:
            state = f"state: {state}"
        return f"{type(self).__name__}({state})"

    def __debug(self, message):
        """yield a Log command indicating what message is passing through this layer."""
        if len(message) > MAX_LOG_STATEMENT_SIZE:
            message = message[:MAX_LOG_STATEMENT_SIZE] + "…"
        if Layer.__last_debug_message == message:
            # Exact repeat of the previous message: only log a (truncated) first line
            # to reduce noise.
            message = message.split("\n", 1)[0].strip()
            if len(message) > 256:
                message = message[:256] + "…"
        else:
            Layer.__last_debug_message = message
        assert self.debug is not None
        return commands.Log(textwrap.indent(message, self.debug), DEBUG)

    @property
    def stack_pos(self) -> str:
        """repr() for this layer and all its parent layers, only useful for debugging."""
        try:
            idx = self.context.layers.index(self)
        except ValueError:
            # Not registered in the context's layer stack; fall back to just this layer.
            return repr(self)
        else:
            return " >> ".join(repr(x) for x in self.context.layers[: idx + 1])

    @abstractmethod
    def _handle_event(self, event: events.Event) -> CommandGenerator[None]:
        """Handle a proxy server event"""
        yield from ()  # pragma: no cover

    def handle_event(self, event: events.Event) -> CommandGenerator[None]:
        """
        Dispatch an event to ._handle_event, implementing the pause/resume
        mechanism described in the class docstring.
        """
        if self._paused:
            # did we just receive the reply we were waiting for?
            pause_finished = (
                isinstance(event, events.CommandCompleted)
                and event.command is self._paused.command
            )
            if self.debug is not None:
                yield self.__debug(f"{'>>' if pause_finished else '>!'} {event}")
            if pause_finished:
                assert isinstance(event, events.CommandCompleted)
                yield from self.__continue(event)
            else:
                # Buffer the event; it is replayed once execution resumes.
                self._paused_event_queue.append(event)
        else:
            if self.debug is not None:
                yield self.__debug(f">> {event}")
            command_generator = self._handle_event(event)
            send = None

            # inlined copy of __process to reduce call stack.
            # <✂✂✂>
            try:
                # Run ._handle_event to the next yield statement.
                # If you are not familiar with generators and their .send() method,
                # https://stackoverflow.com/a/12638313/934719 has a good explanation.
                command = command_generator.send(send)
            except StopIteration:
                return

            while True:
                if self.debug is not None:
                    if not isinstance(command, commands.Log):
                        yield self.__debug(f"<< {command}")
                if command.blocking is True:
                    # We only want this layer to block, the outer layers should not block.
                    # For example, take an HTTP/2 connection: If we intercept one particular request,
                    # we don't want all other requests in the connection to be blocked as well.
                    # We signal to outer layers that this command is already handled by assigning our layer to
                    # `.blocking` here (upper layers explicitly check for `is True`).
                    command.blocking = self
                    self._paused = Paused(
                        command,
                        command_generator,
                    )
                    yield command
                    return
                else:
                    yield command
                    try:
                        command = next(command_generator)
                    except StopIteration:
                        return
            # </✂✂✂>

    def __process(self, command_generator: CommandGenerator, send=None):
        """
        Yield commands from a generator.
        If a command is blocking, execution is paused and this function returns without
        processing any further commands.
        """
        try:
            # Run ._handle_event to the next yield statement.
            # If you are not familiar with generators and their .send() method,
            # https://stackoverflow.com/a/12638313/934719 has a good explanation.
            command = command_generator.send(send)
        except StopIteration:
            return

        while True:
            if self.debug is not None:
                if not isinstance(command, commands.Log):
                    yield self.__debug(f"<< {command}")
            if command.blocking is True:
                # We only want this layer to block, the outer layers should not block.
                # For example, take an HTTP/2 connection: If we intercept one particular request,
                # we don't want all other requests in the connection to be blocked as well.
                # We signal to outer layers that this command is already handled by assigning our layer to
                # `.blocking` here (upper layers explicitly check for `is True`).
                command.blocking = self
                self._paused = Paused(
                    command,
                    command_generator,
                )
                yield command
                return
            else:
                yield command
                try:
                    command = next(command_generator)
                except StopIteration:
                    return

    def __continue(self, event: events.CommandCompleted):
        """
        Continue processing events after being paused.
        The tricky part here is that events in the event queue may trigger commands which again pause the execution,
        so we may not be able to process the entire queue.
        """
        assert self._paused is not None
        command_generator = self._paused.generator
        self._paused = None
        # Resume the paused generator by sending it the command's reply.
        yield from self.__process(command_generator, event.reply)

        # Replay buffered events until the queue is drained or execution pauses again.
        while not self._paused and self._paused_event_queue:
            ev = self._paused_event_queue.popleft()
            if self.debug is not None:
                yield self.__debug(f"!> {ev}")
            command_generator = self._handle_event(ev)
            yield from self.__process(command_generator)
|
||
|
|
|
||
|
|
|
||
|
|
mevents = (
    events  # alias here because autocomplete above should not have aliased version.
)
|
||
|
|
|
||
|
|
|
||
|
|
class NextLayer(Layer):
    """
    Placeholder layer that buffers events until an addon has decided
    (via the next_layer hook) which layer should be used, then replays
    the buffered events to that layer and steps out of the way.
    """

    layer: Layer | None
    """The next layer. To be set by an addon."""

    events: list[mevents.Event]
    """All events that happened before a decision was made."""

    # If True, trigger the next_layer hook already on the Start event
    # instead of waiting for the first received data.
    _ask_on_start: bool

    def __init__(self, context: Context, ask_on_start: bool = False) -> None:
        super().__init__(context)
        # This layer is only a placeholder; it should not show up in the layer stack.
        self.context.layers.remove(self)
        self.layer = None
        self.events = []
        self._ask_on_start = ask_on_start
        # Once the next layer has been determined, this is set to its handle_event
        # so that subsequent events bypass this layer's machinery (see ._ask).
        self._handle: Callable[[mevents.Event], CommandGenerator[None]] | None = None

    def __repr__(self):
        return f"NextLayer:{self.layer!r}"

    def handle_event(self, event: mevents.Event):
        if self._handle is not None:
            # A decision has been made: forward directly to the chosen layer.
            yield from self._handle(event)
        else:
            yield from super().handle_event(event)

    def _handle_event(self, event: mevents.Event):
        # Buffer every event so it can be replayed once the next layer is known.
        self.events.append(event)

        # We receive new data. Let's find out if we can determine the next layer now?
        if self._ask_on_start and isinstance(event, events.Start):
            yield from self._ask()
        elif (
            isinstance(event, mevents.ConnectionClosed)
            and event.connection == self.context.client
        ):
            # If we have not determined the next protocol yet and the client already closes the connection,
            # we abort everything.
            yield commands.CloseConnection(self.context.client)
        elif isinstance(event, mevents.DataReceived):
            # For now, we only ask if we have received new data to reduce hook noise.
            yield from self._ask()

    def _ask(self):
        """
        Manually trigger a next_layer hook.
        The only use at the moment is to make sure that the top layer is initialized.
        """
        yield NextLayerHook(self)

        # Has an addon decided on the next layer yet?
        if self.layer:
            if self.debug:
                yield commands.Log(f"{self.debug}[nextlayer] {self.layer!r}", DEBUG)
            # Replay everything buffered so far to the newly chosen layer.
            for e in self.events:
                yield from self.layer.handle_event(e)
            self.events.clear()

            # Why do we need three assignments here?
            # 1. When this function here is invoked we may have paused events. Those should be
            #    forwarded to the sublayer right away, so we reassign ._handle_event.
            # 2. This layer is not needed anymore, so we directly reassign .handle_event.
            # 3. Some layers may however still have a reference to the old .handle_event.
            #    ._handle is just an optimization to reduce the callstack in these cases.
            self.handle_event = self.layer.handle_event  # type: ignore
            self._handle_event = self.layer.handle_event  # type: ignore
            self._handle = self.layer.handle_event

    # Utility methods for whoever decides what the next layer is going to be.
    def data_client(self):
        """All data received from the client so far, concatenated."""
        return self._data(self.context.client)

    def data_server(self):
        """All data received from the server so far, concatenated."""
        return self._data(self.context.server)

    def _data(self, connection: Connection):
        # Join all buffered DataReceived payloads for the given connection.
        data = (
            e.data
            for e in self.events
            if isinstance(e, mevents.DataReceived) and e.connection == connection
        )
        return b"".join(data)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class NextLayerHook(StartHook):
    """
    Network layers are being switched. You may change which layer will be used by setting data.layer.

    (by default, this is done by mitmproxy.addons.NextLayer)
    """

    data: NextLayer  # the NextLayer placeholder; addons set data.layer to decide
|