693 lines
26 KiB
Python
693 lines
26 KiB
Python
import struct
|
|
import time
|
|
import typing
|
|
from collections.abc import Iterator
|
|
from dataclasses import dataclass
|
|
from logging import DEBUG
|
|
from logging import ERROR
|
|
from logging import INFO
|
|
from logging import WARNING
|
|
|
|
from OpenSSL import SSL
|
|
|
|
from mitmproxy import certs
|
|
from mitmproxy import connection
|
|
from mitmproxy.connection import TlsVersion
|
|
from mitmproxy.net.tls import starts_like_dtls_record
|
|
from mitmproxy.net.tls import starts_like_tls_record
|
|
from mitmproxy.proxy import commands
|
|
from mitmproxy.proxy import context
|
|
from mitmproxy.proxy import events
|
|
from mitmproxy.proxy import layer
|
|
from mitmproxy.proxy import tunnel
|
|
from mitmproxy.proxy.commands import StartHook
|
|
from mitmproxy.proxy.layers import tcp
|
|
from mitmproxy.proxy.layers import udp
|
|
from mitmproxy.tls import ClientHello
|
|
from mitmproxy.tls import ClientHelloData
|
|
from mitmproxy.tls import TlsData
|
|
from mitmproxy.utils import human
|
|
|
|
|
|
def handshake_record_contents(data: bytes) -> Iterator[bytes]:
    """
    Lazily yield the body of every complete TLS record found in `data`.

    Iteration simply stops once the remaining bytes hold neither a full
    5-byte record header nor the full body that header announces.

    A ValueError is raised as soon as a header is rejected by
    `starts_like_tls_record` or announces an empty body, so callers should
    stop consuming this generator once they have the data they need.
    """
    header_len = 5
    pos = 0
    while len(data) >= pos + header_len:
        header = data[pos : pos + header_len]
        if not starts_like_tls_record(header):
            raise ValueError(f"Expected TLS record, got {header!r} instead.")
        # The last two header bytes carry the big-endian length of the record body.
        (body_len,) = struct.unpack("!H", header[3:])
        if body_len == 0:
            raise ValueError("Record must not be empty.")

        body_start = pos + header_len
        body_end = body_start + body_len
        if len(data) < body_end:
            # The record body has not been received completely yet.
            return
        yield data[body_start:body_end]
        pos = body_end
|
|
|
|
|
|
def get_client_hello(data: bytes) -> bytes | None:
    """
    Reassemble the initial ClientHello from the TLS records in `data`.

    Returns the raw handshake message (4-byte handshake header plus body,
    without any TLS record headers), or None if the records seen so far do
    not yet contain the whole message.
    """
    header_len = 4
    collected = b""
    for fragment in handshake_record_contents(data):
        collected += fragment
        if len(collected) < header_len:
            continue
        # Bytes 1..3 of the handshake header are a 24-bit big-endian body length.
        expected_len = int.from_bytes(collected[1:4], "big") + header_len
        if len(collected) >= expected_len:
            return collected[:expected_len]
    return None
|
|
|
|
|
|
def parse_client_hello(data: bytes) -> ClientHello | None:
    """
    Parse a ClientHello out of `data` if it is already fully contained.

    Returns:
        - A ClientHello object on success
        - None, if the TLS record is not complete

    Raises:
        - A ValueError, if the passed ClientHello is invalid
    """
    raw_hello = get_client_hello(data)
    if not raw_hello:
        # ClientHello is not complete yet.
        return None
    try:
        # Strip the 4-byte handshake header before handing the body to the parser.
        return ClientHello(raw_hello[4:])
    except EOFError as e:
        raise ValueError("Invalid ClientHello") from e
