import csv
import io
import logging
import os
import secrets
import sys
import threading
import uuid
from datetime import datetime
from functools import wraps
from pathlib import Path

import pandas as pd
from flask import Flask, request, jsonify, send_from_directory, redirect, Response, session
from flask_cors import CORS
from loguru import logger

from config import Config
from log_manager import LogManager
from config_manager import ConfigManager
from db_manager import (
    EnhancedSiteManager,
    EnhancedClickManager,
    EnhancedInteractionManager,
    EnhancedStatisticsManager,
)

# In-memory progress store for import tasks, keyed by task id.
# It lives only in this process: entries are lost on restart.
import_tasks = {}

# Login credentials. The defaults are the original fixed values; they can be
# overridden through the environment so real deployments do not have to ship
# the well-known default password in source.
AUTH_USERNAME = os.environ.get('MIP_AUTH_USERNAME', 'admin')
AUTH_PASSWORD = os.environ.get('MIP_AUTH_PASSWORD', 'admin@123')

# Logging setup: make sure the configured directories exist, drop loguru's
# default sink, then log INFO+ to stdout and DEBUG+ to a daily rotated file.
Config.ensure_dirs()
logger.remove()
logger.add(
    sys.stdout,
    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
    level="INFO",
)
logger.add(
    Path(Config.LOG_DIR) / "scheduler_{time:YYYY-MM-DD}.log",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}",
    rotation="00:00",
    retention="10 days",
    encoding="utf-8",
    level="DEBUG",
)

# Silence Flask/werkzeug per-request HTTP access logs (errors still shown).
logging.getLogger('werkzeug').setLevel(logging.ERROR)

# Create the Flask application; static files are served from ./static at the
# URL root.
app = Flask(__name__, static_folder='static', static_url_path='')

# Session signing key. A key generated at import time changes on every restart
# (logging everyone out) and differs between worker processes (a session
# created by one worker is rejected by another). Set MIP_SECRET_KEY to get a
# stable key; when it is unset or empty we fall back to the original
# behaviour of a fresh random key per process.
app.secret_key = os.environ.get('MIP_SECRET_KEY') or secrets.token_hex(32)

# NOTE(review): credentials are enabled but no origin allow-list is passed —
# confirm that accepting credentialed requests from any origin is intended.
CORS(app, supports_credentials=True)


