Files
“shengyudong” 322ac74336 2025-12-25 upload
2025-12-25 11:16:59 +08:00

132 lines
4.0 KiB
Python

import asyncio
import gc
import linecache
import os
import platform
import signal
import sys
import threading
import traceback
from collections import Counter
from contextlib import redirect_stdout
from OpenSSL import SSL
from mitmproxy import version
from mitmproxy.utils import asyncio_utils
def dump_system_info():
mitmproxy_version = version.get_dev_version()
openssl_version: str | bytes = SSL.SSLeay_version(SSL.SSLEAY_VERSION)
if isinstance(openssl_version, bytes):
openssl_version = openssl_version.decode()
data = [
f"Mitmproxy: {mitmproxy_version}",
f"Python: {platform.python_version()}",
f"OpenSSL: {openssl_version}",
f"Platform: {platform.platform()}",
]
return "\n".join(data)
def dump_info(signal=None, frame=None, file=sys.stdout): # pragma: no cover
with redirect_stdout(file):
print("****************************************************")
print("Summary")
print("=======")
try:
import psutil
except ModuleNotFoundError:
print("(psutil not installed, skipping some debug info)")
else:
p = psutil.Process()
print("num threads: ", p.num_threads())
if hasattr(p, "num_fds"):
print("num fds: ", p.num_fds())
print("memory: ", p.memory_info())
print()
print("Files")
print("=====")
for i in p.open_files():
print(i)
print()
print("Connections")
print("===========")
for i in p.connections():
print(i)
print()
print("Threads")
print("=======")
bthreads = []
for i in threading.enumerate():
if hasattr(i, "_threadinfo"):
bthreads.append(i)
else:
print(i.name)
bthreads.sort(key=lambda x: getattr(x, "_thread_started", 0))
for i in bthreads:
print(i._threadinfo())
print()
print("Memory")
print("======")
gc.collect()
objs = Counter(str(type(i)) for i in gc.get_objects())
for cls, count in objs.most_common(20):
print(f"{count} {cls}")
print()
print("Memory (mitmproxy only)")
print("=======================")
mitm_objs = Counter({k: v for k, v in objs.items() if "mitmproxy" in k})
for cls, count in mitm_objs.most_common(20):
print(f"{count} {cls}")
try:
asyncio.get_running_loop()
except RuntimeError:
pass
else:
print()
print("Tasks")
print("=======")
for task in asyncio.all_tasks():
f = task.get_stack(limit=1)[0]
line = linecache.getline(
f.f_code.co_filename, f.f_lineno, f.f_globals
).strip()
line = f"{line} # at {os.path.basename(f.f_code.co_filename)}:{f.f_lineno}"
print(f"{asyncio_utils.task_repr(task)}\n {line}")
print("****************************************************")
if os.getenv("MITMPROXY_DEBUG_EXIT"): # pragma: no cover
sys.exit(1)
def dump_stacks(signal=None, frame=None, file=sys.stdout):
id2name = {th.ident: th.name for th in threading.enumerate()}
code = []
for threadId, stack in sys._current_frames().items():
code.append("\n# Thread: %s(%d)" % (id2name.get(threadId, ""), threadId))
for filename, lineno, name, line in traceback.extract_stack(stack):
code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
if line:
code.append(" %s" % (line.strip()))
print("\n".join(code), file=file)
if os.getenv("MITMPROXY_DEBUG_EXIT"): # pragma: no cover
sys.exit(1)
def register_info_dumpers():
if os.name != "nt": # pragma: windows no cover
signal.signal(signal.SIGUSR1, dump_info) # type: ignore
signal.signal(signal.SIGUSR2, dump_stacks) # type: ignore