2026-01-16 22:06:46 +08:00
|
|
|
|
"""
|
|
|
|
|
|
数据库管理器
|
|
|
|
|
|
使用 MySQL 数据库
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
|
import random
|
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
from typing import List, Dict, Optional, Union
|
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
from loguru import logger
|
|
|
|
|
|
from config import Config
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
import pymysql
|
|
|
|
|
|
MYSQL_AVAILABLE = True
|
|
|
|
|
|
except ImportError:
|
|
|
|
|
|
MYSQL_AVAILABLE = False
|
|
|
|
|
|
logger.error("pymysql 未安装,请安装: pip install pymysql")
|
|
|
|
|
|
raise ImportError("使用 MySQL 需要安装 pymysql: pip install pymysql")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DatabaseManager:
    """Base class for the MySQL-backed managers in this module.

    Holds the pymysql connection settings read from ``Config`` and provides
    small helpers for opening connections, running queries through a
    ``DictCursor`` and turning result rows into JSON-serialisable dicts.
    """

    def __init__(self, db_path: str = None):
        """Collect the MySQL connection settings from ``Config``.

        Args:
            db_path: Not used for connecting; accepted for backward
                compatibility. It is kept on the instance because other
                code in this module reads ``self.db_path``.

        Raises:
            ImportError: If pymysql is not available.
        """
        if not MYSQL_AVAILABLE:
            raise ImportError("使用 MySQL 需要安装 pymysql: pip install pymysql")

        # Stored only so that attribute access on ``self.db_path`` works.
        self.db_path = db_path

        self.db_config = {
            'host': Config.MYSQL_HOST,
            'port': Config.MYSQL_PORT,
            'user': Config.MYSQL_USER,
            'password': Config.MYSQL_PASSWORD,
            'database': Config.MYSQL_DATABASE,
            'charset': 'utf8mb4'
        }

    def get_connection(self) -> 'pymysql.Connection':
        """Open and return a new pymysql connection built from ``self.db_config``.

        The caller owns the connection and is responsible for closing it.
        """
        return pymysql.connect(**self.db_config)

    def _dict_from_row(self, row) -> Optional[Dict]:
        """Convert a database row into a JSON-serialisable dictionary.

        Args:
            row: A row as returned by a cursor, or ``None``.

        Returns:
            ``None`` when ``row`` is ``None``. For a dict row, a new dict in
            which ``datetime`` values become ``'YYYY-MM-DD HH:MM:SS'``,
            ``date`` values become ``'YYYY-MM-DD'``, ``timedelta`` values
            become ``'[-]HH:MM:SS'`` and ``Decimal`` values become ``float``.
            Any non-dict row is returned unchanged.
        """
        if row is None:
            return None
        if not isinstance(row, dict):
            return row

        from datetime import datetime, date, timedelta
        from decimal import Decimal

        # Work on a copy so the caller's row is never modified.
        result = dict(row)
        for key, value in row.items():
            # datetime is a subclass of date, so it has to be tested first.
            if isinstance(value, datetime):
                result[key] = value.strftime('%Y-%m-%d %H:%M:%S')
            elif isinstance(value, date):
                result[key] = value.strftime('%Y-%m-%d')
            elif isinstance(value, timedelta):
                total_seconds = int(value.total_seconds())
                # divmod() on a negative number floors towards -inf, which
                # would turn -01:30:00 into "-2:30:00"; format the magnitude
                # and re-attach the sign instead.
                sign = '-' if total_seconds < 0 else ''
                hours, remainder = divmod(abs(total_seconds), 3600)
                minutes, seconds = divmod(remainder, 60)
                result[key] = f'{sign}{hours:02d}:{minutes:02d}:{seconds:02d}'
            elif isinstance(value, Decimal):
                result[key] = float(value)

        return result

    def _get_placeholder(self) -> str:
        """Return the SQL parameter placeholder, which is ``%s`` for MySQL."""
        return '%s'

    def _execute_query(self, conn, sql: str, params: tuple = None):
        """Execute ``sql`` on ``conn`` with a ``DictCursor`` and return the cursor.

        Args:
            conn: An open pymysql connection.
            sql: SQL text using ``%s`` placeholders.
            params: Values for the placeholders. A falsy value (``None`` or
                an empty tuple) runs the statement without parameters.

        Returns:
            The cursor the statement was executed on.
        """
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        cursor.execute(sql, params or None)
        return cursor
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class SiteManager(DatabaseManager):
    """Create, read, update and delete rows of the ``ai_mip_site`` table.

    Every public method opens its own connection, logs failures through
    ``logger`` and reports them via its return value instead of raising.
    """

    def _fetch_one(self, sql: str, params: tuple = None) -> Optional[Dict]:
        """Run a SELECT and return the first row as a JSON-safe dict, or None.

        The connection is closed even when the query raises.
        """
        conn = self.get_connection()
        try:
            row = self._execute_query(conn, sql, params).fetchone()
            return self._dict_from_row(row) if row else None
        finally:
            conn.close()

    def _fetch_all(self, sql: str, params: tuple = None) -> List[Dict]:
        """Run a SELECT and return every row as a JSON-safe dict.

        The connection is closed even when the query raises.
        """
        conn = self.get_connection()
        try:
            rows = self._execute_query(conn, sql, params).fetchall()
            return [self._dict_from_row(row) for row in rows]
        finally:
            conn.close()

    def _write(self, sql: str, params: tuple = None):
        """Run one data-changing statement, commit it and return ``lastrowid``.

        The connection is closed even when the statement or commit raises.
        """
        conn = self.get_connection()
        try:
            cursor = self._execute_query(conn, sql, params)
            last_id = cursor.lastrowid
            conn.commit()
            return last_id
        finally:
            conn.close()

    def add_site(self, site_url: str, site_name: str = None,
                 site_dimension: str = None, query_word: str = None,
                 frequency: int = None,
                 time_start: str = None, time_end: str = None,
                 interval_minutes: int = None) -> Optional[int]:
        """Insert a new site with status ``'active'``.

        Args:
            site_url: URL of the site.
            site_name: Display name; falls back to ``site_url`` when falsy.
            site_dimension: Dimension tag of the site.
            query_word: Query word the site was discovered from.
            frequency: Frequency; falls back to ``1`` when falsy.
            time_start: Start time; falls back to ``'09:00:00'`` when falsy.
            time_end: End time; falls back to ``'21:00:00'`` when falsy.
            interval_minutes: Interval in minutes; falls back to ``30`` when
                falsy.

        Returns:
            The id of the inserted row, or ``None`` when the insert fails
            (an ``IntegrityError`` is logged as "URL already exists").
        """
        try:
            ph = self._get_placeholder()
            values = (
                site_url,
                site_name or site_url,
                'active',
                frequency or 1,
                time_start or '09:00:00',
                time_end or '21:00:00',
                interval_minutes or 30,
                site_dimension,
                query_word,
                'system',
            )
            # One placeholder per value keeps the column list and the
            # parameter tuple in sync.
            placeholders = ', '.join([ph] * len(values))
            sql = f"""
                INSERT INTO ai_mip_site (
                    site_url, site_name, status, frequency,
                    time_start, time_end, interval_minutes,
                    site_dimension, query_word, created_by
                ) VALUES ({placeholders})
            """
            site_id = self._write(sql, values)

            logger.info(f"成功添加站点: {site_url} (ID: {site_id}, 查询词: {query_word})")
            return site_id

        except pymysql.IntegrityError:
            logger.warning(f"站点URL已存在: {site_url}")
            return None
        except Exception as e:
            logger.error(f"添加站点失败: {str(e)}")
            return None

    def get_site_by_url(self, site_url: str) -> Optional[Dict]:
        """Return the site whose ``site_url`` matches, or ``None`` if absent or on error."""
        try:
            ph = self._get_placeholder()
            return self._fetch_one(
                f"SELECT * FROM ai_mip_site WHERE site_url = {ph}", (site_url,)
            )
        except Exception as e:
            logger.error(f"查询站点失败: {str(e)}")
            return None

    def get_site_by_id(self, site_id: int) -> Optional[Dict]:
        """Return the site with the given id, or ``None`` if absent or on error."""
        try:
            ph = self._get_placeholder()
            return self._fetch_one(
                f"SELECT * FROM ai_mip_site WHERE id = {ph}", (site_id,)
            )
        except Exception as e:
            logger.error(f"查询站点失败: {str(e)}")
            return None

    def get_active_sites(self) -> List[Dict]:
        """Return all sites with status ``'active'``, newest first; ``[]`` on error."""
        try:
            return self._fetch_all(
                "SELECT * FROM ai_mip_site WHERE status = 'active' ORDER BY created_at DESC"
            )
        except Exception as e:
            logger.error(f"查询活跃站点失败: {str(e)}")
            return []

    def get_all_sites(self) -> List[Dict]:
        """Return every site, newest first; ``[]`` on error."""
        try:
            return self._fetch_all("SELECT * FROM ai_mip_site ORDER BY created_at DESC")
        except Exception as e:
            logger.error(f"查询所有站点失败: {str(e)}")
            return []

    def update_site_status(self, site_id: int, status: str) -> bool:
        """Set the status of a site and mark it as updated by ``'system'``.

        Returns:
            ``True`` when the statement ran and was committed, ``False`` on
            error. Whether a row actually matched is not checked.
        """
        try:
            ph = self._get_placeholder()
            self._write(
                f"UPDATE ai_mip_site SET status = {ph}, updated_by = {ph} WHERE id = {ph}",
                (status, 'system', site_id)
            )
            logger.info(f"更新站点状态: ID={site_id}, status={status}")
            return True
        except Exception as e:
            logger.error(f"更新站点状态失败: {str(e)}")
            return False

    def increment_click_count(self, site_id: int, count: int = 1) -> bool:
        """Add ``count`` to the site's ``click_count``; ``False`` on error."""
        try:
            ph = self._get_placeholder()
            self._write(
                f"UPDATE ai_mip_site SET click_count = click_count + {ph} WHERE id = {ph}",
                (count, site_id)
            )
            return True
        except Exception as e:
            logger.error(f"更新点击次数失败: {str(e)}")
            return False

    def increment_reply_count(self, site_id: int, count: int = 1) -> bool:
        """Add ``count`` to the site's ``reply_count``; ``False`` on error."""
        try:
            ph = self._get_placeholder()
            self._write(
                f"UPDATE ai_mip_site SET reply_count = reply_count + {ph} WHERE id = {ph}",
                (count, site_id)
            )
            return True
        except Exception as e:
            logger.error(f"更新回复次数失败: {str(e)}")
            return False

    def delete_site(self, site_id: int) -> bool:
        """Delete the site with the given id; ``False`` on error."""
        try:
            ph = self._get_placeholder()
            self._write(f"DELETE FROM ai_mip_site WHERE id = {ph}", (site_id,))
            logger.info(f"已删除站点: ID={site_id}")
            return True
        except Exception as e:
            logger.error(f"删除站点失败: {str(e)}")
            return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ClickManager(DatabaseManager):
    """Record and query rows of the ``ai_mip_click`` table."""

    def record_click(self, site_id: int, site_url: str,
                     user_ip: str = None, device_type: str = 'pc',
                     task_id: str = None) -> Optional[int]:
        """Insert one click record and bump the site's click counter.

        Args:
            site_id: Id of the clicked site.
            site_url: URL of the clicked site.
            user_ip: IP the click was made from (the proxy IP).
            device_type: Device type.
            task_id: Task id; when falsy a ``TASK_<YYYYmmddHHMMSS>`` id is
                generated from the current time.

        Returns:
            The id of the new click row, or ``None`` on error.
        """
        try:
            ph = self._get_placeholder()
            now = datetime.now()
            values = (
                site_id,
                site_url,
                now.strftime('%Y-%m-%d %H:%M:%S'),
                user_ip,
                device_type,
                task_id or f'TASK_{now.strftime("%Y%m%d%H%M%S")}',
                'RPA_SYSTEM',
            )
            placeholders = ', '.join([ph] * len(values))
            sql = f"""
                INSERT INTO ai_mip_click (
                    site_id, site_url, click_time, user_ip,
                    device_type, task_id, operator
                ) VALUES ({placeholders})
            """

            conn = self.get_connection()
            try:
                cursor = self._execute_query(conn, sql, values)
                click_id = cursor.lastrowid
                conn.commit()
            finally:
                # Release the connection even when the insert fails.
                conn.close()

            # Keep the per-site counter in step with the click log.
            SiteManager().increment_click_count(site_id)

            logger.info(f"记录点击: site_id={site_id}, click_id={click_id}")
            return click_id

        except Exception as e:
            logger.error(f"记录点击失败: {str(e)}")
            return None

    def get_clicks_by_site(self, site_id: int, limit: int = 100) -> List[Dict]:
        """Return up to ``limit`` click records of a site, newest first.

        Returns:
            A list of JSON-safe dicts, or ``[]`` on error.
        """
        try:
            # pymysql uses %s placeholders (not "?") and needs a DictCursor
            # for rows to come back as dicts.
            ph = self._get_placeholder()
            sql = f"""
                SELECT * FROM ai_mip_click
                WHERE site_id = {ph}
                ORDER BY click_time DESC
                LIMIT {ph}
            """
            conn = self.get_connection()
            try:
                # LIMIT must be sent as a number; a quoted string is invalid.
                rows = self._execute_query(conn, sql, (site_id, int(limit))).fetchall()
            finally:
                conn.close()
            return [self._dict_from_row(row) for row in rows]
        except Exception as e:
            logger.error(f"查询点击记录失败: {str(e)}")
            return []

    def get_click_count_by_site(self, site_id: int) -> int:
        """Return the number of click records stored for a site; ``0`` on error."""
        try:
            ph = self._get_placeholder()
            sql = f"SELECT COUNT(*) as count FROM ai_mip_click WHERE site_id = {ph}"
            conn = self.get_connection()
            try:
                row = self._execute_query(conn, sql, (site_id,)).fetchone()
            finally:
                conn.close()
            return row['count'] if row else 0
        except Exception as e:
            logger.error(f"查询点击次数失败: {str(e)}")
            return 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class InteractionManager(DatabaseManager):
    """Record and query rows of the ``ai_mip_interaction`` table."""

    def record_interaction(self, site_id: int, click_id: int = None,
                           task_id: str = None, interaction_type: str = 'reply',
                           reply_content: str = None, is_successful: bool = False,
                           response_received: bool = False, response_content: str = None,
                           proxy_ip: str = None, fingerprint_id: str = None,
                           error_message: str = None) -> Optional[int]:
        """Insert one interaction record.

        When ``response_received`` is true the site's reply counter is
        incremented as well.

        Args:
            site_id: Id of the site.
            click_id: Id of the related click record.
            task_id: Task id; when falsy a ``TASK_<YYYYmmddHHMMSS>`` id is
                generated from the current time.
            interaction_type: Interaction type (``reply``, ``comment``, ...).
            reply_content: Content that was sent.
            is_successful: Whether the interaction succeeded.
            response_received: Whether a response was received.
            response_content: Content of the response.
            proxy_ip: Proxy IP that was used.
            fingerprint_id: Browser fingerprint id.
            error_message: Error message, if any.

        Returns:
            The id of the new interaction row, or ``None`` on error.
        """
        try:
            ph = self._get_placeholder()
            now = datetime.now()
            values = (
                site_id,
                click_id,
                task_id or f'TASK_{now.strftime("%Y%m%d%H%M%S")}',
                interaction_type,
                now.strftime('%Y-%m-%d %H:%M:%S'),
                'success' if is_successful else 'failed',
                reply_content,
                'auto',
                'playwright',
                proxy_ip,
                fingerprint_id,
                1 if response_received else 0,
                response_content,
                1 if is_successful else 0,
                error_message,
                'RPA_SYSTEM',
            )
            # One placeholder per value keeps the column list and the
            # parameter tuple in sync.
            placeholders = ', '.join([ph] * len(values))
            sql = f"""
                INSERT INTO ai_mip_interaction (
                    site_id, click_id, task_id, interaction_type,
                    interaction_time, interaction_status, reply_content,
                    execution_mode, browser_type, proxy_ip, fingerprint_id,
                    response_received, response_content, is_successful,
                    error_message, operator
                ) VALUES ({placeholders})
            """

            conn = self.get_connection()
            try:
                cursor = self._execute_query(conn, sql, values)
                interaction_id = cursor.lastrowid
                conn.commit()
            finally:
                # Release the connection even when the insert fails.
                conn.close()

            if response_received:
                SiteManager().increment_reply_count(site_id)

            logger.info(f"记录互动: site_id={site_id}, interaction_id={interaction_id}, success={is_successful}")
            return interaction_id

        except Exception as e:
            logger.error(f"记录互动失败: {str(e)}")
            return None

    def get_interactions_by_site(self, site_id: int, limit: int = 100) -> List[Dict]:
        """Return up to ``limit`` interaction records of a site, newest first.

        Returns:
            A list of JSON-safe dicts, or ``[]`` on error.
        """
        try:
            # pymysql uses %s placeholders (not "?") and needs a DictCursor
            # for rows to come back as dicts.
            ph = self._get_placeholder()
            sql = f"""
                SELECT * FROM ai_mip_interaction
                WHERE site_id = {ph}
                ORDER BY interaction_time DESC
                LIMIT {ph}
            """
            conn = self.get_connection()
            try:
                # LIMIT must be sent as a number; a quoted string is invalid.
                rows = self._execute_query(conn, sql, (site_id, int(limit))).fetchall()
            finally:
                conn.close()
            return [self._dict_from_row(row) for row in rows]
        except Exception as e:
            logger.error(f"查询互动记录失败: {str(e)}")
            return []

    def get_successful_interactions_count(self, site_id: int) -> int:
        """Return how many interactions of a site have ``is_successful = 1``; ``0`` on error."""
        try:
            ph = self._get_placeholder()
            sql = f"""
                SELECT COUNT(*) as count FROM ai_mip_interaction
                WHERE site_id = {ph} AND is_successful = 1
            """
            conn = self.get_connection()
            try:
                row = self._execute_query(conn, sql, (site_id,)).fetchone()
            finally:
                conn.close()
            return row['count'] if row else 0
        except Exception as e:
            logger.error(f"查询成功互动次数失败: {str(e)}")
            return 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StatisticsManager(DatabaseManager):
    """Aggregate statistics over the site, click and interaction tables."""

    def get_statistics(self) -> Dict:
        """Return global counters and rates.

        Returns:
            A dict with ``total_sites``, ``active_sites``, ``total_clicks``,
            ``total_replies``, ``successful_interactions`` and the formatted
            percentages ``reply_rate`` and ``success_rate`` (both relative to
            ``total_clicks``). An empty dict is returned on error.
        """
        try:
            conn = self.get_connection()
            try:
                def count(sql: str) -> int:
                    # Every statistics query exposes its result as "total".
                    return self._execute_query(conn, sql).fetchone()['total']

                total_sites = count("SELECT COUNT(*) as total FROM ai_mip_site")
                active_sites = count("SELECT COUNT(*) as total FROM ai_mip_site WHERE status = 'active'")
                total_clicks = count("SELECT COUNT(*) as total FROM ai_mip_click")
                total_replies = count("SELECT COUNT(*) as total FROM ai_mip_interaction WHERE response_received = 1")
                successful_interactions = count("SELECT COUNT(*) as total FROM ai_mip_interaction WHERE is_successful = 1")
            finally:
                # Release the connection even when one of the queries fails.
                conn.close()

            # Guard against division by zero when nothing was clicked yet.
            reply_rate = (total_replies / total_clicks * 100) if total_clicks > 0 else 0
            success_rate = (successful_interactions / total_clicks * 100) if total_clicks > 0 else 0

            return {
                'total_sites': total_sites,
                'active_sites': active_sites,
                'total_clicks': total_clicks,
                'total_replies': total_replies,
                'successful_interactions': successful_interactions,
                'reply_rate': f"{reply_rate:.2f}%",
                'success_rate': f"{success_rate:.2f}%"
            }

        except Exception as e:
            logger.error(f"获取统计数据失败: {str(e)}")
            return {}

    def get_site_statistics(self, site_id: int) -> Dict:
        """Return the statistics of a single site.

        Args:
            site_id: Id of the site.

        Returns:
            A dict with ``site_url``, ``site_name``, ``status``,
            ``click_count``, ``reply_count``, ``successful_interactions`` and
            the formatted ``reply_rate``. An empty dict is returned when the
            site does not exist or on error.
        """
        try:
            # The base initialiser may not define ``db_path``; reading it
            # directly would raise AttributeError and make every call fail.
            db_path = getattr(self, 'db_path', None)

            site = SiteManager(db_path).get_site_by_id(site_id)
            if not site:
                return {}

            click_count = ClickManager(db_path).get_click_count_by_site(site_id)
            success_count = InteractionManager(db_path).get_successful_interactions_count(site_id)

            reply_rate = (site['reply_count'] / click_count * 100) if click_count > 0 else 0

            return {
                'site_url': site['site_url'],
                'site_name': site['site_name'],
                'status': site['status'],
                'click_count': click_count,
                'reply_count': site['reply_count'],
                'successful_interactions': success_count,
                'reply_rate': f"{reply_rate:.2f}%"
            }

        except Exception as e:
            logger.error(f"获取站点统计失败: {str(e)}")
            return {}
