from __future__ import annotations from collections.abc import Callable from functools import lru_cache import urwid import mitmproxy.tools.console.master from mitmproxy.tools.console import commandexecutor from mitmproxy.tools.console import common from mitmproxy.tools.console import flowlist from mitmproxy.tools.console import quickhelp from mitmproxy.tools.console import signals from mitmproxy.tools.console.commander import commander from mitmproxy.utils import human @lru_cache def shorten_message( msg: tuple[str, str] | str, max_width: int ) -> list[tuple[str, str]]: """ Shorten message so that it fits into a single line in the statusbar. """ if isinstance(msg, tuple): disp_attr, msg_text = msg elif isinstance(msg, str): msg_text = msg disp_attr = "" else: raise AssertionError(f"Unexpected message type: {type(msg)}") msg_end = "\u2026" # unicode ellipsis for the end of shortened message prompt = "(more in eventlog)" msg_lines = msg_text.split("\n") first_line = msg_lines[0] if len(msg_lines) > 1: # First line of messages with a few lines must end with prompt. line_length = len(first_line) + len(prompt) else: line_length = len(first_line) if line_length > max_width: shortening_index = max(0, max_width - len(prompt) - len(msg_end)) first_line = first_line[:shortening_index] + msg_end else: if len(msg_lines) == 1: prompt = "" return [(disp_attr, first_line), ("warn", prompt)] class ActionBar(urwid.WidgetWrap): def __init__(self, master: mitmproxy.tools.console.master.ConsoleMaster) -> None: self.master = master self.top = urwid.WidgetWrap(urwid.Text("")) self.bottom = urwid.WidgetWrap(urwid.Text("")) super().__init__(urwid.Pile([self.top, self.bottom])) self.show_quickhelp() signals.status_message.connect(self.sig_message) signals.status_prompt.connect(self.sig_prompt) signals.status_prompt_onekey.connect(self.sig_prompt_onekey) signals.status_prompt_command.connect(self.sig_prompt_command) signals.window_refresh.connect(self.sig_update) master.view.focus.sig_change.connect(self.sig_update) master.view.sig_view_update.connect(self.sig_update) self.prompting: Callable[[str], None] | None = None self.onekey: set[str] | None = None def sig_update(self, flow=None) -> None: if not self.prompting and flow is None or flow == self.master.view.focus.flow: self.show_quickhelp() def sig_message( self, message: tuple[str, str] | str, expire: int | None = 1 ) -> None: if self.prompting: return cols, _ = self.master.ui.get_cols_rows() w = urwid.Text(shorten_message(message, cols)) self.top._w = w self.bottom._w = urwid.Text("") if expire: def cb(): if w == self.top._w: self.show_quickhelp() signals.call_in.send(seconds=expire, callback=cb) def sig_prompt( self, prompt: str, text: str | None, callback: Callable[[str], None] ) -> None: signals.focus.send(section="footer") self.top._w = urwid.Edit(f"{prompt.strip()}: ", text or "") self.bottom._w = urwid.Text("") self.prompting = callback def sig_prompt_command(self, partial: str = "", cursor: int | None = None) -> None: signals.focus.send(section="footer") self.top._w = commander.CommandEdit( self.master, partial, ) if cursor is not None: self.top._w.cbuf.cursor = cursor self.bottom._w = urwid.Text("") self.prompting = self.execute_command def execute_command(self, txt: str) -> None: if txt.strip(): self.master.commands.call("commands.history.add", txt) execute = commandexecutor.CommandExecutor(self.master) execute(txt) def sig_prompt_onekey( self, prompt: str, keys: list[tuple[str, str]], callback: Callable[[str], None] ) -> None: """ Keys are a set of (word, key) tuples. The appropriate key in the word is highlighted. """ signals.focus.send(section="footer") parts = [prompt, " ("] mkup = [] for i, e in enumerate(keys): mkup.extend(common.highlight_key(e[0], e[1])) if i < len(keys) - 1: mkup.append(",") parts.extend(mkup) parts.append(")? ") self.onekey = {i[1] for i in keys} self.top._w = urwid.Edit(parts, "") self.bottom._w = urwid.Text("") self.prompting = callback def selectable(self) -> bool: return True def keypress(self, size, k): if self.prompting: if k == "esc": self.prompt_done() elif self.onekey: if k == "enter": self.prompt_done() elif k in self.onekey: self.prompt_execute(k) elif k == "enter": text = self.top._w.get_edit_text() self.prompt_execute(text) else: if common.is_keypress(k): self.top._w.keypress(size, k) else: return k def show_quickhelp(self) -> None: if w := self.master.window: s = w.focus_stack() focused_widget = type(s.top_widget()) is_top_widget = len(s.stack) == 1 else: # on startup focused_widget = flowlist.FlowListBox is_top_widget = True focused_flow = self.master.view.focus.flow qh = quickhelp.make(focused_widget, focused_flow, is_top_widget) self.top._w, self.bottom._w = qh.make_rows(self.master.keymap) def prompt_done(self) -> None: self.prompting = None self.onekey = None self.show_quickhelp() signals.focus.send(section="body") def prompt_execute(self, txt) -> None: callback = self.prompting assert callback is not None self.prompt_done() msg = callback(txt) if msg: signals.status_message.send(message=msg, expire=1) class StatusBar(urwid.WidgetWrap): REFRESHTIME = 0.5 # Timed refresh time in seconds keyctx = "" def __init__(self, master: mitmproxy.tools.console.master.ConsoleMaster) -> None: self.master = master self.ib = urwid.WidgetWrap(urwid.Text("")) self.ab = ActionBar(self.master) super().__init__(urwid.Pile([self.ib, self.ab])) signals.flow_change.connect(self.sig_update) signals.update_settings.connect(self.sig_update) master.options.changed.connect(self.sig_update) master.view.focus.sig_change.connect(self.sig_update) master.view.sig_view_add.connect(self.sig_update) self.refresh() def refresh(self) -> None: self.redraw() signals.call_in.send(seconds=self.REFRESHTIME, callback=self.refresh) def sig_update(self, *args, **kwargs) -> None: self.redraw() def keypress(self, *args, **kwargs): return self.ab.keypress(*args, **kwargs) def get_status(self) -> list[tuple[str, str] | str]: r: list[tuple[str, str] | str] = [] sreplay = self.master.commands.call("replay.server.count") creplay = self.master.commands.call("replay.client.count") if len(self.master.options.modify_headers): r.append("[") r.append(("heading_key", "H")) r.append("eaders]") if len(self.master.options.modify_body): r.append("[%d body modifications]" % len(self.master.options.modify_body)) if creplay: r.append("[") r.append(("heading_key", "cplayback")) r.append(":%s]" % creplay) if sreplay: r.append("[") r.append(("heading_key", "splayback")) r.append(":%s]" % sreplay) if self.master.options.ignore_hosts: r.append("[") r.append(("heading_key", "I")) r.append("gnore:%d]" % len(self.master.options.ignore_hosts)) elif self.master.options.allow_hosts: r.append("[") r.append(("heading_key", "A")) r.append("llow:%d]" % len(self.master.options.allow_hosts)) if self.master.options.tcp_hosts: r.append("[") r.append(("heading_key", "T")) r.append("CP:%d]" % len(self.master.options.tcp_hosts)) if self.master.options.intercept: r.append("[") if not self.master.options.intercept_active: r.append("X") r.append(("heading_key", "i")) r.append(":%s]" % self.master.options.intercept) if self.master.options.view_filter: r.append("[") r.append(("heading_key", "f")) r.append(":%s]" % self.master.options.view_filter) if self.master.options.stickycookie: r.append("[") r.append(("heading_key", "t")) r.append(":%s]" % self.master.options.stickycookie) if self.master.options.stickyauth: r.append("[") r.append(("heading_key", "u")) r.append(":%s]" % self.master.options.stickyauth) if self.master.options.console_default_contentview != "auto": r.append( "[contentview:%s]" % (self.master.options.console_default_contentview) ) if self.master.options.has_changed("view_order"): r.append("[") r.append(("heading_key", "o")) r.append(":%s]" % self.master.options.view_order) opts = [] if self.master.options.anticache: opts.append("anticache") if self.master.options.anticomp: opts.append("anticomp") if self.master.options.showhost: opts.append("showhost") if not self.master.options.server_replay_refresh: opts.append("norefresh") if not self.master.options.upstream_cert: opts.append("no-upstream-cert") if self.master.options.console_focus_follow: opts.append("following") if self.master.options.stream_large_bodies: opts.append(self.master.options.stream_large_bodies) if opts: r.append("[%s]" % (":".join(opts))) if self.master.options.mode != ["regular"]: if len(self.master.options.mode) == 1: r.append(f"[{self.master.options.mode[0]}]") else: r.append(f"[modes:{len(self.master.options.mode)}]") if self.master.options.scripts: r.append("[scripts:%s]" % len(self.master.options.scripts)) if self.master.options.save_stream_file: r.append("[W:%s]" % self.master.options.save_stream_file) return r def redraw(self) -> None: fc = self.master.commands.execute("view.properties.length") if self.master.view.focus.index is None: offset = 0 else: offset = self.master.view.focus.index + 1 if self.master.options.view_order_reversed: arrow = common.SYMBOL_UP else: arrow = common.SYMBOL_DOWN marked = "" if self.master.commands.execute("view.properties.marked"): marked = "M" t: list[tuple[str, str] | str] = [ ("heading", f"{arrow} {marked} [{offset}/{fc}]".ljust(11)), ] listen_addrs: list[str] = list( dict.fromkeys( human.format_address(a) for a in self.master.addons.get("proxyserver").listen_addrs() ) ) if listen_addrs: boundaddr = f"[{', '.join(listen_addrs)}]" else: boundaddr = "" t.extend(self.get_status()) status = urwid.AttrMap( urwid.Columns( [ urwid.Text(t), urwid.Text(boundaddr, align="right"), ] ), "heading", ) self.ib._w = status def selectable(self) -> bool: return True