|
|
|
|
|
|
def dtls_handshake_record_contents(data: bytes) -> Iterator[bytes]:
    """
    Lazily yield the body of every complete DTLS record found in `data`.

    Iteration simply stops once the remaining bytes hold neither a full
    13-byte record header nor the full body that header announces.

    A ValueError is raised as soon as a header is rejected by
    `starts_like_dtls_record` or announces an empty body, so callers should
    stop consuming this generator once they have the data they need.
    """
    # DTLS includes two new fields, totaling 8 bytes, between Version and Length,
    # which makes the record header 13 bytes long.
    header_len = 13
    pos = 0
    while len(data) >= pos + header_len:
        header = data[pos : pos + header_len]
        if not starts_like_dtls_record(header):
            raise ValueError(f"Expected DTLS record, got {header!r} instead.")
        # The length field occupies the last two header bytes (offset 11).
        (body_len,) = struct.unpack("!H", header[11:])
        if body_len == 0:
            raise ValueError("Record must not be empty.")

        body_start = pos + header_len
        body_end = body_start + body_len
        if len(data) < body_end:
            # The record body has not been received completely yet.
            return
        yield data[body_start:body_end]
        pos = body_end
|
|
|
|
|
|
def get_dtls_client_hello(data: bytes) -> bytes | None:
    """
    Reassemble the initial ClientHello from the DTLS records in `data`.

    Returns the raw handshake message (12-byte handshake header plus body,
    without any DTLS record headers), or None if the records seen so far do
    not yet contain the whole message.
    """
    header_len = 12
    collected = b""
    for fragment in dtls_handshake_record_contents(data):
        collected += fragment
        if len(collected) < 13:
            continue
        # Bytes 9..11 are read as a 24-bit big-endian length of the data following
        # the 12-byte handshake header.
        # NOTE(review): in the DTLS 1.2 handshake header this slice appears to be
        # `fragment_length` (after msg_type, length, message_seq, fragment_offset)
        # -- confirm against RFC 6347.
        expected_len = int.from_bytes(collected[9:12], "big") + header_len
        if len(collected) >= expected_len:
            return collected[:expected_len]
    return None
|
|
|
|
|
|
def dtls_parse_client_hello(data: bytes) -> ClientHello | None:
    """
    Parse a DTLS ClientHello out of `data` if it is already fully contained.

    Returns:
        - A ClientHello object on success
        - None, if the TLS record is not complete

    Raises:
        - A ValueError, if the passed ClientHello is invalid
    """
    raw_hello = get_dtls_client_hello(data)
    if not raw_hello:
        # ClientHello is not complete yet.
        return None
    try:
        # Strip the 12-byte DTLS handshake header before handing the body to the parser.
        return ClientHello(raw_hello[12:], dtls=True)
    except EOFError as e:
        raise ValueError("Invalid ClientHello") from e
|
|
|
|
|
|
# ALPN protocol identifiers for the individual HTTP versions.
HTTP3_ALPN = b"h3"
HTTP2_ALPN = b"h2"
HTTP1_ALPNS = (b"http/1.1", b"http/1.0", b"http/0.9")
# Every HTTP ALPN identifier above: HTTP/3 first, then HTTP/2, then the HTTP/1 family.
HTTP_ALPNS = (HTTP3_ALPN, HTTP2_ALPN) + HTTP1_ALPNS
|
|
|
|
|
|
# We need these classes as hooks can only have one argument at the moment.
|
|
|
|
|
|
@dataclass
class TlsClienthelloHook(StartHook):
    """
    Mitmproxy has received a TLS ClientHello message.

    This hook decides whether a server connection is needed
    to negotiate TLS with the client (data.establish_server_tls_first).
    """

    # Built from the proxy context and the parsed ClientHello. After the hook has run,
    # ClientTLSLayer reads `ignore_connection` and `establish_server_tls_first` from it.
    data: ClientHelloData
