262 lines
11 KiB
Python
262 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""盘前监控脚本
|
||
功能:
|
||
1. 周期性抓取指定数量美股的盘前价格 (富途 before_open_stock_info)
|
||
2. 在终端实时打印表格 (可 --interval 控制刷新秒数)
|
||
3. 支持 --limit 获取排行榜前N只 或 --symbols 手动指定列表
|
||
4. 自动判断美东时段是否为盘前 (4:00-9:30 ET), 非盘前可提示或使用 --force 继续
|
||
|
||
依赖已有模块: futu.py 中的 EastMoneyAPI / StockDataIntegrator
|
||
|
||
示例:
|
||
python premarket_watch.py --limit 15 --interval 30
|
||
python premarket_watch.py --symbols NVDA,AAPL,TSLA --interval 20
|
||
python premarket_watch.py --limit 10 --once # 单次输出
|
||
python premarket_watch.py --limit 10 --force # 忽略时段检查
|
||
|
||
注意: 逐个请求富途页面存在速率限制风险, 建议 limit 不要太大; 脚本仅演示用途。
|
||
"""
|
||
import argparse
|
||
import time
|
||
from datetime import datetime
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from zoneinfo import ZoneInfo
|
||
from typing import List, Dict
|
||
from futu import EastMoneyAPI, StockDataIntegrator
|
||
from utils_time import now_et, fmt_et_hm, fmt_et
|
||
from data_writer import write_symbols, append_premarket_bars, append_premarket_signals
|
||
|
||
|
||
def parse_args():
|
||
parser = argparse.ArgumentParser(description='盘前实时监控脚本')
|
||
group = parser.add_mutually_exclusive_group(required=False)
|
||
group.add_argument('--limit', type=int, default=10, help='获取东方财富排行前N只 (默认10)')
|
||
group.add_argument('--symbols', type=str, help='逗号分隔的股票代码列表, 覆盖 limit')
|
||
parser.add_argument('--interval', type=int, default=10, help='刷新间隔秒, 默认60')
|
||
parser.add_argument('--once', action='store_true', help='只执行一次抓取并退出')
|
||
parser.add_argument('--force', action='store_true', help='忽略盘前时段判断强制抓取')
|
||
parser.add_argument('--sleep', type=float, default=0.0, help='顺序模式下的延时(已多线程可忽略)')
|
||
parser.add_argument('--max-workers', type=int, default=0, help='线程最大数量(0=自动=股票数,建议限制避免过度)')
|
||
parser.add_argument('--no-color', action='store_true', help='关闭ANSI颜色输出')
|
||
parser.add_argument('--save', action='store_true', help='保存盘前快照和信号到 data/premarket_*.csv')
|
||
return parser.parse_args()
|
||
|
||
|
||
def get_et_session(now_et: datetime) -> str:
|
||
if now_et.weekday() >= 5:
|
||
return 'off'
|
||
m = now_et.hour * 60 + now_et.minute
|
||
if 4*60 <= m < 9*60 + 30:
|
||
return 'pre'
|
||
if 9*60 + 30 <= m < 16*60:
|
||
return 'regular'
|
||
if 16*60 <= m < 20*60:
|
||
return 'post'
|
||
return 'off'
|
||
|
||
|
||
def fetch_symbol_list(limit: int, api: EastMoneyAPI) -> List[Dict]:
|
||
raw, _ = api.get_us_stocks(page_size=limit)
|
||
parsed: List[Dict] = []
|
||
for item in raw:
|
||
data = api.parse_stock_data(item)
|
||
if data:
|
||
parsed.append({'symbol': data['symbol'], 'name': data['name']})
|
||
return parsed
|
||
|
||
|
||
def parse_symbols_arg(symbols_str: str) -> List[Dict]:
|
||
result = []
|
||
for s in symbols_str.split(','):
|
||
sym = s.strip().upper()
|
||
if sym:
|
||
result.append({'symbol': sym, 'name': ''})
|
||
return result
|
||
|
||
|
||
def safe_ratio_to_float(ratio_raw) -> float:
|
||
if ratio_raw in (None, ''):
|
||
return 0.0
|
||
try:
|
||
txt = str(ratio_raw).replace('%', '').strip()
|
||
if not txt:
|
||
return 0.0
|
||
return float(txt) / 100.0
|
||
except Exception:
|
||
return 0.0
|
||
|
||
|
||
def colorize(s: str, positive: bool, no_color: bool) -> str:
|
||
if no_color:
|
||
return s
|
||
if positive:
|
||
return f"\x1b[32m{s}\x1b[0m" # 绿色
|
||
return f"\x1b[31m{s}\x1b[0m" # 红色
|
||
|
||
|
||
def format_table(rows: List[Dict], no_color: bool) -> str:
|
||
headers = ['Symbol', 'Name', 'Premarket Price', 'Change', 'Change %', 'Updated']
|
||
col_widths = [len(h) for h in headers]
|
||
for r in rows:
|
||
col_widths[0] = max(col_widths[0], len(r.get('symbol','')))
|
||
col_widths[1] = max(col_widths[1], len(r.get('name','')))
|
||
col_widths[2] = max(col_widths[2], len(r.get('premarket_price','')))
|
||
col_widths[3] = max(col_widths[3], len(r.get('premarket_change','')))
|
||
col_widths[4] = max(col_widths[4], len(r.get('premarket_change_ratio_fmt','')))
|
||
col_widths[5] = max(col_widths[5], len(r.get('ts','')))
|
||
def pad(text, width):
|
||
return str(text).ljust(width)
|
||
line_sep = '─' * (sum(col_widths) + len(col_widths)*3 - 1)
|
||
header_line = ' '.join(pad(h, col_widths[i]) for i, h in enumerate(headers))
|
||
body_lines = []
|
||
for r in rows:
|
||
pos = safe_ratio_to_float(r.get('premarket_change_ratio')) >= 0
|
||
ratio_fmt = r.get('premarket_change_ratio_fmt','')
|
||
ratio_fmt = colorize(ratio_fmt, pos, no_color)
|
||
change = r.get('premarket_change','')
|
||
change = colorize(change, pos, no_color)
|
||
body_lines.append(' '.join([
|
||
pad(r.get('symbol',''), col_widths[0]),
|
||
pad(r.get('name',''), col_widths[1]),
|
||
pad(r.get('premarket_price',''), col_widths[2]),
|
||
pad(change, col_widths[3]),
|
||
pad(ratio_fmt, col_widths[4]),
|
||
pad(r.get('ts',''), col_widths[5]),
|
||
]))
|
||
return f"{header_line}\n{line_sep}\n" + '\n'.join(body_lines)
|
||
|
||
|
||
def main():
|
||
args = parse_args()
|
||
api = EastMoneyAPI()
|
||
integrator = StockDataIntegrator()
|
||
|
||
if args.symbols:
|
||
symbols = parse_symbols_arg(args.symbols)
|
||
print(f"📋 使用自定义股票列表: {[s['symbol'] for s in symbols]}")
|
||
else:
|
||
symbols = fetch_symbol_list(args.limit, api)
|
||
print(f"📋 获取排行前 {args.limit} 只股票: {[s['symbol'] for s in symbols]}")
|
||
|
||
if not symbols:
|
||
print("❌ 无有效股票列表, 退出")
|
||
return
|
||
|
||
if not args.force:
|
||
now_et = datetime.now(ZoneInfo('America/New_York'))
|
||
session = get_et_session(now_et)
|
||
if session != 'pre':
|
||
print(f"⚠️ 当前美东时段为 {session} (ET {now_et.strftime('%H:%M')}), 非盘前, 使用 --force 可强制抓取")
|
||
return
|
||
|
||
def _fetch_one(info: Dict) -> Dict:
|
||
sym = info['symbol']
|
||
name = info['name']
|
||
futu = integrator.get_futu_stock_details(sym)
|
||
if futu and futu.get('before_open_price'):
|
||
ratio_raw = futu.get('before_open_change_ratio')
|
||
ratio_val = safe_ratio_to_float(ratio_raw)
|
||
return {
|
||
'symbol': sym,
|
||
'name': name,
|
||
'premarket_price': futu.get('before_open_price',''),
|
||
'premarket_change': futu.get('before_open_change',''),
|
||
'premarket_change_ratio': ratio_raw,
|
||
'premarket_change_ratio_fmt': f"{ratio_val*100:.2f}%" if ratio_raw else '',
|
||
'ts': fmt_et_hm(),
|
||
}
|
||
return {
|
||
'symbol': sym,
|
||
'name': name,
|
||
'premarket_price': '-',
|
||
'premarket_change': '-',
|
||
'premarket_change_ratio': '',
|
||
'premarket_change_ratio_fmt': '',
|
||
'ts': fmt_et_hm(),
|
||
}
|
||
|
||
def run_once():
|
||
# 动态线程数:若 max-workers=0 用股票数,做一个上限保护例如 128
|
||
worker_target = args.max_workers if args.max_workers > 0 else len(symbols)
|
||
max_cap = 128 # 安全软限制,避免过度线程导致资源问题
|
||
workers = min(worker_target, max_cap)
|
||
if workers < len(symbols):
|
||
print(f"⚠️ 线程数限制为 {workers} (股票 {len(symbols)}), 使用 --max-workers 调整或提高上限")
|
||
start = time.time()
|
||
rows: List[Dict] = []
|
||
# 多线程并发抓取
|
||
with ThreadPoolExecutor(max_workers=workers) as executor:
|
||
future_map = {executor.submit(_fetch_one, info): info['symbol'] for info in symbols}
|
||
for fut in as_completed(future_map):
|
||
try:
|
||
rows.append(fut.result())
|
||
except Exception as e:
|
||
sym = future_map[fut]
|
||
rows.append({
|
||
'symbol': sym,
|
||
'name': '',
|
||
'premarket_price': 'ERR',
|
||
'premarket_change': '-',
|
||
'premarket_change_ratio': '',
|
||
'premarket_change_ratio_fmt': '',
|
||
'ts': fmt_et_hm(),
|
||
})
|
||
print(f"⚠️ {sym} 抓取异常: {e}")
|
||
# 保持原列表顺序
|
||
rows.sort(key=lambda r: [s['symbol'] for s in symbols].index(r['symbol']))
|
||
elapsed = time.time() - start
|
||
print(f"🕒 ET {fmt_et()} | 刷新间隔 {args.interval}s | 总计 {len(rows)}")
|
||
print(f"⏱️ 本轮耗时 {elapsed:.2f}s, 线程 {workers}")
|
||
print(format_table(rows, args.no_color))
|
||
|
||
if args.save:
|
||
# 建立 symbol 基础信息用于写入 symbols.csv(缺 name 也允许)
|
||
symbol_base = [{'symbol': r['symbol'], 'name': r.get('name',''), 'exchange': 'US', 'currency': 'USD'} for r in rows]
|
||
symbol_id_map = write_symbols(symbol_base)
|
||
append_premarket_bars(rows, symbol_id_map, source='futu')
|
||
# 生成盘前阈值信号(±3%)
|
||
signals = []
|
||
for r in rows:
|
||
raw_ratio = r.get('premarket_change_ratio')
|
||
val = safe_ratio_to_float(raw_ratio)
|
||
if val >= 0.03:
|
||
signals.append({
|
||
'symbol': r['symbol'],
|
||
'direction': 'BUY',
|
||
'reason': f"盘前涨幅 {val*100:.2f}% 触发阈值",
|
||
'params': {'premarket_price': r.get('premarket_price'), 'premarket_change_ratio': val}
|
||
})
|
||
elif val <= -0.03:
|
||
signals.append({
|
||
'symbol': r['symbol'],
|
||
'direction': 'SELL',
|
||
'reason': f"盘前跌幅 {val*100:.2f}% 触发阈值",
|
||
'params': {'premarket_price': r.get('premarket_price'), 'premarket_change_ratio': val}
|
||
})
|
||
append_premarket_signals(signals, symbol_id_map)
|
||
if signals:
|
||
print(f"💡 已保存盘前信号 {len(signals)} 条 -> data/premarket_signals.csv")
|
||
print("🗂️ 已保存盘前快照 -> data/premarket_bars.csv")
|
||
|
||
if args.once:
|
||
run_once()
|
||
return
|
||
|
||
while True:
|
||
try:
|
||
run_once()
|
||
if args.interval <= 0:
|
||
break
|
||
time.sleep(args.interval)
|
||
except KeyboardInterrupt:
|
||
print("\n🛑 已停止盘前监控")
|
||
break
|
||
except Exception as e:
|
||
print(f"⚠️ 本轮捕获异常: {e}")
|
||
time.sleep(args.interval)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|