Files
ai_mip/app.py

1477 lines
52 KiB
Python
Raw Permalink Normal View History

2026-02-24 12:46:35 +08:00
from flask import Flask, request, jsonify, send_from_directory, redirect, Response, session
2026-01-13 18:59:26 +08:00
from flask_cors import CORS
from loguru import logger
2026-02-24 12:46:35 +08:00
from functools import wraps
2026-01-13 18:59:26 +08:00
import sys
2026-02-24 12:46:35 +08:00
import csv
import io
import os
import logging
import secrets
import threading
import uuid
2026-01-13 18:59:26 +08:00
from pathlib import Path
2026-02-24 12:46:35 +08:00
from datetime import datetime
import pandas as pd
2026-01-13 18:59:26 +08:00
from config import Config
2026-02-24 12:46:35 +08:00
from log_manager import LogManager
from config_manager import ConfigManager
from db_manager import (
EnhancedSiteManager, EnhancedClickManager,
EnhancedInteractionManager, EnhancedStatisticsManager
)
# In-memory progress store for import tasks (task_id -> task dict); not persisted across restarts.
import_tasks = {}
# Login account. Overridable through the AUTH_USERNAME / AUTH_PASSWORD environment
# variables so the real password need not live in source control; the defaults keep
# the previous fixed account working.
AUTH_USERNAME = os.environ.get('AUTH_USERNAME', 'admin')
AUTH_PASSWORD = os.environ.get('AUTH_PASSWORD', 'admin@123')
2026-01-13 18:59:26 +08:00
# Logging configuration: INFO to stdout, DEBUG to a daily-rotated file kept for 10 days.
Config.ensure_dirs()
logger.remove()
logger.add(
    sys.stdout,
    format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
    level="INFO"
)
logger.add(
    Path(Config.LOG_DIR) / "scheduler_{time:YYYY-MM-DD}.log",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}",
    rotation="00:00",
    retention="10 days",
    encoding="utf-8",
    level="DEBUG"
)

# Silence Flask/werkzeug per-request HTTP logs.
logging.getLogger('werkzeug').setLevel(logging.ERROR)

# Create the Flask application; static files are served from the URL root.
app = Flask(__name__, static_folder='static', static_url_path='')

# Session signing key. A key regenerated on every start invalidates all sessions on
# restart and differs between worker processes, so prefer a stable key from the
# environment and fall back to a random one only when none is configured.
app.secret_key = os.environ.get('FLASK_SECRET_KEY') or secrets.token_hex(32)
# NOTE(review): supports_credentials=True with flask_cors' default origins lets any
# origin issue credentialed requests; consider restricting `origins` — confirm deployment needs.
CORS(app, supports_credentials=True)
2026-01-13 18:59:26 +08:00
2026-02-24 12:46:35 +08:00
# Login-check decorator
def login_required(f):
    """Wrap a view so it answers a 401 JSON error unless the session is logged in.

    When ``Config.SERVICE_MODE == 'click'`` the check is skipped entirely, so
    callers of the click service need no session.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Click-service mode bypasses authentication; otherwise require the session flag.
        allowed = Config.SERVICE_MODE == 'click' or session.get('logged_in')
        if not allowed:
            return jsonify({'success': False, 'message': '未登录', 'code': 401}), 401
        return f(*args, **kwargs)
    return decorated_function
# The web app does not create the scheduler; the scheduler runs on its own via main.py.
# Data manager (database access is needed even in remote mode).
# NOTE(review): these managers are constructed at import time in this exact order;
# their constructors' side effects (e.g. DB connections) are not visible here.
from data_manager import DataManager
data_manager = DataManager()
# Module-level manager instances used by the request handlers below.
log_manager = LogManager()
config_manager = ConfigManager()
site_manager = EnhancedSiteManager()
click_manager = EnhancedClickManager()
interaction_manager = EnhancedInteractionManager()
stats_manager = EnhancedStatisticsManager()
2026-01-13 18:59:26 +08:00
@app.route('/')
def index():
    """Serve the root URL.

    In click-service mode there is no front end, so a small JSON banner that
    describes the service is returned. Otherwise the browser is redirected to
    the front-end page.
    """
    if Config.SERVICE_MODE != 'click':
        # Web mode: hand off to the front-end page.
        return redirect('/app.html')
    banner = {
        'service': 'MIP Click Service',
        'mode': 'click',
        'status': 'running',
        'endpoints': ['/api/scheduler/start', '/api/scheduler/stop', '/api/scheduler/status'],
    }
    return jsonify(banner)
2026-01-13 18:59:26 +08:00
2026-02-24 12:46:35 +08:00
# ==================== Login API ====================
@app.route('/api/auth/login', methods=['POST'])
def login():
    """Log a user in against the configured account.

    JSON body: ``{"username": ..., "password": ...}``. On success the session is
    reset and marked as logged in.

    Returns:
        200 on success, 400 when the body is missing or not a JSON object,
        401 on wrong credentials, 500 on unexpected errors.
    """
    try:
        # silent=True: a missing/non-JSON body yields None instead of raising, so it
        # becomes a clean 400 rather than an AttributeError reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'message': '请求数据为空'}), 400
        username = data.get('username', '')
        password = data.get('password', '')
        # Non-string JSON values can never match; treat them as empty.
        if not isinstance(username, str):
            username = ''
        if not isinstance(password, str):
            password = ''
        # Constant-time comparison so response timing does not reveal how much of a
        # guess matched. Compare UTF-8 bytes: compare_digest rejects non-ASCII str.
        username_ok = secrets.compare_digest(username.encode('utf-8'), AUTH_USERNAME.encode('utf-8'))
        password_ok = secrets.compare_digest(password.encode('utf-8'), AUTH_PASSWORD.encode('utf-8'))
        if username_ok and password_ok:
            # Drop any pre-existing session data before authenticating it.
            session.clear()
            session['logged_in'] = True
            session['username'] = username
            logger.info(f"用户登录成功: {username}")
            return jsonify({'success': True, 'message': '登录成功', 'data': {'username': username}})
        logger.warning(f"登录失败,用户名或密码错误: {username}")
        return jsonify({'success': False, 'message': '用户名或密码错误'}), 401
    except Exception as e:
        logger.error(f"登录异常: {str(e)}")
        return jsonify({'success': False, 'message': f'登录异常: {str(e)}'}), 500
@app.route('/api/auth/logout', methods=['POST'])
def logout():
    """Clear the caller's session and record who signed out."""
    try:
        who = session.get('username', 'unknown')
        session.clear()
        logger.info(f"用户登出: {who}")
        return jsonify({'success': True, 'message': '已退出登录'})
    except Exception as exc:
        logger.error(f"登出异常: {str(exc)}")
        return jsonify({'success': False, 'message': f'登出异常: {str(exc)}'}), 500
@app.route('/api/auth/check', methods=['GET'])
def check_auth():
    """Report whether the session is logged in (always HTTP 200).

    ``data.username`` is included only for a logged-in session.
    """
    payload = {'logged_in': False}
    if session.get('logged_in'):
        payload = {'logged_in': True, 'username': session.get('username', '')}
    return jsonify({'success': True, 'data': payload})