|
|
|
|
|
|
@dataclass
class TlsStartClientHook(StartHook):
    """
    TLS negotiation between mitmproxy and a client is about to start.

    An addon is expected to initialize data.ssl_conn.
    (by default, this is done by `mitmproxy.addons.tlsconfig`)
    """

    # If `ssl_conn` is still unset after the hook has run, TLSLayer.start_tls logs an
    # error and closes the connection.
    data: TlsData
|
|
|
|
|
|
@dataclass
class TlsStartServerHook(StartHook):
    """
    TLS negotiation between mitmproxy and a server is about to start.

    An addon is expected to initialize data.ssl_conn.
    (by default, this is done by `mitmproxy.addons.tlsconfig`)
    """

    # If `ssl_conn` is still unset after the hook has run, TLSLayer.start_tls logs an
    # error and closes the connection.
    data: TlsData
|
|
|
|
|
|
@dataclass
class TlsEstablishedClientHook(StartHook):
    """
    The TLS handshake with the client has been completed successfully.
    """

    # Carries the connection, the proxy context and the OpenSSL connection object
    # of the handshake that just finished.
    data: TlsData
|
|
|
|
|
|
@dataclass
class TlsEstablishedServerHook(StartHook):
    """
    The TLS handshake with the server has been completed successfully.
    """

    # Carries the connection, the proxy context and the OpenSSL connection object
    # of the handshake that just finished.
    data: TlsData
|
|
|
|
|
|
@dataclass
class TlsFailedClientHook(StartHook):
    """
    The TLS handshake with the client has failed.
    """

    # Carries the connection, the proxy context and the OpenSSL connection object.
    # The error message has been stored in `data.conn.error` before this hook is emitted.
    data: TlsData
|
|
|
|
|
|
@dataclass
class TlsFailedServerHook(StartHook):
    """
    The TLS handshake with the server has failed.
    """

    # Carries the connection, the proxy context and the OpenSSL connection object.
    # The error message has been stored in `data.conn.error` before this hook is emitted.
    data: TlsData
