Files
ai_mip/db_manager.py
2026-02-24 12:46:35 +08:00

1639 lines
58 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
数据库管理器
使用 MySQL 数据库
"""
import json
import random
from datetime import datetime
from typing import List, Dict, Optional, Union
from pathlib import Path
from loguru import logger
from config import Config
try:
import pymysql
MYSQL_AVAILABLE = True
except ImportError:
MYSQL_AVAILABLE = False
logger.error("pymysql 未安装,请安装: pip install pymysql")
raise ImportError("使用 MySQL 需要安装 pymysql: pip install pymysql")
class DatabaseManager:
    """Base database manager backed by MySQL (pymysql)."""

    def __init__(self, db_path: str = None):
        """
        Build the MySQL connection settings from Config.

        Args:
            db_path: ignored; retained only for backward compatibility
                with the old SQLite-based interface.
        """
        if not MYSQL_AVAILABLE:
            raise ImportError("使用 MySQL 需要安装 pymysql: pip install pymysql")
        self.db_config = {
            'host': Config.MYSQL_HOST,
            'port': Config.MYSQL_PORT,
            'user': Config.MYSQL_USER,
            'password': Config.MYSQL_PASSWORD,
            'database': Config.MYSQL_DATABASE,
            'charset': 'utf8mb4',
        }

    def get_connection(self) -> 'pymysql.Connection':
        """Open and return a fresh MySQL connection."""
        return pymysql.connect(**self.db_config)

    def _dict_from_row(self, row) -> Dict:
        """
        Convert a DB row into a JSON-serializable dict.

        datetime/date become formatted strings, timedelta becomes
        'HH:MM:SS', Decimal becomes float. Returns None for None input;
        non-dict rows are passed through unchanged.
        """
        if row is None:
            return None
        converted = dict(row) if isinstance(row, dict) else row
        if not isinstance(converted, dict):
            return converted
        from datetime import datetime, date, timedelta
        from decimal import Decimal
        # NOTE: datetime must be tested before date (datetime subclasses date).
        for key, value in list(converted.items()):
            if isinstance(value, datetime):
                converted[key] = value.strftime('%Y-%m-%d %H:%M:%S')
            elif isinstance(value, date):
                converted[key] = value.strftime('%Y-%m-%d')
            elif isinstance(value, timedelta):
                # Render as HH:MM:SS (hours may exceed 23).
                total = int(value.total_seconds())
                hh, rest = divmod(total, 3600)
                mm, ss = divmod(rest, 60)
                converted[key] = f'{hh:02d}:{mm:02d}:{ss:02d}'
            elif isinstance(value, Decimal):
                converted[key] = float(value)
        return converted

    def _get_placeholder(self) -> str:
        """Return the SQL parameter placeholder; MySQL/pymysql uses %s."""
        return '%s'

    def _execute_query(self, conn, sql: str, params: tuple = None):
        """Execute SQL on a DictCursor and return the cursor."""
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        if params:
            cursor.execute(sql, params)
        else:
            cursor.execute(sql)
        return cursor
class SiteManager(DatabaseManager):
    """CRUD operations for the ai_mip_site table."""

    def add_site(self, site_url: str, site_name: str = None,
                 site_dimension: str = None, query_word: str = None,
                 frequency: int = None,
                 time_start: str = None, time_end: str = None,
                 interval_minutes: int = None) -> Optional[int]:
        """
        Insert a new site.

        Args:
            site_url: site URL (unique in the table).
            site_name: display name; falls back to the URL.
            site_dimension: dimension tag.
            query_word: source query word the URL was crawled from.
            frequency: visit frequency (default 1).
            time_start: daily window start, default '09:00:00'.
            time_end: daily window end, default '21:00:00'.
            interval_minutes: execution interval in minutes (default 30).

        Returns:
            The new site id, or None on duplicate URL / failure.
        """
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            sql = f"""
                INSERT INTO ai_mip_site (
                    site_url, site_name, status, frequency,
                    time_start, time_end, interval_minutes,
                    site_dimension, query_word, created_by
                ) VALUES ({ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph})
            """
            cursor.execute(sql, (
                site_url,
                site_name or site_url,
                'active',
                frequency or 1,
                time_start or '09:00:00',
                time_end or '21:00:00',
                interval_minutes or 30,
                site_dimension,
                query_word,  # source query word for traceability
                'system',
            ))
            site_id = cursor.lastrowid
            conn.commit()
            logger.info(f"成功添加站点: {site_url} (ID: {site_id}, 查询词: {query_word})")
            return site_id
        except pymysql.IntegrityError:
            # site_url carries a UNIQUE constraint
            logger.warning(f"站点URL已存在: {site_url}")
            return None
        except Exception as e:
            logger.error(f"添加站点失败: {str(e)}")
            return None
        finally:
            # previously the connection leaked on every exception path
            if conn is not None:
                conn.close()

    def get_site_by_url(self, site_url: str) -> Optional[Dict]:
        """Return the site row for the given URL, or None."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            cursor = self._execute_query(conn, f"SELECT * FROM ai_mip_site WHERE site_url = {ph}", (site_url,))
            row = cursor.fetchone()
            return self._dict_from_row(row) if row else None
        except Exception as e:
            logger.error(f"查询站点失败: {str(e)}")
            return None
        finally:
            if conn is not None:
                conn.close()

    def get_site_by_id(self, site_id: int) -> Optional[Dict]:
        """Return the site row for the given id, or None."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            cursor = self._execute_query(conn, f"SELECT * FROM ai_mip_site WHERE id = {ph}", (site_id,))
            row = cursor.fetchone()
            return self._dict_from_row(row) if row else None
        except Exception as e:
            logger.error(f"查询站点失败: {str(e)}")
            return None
        finally:
            if conn is not None:
                conn.close()

    def get_active_sites(self) -> List[Dict]:
        """Return all sites whose status is 'active', newest first."""
        conn = None
        try:
            conn = self.get_connection()
            cursor = self._execute_query(conn, "SELECT * FROM ai_mip_site WHERE status = 'active' ORDER BY created_at DESC")
            rows = cursor.fetchall()
            return [self._dict_from_row(row) for row in rows]
        except Exception as e:
            logger.error(f"查询活跃站点失败: {str(e)}")
            return []
        finally:
            if conn is not None:
                conn.close()

    def get_all_sites(self) -> List[Dict]:
        """Return every site row, newest first."""
        conn = None
        try:
            conn = self.get_connection()
            cursor = self._execute_query(conn, "SELECT * FROM ai_mip_site ORDER BY created_at DESC")
            rows = cursor.fetchall()
            return [self._dict_from_row(row) for row in rows]
        except Exception as e:
            logger.error(f"查询所有站点失败: {str(e)}")
            return []
        finally:
            if conn is not None:
                conn.close()

    def update_site_status(self, site_id: int, status: str) -> bool:
        """Set a site's status; returns True on success."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            self._execute_query(
                conn,
                f"UPDATE ai_mip_site SET status = {ph}, updated_by = {ph} WHERE id = {ph}",
                (status, 'system', site_id)
            )
            conn.commit()
            logger.info(f"更新站点状态: ID={site_id}, status={status}")
            return True
        except Exception as e:
            logger.error(f"更新站点状态失败: {str(e)}")
            return False
        finally:
            if conn is not None:
                conn.close()

    def increment_click_count(self, site_id: int, count: int = 1) -> bool:
        """Add `count` to the site's denormalized click counter."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            self._execute_query(
                conn,
                f"UPDATE ai_mip_site SET click_count = click_count + {ph} WHERE id = {ph}",
                (count, site_id)
            )
            conn.commit()
            return True
        except Exception as e:
            logger.error(f"更新点击次数失败: {str(e)}")
            return False
        finally:
            if conn is not None:
                conn.close()

    def increment_reply_count(self, site_id: int, count: int = 1) -> bool:
        """Add `count` to the site's denormalized reply counter."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            self._execute_query(
                conn,
                f"UPDATE ai_mip_site SET reply_count = reply_count + {ph} WHERE id = {ph}",
                (count, site_id)
            )
            conn.commit()
            return True
        except Exception as e:
            logger.error(f"更新回复次数失败: {str(e)}")
            return False
        finally:
            if conn is not None:
                conn.close()

    def delete_site(self, site_id: int) -> bool:
        """Delete a site row; returns True on success."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            self._execute_query(conn, f"DELETE FROM ai_mip_site WHERE id = {ph}", (site_id,))
            conn.commit()
            logger.info(f"已删除站点: ID={site_id}")
            return True
        except Exception as e:
            logger.error(f"删除站点失败: {str(e)}")
            return False
        finally:
            if conn is not None:
                conn.close()
class ClickManager(DatabaseManager):
    """Click-record management (ai_mip_click table)."""

    def record_click(self, site_id: int, site_url: str,
                     user_ip: str = None, device_type: str = 'pc',
                     task_id: str = None) -> Optional[int]:
        """
        Record one click and bump the site's click counter.

        Args:
            site_id: site id.
            site_url: site URL.
            user_ip: user / proxy IP.
            device_type: device type (default 'pc').
            task_id: task id; auto-generated from the timestamp if omitted.

        Returns:
            The click record id, or None on failure.
        """
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            sql = f"""
                INSERT INTO ai_mip_click (
                    site_id, site_url, click_time, user_ip,
                    device_type, task_id, operator
                ) VALUES ({ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph})
            """
            cursor.execute(sql, (
                site_id,
                site_url,
                datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                user_ip,
                device_type,
                task_id or f'TASK_{datetime.now().strftime("%Y%m%d%H%M%S")}',
                'RPA_SYSTEM'
            ))
            click_id = cursor.lastrowid
            conn.commit()
        except Exception as e:
            logger.error(f"记录点击失败: {str(e)}")
            return None
        finally:
            # previously the connection leaked when the insert raised
            if conn is not None:
                conn.close()
        # keep the denormalized counter on the site row in sync
        site_mgr = SiteManager()
        site_mgr.increment_click_count(site_id)
        logger.info(f"记录点击: site_id={site_id}, click_id={click_id}")
        return click_id

    def get_clicks_by_site(self, site_id: int, limit: int = 100) -> List[Dict]:
        """Return up to `limit` click rows for a site, newest first."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            # FIX: the old code used sqlite-style '?' placeholders and a
            # tuple cursor; pymysql needs '%s' and a DictCursor.
            cursor = self._execute_query(conn, f"""
                SELECT * FROM ai_mip_click
                WHERE site_id = {ph}
                ORDER BY click_time DESC
                LIMIT {ph}
            """, (site_id, limit))
            rows = cursor.fetchall()
            return [self._dict_from_row(row) for row in rows]
        except Exception as e:
            logger.error(f"查询点击记录失败: {str(e)}")
            return []
        finally:
            if conn is not None:
                conn.close()

    def get_click_count_by_site(self, site_id: int) -> int:
        """Return the total number of click rows for a site."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            # FIX: same '?' placeholder / tuple-cursor bug as above;
            # row['count'] requires a DictCursor.
            cursor = self._execute_query(
                conn,
                f"SELECT COUNT(*) as count FROM ai_mip_click WHERE site_id = {ph}",
                (site_id,)
            )
            row = cursor.fetchone()
            return row['count'] if row else 0
        except Exception as e:
            logger.error(f"查询点击次数失败: {str(e)}")
            return 0
        finally:
            if conn is not None:
                conn.close()
class InteractionManager(DatabaseManager):
    """Interaction-record management (ai_mip_interaction table)."""

    def record_interaction(self, site_id: int, click_id: int = None,
                           task_id: str = None, interaction_type: str = 'reply',
                           reply_content: str = None, is_successful: bool = False,
                           response_received: bool = False, response_content: str = None,
                           proxy_ip: str = None, fingerprint_id: str = None,
                           error_message: str = None) -> Optional[int]:
        """
        Record one interaction.

        Args:
            site_id: site id.
            click_id: related click record id.
            task_id: task id; auto-generated from the timestamp if omitted.
            interaction_type: interaction type (reply/comment/...).
            reply_content: the reply we posted.
            is_successful: whether the interaction succeeded.
            response_received: whether the counterparty responded.
            response_content: the counterparty's response text.
            proxy_ip: proxy IP used.
            fingerprint_id: browser fingerprint id.
            error_message: error detail on failure.

        Returns:
            The interaction record id, or None on failure.
        """
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            sql = f"""
                INSERT INTO ai_mip_interaction (
                    site_id, click_id, task_id, interaction_type,
                    interaction_time, interaction_status, reply_content,
                    execution_mode, browser_type, proxy_ip, fingerprint_id,
                    response_received, response_content, is_successful,
                    error_message, operator
                ) VALUES ({ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph})
            """
            cursor.execute(sql, (
                site_id,
                click_id,
                task_id or f'TASK_{datetime.now().strftime("%Y%m%d%H%M%S")}',
                interaction_type,
                now,
                'success' if is_successful else 'failed',
                reply_content,
                'auto',
                'playwright',
                proxy_ip,
                fingerprint_id,
                1 if response_received else 0,
                response_content,
                1 if is_successful else 0,
                error_message,
                'RPA_SYSTEM'
            ))
            interaction_id = cursor.lastrowid
            conn.commit()
        except Exception as e:
            logger.error(f"记录互动失败: {str(e)}")
            return None
        finally:
            # previously the connection leaked when the insert raised
            if conn is not None:
                conn.close()
        if response_received:
            # keep the site's denormalized reply counter in sync
            site_mgr = SiteManager()
            site_mgr.increment_reply_count(site_id)
        logger.info(f"记录互动: site_id={site_id}, interaction_id={interaction_id}, success={is_successful}")
        return interaction_id

    def get_interactions_by_site(self, site_id: int, limit: int = 100) -> List[Dict]:
        """Return up to `limit` interaction rows for a site, newest first."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            # FIX: the old code used sqlite-style '?' placeholders and a
            # tuple cursor; pymysql needs '%s' and a DictCursor.
            cursor = self._execute_query(conn, f"""
                SELECT * FROM ai_mip_interaction
                WHERE site_id = {ph}
                ORDER BY interaction_time DESC
                LIMIT {ph}
            """, (site_id, limit))
            rows = cursor.fetchall()
            return [self._dict_from_row(row) for row in rows]
        except Exception as e:
            logger.error(f"查询互动记录失败: {str(e)}")
            return []
        finally:
            if conn is not None:
                conn.close()

    def get_successful_interactions_count(self, site_id: int) -> int:
        """Return the number of successful interactions for a site."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            # FIX: same '?' placeholder / tuple-cursor bug as above.
            cursor = self._execute_query(conn, f"""
                SELECT COUNT(*) as count FROM ai_mip_interaction
                WHERE site_id = {ph} AND is_successful = 1
            """, (site_id,))
            row = cursor.fetchone()
            return row['count'] if row else 0
        except Exception as e:
            logger.error(f"查询成功互动次数失败: {str(e)}")
            return 0
        finally:
            if conn is not None:
                conn.close()
class StatisticsManager(DatabaseManager):
    """Aggregate statistics across sites, clicks and interactions."""

    def get_statistics(self) -> Dict:
        """
        Return global statistics.

        Returns:
            Dict with site/click/reply counts and formatted rates,
            or {} on failure.
        """
        conn = None
        try:
            conn = self.get_connection()
            # site counts
            cursor = self._execute_query(conn, "SELECT COUNT(*) as total FROM ai_mip_site")
            total_sites = cursor.fetchone()['total']
            cursor = self._execute_query(conn, "SELECT COUNT(*) as total FROM ai_mip_site WHERE status = 'active'")
            active_sites = cursor.fetchone()['total']
            # click counts
            cursor = self._execute_query(conn, "SELECT COUNT(*) as total FROM ai_mip_click")
            total_clicks = cursor.fetchone()['total']
            # interaction counts
            cursor = self._execute_query(conn, "SELECT COUNT(*) as total FROM ai_mip_interaction WHERE response_received = 1")
            total_replies = cursor.fetchone()['total']
            cursor = self._execute_query(conn, "SELECT COUNT(*) as total FROM ai_mip_interaction WHERE is_successful = 1")
            successful_interactions = cursor.fetchone()['total']
            # guard against division by zero on an empty click table
            reply_rate = (total_replies / total_clicks * 100) if total_clicks > 0 else 0
            success_rate = (successful_interactions / total_clicks * 100) if total_clicks > 0 else 0
            return {
                'total_sites': total_sites,
                'active_sites': active_sites,
                'total_clicks': total_clicks,
                'total_replies': total_replies,
                'successful_interactions': successful_interactions,
                'reply_rate': f"{reply_rate:.2f}%",
                'success_rate': f"{success_rate:.2f}%"
            }
        except Exception as e:
            logger.error(f"获取统计数据失败: {str(e)}")
            return {}
        finally:
            # previously the connection leaked on every exception path
            if conn is not None:
                conn.close()

    def get_site_statistics(self, site_id: int) -> Dict:
        """
        Return statistics for one site.

        Args:
            site_id: site id.

        Returns:
            Dict of per-site stats, or {} if the site is missing or on error.
        """
        try:
            # FIX: the old code passed self.db_path, an attribute that
            # DatabaseManager never sets, so this method always raised
            # AttributeError and returned {}. db_path is ignored anyway.
            site_mgr = SiteManager()
            click_mgr = ClickManager()
            interaction_mgr = InteractionManager()
            site = site_mgr.get_site_by_id(site_id)
            if not site:
                return {}
            click_count = click_mgr.get_click_count_by_site(site_id)
            success_count = interaction_mgr.get_successful_interactions_count(site_id)
            return {
                'site_url': site['site_url'],
                'site_name': site['site_name'],
                'status': site['status'],
                'click_count': click_count,
                'reply_count': site['reply_count'],
                'successful_interactions': success_count,
                'reply_rate': f"{(site['reply_count'] / click_count * 100) if click_count > 0 else 0:.2f}%"
            }
        except Exception as e:
            logger.error(f"获取站点统计失败: {str(e)}")
            return {}
class QueryTaskManager(DatabaseManager):
    """Query-task management (ai_mip_query_task table)."""

    def create_task(self, query_word: str, task_date: str = None,
                    query_type: str = 'keyword', threshold_max: int = 100,
                    priority: int = 5, category: str = None,
                    source_platform: str = 'baidu',
                    created_by: str = 'system',
                    remark: str = None) -> Optional[int]:
        """
        Create a query task.

        Args:
            query_word: the query word.
            task_date: task date as YYYYMMDD; defaults to today.
            query_type: query type.
            threshold_max: maximum number of URLs to crawl.
            priority: priority 1-10 (lower value runs first).
            category: category tag.
            source_platform: source platform.
            created_by: creator.
            remark: free-form note.

        Returns:
            Task id, or None on duplicate / failure.
        """
        conn = None
        try:
            if task_date is None:
                task_date = datetime.now().strftime('%Y%m%d')
            conn = self.get_connection()
            ph = self._get_placeholder()
            sql = f"""
                INSERT INTO ai_mip_query_task (
                    query_word, query_type, task_date, threshold_max,
                    priority, category, source_platform, created_by, remark
                ) VALUES ({ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph}, {ph})
            """
            cursor = conn.cursor()
            cursor.execute(sql, (
                query_word, query_type, task_date, threshold_max,
                priority, category, source_platform, created_by, remark
            ))
            task_id = cursor.lastrowid
            conn.commit()
            logger.info(f"创建查询任务成功: {query_word} (ID: {task_id})")
            return task_id
        except pymysql.IntegrityError:
            # (query_word, task_date) is expected to be unique
            logger.warning(f"查询任务已存在: {query_word} @ {task_date}")
            return None
        except Exception as e:
            logger.error(f"创建查询任务失败: {str(e)}")
            return None
        finally:
            # previously the connection leaked on every exception path
            if conn is not None:
                conn.close()

    def get_task_by_id(self, task_id: int) -> Optional[Dict]:
        """Return the task row for the given id, or None."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            cursor = self._execute_query(
                conn,
                f"SELECT * FROM ai_mip_query_task WHERE id = {ph}",
                (task_id,)
            )
            row = cursor.fetchone()
            return self._dict_from_row(row) if row else None
        except Exception as e:
            logger.error(f"查询任务失败: {str(e)}")
            return None
        finally:
            if conn is not None:
                conn.close()

    def get_ready_tasks(self, limit: int = None) -> List[Dict]:
        """
        Return tasks in 'ready' state, best priority first, oldest first.

        Args:
            limit: optional maximum number of rows.
        """
        conn = None
        try:
            conn = self.get_connection()
            sql = "SELECT * FROM ai_mip_query_task WHERE status = 'ready' ORDER BY priority ASC, created_at ASC"
            if limit:
                # int() keeps the interpolation injection-safe
                sql += f" LIMIT {int(limit)}"
            cursor = self._execute_query(conn, sql)
            rows = cursor.fetchall()
            return [self._dict_from_row(row) for row in rows]
        except Exception as e:
            logger.error(f"查询ready任务失败: {str(e)}")
            return []
        finally:
            if conn is not None:
                conn.close()

    def get_tasks_by_date(self, task_date: str) -> List[Dict]:
        """Return all tasks for a YYYYMMDD date, best priority first."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            cursor = self._execute_query(
                conn,
                f"SELECT * FROM ai_mip_query_task WHERE task_date = {ph} ORDER BY priority ASC",
                (task_date,)
            )
            rows = cursor.fetchall()
            return [self._dict_from_row(row) for row in rows]
        except Exception as e:
            logger.error(f"查询日期任务失败: {str(e)}")
            return []
        finally:
            if conn is not None:
                conn.close()

    def update_task_status(self, task_id: int, status: str,
                           error_message: str = None) -> bool:
        """
        Update a task's status and its matching timestamp column.

        Args:
            task_id: task id.
            status: ready/doing/failed/finished/closed.
            error_message: error detail recorded on failure.
        """
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            # pick the timestamp column that tracks this transition
            if status == 'doing':
                timestamp_field = 'started_at'
            elif status in ('finished', 'failed'):
                timestamp_field = 'finished_at'
            elif status == 'closed':
                timestamp_field = 'closed_at'
            else:
                timestamp_field = None
            set_parts = [f"status = {ph}"]
            params = [status]
            if timestamp_field:
                set_parts.append(f"{timestamp_field} = NOW()")
            if error_message:
                # FIX: the old code spliced error_message into the SQL text
                # with str.replace, which broke on quotes/percent signs and
                # was an SQL injection vector; bind it as a parameter.
                set_parts.append(f"error_message = {ph}")
                params.append(error_message)
            params.append(task_id)
            sql = f"UPDATE ai_mip_query_task SET {', '.join(set_parts)} WHERE id = {ph}"
            cursor = conn.cursor()
            cursor.execute(sql, tuple(params))
            conn.commit()
            logger.info(f"更新任务状态: {task_id} -> {status}")
            return True
        except Exception as e:
            logger.error(f"更新任务状态失败: {str(e)}")
            return False
        finally:
            if conn is not None:
                conn.close()

    def increment_crawl_count(self, task_id: int,
                              crawl_count: int = 1,
                              valid_count: int = 0) -> bool:
        """
        Increase crawl counters.

        Args:
            task_id: task id.
            crawl_count: number of URLs crawled.
            valid_count: number of valid URLs (with ads); also counts
                towards current_count used by the threshold check.
        """
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            sql = f"""
                UPDATE ai_mip_query_task
                SET crawl_url_count = crawl_url_count + {ph},
                    valid_url_count = valid_url_count + {ph},
                    current_count = current_count + {ph}
                WHERE id = {ph}
            """
            cursor = conn.cursor()
            cursor.execute(sql, (crawl_count, valid_count, valid_count, task_id))
            conn.commit()
            return True
        except Exception as e:
            logger.error(f"更新抓取计数失败: {str(e)}")
            return False
        finally:
            if conn is not None:
                conn.close()

    def check_threshold(self, task_id: int) -> bool:
        """
        Check whether a task reached its threshold; close it if so.

        Returns:
            True if the threshold was reached, False otherwise.
        """
        try:
            task = self.get_task_by_id(task_id)
            if not task:
                return False
            if task['current_count'] >= task['threshold_max']:
                self.update_task_status(task_id, 'closed')
                logger.info(f"任务达到阈值并关闭: {task['query_word']} ({task['current_count']}/{task['threshold_max']})")
                return True
            return False
        except Exception as e:
            logger.error(f"检查阈值失败: {str(e)}")
            return False

    def get_task_statistics(self, task_date: str = None) -> Dict:
        """
        Return task statistics.

        Args:
            task_date: YYYYMMDD date to filter on; None aggregates all dates.
        """
        conn = None
        try:
            conn = self.get_connection()
            if task_date:
                ph = self._get_placeholder()
                where_clause = f"WHERE task_date = {ph}"
                params = (task_date,)
            else:
                where_clause = ""
                params = None
            sql = f"""
                SELECT
                    COUNT(*) as total_tasks,
                    SUM(CASE WHEN status = 'ready' THEN 1 ELSE 0 END) as ready_count,
                    SUM(CASE WHEN status = 'doing' THEN 1 ELSE 0 END) as doing_count,
                    SUM(CASE WHEN status = 'finished' THEN 1 ELSE 0 END) as finished_count,
                    SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_count,
                    SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END) as closed_count,
                    SUM(crawl_url_count) as total_crawled,
                    SUM(valid_url_count) as total_valid
                FROM ai_mip_query_task
                {where_clause}
            """
            cursor = self._execute_query(conn, sql, params)
            row = cursor.fetchone()
            return self._dict_from_row(row) if row else {}
        except Exception as e:
            logger.error(f"获取任务统计失败: {str(e)}")
            return {}
        finally:
            if conn is not None:
                conn.close()
class EnhancedSiteManager(SiteManager):
    """Site manager with pagination, sorting, filtering and batch ops."""

    def get_sites_paginated(
        self,
        page: int = 1,
        page_size: int = 20,
        status: str = None,
        keyword: str = None,
        sort_by: str = 'created_at',
        sort_order: str = 'desc'
    ) -> tuple:
        """
        Fetch sites page by page.

        Args:
            page: 1-based page number.
            page_size: rows per page.
            status: optional status filter.
            keyword: optional substring match on URL or name.
            sort_by: sort column (whitelisted).
            sort_order: 'asc' or 'desc'.

        Returns:
            (list of site dicts, total row count).
        """
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            conditions = []
            params = []
            if status:
                conditions.append(f"status = {ph}")
                params.append(status)
            if keyword:
                conditions.append(f"(site_url LIKE {ph} OR site_name LIKE {ph})")
                params.extend([f'%{keyword}%', f'%{keyword}%'])
            where_clause = ' AND '.join(conditions) if conditions else '1=1'
            # whitelist sort column/direction — they are interpolated into SQL
            allowed_sort_fields = ['created_at', 'click_count', 'reply_count', 'site_url', 'status']
            if sort_by not in allowed_sort_fields:
                sort_by = 'created_at'
            sort_order = 'DESC' if sort_order.upper() == 'DESC' else 'ASC'
            # total count for the pager
            count_sql = f"SELECT COUNT(*) as total FROM ai_mip_site WHERE {where_clause}"
            cursor = self._execute_query(conn, count_sql, tuple(params) if params else None)
            total = cursor.fetchone()['total']
            # page of data
            offset = (page - 1) * page_size
            data_sql = f"""
                SELECT * FROM ai_mip_site
                WHERE {where_clause}
                ORDER BY {sort_by} {sort_order}
                LIMIT {ph} OFFSET {ph}
            """
            params.extend([page_size, offset])
            cursor = self._execute_query(conn, data_sql, tuple(params))
            rows = cursor.fetchall()
            return [self._dict_from_row(row) for row in rows], total
        except Exception as e:
            logger.error(f"分页查询站点失败: {str(e)}")
            return [], 0
        finally:
            # previously the connection leaked on exception paths
            if conn is not None:
                conn.close()

    def delete_sites_batch(self, site_ids: List[int]) -> int:
        """
        Delete several sites at once.

        Returns:
            Number of rows actually deleted.
        """
        if not site_ids:
            return 0
        conn = None
        try:
            conn = self.get_connection()
            # use the shared placeholder helper for consistency with the
            # rest of the file (was a hard-coded '%s')
            ph = self._get_placeholder()
            placeholders = ','.join([ph] * len(site_ids))
            sql = f"DELETE FROM ai_mip_site WHERE id IN ({placeholders})"
            cursor = conn.cursor()
            cursor.execute(sql, tuple(site_ids))
            deleted = cursor.rowcount
            conn.commit()
            logger.info(f"批量删除站点: {deleted}/{len(site_ids)}")
            return deleted
        except Exception as e:
            logger.error(f"批量删除站点失败: {str(e)}")
            return 0
        finally:
            if conn is not None:
                conn.close()

    def update_sites_status_batch(self, site_ids: List[int], status: str) -> int:
        """
        Set the status of several sites at once.

        Returns:
            Number of rows actually updated.
        """
        if not site_ids:
            return 0
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            placeholders = ','.join([ph] * len(site_ids))
            sql = f"UPDATE ai_mip_site SET status = {ph} WHERE id IN ({placeholders})"
            cursor = conn.cursor()
            cursor.execute(sql, (status, *site_ids))
            updated = cursor.rowcount
            conn.commit()
            logger.info(f"批量更新站点状态为{status}: {updated}/{len(site_ids)}")
            return updated
        except Exception as e:
            logger.error(f"批量更新站点状态失败: {str(e)}")
            return 0
        finally:
            if conn is not None:
                conn.close()

    def export_sites(self, status: str = None, keyword: str = None) -> List[Dict]:
        """Export site rows matching the optional status/keyword filters."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            conditions = []
            params = []
            if status:
                conditions.append(f"status = {ph}")
                params.append(status)
            if keyword:
                conditions.append(f"(site_url LIKE {ph} OR site_name LIKE {ph})")
                params.extend([f'%{keyword}%', f'%{keyword}%'])
            where_clause = ' AND '.join(conditions) if conditions else '1=1'
            sql = f"""
                SELECT id, site_url, site_name, status, click_count, reply_count,
                       frequency, time_start, time_end, site_dimension, query_word,
                       created_at
                FROM ai_mip_site
                WHERE {where_clause}
                ORDER BY created_at DESC
            """
            cursor = self._execute_query(conn, sql, tuple(params) if params else None)
            rows = cursor.fetchall()
            return [self._dict_from_row(row) for row in rows]
        except Exception as e:
            logger.error(f"导出站点数据失败: {str(e)}")
            return []
        finally:
            if conn is not None:
                conn.close()