2026-01-13 18:59:26 +08:00
@app.route('/health', methods=['GET'])
def health():
    """Liveness probe: always reports the service as up."""
    body = {'status': 'ok', 'message': '服务运行正常'}
    return jsonify(body)
@app.route('/api/urls', methods=['POST'])
@login_required
def add_urls():
    """Add a single URL or a batch of URLs.

    JSON body: either ``{"url": "<url>"}`` or ``{"urls": ["<url>", ...]}``.

    Returns:
        200 with a message (batch responses also carry added_count/total_count),
        400 for an empty/invalid body or a rejected single URL, 500 on unexpected errors.
    """
    try:
        # silent=True: a missing or non-JSON body yields None (-> 400 below) instead of
        # raising an HTTP error that the generic handler would turn into a 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'success': False, 'message': '请求数据为空'}), 400
        # A JSON array/scalar has no 'url'/'urls' keys to read.
        if not isinstance(data, dict):
            return jsonify({'success': False, 'message': '请提供url或urls参数'}), 400

        if 'url' in data:
            # Single URL
            url = data['url']
            if not url:
                return jsonify({'success': False, 'message': 'URL不能为空'}), 400
            success = data_manager.add_url(url)
            if success:
                return jsonify({'success': True, 'message': '添加成功'})
            return jsonify({'success': False, 'message': 'URL已存在或添加失败'}), 400

        if 'urls' in data:
            # Batch of URLs
            urls = data['urls']
            if not isinstance(urls, list) or not urls:
                return jsonify({'success': False, 'message': 'URLs必须是非空列表'}), 400
            count = data_manager.add_urls(urls)
            return jsonify({
                'success': True,
                'message': f'成功添加 {count}/{len(urls)} 个URL',
                'added_count': count,
                'total_count': len(urls)
            })

        return jsonify({'success': False, 'message': '请提供url或urls参数'}), 400
    except Exception as e:
        logger.error(f"添加URL异常: {str(e)}")
        return jsonify({'success': False, 'message': f'服务异常: {str(e)}'}), 500
@app.route('/api/urls', methods=['GET'])
def get_urls():
    """Return every URL known to the data manager."""
    try:
        all_urls = data_manager.get_all_urls()
        return jsonify({'success': True, 'data': all_urls})
    except Exception as exc:
        logger.error(f"获取URL列表异常: {str(exc)}")
        return jsonify({'success': False, 'message': f'服务异常: {str(exc)}'}), 500
@app.route('/api/urls/<path:url>', methods=['GET'])
def get_url_detail(url: str):
    """Return the stored record for *url*, or 404 when it is unknown."""
    try:
        detail = data_manager.get_url_detail(url)
        if not detail:
            return jsonify({'success': False, 'message': 'URL不存在'}), 404
        return jsonify({'success': True, 'data': detail})
    except Exception as exc:
        logger.error(f"获取URL详情异常: {str(exc)}")
        return jsonify({'success': False, 'message': f'服务异常: {str(exc)}'}), 500
@app.route('/api/urls/<path:url>', methods=['DELETE'])
@login_required
def delete_url(url: str):
    """Remove *url*; answers 404 when the data manager reports nothing was deleted."""
    try:
        removed = data_manager.delete_url(url)
        if not removed:
            return jsonify({'success': False, 'message': 'URL不存在'}), 404
        return jsonify({'success': True, 'message': '删除成功'})
    except Exception as exc:
        logger.error(f"删除URL异常: {str(exc)}")
        return jsonify({'success': False, 'message': f'服务异常: {str(exc)}'}), 500
@app.route('/api/urls/<path:url>/reset', methods=['POST'])
@login_required
def reset_url(url: str):
    """Reset *url* so clicking starts over; 404 when the data manager reports failure."""
    try:
        was_reset = data_manager.reset_url(url)
        if not was_reset:
            return jsonify({'success': False, 'message': 'URL不存在'}), 404
        return jsonify({'success': True, 'message': '重置成功'})
    except Exception as exc:
        logger.error(f"重置URL异常: {str(exc)}")
        return jsonify({'success': False, 'message': f'服务异常: {str(exc)}'}), 500
@app.route('/api/statistics', methods=['GET'])
def get_statistics():
    """Return the data manager's aggregate statistics."""
    try:
        summary = data_manager.get_statistics()
        return jsonify({'success': True, 'data': summary})
    except Exception as exc:
        logger.error(f"获取统计数据异常: {str(exc)}")
        return jsonify({'success': False, 'message': f'服务异常: {str(exc)}'}), 500
@app.route('/api/scheduler/start', methods=['POST'])
def start_scheduler():
    """Stub: the web app cannot start the scheduler; tell the caller to run main.py by hand."""
    hint = '请在服务器手动运行 python main.py 启动调度器'
    return jsonify({'success': False, 'message': hint})
2026-01-13 18:59:26 +08:00
@app.route('/api/scheduler/stop', methods=['POST'])
def stop_scheduler():
    """Stub: the scheduler process must be stopped manually on the server."""
    hint = '请在服务器手动停止调度器进程'
    return jsonify({'success': False, 'message': hint})
2026-01-13 18:59:26 +08:00
@app.route('/api/scheduler/status', methods=['GET'])
def get_scheduler_status():
    """Stub status: the web app does not manage the scheduler, so it always reports 'manual'."""
    state = {'status': 'manual', 'message': '调度器需手动管理'}
    return jsonify({'success': True, 'data': state})
