#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Pre-market watch script.

Features:
1. Periodically fetch pre-market prices for a set of US stocks
   (Futu ``before_open_stock_info``).
2. Print a live table in the terminal (``--interval`` sets the refresh
   period in seconds).
3. ``--limit`` takes the top N of the ranking list; ``--symbols`` supplies
   an explicit list instead.
4. Detect whether US Eastern time is inside the pre-market window
   (04:00-09:30 ET); outside it the script warns and exits unless
   ``--force`` is given.

Depends on the existing module futu.py (``EastMoneyAPI`` /
``StockDataIntegrator``).

Examples:
    python premarket_watch.py --limit 15 --interval 30
    python premarket_watch.py --symbols NVDA,AAPL,TSLA --interval 20
    python premarket_watch.py --limit 10 --once   # print once and exit
    python premarket_watch.py --limit 10 --force  # skip the session check

Note:
    One Futu page is requested per symbol, so there is a rate-limit risk;
    keep ``--limit`` small. This script is for demonstration purposes only.
"""

import argparse
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import List, Dict
from zoneinfo import ZoneInfo

from futu import EastMoneyAPI, StockDataIntegrator
from utils_time import now_et, fmt_et_hm, fmt_et
from data_writer import write_symbols, append_premarket_bars, append_premarket_signals


def parse_args():
    """Build the command-line parser and return the parsed arguments."""
    parser = argparse.ArgumentParser(description='盘前实时监控脚本')
    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument('--limit', type=int, default=10, help='获取东方财富排行前N只 (默认10)')
    group.add_argument('--symbols', type=str, help='逗号分隔的股票代码列表, 覆盖 limit')
    # The help text previously advertised 60 while the actual default is 10.
    parser.add_argument('--interval', type=int, default=10, help='刷新间隔秒, 默认10')
    parser.add_argument('--once', action='store_true', help='只执行一次抓取并退出')
    parser.add_argument('--force', action='store_true', help='忽略盘前时段判断强制抓取')
    parser.add_argument('--sleep', type=float, default=0.0, help='顺序模式下的延时(已多线程可忽略)')
    parser.add_argument('--max-workers', type=int, default=0, help='线程最大数量(0=自动=股票数,建议限制避免过度)')
    parser.add_argument('--no-color', action='store_true', help='关闭ANSI颜色输出')
    parser.add_argument('--save', action='store_true', help='保存盘前快照和信号到 data/premarket_*.csv')
    return parser.parse_args()


def get_et_session(now_et: datetime) -> str:
    """Classify a US Eastern datetime into a trading session.

    Args:
        now_et: The moment to classify. Only its weekday, hour and minute
            are read. (Inside this function the parameter hides the
            ``now_et`` helper imported at module level.)

    Returns:
        ``'pre'`` for 04:00-09:30, ``'regular'`` for 09:30-16:00,
        ``'post'`` for 16:00-20:00, otherwise ``'off'``. Saturdays and
        Sundays are always ``'off'``; market holidays are not considered.
    """
    if now_et.weekday() >= 5:
        return 'off'
    m = now_et.hour * 60 + now_et.minute
    if 4 * 60 <= m < 9 * 60 + 30:
        return 'pre'
    if 9 * 60 + 30 <= m < 16 * 60:
        return 'regular'
    if 16 * 60 <= m < 20 * 60:
        return 'post'
    return 'off'


def fetch_symbol_list(limit: int, api: EastMoneyAPI) -> List[Dict]:
    """Fetch the top ``limit`` ranked US stocks as ``{'symbol', 'name'}`` dicts.

    Items that ``api.parse_stock_data`` cannot parse (falsy result) are
    skipped. A ``None``/empty ranking payload yields an empty list.
    """
    raw, _ = api.get_us_stocks(page_size=limit)
    parsed: List[Dict] = []
    for item in raw or []:
        data = api.parse_stock_data(item)
        if data:
            parsed.append({'symbol': data['symbol'], 'name': data['name']})
    return parsed


def parse_symbols_arg(symbols_str: str) -> List[Dict]:
    """Turn a comma-separated ticker string into ``{'symbol', 'name'}`` dicts.

    Tickers are stripped and upper-cased; empty entries are dropped. The
    name is left empty because only the ticker is known here.
    """
    tickers = (part.strip().upper() for part in symbols_str.split(','))
    return [{'symbol': sym, 'name': ''} for sym in tickers if sym]


def safe_ratio_to_float(ratio_raw) -> float:
    """Convert a percentage value such as ``'3.25%'`` to a fraction (0.0325).

    ``None``, empty/blank input and unparsable text all yield ``0.0``.
    """
    if ratio_raw in (None, ''):
        return 0.0
    try:
        txt = str(ratio_raw).replace('%', '').strip()
        if not txt:
            return 0.0
        return float(txt) / 100.0
    except (TypeError, ValueError):
        return 0.0


def colorize(s: str, positive: bool, no_color: bool) -> str:
    """Wrap ``s`` in ANSI green (positive) or red; return it as-is if ``no_color``."""
    if no_color:
        return s
    if positive:
        return f"\x1b[32m{s}\x1b[0m"  # green
    return f"\x1b[31m{s}\x1b[0m"  # red


def format_table(rows: List[Dict], no_color: bool) -> str:
    """Render the fetched rows as an aligned plain-text table.

    Args:
        rows: Row dicts with the keys ``symbol``, ``name``,
            ``premarket_price``, ``premarket_change``,
            ``premarket_change_ratio``, ``premarket_change_ratio_fmt``
            and ``ts``; missing keys render as empty cells.
        no_color: When true, no ANSI color codes are emitted.

    Returns:
        Header line, a horizontal rule and one line per row.
    """
    headers = ['Symbol', 'Name', 'Premarket Price', 'Change', 'Change %', 'Updated']
    keys = ['symbol', 'name', 'premarket_price', 'premarket_change',
            'premarket_change_ratio_fmt', 'ts']
    colored_columns = (3, 4)  # 'Change' and 'Change %'
    sep = ' '

    # Stringify once so width computation and rendering agree even when the
    # upstream source hands back non-string values.
    cells = [[str(r.get(k, '')) for k in keys] for r in rows]

    col_widths = [len(h) for h in headers]
    for row_cells in cells:
        for i, text in enumerate(row_cells):
            col_widths[i] = max(col_widths[i], len(text))

    header_line = sep.join(h.ljust(w) for h, w in zip(headers, col_widths))
    # Derive the rule from the header so it always matches the table width.
    line_sep = '─' * len(header_line)

    body_lines = []
    for r, row_cells in zip(rows, cells):
        pos = safe_ratio_to_float(r.get('premarket_change_ratio')) >= 0
        padded = [text.ljust(w) for text, w in zip(row_cells, col_widths)]
        # Pad BEFORE colorizing: ANSI escape bytes would otherwise be counted
        # by ljust() and the colored columns would lose their alignment.
        for i in colored_columns:
            padded[i] = colorize(padded[i], pos, no_color)
        body_lines.append(sep.join(padded))
    return f"{header_line}\n{line_sep}\n" + '\n'.join(body_lines)


def _build_threshold_signals(rows: List[Dict], threshold: float = 0.03) -> List[Dict]:
    """Create BUY/SELL signal dicts for rows whose pre-market move reaches ``threshold``.

    The move is read from ``premarket_change_ratio`` via
    ``safe_ratio_to_float``; rows without a usable ratio count as 0 and
    therefore never trigger.
    """
    signals: List[Dict] = []
    for r in rows:
        val = safe_ratio_to_float(r.get('premarket_change_ratio'))
        if val >= threshold:
            direction = 'BUY'
            reason = f"盘前涨幅 {val*100:.2f}% 触发阈值"
        elif val <= -threshold:
            direction = 'SELL'
            reason = f"盘前跌幅 {val*100:.2f}% 触发阈值"
        else:
            continue
        signals.append({
            'symbol': r['symbol'],
            'direction': direction,
            'reason': reason,
            'params': {'premarket_price': r.get('premarket_price'), 'premarket_change_ratio': val},
        })
    return signals


def main():
    """Entry point: resolve the symbol list, check the session, then watch.

    With ``--once`` a single round is fetched. Otherwise rounds repeat every
    ``--interval`` seconds until Ctrl-C; an interval <= 0 ends the loop
    after the first round.
    """
    args = parse_args()
    api = EastMoneyAPI()
    integrator = StockDataIntegrator()

    if args.symbols:
        symbols = parse_symbols_arg(args.symbols)
        print(f"📋 使用自定义股票列表: {[s['symbol'] for s in symbols]}")
    else:
        symbols = fetch_symbol_list(args.limit, api)
        print(f"📋 获取排行前 {args.limit} 只股票: {[s['symbol'] for s in symbols]}")
    if not symbols:
        print("❌ 无有效股票列表, 退出")
        return

    if not args.force:
        # Named current_et so the imported now_et helper is not shadowed.
        current_et = datetime.now(ZoneInfo('America/New_York'))
        session = get_et_session(current_et)
        if session != 'pre':
            print(f"⚠️ 当前美东时段为 {session} (ET {current_et.strftime('%H:%M')}), 非盘前, 使用 --force 可强制抓取")
            return

    # Position of each symbol in the requested list (first occurrence wins),
    # computed once so every round can restore the order cheaply.
    order: Dict[str, int] = {}
    for idx, info in enumerate(symbols):
        order.setdefault(info['symbol'], idx)

    def _fetch_one(info: Dict) -> Dict:
        """Fetch one symbol's pre-market quote and shape it into a table row."""
        sym = info['symbol']
        name = info['name']
        futu = integrator.get_futu_stock_details(sym)
        if futu and futu.get('before_open_price'):
            ratio_raw = futu.get('before_open_change_ratio')
            ratio_val = safe_ratio_to_float(ratio_raw)
            return {
                'symbol': sym,
                'name': name,
                'premarket_price': futu.get('before_open_price', ''),
                'premarket_change': futu.get('before_open_change', ''),
                'premarket_change_ratio': ratio_raw,
                'premarket_change_ratio_fmt': f"{ratio_val*100:.2f}%" if ratio_raw else '',
                'ts': fmt_et_hm(),
            }
        # No pre-market quote available: emit a placeholder row.
        return {
            'symbol': sym,
            'name': name,
            'premarket_price': '-',
            'premarket_change': '-',
            'premarket_change_ratio': '',
            'premarket_change_ratio_fmt': '',
            'ts': fmt_et_hm(),
        }

    def run_once():
        """Fetch all symbols concurrently, print the table, optionally save."""
        # Dynamic thread count: --max-workers=0 means one thread per symbol,
        # capped by a soft limit to avoid exhausting resources.
        worker_target = args.max_workers if args.max_workers > 0 else len(symbols)
        max_cap = 128
        workers = min(worker_target, max_cap)
        if workers < len(symbols):
            print(f"⚠️ 线程数限制为 {workers} (股票 {len(symbols)}), 使用 --max-workers 调整或提高上限")

        start = time.time()
        rows: List[Dict] = []
        with ThreadPoolExecutor(max_workers=workers) as executor:
            future_map = {executor.submit(_fetch_one, info): info['symbol'] for info in symbols}
            for fut in as_completed(future_map):
                try:
                    rows.append(fut.result())
                except Exception as e:
                    # One failing symbol must not abort the round: record an
                    # error row and report it.
                    sym = future_map[fut]
                    rows.append({
                        'symbol': sym,
                        'name': '',
                        'premarket_price': 'ERR',
                        'premarket_change': '-',
                        'premarket_change_ratio': '',
                        'premarket_change_ratio_fmt': '',
                        'ts': fmt_et_hm(),
                    })
                    print(f"⚠️ {sym} 抓取异常: {e}")

        # Futures complete in arbitrary order; restore the requested order.
        rows.sort(key=lambda r: order[r['symbol']])
        elapsed = time.time() - start

        print(f"🕒 ET {fmt_et()} | 刷新间隔 {args.interval}s | 总计 {len(rows)}")
        print(f"⏱️ 本轮耗时 {elapsed:.2f}s, 线程 {workers}")
        print(format_table(rows, args.no_color))

        if args.save:
            # Base symbol info for symbols.csv (a missing name is allowed).
            symbol_base = [
                {'symbol': r['symbol'], 'name': r.get('name', ''), 'exchange': 'US', 'currency': 'USD'}
                for r in rows
            ]
            symbol_id_map = write_symbols(symbol_base)
            append_premarket_bars(rows, symbol_id_map, source='futu')
            # Pre-market threshold signals (+/-3%).
            signals = _build_threshold_signals(rows)
            append_premarket_signals(signals, symbol_id_map)
            if signals:
                print(f"💡 已保存盘前信号 {len(signals)} 条 -> data/premarket_signals.csv")
            print("🗂️ 已保存盘前快照 -> data/premarket_bars.csv")

    if args.once:
        run_once()
        return

    while True:
        try:
            try:
                run_once()
            except Exception as e:
                # Keep the watch alive across a failed round.
                print(f"⚠️ 本轮捕获异常: {e}")
            # A non-positive interval means "single round"; checking it here
            # (also after a failed round) avoids both a hot retry loop and
            # time.sleep() raising on a negative value.
            if args.interval <= 0:
                break
            time.sleep(args.interval)
        except KeyboardInterrupt:
            # Covers Ctrl-C during the fetch as well as during the sleep.
            print("\n🛑 已停止盘前监控")
            break


if __name__ == '__main__':
    main()