def login_required(f):
    """Decorator that rejects requests from sessions that are not logged in.

    When ``Config.SERVICE_MODE`` is ``'click'`` the check is skipped entirely
    and the wrapped view is always called (the original comment describes
    these as internal APIs).

    Otherwise the wrapped view is called only if the session has a truthy
    ``logged_in`` flag; if not, a JSON error body is returned together with
    HTTP status 401.

    Args:
        f: The Flask view function to protect.

    Returns:
        The wrapped view function (metadata preserved via ``functools.wraps``).
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Click-service mode: skip the login check.
        if Config.SERVICE_MODE == 'click':
            return f(*args, **kwargs)
        if not session.get('logged_in'):
            return jsonify({'success': False, 'message': '未登录', 'code': 401}), 401
        return f(*args, **kwargs)
    return decorated_function


# The web process does not initialise the scheduler; the scheduler runs
# independently through main.py.

# Data manager (database access is also needed in remote mode).
# The import is deliberately kept at its original position, after logging has
# been configured, so module import order is unchanged.
from data_manager import DataManager  # noqa: E402
data_manager = DataManager()

# Shared manager instances used by the route handlers.
log_manager = LogManager()
config_manager = ConfigManager()
site_manager = EnhancedSiteManager()
click_manager = EnhancedClickManager()
interaction_manager = EnhancedInteractionManager()
stats_manager = EnhancedStatisticsManager()
@app.route('/') def index(): """首页""" if Config.SERVICE_MODE == 'click': # 点击服务模式:只返回API状态 return jsonify({ 'service': 'MIP Click Service', 'mode': 'click', 'status': 'running', 'endpoints': ['/api/scheduler/start', '/api/scheduler/stop', '/api/scheduler/status'] }) # Web服务模式:重定向到前端页面 return redirect('/app.html') # ==================== 登录相关API ==================== @app.route('/api/auth/login', methods=['POST']) def login(): """用户登录""" try: data = request.get_json() username = data.get('username', '') password = data.get('password', '') if username == AUTH_USERNAME and password == AUTH_PASSWORD: session['logged_in'] = True session['username'] = username logger.info(f"用户登录成功: {username}") return jsonify({'success': True, 'message': '登录成功', 'data': {'username': username}}) else: logger.warning(f"登录失败,用户名或密码错误: {username}") return jsonify({'success': False, 'message': '用户名或密码错误'}), 401 except Exception as e: logger.error(f"登录异常: {str(e)}") return jsonify({'success': False, 'message': f'登录异常: {str(e)}'}), 500 @app.route('/api/auth/logout', methods=['POST']) def logout(): """用户登出""" try: username = session.get('username', 'unknown') session.clear() logger.info(f"用户登出: {username}") return jsonify({'success': True, 'message': '已退出登录'}) except Exception as e: logger.error(f"登出异常: {str(e)}") return jsonify({'success': False, 'message': f'登出异常: {str(e)}'}), 500 @app.route('/api/auth/check', methods=['GET']) def check_auth(): """检查登录状态""" if session.get('logged_in'): return jsonify({ 'success': True, 'data': { 'logged_in': True, 'username': session.get('username', '') } }) else: return jsonify({ 'success': True, 'data': { 'logged_in': False } }) @app.route('/health', methods=['GET']) def health(): """健康检查""" return jsonify({'status': 'ok', 'message': '服务运行正常'}) @app.route('/api/urls', methods=['POST']) @login_required def add_urls(): """添加URL(支持单个或批量)""" try: data = request.get_json() if not data: return jsonify({'success': False, 'message': '请求数据为空'}), 400 # 支持单个URL或URL列表 if 
'url' in data: # 单个URL url = data['url'] if not url: return jsonify({'success': False, 'message': 'URL不能为空'}), 400 success = data_manager.add_url(url) if success: return jsonify({'success': True, 'message': '添加成功'}) else: return jsonify({'success': False, 'message': 'URL已存在或添加失败'}), 400 elif 'urls' in data: # 批量URL urls = data['urls'] if not isinstance(urls, list) or not urls: return jsonify({'success': False, 'message': 'URLs必须是非空列表'}), 400 count = data_manager.add_urls(urls) return jsonify({ 'success': True, 'message': f'成功添加 {count}/{len(urls)} 个URL', 'added_count': count, 'total_count': len(urls) }) else: return jsonify({'success': False, 'message': '请提供url或urls参数'}), 400 except Exception as e: logger.error(f"添加URL异常: {str(e)}") return jsonify({'success': False, 'message': f'服务异常: {str(e)}'}), 500 @app.route('/api/urls', methods=['GET']) def get_urls(): """获取所有URL列表""" try: urls = data_manager.get_all_urls() return jsonify({'success': True, 'data': urls}) except Exception as e: logger.error(f"获取URL列表异常: {str(e)}") return jsonify({'success': False, 'message': f'服务异常: {str(e)}'}), 500 @app.route('/api/urls/', methods=['GET']) def get_url_detail(url: str): """获取URL详细信息""" try: url_info = data_manager.get_url_detail(url) if url_info: return jsonify({'success': True, 'data': url_info}) else: return jsonify({'success': False, 'message': 'URL不存在'}), 404 except Exception as e: logger.error(f"获取URL详情异常: {str(e)}") return jsonify({'success': False, 'message': f'服务异常: {str(e)}'}), 500 @app.route('/api/urls/', methods=['DELETE']) @login_required def delete_url(url: str): """删除URL""" try: success = data_manager.delete_url(url) if success: return jsonify({'success': True, 'message': '删除成功'}) else: return jsonify({'success': False, 'message': 'URL不存在'}), 404 except Exception as e: logger.error(f"删除URL异常: {str(e)}") return jsonify({'success': False, 'message': f'服务异常: {str(e)}'}), 500 @app.route('/api/urls//reset', methods=['POST']) @login_required def reset_url(url: str): 
"""重置URL(重新开始点击)""" try: success = data_manager.reset_url(url) if success: return jsonify({'success': True, 'message': '重置成功'}) else: return jsonify({'success': False, 'message': 'URL不存在'}), 404 except Exception as e: logger.error(f"重置URL异常: {str(e)}") return jsonify({'success': False, 'message': f'服务异常: {str(e)}'}), 500 @app.route('/api/statistics', methods=['GET']) def get_statistics(): """获取统计数据""" try: stats = data_manager.get_statistics() return jsonify({'success': True, 'data': stats}) except Exception as e: logger.error(f"获取统计数据异常: {str(e)}") return jsonify({'success': False, 'message': f'服务异常: {str(e)}'}), 500 @app.route('/api/scheduler/start', methods=['POST']) def start_scheduler(): """启动调度器 - 需手动运行main.py""" return jsonify({'success': False, 'message': '请在服务器手动运行 python main.py 启动调度器'}) @app.route('/api/scheduler/stop', methods=['POST']) def stop_scheduler(): """停止调度器 - 需手动停止""" return jsonify({'success': False, 'message': '请在服务器手动停止调度器进程'}) @app.route('/api/scheduler/status', methods=['GET']) def get_scheduler_status(): """获取调度器状态 - Web端不管理调度器""" return jsonify({'success': True, 'data': {'status': 'manual', 'message': '调度器需手动管理'}}) # AdsPower 接口调试 @app.route('/api/adspower/groups', methods=['GET']) def adspower_list_groups(): """查询分组列表""" try: from adspower_client import AdsPowerClient group_name = request.args.get('group_name') page = request.args.get('page', 1, type=int) page_size = request.args.get('page_size', 2000, type=int) client = AdsPowerClient() result = client.list_groups(group_name=group_name, page=page, page_size=page_size) if result: return jsonify({'success': True, 'data': result}) else: return jsonify({'success': False, 'message': '查询分组列表失败'}), 500 except Exception as e: logger.error(f"AdsPower查询分组异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/adspower/group/env', methods=['GET']) def adspower_get_group_by_env(): """根据当前运行环境获取对应的分组ID""" try: from adspower_client import AdsPowerClient client = 
AdsPowerClient() group_id = client.get_group_by_env() if group_id: return jsonify({'success': True, 'data': {'group_id': group_id, 'env': Config.ENV}}) else: return jsonify({'success': False, 'message': f'未找到对应环境的分组'}), 404 except Exception as e: logger.error(f"AdsPower获取环境分组异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/adspower/profiles', methods=['GET']) def adspower_list_profiles(): """查询Profile列表(支持多个查询参数)""" try: from adspower_client import AdsPowerClient import json as json_module # 获取查询参数 group_id = request.args.get('group_id') page = request.args.get('page', 1, type=int) limit = request.args.get('limit', type=int) # 可选 page_size = request.args.get('page_size', type=int) # 可选 # 数组参数(JSON格式) profile_id = request.args.get('profile_id') profile_no = request.args.get('profile_no') # 解析JSON数组 if profile_id: try: profile_id = json_module.loads(profile_id) except: profile_id = None if profile_no: try: profile_no = json_module.loads(profile_no) except: profile_no = None # 排序参数 sort_type = request.args.get('sort_type') sort_order = request.args.get('sort_order') # 如果没有指定group_id,尝试根据环境自动获取 client = AdsPowerClient() if not group_id: group_id = client.get_group_by_env() if group_id: logger.info(f"自动获取到分组ID: {group_id}") # 查询Profile列表 result = client.list_profiles( group_id=group_id, page=page, page_size=page_size if page_size else 100, profile_id=profile_id, profile_no=profile_no, limit=limit, sort_type=sort_type, sort_order=sort_order ) if result: return jsonify({'success': True, 'data': result}) else: return jsonify({'success': False, 'message': '查询Profile列表失败'}), 500 except Exception as e: logger.error(f"AdsPower查询Profile异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/adspower/browser/start', methods=['POST']) def adspower_start_browser(): """启动浏览器""" try: from adspower_client import AdsPowerClient data = request.get_json() or {} user_id = data.get('user_id') client = 
AdsPowerClient() result = client.start_browser(user_id=user_id) return jsonify({'success': True, 'data': result}) except Exception as e: logger.error(f"AdsPower启动浏览器异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/adspower/browser/stop', methods=['POST']) def adspower_stop_browser(): """停止浏览器""" try: from adspower_client import AdsPowerClient data = request.get_json() or {} user_id = data.get('user_id') client = AdsPowerClient() result = client.stop_browser(user_id=user_id) return jsonify({'success': True, 'data': result}) except Exception as e: logger.error(f"AdsPower停止浏览器异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/adspower/proxy/damai', methods=['GET']) def adspower_get_damai_proxy(): """获取大麦IP代理""" try: from adspower_client import AdsPowerClient client = AdsPowerClient() result = client.get_damai_proxy() return jsonify({'success': True, 'data': result}) except Exception as e: logger.error(f"获取大麦IP异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/adspower/proxy/create', methods=['POST']) def adspower_create_proxy(): """创建代理""" try: from adspower_client import AdsPowerClient data = request.get_json() or {} proxy_config = data.get('proxy_config') if not proxy_config: return jsonify({'success': False, 'message': '缺少代理配置'}), 400 client = AdsPowerClient() proxy_id = client.create_proxy(proxy_config) return jsonify({'success': True, 'data': {'proxy_id': proxy_id}}) except Exception as e: logger.error(f"AdsPower创建代理异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/adspower/proxy/list', methods=['GET']) def adspower_list_proxies(): """查询代理列表""" try: from adspower_client import AdsPowerClient page = request.args.get('page', 1, type=int) limit = request.args.get('limit', 100, type=int) client = AdsPowerClient() result = client.list_proxies(page=page, limit=limit) if result is None: return 
jsonify({'success': False, 'message': '查询代理列表失败,请检查AdsPower是否运行'}), 500 return jsonify({'success': True, 'data': result}) except Exception as e: logger.error(f"AdsPower查询代理列表异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/adspower/profile/update', methods=['POST']) def adspower_update_profile(): """更新Profile代理(API v2方式)""" try: from adspower_client import AdsPowerClient data = request.get_json() or {} profile_id = data.get('profile_id') proxy_id = data.get('proxy_id') if not profile_id or not proxy_id: return jsonify({'success': False, 'message': '缺少profile_id或proxy_id'}), 400 client = AdsPowerClient() result = client.update_profile_proxy(profile_id, proxy_id) return jsonify({'success': True, 'data': {'updated': result}}) except Exception as e: logger.error(f"AdsPower更新Profile异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/adspower/profile/update-v1', methods=['POST']) def adspower_update_profile_v1(): """更新Profile代理(API v1方式,直接传入proxy_config)""" try: from adspower_client import AdsPowerClient data = request.get_json() or {} profile_id = data.get('profile_id') proxy_config = data.get('proxy_config') if not profile_id or not proxy_config: return jsonify({'success': False, 'message': '缺少profile_id或proxy_config'}), 400 # 验证必要字段 if 'proxy_host' not in proxy_config or 'proxy_port' not in proxy_config: return jsonify({'success': False, 'message': 'proxy_config中缺少proxy_host或proxy_port'}), 400 client = AdsPowerClient() result = client.update_profile_proxy_v1(profile_id, proxy_config) return jsonify({'success': True, 'data': {'updated': result}}) except Exception as e: logger.error(f"AdsPower更新Profile异常 (v1): {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 # 数据库查询接口 @app.route('/api/clicks', methods=['GET']) def get_all_clicks(): """获取所有点击记录""" try: from db_manager import ClickManager site_id = request.args.get('site_id', type=int) limit = request.args.get('limit', 
100, type=int) click_mgr = ClickManager() if site_id: clicks = click_mgr.get_clicks_by_site(site_id, limit=limit) else: # 获取所有点击记录 conn = click_mgr.get_connection() cursor = conn.cursor() cursor.execute(f""" SELECT * FROM ai_mip_click ORDER BY click_time DESC LIMIT ? """, (limit,)) rows = cursor.fetchall() conn.close() clicks = [click_mgr._dict_from_row(row) for row in rows] return jsonify({'success': True, 'data': clicks}) except Exception as e: logger.error(f"查询点击记录异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/interactions', methods=['GET']) def get_all_interactions(): """获取所有互动记录""" try: from db_manager import InteractionManager site_id = request.args.get('site_id', type=int) limit = request.args.get('limit', 100, type=int) interaction_mgr = InteractionManager() if site_id: interactions = interaction_mgr.get_interactions_by_site(site_id, limit=limit) else: # 获取所有互动记录 conn = interaction_mgr.get_connection() cursor = conn.cursor() cursor.execute(f""" SELECT * FROM ai_mip_interaction ORDER BY interaction_time DESC LIMIT ? 
""", (limit,)) rows = cursor.fetchall() conn.close() interactions = [interaction_mgr._dict_from_row(row) for row in rows] return jsonify({'success': True, 'data': interactions}) except Exception as e: logger.error(f"查询互动记录异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 # ==================== 日志管理API ==================== @app.route('/api/logs/stream', methods=['GET']) def get_logs_stream(): """获取最新日志(用于实时流)""" try: limit = request.args.get('limit', 50, type=int) level = request.args.get('level', 'ALL') logs = log_manager.get_latest_logs(limit=limit, level=level) return jsonify({'success': True, 'data': logs}) except Exception as e: logger.error(f"获取日志流异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/logs/list', methods=['GET']) def get_logs_list(): """分页查询日志""" try: page = request.args.get('page', 1, type=int) page_size = request.args.get('page_size', 100, type=int) level = request.args.get('level', 'ALL') keyword = request.args.get('keyword', '') start_time = request.args.get('start_time', '') end_time = request.args.get('end_time', '') logs, total = log_manager.search_logs( keyword=keyword, level=level, start_time=start_time, end_time=end_time, page=page, page_size=page_size ) return jsonify({ 'success': True, 'data': { 'logs': logs, 'total': total, 'page': page, 'page_size': page_size } }) except Exception as e: logger.error(f"查询日志列表异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/logs/files', methods=['GET']) def get_log_files(): """获取日志文件列表""" try: files = log_manager.get_log_files() return jsonify({'success': True, 'data': files}) except Exception as e: logger.error(f"获取日志文件列表异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/logs/stats', methods=['GET']) def get_log_stats(): """获取日志统计信息""" try: stats = log_manager.get_log_stats() return jsonify({'success': True, 'data': stats}) except Exception as e: 
logger.error(f"获取日志统计异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 # ==================== 配置管理API ==================== @app.route('/api/config', methods=['GET']) def get_config(): """获取当前配置""" try: config = config_manager.get_current_config() return jsonify({'success': True, 'data': config}) except Exception as e: logger.error(f"获取配置异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/config/update', methods=['POST']) @login_required def update_config(): """更新配置""" try: data = request.get_json() if not data: return jsonify({'success': False, 'message': '请求数据为空'}), 400 result = config_manager.update_config(data) return jsonify(result) except Exception as e: logger.error(f"更新配置异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/config/schema', methods=['GET']) def get_config_schema(): """获取配置项定义""" try: schema = config_manager.get_config_schema() return jsonify({'success': True, 'data': schema}) except Exception as e: logger.error(f"获取配置定义异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 # ==================== 图表统计API ==================== @app.route('/api/statistics/trend', methods=['GET']) def get_statistics_trend(): """获取点击趋势数据""" try: days = request.args.get('days', 7, type=int) data = stats_manager.get_click_trend(days=days) return jsonify({'success': True, 'data': data}) except Exception as e: logger.error(f"获取点击趋势异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/statistics/hourly', methods=['GET']) def get_statistics_hourly(): """获取时段分布数据""" try: data = stats_manager.get_hourly_distribution() return jsonify({'success': True, 'data': data}) except Exception as e: logger.error(f"获取时段分布异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/statistics/top-sites', methods=['GET']) def get_top_sites(): """获取Top活跃站点""" try: limit = request.args.get('limit', 10, 
type=int) data = stats_manager.get_top_sites(limit=limit) return jsonify({'success': True, 'data': data}) except Exception as e: logger.error(f"获取Top站点异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/statistics/reply-rate', methods=['GET']) def get_reply_rate(): """获取回复率分布""" try: data = stats_manager.get_reply_rate_distribution() return jsonify({'success': True, 'data': data}) except Exception as e: logger.error(f"获取回复率分布异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 # ==================== 增强的分页查询API ==================== @app.route('/api/sites/paginated', methods=['GET']) def get_sites_paginated(): """分页获取站点列表""" try: page = request.args.get('page', 1, type=int) page_size = request.args.get('page_size', 20, type=int) status = request.args.get('status', '') keyword = request.args.get('keyword', '') sort_by = request.args.get('sort_by', 'created_at') sort_order = request.args.get('sort_order', 'desc') sites, total = site_manager.get_sites_paginated( page=page, page_size=page_size, status=status if status else None, keyword=keyword if keyword else None, sort_by=sort_by, sort_order=sort_order ) return jsonify({ 'success': True, 'data': { 'items': sites, 'total': total, 'page': page, 'page_size': page_size } }) except Exception as e: logger.error(f"分页查询站点异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/clicks/paginated', methods=['GET']) def get_clicks_paginated(): """分页获取点击记录""" try: page = request.args.get('page', 1, type=int) page_size = request.args.get('page_size', 20, type=int) site_id = request.args.get('site_id', type=int) start_date = request.args.get('start_date', '') end_date = request.args.get('end_date', '') sort_by = request.args.get('sort_by', 'click_time') sort_order = request.args.get('sort_order', 'desc') clicks, total = click_manager.get_clicks_paginated( page=page, page_size=page_size, site_id=site_id, start_date=start_date if start_date 
else None, end_date=end_date if end_date else None, sort_by=sort_by, sort_order=sort_order ) return jsonify({ 'success': True, 'data': { 'items': clicks, 'total': total, 'page': page, 'page_size': page_size } }) except Exception as e: logger.error(f"分页查询点击记录异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/interactions/paginated', methods=['GET']) def get_interactions_paginated(): """分页获取互动记录""" try: page = request.args.get('page', 1, type=int) page_size = request.args.get('page_size', 20, type=int) site_id = request.args.get('site_id', type=int) start_date = request.args.get('start_date', '') end_date = request.args.get('end_date', '') status = request.args.get('status', '') sort_by = request.args.get('sort_by', 'interaction_time') sort_order = request.args.get('sort_order', 'desc') interactions, total = interaction_manager.get_interactions_paginated( page=page, page_size=page_size, site_id=site_id, start_date=start_date if start_date else None, end_date=end_date if end_date else None, status=status if status else None, sort_by=sort_by, sort_order=sort_order ) return jsonify({ 'success': True, 'data': { 'items': interactions, 'total': total, 'page': page, 'page_size': page_size } }) except Exception as e: logger.error(f"分页查询互动记录异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 # ==================== 批量操作API ==================== def _run_import_task(task_id: str, df, params: dict): """后台执行导入任务""" global import_tasks try: task = import_tasks[task_id] task['status'] = 'running' # 获取参数 url_column = params['url_column'] query_word = params.get('query_word') site_dimension = params.get('site_dimension') frequency = params.get('frequency', 1) time_start = params.get('time_start', '09:00:00') time_end = params.get('time_end', '21:00:00') interval_minutes = params.get('interval_minutes', 30) site_manager = EnhancedSiteManager() total = len(df) # 批量获取已存在的URL(优化查询) all_urls = [str(row.get(url_column, 
'')).strip() for _, row in df.iterrows() if not pd.isna(row.get(url_column)) and str(row.get(url_column, '')).strip()] existing_urls = set(site_manager.get_existing_urls(all_urls) if hasattr(site_manager, 'get_existing_urls') else []) for idx, row in df.iterrows(): if task.get('cancelled'): task['status'] = 'cancelled' return try: # 更新进度 task['current'] = idx + 1 task['progress'] = int((idx + 1) / total * 100) # 获取链接 site_url = row.get(url_column, None) # 跳过空链接 if pd.isna(site_url) or not site_url or str(site_url).strip() == '': task['stats']['skipped'] += 1 continue site_url = str(site_url).strip() # 验证URL格式 if not (site_url.startswith('http://') or site_url.startswith('https://')): task['stats']['skipped'] += 1 continue # 检查是否已存在(使用预查询结果) if site_url in existing_urls: task['stats']['duplicate'] += 1 continue # 获取站点名称 site_name = None for col in ['医生', '名称', 'name', 'Name', 'site_name', '站点名称']: if col in df.columns: site_name = row.get(col) break if pd.isna(site_name) or not site_name: site_name = site_url else: site_name = str(site_name).strip() # 获取查询词 row_query_word = None for col in ['查询词', 'query_word', 'keyword', '关键词']: if col in df.columns: row_query_word = row.get(col) break if pd.isna(row_query_word) or not row_query_word: row_query_word = query_word else: row_query_word = str(row_query_word).strip() # 获取维度 row_dimension = None for col in ['维度', 'dimension', 'site_dimension', '分类']: if col in df.columns: row_dimension = row.get(col) break if pd.isna(row_dimension) or not row_dimension: row_dimension = site_dimension else: row_dimension = str(row_dimension).strip() # 插入数据库 site_id = site_manager.add_site( site_url=site_url, site_name=site_name, site_dimension=row_dimension, query_word=row_query_word, frequency=frequency, time_start=time_start, time_end=time_end, interval_minutes=interval_minutes ) if site_id: task['stats']['success'] += 1 existing_urls.add(site_url) # 避免重复插入 else: task['stats']['failed'] += 1 except Exception as e: logger.error(f"处理第 
{idx+1} 行失败: {str(e)}") task['stats']['failed'] += 1 task['status'] = 'completed' task['progress'] = 100 logger.info(f"导入任务 {task_id} 完成: {task['stats']}") except Exception as e: logger.error(f"导入任务 {task_id} 异常: {str(e)}") import_tasks[task_id]['status'] = 'error' import_tasks[task_id]['error'] = str(e) @app.route('/api/sites/import', methods=['POST']) @login_required def import_sites_file(): """从Excel或CSV文件导入站点(异步处理)""" try: if 'file' not in request.files: return jsonify({'success': False, 'message': '请上传文件'}), 400 file = request.files['file'] if not file.filename: return jsonify({'success': False, 'message': '文件名为空'}), 400 filename = file.filename.lower() # 获取额外参数 params = { 'query_word': request.form.get('query_word', None), 'site_dimension': request.form.get('site_dimension', None), 'frequency': int(request.form.get('frequency', 1)), 'time_start': request.form.get('time_start', '09:00:00'), 'time_end': request.form.get('time_end', '21:00:00'), 'interval_minutes': int(request.form.get('interval_minutes', 30)) } # 根据文件类型读取数据 if filename.endswith('.xlsx') or filename.endswith('.xls'): df = pd.read_excel(file) elif filename.endswith('.csv'): df = pd.read_csv(file, encoding='utf-8-sig') else: return jsonify({'success': False, 'message': '请上传Excel(.xlsx/.xls)或CSV(.csv)格式文件'}), 400 logger.info(f"导入文件: {file.filename}, 共 {len(df)} 行数据") # 检测URL列名 url_column = None for col_name in ['链接', 'url', 'URL', 'link', 'Link', '网址', 'site_url']: if col_name in df.columns: url_column = col_name break if url_column is None: url_column = df.columns[0] params['url_column'] = url_column # 创建导入任务 task_id = str(uuid.uuid4())[:8] import_tasks[task_id] = { 'id': task_id, 'filename': file.filename, 'total': len(df), 'current': 0, 'progress': 0, 'status': 'pending', 'stats': {'total': len(df), 'success': 0, 'failed': 0, 'skipped': 0, 'duplicate': 0}, 'created_at': datetime.now().isoformat() } # 启动后台线程 thread = threading.Thread(target=_run_import_task, args=(task_id, df, params)) 
thread.daemon = True thread.start() return jsonify({ 'success': True, 'message': '导入任务已启动', 'task_id': task_id, 'total': len(df) }) except Exception as e: logger.error(f"导入文件异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/sites/import/progress/', methods=['GET']) def get_import_progress(task_id: str): """获取导入任务进度""" if task_id not in import_tasks: return jsonify({'success': False, 'message': '任务不存在'}), 404 task = import_tasks[task_id] return jsonify({ 'success': True, 'data': { 'id': task['id'], 'filename': task['filename'], 'total': task['total'], 'current': task['current'], 'progress': task['progress'], 'status': task['status'], 'stats': task['stats'], 'error': task.get('error') } }) @app.route('/api/sites/import/cancel/', methods=['POST']) @login_required def cancel_import_task(task_id: str): """取消导入任务""" if task_id not in import_tasks: return jsonify({'success': False, 'message': '任务不存在'}), 404 import_tasks[task_id]['cancelled'] = True return jsonify({'success': True, 'message': '已发送取消请求'}) @app.route('/api/sites/import/tasks', methods=['GET']) def list_import_tasks(): """获取所有导入任务(包括进行中的)""" # 按创建时间倒序,只返回最近10个 tasks = sorted(import_tasks.values(), key=lambda x: x.get('created_at', ''), reverse=True)[:10] # 只返回进行中和最近完成的任务 active_tasks = [t for t in tasks if t.get('status') in ['pending', 'running'] or (t.get('status') == 'completed' and t.get('progress', 0) == 100)] return jsonify({'success': True, 'data': active_tasks}) @app.route('/api/sites//status', methods=['PUT']) @login_required def update_site_status(site_id: int): """更新单个站点状态""" try: data = request.get_json() status = data.get('status', '') if status not in ['active', 'inactive']: return jsonify({'success': False, 'message': '无效的状态值'}), 400 from db_manager import SiteManager mgr = SiteManager() success = mgr.update_site_status(site_id, status) if success: return jsonify({'success': True, 'message': '状态更新成功'}) else: return jsonify({'success': False, 'message': 
'更新失败'}), 400 except Exception as e: logger.error(f"更新站点状态异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/sites/batch/delete', methods=['POST']) @login_required def batch_delete_sites(): """批量删除站点""" try: data = request.get_json() site_ids = data.get('site_ids', []) if not site_ids: return jsonify({'success': False, 'message': '请选择要删除的站点'}), 400 deleted = site_manager.delete_sites_batch(site_ids) return jsonify({ 'success': True, 'message': f'成功删除 {deleted}/{len(site_ids)} 个站点', 'deleted_count': deleted }) except Exception as e: logger.error(f"批量删除站点异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/sites/batch/status', methods=['POST']) @login_required def batch_update_sites_status(): """批量更新站点状态""" try: data = request.get_json() site_ids = data.get('site_ids', []) status = data.get('status', '') if not site_ids or not status: return jsonify({'success': False, 'message': '请提供站点ID和状态'}), 400 if status not in ['active', 'inactive']: return jsonify({'success': False, 'message': '无效的状态值'}), 400 updated = site_manager.update_sites_status_batch(site_ids, status) return jsonify({ 'success': True, 'message': f'成功更新 {updated}/{len(site_ids)} 个站点', 'updated_count': updated }) except Exception as e: logger.error(f"批量更新站点状态异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 # ==================== 数据导出API ==================== @app.route('/api/export/sites', methods=['GET']) def export_sites(): """导出站点数据为CSV""" try: status = request.args.get('status', '') keyword = request.args.get('keyword', '') sites = site_manager.export_sites( status=status if status else None, keyword=keyword if keyword else None ) # 生成CSV output = io.StringIO() writer = csv.writer(output) # 写入表头 headers = ['ID', '站点URL', '站点名称', '状态', '点击次数', '回复次数', '频次', '开始时间', '结束时间', '维度', '查询词', '创建时间'] writer.writerow(headers) # 写入数据 for site in sites: writer.writerow([ site.get('id', ''), site.get('site_url', 
''), site.get('site_name', ''), site.get('status', ''), site.get('click_count', 0), site.get('reply_count', 0), site.get('frequency', ''), site.get('time_start', ''), site.get('time_end', ''), site.get('site_dimension', ''), site.get('query_word', ''), site.get('created_at', '') ]) output.seek(0) return Response( output.getvalue(), mimetype='text/csv', headers={ 'Content-Disposition': f'attachment; filename=sites_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv', 'Content-Type': 'text/csv; charset=utf-8-sig' } ) except Exception as e: logger.error(f"导出站点数据异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/export/clicks', methods=['GET']) def export_clicks(): """导出点击记录为CSV""" try: site_id = request.args.get('site_id', type=int) start_date = request.args.get('start_date', '') end_date = request.args.get('end_date', '') clicks = click_manager.export_clicks( site_id=site_id, start_date=start_date if start_date else None, end_date=end_date if end_date else None ) # 生成CSV output = io.StringIO() writer = csv.writer(output) headers = ['ID', '站点ID', '站点名称', '站点URL', '点击时间', '用户IP', '设备类型', '任务ID'] writer.writerow(headers) for click in clicks: writer.writerow([ click.get('id', ''), click.get('site_id', ''), click.get('site_name', ''), click.get('site_url', ''), click.get('click_time', ''), click.get('user_ip', ''), click.get('device_type', ''), click.get('task_id', '') ]) output.seek(0) return Response( output.getvalue(), mimetype='text/csv', headers={ 'Content-Disposition': f'attachment; filename=clicks_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv', 'Content-Type': 'text/csv; charset=utf-8-sig' } ) except Exception as e: logger.error(f"导出点击记录异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/export/interactions', methods=['GET']) def export_interactions(): """导出互动记录为CSV""" try: site_id = request.args.get('site_id', type=int) start_date = request.args.get('start_date', '') end_date = 
request.args.get('end_date', '') interactions = interaction_manager.export_interactions( site_id=site_id, start_date=start_date if start_date else None, end_date=end_date if end_date else None ) # 生成CSV output = io.StringIO() writer = csv.writer(output) headers = ['ID', '站点ID', '站点名称', '站点URL', '互动时间', '互动类型', '互动状态', '发送内容', '是否收到回复', '回复内容', '代理IP'] writer.writerow(headers) for interaction in interactions: writer.writerow([ interaction.get('id', ''), interaction.get('site_id', ''), interaction.get('site_name', ''), interaction.get('site_url', ''), interaction.get('interaction_time', ''), interaction.get('interaction_type', ''), interaction.get('interaction_status', ''), interaction.get('reply_content', ''), '是' if interaction.get('response_received') else '否', interaction.get('response_content', ''), interaction.get('proxy_ip', '') ]) output.seek(0) return Response( output.getvalue(), mimetype='text/csv', headers={ 'Content-Disposition': f'attachment; filename=interactions_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv', 'Content-Type': 'text/csv; charset=utf-8-sig' } ) except Exception as e: logger.error(f"导出互动记录异常: {str(e)}") return jsonify({'success': False, 'message': str(e)}), 500 # ==================== Query挖掘API ==================== @app.route('/api/query/upload', methods=['POST']) @login_required def upload_query_file(): """上传Query关键词Excel文件""" try: if 'file' not in request.files: return jsonify({'success': False, 'message': '请上传文件'}), 400 file = request.files['file'] if not file.filename: return jsonify({'success': False, 'message': '文件名为空'}), 400 filename = file.filename ext = filename.rsplit('.', 1)[-1].lower() if '.' 
in filename else '' if ext not in ('xlsx', 'xls'): return jsonify({'success': False, 'message': '请上传Excel文件(.xlsx/.xls)'}), 400 # 获取导入模式 import_mode = request.form.get('import_mode', 'query_only') # 先读取Excel验证列数 file_bytes = file.read() from io import BytesIO df = pd.read_excel(BytesIO(file_bytes)) col_count = len(df.columns) if import_mode == 'query_only': if col_count != 1: return jsonify({'success': False, 'message': f'仅导入Query模式要求Excel只有1列,当前有{col_count}列'}) elif import_mode == 'full_import': if col_count != 3: return jsonify({'success': False, 'message': f'完整导入模式要求Excel恰好3列(科室、关键字、query),当前有{col_count}列'}) # 确保上传目录存在 upload_dir = Config.QUERY_UPLOAD_DIR os.makedirs(upload_dir, exist_ok=True) # 生成带时间戳的文件名避免重名 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') safe_filename = f"{timestamp}_{filename}" filepath = os.path.abspath(os.path.join(upload_dir, safe_filename)) # 保存文件 with open(filepath, 'wb') as f: f.write(file_bytes) logger.info(f"[Query上传] 文件已保存: {filepath}, 模式: {import_mode}") # 创建导入日志 from db_manager import QueryImportLogManager log_mgr = QueryImportLogManager() log_id = log_mgr.create_log(filename, filepath) # 启动后台导入线程 from query_keyword_importer import QueryKeywordImporter importer = QueryKeywordImporter() thread = threading.Thread(target=importer.import_file, args=(filepath, log_id, import_mode)) thread.daemon = True thread.start() return jsonify({ 'success': True, 'message': '文件上传成功,导入任务已启动', 'log_id': log_id, 'filename': filename }) except Exception as e: logger.error(f"[Query上传] 异常: {e}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/query/import/records', methods=['GET']) @login_required def get_query_import_records(): """获取Query导入记录(分页)""" try: page = int(request.args.get('page', 1)) page_size = int(request.args.get('page_size', 20)) from db_manager import QueryImportLogManager log_mgr = QueryImportLogManager() result = log_mgr.get_logs_paginated(page, page_size) return jsonify({'success': True, 'data': 
result}) except Exception as e: logger.error(f"[Query记录] 查询失败: {e}") return jsonify({'success': False, 'message': str(e)}), 500 @app.route('/api/query/import/trigger', methods=['POST']) @login_required def trigger_query_import(): """手动触发Query目录扫描和导入""" try: from query_keyword_importer import QueryKeywordImporter importer = QueryKeywordImporter() thread = threading.Thread(target=importer.scan_and_import) thread.daemon = True thread.start() return jsonify({ 'success': True, 'message': '目录扫描任务已启动' }) except Exception as e: logger.error(f"[Query触发] 异常: {e}") return jsonify({'success': False, 'message': str(e)}), 500 # ==================== 任务队列API ==================== @app.route('/api/tasks/queue', methods=['GET']) def get_task_queue(): """获取任务队列状态 - Web端不管理调度器""" return jsonify({ 'success': True, 'data': { 'pending': [], 'running': None, 'completed': [], 'scheduler_status': 'manual', 'message': '调度器需通过 main.py 独立运行' } }) if __name__ == '__main__': logger.info(f"启动MIP广告点击服务 - 环境: {Config.ENV}") logger.info(f"服务地址: http://{Config.SERVER_HOST}:{Config.SERVER_PORT}") logger.info(f"调试模式: {Config.DEBUG}") logger.info("调度器需通过 python main.py 独立启动") # 启动Flask应用 app.run( host=Config.SERVER_HOST, port=Config.SERVER_PORT, debug=Config.DEBUG )