2026-01-13 18:59:26 +08:00
# AdsPower API debugging endpoints
@app.route('/api/adspower/groups', methods=['GET'])
def adspower_list_groups():
    """List AdsPower groups.

    Query args: group_name (optional filter), page (default 1), page_size (default 2000).
    A falsy client result is reported as a 500.
    """
    try:
        from adspower_client import AdsPowerClient
        args = request.args
        name_filter = args.get('group_name')
        page_no = args.get('page', 1, type=int)
        per_page = args.get('page_size', 2000, type=int)
        groups = AdsPowerClient().list_groups(group_name=name_filter, page=page_no, page_size=per_page)
        if not groups:
            return jsonify({'success': False, 'message': '查询分组列表失败'}), 500
        return jsonify({'success': True, 'data': groups})
    except Exception as exc:
        logger.error(f"AdsPower查询分组异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
@app.route('/api/adspower/group/env', methods=['GET'])
def adspower_get_group_by_env():
    """Resolve the AdsPower group ID for the current environment (``Config.ENV``); 404 if none."""
    try:
        from adspower_client import AdsPowerClient
        env_group = AdsPowerClient().get_group_by_env()
        if not env_group:
            return jsonify({'success': False, 'message': '未找到对应环境的分组'}), 404
        return jsonify({'success': True, 'data': {'group_id': env_group, 'env': Config.ENV}})
    except Exception as exc:
        logger.error(f"AdsPower获取环境分组异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
2026-01-13 18:59:26 +08:00
def _parse_json_query_arg(raw):
    """Decode a JSON-encoded query-string value such as ``["id1","id2"]``.

    Empty or missing values are returned unchanged; malformed JSON yields None,
    which the caller treats as "filter not supplied".
    """
    if not raw:
        return raw
    import json
    try:
        return json.loads(raw)
    except (ValueError, RecursionError):
        # ValueError covers json.JSONDecodeError; RecursionError covers pathologically
        # nested input. Anything else is a genuine bug and should propagate.
        return None


@app.route('/api/adspower/profiles', methods=['GET'])
def adspower_list_profiles():
    """List AdsPower profiles with optional filters.

    Query args: group_id (defaults to the group for the current environment),
    page (default 1), page_size (100 when omitted), limit (optional),
    profile_id / profile_no (JSON-encoded arrays), sort_type, sort_order.
    A falsy client result is reported as a 500.
    """
    try:
        from adspower_client import AdsPowerClient

        args = request.args
        group_id = args.get('group_id')
        page = args.get('page', 1, type=int)
        limit = args.get('limit', type=int)  # optional
        page_size = args.get('page_size', type=int)  # optional
        # Array filters arrive as JSON strings.
        profile_id = _parse_json_query_arg(args.get('profile_id'))
        profile_no = _parse_json_query_arg(args.get('profile_no'))
        sort_type = args.get('sort_type')
        sort_order = args.get('sort_order')

        client = AdsPowerClient()
        # Without an explicit group, fall back to the group for the running environment.
        if not group_id:
            group_id = client.get_group_by_env()
            if group_id:
                logger.info(f"自动获取到分组ID: {group_id}")

        result = client.list_profiles(
            group_id=group_id,
            page=page,
            page_size=page_size if page_size else 100,
            profile_id=profile_id,
            profile_no=profile_no,
            limit=limit,
            sort_type=sort_type,
            sort_order=sort_order
        )
        if result:
            return jsonify({'success': True, 'data': result})
        return jsonify({'success': False, 'message': '查询Profile列表失败'}), 500
    except Exception as e:
        logger.error(f"AdsPower查询Profile异常: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/adspower/browser/start', methods=['POST'])
@login_required
def adspower_start_browser():
    """Start the AdsPower browser for the profile given as ``user_id`` in the JSON body.

    Launching a browser changes state, so this endpoint is guarded by
    ``login_required`` like the module's other mutating POST endpoints (the
    decorator lets click-service mode through).
    """
    try:
        from adspower_client import AdsPowerClient
        # silent=True: a non-JSON body is treated as empty instead of raising.
        data = request.get_json(silent=True) or {}
        # NOTE(review): user_id is not validated; a missing value is passed as None —
        # confirm AdsPowerClient.start_browser handles that.
        user_id = data.get('user_id')
        client = AdsPowerClient()
        result = client.start_browser(user_id=user_id)
        return jsonify({'success': True, 'data': result})
    except Exception as e:
        logger.error(f"AdsPower启动浏览器异常: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/adspower/browser/stop', methods=['POST'])
@login_required
def adspower_stop_browser():
    """Stop the AdsPower browser for the profile given as ``user_id`` in the JSON body.

    Guarded by ``login_required`` like the module's other mutating POST endpoints.
    """
    try:
        from adspower_client import AdsPowerClient
        # silent=True: a non-JSON body is treated as empty instead of raising.
        data = request.get_json(silent=True) or {}
        # NOTE(review): user_id is not validated; a missing value is passed as None.
        user_id = data.get('user_id')
        client = AdsPowerClient()
        result = client.stop_browser(user_id=user_id)
        return jsonify({'success': True, 'data': result})
    except Exception as e:
        logger.error(f"AdsPower停止浏览器异常: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/adspower/proxy/damai', methods=['GET'])
def adspower_get_damai_proxy():
    """Fetch a Damai IP proxy through the AdsPower client."""
    try:
        from adspower_client import AdsPowerClient
        proxy = AdsPowerClient().get_damai_proxy()
        return jsonify({'success': True, 'data': proxy})
    except Exception as exc:
        logger.error(f"获取大麦IP异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
@app.route('/api/adspower/proxy/create', methods=['POST'])
@login_required
def adspower_create_proxy():
    """Create an AdsPower proxy from ``proxy_config`` in the JSON body; returns its ``proxy_id``.

    Guarded by ``login_required`` like the module's other mutating POST endpoints.
    """
    try:
        from adspower_client import AdsPowerClient
        # silent=True: a non-JSON body is treated as empty (-> 400) instead of raising.
        data = request.get_json(silent=True) or {}
        proxy_config = data.get('proxy_config')
        if not proxy_config:
            return jsonify({'success': False, 'message': '缺少代理配置'}), 400
        client = AdsPowerClient()
        proxy_id = client.create_proxy(proxy_config)
        return jsonify({'success': True, 'data': {'proxy_id': proxy_id}})
    except Exception as e:
        logger.error(f"AdsPower创建代理异常: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/adspower/proxy/list', methods=['GET'])
def adspower_list_proxies():
    """List AdsPower proxies (query args: page default 1, limit default 100).

    A ``None`` result from the client is reported as a 500; other results,
    including empty ones, are returned as-is.
    """
    try:
        from adspower_client import AdsPowerClient
        page_no = request.args.get('page', 1, type=int)
        max_items = request.args.get('limit', 100, type=int)
        proxies = AdsPowerClient().list_proxies(page=page_no, limit=max_items)
        if proxies is not None:
            return jsonify({'success': True, 'data': proxies})
        return jsonify({'success': False, 'message': '查询代理列表失败请检查AdsPower是否运行'}), 500
    except Exception as exc:
        logger.error(f"AdsPower查询代理列表异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
@app.route('/api/adspower/profile/update', methods=['POST'])
@login_required
def adspower_update_profile():
    """Attach proxy ``proxy_id`` to profile ``profile_id`` (AdsPower API v2).

    Guarded by ``login_required`` like the module's other mutating POST endpoints.
    """
    try:
        from adspower_client import AdsPowerClient
        # silent=True: a non-JSON body is treated as empty (-> 400) instead of raising.
        data = request.get_json(silent=True) or {}
        profile_id = data.get('profile_id')
        proxy_id = data.get('proxy_id')
        if not profile_id or not proxy_id:
            return jsonify({'success': False, 'message': '缺少profile_id或proxy_id'}), 400
        client = AdsPowerClient()
        result = client.update_profile_proxy(profile_id, proxy_id)
        return jsonify({'success': True, 'data': {'updated': result}})
    except Exception as e:
        logger.error(f"AdsPower更新Profile异常: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/adspower/profile/update-v1', methods=['POST'])
@login_required
def adspower_update_profile_v1():
    """Update a profile's proxy via AdsPower API v1, passing ``proxy_config`` directly.

    JSON body: ``{"profile_id": ..., "proxy_config": {"proxy_host": ..., "proxy_port": ..., ...}}``.
    Guarded by ``login_required`` like the module's other mutating POST endpoints.
    """
    try:
        from adspower_client import AdsPowerClient
        # silent=True: a non-JSON body is treated as empty (-> 400) instead of raising.
        data = request.get_json(silent=True) or {}
        profile_id = data.get('profile_id')
        proxy_config = data.get('proxy_config')
        if not profile_id or not proxy_config:
            return jsonify({'success': False, 'message': '缺少profile_id或proxy_config'}), 400
        # Required fields. The dict check matters: on a string, `in` is a substring
        # test and would accept e.g. "proxy_host proxy_port".
        if (not isinstance(proxy_config, dict)
                or 'proxy_host' not in proxy_config
                or 'proxy_port' not in proxy_config):
            return jsonify({'success': False, 'message': 'proxy_config中缺少proxy_host或proxy_port'}), 400
        client = AdsPowerClient()
        result = client.update_profile_proxy_v1(profile_id, proxy_config)
        return jsonify({'success': True, 'data': {'updated': result}})
    except Exception as e:
        logger.error(f"AdsPower更新Profile异常 (v1): {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
2026-01-16 22:06:46 +08:00
# Database query endpoints
@app.route('/api/clicks', methods=['GET'])
def get_all_clicks():
    """Return click records, newest first.

    Query args: site_id (optional; restricts to one site), limit (default 100).
    """
    try:
        from db_manager import ClickManager
        site_id = request.args.get('site_id', type=int)
        limit = request.args.get('limit', 100, type=int)
        click_mgr = ClickManager()
        if site_id:
            clicks = click_mgr.get_clicks_by_site(site_id, limit=limit)
        else:
            # All click records across sites.
            conn = click_mgr.get_connection()
            try:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT * FROM ai_mip_click
                    ORDER BY click_time DESC
                    LIMIT ?
                """, (limit,))
                rows = cursor.fetchall()
            finally:
                # Release the connection even when the query fails.
                conn.close()
            clicks = [click_mgr._dict_from_row(row) for row in rows]
        return jsonify({'success': True, 'data': clicks})
    except Exception as e:
        logger.error(f"查询点击记录异常: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/interactions', methods=['GET'])
def get_all_interactions():
    """Return interaction records, newest first.

    Query args: site_id (optional; restricts to one site), limit (default 100).
    """
    try:
        from db_manager import InteractionManager
        site_id = request.args.get('site_id', type=int)
        limit = request.args.get('limit', 100, type=int)
        interaction_mgr = InteractionManager()
        if site_id:
            interactions = interaction_mgr.get_interactions_by_site(site_id, limit=limit)
        else:
            # All interaction records across sites.
            conn = interaction_mgr.get_connection()
            try:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT * FROM ai_mip_interaction
                    ORDER BY interaction_time DESC
                    LIMIT ?
                """, (limit,))
                rows = cursor.fetchall()
            finally:
                # Release the connection even when the query fails.
                conn.close()
            interactions = [interaction_mgr._dict_from_row(row) for row in rows]
        return jsonify({'success': True, 'data': interactions})
    except Exception as e:
        logger.error(f"查询互动记录异常: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
2026-02-24 12:46:35 +08:00
# ==================== Log management API ====================
@app.route('/api/logs/stream', methods=['GET'])
def get_logs_stream():
    """Return the newest log entries, for a live log view.

    Query args: limit (default 50), level (default 'ALL').
    """
    try:
        args = request.args
        latest = log_manager.get_latest_logs(
            limit=args.get('limit', 50, type=int),
            level=args.get('level', 'ALL'),
        )
        return jsonify({'success': True, 'data': latest})
    except Exception as exc:
        logger.error(f"获取日志流异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
@app.route('/api/logs/list', methods=['GET'])
def get_logs_list():
    """Search logs with paging.

    Query args: page (1), page_size (100), level ('ALL'), keyword, start_time,
    end_time. The response data holds ``logs``, ``total``, ``page`` and ``page_size``.
    """
    try:
        args = request.args
        page = args.get('page', 1, type=int)
        page_size = args.get('page_size', 100, type=int)
        filters = {
            'keyword': args.get('keyword', ''),
            'level': args.get('level', 'ALL'),
            'start_time': args.get('start_time', ''),
            'end_time': args.get('end_time', ''),
        }
        entries, total = log_manager.search_logs(page=page, page_size=page_size, **filters)
        body = {'logs': entries, 'total': total, 'page': page, 'page_size': page_size}
        return jsonify({'success': True, 'data': body})
    except Exception as exc:
        logger.error(f"查询日志列表异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
@app.route('/api/logs/files', methods=['GET'])
def get_log_files():
    """Return the list of log files reported by the log manager."""
    try:
        return jsonify({'success': True, 'data': log_manager.get_log_files()})
    except Exception as exc:
        logger.error(f"获取日志文件列表异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
@app.route('/api/logs/stats', methods=['GET'])
def get_log_stats():
    """Return log statistics from the log manager."""
    try:
        return jsonify({'success': True, 'data': log_manager.get_log_stats()})
    except Exception as exc:
        logger.error(f"获取日志统计异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
# ==================== Configuration management API ====================
@app.route('/api/config', methods=['GET'])
def get_config():
    """Return the current configuration.

    NOTE(review): this endpoint is not login-protected — confirm the returned
    config contains no secrets.
    """
    try:
        current = config_manager.get_current_config()
        return jsonify({'success': True, 'data': current})
    except Exception as exc:
        logger.error(f"获取配置异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
@app.route('/api/config/update', methods=['POST'])
@login_required
def update_config():
    """Apply the JSON body as configuration updates; returns the config manager's result dict."""
    try:
        # silent=True: a missing/non-JSON body yields None (-> 400) instead of raising an
        # HTTP error that the generic handler below would report as a 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'success': False, 'message': '请求数据为空'}), 400
        result = config_manager.update_config(data)
        return jsonify(result)
    except Exception as e:
        logger.error(f"更新配置异常: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/config/schema', methods=['GET'])
def get_config_schema():
    """Return the definitions of the configurable settings."""
    try:
        definitions = config_manager.get_config_schema()
        return jsonify({'success': True, 'data': definitions})
    except Exception as exc:
        logger.error(f"获取配置定义异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
# ==================== Chart statistics API ====================
@app.route('/api/statistics/trend', methods=['GET'])
def get_statistics_trend():
    """Return click-trend data from the stats manager for the ``days`` query arg (default 7)."""
    try:
        window = request.args.get('days', 7, type=int)
        trend = stats_manager.get_click_trend(days=window)
        return jsonify({'success': True, 'data': trend})
    except Exception as exc:
        logger.error(f"获取点击趋势异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
@app.route('/api/statistics/hourly', methods=['GET'])
def get_statistics_hourly():
    """Return the hour-of-day distribution from the stats manager."""
    try:
        return jsonify({'success': True, 'data': stats_manager.get_hourly_distribution()})
    except Exception as exc:
        logger.error(f"获取时段分布异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
@app.route('/api/statistics/top-sites', methods=['GET'])
def get_top_sites():
    """Return the most active sites (``limit`` query arg, default 10)."""
    try:
        top_n = request.args.get('limit', 10, type=int)
        ranking = stats_manager.get_top_sites(limit=top_n)
        return jsonify({'success': True, 'data': ranking})
    except Exception as exc:
        logger.error(f"获取Top站点异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
@app.route('/api/statistics/reply-rate', methods=['GET'])
def get_reply_rate():
    """Return the reply-rate distribution from the stats manager."""
    try:
        return jsonify({'success': True, 'data': stats_manager.get_reply_rate_distribution()})
    except Exception as exc:
        logger.error(f"获取回复率分布异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
# ==================== Enhanced paginated query API ====================
@app.route('/api/sites/paginated', methods=['GET'])
def get_sites_paginated():
    """Return one page of sites.

    Query args: page (1), page_size (20), status / keyword (empty means no
    filter), sort_by ('created_at'), sort_order ('desc'). The response data
    holds ``items``, ``total``, ``page`` and ``page_size``.
    """
    try:
        args = request.args
        page = args.get('page', 1, type=int)
        page_size = args.get('page_size', 20, type=int)
        # NOTE(review): sort_by/sort_order are passed through unvalidated — confirm the
        # site manager whitelists them before using them in SQL.
        items, total = site_manager.get_sites_paginated(
            page=page,
            page_size=page_size,
            status=args.get('status', '') or None,
            keyword=args.get('keyword', '') or None,
            sort_by=args.get('sort_by', 'created_at'),
            sort_order=args.get('sort_order', 'desc'),
        )
        body = {'items': items, 'total': total, 'page': page, 'page_size': page_size}
        return jsonify({'success': True, 'data': body})
    except Exception as exc:
        logger.error(f"分页查询站点异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
@app.route('/api/clicks/paginated', methods=['GET'])
def get_clicks_paginated():
    """Return one page of click records.

    Query args: page (1), page_size (20), site_id, start_date / end_date (empty
    means unbounded), sort_by ('click_time'), sort_order ('desc').
    """
    try:
        args = request.args
        page = args.get('page', 1, type=int)
        page_size = args.get('page_size', 20, type=int)
        items, total = click_manager.get_clicks_paginated(
            page=page,
            page_size=page_size,
            site_id=args.get('site_id', type=int),
            start_date=args.get('start_date', '') or None,
            end_date=args.get('end_date', '') or None,
            sort_by=args.get('sort_by', 'click_time'),
            sort_order=args.get('sort_order', 'desc'),
        )
        body = {'items': items, 'total': total, 'page': page, 'page_size': page_size}
        return jsonify({'success': True, 'data': body})
    except Exception as exc:
        logger.error(f"分页查询点击记录异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
@app.route('/api/interactions/paginated', methods=['GET'])
def get_interactions_paginated():
    """Return one page of interaction records.

    Query args: page (1), page_size (20), site_id, start_date / end_date /
    status (empty means no filter), sort_by ('interaction_time'), sort_order ('desc').
    """
    try:
        args = request.args
        page = args.get('page', 1, type=int)
        page_size = args.get('page_size', 20, type=int)
        items, total = interaction_manager.get_interactions_paginated(
            page=page,
            page_size=page_size,
            site_id=args.get('site_id', type=int),
            start_date=args.get('start_date', '') or None,
            end_date=args.get('end_date', '') or None,
            status=args.get('status', '') or None,
            sort_by=args.get('sort_by', 'interaction_time'),
            sort_order=args.get('sort_order', 'desc'),
        )
        body = {'items': items, 'total': total, 'page': page, 'page_size': page_size}
        return jsonify({'success': True, 'data': body})
    except Exception as exc:
        logger.error(f"分页查询互动记录异常: {str(exc)}")
        return jsonify({'success': False, 'message': str(exc)}), 500
# ==================== Batch operation API ====================

# Candidate headers for the optional per-row columns, checked in priority order.
_SITE_NAME_COLUMNS = ('医生', '名称', 'name', 'Name', 'site_name', '站点名称')
_QUERY_WORD_COLUMNS = ('查询词', 'query_word', 'keyword', '关键词')
_DIMENSION_COLUMNS = ('维度', 'dimension', 'site_dimension', '分类')


def _first_present_column(df, candidates):
    """Return the first name in *candidates* that is a column of *df*, or None."""
    return next((col for col in candidates if col in df.columns), None)


def _row_value(row, column):
    """Return ``row.get(column)``, or None when *column* is None (column absent from the sheet)."""
    return None if column is None else row.get(column)


def _cell_text(value, default):
    """Return *value* as stripped text, or *default* when it is missing (None/NaN) or falsy."""
    if pd.isna(value) or not value:
        return default
    return str(value).strip()


def _run_import_task(task_id: str, df, params: dict):
    """Background worker: insert the rows of *df* as sites, tracking progress in ``import_tasks``.

    Args:
        task_id: key of the task entry in ``import_tasks`` (created by the upload handler).
        df: the uploaded sheet as a DataFrame.
        params: ``url_column`` plus per-import defaults (query_word, site_dimension,
            frequency, time_start, time_end, interval_minutes).

    The task's ``status`` ends as 'completed', 'cancelled' or 'error'. Per-row
    outcomes are counted in ``task['stats']`` (success/failed/skipped/duplicate).
    """
    try:
        task = import_tasks[task_id]
        task['status'] = 'running'

        # Per-import defaults; per-row columns (when present and non-empty) override them.
        url_column = params['url_column']
        query_word = params.get('query_word')
        site_dimension = params.get('site_dimension')
        frequency = params.get('frequency', 1)
        time_start = params.get('time_start', '09:00:00')
        time_end = params.get('time_end', '21:00:00')
        interval_minutes = params.get('interval_minutes', 30)

        # A dedicated manager instance for this worker thread.
        site_manager = EnhancedSiteManager()
        total = len(df)

        # Resolve the optional columns once instead of re-scanning df.columns for every row.
        name_column = _first_present_column(df, _SITE_NAME_COLUMNS)
        query_column = _first_present_column(df, _QUERY_WORD_COLUMNS)
        dimension_column = _first_present_column(df, _DIMENSION_COLUMNS)

        # Fetch the already-stored URLs in one batch rather than once per row.
        all_urls = [str(row.get(url_column, '')).strip() for _, row in df.iterrows()
                    if not pd.isna(row.get(url_column)) and str(row.get(url_column, '')).strip()]
        existing_urls = set(site_manager.get_existing_urls(all_urls)
                            if hasattr(site_manager, 'get_existing_urls') else [])

        # enumerate gives the 1-based positional row number; the DataFrame index
        # label need not run 0..n-1, which would skew progress and log messages.
        for position, (_, row) in enumerate(df.iterrows(), start=1):
            if task.get('cancelled'):
                task['status'] = 'cancelled'
                return
            try:
                task['current'] = position
                task['progress'] = int(position / total * 100)

                site_url = row.get(url_column, None)
                # Skip empty links.
                if pd.isna(site_url) or not site_url or str(site_url).strip() == '':
                    task['stats']['skipped'] += 1
                    continue
                site_url = str(site_url).strip()
                # Only absolute http(s) URLs are accepted.
                if not site_url.startswith(('http://', 'https://')):
                    task['stats']['skipped'] += 1
                    continue
                # Already stored, or already inserted earlier in this same file.
                if site_url in existing_urls:
                    task['stats']['duplicate'] += 1
                    continue

                site_name = _cell_text(_row_value(row, name_column), site_url)
                row_query_word = _cell_text(_row_value(row, query_column), query_word)
                row_dimension = _cell_text(_row_value(row, dimension_column), site_dimension)

                site_id = site_manager.add_site(
                    site_url=site_url,
                    site_name=site_name,
                    site_dimension=row_dimension,
                    query_word=row_query_word,
                    frequency=frequency,
                    time_start=time_start,
                    time_end=time_end,
                    interval_minutes=interval_minutes
                )
                if site_id:
                    task['stats']['success'] += 1
                    existing_urls.add(site_url)  # avoid inserting the same URL twice
                else:
                    task['stats']['failed'] += 1
            except Exception as e:
                logger.error(f"处理第 {position} 行失败: {str(e)}")
                task['stats']['failed'] += 1

        task['status'] = 'completed'
        task['progress'] = 100
        logger.info(f"导入任务 {task_id} 完成: {task['stats']}")
    except Exception as e:
        logger.error(f"导入任务 {task_id} 异常: {str(e)}")
        # The task entry may be missing (e.g. the initial lookup failed); don't raise again here.
        failed_task = import_tasks.get(task_id)
        if failed_task is not None:
            failed_task['status'] = 'error'
            failed_task['error'] = str(e)
@app.route('/api/sites/import', methods=['POST'])
@login_required
def import_sites_file():
    """Import sites from an uploaded Excel or CSV file, processed asynchronously.

    Multipart form: ``file`` (.xlsx/.xls/.csv) plus optional query_word,
    site_dimension, frequency (int), time_start, time_end, interval_minutes (int).
    The URL column is detected by header name, falling back to the first column.

    Returns:
        The new ``task_id`` and row count; progress is polled via
        ``/api/sites/import/progress/<task_id>``. 400 for a bad upload or bad
        form values, 500 on unexpected errors.
    """
    try:
        if 'file' not in request.files:
            return jsonify({'success': False, 'message': '请上传文件'}), 400
        file = request.files['file']
        if not file.filename:
            return jsonify({'success': False, 'message': '文件名为空'}), 400
        filename = file.filename.lower()

        # Reject non-integer values up front (400) rather than letting int() raise
        # into the generic handler below (500).
        try:
            frequency = int(request.form.get('frequency', 1))
            interval_minutes = int(request.form.get('interval_minutes', 30))
        except (TypeError, ValueError):
            return jsonify({'success': False, 'message': 'frequency和interval_minutes必须为整数'}), 400

        params = {
            'query_word': request.form.get('query_word', None),
            'site_dimension': request.form.get('site_dimension', None),
            'frequency': frequency,
            'time_start': request.form.get('time_start', '09:00:00'),
            'time_end': request.form.get('time_end', '21:00:00'),
            'interval_minutes': interval_minutes
        }

        # Read the data according to the file type.
        if filename.endswith(('.xlsx', '.xls')):
            df = pd.read_excel(file)
        elif filename.endswith('.csv'):
            df = pd.read_csv(file, encoding='utf-8-sig')
        else:
            return jsonify({'success': False, 'message': '请上传Excel(.xlsx/.xls)或CSV(.csv)格式文件'}), 400

        logger.info(f"导入文件: {file.filename}, 共 {len(df)} 行数据")

        # Without any column there is no URL column to fall back to (df.columns[0] would raise).
        if len(df.columns) == 0:
            return jsonify({'success': False, 'message': '文件中没有数据列'}), 400

        # Detect the URL column by header name; fall back to the first column.
        url_column = next(
            (name for name in ('链接', 'url', 'URL', 'link', 'Link', '网址', 'site_url')
             if name in df.columns),
            df.columns[0],
        )
        params['url_column'] = url_column

        # Register the task before starting the worker so progress polling finds it.
        task_id = str(uuid.uuid4())[:8]
        import_tasks[task_id] = {
            'id': task_id,
            'filename': file.filename,
            'total': len(df),
            'current': 0,
            'progress': 0,
            'status': 'pending',
            'stats': {'total': len(df), 'success': 0, 'failed': 0, 'skipped': 0, 'duplicate': 0},
            'created_at': datetime.now().isoformat()
        }

        # Daemon worker: it does not keep the process alive on shutdown.
        thread = threading.Thread(target=_run_import_task, args=(task_id, df, params), daemon=True)
        thread.start()

        return jsonify({
            'success': True,
            'message': '导入任务已启动',
            'task_id': task_id,
            'total': len(df)
        })
    except Exception as e:
        logger.error(f"导入文件异常: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/sites/import/progress/<task_id>', methods=['GET'])
def get_import_progress(task_id: str):
    """Return the progress snapshot of one import task, or 404 if the ID is unknown."""
    task = import_tasks.get(task_id)
    if task is None:
        return jsonify({'success': False, 'message': '任务不存在'}), 404
    snapshot = {key: task[key] for key in
                ('id', 'filename', 'total', 'current', 'progress', 'status', 'stats')}
    # 'error' is only set when the task failed.
    snapshot['error'] = task.get('error')
    return jsonify({'success': True, 'data': snapshot})
@app.route('/api/sites/import/cancel/<task_id>', methods=['POST'])
@login_required
def cancel_import_task(task_id: str):
    """Flag an import task for cancellation; the worker checks the flag before each row."""
    task = import_tasks.get(task_id)
    if task is None:
        return jsonify({'success': False, 'message': '任务不存在'}), 404
    task['cancelled'] = True
    return jsonify({'success': True, 'message': '已发送取消请求'})
@app.route('/api/sites/import/tasks', methods=['GET'])
def list_import_tasks():
    """List recent import tasks for the UI.

    Takes the 10 newest tasks by ``created_at`` and keeps those that are pending,
    running, or completed at 100%; cancelled and failed tasks are left out.
    """
    def is_visible(task):
        state = task.get('status')
        if state in ['pending', 'running']:
            return True
        return state == 'completed' and task.get('progress', 0) == 100

    newest = sorted(import_tasks.values(), key=lambda task: task.get('created_at', ''), reverse=True)
    visible = [task for task in newest[:10] if is_visible(task)]
    return jsonify({'success': True, 'data': visible})
@app.route('/api/sites/<int:site_id>/status', methods=['PUT'])
@login_required
def update_site_status(site_id: int):
    """Set one site's status; JSON body ``{"status": "active" | "inactive"}``."""
    try:
        # silent=True + dict check: a missing/non-JSON body falls through to the
        # "invalid status" 400 instead of an AttributeError reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        status = data.get('status', '')
        if status not in ['active', 'inactive']:
            return jsonify({'success': False, 'message': '无效的状态值'}), 400

        from db_manager import SiteManager
        # NOTE(review): uses a plain SiteManager rather than the module-level
        # EnhancedSiteManager used by the batch endpoints — confirm this is intended.
        mgr = SiteManager()
        success = mgr.update_site_status(site_id, status)
        if success:
            return jsonify({'success': True, 'message': '状态更新成功'})
        return jsonify({'success': False, 'message': '更新失败'}), 400
    except Exception as e:
        logger.error(f"更新站点状态异常: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/sites/batch/delete', methods=['POST'])
@login_required
def batch_delete_sites():
    """Delete several sites; JSON body ``{"site_ids": [id, ...]}``. Reports how many were deleted."""
    try:
        # silent=True + dict check: a missing/non-JSON body becomes the 400 below
        # instead of an AttributeError on None reported as a 500.
        data = request.get_json(silent=True)
        site_ids = data.get('site_ids', []) if isinstance(data, dict) else []
        # Require a non-empty list: e.g. a bare string would otherwise be passed through
        # and its character count reported as the number of requested sites.
        if not site_ids or not isinstance(site_ids, list):
            return jsonify({'success': False, 'message': '请选择要删除的站点'}), 400
        deleted = site_manager.delete_sites_batch(site_ids)
        return jsonify({
            'success': True,
            'message': f'成功删除 {deleted}/{len(site_ids)} 个站点',
            'deleted_count': deleted
        })
    except Exception as e:
        logger.error(f"批量删除站点异常: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/sites/batch/status', methods=['POST'])
@login_required
def batch_update_sites_status():
    """Set the status of several sites.

    JSON body: ``{"site_ids": [id, ...], "status": "active" | "inactive"}``.
    Reports how many were updated.
    """
    try:
        # silent=True + dict check: a missing/non-JSON body becomes the 400 below
        # instead of an AttributeError on None reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        site_ids = data.get('site_ids', [])
        status = data.get('status', '')
        if not site_ids or not status or not isinstance(site_ids, list):
            return jsonify({'success': False, 'message': '请提供站点ID和状态'}), 400
        if status not in ['active', 'inactive']:
            return jsonify({'success': False, 'message': '无效的状态值'}), 400
        updated = site_manager.update_sites_status_batch(site_ids, status)
        return jsonify({
            'success': True,
            'message': f'成功更新 {updated}/{len(site_ids)} 个站点',
            'updated_count': updated
        })
    except Exception as e:
        logger.error(f"批量更新站点状态异常: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
# ==================== 数据导出API ====================
@app.route('/api/export/sites', methods=['GET'])
@login_required
def export_sites():
    """Export site records as a downloadable CSV file.

    Query parameters:
        status: optional status filter; an empty value means no filter.
        keyword: optional keyword filter; an empty value means no filter.

    The body is UTF-8 prefixed with a BOM so that Excel detects the encoding
    and shows the Chinese header/values correctly.

    Returns:
        A ``text/csv`` attachment named ``sites_<YYYYmmdd_HHMMSS>.csv``, or
        JSON with HTTP 500 if an exception is raised.
    """
    try:
        status = request.args.get('status', '')
        keyword = request.args.get('keyword', '')
        sites = site_manager.export_sites(
            status=status if status else None,
            keyword=keyword if keyword else None
        )

        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(['ID', '站点URL', '站点名称', '状态', '点击次数', '回复次数',
                         '频次', '开始时间', '结束时间', '维度', '查询词', '创建时间'])
        for site in sites:
            writer.writerow([
                site.get('id', ''),
                site.get('site_url', ''),
                site.get('site_name', ''),
                site.get('status', ''),
                site.get('click_count', 0),
                site.get('reply_count', 0),
                site.get('frequency', ''),
                site.get('time_start', ''),
                site.get('time_end', ''),
                site.get('site_dimension', ''),
                site.get('query_word', ''),
                site.get('created_at', '')
            ])

        # A "charset=utf-8-sig" header never produced a BOM (and is overridden
        # by ``mimetype`` anyway); prepend the BOM explicitly instead.
        # mimetype='text/csv' yields "Content-Type: text/csv; charset=utf-8".
        filename = f'sites_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
        return Response(
            '\ufeff' + output.getvalue(),
            mimetype='text/csv',
            headers={'Content-Disposition': f'attachment; filename={filename}'}
        )
    except Exception as e:
        logger.error(f"导出站点数据异常: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/export/clicks', methods=['GET'])
@login_required
def export_clicks():
    """Export click records as a downloadable CSV file.

    Query parameters:
        site_id: optional integer site filter (non-integer values become None).
        start_date / end_date: optional date bounds; empty values mean no bound.
            The expected date format is whatever
            ``click_manager.export_clicks`` accepts — not visible here.

    The body is UTF-8 prefixed with a BOM so that Excel detects the encoding.

    Returns:
        A ``text/csv`` attachment named ``clicks_<YYYYmmdd_HHMMSS>.csv``, or
        JSON with HTTP 500 if an exception is raised.
    """
    try:
        site_id = request.args.get('site_id', type=int)
        start_date = request.args.get('start_date', '')
        end_date = request.args.get('end_date', '')
        clicks = click_manager.export_clicks(
            site_id=site_id,
            start_date=start_date if start_date else None,
            end_date=end_date if end_date else None
        )

        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(['ID', '站点ID', '站点名称', '站点URL', '点击时间',
                         '用户IP', '设备类型', '任务ID'])
        for click in clicks:
            writer.writerow([
                click.get('id', ''),
                click.get('site_id', ''),
                click.get('site_name', ''),
                click.get('site_url', ''),
                click.get('click_time', ''),
                click.get('user_ip', ''),
                click.get('device_type', ''),
                click.get('task_id', '')
            ])

        # Prepend a BOM explicitly: the old "charset=utf-8-sig" header never
        # added one and was overridden by ``mimetype``.
        filename = f'clicks_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
        return Response(
            '\ufeff' + output.getvalue(),
            mimetype='text/csv',
            headers={'Content-Disposition': f'attachment; filename={filename}'}
        )
    except Exception as e:
        logger.error(f"导出点击记录异常: {str(e)}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/export/interactions', methods=['GET'])
def export_interactions():
"""导出互动记录为CSV"""
try:
site_id = request.args.get('site_id', type=int)
start_date = request.args.get('start_date', '')
end_date = request.args.get('end_date', '')
interactions = interaction_manager.export_interactions(
site_id=site_id,
start_date=start_date if start_date else None,
end_date=end_date if end_date else None
)
# 生成CSV
output = io.StringIO()
writer = csv.writer(output)
headers = ['ID', '站点ID', '站点名称', '站点URL', '互动时间',
'互动类型', '互动状态', '发送内容', '是否收到回复', '回复内容', '代理IP']
writer.writerow(headers)
for interaction in interactions:
writer.writerow([
interaction.get('id', ''),
interaction.get('site_id', ''),
interaction.get('site_name', ''),
interaction.get('site_url', ''),
interaction.get('interaction_time', ''),
interaction.get('interaction_type', ''),
interaction.get('interaction_status', ''),
interaction.get('reply_content', ''),
'' if interaction.get('response_received') else '',
interaction.get('response_content', ''),
interaction.get('proxy_ip', '')
])
output.seek(0)
return Response(
output.getvalue(),
mimetype='text/csv',
headers={
'Content-Disposition': f'attachment; filename=interactions_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv',
'Content-Type': 'text/csv; charset=utf-8-sig'
}
)
except Exception as e:
logger.error(f"导出互动记录异常: {str(e)}")
return jsonify({'success': False, 'message': str(e)}), 500
# ==================== Query挖掘API ====================
@app.route('/api/query/upload', methods=['POST'])
@login_required
def upload_query_file():
    """Upload an Excel file of Query keywords and start a background import.

    Form fields:
        file: the uploaded ``.xlsx`` / ``.xls`` workbook.
        import_mode: ``'query_only'`` (default; the sheet must have exactly 1
            column) or ``'full_import'`` (exactly 3 columns). Any other value
            skips the column-count check and is passed through to the
            importer unchanged.

    The workbook is validated in memory, saved under
    ``Config.QUERY_UPLOAD_DIR`` with a timestamp prefix, an import log row is
    created via ``QueryImportLogManager.create_log``, and
    ``QueryKeywordImporter.import_file`` is started on a daemon thread.

    Returns:
        JSON with ``log_id`` and ``filename`` on success; HTTP 400 for a
        missing/invalid file or a wrong column count; HTTP 500 if an exception
        is raised (including an unreadable workbook).
    """
    try:
        if 'file' not in request.files:
            return jsonify({'success': False, 'message': '请上传文件'}), 400
        file = request.files['file']
        if not file.filename:
            return jsonify({'success': False, 'message': '文件名为空'}), 400
        filename = file.filename
        ext = filename.rsplit('.', 1)[-1].lower() if '.' in filename else ''
        if ext not in ('xlsx', 'xls'):
            return jsonify({'success': False, 'message': '请上传Excel文件(.xlsx/.xls)'}), 400

        import_mode = request.form.get('import_mode', 'query_only')

        # Read the workbook from memory first so the column count can be
        # validated before anything is written to disk.
        file_bytes = file.read()
        df = pd.read_excel(io.BytesIO(file_bytes))
        col_count = len(df.columns)
        if import_mode == 'query_only' and col_count != 1:
            return jsonify({'success': False,
                            'message': f'仅导入Query模式要求Excel只有1列,当前有{col_count}列'}), 400
        if import_mode == 'full_import' and col_count != 3:
            return jsonify({'success': False,
                            'message': f'完整导入模式要求Excel恰好3列(科室、关键字、query),当前有{col_count}列'}), 400

        upload_dir = Config.QUERY_UPLOAD_DIR
        os.makedirs(upload_dir, exist_ok=True)

        # file.filename is client-controlled: keep only its last path
        # component so names like "../../x.xlsx" or "C:\\tmp\\x.xlsx" cannot
        # escape upload_dir. Non-ASCII (e.g. Chinese) names are preserved.
        base_name = filename.replace('\\', '/').rsplit('/', 1)[-1]
        # Timestamp prefix avoids clobbering earlier uploads with the same name.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        safe_filename = f"{timestamp}_{base_name}"
        filepath = os.path.abspath(os.path.join(upload_dir, safe_filename))

        with open(filepath, 'wb') as f:
            f.write(file_bytes)
        logger.info(f"[Query上传] 文件已保存: {filepath}, 模式: {import_mode}")

        from db_manager import QueryImportLogManager
        log_mgr = QueryImportLogManager()
        log_id = log_mgr.create_log(filename, filepath)

        # Run the (potentially slow) import off the request thread.
        from query_keyword_importer import QueryKeywordImporter
        importer = QueryKeywordImporter()
        thread = threading.Thread(target=importer.import_file, args=(filepath, log_id, import_mode))
        thread.daemon = True
        thread.start()

        return jsonify({
            'success': True,
            'message': '文件上传成功,导入任务已启动',
            'log_id': log_id,
            'filename': filename
        })
    except Exception as e:
        logger.error(f"[Query上传] 异常: {e}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/query/import/records', methods=['GET'])
@login_required
def get_query_import_records():
    """Return one page of Query import log records.

    Query parameters:
        page: 1-based page number (default 1).
        page_size: records per page (default 20).

    Non-integer values fall back to the defaults, and values below 1 are
    clamped to 1, instead of raising and producing a 500.

    Returns:
        JSON ``{'success': True, 'data': ...}`` where ``data`` is whatever
        ``QueryImportLogManager.get_logs_paginated`` returns; HTTP 500 if an
        exception is raised.
    """
    try:
        # type=int returns the default when the value cannot be parsed.
        page = max(1, request.args.get('page', 1, type=int))
        page_size = max(1, request.args.get('page_size', 20, type=int))
        from db_manager import QueryImportLogManager
        log_mgr = QueryImportLogManager()
        result = log_mgr.get_logs_paginated(page, page_size)
        return jsonify({'success': True, 'data': result})
    except Exception as e:
        logger.error(f"[Query记录] 查询失败: {e}")
        return jsonify({'success': False, 'message': str(e)}), 500
@app.route('/api/query/import/trigger', methods=['POST'])
@login_required
def trigger_query_import():
    """Kick off a scan of the Query upload directory in the background.

    Starts ``QueryKeywordImporter.scan_and_import`` on a daemon thread and
    returns immediately without waiting for the scan to finish.

    Returns:
        JSON ``{'success': True, 'message': ...}`` once the thread is started;
        HTTP 500 with the error message if an exception is raised.
    """
    try:
        from query_keyword_importer import QueryKeywordImporter
        scanner = QueryKeywordImporter()
        worker = threading.Thread(target=scanner.scan_and_import, daemon=True)
        worker.start()
        payload = {'success': True, 'message': '目录扫描任务已启动'}
        return jsonify(payload)
    except Exception as exc:
        logger.error(f"[Query触发] 异常: {exc}")
        return jsonify({'success': False, 'message': str(exc)}), 500
# ==================== 任务队列API ====================
@app.route('/api/tasks/queue', methods=['GET'])
def get_task_queue():
    """Report the task queue state as seen by the web process.

    The web app does not run the scheduler, so this always returns an empty
    queue, no running task and ``scheduler_status='manual'``, together with
    a hint that the scheduler is started separately via ``main.py``.
    """
    queue_state = dict(
        pending=[],
        running=None,
        completed=[],
        scheduler_status='manual',
        message='调度器需通过 main.py 独立运行',
    )
    return jsonify({'success': True, 'data': queue_state})
2026-01-13 18:59:26 +08:00
if __name__ == '__main__':
    # Log startup details, then run the Flask development server.
    # The scheduler is NOT started here; it runs as a separate process.
    # (The stray git-blame timestamp lines previously inside this block
    # were not valid Python and have been removed.)
    logger.info(f"启动MIP广告点击服务 - 环境: {Config.ENV}")
    logger.info(f"服务地址: http://{Config.SERVER_HOST}:{Config.SERVER_PORT}")
    logger.info(f"调试模式: {Config.DEBUG}")
    logger.info("调度器需通过 python main.py 独立启动")

    # Start the Flask application.
    app.run(
        host=Config.SERVER_HOST,
        port=Config.SERVER_PORT,
        debug=Config.DEBUG
    )