Files
ai_stock/premarket_watch.py
2025-12-08 15:30:19 +08:00

262 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""盘前监控脚本
功能:
1. 周期性抓取指定数量美股的盘前价格 (富途 before_open_stock_info)
2. 在终端实时打印表格 (可 --interval 控制刷新秒数)
3. 支持 --limit 获取排行榜前N只 或 --symbols 手动指定列表
4. 自动判断美东时段是否为盘前 (4:00-9:30 ET), 非盘前可提示或使用 --force 继续
依赖已有模块: futu.py 中的 EastMoneyAPI / StockDataIntegrator
示例:
python premarket_watch.py --limit 15 --interval 30
python premarket_watch.py --symbols NVDA,AAPL,TSLA --interval 20
python premarket_watch.py --limit 10 --once # 单次输出
python premarket_watch.py --limit 10 --force # 忽略时段检查
注意: 逐个请求富途页面存在速率限制风险, 建议 limit 不要太大; 脚本仅演示用途。
"""
import argparse
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from zoneinfo import ZoneInfo
from typing import List, Dict
from futu import EastMoneyAPI, StockDataIntegrator
from utils_time import now_et, fmt_et_hm, fmt_et
from data_writer import write_symbols, append_premarket_bars, append_premarket_signals
def parse_args():
parser = argparse.ArgumentParser(description='盘前实时监控脚本')
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument('--limit', type=int, default=10, help='获取东方财富排行前N只 (默认10)')
group.add_argument('--symbols', type=str, help='逗号分隔的股票代码列表, 覆盖 limit')
parser.add_argument('--interval', type=int, default=10, help='刷新间隔秒, 默认60')
parser.add_argument('--once', action='store_true', help='只执行一次抓取并退出')
parser.add_argument('--force', action='store_true', help='忽略盘前时段判断强制抓取')
parser.add_argument('--sleep', type=float, default=0.0, help='顺序模式下的延时(已多线程可忽略)')
parser.add_argument('--max-workers', type=int, default=0, help='线程最大数量(0=自动=股票数,建议限制避免过度)')
parser.add_argument('--no-color', action='store_true', help='关闭ANSI颜色输出')
parser.add_argument('--save', action='store_true', help='保存盘前快照和信号到 data/premarket_*.csv')
return parser.parse_args()
def get_et_session(now_et: datetime) -> str:
if now_et.weekday() >= 5:
return 'off'
m = now_et.hour * 60 + now_et.minute
if 4*60 <= m < 9*60 + 30:
return 'pre'
if 9*60 + 30 <= m < 16*60:
return 'regular'
if 16*60 <= m < 20*60:
return 'post'
return 'off'
def fetch_symbol_list(limit: int, api: EastMoneyAPI) -> List[Dict]:
raw, _ = api.get_us_stocks(page_size=limit)
parsed: List[Dict] = []
for item in raw:
data = api.parse_stock_data(item)
if data:
parsed.append({'symbol': data['symbol'], 'name': data['name']})
return parsed
def parse_symbols_arg(symbols_str: str) -> List[Dict]:
result = []
for s in symbols_str.split(','):
sym = s.strip().upper()
if sym:
result.append({'symbol': sym, 'name': ''})
return result
def safe_ratio_to_float(ratio_raw) -> float:
if ratio_raw in (None, ''):
return 0.0
try:
txt = str(ratio_raw).replace('%', '').strip()
if not txt:
return 0.0
return float(txt) / 100.0
except Exception:
return 0.0
def colorize(s: str, positive: bool, no_color: bool) -> str:
if no_color:
return s
if positive:
return f"\x1b[32m{s}\x1b[0m" # 绿色
return f"\x1b[31m{s}\x1b[0m" # 红色
def format_table(rows: List[Dict], no_color: bool) -> str:
headers = ['Symbol', 'Name', 'Premarket Price', 'Change', 'Change %', 'Updated']
col_widths = [len(h) for h in headers]
for r in rows:
col_widths[0] = max(col_widths[0], len(r.get('symbol','')))
col_widths[1] = max(col_widths[1], len(r.get('name','')))
col_widths[2] = max(col_widths[2], len(r.get('premarket_price','')))
col_widths[3] = max(col_widths[3], len(r.get('premarket_change','')))
col_widths[4] = max(col_widths[4], len(r.get('premarket_change_ratio_fmt','')))
col_widths[5] = max(col_widths[5], len(r.get('ts','')))
def pad(text, width):
return str(text).ljust(width)
line_sep = '' * (sum(col_widths) + len(col_widths)*3 - 1)
header_line = ' '.join(pad(h, col_widths[i]) for i, h in enumerate(headers))
body_lines = []
for r in rows:
pos = safe_ratio_to_float(r.get('premarket_change_ratio')) >= 0
ratio_fmt = r.get('premarket_change_ratio_fmt','')
ratio_fmt = colorize(ratio_fmt, pos, no_color)
change = r.get('premarket_change','')
change = colorize(change, pos, no_color)
body_lines.append(' '.join([
pad(r.get('symbol',''), col_widths[0]),
pad(r.get('name',''), col_widths[1]),
pad(r.get('premarket_price',''), col_widths[2]),
pad(change, col_widths[3]),
pad(ratio_fmt, col_widths[4]),
pad(r.get('ts',''), col_widths[5]),
]))
return f"{header_line}\n{line_sep}\n" + '\n'.join(body_lines)
def main():
args = parse_args()
api = EastMoneyAPI()
integrator = StockDataIntegrator()
if args.symbols:
symbols = parse_symbols_arg(args.symbols)
print(f"📋 使用自定义股票列表: {[s['symbol'] for s in symbols]}")
else:
symbols = fetch_symbol_list(args.limit, api)
print(f"📋 获取排行前 {args.limit} 只股票: {[s['symbol'] for s in symbols]}")
if not symbols:
print("❌ 无有效股票列表, 退出")
return
if not args.force:
now_et = datetime.now(ZoneInfo('America/New_York'))
session = get_et_session(now_et)
if session != 'pre':
print(f"⚠️ 当前美东时段为 {session} (ET {now_et.strftime('%H:%M')}), 非盘前, 使用 --force 可强制抓取")
return
def _fetch_one(info: Dict) -> Dict:
sym = info['symbol']
name = info['name']
futu = integrator.get_futu_stock_details(sym)
if futu and futu.get('before_open_price'):
ratio_raw = futu.get('before_open_change_ratio')
ratio_val = safe_ratio_to_float(ratio_raw)
return {
'symbol': sym,
'name': name,
'premarket_price': futu.get('before_open_price',''),
'premarket_change': futu.get('before_open_change',''),
'premarket_change_ratio': ratio_raw,
'premarket_change_ratio_fmt': f"{ratio_val*100:.2f}%" if ratio_raw else '',
'ts': fmt_et_hm(),
}
return {
'symbol': sym,
'name': name,
'premarket_price': '-',
'premarket_change': '-',
'premarket_change_ratio': '',
'premarket_change_ratio_fmt': '',
'ts': fmt_et_hm(),
}
def run_once():
# 动态线程数:若 max-workers=0 用股票数,做一个上限保护例如 128
worker_target = args.max_workers if args.max_workers > 0 else len(symbols)
max_cap = 128 # 安全软限制,避免过度线程导致资源问题
workers = min(worker_target, max_cap)
if workers < len(symbols):
print(f"⚠️ 线程数限制为 {workers} (股票 {len(symbols)}), 使用 --max-workers 调整或提高上限")
start = time.time()
rows: List[Dict] = []
# 多线程并发抓取
with ThreadPoolExecutor(max_workers=workers) as executor:
future_map = {executor.submit(_fetch_one, info): info['symbol'] for info in symbols}
for fut in as_completed(future_map):
try:
rows.append(fut.result())
except Exception as e:
sym = future_map[fut]
rows.append({
'symbol': sym,
'name': '',
'premarket_price': 'ERR',
'premarket_change': '-',
'premarket_change_ratio': '',
'premarket_change_ratio_fmt': '',
'ts': fmt_et_hm(),
})
print(f"⚠️ {sym} 抓取异常: {e}")
# 保持原列表顺序
rows.sort(key=lambda r: [s['symbol'] for s in symbols].index(r['symbol']))
elapsed = time.time() - start
print(f"🕒 ET {fmt_et()} | 刷新间隔 {args.interval}s | 总计 {len(rows)}")
print(f"⏱️ 本轮耗时 {elapsed:.2f}s, 线程 {workers}")
print(format_table(rows, args.no_color))
if args.save:
# 建立 symbol 基础信息用于写入 symbols.csv缺 name 也允许)
symbol_base = [{'symbol': r['symbol'], 'name': r.get('name',''), 'exchange': 'US', 'currency': 'USD'} for r in rows]
symbol_id_map = write_symbols(symbol_base)
append_premarket_bars(rows, symbol_id_map, source='futu')
# 生成盘前阈值信号±3%
signals = []
for r in rows:
raw_ratio = r.get('premarket_change_ratio')
val = safe_ratio_to_float(raw_ratio)
if val >= 0.03:
signals.append({
'symbol': r['symbol'],
'direction': 'BUY',
'reason': f"盘前涨幅 {val*100:.2f}% 触发阈值",
'params': {'premarket_price': r.get('premarket_price'), 'premarket_change_ratio': val}
})
elif val <= -0.03:
signals.append({
'symbol': r['symbol'],
'direction': 'SELL',
'reason': f"盘前跌幅 {val*100:.2f}% 触发阈值",
'params': {'premarket_price': r.get('premarket_price'), 'premarket_change_ratio': val}
})
append_premarket_signals(signals, symbol_id_map)
if signals:
print(f"💡 已保存盘前信号 {len(signals)} 条 -> data/premarket_signals.csv")
print("🗂️ 已保存盘前快照 -> data/premarket_bars.csv")
if args.once:
run_once()
return
while True:
try:
run_once()
if args.interval <= 0:
break
time.sleep(args.interval)
except KeyboardInterrupt:
print("\n🛑 已停止盘前监控")
break
except Exception as e:
print(f"⚠️ 本轮捕获异常: {e}")
time.sleep(args.interval)
if __name__ == '__main__':
main()