|
2026-01-21 14:33:10 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class QueryTaskManager(DatabaseManager):
    """Create, query and update rows of the ``ai_mip_query_task`` table.

    Every public method opens its own connection, logs failures through
    ``logger`` and reports them via its return value instead of raising.
    """

    # Timestamp column that is stamped with NOW() when a task enters a status.
    _STATUS_TIMESTAMP_FIELDS = {
        'doing': 'started_at',
        'finished': 'finished_at',
        'failed': 'finished_at',
        'closed': 'closed_at',
    }

    def _fetch_one(self, sql: str, params: tuple = None) -> Optional[Dict]:
        """Run a SELECT and return the first row as a JSON-safe dict, or None.

        The connection is closed even when the query raises.
        """
        conn = self.get_connection()
        try:
            row = self._execute_query(conn, sql, params).fetchone()
            return self._dict_from_row(row) if row else None
        finally:
            conn.close()

    def _fetch_all(self, sql: str, params: tuple = None) -> List[Dict]:
        """Run a SELECT and return every row as a JSON-safe dict.

        The connection is closed even when the query raises.
        """
        conn = self.get_connection()
        try:
            rows = self._execute_query(conn, sql, params).fetchall()
            return [self._dict_from_row(row) for row in rows]
        finally:
            conn.close()

    def _write(self, sql: str, params: tuple = None):
        """Run one data-changing statement, commit it and return ``lastrowid``.

        The connection is closed even when the statement or commit raises.
        """
        conn = self.get_connection()
        try:
            cursor = self._execute_query(conn, sql, params)
            last_id = cursor.lastrowid
            conn.commit()
            return last_id
        finally:
            conn.close()

    def create_task(self, query_word: str, task_date: str = None,
                    query_type: str = 'keyword', threshold_max: int = 100,
                    priority: int = 5, category: str = None,
                    source_platform: str = 'baidu',
                    created_by: str = 'system',
                    remark: str = None) -> Optional[int]:
        """Insert a new query task.

        Args:
            query_word: The query word.
            task_date: Task date as ``YYYYMMDD``; defaults to today.
            query_type: Query type.
            threshold_max: Maximum number of items to crawl.
            priority: Priority from 1 to 10.
            category: Category tag.
            source_platform: Source platform.
            created_by: Creator.
            remark: Free-form remark.

        Returns:
            The id of the new task, or ``None`` when the insert fails (an
            ``IntegrityError`` is logged as "task already exists").
        """
        try:
            if task_date is None:
                task_date = datetime.now().strftime('%Y%m%d')

            ph = self._get_placeholder()
            values = (
                query_word, query_type, task_date, threshold_max,
                priority, category, source_platform, created_by, remark
            )
            placeholders = ', '.join([ph] * len(values))
            sql = f"""
                INSERT INTO ai_mip_query_task (
                    query_word, query_type, task_date, threshold_max,
                    priority, category, source_platform, created_by, remark
                ) VALUES ({placeholders})
            """
            task_id = self._write(sql, values)

            logger.info(f"创建查询任务成功: {query_word} (ID: {task_id})")
            return task_id

        except pymysql.IntegrityError:
            logger.warning(f"查询任务已存在: {query_word} @ {task_date}")
            return None
        except Exception as e:
            logger.error(f"创建查询任务失败: {str(e)}")
            return None

    def get_task_by_id(self, task_id: int) -> Optional[Dict]:
        """Return the task with the given id, or ``None`` if absent or on error."""
        try:
            ph = self._get_placeholder()
            return self._fetch_one(
                f"SELECT * FROM ai_mip_query_task WHERE id = {ph}",
                (task_id,)
            )
        except Exception as e:
            logger.error(f"查询任务失败: {str(e)}")
            return None

    def get_ready_tasks(self, limit: int = None) -> List[Dict]:
        """Return tasks with status ``'ready'`` ordered by priority, then age.

        Args:
            limit: Maximum number of tasks; no limit when falsy.

        Returns:
            The matching tasks, or ``[]`` on error.
        """
        try:
            sql = "SELECT * FROM ai_mip_query_task WHERE status = 'ready' ORDER BY priority ASC, created_at ASC"
            params = None

            if limit:
                # Bind the limit as a number instead of pasting it into the
                # SQL text.
                sql += f" LIMIT {self._get_placeholder()}"
                params = (int(limit),)

            return self._fetch_all(sql, params)
        except Exception as e:
            logger.error(f"查询ready任务失败: {str(e)}")
            return []

    def get_tasks_by_date(self, task_date: str) -> List[Dict]:
        """Return the tasks of one date ordered by priority; ``[]`` on error."""
        try:
            ph = self._get_placeholder()
            return self._fetch_all(
                f"SELECT * FROM ai_mip_query_task WHERE task_date = {ph} ORDER BY priority ASC",
                (task_date,)
            )
        except Exception as e:
            logger.error(f"查询日期任务失败: {str(e)}")
            return []

    def update_task_status(self, task_id: int, status: str,
                           error_message: str = None) -> bool:
        """Set the status of a task and stamp the matching timestamp column.

        ``doing`` stamps ``started_at``, ``finished`` and ``failed`` stamp
        ``finished_at`` and ``closed`` stamps ``closed_at``; any other status
        changes no timestamp.

        Args:
            task_id: Id of the task.
            status: One of ready/doing/failed/finished/closed.
            error_message: Error text to store; ignored when falsy.

        Returns:
            ``True`` when the statement ran and was committed, ``False`` on
            error.
        """
        try:
            ph = self._get_placeholder()

            assignments = [f"status = {ph}"]
            params = [status]

            # The column name comes from a fixed mapping, never from input.
            timestamp_field = self._STATUS_TIMESTAMP_FIELDS.get(status)
            if timestamp_field:
                assignments.append(f"{timestamp_field} = NOW()")

            if error_message:
                # Bound as a parameter: quoting it into the SQL text breaks
                # on quotes or "%" and allows SQL injection.
                assignments.append(f"error_message = {ph}")
                params.append(error_message)

            params.append(task_id)
            sql = f"UPDATE ai_mip_query_task SET {', '.join(assignments)} WHERE id = {ph}"
            self._write(sql, tuple(params))

            logger.info(f"更新任务状态: {task_id} -> {status}")
            return True

        except Exception as e:
            logger.error(f"更新任务状态失败: {str(e)}")
            return False

    def increment_crawl_count(self, task_id: int,
                              crawl_count: int = 1,
                              valid_count: int = 0) -> bool:
        """Increase the crawl counters of a task.

        ``crawl_url_count`` grows by ``crawl_count``; ``valid_url_count`` and
        ``current_count`` both grow by ``valid_count``.

        Args:
            task_id: Id of the task.
            crawl_count: Number of crawled URLs.
            valid_count: Number of valid URLs (those carrying ads).

        Returns:
            ``True`` on success, ``False`` on error.
        """
        try:
            ph = self._get_placeholder()
            sql = f"""
                UPDATE ai_mip_query_task
                SET crawl_url_count = crawl_url_count + {ph},
                    valid_url_count = valid_url_count + {ph},
                    current_count = current_count + {ph}
                WHERE id = {ph}
            """
            self._write(sql, (crawl_count, valid_count, valid_count, task_id))
            return True

        except Exception as e:
            logger.error(f"更新抓取计数失败: {str(e)}")
            return False

    def check_threshold(self, task_id: int) -> bool:
        """Close the task when ``current_count`` has reached ``threshold_max``.

        Returns:
            ``True`` when the threshold was reached (the task is then set to
            ``closed``), ``False`` otherwise, when the task does not exist or
            on error.
        """
        try:
            task = self.get_task_by_id(task_id)
            if not task:
                return False

            if task['current_count'] < task['threshold_max']:
                return False

            self.update_task_status(task_id, 'closed')
            logger.info(f"任务达到阈值并关闭: {task['query_word']} ({task['current_count']}/{task['threshold_max']})")
            return True

        except Exception as e:
            logger.error(f"检查阈值失败: {str(e)}")
            return False

    def get_task_statistics(self, task_date: str = None) -> Dict:
        """Return per-status task counts and crawl totals.

        Args:
            task_date: Restrict the statistics to this date; all tasks are
                counted when falsy.

        Returns:
            A dict with ``total_tasks``, ``ready_count``, ``doing_count``,
            ``finished_count``, ``failed_count``, ``closed_count``,
            ``total_crawled`` and ``total_valid``; ``{}`` on error.
        """
        try:
            if task_date:
                where_clause = f"WHERE task_date = {self._get_placeholder()}"
                params = (task_date,)
            else:
                where_clause = ""
                params = None

            sql = f"""
                SELECT
                    COUNT(*) as total_tasks,
                    SUM(CASE WHEN status = 'ready' THEN 1 ELSE 0 END) as ready_count,
                    SUM(CASE WHEN status = 'doing' THEN 1 ELSE 0 END) as doing_count,
                    SUM(CASE WHEN status = 'finished' THEN 1 ELSE 0 END) as finished_count,
                    SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_count,
                    SUM(CASE WHEN status = 'closed' THEN 1 ELSE 0 END) as closed_count,
                    SUM(crawl_url_count) as total_crawled,
                    SUM(valid_url_count) as total_valid
                FROM ai_mip_query_task
                {where_clause}
            """

            return self._fetch_one(sql, params) or {}

        except Exception as e:
            logger.error(f"获取任务统计失败: {str(e)}")
            return {}
