import io import typing from collections.abc import Iterable from pathlib import Path from typing import Any from ruamel.yaml import YAML from .. import ctx from .. import http from ..dns import DNSMessage from ..flow import Flow from ..tcp import TCPMessage from ..udp import UDPMessage from ..utils import strutils from ..websocket import WebSocketMessage from ._api import Metadata type ContentviewMessage = ( http.Message | TCPMessage | UDPMessage | WebSocketMessage | DNSMessage ) def make_metadata( message: ContentviewMessage, flow: Flow, ) -> Metadata: metadata = Metadata( flow=flow, protobuf_definitions=Path(ctx.options.protobuf_definitions).expanduser() if ctx.options.protobuf_definitions else None, ) match message: case http.Message(): metadata.http_message = message if ctype := message.headers.get("content-type"): if ct := http.parse_content_type(ctype): metadata.content_type = f"{ct[0]}/{ct[1]}" case TCPMessage(): metadata.tcp_message = message case UDPMessage(): metadata.udp_message = message case WebSocketMessage(): metadata.websocket_message = message case DNSMessage(): metadata.dns_message = message case other: # pragma: no cover typing.assert_never(other) return metadata def get_data( message: ContentviewMessage, ) -> tuple[bytes | None, str]: content: bytes | None try: content = message.content except ValueError: assert isinstance(message, http.Message) content = message.raw_content enc = "[cannot decode]" else: if isinstance(message, http.Message) and content != message.raw_content: enc = "[decoded {}]".format(message.headers.get("content-encoding")) else: enc = "" return content, enc def yaml_dumps(d: Any) -> str: if not d: return "" out = io.StringIO() YAML(typ="rt", pure=True).dump(d, out) return out.getvalue() def yaml_loads(yaml: str) -> Any: return YAML(typ="safe", pure=True).load(yaml) def merge_repeated_keys(items: Iterable[tuple[str, str]]) -> dict[str, str | list[str]]: """ Helper function that takes a list of pairs and merges repeated keys. """ ret: dict[str, str | list[str]] = {} for key, value in items: if existing := ret.get(key): if isinstance(existing, list): existing.append(value) else: ret[key] = [existing, value] else: ret[key] = value return ret def byte_pairs_to_str_pairs( items: Iterable[tuple[bytes, bytes]], ) -> Iterable[tuple[str, str]]: for key, value in items: yield (strutils.bytes_to_escaped_str(key), strutils.bytes_to_escaped_str(value))