|
|
|
|
|
|
class TLSLayer(tunnel.TunnelLayer):
    """
    Shared base of the TLS layers in this module.

    The layer tunnels a connection through a pyOpenSSL connection object (`self.tls`):
    bytes received from the peer are written into it with `bio_write`, bytes it wants to
    send are fetched with `bio_read`, and decrypted application data is passed on to the
    child layer.
    """

    tls: SSL.Connection = None  # type: ignore
    """The OpenSSL connection object"""

    def __init__(self, context: context.Context, conn: connection.Connection):
        """Use `conn` both as the tunnel connection and as the tunneled connection, and mark it as TLS."""
        super().__init__(
            context,
            tunnel_connection=conn,
            conn=conn,
        )

        conn.tls = True

    def __repr__(self) -> str:
        # Append SNI and ALPN to the base representation.
        # Note that str.replace substitutes every ")" in the base repr, not only the last one.
        return (
            super().__repr__().replace(")", f" {self.conn.sni!r} {self.conn.alpn!r})")
        )

    @property
    def is_dtls(self) -> bool:
        """Whether this layer speaks DTLS, i.e. the connection's transport protocol is UDP."""
        return self.conn.transport_protocol == "udp"

    @property
    def proto_name(self) -> str:
        """Protocol name used in log and error messages: "DTLS" or "TLS"."""
        return "DTLS" if self.is_dtls else "TLS"

    def start_tls(self) -> layer.CommandGenerator[None]:
        """
        Obtain an OpenSSL connection object from the TLS start hook and store it in `self.tls`.

        If no hook set `ssl_conn`, an error is logged, the connection is closed
        and `self.tls` stays unset.
        """
        assert not self.tls

        tls_start = TlsData(self.conn, self.context, is_dtls=self.is_dtls)
        # Pick the hook that matches the side of the proxy this connection is on.
        if self.conn == self.context.client:
            yield TlsStartClientHook(tls_start)
        else:
            yield TlsStartServerHook(tls_start)
        if not tls_start.ssl_conn:
            yield commands.Log(
                f"No {self.proto_name} context was provided, failing connection.", ERROR
            )
            yield commands.CloseConnection(self.conn)
            return
        assert tls_start.ssl_conn
        self.tls = tls_start.ssl_conn

    def tls_interact(self) -> layer.CommandGenerator[None]:
        """Forward all bytes that OpenSSL currently wants to send to the peer."""
        while True:
            try:
                data = self.tls.bio_read(65535)
            except SSL.WantReadError:
                return  # Okay, nothing more waiting to be sent.
            else:
                yield commands.SendData(self.conn, data)

    def receive_handshake_data(
        self, data: bytes
    ) -> layer.CommandGenerator[tuple[bool, str | None]]:
        """
        Feed `data` to OpenSSL and try to advance the handshake.

        Returns:
            - (True, None) once the handshake has completed
            - (False, None) if more data from the peer is needed
            - (False, <error message>) if the handshake failed
        """
        # bio_write errors for b"", so we need to check first if we actually received something.
        if data:
            self.tls.bio_write(data)
        try:
            self.tls.do_handshake()
        except SSL.WantReadError:
            # Handshake is not finished yet: send out whatever OpenSSL produced and wait for more data.
            yield from self.tls_interact()
            return False, None
        except SSL.Error as e:
            # provide more detailed information for some errors.
            # `last_err` is the last entry of the list in e.args[0], or a falsy value
            # if the exception arguments do not have that shape.
            last_err = (
                e.args and isinstance(e.args[0], list) and e.args[0] and e.args[0][-1]
            )
            if last_err in [
                (
                    "SSL routines",
                    "tls_process_server_certificate",
                    "certificate verify failed",
                ),
                ("SSL routines", "", "certificate verify failed"),  # OpenSSL 3+
            ]:
                # Ask OpenSSL for the human-readable reason of the verification failure.
                verify_result = SSL._lib.SSL_get_verify_result(self.tls._ssl)  # type: ignore
                error = SSL._ffi.string(  # type: ignore
                    SSL._lib.X509_verify_cert_error_string(verify_result)  # type: ignore
                ).decode()
                err = f"Certificate verify failed: {error}"
            elif last_err in [
                ("SSL routines", "ssl3_read_bytes", "tlsv1 alert unknown ca"),
                ("SSL routines", "ssl3_read_bytes", "sslv3 alert bad certificate"),
                ("SSL routines", "ssl3_read_bytes", "ssl/tls alert bad certificate"),
                ("SSL routines", "", "tlsv1 alert unknown ca"),  # OpenSSL 3+
                ("SSL routines", "", "sslv3 alert bad certificate"),  # OpenSSL 3+
                ("SSL routines", "", "ssl/tls alert bad certificate"),  # OpenSSL 3.2+
            ]:
                assert isinstance(last_err, tuple)
                # The third tuple element is the alert description; use it verbatim.
                err = last_err[2]
            elif (
                last_err
                in [
                    ("SSL routines", "ssl3_get_record", "wrong version number"),
                    ("SSL routines", "", "wrong version number"),  # OpenSSL 3+
                    ("SSL routines", "", "packet length too long"),  # OpenSSL 3+
                    ("SSL routines", "", "record layer failure"),  # OpenSSL 3+
                ]
                # Only used when the first received bytes are ASCII
                # (presumably indicating a plaintext protocol -- TODO confirm).
                and data[:4].isascii()
            ):
                err = f"The remote server does not speak TLS."
            elif last_err in [
                ("SSL routines", "ssl3_read_bytes", "tlsv1 alert protocol version"),
                ("SSL routines", "", "tlsv1 alert protocol version"),  # OpenSSL 3+
            ]:
                err = (
                    f"The remote server and mitmproxy cannot agree on a TLS version to use. "
                    f"You may need to adjust mitmproxy's tls_version_server_min option."
                )
            else:
                err = f"OpenSSL {e!r}"
            return False, err
        else:
            # Here we set all attributes that are only known *after* the handshake.

            # Get all peer certificates.
            # https://www.openssl.org/docs/man1.1.1/man3/SSL_get_peer_cert_chain.html
            # If called on the client side, the stack also contains the peer's certificate; if called on the server
            # side, the peer's certificate must be obtained separately using SSL_get_peer_certificate(3).
            all_certs = self.tls.get_peer_cert_chain() or []
            if self.conn == self.context.client:
                cert = self.tls.get_peer_certificate()
                if cert:
                    all_certs.insert(0, cert)
            self.conn.certificate_list = []
            for cert in all_certs:
                try:
                    # This may fail for weird certs, https://github.com/mitmproxy/mitmproxy/issues/6968.
                    parsed_cert = certs.Cert.from_pyopenssl(cert)
                except ValueError as e:
                    # Certificates that cannot be parsed are skipped with a warning.
                    yield commands.Log(
                        f"{self.debug}[tls] failed to parse certificate: {e}", WARNING
                    )
                else:
                    self.conn.certificate_list.append(parsed_cert)

            self.conn.timestamp_tls_setup = time.time()
            self.conn.alpn = self.tls.get_alpn_proto_negotiated()
            self.conn.cipher = self.tls.get_cipher_name()
            self.conn.tls_version = typing.cast(
                TlsVersion, self.tls.get_protocol_version_name()
            )
            if self.debug:
                yield commands.Log(
                    f"{self.debug}[tls] tls established: {self.conn}", DEBUG
                )
            if self.conn == self.context.client:
                yield TlsEstablishedClientHook(
                    TlsData(self.conn, self.context, self.tls)
                )
            else:
                yield TlsEstablishedServerHook(
                    TlsData(self.conn, self.context, self.tls)
                )
            # Process whatever OpenSSL has already buffered beyond the handshake.
            yield from self.receive_data(b"")
            return True, None

    def on_handshake_error(self, err: str) -> layer.CommandGenerator[None]:
        """Record `err` on the connection, emit the matching TLS failed hook, then run the base class handling."""
        self.conn.error = err
        if self.conn == self.context.client:
            yield TlsFailedClientHook(TlsData(self.conn, self.context, self.tls))
        else:
            yield TlsFailedServerHook(TlsData(self.conn, self.context, self.tls))
        yield from super().on_handshake_error(err)

    def receive_data(self, data: bytes) -> layer.CommandGenerator[None]:
        """
        Feed encrypted `data` to OpenSSL and pass the decrypted plaintext to the child layer.

        If the peer closed the TLS session (SSL.ZeroReturnError), the connection is marked
        as no longer readable and the child layer receives a ConnectionClosed event.
        """
        if data:
            self.tls.bio_write(data)

        plaintext = bytearray()
        close = False
        # Read until OpenSSL has no more decrypted data available.
        while True:
            try:
                plaintext.extend(self.tls.recv(65535))
            except SSL.WantReadError:
                break
            except SSL.ZeroReturnError:
                close = True
                break
            except SSL.Error as e:
                # This may be happening because the other side sent an alert.
                # There's somewhat ugly behavior with Firefox on Android here,
                # which upon mistrusting a certificate still completes the handshake
                # and then sends an alert in the next packet. At this point we have unfortunately
                # already fired our `tls_established_client` hook.
                yield commands.Log(f"TLS Error: {e}", WARNING)
                break

        # Can we send something?
        # Note that this must happen after `recv()`, which may have advanced the state machine.
        # https://github.com/mitmproxy/mitmproxy/discussions/7550
        yield from self.tls_interact()

        if plaintext:
            yield from self.event_to_child(
                events.DataReceived(self.conn, bytes(plaintext))
            )
        if close:
            self.conn.state &= ~connection.ConnectionState.CAN_READ
            if self.debug:
                yield commands.Log(f"{self.debug}[tls] close_notify {self.conn}", DEBUG)
            yield from self.event_to_child(events.ConnectionClosed(self.conn))

    def receive_close(self) -> layer.CommandGenerator[None]:
        """Handle a close of the underlying connection unless the TLS shutdown was already processed."""
        if self.tls.get_shutdown() & SSL.RECEIVED_SHUTDOWN:
            pass  # We have already dispatched a ConnectionClosed to the child layer.
        else:
            yield from super().receive_close()

    def send_data(self, data: bytes) -> layer.CommandGenerator[None]:
        """Hand plaintext `data` to OpenSSL and send the resulting bytes to the peer."""
        try:
            self.tls.sendall(data)
        except (SSL.ZeroReturnError, SSL.SysCallError):
            # The other peer may still be trying to send data over, which we discard here.
            pass
        yield from self.tls_interact()

    def send_close(
        self, command: commands.CloseConnection
    ) -> layer.CommandGenerator[None]:
        """Close the connection via the base class; no TLS-level shutdown is performed here."""
        # We should probably shutdown the TLS connection properly here.
        yield from super().send_close(command)
