import struct import time import typing from collections.abc import Iterator from dataclasses import dataclass from logging import DEBUG from logging import ERROR from logging import INFO from logging import WARNING from OpenSSL import SSL from mitmproxy import certs from mitmproxy import connection from mitmproxy.connection import TlsVersion from mitmproxy.net.tls import starts_like_dtls_record from mitmproxy.net.tls import starts_like_tls_record from mitmproxy.proxy import commands from mitmproxy.proxy import context from mitmproxy.proxy import events from mitmproxy.proxy import layer from mitmproxy.proxy import tunnel from mitmproxy.proxy.commands import StartHook from mitmproxy.proxy.layers import tcp from mitmproxy.proxy.layers import udp from mitmproxy.tls import ClientHello from mitmproxy.tls import ClientHelloData from mitmproxy.tls import TlsData from mitmproxy.utils import human def handshake_record_contents(data: bytes) -> Iterator[bytes]: """ Returns a generator that yields the bytes contained in each handshake record. This will raise an error on the first non-handshake record, so fully exhausting this generator is a bad idea. """ offset = 0 while True: if len(data) < offset + 5: return record_header = data[offset : offset + 5] if not starts_like_tls_record(record_header): raise ValueError(f"Expected TLS record, got {record_header!r} instead.") record_size = struct.unpack("!H", record_header[3:])[0] if record_size == 0: raise ValueError("Record must not be empty.") offset += 5 if len(data) < offset + record_size: return record_body = data[offset : offset + record_size] yield record_body offset += record_size def get_client_hello(data: bytes) -> bytes | None: """ Read all TLS records that contain the initial ClientHello. Returns the raw handshake packet bytes, without TLS record headers. """ client_hello = b"" for d in handshake_record_contents(data): client_hello += d if len(client_hello) >= 4: client_hello_size = struct.unpack("!I", b"\x00" + client_hello[1:4])[0] + 4 if len(client_hello) >= client_hello_size: return client_hello[:client_hello_size] return None def parse_client_hello(data: bytes) -> ClientHello | None: """ Check if the supplied bytes contain a full ClientHello message, and if so, parse it. Returns: - A ClientHello object on success - None, if the TLS record is not complete Raises: - A ValueError, if the passed ClientHello is invalid """ # Check if ClientHello is complete client_hello = get_client_hello(data) if client_hello: try: return ClientHello(client_hello[4:]) except EOFError as e: raise ValueError("Invalid ClientHello") from e return None def dtls_handshake_record_contents(data: bytes) -> Iterator[bytes]: """ Returns a generator that yields the bytes contained in each handshake record. This will raise an error on the first non-handshake record, so fully exhausting this generator is a bad idea. """ offset = 0 while True: # DTLS includes two new fields, totaling 8 bytes, between Version and Length if len(data) < offset + 13: return record_header = data[offset : offset + 13] if not starts_like_dtls_record(record_header): raise ValueError(f"Expected DTLS record, got {record_header!r} instead.") # Length fields starts at 11 record_size = struct.unpack("!H", record_header[11:])[0] if record_size == 0: raise ValueError("Record must not be empty.") offset += 13 if len(data) < offset + record_size: return record_body = data[offset : offset + record_size] yield record_body offset += record_size def get_dtls_client_hello(data: bytes) -> bytes | None: """ Read all DTLS records that contain the initial ClientHello. Returns the raw handshake packet bytes, without TLS record headers. """ client_hello = b"" for d in dtls_handshake_record_contents(data): client_hello += d if len(client_hello) >= 13: # comment about slicing: we skip the epoch and sequence number client_hello_size = ( struct.unpack("!I", b"\x00" + client_hello[9:12])[0] + 12 ) if len(client_hello) >= client_hello_size: return client_hello[:client_hello_size] return None def dtls_parse_client_hello(data: bytes) -> ClientHello | None: """ Check if the supplied bytes contain a full ClientHello message, and if so, parse it. Returns: - A ClientHello object on success - None, if the TLS record is not complete Raises: - A ValueError, if the passed ClientHello is invalid """ # Check if ClientHello is complete client_hello = get_dtls_client_hello(data) if client_hello: try: return ClientHello(client_hello[12:], dtls=True) except EOFError as e: raise ValueError("Invalid ClientHello") from e return None HTTP1_ALPNS = (b"http/1.1", b"http/1.0", b"http/0.9") HTTP2_ALPN = b"h2" HTTP3_ALPN = b"h3" HTTP_ALPNS = (HTTP3_ALPN, HTTP2_ALPN, *HTTP1_ALPNS) # We need these classes as hooks can only have one argument at the moment. @dataclass class TlsClienthelloHook(StartHook): """ Mitmproxy has received a TLS ClientHello message. This hook decides whether a server connection is needed to negotiate TLS with the client (data.establish_server_tls_first) """ data: ClientHelloData @dataclass class TlsStartClientHook(StartHook): """ TLS negotation between mitmproxy and a client is about to start. An addon is expected to initialize data.ssl_conn. (by default, this is done by `mitmproxy.addons.tlsconfig`) """ data: TlsData @dataclass class TlsStartServerHook(StartHook): """ TLS negotation between mitmproxy and a server is about to start. An addon is expected to initialize data.ssl_conn. (by default, this is done by `mitmproxy.addons.tlsconfig`) """ data: TlsData @dataclass class TlsEstablishedClientHook(StartHook): """ The TLS handshake with the client has been completed successfully. """ data: TlsData @dataclass class TlsEstablishedServerHook(StartHook): """ The TLS handshake with the server has been completed successfully. """ data: TlsData @dataclass class TlsFailedClientHook(StartHook): """ The TLS handshake with the client has failed. """ data: TlsData @dataclass class TlsFailedServerHook(StartHook): """ The TLS handshake with the server has failed. """ data: TlsData class TLSLayer(tunnel.TunnelLayer): tls: SSL.Connection = None # type: ignore """The OpenSSL connection object""" def __init__(self, context: context.Context, conn: connection.Connection): super().__init__( context, tunnel_connection=conn, conn=conn, ) conn.tls = True def __repr__(self): return ( super().__repr__().replace(")", f" {self.conn.sni!r} {self.conn.alpn!r})") ) @property def is_dtls(self): return self.conn.transport_protocol == "udp" @property def proto_name(self): return "DTLS" if self.is_dtls else "TLS" def start_tls(self) -> layer.CommandGenerator[None]: assert not self.tls tls_start = TlsData(self.conn, self.context, is_dtls=self.is_dtls) if self.conn == self.context.client: yield TlsStartClientHook(tls_start) else: yield TlsStartServerHook(tls_start) if not tls_start.ssl_conn: yield commands.Log( f"No {self.proto_name} context was provided, failing connection.", ERROR ) yield commands.CloseConnection(self.conn) return assert tls_start.ssl_conn self.tls = tls_start.ssl_conn def tls_interact(self) -> layer.CommandGenerator[None]: while True: try: data = self.tls.bio_read(65535) except SSL.WantReadError: return # Okay, nothing more waiting to be sent. else: yield commands.SendData(self.conn, data) def receive_handshake_data( self, data: bytes ) -> layer.CommandGenerator[tuple[bool, str | None]]: # bio_write errors for b"", so we need to check first if we actually received something. if data: self.tls.bio_write(data) try: self.tls.do_handshake() except SSL.WantReadError: yield from self.tls_interact() return False, None except SSL.Error as e: # provide more detailed information for some errors. last_err = ( e.args and isinstance(e.args[0], list) and e.args[0] and e.args[0][-1] ) if last_err in [ ( "SSL routines", "tls_process_server_certificate", "certificate verify failed", ), ("SSL routines", "", "certificate verify failed"), # OpenSSL 3+ ]: verify_result = SSL._lib.SSL_get_verify_result(self.tls._ssl) # type: ignore error = SSL._ffi.string( # type: ignore SSL._lib.X509_verify_cert_error_string(verify_result) # type: ignore ).decode() err = f"Certificate verify failed: {error}" elif last_err in [ ("SSL routines", "ssl3_read_bytes", "tlsv1 alert unknown ca"), ("SSL routines", "ssl3_read_bytes", "sslv3 alert bad certificate"), ("SSL routines", "ssl3_read_bytes", "ssl/tls alert bad certificate"), ("SSL routines", "", "tlsv1 alert unknown ca"), # OpenSSL 3+ ("SSL routines", "", "sslv3 alert bad certificate"), # OpenSSL 3+ ("SSL routines", "", "ssl/tls alert bad certificate"), # OpenSSL 3.2+ ]: assert isinstance(last_err, tuple) err = last_err[2] elif ( last_err in [ ("SSL routines", "ssl3_get_record", "wrong version number"), ("SSL routines", "", "wrong version number"), # OpenSSL 3+ ("SSL routines", "", "packet length too long"), # OpenSSL 3+ ("SSL routines", "", "record layer failure"), # OpenSSL 3+ ] and data[:4].isascii() ): err = f"The remote server does not speak TLS." elif last_err in [ ("SSL routines", "ssl3_read_bytes", "tlsv1 alert protocol version"), ("SSL routines", "", "tlsv1 alert protocol version"), # OpenSSL 3+ ]: err = ( f"The remote server and mitmproxy cannot agree on a TLS version to use. " f"You may need to adjust mitmproxy's tls_version_server_min option." ) else: err = f"OpenSSL {e!r}" return False, err else: # Here we set all attributes that are only known *after* the handshake. # Get all peer certificates. # https://www.openssl.org/docs/man1.1.1/man3/SSL_get_peer_cert_chain.html # If called on the client side, the stack also contains the peer's certificate; if called on the server # side, the peer's certificate must be obtained separately using SSL_get_peer_certificate(3). all_certs = self.tls.get_peer_cert_chain() or [] if self.conn == self.context.client: cert = self.tls.get_peer_certificate() if cert: all_certs.insert(0, cert) self.conn.certificate_list = [] for cert in all_certs: try: # This may fail for weird certs, https://github.com/mitmproxy/mitmproxy/issues/6968. parsed_cert = certs.Cert.from_pyopenssl(cert) except ValueError as e: yield commands.Log( f"{self.debug}[tls] failed to parse certificate: {e}", WARNING ) else: self.conn.certificate_list.append(parsed_cert) self.conn.timestamp_tls_setup = time.time() self.conn.alpn = self.tls.get_alpn_proto_negotiated() self.conn.cipher = self.tls.get_cipher_name() self.conn.tls_version = typing.cast( TlsVersion, self.tls.get_protocol_version_name() ) if self.debug: yield commands.Log( f"{self.debug}[tls] tls established: {self.conn}", DEBUG ) if self.conn == self.context.client: yield TlsEstablishedClientHook( TlsData(self.conn, self.context, self.tls) ) else: yield TlsEstablishedServerHook( TlsData(self.conn, self.context, self.tls) ) yield from self.receive_data(b"") return True, None def on_handshake_error(self, err: str) -> layer.CommandGenerator[None]: self.conn.error = err if self.conn == self.context.client: yield TlsFailedClientHook(TlsData(self.conn, self.context, self.tls)) else: yield TlsFailedServerHook(TlsData(self.conn, self.context, self.tls)) yield from super().on_handshake_error(err) def receive_data(self, data: bytes) -> layer.CommandGenerator[None]: if data: self.tls.bio_write(data) plaintext = bytearray() close = False while True: try: plaintext.extend(self.tls.recv(65535)) except SSL.WantReadError: break except SSL.ZeroReturnError: close = True break except SSL.Error as e: # This may be happening because the other side send an alert. # There's somewhat ugly behavior with Firefox on Android here, # which upon mistrusting a certificate still completes the handshake # and then sends an alert in the next packet. At this point we have unfortunately # already fired out `tls_established_client` hook. yield commands.Log(f"TLS Error: {e}", WARNING) break # Can we send something? # Note that this must happen after `recv()`, which may have advanced the state machine. # https://github.com/mitmproxy/mitmproxy/discussions/7550 yield from self.tls_interact() if plaintext: yield from self.event_to_child( events.DataReceived(self.conn, bytes(plaintext)) ) if close: self.conn.state &= ~connection.ConnectionState.CAN_READ if self.debug: yield commands.Log(f"{self.debug}[tls] close_notify {self.conn}", DEBUG) yield from self.event_to_child(events.ConnectionClosed(self.conn)) def receive_close(self) -> layer.CommandGenerator[None]: if self.tls.get_shutdown() & SSL.RECEIVED_SHUTDOWN: pass # We have already dispatched a ConnectionClosed to the child layer. else: yield from super().receive_close() def send_data(self, data: bytes) -> layer.CommandGenerator[None]: try: self.tls.sendall(data) except (SSL.ZeroReturnError, SSL.SysCallError): # The other peer may still be trying to send data over, which we discard here. pass yield from self.tls_interact() def send_close( self, command: commands.CloseConnection ) -> layer.CommandGenerator[None]: # We should probably shutdown the TLS connection properly here. yield from super().send_close(command) class ServerTLSLayer(TLSLayer): """ This layer establishes TLS for a single server connection. """ wait_for_clienthello: bool = False def __init__(self, context: context.Context, conn: connection.Server | None = None): super().__init__(context, conn or context.server) def start_handshake(self) -> layer.CommandGenerator[None]: wait_for_clienthello = ( # if command_to_reply_to is set, we've been instructed to open the connection from the child layer. # in that case any potential ClientHello is already parsed (by the ClientTLS child layer). not self.command_to_reply_to # if command_to_reply_to is not set, the connection was already open when this layer received its Start # event (eager connection strategy). We now want to establish TLS right away, _unless_ we already know # that there's TLS on the client side as well (we check if our immediate child layer is set to be ClientTLS) # In this case want to wait for ClientHello to be parsed, so that we can incorporate SNI/ALPN from there. and isinstance(self.child_layer, ClientTLSLayer) ) if wait_for_clienthello: self.wait_for_clienthello = True self.tunnel_state = tunnel.TunnelState.CLOSED else: yield from self.start_tls() if self.tls: yield from self.receive_handshake_data(b"") def event_to_child(self, event: events.Event) -> layer.CommandGenerator[None]: if self.wait_for_clienthello: for command in super().event_to_child(event): if ( isinstance(command, commands.OpenConnection) and command.connection == self.conn ): self.wait_for_clienthello = False # swallow OpenConnection here by not re-yielding it. else: yield command else: yield from super().event_to_child(event) def on_handshake_error(self, err: str) -> layer.CommandGenerator[None]: yield commands.Log(f"Server TLS handshake failed. {err}", level=WARNING) yield from super().on_handshake_error(err) class ClientTLSLayer(TLSLayer): """ This layer establishes TLS on a single client connection. ┌─────┐ │Start│ └┬────┘ ↓ ┌────────────────────┐ │Wait for ClientHello│ └┬───────────────────┘ ↓ ┌────────────────┐ │Process messages│ └────────────────┘ """ recv_buffer: bytearray server_tls_available: bool client_hello_parsed: bool = False def __init__(self, context: context.Context): if context.client.tls: # In the case of TLS-over-TLS, we already have client TLS. As the outer TLS connection between client # and proxy isn't that interesting to us, we just unset the attributes here and keep the inner TLS # session's attributes. # Alternatively we could create a new Client instance, # but for now we keep it simple. There is a proof-of-concept at # https://github.com/mitmproxy/mitmproxy/commit/9b6e2a716888b7787514733b76a5936afa485352. context.client.alpn = None context.client.cipher = None context.client.sni = None context.client.timestamp_tls_setup = None context.client.tls_version = None context.client.certificate_list = [] context.client.mitmcert = None context.client.alpn_offers = [] context.client.cipher_list = [] super().__init__(context, context.client) self.server_tls_available = isinstance(self.context.layers[-2], ServerTLSLayer) self.recv_buffer = bytearray() def start_handshake(self) -> layer.CommandGenerator[None]: yield from () def receive_handshake_data( self, data: bytes ) -> layer.CommandGenerator[tuple[bool, str | None]]: if self.client_hello_parsed: return (yield from super().receive_handshake_data(data)) self.recv_buffer.extend(data) try: if self.is_dtls: client_hello = dtls_parse_client_hello(self.recv_buffer) else: client_hello = parse_client_hello(self.recv_buffer) except ValueError: return False, f"Cannot parse ClientHello: {self.recv_buffer.hex()}" if client_hello: self.client_hello_parsed = True else: return False, None self.conn.sni = client_hello.sni self.conn.alpn_offers = client_hello.alpn_protocols tls_clienthello = ClientHelloData(self.context, client_hello) yield TlsClienthelloHook(tls_clienthello) if tls_clienthello.ignore_connection: # we've figured out that we don't want to intercept this connection, so we assign fake connection objects # to all TLS layers. This makes the real connection contents just go through. self.conn = self.tunnel_connection = connection.Client( peername=("ignore-conn", 0), sockname=("ignore-conn", 0) ) parent_layer = self.context.layers[self.context.layers.index(self) - 1] if isinstance(parent_layer, ServerTLSLayer): parent_layer.conn = parent_layer.tunnel_connection = connection.Server( address=None ) if self.is_dtls: self.child_layer = udp.UDPLayer(self.context, ignore=True) else: self.child_layer = tcp.TCPLayer(self.context, ignore=True) yield from self.event_to_child( events.DataReceived(self.context.client, bytes(self.recv_buffer)) ) self.recv_buffer.clear() return True, None if ( tls_clienthello.establish_server_tls_first and not self.context.server.tls_established ): err = yield from self.start_server_tls() if err: yield commands.Log( f"Unable to establish {self.proto_name} connection with server ({err}). " f"Trying to establish {self.proto_name} with client anyway. " f"If you plan to redirect requests away from this server, " f"consider setting `connection_strategy` to `lazy` to suppress early connections." ) yield from self.start_tls() if not self.conn.connected: return False, "connection closed early" ret = yield from super().receive_handshake_data(bytes(self.recv_buffer)) self.recv_buffer.clear() return ret def start_server_tls(self) -> layer.CommandGenerator[str | None]: """ We often need information from the upstream connection to establish TLS with the client. For example, we need to check if the client does ALPN or not. """ if not self.server_tls_available: return f"No server {self.proto_name} available." err = yield commands.OpenConnection(self.context.server) return err def on_handshake_error(self, err: str) -> layer.CommandGenerator[None]: if self.conn.sni: dest = self.conn.sni else: dest = human.format_address(self.context.server.address) level: int = WARNING if err.startswith("Cannot parse ClientHello"): pass elif ( "('SSL routines', 'tls_early_post_process_client_hello', 'unsupported protocol')" in err or "('SSL routines', '', 'unsupported protocol')" in err # OpenSSL 3+ ): err = ( f"Client and mitmproxy cannot agree on a TLS version to use. " f"You may need to adjust mitmproxy's tls_version_client_min option." ) elif ( "unknown ca" in err or "bad certificate" in err or "certificate unknown" in err ): err = ( f"The client does not trust the proxy's certificate for {dest} ({err})" ) elif err == "connection closed": err = ( f"The client disconnected during the handshake. If this happens consistently for {dest}, " f"this may indicate that the client does not trust the proxy's certificate." ) level = INFO elif err == "connection closed early": pass else: err = f"The client may not trust the proxy's certificate for {dest} ({err})" if err != "connection closed early": yield commands.Log(f"Client TLS handshake failed. {err}", level=level) yield from super().on_handshake_error(err) self.event_to_child = self.errored # type: ignore def errored(self, event: events.Event) -> layer.CommandGenerator[None]: if self.debug is not None: yield commands.Log( f"{self.debug}[tls] Swallowing {event} as handshake failed.", DEBUG ) class MockTLSLayer(TLSLayer): """Mock layer to disable actual TLS and use cleartext in tests. Use like so: monkeypatch.setattr(tls, "ServerTLSLayer", tls.MockTLSLayer) """ def __init__(self, ctx: context.Context): super().__init__(ctx, connection.Server(address=None))