class EnhancedClickManager(ClickManager):
    """Click manager with pagination and export."""

    def get_clicks_paginated(
        self,
        page: int = 1,
        page_size: int = 20,
        site_id: int = None,
        start_date: str = None,
        end_date: str = None,
        sort_by: str = 'click_time',
        sort_order: str = 'desc'
    ) -> tuple:
        """
        Fetch click records page by page, joined with the site name.

        Args:
            page: 1-based page number.
            page_size: rows per page.
            site_id: optional site filter.
            start_date: inclusive start date (YYYY-MM-DD).
            end_date: inclusive end date (YYYY-MM-DD).
            sort_by: sort column (whitelisted).
            sort_order: 'asc' or 'desc'.

        Returns:
            (list of click dicts, total row count).
        """
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            conditions = []
            params = []
            if site_id:
                conditions.append(f"c.site_id = {ph}")
                params.append(site_id)
            if start_date:
                conditions.append(f"c.click_time >= {ph}")
                params.append(f"{start_date} 00:00:00")
            if end_date:
                conditions.append(f"c.click_time <= {ph}")
                params.append(f"{end_date} 23:59:59")
            where_clause = ' AND '.join(conditions) if conditions else '1=1'
            # whitelist sort column/direction — they are interpolated into SQL
            allowed_sort_fields = ['click_time', 'site_id', 'device_type']
            if sort_by not in allowed_sort_fields:
                sort_by = 'click_time'
            sort_order = 'DESC' if sort_order.upper() == 'DESC' else 'ASC'
            # total count for the pager
            count_sql = f"SELECT COUNT(*) as total FROM ai_mip_click c WHERE {where_clause}"
            cursor = self._execute_query(conn, count_sql, tuple(params) if params else None)
            total = cursor.fetchone()['total']
            # page of data
            offset = (page - 1) * page_size
            data_sql = f"""
                SELECT c.*, s.site_name
                FROM ai_mip_click c
                LEFT JOIN ai_mip_site s ON c.site_id = s.id
                WHERE {where_clause}
                ORDER BY c.{sort_by} {sort_order}
                LIMIT {ph} OFFSET {ph}
            """
            params.extend([page_size, offset])
            cursor = self._execute_query(conn, data_sql, tuple(params))
            rows = cursor.fetchall()
            return [self._dict_from_row(row) for row in rows], total
        except Exception as e:
            logger.error(f"分页查询点击记录失败: {str(e)}")
            return [], 0
        finally:
            # previously the connection leaked on exception paths
            if conn is not None:
                conn.close()

    def export_clicks(
        self,
        site_id: int = None,
        start_date: str = None,
        end_date: str = None
    ) -> List[Dict]:
        """Export click rows matching the optional site/date filters."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            conditions = []
            params = []
            if site_id:
                conditions.append(f"c.site_id = {ph}")
                params.append(site_id)
            if start_date:
                conditions.append(f"c.click_time >= {ph}")
                params.append(f"{start_date} 00:00:00")
            if end_date:
                conditions.append(f"c.click_time <= {ph}")
                params.append(f"{end_date} 23:59:59")
            where_clause = ' AND '.join(conditions) if conditions else '1=1'
            sql = f"""
                SELECT c.id, c.site_id, s.site_name, c.site_url, c.click_time,
                       c.user_ip, c.device_type, c.task_id
                FROM ai_mip_click c
                LEFT JOIN ai_mip_site s ON c.site_id = s.id
                WHERE {where_clause}
                ORDER BY c.click_time DESC
            """
            cursor = self._execute_query(conn, sql, tuple(params) if params else None)
            rows = cursor.fetchall()
            return [self._dict_from_row(row) for row in rows]
        except Exception as e:
            logger.error(f"导出点击记录失败: {str(e)}")
            return []
        finally:
            if conn is not None:
                conn.close()
class EnhancedInteractionManager(InteractionManager):
    """Interaction manager with pagination and export."""

    def get_interactions_paginated(
        self,
        page: int = 1,
        page_size: int = 20,
        site_id: int = None,
        start_date: str = None,
        end_date: str = None,
        status: str = None,
        sort_by: str = 'interaction_time',
        sort_order: str = 'desc'
    ) -> tuple:
        """
        Fetch interaction records page by page, joined with site info.

        Args:
            page: 1-based page number.
            page_size: rows per page.
            site_id: optional site filter.
            start_date: inclusive start date (YYYY-MM-DD).
            end_date: inclusive end date (YYYY-MM-DD).
            status: optional interaction_status filter.
            sort_by: sort column (whitelisted).
            sort_order: 'asc' or 'desc'.

        Returns:
            (list of interaction dicts, total row count).
        """
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            conditions = []
            params = []
            if site_id:
                conditions.append(f"i.site_id = {ph}")
                params.append(site_id)
            if start_date:
                conditions.append(f"i.interaction_time >= {ph}")
                params.append(f"{start_date} 00:00:00")
            if end_date:
                conditions.append(f"i.interaction_time <= {ph}")
                params.append(f"{end_date} 23:59:59")
            if status:
                conditions.append(f"i.interaction_status = {ph}")
                params.append(status)
            where_clause = ' AND '.join(conditions) if conditions else '1=1'
            # whitelist sort column/direction — they are interpolated into SQL
            allowed_sort_fields = ['interaction_time', 'site_id', 'interaction_status']
            if sort_by not in allowed_sort_fields:
                sort_by = 'interaction_time'
            sort_order = 'DESC' if sort_order.upper() == 'DESC' else 'ASC'
            # total count for the pager
            count_sql = f"SELECT COUNT(*) as total FROM ai_mip_interaction i WHERE {where_clause}"
            cursor = self._execute_query(conn, count_sql, tuple(params) if params else None)
            total = cursor.fetchone()['total']
            # page of data
            offset = (page - 1) * page_size
            data_sql = f"""
                SELECT i.*, s.site_name, s.site_url as site_url_ref
                FROM ai_mip_interaction i
                LEFT JOIN ai_mip_site s ON i.site_id = s.id
                WHERE {where_clause}
                ORDER BY i.{sort_by} {sort_order}
                LIMIT {ph} OFFSET {ph}
            """
            params.extend([page_size, offset])
            cursor = self._execute_query(conn, data_sql, tuple(params))
            rows = cursor.fetchall()
            return [self._dict_from_row(row) for row in rows], total
        except Exception as e:
            logger.error(f"分页查询互动记录失败: {str(e)}")
            return [], 0
        finally:
            # previously the connection leaked on exception paths
            if conn is not None:
                conn.close()

    def export_interactions(
        self,
        site_id: int = None,
        start_date: str = None,
        end_date: str = None
    ) -> List[Dict]:
        """Export interaction rows matching the optional site/date filters."""
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            conditions = []
            params = []
            if site_id:
                conditions.append(f"i.site_id = {ph}")
                params.append(site_id)
            if start_date:
                conditions.append(f"i.interaction_time >= {ph}")
                params.append(f"{start_date} 00:00:00")
            if end_date:
                conditions.append(f"i.interaction_time <= {ph}")
                params.append(f"{end_date} 23:59:59")
            where_clause = ' AND '.join(conditions) if conditions else '1=1'
            sql = f"""
                SELECT i.id, i.site_id, s.site_name, s.site_url, i.interaction_time,
                       i.interaction_type, i.interaction_status, i.reply_content,
                       i.response_received, i.response_content, i.proxy_ip
                FROM ai_mip_interaction i
                LEFT JOIN ai_mip_site s ON i.site_id = s.id
                WHERE {where_clause}
                ORDER BY i.interaction_time DESC
            """
            cursor = self._execute_query(conn, sql, tuple(params) if params else None)
            rows = cursor.fetchall()
            return [self._dict_from_row(row) for row in rows]
        except Exception as e:
            logger.error(f"导出互动记录失败: {str(e)}")
            return []
        finally:
            if conn is not None:
                conn.close()
class EnhancedStatisticsManager(StatisticsManager):
    """Statistics manager with chart-oriented data series."""

    def get_click_trend(self, days: int = 7) -> Dict:
        """
        Daily click/success counts over the last `days` days.

        Args:
            days: number of calendar days to cover.

        Returns:
            {'dates': [...], 'clicks': [...], 'successes': [...]}
        """
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            # clicks per day
            click_sql = f"""
                SELECT DATE(click_time) as date, COUNT(*) as count
                FROM ai_mip_click
                WHERE click_time >= DATE_SUB(CURDATE(), INTERVAL {ph} DAY)
                GROUP BY DATE(click_time)
                ORDER BY date
            """
            cursor = self._execute_query(conn, click_sql, (days,))
            click_rows = cursor.fetchall()
            # successful interactions per day (is_successful = 1)
            success_sql = f"""
                SELECT DATE(interaction_time) as date, COUNT(*) as count
                FROM ai_mip_interaction
                WHERE interaction_time >= DATE_SUB(CURDATE(), INTERVAL {ph} DAY)
                AND is_successful = 1
                GROUP BY DATE(interaction_time)
                ORDER BY date
            """
            cursor = self._execute_query(conn, success_sql, (days,))
            success_rows = cursor.fetchall()
        except Exception as e:
            logger.error(f"获取点击趋势失败: {str(e)}")
            return {'dates': [], 'clicks': [], 'successes': []}
        finally:
            # previously the connection leaked on exception paths
            if conn is not None:
                conn.close()
        from datetime import timedelta
        click_map = {str(row['date']): row['count'] for row in click_rows}
        success_map = {str(row['date']): row['count'] for row in success_rows}
        dates, clicks, successes = [], [], []
        # emit every calendar day so gaps show up as zeros in the chart
        for offset in range(days - 1, -1, -1):
            day = (datetime.now() - timedelta(days=offset)).strftime('%Y-%m-%d')
            dates.append(day)
            clicks.append(click_map.get(day, 0))
            successes.append(success_map.get(day, 0))
        return {
            'dates': dates,
            'clicks': clicks,
            'successes': successes
        }

    def get_hourly_distribution(self) -> Dict:
        """
        Click counts per hour of day over the last 7 days.

        Returns:
            {'hours': [0..23], 'clicks': [...]}
        """
        conn = None
        try:
            conn = self.get_connection()
            sql = """
                SELECT HOUR(click_time) as hour, COUNT(*) as count
                FROM ai_mip_click
                WHERE click_time >= DATE_SUB(NOW(), INTERVAL 7 DAY)
                GROUP BY HOUR(click_time)
                ORDER BY hour
            """
            cursor = self._execute_query(conn, sql)
            rows = cursor.fetchall()
            hour_map = {row['hour']: row['count'] for row in rows}
            hours = list(range(24))
            clicks = [hour_map.get(h, 0) for h in hours]
            return {
                'hours': hours,
                'clicks': clicks
            }
        except Exception as e:
            logger.error(f"获取时段分布失败: {str(e)}")
            return {'hours': list(range(24)), 'clicks': [0] * 24}
        finally:
            if conn is not None:
                conn.close()

    def get_top_sites(self, limit: int = 10) -> List[Dict]:
        """
        Most-clicked active sites.

        Args:
            limit: number of rows to return.

        Returns:
            List of site dicts ordered by click_count descending.
        """
        conn = None
        try:
            conn = self.get_connection()
            ph = self._get_placeholder()
            sql = f"""
                SELECT id, site_name, site_url, click_count, reply_count
                FROM ai_mip_site
                WHERE status = 'active'
                ORDER BY click_count DESC
                LIMIT {ph}
            """
            cursor = self._execute_query(conn, sql, (limit,))
            rows = cursor.fetchall()
            return [self._dict_from_row(row) for row in rows]
        except Exception as e:
            logger.error(f"获取Top站点失败: {str(e)}")
            return []
        finally:
            if conn is not None:
                conn.close()

    def get_reply_rate_distribution(self) -> Dict:
        """
        Replied vs. unreplied click counts (pie-chart data).

        Returns:
            {'labels': [...], 'values': [replied, unreplied]}
        """
        conn = None
        try:
            conn = self.get_connection()
            cursor = self._execute_query(conn, "SELECT COUNT(*) as total FROM ai_mip_click")
            total_clicks = cursor.fetchone()['total']
            cursor = self._execute_query(conn, "SELECT COUNT(*) as total FROM ai_mip_interaction WHERE response_received = 1")
            total_replies = cursor.fetchone()['total']
            # clamp at zero — replies can exceed clicks if counters drift
            no_reply = total_clicks - total_replies if total_clicks > total_replies else 0
            return {
                'labels': ['有回复', '无回复'],
                'values': [total_replies, no_reply]
            }
        except Exception as e:
            logger.error(f"获取回复率分布失败: {str(e)}")
            return {'labels': ['有回复', '无回复'], 'values': [0, 0]}
        finally:
            if conn is not None:
                conn.close()
class QueryImportLogManager(DatabaseManager):
    """Manager for the keyword-import log table (query_import_log)."""

    def ensure_table(self):
        """Create the query_import_log table if it does not exist yet."""
        try:
            conn = self.get_connection()
            try:
                cursor = conn.cursor()
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS `query_import_log` (
                        `id` INT AUTO_INCREMENT PRIMARY KEY,
                        `filename` VARCHAR(255) NOT NULL COMMENT '上传的文件名',
                        `filepath` VARCHAR(500) NOT NULL COMMENT '文件完整路径',
                        `upload_time` DATETIME NOT NULL COMMENT '上传时间',
                        `import_time` DATETIME NULL COMMENT '实际导入时间',
                        `status` VARCHAR(20) DEFAULT 'pending' COMMENT '导入状态',
                        `total_count` INT DEFAULT 0 COMMENT '总行数',
                        `success_count` INT DEFAULT 0 COMMENT '成功插入数',
                        `skip_count` INT DEFAULT 0 COMMENT '跳过数(已存在)',
                        `fail_count` INT DEFAULT 0 COMMENT '失败数',
                        `error_message` TEXT NULL COMMENT '错误信息',
                        `created_at` DATETIME DEFAULT CURRENT_TIMESTAMP,
                        `updated_at` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                        INDEX `idx_status` (`status`),
                        INDEX `idx_upload_time` (`upload_time`)
                    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='关键词导入日志表'
                """)
                conn.commit()
            finally:
                # Close the connection even on DDL failure (was leaked before).
                conn.close()
        except Exception as e:
            logger.error(f"创建 query_import_log 表失败: {e}")

    def create_log(self, filename: str, filepath: str) -> Optional[int]:
        """
        Insert a 'pending' import-log row for an uploaded file.

        Args:
            filename: Name of the uploaded file.
            filepath: Full path where the file was stored.

        Returns:
            The new row id, or None on failure.
        """
        try:
            self.ensure_table()
            conn = self.get_connection()
            try:
                ph = self._get_placeholder()
                cursor = conn.cursor()
                cursor.execute(
                    f"INSERT INTO query_import_log (filename, filepath, upload_time, status) VALUES ({ph}, {ph}, NOW(), 'pending')",
                    (filename, filepath)
                )
                log_id = cursor.lastrowid
                conn.commit()
            finally:
                conn.close()
            # Bug fix: previously logged the literal text "(unknown)" instead
            # of the actual uploaded filename.
            logger.info(f"创建导入日志: {filename} (ID: {log_id})")
            return log_id
        except Exception as e:
            logger.error(f"创建导入日志失败: {e}")
            return None

    def update_status(self, log_id: int, status: str,
                      total_count: int = 0, success_count: int = 0,
                      skip_count: int = 0, fail_count: int = 0,
                      error_message: str = None):
        """
        Update an import-log row's status and counters.

        When the status marks the import as started or finished
        ('running'/'completed'/'failed'), import_time is stamped with NOW().
        """
        try:
            conn = self.get_connection()
            try:
                ph = self._get_placeholder()
                cursor = conn.cursor()
                # Fixed SQL fragment (not user input), so f-string interpolation
                # here is safe; all values go through placeholders.
                import_time_sql = ", import_time = NOW()" if status in ('running', 'completed', 'failed') else ""
                cursor.execute(
                    f"""UPDATE query_import_log
                        SET status = {ph}, total_count = {ph}, success_count = {ph},
                            skip_count = {ph}, fail_count = {ph}, error_message = {ph}
                            {import_time_sql}
                        WHERE id = {ph}""",
                    (status, total_count, success_count, skip_count, fail_count, error_message, log_id)
                )
                conn.commit()
            finally:
                conn.close()
        except Exception as e:
            logger.error(f"更新导入日志失败: {e}")

    def get_pending_logs(self) -> List[Dict]:
        """Return all 'pending' import-log rows, oldest first ([] on error)."""
        try:
            self.ensure_table()
            conn = self.get_connection()
            try:
                cursor = self._execute_query(
                    conn, "SELECT * FROM query_import_log WHERE status = 'pending' ORDER BY created_at ASC"
                )
                rows = cursor.fetchall()
            finally:
                conn.close()
            return [self._dict_from_row(row) for row in rows]
        except Exception as e:
            logger.error(f"查询待处理日志失败: {e}")
            return []

    def get_logs_paginated(self, page: int = 1, page_size: int = 20) -> Dict:
        """
        Return one page of import logs, newest first.

        Args:
            page: 1-based page number.
            page_size: Rows per page.

        Returns:
            {'items': [...], 'total': int, 'page': int, 'page_size': int};
            empty items/zero total on error.
        """
        try:
            self.ensure_table()
            conn = self.get_connection()
            try:
                ph = self._get_placeholder()
                cursor = self._execute_query(conn, "SELECT COUNT(*) as total FROM query_import_log")
                total = cursor.fetchone()['total']
                offset = (page - 1) * page_size
                cursor = self._execute_query(
                    conn,
                    f"SELECT * FROM query_import_log ORDER BY created_at DESC LIMIT {ph} OFFSET {ph}",
                    (page_size, offset)
                )
                rows = cursor.fetchall()
            finally:
                conn.close()
            return {
                'items': [self._dict_from_row(row) for row in rows],
                'total': total,
                'page': page,
                'page_size': page_size
            }
        except Exception as e:
            logger.error(f"分页查询导入日志失败: {e}")
            return {'items': [], 'total': 0, 'page': page, 'page_size': page_size}

    def is_file_logged(self, filepath: str) -> bool:
        """Return True if an import record already exists for this filepath."""
        try:
            conn = self.get_connection()
            try:
                ph = self._get_placeholder()
                cursor = self._execute_query(
                    conn,
                    f"SELECT COUNT(*) as cnt FROM query_import_log WHERE filepath = {ph}",
                    (filepath,)
                )
                cnt = cursor.fetchone()['cnt']
            finally:
                conn.close()
            return cnt > 0
        except Exception as e:
            logger.error(f"检查文件日志失败: {e}")
            # Best-effort: treat lookup failure as "not logged" so the caller
            # can still proceed (preserves original behavior).
            return False