|
|
|
|
|
|
class ServerTLSLayer(TLSLayer):
    """
    Layer that sets up TLS on one server connection.
    """

    # True while the server-side handshake is postponed until the child layer
    # asks to open the connection.
    wait_for_clienthello: bool = False

    def __init__(self, context: context.Context, conn: connection.Server | None = None):
        # Default to the context's server connection when no explicit one is passed.
        server_conn = conn if conn else context.server
        super().__init__(context, server_conn)

    def start_handshake(self) -> layer.CommandGenerator[None]:
        """
        Start the server-side handshake, or postpone it until the ClientHello is known.
        """
        # If command_to_reply_to is set, the child layer instructed us to open the connection,
        # and any potential ClientHello has already been parsed (by the ClientTLS child layer).
        # Otherwise the connection was already open when this layer received its Start event
        # (eager connection strategy) and we establish TLS right away -- _unless_ our immediate
        # child layer is a ClientTLSLayer. In that case we wait for its ClientHello to be parsed
        # so that SNI/ALPN from there can be incorporated.
        if self.command_to_reply_to or not isinstance(
            self.child_layer, ClientTLSLayer
        ):
            yield from self.start_tls()
            if self.tls:
                yield from self.receive_handshake_data(b"")
            return

        self.wait_for_clienthello = True
        self.tunnel_state = tunnel.TunnelState.CLOSED

    def event_to_child(self, event: events.Event) -> layer.CommandGenerator[None]:
        """
        Pass `event` to the child layer, intercepting its OpenConnection for our
        connection while the handshake is postponed.
        """
        if not self.wait_for_clienthello:
            yield from super().event_to_child(event)
            return

        for cmd in super().event_to_child(event):
            opens_our_connection = (
                isinstance(cmd, commands.OpenConnection)
                and cmd.connection == self.conn
            )
            if opens_our_connection:
                # Swallow the OpenConnection by not re-yielding it and stop waiting.
                self.wait_for_clienthello = False
            else:
                yield cmd

    def on_handshake_error(self, err: str) -> layer.CommandGenerator[None]:
        """Log the failed server handshake as a warning, then run the generic handling."""
        message = f"Server TLS handshake failed. {err}"
        yield commands.Log(message, level=WARNING)
        yield from super().on_handshake_error(err)
