264 lines
8.2 KiB
Python
264 lines
8.2 KiB
Python
import asyncio
|
|
import contextlib
|
|
import mimetypes
|
|
import os.path
|
|
import shlex
|
|
import shutil
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
from typing import TypeVar
|
|
|
|
import urwid
|
|
from tornado.platform.asyncio import AddThreadSelectorEventLoop
|
|
|
|
from mitmproxy import addons
|
|
from mitmproxy import log
|
|
from mitmproxy import master
|
|
from mitmproxy import options
|
|
from mitmproxy.addons import errorcheck
|
|
from mitmproxy.addons import eventstore
|
|
from mitmproxy.addons import intercept
|
|
from mitmproxy.addons import readfile
|
|
from mitmproxy.addons import view
|
|
from mitmproxy.tools.console import consoleaddons
|
|
from mitmproxy.tools.console import defaultkeys
|
|
from mitmproxy.tools.console import keymap
|
|
from mitmproxy.tools.console import palettes
|
|
from mitmproxy.tools.console import signals
|
|
from mitmproxy.tools.console import window
|
|
from mitmproxy.utils import strutils
|
|
|
|
# Constrained type variable (str or bytes): lets spawn_editor() declare that it
# returns the same kind of data it was given.
T = TypeVar("T", str, bytes)
|
|
|
|
|
|
class ConsoleMaster(master.Master):
    """Master that drives the urwid-based console user interface.

    On construction it wires up the flow view, the event store, the keymap and
    the console-specific addons. The urwid main loop and the top-level window
    are only created once ``running()`` is awaited.
    """

    def __init__(self, opts: options.Options) -> None:
        """Create the master and register the default and console addons.

        Args:
            opts: The options object handed to the base ``master.Master``.
        """
        super().__init__(opts)

        self.view: view.View = view.View()
        self.events = eventstore.EventStore()
        self.events.sig_add.connect(self.sig_add_log)

        self.stream_path = None
        self.keymap = keymap.Keymap(self)
        defaultkeys.map(self.keymap)
        self.options.errored.connect(self.options_error)

        self.addons.add(*addons.default_addons())
        self.addons.add(
            intercept.Intercept(),
            self.view,
            self.events,
            readfile.ReadFile(),
            consoleaddons.ConsoleAddon(self),
            keymap.KeymapConfig(self),
            errorcheck.ErrorCheck(repeat_errors_on_stderr=True),
        )

        # Populated in running(), once the urwid main loop exists.
        self.window: window.Window | None = None

    def __setattr__(self, name, value):
        """Set the attribute, then emit ``signals.update_settings``.

        The signal is sent on every attribute assignment, regardless of which
        attribute changed.
        """
        super().__setattr__(name, value)
        signals.update_settings.send()

    def options_error(self, exc) -> None:
        """Report an options error as a status message (``expire=1``)."""
        signals.status_message.send(message=str(exc), expire=1)

    def prompt_for_user_choice(self, prompt, callback) -> None:
        """Ask a yes/no question through the one-key status prompt.

        Args:
            prompt: Text of the question.
            callback: Forwarded to the prompt signal; presumably invoked with
                the key the user chose ("y" or "n") -- see ``quit()``.
        """
        signals.status_prompt_onekey.send(
            prompt=prompt,
            keys=[
                ("yes", "y"),
                ("no", "n"),
            ],
            callback=callback,
        )

    def prompt_for_exit(self) -> None:
        """Ask the user whether to quit; the answer is handled by ``quit()``."""
        self.prompt_for_user_choice("Quit", self.quit)

    def sig_add_log(self, entry: log.LogEntry):
        """Send a status message for important log entries.

        Entries whose log tier is above the tier of the configured
        ``console_eventlog_verbosity`` are ignored. Of the remaining entries,
        only those with level "error", "warn" or "alert" produce a status
        message (``expire=5``).
        """
        if log.log_tier(self.options.console_eventlog_verbosity) < log.log_tier(
            entry.level
        ):
            return
        if entry.level in ("error", "warn", "alert"):
            signals.status_message.send(
                message=(
                    entry.level,
                    f"{entry.level.title()}: {entry.msg.lstrip()}",
                ),
                expire=5,
            )

    def sig_call_in(self, seconds, callback):
        """Schedule ``callback()`` on the urwid loop after ``seconds``.

        The arguments urwid passes to alarm callbacks are discarded, so
        ``callback`` is always called without arguments.
        """

        def cb(*_):
            return callback()

        self.loop.set_alarm_in(seconds, cb)

    @contextlib.contextmanager
    def uistopped(self):
        """Context manager that stops the urwid loop for the enclosed block.

        On exit (even if the block raised) the loop is started again and the
        screen is redrawn.
        """
        self.loop.stop()
        try:
            yield
        finally:
            self.loop.start()
            # Presumably forces urwid to re-query the terminal size on redraw.
            self.loop.screen_size = None
            self.loop.draw_screen()

    def get_editor(self) -> str:
        """Return the command line of the text editor to use.

        Preference order: ``$MITMPROXY_EDITOR``, ``$EDITOR``, the first of
        ``sensible-editor``/``nano``/``vim`` found on ``PATH``, then
        ``notepad`` on Windows or ``vi`` elsewhere.
        """
        # based upon https://github.com/pallets/click/blob/main/src/click/_termui_impl.py
        if m := os.environ.get("MITMPROXY_EDITOR"):
            return m
        if m := os.environ.get("EDITOR"):
            return m
        for editor in "sensible-editor", "nano", "vim":
            if shutil.which(editor):
                return editor
        if os.name == "nt":
            return "notepad"
        else:
            return "vi"

    def get_hex_editor(self) -> str:
        """Return the first hex editor found on ``PATH``.

        Falls back to ``get_editor()`` when none of the known hex editors is
        installed.
        """
        editors = ["ghex", "hexedit", "hxd", "hexer", "hexcurse"]
        for editor in editors:
            if shutil.which(editor):
                return editor
        return self.get_editor()

    def spawn_editor(self, data: T) -> T:
        """Let the user edit ``data`` in an external editor.

        ``data`` is written to a temporary file, the UI is stopped while the
        editor runs, and the file is read back afterwards. Bytes that
        ``strutils.is_mostly_bin`` classifies as binary are opened with the
        hex editor, everything else with the regular editor.

        Args:
            data: The ``str`` or ``bytes`` content to edit.

        Returns:
            The edited content (same type as ``data``), or ``data`` unchanged
            if the editor could not be started.
        """
        text = isinstance(data, str)
        fd, name = tempfile.mkstemp("", "mitmproxy", text=text)
        try:
            with open(fd, "w" if text else "wb") as f:
                f.write(data)

            with_hexeditor = isinstance(data, bytes) and strutils.is_mostly_bin(data)
            c = self.get_hex_editor() if with_hexeditor else self.get_editor()
            cmd = shlex.split(c)
            cmd.append(name)

            with self.uistopped():
                try:
                    subprocess.call(cmd)
                except Exception:
                    signals.status_message.send(message=f"Can't start editor: {c}")
                else:
                    with open(name, "r" if text else "rb") as f:
                        data = f.read()
        finally:
            # Always remove the temporary file, even if the editor command is
            # malformed or reading the result back fails.
            with contextlib.suppress(FileNotFoundError):
                os.unlink(name)
        return data

    def spawn_external_viewer(self, data, contenttype):
        """Show ``data`` in an external viewer.

        ``data`` is written to a read-only temporary file whose extension is
        guessed from ``contenttype``. The viewer command is taken from
        ``$MITMPROXY_EDITOR``, ``$PAGER`` or ``$EDITOR`` (in that order),
        defaulting to ``less``. The file is deleted one second after the
        viewer call returns.

        Args:
            data: The bytes to display.
            contenttype: A Content-Type value (parameters after ";" are
                ignored), or a falsy value if unknown.
        """
        if contenttype:
            # Drop parameters such as "; charset=utf-8" and surrounding spaces,
            # which would otherwise defeat the extension lookup.
            contenttype = contenttype.split(";")[0].strip()
            ext = mimetypes.guess_extension(contenttype) or ""
        else:
            ext = ""
        fd, name = tempfile.mkstemp(ext, "mproxy")
        # A file object writes all of the data and closes the descriptor even
        # if the write fails.
        with open(fd, "wb") as f:
            f.write(data)

        # read-only to remind the user that this is a view function
        os.chmod(name, stat.S_IREAD)

        # hm which one should get priority?
        c = (
            os.environ.get("MITMPROXY_EDITOR")
            or os.environ.get("PAGER")
            or os.environ.get("EDITOR")
        )
        if not c:
            c = "less"
        cmd = shlex.split(c)
        cmd.append(name)

        with self.uistopped():
            try:
                subprocess.call(cmd, shell=False)
            except Exception:
                # c is already a string; joining it would space out its characters.
                signals.status_message.send(
                    message=f"Can't start external viewer: {c}"
                )
        # add a small delay before deletion so that the file is not removed before being loaded by the viewer
        t = threading.Timer(1.0, os.unlink, args=[name])
        t.start()

    def set_palette(self, *_) -> None:
        """Register the palette selected in the options and clear the screen.

        Positional arguments are ignored, so the method can be used directly
        as an options-change subscriber.
        """
        self.ui.register_palette(
            palettes.palettes[self.options.console_palette].palette(
                self.options.console_palette_transparent
            )
        )
        self.ui.clear()

    def inject_key(self, key):
        """Feed a single key into the urwid loop's input processing."""
        self.loop.process_input([key])

    async def running(self) -> None:
        """Set up the screen, the urwid main loop and the window, then start the UI.

        Exits the process with status 1 if stdout is not a tty. A non-UTF
        console encoding only produces a warning on stderr.
        """
        if not sys.stdout.isatty():
            print(
                "Error: mitmproxy's console interface requires a tty. "
                "Please run mitmproxy in an interactive shell environment.",
                file=sys.stderr,
            )
            sys.exit(1)

        detected_encoding = urwid.detected_encoding.lower()
        if os.name != "nt" and detected_encoding and "utf" not in detected_encoding:
            print(
                f"mitmproxy expects a UTF-8 console environment, not {urwid.detected_encoding!r}. "
                "Set your LANG environment variable to something like en_US.UTF-8.",
                file=sys.stderr,
            )
            # Experimental (04/2022): We just don't exit here and see if/how that affects users.
            # sys.exit(1)
        urwid.set_encoding("utf8")

        signals.call_in.connect(self.sig_call_in)
        self.ui = window.Screen()
        self.ui.set_terminal_properties(256)
        self.set_palette(None)
        self.options.subscribe(
            self.set_palette, ["console_palette", "console_palette_transparent"]
        )

        loop = asyncio.get_running_loop()
        # ProactorEventLoop only exists on some platforms; the empty tuple makes
        # the isinstance() check False where it is missing.
        if isinstance(loop, getattr(asyncio, "ProactorEventLoop", tuple())):
            # fix for https://bugs.python.org/issue37373
            loop = AddThreadSelectorEventLoop(loop)  # type: ignore
        self.loop = urwid.MainLoop(
            urwid.SolidFill("x"),
            event_loop=urwid.AsyncioEventLoop(loop=loop),
            screen=self.ui,
            handle_mouse=self.options.console_mouse,
        )
        self.window = window.Window(self)
        self.loop.widget = self.window
        self.window.refresh()

        self.loop.start()

        await super().running()

    async def done(self):
        """Stop the urwid loop, then run the base class shutdown."""
        self.loop.stop()
        await super().done()

    def overlay(self, widget, **kwargs):
        """Display ``widget`` as an overlay on the window.

        Keyword arguments are forwarded to ``Window.set_overlay``.
        """
        assert self.window
        self.window.set_overlay(widget, **kwargs)

    def switch_view(self, name):
        """Push the view called ``name`` onto the window."""
        assert self.window
        self.window.push(name)

    def quit(self, a):
        """Shut down unless the answer ``a`` is "n"."""
        if a != "n":
            self.shutdown()
|