class QueryKeywordManager(DatabaseManager):
"""Query关键词管理器 - 操作 baidu_keyword 表"""
def insert_keyword(self, keyword: str, seed_id: int = 9999, seed_name: str = '手动提交',
crawled: int = 1, department: str = '', department_id: int = 0,
author_id: int = 0, author_name: str = '') -> int:
"""
插入单条关键词到 baidu_keyword 表INSERT IGNORE
Returns:
affected rows: 1=新插入, 0=已存在被跳过, -1=失败
"""
try:
conn = self.get_connection()
ph = self._get_placeholder()
cursor = conn.cursor()
cursor.execute(
f"""INSERT IGNORE INTO baidu_keyword
(keyword, seed_id, seed_name, crawled, parents_id, created_at,
department, department_id, query_status, author_id, author_name)
VALUES ({ph}, {ph}, {ph}, {ph}, 0, NOW(), {ph}, {ph}, 'manual_review', {ph}, {ph})""",
(keyword, seed_id, seed_name, crawled, department, department_id, author_id, author_name)
)
affected = cursor.rowcount
conn.commit()
conn.close()
return affected
except Exception as e:
logger.error(f"插入关键词失败: {keyword} - {e}")
return -1
def batch_insert_keywords(self, keyword_list: list, seed_id: int = 9999,
seed_name: str = '手动提交', crawled: int = 1,
query_status: str = 'manual_review') -> dict:
"""
批量插入关键词到 baidu_keyword 表INSERT IGNORE
Args:
keyword_list: [{'keyword': str, 'department': str, 'seed_name': str(可选)}, ...]
query_status: 写入的query_status值'draft''manual_review'
Returns:
{'success': int, 'skip': int, 'fail': int}
"""
stats = {'success': 0, 'skip': 0, 'fail': 0}
if not keyword_list:
return stats
try:
conn = self.get_connection()
cursor = conn.cursor()
values = []
for item in keyword_list:
values.append((
item['keyword'], seed_id, seed_name, crawled,
item.get('department', ''), query_status
))
cursor.executemany(
"""INSERT IGNORE INTO baidu_keyword
(keyword, seed_id, seed_name, crawled, parents_id, created_at,
department, department_id, query_status, author_id, author_name)
VALUES (%s, %s, %s, %s, 0, NOW(), %s, 0, %s, 0, '')""",
values
)
# executemany 的 rowcount 返回实际插入的行数
inserted = cursor.rowcount
conn.commit()
conn.close()
stats['success'] = inserted
stats['skip'] = len(keyword_list) - inserted
return stats
except Exception as e:
logger.error(f"批量插入关键词失败: {e}")
stats['fail'] = len(keyword_list)
return stats