|
|
|
|
|
|
class ClientTLSLayer(TLSLayer):
    """
    This layer establishes TLS on a single client connection.

    ┌─────┐
    │Start│
    └┬────┘
     ↓
    ┌────────────────────┐
    │Wait for ClientHello│
    └┬───────────────────┘
     ↓
    ┌────────────────┐
    │Process messages│
    └────────────────┘

    """

    # Raw bytes received from the client; buffered until the ClientHello is complete
    # and then handed over to OpenSSL (or to the ignore child layer) in one piece.
    recv_buffer: bytearray
    # Whether the second-to-last entry of `context.layers` was a ServerTLSLayer when
    # this layer was constructed (presumably the parent layer -- TODO confirm).
    server_tls_available: bool
    # Set once a complete ClientHello has been parsed; subsequent handshake data goes
    # straight to TLSLayer.receive_handshake_data.
    client_hello_parsed: bool = False

    def __init__(self, context: context.Context):
        """Set up the layer for `context.client`, resetting TLS attributes left over from an outer TLS session."""
        if context.client.tls:
            # In the case of TLS-over-TLS, we already have client TLS. As the outer TLS connection between client
            # and proxy isn't that interesting to us, we just unset the attributes here and keep the inner TLS
            # session's attributes.
            # Alternatively we could create a new Client instance,
            # but for now we keep it simple. There is a proof-of-concept at
            # https://github.com/mitmproxy/mitmproxy/commit/9b6e2a716888b7787514733b76a5936afa485352.
            context.client.alpn = None
            context.client.cipher = None
            context.client.sni = None
            context.client.timestamp_tls_setup = None
            context.client.tls_version = None
            context.client.certificate_list = []
            context.client.mitmcert = None
            context.client.alpn_offers = []
            context.client.cipher_list = []

        super().__init__(context, context.client)
        self.server_tls_available = isinstance(self.context.layers[-2], ServerTLSLayer)
        self.recv_buffer = bytearray()

    def start_handshake(self) -> layer.CommandGenerator[None]:
        """Yield no commands: nothing is sent to the client when the handshake starts."""
        yield from ()

    def receive_handshake_data(
        self, data: bytes
    ) -> layer.CommandGenerator[tuple[bool, str | None]]:
        """
        Buffer client data until the ClientHello is complete, then start the TLS handshake.

        Once the ClientHello has been parsed, SNI and ALPN offers are stored on the
        connection and the `TlsClienthelloHook` is emitted. Depending on its outcome the
        connection is passed through untouched, or TLS is started with the client
        (optionally after trying to establish server TLS first).

        Returns the same kind of tuple as `TLSLayer.receive_handshake_data`:
        (done, error message or None).
        """
        if self.client_hello_parsed:
            return (yield from super().receive_handshake_data(data))
        self.recv_buffer.extend(data)
        try:
            if self.is_dtls:
                client_hello = dtls_parse_client_hello(self.recv_buffer)
            else:
                client_hello = parse_client_hello(self.recv_buffer)
        except ValueError:
            return False, f"Cannot parse ClientHello: {self.recv_buffer.hex()}"

        if client_hello:
            self.client_hello_parsed = True
        else:
            # ClientHello is still incomplete; wait for more data.
            return False, None

        self.conn.sni = client_hello.sni
        self.conn.alpn_offers = client_hello.alpn_protocols
        tls_clienthello = ClientHelloData(self.context, client_hello)
        yield TlsClienthelloHook(tls_clienthello)

        if tls_clienthello.ignore_connection:
            # we've figured out that we don't want to intercept this connection, so we assign fake connection objects
            # to all TLS layers. This makes the real connection contents just go through.
            self.conn = self.tunnel_connection = connection.Client(
                peername=("ignore-conn", 0), sockname=("ignore-conn", 0)
            )
            parent_layer = self.context.layers[self.context.layers.index(self) - 1]
            if isinstance(parent_layer, ServerTLSLayer):
                parent_layer.conn = parent_layer.tunnel_connection = connection.Server(
                    address=None
                )
            if self.is_dtls:
                self.child_layer = udp.UDPLayer(self.context, ignore=True)
            else:
                self.child_layer = tcp.TCPLayer(self.context, ignore=True)
            # Replay everything buffered so far (including the ClientHello) to the new child layer.
            yield from self.event_to_child(
                events.DataReceived(self.context.client, bytes(self.recv_buffer))
            )
            self.recv_buffer.clear()
            return True, None
        if (
            tls_clienthello.establish_server_tls_first
            and not self.context.server.tls_established
        ):
            err = yield from self.start_server_tls()
            if err:
                # A failed server connection is only logged; the client handshake continues regardless.
                yield commands.Log(
                    f"Unable to establish {self.proto_name} connection with server ({err}). "
                    f"Trying to establish {self.proto_name} with client anyway. "
                    f"If you plan to redirect requests away from this server, "
                    f"consider setting `connection_strategy` to `lazy` to suppress early connections."
                )

        yield from self.start_tls()
        if not self.conn.connected:
            return False, "connection closed early"

        # Hand the buffered ClientHello (and anything that followed it) to OpenSSL.
        ret = yield from super().receive_handshake_data(bytes(self.recv_buffer))
        self.recv_buffer.clear()
        return ret

    def start_server_tls(self) -> layer.CommandGenerator[str | None]:
        """
        We often need information from the upstream connection to establish TLS with the client.
        For example, we need to check if the client does ALPN or not.

        Returns an error message if server TLS is not available or if opening the
        server connection returned an error, otherwise None.
        """
        if not self.server_tls_available:
            return f"No server {self.proto_name} available."
        err = yield commands.OpenConnection(self.context.server)
        return err

    def on_handshake_error(self, err: str) -> layer.CommandGenerator[None]:
        """
        Turn the raw handshake error into a more helpful message, log it and fail the handshake.

        Afterwards `event_to_child` is replaced by `errored`, so that no further events
        reach the child layer.
        """
        # Prefer the SNI for naming the destination; fall back to the server address.
        if self.conn.sni:
            dest = self.conn.sni
        else:
            dest = human.format_address(self.context.server.address)
        level: int = WARNING
        if err.startswith("Cannot parse ClientHello"):
            pass
        elif (
            "('SSL routines', 'tls_early_post_process_client_hello', 'unsupported protocol')"
            in err
            or "('SSL routines', '', 'unsupported protocol')" in err  # OpenSSL 3+
        ):
            err = (
                f"Client and mitmproxy cannot agree on a TLS version to use. "
                f"You may need to adjust mitmproxy's tls_version_client_min option."
            )
        elif (
            "unknown ca" in err
            or "bad certificate" in err
            or "certificate unknown" in err
        ):
            err = (
                f"The client does not trust the proxy's certificate for {dest} ({err})"
            )
        elif err == "connection closed":
            err = (
                f"The client disconnected during the handshake. If this happens consistently for {dest}, "
                f"this may indicate that the client does not trust the proxy's certificate."
            )
            level = INFO
        elif err == "connection closed early":
            pass
        else:
            err = f"The client may not trust the proxy's certificate for {dest} ({err})"
        # "connection closed early" is deliberately not logged.
        if err != "connection closed early":
            yield commands.Log(f"Client TLS handshake failed. {err}", level=level)
        yield from super().on_handshake_error(err)
        self.event_to_child = self.errored  # type: ignore

    def errored(self, event: events.Event) -> layer.CommandGenerator[None]:
        """Drop `event` instead of passing it to the child layer; log it if debugging is enabled."""
        if self.debug is not None:
            yield commands.Log(
                f"{self.debug}[tls] Swallowing {event} as handshake failed.", DEBUG
            )
|
|
|
|
|
|
class MockTLSLayer(TLSLayer):
    """Stand-in layer that disables actual TLS so that tests can use cleartext.

    Use like so:
        monkeypatch.setattr(tls, "ServerTLSLayer", tls.MockTLSLayer)
    """

    def __init__(self, ctx: context.Context):
        # The layer is attached to a fresh server connection without an address.
        placeholder_server = connection.Server(address=None)
        super().__init__(ctx, placeholder_server)
|