|
2026-02-24 12:46:35 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EnhancedSiteManager(SiteManager):
|
|
|
|
|
|
"""增强的站点管理器,支持分页、排序、筛选"""
|
|
|
|
|
|
|
|
|
|
|
|
def get_sites_paginated(
    self,
    page: int = 1,
    page_size: int = 20,
    status: str = None,
    keyword: str = None,
    sort_by: str = 'created_at',
    sort_order: str = 'desc'
) -> tuple:
    """Return one page of sites together with the total match count.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; negative values are treated as 0.
        status: Only return sites with this status when truthy.
        keyword: Only return sites whose ``site_url`` or ``site_name``
            contains this text when truthy.
        sort_by: Sort column; anything outside the allowed list falls back
            to ``created_at``.
        sort_order: ``'desc'`` (case-insensitive) sorts descending, any
            other value ascending.

    Returns:
        ``(sites, total)`` where ``sites`` is a list of JSON-safe dicts and
        ``total`` the number of rows matching the filters; ``([], 0)`` on
        error.
    """
    try:
        ph = self._get_placeholder()

        # A page below 1 would produce a negative OFFSET, which MySQL
        # rejects; clamp instead of failing the whole request.
        page = max(int(page), 1)
        page_size = max(int(page_size), 0)

        # Build the WHERE clause from the optional filters.
        conditions = []
        params = []

        if status:
            conditions.append(f"status = {ph}")
            params.append(status)

        if keyword:
            conditions.append(f"(site_url LIKE {ph} OR site_name LIKE {ph})")
            params.extend([f'%{keyword}%', f'%{keyword}%'])

        where_clause = ' AND '.join(conditions) if conditions else '1=1'

        # Column names cannot be bound as parameters, so only whitelisted
        # names are ever placed into the SQL text.
        allowed_sort_fields = ['created_at', 'click_count', 'reply_count', 'site_url', 'status']
        if sort_by not in allowed_sort_fields:
            sort_by = 'created_at'

        sort_order = 'DESC' if str(sort_order or '').upper() == 'DESC' else 'ASC'

        count_sql = f"SELECT COUNT(*) as total FROM ai_mip_site WHERE {where_clause}"
        data_sql = f"""
            SELECT * FROM ai_mip_site
            WHERE {where_clause}
            ORDER BY {sort_by} {sort_order}
            LIMIT {ph} OFFSET {ph}
        """
        offset = (page - 1) * page_size

        conn = self.get_connection()
        try:
            cursor = self._execute_query(conn, count_sql, tuple(params) if params else None)
            total = cursor.fetchone()['total']

            cursor = self._execute_query(conn, data_sql, tuple(params + [page_size, offset]))
            rows = cursor.fetchall()
        finally:
            # Release the connection even when one of the queries fails.
            conn.close()

        return [self._dict_from_row(row) for row in rows], total

    except Exception as e:
        logger.error(f"分页查询站点失败: {str(e)}")
        return [], 0
