Files
ai_stock/monitor.py
2025-12-08 15:30:19 +08:00

229 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
量化交易监控主程序
功能:
1. 循环抓取美股数据 (支持全量/Top N)
2. 调用分析模块进行分析
3. 调用交易模块执行信号
"""
import time
import argparse
from logging_setup import init_logging
from futu import StockDataIntegrator, EastMoneyAPI
from market_analyzer import MarketAnalyzer
from trader import Trader
from datetime import datetime
from data_writer import write_symbols, append_bars_1m, append_bars_session, append_signals, append_features_1m, append_etl_run
from zoneinfo import ZoneInfo
from utils_time import now_et, fmt_et, fmt_et_hm
from signal_filter import SignalCooldownFilter
def main():
# 初始化简单日志(保持现有 print不强制替换
init_logging()
parser = argparse.ArgumentParser(description='AI量化交易监控系统')
parser.add_argument('--interval', type=int, default=60, help='监控间隔(秒)')
parser.add_argument('--limit', type=int, default=100, help='每次监控的股票数量')
parser.add_argument('--all', action='store_true', help='监控所有股票(速度较慢)')
parser.add_argument('--premarket', action='store_true', help='在盘前窗口抓取富途盘前价格并写入 session=pre')
parser.add_argument('--premarket-limit', type=int, default=30, help='盘前抓取的最大股票数(富途页面逐个抓取)')
parser.add_argument('--session-override', choices=['pre','regular','post'], help='测试用手动覆盖当前交易时段')
args = parser.parse_args()
print("🚀 启动 AI 量化交易监控系统...")
print(f"⏱️ 监控间隔: {args.interval}")
# 初始化模块
integrator = StockDataIntegrator()
eastmoney_api = EastMoneyAPI() # 用于快速获取列表
analyzer = MarketAnalyzer()
trader = Trader()
cooldown_filter = SignalCooldownFilter(cooldown_minutes=30)
loop_count = 0
try:
while True:
loop_start = now_et()
loop_count += 1
print(f"\n🔄 第 {loop_count} 次扫描开始 - {fmt_et_hm() } ET")
# 1. 抓取数据 (常规东方财富列表)
# 为了监控效率,我们主要使用东方财富的快速列表接口
# 如果是全量监控
stock_data = []
if args.all:
print("📡 正在获取全量市场数据...")
# 这里我们简化处理直接调用修改后的API获取所有数据
# 注意futu.py 中的 get_us_stocks 已经支持分页获取
# 为了演示,我们这里只获取前几页,或者使用 futu.py 中新增的逻辑
# 直接使用 integrator 的逻辑,但强制 eastmoney_only 以提高速度
# 我们手动调用 eastmoney_api 来获取数据,避免 integrator 的复杂逻辑
# 获取所有数据可能需要一点时间
_, total = eastmoney_api.get_us_stocks(page_size=1)
print(f"📊 市场总股票数: {total}")
# 分页获取所有数据
page_size = 100
limit = total
total_pages = (limit + page_size - 1) // page_size
all_raw_stocks = []
for page in range(1, total_pages + 1):
stocks, _ = eastmoney_api.get_us_stocks(page_size=page_size, page_index=page)
if stocks:
all_raw_stocks.extend(stocks)
# 稍微延时
# time.sleep(0.1)
# 解析数据
for item in all_raw_stocks:
parsed = eastmoney_api.parse_stock_data(item)
if parsed:
stock_data.append({
'symbol': parsed['symbol'],
'name': parsed['name'],
'eastmoney_price': parsed['current_price'],
'eastmoney_change_ratio': parsed['change_ratio']
})
else:
print(f"📡 正在获取 Top {args.limit} 热门股票数据...")
# 获取 Top N
raw_stocks, _ = eastmoney_api.get_us_stocks(page_size=args.limit)
for item in raw_stocks:
parsed = eastmoney_api.parse_stock_data(item)
if parsed:
stock_data.append({
'symbol': parsed['symbol'],
'name': parsed['name'],
'eastmoney_price': parsed['current_price'],
'eastmoney_change_ratio': parsed['change_ratio']
})
print(f"✅ 获取到 {len(stock_data)} 条有效常规行情数据 (ET {fmt_et_hm()})")
# 1.1 盘前数据补充 (仅在盘前窗口且开启参数时,对前 N 只股票抓取富途页面)
def _get_us_market_session(now_et: datetime) -> str:
"""根据美东时间判定交易时段: pre(4:00-9:30), regular(9:30-16:00), post(16:00-20:00), off 其它。
周末直接 off。夏令时由系统 tz 数据自动处理。"""
if now_et.weekday() >= 5: # Saturday=5 Sunday=6
return 'off'
minutes = now_et.hour * 60 + now_et.minute
if 4*60 <= minutes < 9*60 + 30:
return 'pre'
if 9*60 + 30 <= minutes < 16*60:
return 'regular'
if 16*60 <= minutes < 20*60:
return 'post'
return 'off'
def _current_session() -> str:
if args.session_override:
return args.session_override
now_et = datetime.now(ZoneInfo('America/New_York'))
return _get_us_market_session(now_et)
pre_rows = []
session = _current_session()
if args.premarket and session == 'pre':
pre_candidates = stock_data[: args.premarket_limit]
print(f"🌙 盘前窗口内,准备抓取富途盘前数据 {len(pre_candidates)} 条... (ET {fmt_et_hm()})")
for i, item in enumerate(pre_candidates, 1):
symbol = item['symbol']
futu_detail = integrator.get_futu_stock_details(symbol)
if futu_detail and futu_detail.get('before_open_price'):
# 正常化盘前涨跌幅 (可能含 %)
ratio_raw = futu_detail.get('before_open_change_ratio') or ''
ratio_val = 0.0
try:
ratio_clean = str(ratio_raw).replace('%','').strip()
if ratio_clean:
ratio_f = float(ratio_clean)
# 转为小数
ratio_val = ratio_f/100.0
except Exception:
ratio_val = 0.0
item.update({
'premarket_price': futu_detail.get('before_open_price'),
'premarket_change': futu_detail.get('before_open_change'),
'premarket_change_ratio': ratio_val,
'futu_before_open_price': futu_detail.get('before_open_price'), # 兼容 append_bars_session fallback
})
pre_rows.append(item)
if i % 10 == 0:
print(f"🌙 盘前抓取进度 {i}/{len(pre_candidates)} (ET {fmt_et_hm()})")
print(f"🌙 富途盘前成功获取 {len(pre_rows)} 条 (ET {fmt_et_hm()})")
else:
if args.premarket:
print(f"🌙 当前交易时段为 {session},未执行盘前抓取 (ET {fmt_et_hm()})")
# 2.1 将 symbols 与 1分钟线写入 CSVdata/ 下)
try:
symbol_id_map = write_symbols([
{
'symbol': s['symbol'],
'name': s['name'],
'exchange': 'US',
'currency': 'USD',
}
for s in stock_data
])
new_rows = append_bars_1m(stock_data, symbol_id_map, source='eastmoney')
append_features_1m(new_rows)
# 盘前 bars 写入 (不计算特征,避免与常规混淆)
if pre_rows:
append_bars_session(pre_rows, symbol_id_map, source='futu', session='pre')
except Exception as e:
print(f"⚠️ 数据落地失败: {e}")
# 3. 分析数据
raw_signals = analyzer.analyze(stock_data)
# 盘前可选:根据盘前涨幅单独生成预警信号(示例阈值 +3% / -3%
premarket_signals = []
if pre_rows:
for r in pre_rows:
ratio = r.get('premarket_change_ratio') or 0.0
sym = r['symbol']
name = r.get('name', '')
price = r.get('premarket_price') or r.get('eastmoney_price')
if ratio >= 0.03:
premarket_signals.append({'symbol': sym, 'name': name, 'price': price, 'type': 'BUY', 'reason': f'盘前涨幅 {ratio:.2%} 预警'})
elif ratio <= -0.03:
premarket_signals.append({'symbol': sym, 'name': name, 'price': price, 'type': 'SELL', 'reason': f'盘前跌幅 {ratio:.2%} 预警'})
if premarket_signals:
print(f"🌙 盘前预警信号 {len(premarket_signals)} 条 (ET {fmt_et_hm()})")
raw_signals.extend(premarket_signals)
signals = cooldown_filter.filter(raw_signals)
# 3.1 写入 signals.csv
try:
if signals:
append_signals(signals, symbol_id_map)
except Exception as e:
print(f"⚠️ 写入信号失败: {e}")
# 4. 执行交易
if signals:
trader.execute_signals(signals)
else:
print("💤 当前无交易信号")
# 等待下一次扫描
# 记录ETL运行
try:
duration = (now_et() - loop_start).total_seconds()
append_etl_run(loop_count, len(stock_data), len(signals), duration, errors=0)
except Exception as e:
print(f"⚠️ 记录ETL统计失败: {e}")
print(f"⏳ 等待 {args.interval} 秒...")
time.sleep(args.interval)
except KeyboardInterrupt:
print("\n🛑 监控已停止")
if __name__ == "__main__":
main()