Files
ai_stock/futu.py
2025-12-08 15:30:19 +08:00

1035 lines
44 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
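"""
Futu (Futubull) stock price scraping tool.

Purpose: parse HTML pages to extract the stock price, change amount and
change percentage, and save the results to a CSV file.

Author: AI Stock Trading Assistant
Date: 2024
"""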
import requests
import re
import csv
import time
import sys
import argparse
import json
import urllib.parse
from datetime import datetime
from bs4 import BeautifulSoup
from logging_setup import init_logging
class EastMoneyAPI:
    """Client for the EastMoney list endpoint, used to fetch US stocks ranked by market cap."""

    # Raw values that are treated as "no data" and converted to 0.0.
    _MISSING_VALUES = (None, '-', '')

    def __init__(self):
        """Set up the endpoint URL and the browser-like headers/cookies sent with each request."""
        self.base_url = "https://push2.eastmoney.com/api/qt/clist/get"
        self.headers = {
            'Accept': '*/*',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Connection': 'keep-alive',
            'Referer': 'https://quote.eastmoney.com/center/gridlist.html',
            'Sec-Fetch-Dest': 'script',
            'Sec-Fetch-Mode': 'no-cors',
            'Sec-Fetch-Site': 'same-site',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36',
            'sec-ch-ua': '"Google Chrome";v="141", "Not?A_Brand";v="8", "Chromium";v="141"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"'
        }
        # NOTE(review): these look like values captured from a browser session;
        # confirm whether they need to be refreshed or moved out of source control.
        self.cookies = {
            'qgqp_b_id': '6762b4d1088a5df99fef2aaf99350ad6',
            'st_nvi': '5kjYZa9DBnsX5oWsYvA_Fe959',
            'nid': '0e17cb22ecf6960f4858bfd8cbdced17',
            'nid_create_time': '1756556375507',
            'gvi': 'l15-44w-TU820v6GOA4-re3ed',
            'gvi_create_time': '1756556375507',
            'websitepoptg_api_time': '1762206479838',
            'st_si': '15354362676602',
            'st_asi': 'delete',
            'fullscreengg': '1',
            'fullscreengg2': '1',
            'st_pvi': '72179808487060',
            'st_sp': '2025-08-30%2020%3A19%3A35',
            'st_inirUrl': 'https%3A%2F%2Femcreative.eastmoney.com%2Fapp_fortune%2Farticle%2Findex.html',
            'st_sn': '3',
            'st_psi': '20251104055541297-113200301321-2855469466'
        }

    def get_us_stocks_top50(self, page_size=50):
        """Fetch the first page of US stocks sorted by market cap.

        Returns:
            tuple: ``(stocks, total)`` exactly as returned by ``get_us_stocks``.
        """
        return self.get_us_stocks(page_size=page_size)

    def get_us_stocks(self, page_size=50, page_index=1):
        """Fetch one page of US stock rows.

        Args:
            page_size (int): Number of rows requested per page.
            page_index (int): 1-based page number.

        Returns:
            tuple: ``(stocks, total)`` where ``stocks`` is the list of raw row
            dicts and ``total`` is the total count reported by the API.
            ``([], 0)`` is returned on any failure.
        """
        try:
            # Build the request parameters; the timestamp doubles as cache buster
            # and as part of the JSONP callback name.
            timestamp = int(time.time() * 1000)
            callback = f"jQuery37106960268121993591_{timestamp}"
            params = {
                'np': '1',
                # NOTE(review): 'fltt' presumably controls numeric formatting of the
                # returned fields — confirm that prices/ratios are not scaled integers.
                'fltt': '1',
                'invt': '2',
                'cb': callback,
                'fs': 'm:105,m:106,m:107',  # US market codes
                'fields': 'f12,f13,f14,f1,f2,f4,f3,f152,f17,f28,f15,f16,f18,f20,f115',
                'fid': 'f20',  # sort by market cap
                'pn': str(page_index),
                'pz': str(page_size),
                'po': '1',
                'dect': '1',
                'ut': 'fa5fd1943c7b386f172d6893dbfba10b',
                'wbp2u': '|0|0|0|web',
                '_': str(timestamp)
            }
            print(f"🌐 正在获取美股数据 (第{page_index}页, 每页{page_size}条)...")
            response = requests.get(
                self.base_url,
                params=params,
                headers=self.headers,
                cookies=self.cookies,
                timeout=30,
                # NOTE(review): TLS certificate verification is disabled here, which
                # allows man-in-the-middle tampering; confirm this is still required.
                verify=False
            )
            if response.status_code != 200:
                print(f"❌ 请求失败,状态码: {response.status_code}")
                print("[调试] 返回内容:", response.text[:500])
                return [], 0

            content = response.text
            # Print only the first 500 characters to keep the debug output short.
            print("[调试] 东方财富API返回内容:", content[:500])
            return self._parse_jsonp_response(content)
        except Exception as e:
            # Network boundary: report the failure and return the empty result.
            print(f"❌ 获取数据失败: {e}")
            return [], 0

    def _parse_jsonp_response(self, content):
        """Extract ``(stocks, total)`` from a JSONP response body.

        Args:
            content (str): Response text of the form ``callback({...})``.

        Returns:
            tuple: ``(stocks, total)``; ``([], 0)`` when the body is not valid
            JSONP, the API reports an error, or the ``data`` payload is missing.
        """
        if not content or '(' not in content or ')' not in content:
            print("❌ 返回内容异常未包含有效JSONP")
            return [], 0

        # The JSON document sits between the first '(' and the last ')'.
        start = content.find('(') + 1
        end = content.rfind(')')
        try:
            data = json.loads(content[start:end])
        except ValueError as e:  # json.JSONDecodeError is a ValueError
            print(f"❌ JSON解析失败: {e}")
            return [], 0

        payload = data.get('data') if isinstance(data, dict) else None
        # A null 'data' member is reported as an API error instead of
        # surfacing as an AttributeError further down.
        if not isinstance(data, dict) or data.get('rc') != 0 or not isinstance(payload, dict):
            print(f"❌ 接口返回错误: {data}")
            return [], 0

        stocks = payload.get('diff') or []
        total = payload.get('total') or 0
        print(f"✅ 成功获取 {len(stocks)} 只股票数据 (总数: {total})")
        return stocks, total

    def parse_stock_data(self, stock_item):
        """Convert one raw API row into a dict of numeric fields.

        Conventions:
            - prices and the change amount are returned as ``float``;
            - ``change_ratio`` is returned as a fraction (0.04 means 4%).

        Args:
            stock_item (dict): Raw row keyed by EastMoney field codes (``f2``, ``f3``, ...).

        Returns:
            dict | None: The parsed row, or ``None`` when ``stock_item`` is not dict-like.
        """
        try:
            return {
                'symbol': stock_item.get('f12', ''),
                'name': stock_item.get('f14', ''),
                'current_price': self._format_price(stock_item.get('f2', 0.0)),
                # f4 is normally the change amount
                'change_amount': self._format_price(stock_item.get('f4', 0.0)),
                # f3 is normally the change percentage; normalised to a fraction
                'change_ratio': self._format_ratio(stock_item.get('f3', 0.0)),
                'market_cap': self._format_price(stock_item.get('f20', 0.0)),
                'high_price': self._format_price(stock_item.get('f15', 0.0)),
                'low_price': self._format_price(stock_item.get('f16', 0.0)),
                'open_price': self._format_price(stock_item.get('f17', 0.0)),
                'prev_close': self._format_price(stock_item.get('f18', 0.0)),
                # Exchange/currency are filled with fixed values for the US list.
                'exchange': 'US',
                'currency': 'USD',
            }
        except (AttributeError, TypeError) as e:
            print(f"❌ 解析股票数据失败: {e}")
            return None

    def _format_price(self, price_value):
        """Return ``price_value`` as a float; placeholders and unparsable values give 0.0."""
        if price_value in self._MISSING_VALUES:
            return 0.0
        try:
            return float(price_value)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    def _format_ratio(self, ratio_value):
        """Return the change ratio as a fraction.

        Two input conventions are accepted:
            - a percentage value (4.02 means 4.02%);
            - a percentage scaled by 100 (402 means 4.02%).

        Values whose magnitude exceeds 100 are assumed to use the scaled form.
        NOTE(review): this heuristic cannot tell 50 (= 50%) from 50 (= 0.50%);
        confirm which convention the API actually uses.
        """
        raw = self._format_price(ratio_value)
        percent = raw / 100.0 if abs(raw) > 100 else raw
        return percent / 100.0
class FutuStockParser:
    """Fetches Futu stock pages and extracts quote data from their HTML or embedded JSON."""

    # Start of the embedded state assignment; the lookahead leaves the match
    # end positioned exactly on the opening brace of the JSON document.
    _INITIAL_STATE_RE = re.compile(r'window\.__INITIAL_STATE__\s*=\s*(?=\{)')
    _UNSIGNED_NUMBER_RE = re.compile(r'[\d,]+\.?\d*')
    _SIGNED_NUMBER_RE = re.compile(r'[+-]?[\d,]+\.?\d*')
    _PERCENT_RE = re.compile(r'[+-]?[\d,]+\.?\d*%')
    # A signed amount that is NOT a percentage.  The lookahead also rejects
    # shortened matches, so "-0.20%" can never be read as the amount "-0.2".
    _SIGNED_AMOUNT_RE = re.compile(r'[+-][\d,]+\.?\d*(?![\d.,]*\s*%)')

    _JS_CSV_FIELDS = ['timestamp', 'datetime', 'before_open_price', 'before_open_change',
                      'before_open_change_ratio', 'current_price', 'current_change_ratio']
    _HTML_CSV_FIELDS = ['timestamp', 'datetime', 'current_price', 'change_price', 'change_ratio', 'direction',
                        'after_hours_price', 'after_hours_change', 'after_hours_ratio',
                        'after_hours_direction', 'after_hours_status']

    def __init__(self):
        """Initialise the parser with the cookies and headers sent to the Futu site."""
        # NOTE(review): these look like values captured from a browser session
        # (including CSRF tokens); confirm whether they belong in source control.
        self.cookies = {
            'cipher_device_id': '1757556073667578',
            'device_id': '1757556073667578',
            '_gcl_au': '1.1.1663570279.1758365279',
            'showWatch': '1',
            'invite_from': '10237865',
            'sensorsdata2015jssdkcross': '%7B%22distinct_id%22%3A%22ftv1PuOG%2BAdnk9zxdFTbZjIrOSbcir6XtNvwdxf2Y34zO%2FCriKNPyEOfzRH7jhboo2SL%22%2C%22first_id%22%3A%2219936818c19622-028fe866d247376-26061951-1024000-19936818c1b100%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E8%87%AA%E7%84%B6%E6%90%9C%E7%B4%A2%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC%22%2C%22%24latest_referrer%22%3A%22https%3A%2F%2Fwww.baidu.com%2Flink%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfY29va2llX2lkIjoiMTk5MzY4MThjMTk2MjItMDI4ZmU4NjZkMjQ3Mzc2LTI2MDYxOTUxLTEwMjQwMDAtMTk5MzY4MThjMWIxMDAiLCIkaWRlbnRpdHlfbG9naW5faWQiOiJmdHYxUHVPRytBZG5rOXp4ZEZUYlpqSXJPU2JjaXI2WHROdndkeGYyWTM0ek8vQ3JpS05QeUVPZnpSSDdqaGJvbzJTTCJ9%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%24identity_login_id%22%2C%22value%22%3A%22ftv1PuOG%2BAdnk9zxdFTbZjIrOSbcir6XtNvwdxf2Y34zO%2FCriKNPyEOfzRH7jhboo2SL%22%7D%7D',
            'futu-csrf': 'oiTa//eJsjCp/OY8h3KrAY8REws=',
            'locale': 'zh-cn',
            'csrfToken': 'VRY8_4JPRRdq5GEsxaC4wio5',
            'Hm_lvt_f3ecfeb354419b501942b6f9caf8d0db': '1760076566,1762203125',
            'HMACCOUNT': '98F1F80B74EBD3E2',
            'Hm_lpvt_f3ecfeb354419b501942b6f9caf8d0db': '1762203146',
            'locale.sig': 'ObiqV0BmZw7fEycdGJRoK-Q0Yeuop294gBeiHL1LqgQ',
        }
        self.headers = {
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'accept-language': 'zh-CN,zh;q=0.9',
            'cache-control': 'max-age=0',
            'priority': 'u=0, i',
            'referer': 'https://www.futunn.com/',
            'sec-ch-ua': '"Google Chrome";v="141", "Not?A_Brand";v="8", "Chromium";v="141"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'document',
            'sec-fetch-mode': 'navigate',
            'sec-fetch-site': 'same-origin',
            'sec-fetch-user': '?1',
            'upgrade-insecure-requests': '1',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36',
        }

    def fetch_stock_page(self, url):
        """Download the HTML of a stock page.

        Args:
            url (str): Stock page URL.

        Returns:
            str | None: The HTML text, or ``None`` when the request fails or
            returns an HTTP error status.
        """
        try:
            response = requests.get(url, cookies=self.cookies, headers=self.headers, timeout=30)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            print(f"❌ 获取页面失败: {e}")
            return None

    def parse_javascript_data(self, html_content):
        """Extract quote data from the ``window.__INITIAL_STATE__`` object embedded in the page.

        The JSON document is decoded in place with ``JSONDecoder.raw_decode``,
        so braces or semicolons inside string values cannot truncate it.

        Args:
            html_content (str): HTML text.

        Returns:
            dict | None: Pre-market and current quote fields plus a timestamp,
            or ``None`` when the state object or its ``stock_info`` is missing
            or cannot be decoded.
        """
        if not html_content:
            return None
        try:
            marker = self._INITIAL_STATE_RE.search(html_content)
            if not marker:
                print("❌ 未找到window.__INITIAL_STATE__数据")
                return None

            initial_state, _ = json.JSONDecoder().raw_decode(html_content, marker.end())

            stock_info = initial_state.get('stock_info')
            if not stock_info or not isinstance(stock_info, dict):
                print("❌ 未找到stock_info数据")
                return None

            # A JSON null for either sub-object is treated like a missing one,
            # so the remaining fields are still returned.
            before_open_info = stock_info.get('before_open_stock_info') or {}
            data_info = stock_info.get('data') or {}

            return {
                'before_open_price': before_open_info.get('price'),
                'before_open_change': before_open_info.get('change'),
                'before_open_change_ratio': before_open_info.get('changeRatio'),
                'current_price': data_info.get('price'),
                'current_change_ratio': data_info.get('changeRatio'),
                'timestamp': int(time.time()),
                'datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }
        except json.JSONDecodeError as e:
            print(f"❌ JSON解析失败: {e}")
            return None
        except (AttributeError, TypeError) as e:
            # Raised when a nested member has an unexpected (non-dict) type.
            print(f"❌ 解析JavaScript数据失败: {e}")
            return None

    def parse_price_data(self, html_content):
        """Extract price data, including after-hours data, from the page markup.

        Args:
            html_content (str): HTML text.

        Returns:
            dict | None: Price, change amount, change ratio, direction, a
            timestamp and the after-hours fields; ``None`` when the price
            container is missing or parsing fails.
        """
        if not html_content:
            return None
        try:
            soup = BeautifulSoup(html_content, 'html.parser')

            # Container of the regular-session price.
            price_container = soup.find('ul', class_='flex-end price-current')
            if not price_container:
                print("❌ 未找到价格容器")
                return None

            # Current price (li.mg-r-8.price.direct-up/down).
            current_price = None
            price_element = price_container.find('li', class_=re.compile(r'mg-r-8 price'))
            if price_element:
                price_match = self._UNSIGNED_NUMBER_RE.search(price_element.get_text(strip=True))
                if price_match:
                    current_price = price_match.group().replace(',', '')

            # Change amount and change ratio.
            change_price = None
            change_ratio = None
            change_element = price_container.find('li', class_=re.compile(r'change'))
            if change_element:
                change_price_span = change_element.find('span', class_='change-price')
                if change_price_span:
                    # Keep the sign of the change amount.
                    amount_match = self._SIGNED_NUMBER_RE.search(change_price_span.get_text(strip=True))
                    if amount_match:
                        change_price = amount_match.group().replace(',', '')

                change_ratio_span = change_element.find('span', class_=re.compile(r'mg-l-8 change-ratio'))
                if change_ratio_span:
                    ratio_match = self._PERCENT_RE.search(change_ratio_span.get_text(strip=True))
                    if ratio_match:
                        change_ratio = ratio_match.group()

            # The direction is read from the container's serialised markup.
            container_markup = str(price_container)
            if "direct-up" in container_markup:
                direction = "up"
            elif "direct-down" in container_markup:
                direction = "down"
            else:
                direction = "flat"

            after_hours_data = self._parse_after_hours_data(soup)

            result = {
                'current_price': current_price,
                'change_price': change_price,
                'change_ratio': change_ratio,
                'direction': direction,
                'timestamp': int(time.time()),
                'datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }
            if after_hours_data:
                result.update(after_hours_data)
            return result
        except Exception as e:
            # Parser boundary: any markup surprise is reported, not raised.
            print(f"❌ 解析HTML失败: {e}")
            return None

    def _parse_after_hours_data(self, soup):
        """Extract the after-hours quote from the ``div.disc-info`` section.

        Args:
            soup: BeautifulSoup document.

        Returns:
            dict: After-hours price, change, ratio, direction and status; any
            field that cannot be found stays ``None``.
        """
        after_hours_data = {
            'after_hours_price': None,
            'after_hours_change': None,
            'after_hours_ratio': None,
            'after_hours_direction': None,
            'after_hours_status': None
        }
        try:
            disc_info = soup.find('div', class_='disc-info')
            if not disc_info:
                return after_hours_data

            after_price_container = disc_info.find('ul', class_='flex-end price-current')
            if not after_price_container:
                return after_hours_data

            # After-hours price (li.mg-r-8.disc-price.direct-down/up).
            after_price_element = after_price_container.find('li', class_=re.compile(r'mg-r-8 disc-price'))
            if after_price_element:
                price_match = self._UNSIGNED_NUMBER_RE.search(after_price_element.get_text(strip=True))
                if price_match:
                    after_hours_data['after_hours_price'] = price_match.group().replace(',', '')
                price_classes = after_price_element.get('class', [])
                if "direct-up" in price_classes:
                    after_hours_data['after_hours_direction'] = "up"
                elif "direct-down" in price_classes:
                    after_hours_data['after_hours_direction'] = "down"
                else:
                    after_hours_data['after_hours_direction'] = "flat"

            # The price <li> carries a "direct-*" class as well, so it has to be
            # skipped; otherwise it is picked as the change element and the
            # change amount/ratio are never found.
            after_change_element = None
            for candidate in after_price_container.find_all('li', class_=re.compile(r'direct-')):
                if candidate is not after_price_element:
                    after_change_element = candidate
                    break

            if after_change_element:
                change_spans = after_change_element.find_all('span')
                if len(change_spans) >= 2:
                    # First span: change amount (must be signed, so that a
                    # price is never mistaken for it).
                    change_text = change_spans[0].get_text(strip=True)
                    if change_text.startswith(('+', '-')):
                        change_match = self._SIGNED_NUMBER_RE.search(change_text)
                        if change_match:
                            after_hours_data['after_hours_change'] = change_match.group().replace(',', '')
                    # Second span: change ratio.
                    ratio_match = self._PERCENT_RE.search(change_spans[1].get_text(strip=True))
                    if ratio_match:
                        after_hours_data['after_hours_ratio'] = ratio_match.group()
                elif len(change_spans) == 1:
                    # A single span may hold both the amount and the ratio.
                    span_text = change_spans[0].get_text(strip=True)
                    if span_text.startswith(('+', '-')):
                        change_match = self._SIGNED_AMOUNT_RE.search(span_text)
                        if change_match:
                            after_hours_data['after_hours_change'] = change_match.group().replace(',', '')
                    ratio_match = self._PERCENT_RE.search(span_text)
                    if ratio_match:
                        after_hours_data['after_hours_ratio'] = ratio_match.group()
                else:
                    # No spans: read both values from the <li> text itself.
                    full_text = after_change_element.get_text(strip=True)
                    change_match = self._SIGNED_AMOUNT_RE.search(full_text)
                    if change_match:
                        after_hours_data['after_hours_change'] = change_match.group().replace(',', '')
                    ratio_match = self._PERCENT_RE.search(full_text)
                    if ratio_match:
                        after_hours_data['after_hours_ratio'] = ratio_match.group()

            status_element = disc_info.find('div', class_='status')
            if status_element:
                after_hours_data['after_hours_status'] = status_element.get_text(strip=True)
        except Exception as e:
            # Best effort: after-hours data is optional, so only warn.
            print(f"⚠️ 解析盘后数据失败: {e}")
        return after_hours_data

    def _append_csv_row(self, data, filename, fieldnames):
        """Append one row to a CSV file, writing the header when the file is empty.

        Args:
            data (dict): Row to write; its keys must all be in ``fieldnames``.
            filename (str | None): Target path; ``None`` selects
                ``futu_<unix timestamp>.csv``.
            fieldnames (list): Column order.

        Returns:
            bool: ``True`` on success, ``False`` when there is no data or the
            write fails.
        """
        if not data:
            print("❌ 没有数据可保存")
            return False
        if filename is None:
            filename = f"futu_{int(time.time())}.csv"
        try:
            with open(filename, 'a', newline='', encoding='utf-8-sig') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                # In append mode the position is the current file size, so 0
                # means the file is new or empty and still needs its header.
                if csvfile.tell() == 0:
                    writer.writeheader()
                writer.writerow(data)
            print(f"✅ 数据已保存到: {filename}")
            return True
        except Exception as e:
            # Saving is best effort: report the problem instead of raising.
            print(f"❌ 保存CSV失败: {e}")
            return False

    def save_to_csv_js(self, data, filename=None):
        """Append a row produced by ``parse_javascript_data`` to a CSV file.

        Args:
            data (dict): Parsed quote data.
            filename (str): Target file; generated automatically when ``None``.

        Returns:
            bool: ``True`` on success, ``False`` otherwise.
        """
        return self._append_csv_row(data, filename, self._JS_CSV_FIELDS)

    def save_to_csv(self, data, filename=None):
        """Append a row produced by ``parse_price_data`` to a CSV file.

        Args:
            data (dict): Parsed price data.
            filename (str): Target file; generated automatically when ``None``.

        Returns:
            bool: ``True`` on success, ``False`` otherwise.
        """
        return self._append_csv_row(data, filename, self._HTML_CSV_FIELDS)

    def parse_from_html_string(self, html_string):
        """Parse price data from an HTML string (alias of ``parse_price_data``).

        Args:
            html_string (str): HTML text.

        Returns:
            dict | None: The parse result.
        """
        return self.parse_price_data(html_string)
class StockDataIntegrator:
    """Combines EastMoney ranking rows with per-stock Futu quote details."""

    def __init__(self):
        self.eastmoney_api = EastMoneyAPI()
        self.futu_parser = FutuStockParser()

    def get_futu_stock_details(self, symbol):
        """Fetch the Futu quote fields for one stock symbol.

        The embedded JavaScript state is tried first; the HTML markup is the
        fallback. Returns a dict with the five quote fields, or ``None`` when
        the page cannot be fetched or parsed.
        """
        try:
            futu_url = f"https://www.futunn.com/stock/{symbol}-US"
            print(f"🔍 正在获取 {symbol} 的富途数据...")
            page = self.futu_parser.fetch_stock_page(futu_url)
            if not page:
                print(f"❌ 无法获取 {symbol} 的富途页面")
                return None

            # Preferred source: the embedded JavaScript state.
            from_js = self.futu_parser.parse_javascript_data(page)
            if from_js:
                wanted = (
                    'before_open_price',
                    'before_open_change',
                    'before_open_change_ratio',
                    'current_price',
                    'current_change_ratio',
                )
                return {key: from_js.get(key, '') for key in wanted}

            # Fallback: the rendered markup, which has no pre-market fields.
            from_html = self.futu_parser.parse_price_data(page)
            if not from_html:
                return None
            return {
                'before_open_price': '',
                'before_open_change': '',
                'before_open_change_ratio': '',
                'current_price': from_html.get('current_price', ''),
                'current_change_ratio': from_html.get('change_ratio', '')
            }
        except Exception as e:
            print(f"❌ 获取 {symbol} 富途数据失败: {e}")
            return None

    def get_top50_integrated_data(self, limit=50, fetch_all=False):
        """Build the integrated rows for US stocks.

        Args:
            limit: Maximum number of stocks to return.
            fetch_all: When true, ``limit`` is replaced by the total count
                reported by the API.

        Returns:
            list: One dict per stock; empty when no EastMoney data is available.
        """
        if not fetch_all:
            print(f"📊 开始获取美股市值前{limit}名整合数据...")
        else:
            print("📊 开始获取所有美股整合数据...")
            # A one-row request is enough to learn the total count.
            _, total_count = self.eastmoney_api.get_us_stocks(page_size=1)
            limit = total_count
            print(f"📊 预计总数: {total_count}")

        # Rows are requested in pages of 100.
        page_size = 100
        total_pages = (limit + page_size - 1) // page_size
        all_stocks = []
        page = 1
        while page <= total_pages:
            remaining = limit - (page - 1) * page_size
            if remaining <= 0:
                break
            batch, _ = self.eastmoney_api.get_us_stocks(page_size=page_size, page_index=page)
            if not batch:
                break
            all_stocks += batch
            # Short pause between pages.
            time.sleep(0.2)
            page += 1

        if not all_stocks:
            print("❌ 无法获取东方财富数据")
            return []

        all_stocks = all_stocks[:limit]
        stock_count = len(all_stocks)
        print(f"📋 已获取 {stock_count} 条基础数据,开始处理详情...")

        # Above 100 stocks only the EastMoney data is used; the per-stock
        # Futu detail request is skipped.
        skip_futu_details = stock_count > 100
        if skip_futu_details:
            print("⚠️ 股票数量较多,将跳过富途详情页抓取以提高速度...")

        integrated_data = []
        for rank, raw_row in enumerate(all_stocks, 1):
            try:
                parsed = self.eastmoney_api.parse_stock_data(raw_row)
                if not parsed:
                    continue
                symbol = parsed['symbol']

                if skip_futu_details:
                    if rank % 100 == 0:
                        print(f"📈 处理进度 {rank}/{len(all_stocks)}...")
                else:
                    print(f"📈 处理第 {rank}/{len(all_stocks)}: {symbol} - {parsed['name']}")

                record = {
                    'rank': rank,
                    'symbol': symbol,
                    'name': parsed['name'],
                    'eastmoney_price': parsed['current_price'],
                    'eastmoney_change': parsed['change_amount'],
                    'eastmoney_change_ratio': parsed['change_ratio'],
                    'market_cap': parsed['market_cap'],
                    'high_price': parsed['high_price'],
                    'low_price': parsed['low_price'],
                    'open_price': parsed['open_price'],
                    'prev_close': parsed['prev_close'],
                    'timestamp': int(time.time()),
                    'datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    # Futu columns default to empty strings.
                    'futu_before_open_price': '',
                    'futu_before_open_change': '',
                    'futu_before_open_change_ratio': '',
                    'futu_current_price': '',
                    'futu_current_change_ratio': ''
                }

                if not skip_futu_details:
                    details = self.get_futu_stock_details(symbol)
                    if details:
                        record['futu_before_open_price'] = details['before_open_price']
                        record['futu_before_open_change'] = details['before_open_change']
                        record['futu_before_open_change_ratio'] = details['before_open_change_ratio']
                        record['futu_current_price'] = details['current_price']
                        record['futu_current_change_ratio'] = details['current_change_ratio']
                    # Pause after every Futu request.
                    time.sleep(0.5)

                integrated_data.append(record)
            except Exception as e:
                print(f"❌ 处理股票 {rank} 失败: {e}")
                continue

        print(f"✅ 成功整合 {len(integrated_data)} 只股票数据")
        return integrated_data

    def save_to_csv(self, integrated_data, filename=None):
        """Write the integrated rows to a CSV file, replacing any existing file.

        When ``filename`` is empty, ``futu_<unix timestamp>.csv`` is used.
        """
        if not integrated_data:
            print("❌ 没有数据可保存")
            return
        if not filename:
            filename = f"futu_{int(time.time())}.csv"

        columns = [
            'rank', 'symbol', 'name', 'timestamp', 'datetime',
            'eastmoney_price', 'eastmoney_change', 'eastmoney_change_ratio',
            'market_cap', 'high_price', 'low_price', 'open_price', 'prev_close',
            'futu_before_open_price', 'futu_before_open_change', 'futu_before_open_change_ratio',
            'futu_current_price', 'futu_current_change_ratio'
        ]
        try:
            with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=columns)
                writer.writeheader()
                writer.writerows(integrated_data)
            print(f"✅ 数据已保存到: {filename}")
        except Exception as e:
            print(f"❌ 保存CSV文件失败: {e}")
# Sample page fragment with an embedded state object, used by ``--test --js``.
_SAMPLE_JS_HTML = '''
<script>
window.__INITIAL_STATE__ = {
    "stock_info": {
        "before_open_stock_info": {
            "price": "253.560",
            "change": "-0.471",
            "changeRatio": "-0.19%"
        },
        "data": {
            "price": "254.031",
            "changeRatio": "+4.02%"
        }
    }
};
</script>
'''

# Sample quote markup including an after-hours section, used by ``--test``.
_SAMPLE_QUOTE_HTML = '''
<ul class="flex-end price-current" data-v-6afeb239>
    <li class="mg-r-8 price direct-up" data-v-6afeb239>
        253.740
        <i class="icon-direct-status icon-direct-up" data-v-6afeb239></i>
    </li>
    <li class="change direct-up" data-v-6afeb239>
        <span class="change-price" data-v-6afeb239>+9.520</span>
        <span class="mg-l-8 change-ratio" data-v-6afeb239>+3.90%</span>
    </li>
</ul>
<div class="disc-info" data-v-6afeb239>
    <ul class="flex-end price-current" data-v-6afeb239>
        <li class="mg-r-8 disc-price direct-down" data-v-6afeb239>253.516</li>
        <li class="direct-down" data-v-6afeb239>
            <span data-v-6afeb239>-0.515</span>
            <span class="mg-l-8" data-v-6afeb239>-0.20%</span>
        </li>
    </ul>
    <div class="status" data-v-6afeb239>盘后 16:14 (美东)</div>
</div>
'''


def _collect_eastmoney_only_rows(eastmoney_api, limit, fetch_all):
    """Fetch EastMoney rows page by page and shape them like integrated rows.

    Args:
        eastmoney_api: Object providing ``get_us_stocks`` and ``parse_stock_data``.
        limit (int): Maximum number of stocks to keep.
        fetch_all (bool): When true, ``limit`` is replaced by the total count
            reported by the API.

    Returns:
        list | None: The rows (Futu columns left empty), or ``None`` when no
        stock data could be fetched at all.
    """
    if fetch_all:
        _, total = eastmoney_api.get_us_stocks(page_size=1)
        limit = total
        print(f"📊 准备获取全部 {total} 只股票...")

    page_size = 100
    total_pages = (limit + page_size - 1) // page_size
    all_stocks = []
    for page in range(1, total_pages + 1):
        stocks, _ = eastmoney_api.get_us_stocks(page_size=page_size, page_index=page)
        if not stocks:
            # Stop at the first empty page instead of requesting every
            # remaining page; later pages would also shift the ranks.
            break
        all_stocks.extend(stocks)
        print(f"📥 已获取 {min(len(all_stocks), limit)}/{limit}...")
        time.sleep(0.2)

    # Pages always hold 100 rows, so the result is cut back to the limit.
    all_stocks = all_stocks[:limit]
    if not all_stocks:
        return None

    rows = []
    for rank, stock_item in enumerate(all_stocks, 1):
        eastmoney_data = eastmoney_api.parse_stock_data(stock_item)
        if not eastmoney_data:
            continue
        rows.append({
            'rank': rank,
            'symbol': eastmoney_data['symbol'],
            'name': eastmoney_data['name'],
            'eastmoney_price': eastmoney_data['current_price'],
            'eastmoney_change': eastmoney_data['change_amount'],
            'eastmoney_change_ratio': eastmoney_data['change_ratio'],
            'market_cap': eastmoney_data['market_cap'],
            'high_price': eastmoney_data['high_price'],
            'low_price': eastmoney_data['low_price'],
            'open_price': eastmoney_data['open_price'],
            'prev_close': eastmoney_data['prev_close'],
            'timestamp': int(time.time()),
            'datetime': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'futu_before_open_price': '',
            'futu_before_open_change': '',
            'futu_before_open_change_ratio': '',
            'futu_current_price': '',
            'futu_current_change_ratio': ''
        })
    return rows


def main():
    """Command-line entry point.

    Two modes are supported:
        - ranking mode (``--top50`` / ``--all``): fetch the EastMoney ranking,
          optionally enriched with Futu details, and write it to CSV;
        - single-stock mode: parse one Futu page (sample, local file, URL or
          the default AMZN page) and append the result to a CSV file.
    """
    # Initialise logging.
    init_logging()

    parser = argparse.ArgumentParser(description='富途牛牛股票价格数据抓取工具')
    parser.add_argument('--url', '-u', help='股票页面URL')
    parser.add_argument('--html', '-f', help='本地HTML文件路径')
    parser.add_argument('--output', '-o', help='输出CSV文件名')
    parser.add_argument('--test', '-t', action='store_true', help='使用示例HTML测试')
    parser.add_argument('--js', '-j', action='store_true', help='解析JavaScript数据window.__INITIAL_STATE__')
    parser.add_argument('--top50', action='store_true', help='获取美股市值前N名数据整合东方财富和富途数据')
    parser.add_argument('--all', action='store_true', help='获取所有美股数据(注意:数量巨大,默认跳过富途详情)')
    parser.add_argument('--limit', type=int, default=50, help='指定获取股票的数量默认为50')
    parser.add_argument('--eastmoney-only', action='store_true', help='仅使用东方财富数据,不获取富途数据')
    args = parser.parse_args()

    # Ranking mode (--top50 or --all).
    if args.top50 or args.all:
        limit = args.limit
        if args.all:
            print("🚀 启动全量美股数据获取模式...")
        else:
            print(f"🚀 启动美股市值前{limit}名数据获取模式...")

        integrator = StockDataIntegrator()
        output_file = args.output if args.output else None

        if args.eastmoney_only:
            print("📊 仅获取东方财富数据...")
            integrated_data = _collect_eastmoney_only_rows(EastMoneyAPI(), limit, args.all)
            if integrated_data is None:
                print("❌ 无法获取东方财富数据")
            else:
                integrator.save_to_csv(integrated_data, output_file)
        else:
            integrated_data = integrator.get_top50_integrated_data(limit, fetch_all=args.all)
            if integrated_data:
                integrator.save_to_csv(integrated_data, output_file)
            else:
                print("❌ 无法获取整合数据")
        return

    # Single-stock mode.
    futu_parser = FutuStockParser()
    html_content = None

    if args.test:
        if args.js:
            html_content = _SAMPLE_JS_HTML
            print("🧪 使用示例JavaScript数据进行测试...")
        else:
            html_content = _SAMPLE_QUOTE_HTML
            print("🧪 使用示例HTML进行测试包含盘后数据...")
    elif args.html:
        # Read from a local HTML file.
        try:
            with open(args.html, 'r', encoding='utf-8') as f:
                html_content = f.read()
            print(f"📁 从本地文件读取: {args.html}")
        except Exception as e:
            print(f"❌ 读取本地文件失败: {e}")
            return
    elif args.url:
        print(f"🌐 正在获取页面: {args.url}")
        html_content = futu_parser.fetch_stock_page(args.url)
    else:
        # No source given: fall back to the default AMZN page.
        default_url = 'https://www.futunn.com/stock/AMZN-US?global_content=%7B%22promote_id%22%3A13766,%22sub_promote_id%22%3A36,%22invite%22%3A%2210237865%22,%22promote_content%22%3A%22nn%3Afeed%3A115061320123972%22,%22f%22%3A%22q.futunn.com%2Ffeed%2F115061320123972%22%7D&chain_id=KcFts02dZGw_d-.1kgi5g0'
        print("🌐 使用默认URL获取AMZN股票数据...")
        html_content = futu_parser.fetch_stock_page(default_url)

    if not html_content:
        print("❌ 无法获取HTML内容")
        return

    if args.js:
        # Parse the embedded JavaScript state.
        print("🔍 正在解析JavaScript数据...")
        js_data = futu_parser.parse_javascript_data(html_content)
        if not js_data:
            print("❌ JavaScript数据解析失败")
            return
        print("\n📊 JavaScript解析结果:")
        print(f"盘前价格: {js_data['before_open_price']}")
        print(f"盘前涨跌额: {js_data['before_open_change']}")
        print(f"盘前涨跌幅: {js_data['before_open_change_ratio']}")
        print(f"当前价格: {js_data['current_price']}")
        print(f"当前涨跌幅: {js_data['current_change_ratio']}")
        print(f"时间: {js_data['datetime']}")
        # The default output name uses the Unix timestamp.
        output_file = args.output if args.output else f"futu_{int(time.time())}.csv"
        futu_parser.save_to_csv_js(js_data, output_file)
        return

    # Parse the rendered HTML markup.
    print("🔍 正在解析HTML价格数据...")
    price_data = futu_parser.parse_price_data(html_content)
    if not price_data:
        print("❌ HTML数据解析失败未能提取到价格数据")
        return
    print("\n📊 HTML解析结果:")
    print(f"当前价格: {price_data['current_price']}")
    print(f"涨跌额: {price_data['change_price']}")
    print(f"涨跌幅: {price_data['change_ratio']}")
    print(f"方向: {price_data['direction']}")
    print(f"时间: {price_data['datetime']}")
    # Show the after-hours data when present.
    if price_data.get('after_hours_price'):
        print("\n🌙 盘后交易数据:")
        print(f"盘后价格: {price_data['after_hours_price']}")
        print(f"盘后涨跌额: {price_data['after_hours_change']}")
        print(f"盘后涨跌幅: {price_data['after_hours_ratio']}")
        print(f"盘后方向: {price_data['after_hours_direction']}")
        print(f"盘后状态: {price_data['after_hours_status']}")
    output_file = args.output if args.output else None
    futu_parser.save_to_csv(price_data, output_file)
# Run the command-line entry point only when the file is executed as a script.
if __name__ == "__main__":
    main()