|
|
|
|
|
|
|
|
|
|
|
|
def delete_sites_batch(self, site_ids: List[int]) -> int:
    """
    Delete several sites with a single DELETE statement.

    Args:
        site_ids: Primary keys of the ``ai_mip_site`` rows to delete.

    Returns:
        Number of rows actually deleted. 0 when ``site_ids`` is empty or
        when the operation fails (the error is logged, not raised).
    """
    if not site_ids:
        return 0

    try:
        conn = self.get_connection()
        try:
            # One placeholder per id keeps the IN (...) list parameterized.
            placeholders = ','.join(['%s'] * len(site_ids))
            sql = f"DELETE FROM ai_mip_site WHERE id IN ({placeholders})"

            cursor = conn.cursor()
            cursor.execute(sql, tuple(site_ids))
            deleted = cursor.rowcount
            conn.commit()
        finally:
            # Always release the connection, including when execute/commit
            # raises; previously it leaked on every failure.
            conn.close()

        logger.info(f"批量删除站点: {deleted}/{len(site_ids)}")
        return deleted

    except Exception as e:
        logger.error(f"批量删除站点失败: {str(e)}")
        return 0
|
|
|
|
|
|
|
|
|
|
|
|
def update_sites_status_batch(self, site_ids: List[int], status: str) -> int:
    """
    Set the same status on several sites with a single UPDATE statement.

    Args:
        site_ids: Primary keys of the ``ai_mip_site`` rows to update.
        status: Value written to the ``status`` column of every listed row.

    Returns:
        The cursor's affected-row count. 0 when ``site_ids`` is empty or
        when the operation fails (the error is logged, not raised).
    """
    if not site_ids:
        return 0

    try:
        conn = self.get_connection()
        try:
            # One placeholder per id keeps the IN (...) list parameterized.
            placeholders = ','.join(['%s'] * len(site_ids))
            sql = f"UPDATE ai_mip_site SET status = %s WHERE id IN ({placeholders})"

            cursor = conn.cursor()
            # The status value binds first, followed by the ids.
            cursor.execute(sql, (status, *site_ids))
            updated = cursor.rowcount
            conn.commit()
        finally:
            # Always release the connection, including when execute/commit
            # raises; previously it leaked on every failure.
            conn.close()

        logger.info(f"批量更新站点状态为{status}: {updated}/{len(site_ids)}")
        return updated

    except Exception as e:
        logger.error(f"批量更新站点状态失败: {str(e)}")
        return 0
|
|
|
|
|
|
|
|
|
|
|
|
def export_sites(self, status: str = None, keyword: str = None) -> List[Dict]:
    """
    Fetch site rows for export, ordered by ``created_at`` descending.

    Args:
        status: When truthy, keep only rows whose ``status`` equals it.
        keyword: When truthy, keep only rows whose ``site_url`` or
            ``site_name`` contains it (bound as ``LIKE '%keyword%'``).

    Returns:
        One dict per row, converted with ``_dict_from_row``. An empty list
        when the query fails (the error is logged, not raised).
    """
    try:
        ph = self._get_placeholder()

        conditions = []
        params = []

        if status:
            conditions.append(f"status = {ph}")
            params.append(status)

        if keyword:
            conditions.append(f"(site_url LIKE {ph} OR site_name LIKE {ph})")
            params.extend([f'%{keyword}%', f'%{keyword}%'])

        # '1=1' keeps the statement valid when no filter is active.
        where_clause = ' AND '.join(conditions) if conditions else '1=1'

        sql = f"""
            SELECT id, site_url, site_name, status, click_count, reply_count,
                   frequency, time_start, time_end, site_dimension, query_word,
                   created_at
            FROM ai_mip_site
            WHERE {where_clause}
            ORDER BY created_at DESC
        """

        conn = self.get_connection()
        try:
            cursor = self._execute_query(conn, sql, tuple(params) if params else None)
            rows = cursor.fetchall()
        finally:
            # Always release the connection, including when the query
            # raises; previously it leaked on every failure.
            conn.close()

        return [self._dict_from_row(row) for row in rows]

    except Exception as e:
        logger.error(f"导出站点数据失败: {str(e)}")
        return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EnhancedClickManager(ClickManager):
    """Click-record manager extended with paginated listing and export."""

    def _build_click_filters(self, site_id=None, start_date=None, end_date=None):
        """Return ``(where_clause, params)`` for the optional click filters."""
        ph = self._get_placeholder()
        conditions = []
        params = []

        if site_id:
            conditions.append(f"c.site_id = {ph}")
            params.append(site_id)

        if start_date:
            # Widen the date to the first second of that day.
            conditions.append(f"c.click_time >= {ph}")
            params.append(f"{start_date} 00:00:00")

        if end_date:
            # Widen the date to the last second of that day.
            conditions.append(f"c.click_time <= {ph}")
            params.append(f"{end_date} 23:59:59")

        # '1=1' keeps the statement valid when no filter is active.
        where_clause = ' AND '.join(conditions) if conditions else '1=1'
        return where_clause, params

    def get_clicks_paginated(
        self,
        page: int = 1,
        page_size: int = 20,
        site_id: int = None,
        start_date: str = None,
        end_date: str = None,
        sort_by: str = 'click_time',
        sort_order: str = 'desc'
    ) -> tuple:
        """
        Fetch one page of click records joined with the site name.

        Args:
            page: 1-based page number; the offset is ``(page - 1) * page_size``.
            page_size: Maximum number of rows returned.
            site_id: When truthy, keep only clicks of this site.
            start_date: When truthy, keep clicks at or after
                ``"<start_date> 00:00:00"``.
            end_date: When truthy, keep clicks at or before
                ``"<end_date> 23:59:59"``.
            sort_by: One of ``click_time``, ``site_id``, ``device_type``;
                anything else falls back to ``click_time``.
            sort_order: ``desc`` (case-insensitive) for descending, anything
                else sorts ascending.

        Returns:
            ``(rows, total)``: the page as a list of dicts and the number of
            rows matching the filters. ``([], 0)`` when the query fails (the
            error is logged, not raised).
        """
        try:
            ph = self._get_placeholder()
            where_clause, params = self._build_click_filters(site_id, start_date, end_date)

            # Identifiers cannot be bound as parameters, so only whitelisted
            # column names and a normalized direction reach ORDER BY.
            allowed_sort_fields = ['click_time', 'site_id', 'device_type']
            if sort_by not in allowed_sort_fields:
                sort_by = 'click_time'
            sort_order = 'DESC' if sort_order.upper() == 'DESC' else 'ASC'

            count_sql = f"SELECT COUNT(*) as total FROM ai_mip_click c WHERE {where_clause}"

            offset = (page - 1) * page_size
            data_sql = f"""
                SELECT c.*, s.site_name
                FROM ai_mip_click c
                LEFT JOIN ai_mip_site s ON c.site_id = s.id
                WHERE {where_clause}
                ORDER BY c.{sort_by} {sort_order}
                LIMIT {ph} OFFSET {ph}
            """

            conn = self.get_connection()
            try:
                # Total number of matching rows.
                cursor = self._execute_query(conn, count_sql, tuple(params) if params else None)
                total = cursor.fetchone()['total']

                # The requested page; LIMIT/OFFSET bind after the filters.
                cursor = self._execute_query(conn, data_sql, tuple(params + [page_size, offset]))
                rows = cursor.fetchall()
            finally:
                # Always release the connection, including when a query
                # raises; previously it leaked on every failure.
                conn.close()

            return [self._dict_from_row(row) for row in rows], total

        except Exception as e:
            logger.error(f"分页查询点击记录失败: {str(e)}")
            return [], 0

    def export_clicks(
        self,
        site_id: int = None,
        start_date: str = None,
        end_date: str = None
    ) -> List[Dict]:
        """
        Fetch all click records matching the filters, newest first.

        Args:
            site_id: When truthy, keep only clicks of this site.
            start_date: When truthy, keep clicks at or after
                ``"<start_date> 00:00:00"``.
            end_date: When truthy, keep clicks at or before
                ``"<end_date> 23:59:59"``.

        Returns:
            One dict per row. An empty list when the query fails (the error
            is logged, not raised).
        """
        try:
            where_clause, params = self._build_click_filters(site_id, start_date, end_date)

            sql = f"""
                SELECT c.id, c.site_id, s.site_name, c.site_url, c.click_time,
                       c.user_ip, c.device_type, c.task_id
                FROM ai_mip_click c
                LEFT JOIN ai_mip_site s ON c.site_id = s.id
                WHERE {where_clause}
                ORDER BY c.click_time DESC
            """

            conn = self.get_connection()
            try:
                cursor = self._execute_query(conn, sql, tuple(params) if params else None)
                rows = cursor.fetchall()
            finally:
                # Always release the connection, including on failure.
                conn.close()

            return [self._dict_from_row(row) for row in rows]

        except Exception as e:
            logger.error(f"导出点击记录失败: {str(e)}")
            return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EnhancedInteractionManager(InteractionManager):
    """Interaction-record manager extended with paginated listing and export."""

    def _build_interaction_filters(self, site_id=None, start_date=None,
                                   end_date=None, status=None):
        """Return ``(where_clause, params)`` for the optional interaction filters."""
        ph = self._get_placeholder()
        conditions = []
        params = []

        if site_id:
            conditions.append(f"i.site_id = {ph}")
            params.append(site_id)

        if start_date:
            # Widen the date to the first second of that day.
            conditions.append(f"i.interaction_time >= {ph}")
            params.append(f"{start_date} 00:00:00")

        if end_date:
            # Widen the date to the last second of that day.
            conditions.append(f"i.interaction_time <= {ph}")
            params.append(f"{end_date} 23:59:59")

        if status:
            conditions.append(f"i.interaction_status = {ph}")
            params.append(status)

        # '1=1' keeps the statement valid when no filter is active.
        where_clause = ' AND '.join(conditions) if conditions else '1=1'
        return where_clause, params

    def get_interactions_paginated(
        self,
        page: int = 1,
        page_size: int = 20,
        site_id: int = None,
        start_date: str = None,
        end_date: str = None,
        status: str = None,
        sort_by: str = 'interaction_time',
        sort_order: str = 'desc'
    ) -> tuple:
        """
        Fetch one page of interaction records joined with site name and URL.

        Args:
            page: 1-based page number; the offset is ``(page - 1) * page_size``.
            page_size: Maximum number of rows returned.
            site_id: When truthy, keep only interactions of this site.
            start_date: When truthy, keep interactions at or after
                ``"<start_date> 00:00:00"``.
            end_date: When truthy, keep interactions at or before
                ``"<end_date> 23:59:59"``.
            status: When truthy, keep only rows whose ``interaction_status``
                equals it.
            sort_by: One of ``interaction_time``, ``site_id``,
                ``interaction_status``; anything else falls back to
                ``interaction_time``.
            sort_order: ``desc`` (case-insensitive) for descending, anything
                else sorts ascending.

        Returns:
            ``(rows, total)``: the page as a list of dicts and the number of
            rows matching the filters. ``([], 0)`` when the query fails (the
            error is logged, not raised).
        """
        try:
            ph = self._get_placeholder()
            where_clause, params = self._build_interaction_filters(
                site_id, start_date, end_date, status
            )

            # Identifiers cannot be bound as parameters, so only whitelisted
            # column names and a normalized direction reach ORDER BY.
            allowed_sort_fields = ['interaction_time', 'site_id', 'interaction_status']
            if sort_by not in allowed_sort_fields:
                sort_by = 'interaction_time'
            sort_order = 'DESC' if sort_order.upper() == 'DESC' else 'ASC'

            count_sql = f"SELECT COUNT(*) as total FROM ai_mip_interaction i WHERE {where_clause}"

            offset = (page - 1) * page_size
            data_sql = f"""
                SELECT i.*, s.site_name, s.site_url as site_url_ref
                FROM ai_mip_interaction i
                LEFT JOIN ai_mip_site s ON i.site_id = s.id
                WHERE {where_clause}
                ORDER BY i.{sort_by} {sort_order}
                LIMIT {ph} OFFSET {ph}
            """

            conn = self.get_connection()
            try:
                # Total number of matching rows.
                cursor = self._execute_query(conn, count_sql, tuple(params) if params else None)
                total = cursor.fetchone()['total']

                # The requested page; LIMIT/OFFSET bind after the filters.
                cursor = self._execute_query(conn, data_sql, tuple(params + [page_size, offset]))
                rows = cursor.fetchall()
            finally:
                # Always release the connection, including when a query
                # raises; previously it leaked on every failure.
                conn.close()

            return [self._dict_from_row(row) for row in rows], total

        except Exception as e:
            logger.error(f"分页查询互动记录失败: {str(e)}")
            return [], 0

    def export_interactions(
        self,
        site_id: int = None,
        start_date: str = None,
        end_date: str = None
    ) -> List[Dict]:
        """
        Fetch all interaction records matching the filters, newest first.

        Args:
            site_id: When truthy, keep only interactions of this site.
            start_date: When truthy, keep interactions at or after
                ``"<start_date> 00:00:00"``.
            end_date: When truthy, keep interactions at or before
                ``"<end_date> 23:59:59"``.

        Returns:
            One dict per row. An empty list when the query fails (the error
            is logged, not raised).
        """
        try:
            where_clause, params = self._build_interaction_filters(
                site_id, start_date, end_date
            )

            sql = f"""
                SELECT i.id, i.site_id, s.site_name, s.site_url, i.interaction_time,
                       i.interaction_type, i.interaction_status, i.reply_content,
                       i.response_received, i.response_content, i.proxy_ip
                FROM ai_mip_interaction i
                LEFT JOIN ai_mip_site s ON i.site_id = s.id
                WHERE {where_clause}
                ORDER BY i.interaction_time DESC
            """

            conn = self.get_connection()
            try:
                cursor = self._execute_query(conn, sql, tuple(params) if params else None)
                rows = cursor.fetchall()
            finally:
                # Always release the connection, including on failure.
                conn.close()

            return [self._dict_from_row(row) for row in rows]

        except Exception as e:
            logger.error(f"导出互动记录失败: {str(e)}")
            return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EnhancedStatisticsManager(StatisticsManager):
    """Statistics manager extended with chart-oriented data series."""

    def get_click_trend(self, days: int = 7) -> Dict:
        """
        Build per-day click and successful-interaction counts.

        Args:
            days: Number of calendar days to report, ending today.

        Returns:
            ``{'dates': [...], 'clicks': [...], 'successes': [...]}`` with
            one entry per day, oldest first. Dates are ``YYYY-MM-DD`` strings
            and days without data report 0. ``successes`` counts
            interactions with ``is_successful = 1``. All three lists are
            empty when the query fails (the error is logged, not raised).
        """
        try:
            ph = self._get_placeholder()

            # Clicks per day.
            click_sql = f"""
                SELECT DATE(click_time) as date, COUNT(*) as count
                FROM ai_mip_click
                WHERE click_time >= DATE_SUB(CURDATE(), INTERVAL {ph} DAY)
                GROUP BY DATE(click_time)
                ORDER BY date
            """

            # Successful interactions per day (is_successful = 1).
            success_sql = f"""
                SELECT DATE(interaction_time) as date, COUNT(*) as count
                FROM ai_mip_interaction
                WHERE interaction_time >= DATE_SUB(CURDATE(), INTERVAL {ph} DAY)
                AND is_successful = 1
                GROUP BY DATE(interaction_time)
                ORDER BY date
            """

            conn = self.get_connection()
            try:
                click_rows = self._execute_query(conn, click_sql, (days,)).fetchall()
                success_rows = self._execute_query(conn, success_sql, (days,)).fetchall()
            finally:
                # Always release the connection, including when a query
                # raises; previously it leaked on every failure.
                conn.close()

            from datetime import timedelta

            click_map = {str(row['date']): row['count'] for row in click_rows}
            success_map = {str(row['date']): row['count'] for row in success_rows}

            # Read the clock once so every label derives from the same
            # instant, even if the call straddles midnight.
            today = datetime.now()
            dates = [
                (today - timedelta(days=i)).strftime('%Y-%m-%d')
                for i in range(days - 1, -1, -1)
            ]

            return {
                'dates': dates,
                'clicks': [click_map.get(day, 0) for day in dates],
                'successes': [success_map.get(day, 0) for day in dates]
            }

        except Exception as e:
            logger.error(f"获取点击趋势失败: {str(e)}")
            return {'dates': [], 'clicks': [], 'successes': []}

    def get_hourly_distribution(self, days: int = 7) -> Dict:
        """
        Count clicks per hour of day over a recent window.

        Args:
            days: Size of the look-back window in days, counted back from
                the database's ``NOW()``. Defaults to 7.

        Returns:
            ``{'hours': [0..23], 'clicks': [...]}`` with one count per hour;
            hours without clicks report 0. All counts are 0 when the query
            fails (the error is logged, not raised).
        """
        try:
            ph = self._get_placeholder()

            sql = f"""
                SELECT HOUR(click_time) as hour, COUNT(*) as count
                FROM ai_mip_click
                WHERE click_time >= DATE_SUB(NOW(), INTERVAL {ph} DAY)
                GROUP BY HOUR(click_time)
                ORDER BY hour
            """

            conn = self.get_connection()
            try:
                rows = self._execute_query(conn, sql, (days,)).fetchall()
            finally:
                # Always release the connection, including on failure.
                conn.close()

            hour_map = {row['hour']: row['count'] for row in rows}

            hours = list(range(24))
            clicks = [hour_map.get(h, 0) for h in hours]

            return {
                'hours': hours,
                'clicks': clicks
            }

        except Exception as e:
            logger.error(f"获取时段分布失败: {str(e)}")
            return {'hours': list(range(24)), 'clicks': [0] * 24}

    def get_top_sites(self, limit: int = 10) -> List[Dict]:
        """
        Fetch the active sites with the highest click counts.

        Args:
            limit: Maximum number of sites to return.

        Returns:
            Dicts with ``id``, ``site_name``, ``site_url``, ``click_count``
            and ``reply_count``, ordered by ``click_count`` descending. An
            empty list when the query fails (the error is logged, not raised).
        """
        try:
            ph = self._get_placeholder()

            sql = f"""
                SELECT id, site_name, site_url, click_count, reply_count
                FROM ai_mip_site
                WHERE status = 'active'
                ORDER BY click_count DESC
                LIMIT {ph}
            """

            conn = self.get_connection()
            try:
                rows = self._execute_query(conn, sql, (limit,)).fetchall()
            finally:
                # Always release the connection, including on failure.
                conn.close()

            return [self._dict_from_row(row) for row in rows]

        except Exception as e:
            logger.error(f"获取Top站点失败: {str(e)}")
            return []

    def get_reply_rate_distribution(self) -> Dict:
        """
        Build the replied / not-replied split used for a pie chart.

        Returns:
            ``{'labels': [...], 'values': [replied, not_replied]}`` where
            ``replied`` counts interactions with ``response_received = 1``
            and ``not_replied`` is total clicks minus ``replied``, floored
            at 0. Both values are 0 when the query fails (the error is
            logged, not raised).
        """
        try:
            conn = self.get_connection()
            try:
                # Total clicks and total interactions that received a reply.
                cursor = self._execute_query(conn, "SELECT COUNT(*) as total FROM ai_mip_click")
                total_clicks = cursor.fetchone()['total']

                cursor = self._execute_query(conn, "SELECT COUNT(*) as total FROM ai_mip_interaction WHERE response_received = 1")
                total_replies = cursor.fetchone()['total']
            finally:
                # Always release the connection, including on failure.
                conn.close()

            # Replies can outnumber clicks; never report a negative slice.
            no_reply = max(total_clicks - total_replies, 0)

            return {
                'labels': ['有回复', '无回复'],
                'values': [total_replies, no_reply]
            }

        except Exception as e:
            logger.error(f"获取回复率分布失败: {str(e)}")
            return {'labels': ['有回复', '无回复'], 'values': [0, 0]}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class QueryImportLogManager(DatabaseManager):
    """Manager for the ``query_import_log`` table (keyword-file import log)."""

    def ensure_table(self):
        """Create the ``query_import_log`` table if it does not exist yet.

        Failures are logged and swallowed; nothing is returned.
        """
        try:
            conn = self.get_connection()
            try:
                cursor = conn.cursor()
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS `query_import_log` (
                        `id` INT AUTO_INCREMENT PRIMARY KEY,
                        `filename` VARCHAR(255) NOT NULL COMMENT '上传的文件名',
                        `filepath` VARCHAR(500) NOT NULL COMMENT '文件完整路径',
                        `upload_time` DATETIME NOT NULL COMMENT '上传时间',
                        `import_time` DATETIME NULL COMMENT '实际导入时间',
                        `status` VARCHAR(20) DEFAULT 'pending' COMMENT '导入状态',
                        `total_count` INT DEFAULT 0 COMMENT '总行数',
                        `success_count` INT DEFAULT 0 COMMENT '成功插入数',
                        `skip_count` INT DEFAULT 0 COMMENT '跳过数(已存在)',
                        `fail_count` INT DEFAULT 0 COMMENT '失败数',
                        `error_message` TEXT NULL COMMENT '错误信息',
                        `created_at` DATETIME DEFAULT CURRENT_TIMESTAMP,
                        `updated_at` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                        INDEX `idx_status` (`status`),
                        INDEX `idx_upload_time` (`upload_time`)
                    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='关键词导入日志表'
                """)
                conn.commit()
            finally:
                # Always release the connection, including when the DDL
                # raises; previously it leaked on every failure.
                conn.close()
        except Exception as e:
            logger.error(f"创建 query_import_log 表失败: {e}")

    def create_log(self, filename: str, filepath: str) -> Optional[int]:
        """
        Insert a new import-log row with status ``pending``.

        Args:
            filename: Name of the uploaded file.
            filepath: Path of the uploaded file.

        Returns:
            The new row id, or ``None`` when the insert fails (the error is
            logged, not raised).
        """
        try:
            self.ensure_table()
            ph = self._get_placeholder()

            conn = self.get_connection()
            try:
                cursor = conn.cursor()
                cursor.execute(
                    f"INSERT INTO query_import_log (filename, filepath, upload_time, status) VALUES ({ph}, {ph}, NOW(), 'pending')",
                    (filename, filepath)
                )
                log_id = cursor.lastrowid
                conn.commit()
            finally:
                # Always release the connection, including on failure.
                conn.close()

            logger.info(f"创建导入日志: {filename} (ID: {log_id})")
            return log_id
        except Exception as e:
            logger.error(f"创建导入日志失败: {e}")
            return None

    def update_status(self, log_id: int, status: str,
                      total_count: int = 0, success_count: int = 0,
                      skip_count: int = 0, fail_count: int = 0,
                      error_message: str = None):
        """
        Update the status and counters of one import-log row.

        Args:
            log_id: Id of the row to update.
            status: New status. ``running``, ``completed`` and ``failed``
                also stamp ``import_time`` with the database's ``NOW()``.
            total_count: Value written to ``total_count``.
            success_count: Value written to ``success_count``.
            skip_count: Value written to ``skip_count``.
            fail_count: Value written to ``fail_count``.
            error_message: Value written to ``error_message`` (may be None).

        Failures are logged and swallowed; nothing is returned.
        """
        try:
            ph = self._get_placeholder()

            # Constant SQL fragment (no user input), safe to interpolate.
            import_time_sql = ", import_time = NOW()" if status in ('running', 'completed', 'failed') else ""

            conn = self.get_connection()
            try:
                cursor = conn.cursor()
                cursor.execute(
                    f"""UPDATE query_import_log
                        SET status = {ph}, total_count = {ph}, success_count = {ph},
                            skip_count = {ph}, fail_count = {ph}, error_message = {ph}
                            {import_time_sql}
                        WHERE id = {ph}""",
                    (status, total_count, success_count, skip_count, fail_count, error_message, log_id)
                )
                conn.commit()
            finally:
                # Always release the connection, including on failure.
                conn.close()
        except Exception as e:
            logger.error(f"更新导入日志失败: {e}")

    def get_pending_logs(self) -> List[Dict]:
        """
        Fetch all import-log rows with status ``pending``, oldest first.

        Returns:
            One dict per row. An empty list when the query fails (the error
            is logged, not raised).
        """
        try:
            self.ensure_table()

            conn = self.get_connection()
            try:
                cursor = self._execute_query(
                    conn, "SELECT * FROM query_import_log WHERE status = 'pending' ORDER BY created_at ASC"
                )
                rows = cursor.fetchall()
            finally:
                # Always release the connection, including on failure.
                conn.close()

            return [self._dict_from_row(row) for row in rows]
        except Exception as e:
            logger.error(f"查询待处理日志失败: {e}")
            return []

    def get_logs_paginated(self, page: int = 1, page_size: int = 20) -> Dict:
        """
        Fetch one page of import-log rows, newest first.

        Args:
            page: 1-based page number; the offset is ``(page - 1) * page_size``.
            page_size: Maximum number of rows returned.

        Returns:
            ``{'items': [...], 'total': int, 'page': page, 'page_size':
            page_size}``. ``items`` is empty and ``total`` is 0 when the
            query fails (the error is logged, not raised).
        """
        try:
            self.ensure_table()
            ph = self._get_placeholder()
            offset = (page - 1) * page_size

            conn = self.get_connection()
            try:
                # Total row count.
                cursor = self._execute_query(conn, "SELECT COUNT(*) as total FROM query_import_log")
                total = cursor.fetchone()['total']

                # The requested page.
                cursor = self._execute_query(
                    conn,
                    f"SELECT * FROM query_import_log ORDER BY created_at DESC LIMIT {ph} OFFSET {ph}",
                    (page_size, offset)
                )
                rows = cursor.fetchall()
            finally:
                # Always release the connection, including on failure.
                conn.close()

            return {
                'items': [self._dict_from_row(row) for row in rows],
                'total': total,
                'page': page,
                'page_size': page_size
            }
        except Exception as e:
            logger.error(f"分页查询导入日志失败: {e}")
            return {'items': [], 'total': 0, 'page': page, 'page_size': page_size}

    def is_file_logged(self, filepath: str) -> bool:
        """
        Check whether an import-log row already exists for a file path.

        Args:
            filepath: Path compared for equality with the ``filepath`` column.

        Returns:
            True when at least one row matches. False otherwise, including
            when the query fails (the error is logged, not raised).
        """
        try:
            ph = self._get_placeholder()

            conn = self.get_connection()
            try:
                cursor = self._execute_query(
                    conn,
                    f"SELECT COUNT(*) as cnt FROM query_import_log WHERE filepath = {ph}",
                    (filepath,)
                )
                cnt = cursor.fetchone()['cnt']
            finally:
                # Always release the connection, including on failure.
                conn.close()

            return cnt > 0
        except Exception as e:
            logger.error(f"检查文件日志失败: {e}")
            return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class QueryKeywordManager(DatabaseManager):
|
|
|
|
|
|
"""Query关键词管理器 - 操作 baidu_keyword 表"""
|
|
|
|
|
|
|
|
|
|
|
|
def insert_keyword(self, keyword: str, seed_id: int = 9999, seed_name: str = '手动提交',
                   crawled: int = 1, department: str = '', department_id: int = 0,
                   author_id: int = 0, author_name: str = '') -> int:
    """
    Insert one keyword into ``baidu_keyword`` using INSERT IGNORE.

    The row is written with ``parents_id = 0``, ``created_at = NOW()`` and
    ``query_status = 'manual_review'``.

    Args:
        keyword: Keyword text.
        seed_id: Value for the ``seed_id`` column.
        seed_name: Value for the ``seed_name`` column.
        crawled: Value for the ``crawled`` column.
        department: Value for the ``department`` column.
        department_id: Value for the ``department_id`` column.
        author_id: Value for the ``author_id`` column.
        author_name: Value for the ``author_name`` column.

    Returns:
        The cursor's affected-row count: 1 when inserted, 0 when the row
        was ignored as already existing. -1 when the insert fails (the
        error is logged, not raised).
    """
    try:
        ph = self._get_placeholder()

        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            cursor.execute(
                f"""INSERT IGNORE INTO baidu_keyword
                    (keyword, seed_id, seed_name, crawled, parents_id, created_at,
                     department, department_id, query_status, author_id, author_name)
                    VALUES ({ph}, {ph}, {ph}, {ph}, 0, NOW(), {ph}, {ph}, 'manual_review', {ph}, {ph})""",
                (keyword, seed_id, seed_name, crawled, department, department_id, author_id, author_name)
            )
            affected = cursor.rowcount
            conn.commit()
        finally:
            # Always release the connection, including when execute/commit
            # raises; previously it leaked on every failure.
            conn.close()

        return affected
    except Exception as e:
        logger.error(f"插入关键词失败: {keyword} - {e}")
        return -1
|
|
|
|
|
|
|
|
|
|
|
|
def batch_insert_keywords(self, keyword_list: list, seed_id: int = 9999,
                          seed_name: str = '手动提交', crawled: int = 1,
                          query_status: str = 'manual_review') -> dict:
    """
    Insert many keywords into ``baidu_keyword`` using INSERT IGNORE.

    Every row is written with ``parents_id = 0``, ``created_at = NOW()``,
    ``department_id = 0``, ``author_id = 0`` and an empty ``author_name``.

    Args:
        keyword_list: Items shaped like
            ``{'keyword': str, 'department': str, 'seed_name': str}``.
            ``department`` defaults to ``''``; a non-empty per-item
            ``seed_name`` overrides the ``seed_name`` argument.
        seed_id: Value for the ``seed_id`` column of every row.
        seed_name: Default ``seed_name`` for items that do not carry one.
        crawled: Value for the ``crawled`` column of every row.
        query_status: Value for the ``query_status`` column, e.g.
            ``'draft'`` or ``'manual_review'``.

    Returns:
        ``{'success': int, 'skip': int, 'fail': int}``. ``success`` is the
        cursor's affected-row count and ``skip`` is the remainder of the
        batch. When the batch fails as a whole, ``fail`` is set to the
        batch size (the error is logged, not raised).
    """
    stats = {'success': 0, 'skip': 0, 'fail': 0}
    if not keyword_list:
        return stats

    try:
        # The docstring always advertised an optional per-item seed_name,
        # but the code ignored it; honour it and fall back to the argument.
        values = [
            (
                item['keyword'], seed_id, item.get('seed_name') or seed_name, crawled,
                item.get('department', ''), query_status
            )
            for item in keyword_list
        ]

        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            cursor.executemany(
                """INSERT IGNORE INTO baidu_keyword
                   (keyword, seed_id, seed_name, crawled, parents_id, created_at,
                    department, department_id, query_status, author_id, author_name)
                   VALUES (%s, %s, %s, %s, 0, NOW(), %s, 0, %s, 0, '')""",
                values
            )

            # After executemany, rowcount is the number of rows inserted;
            # rows ignored as duplicates do not count.
            inserted = cursor.rowcount
            conn.commit()
        finally:
            # Always release the connection, including when
            # executemany/commit raises; previously it leaked on failure.
            conn.close()

        stats['success'] = inserted
        stats['skip'] = len(keyword_list) - inserted
        return stats

    except Exception as e:
        logger.error(f"批量插入关键词失败: {e}")
        stats['fail'] = len(keyword_list)
        return stats
|