Files
ai_mip/export_clicks_to_csv.py
2026-02-24 12:46:35 +08:00

349 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
导出点击记录到CSV文件
支持开发环境dev和生产环境prod
"""
import os
import sys
import csv
import argparse
from pathlib import Path
from datetime import datetime
from loguru import logger
from config import Config
from db_manager import ClickManager
# 配置日志
logger.remove()
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <level>{message}</level>",
level="INFO"
)
class ClickExporter:
"""点击记录导出器"""
def __init__(self):
"""初始化导出器"""
self.config = Config
self.click_manager = ClickManager()
logger.info("=" * 70)
logger.info(f"点击记录导出器已初始化")
logger.info(f"当前环境: {self.config.ENV}")
logger.info(f"数据库配置:")
logger.info(f" - Host: {self.config.MYSQL_HOST}:{self.config.MYSQL_PORT}")
logger.info(f" - Database: {self.config.MYSQL_DATABASE}")
logger.info(f" - User: {self.config.MYSQL_USER}")
logger.info("=" * 70)
logger.info("提示: 通过设置环境变量 ENV=production 切换到生产环境")
logger.info("=" * 70)
def get_all_clicks(self, start_date: str = None, end_date: str = None,
site_id: int = None, limit: int = None, join_mode: str = 'simple') -> list:
"""
查询点击记录
Args:
start_date: 开始日期 YYYY-MM-DD
end_date: 结束日期 YYYY-MM-DD
site_id: 站点ID筛选
limit: 限制数量
join_mode: 查询模式 simple=仅点击表, full=联合三表
Returns:
点击记录列表
"""
try:
conn = self.click_manager.get_connection()
cursor = conn.cursor()
# 根据模式构建不同的SQL查询
if join_mode == 'full':
# 联合三表查询
sql = """
SELECT
c.id as click_id,
c.site_id,
c.site_url,
c.click_time,
c.user_ip,
c.device_type,
c.task_id as click_task_id,
c.operator as click_operator,
s.site_name,
s.status as site_status,
s.site_dimension,
s.click_count as total_click_count,
s.reply_count as total_reply_count,
i.id as interaction_id,
i.interaction_type,
i.interaction_time,
i.interaction_status,
i.reply_content,
i.response_received,
i.response_content,
i.is_successful as interaction_successful,
i.proxy_ip as interaction_proxy_ip,
i.fingerprint_id,
i.error_message
FROM ai_mip_click c
LEFT JOIN ai_mip_site s ON c.site_id = s.id
LEFT JOIN ai_mip_interaction i ON c.id = i.click_id
WHERE 1=1
"""
else:
# 简单查询,仅点击表
sql = "SELECT * FROM ai_mip_click WHERE 1=1"
params = []
if start_date:
sql += " AND c.click_time >= %s" if join_mode == 'full' else " AND click_time >= %s"
params.append(f"{start_date} 00:00:00")
if end_date:
sql += " AND c.click_time <= %s" if join_mode == 'full' else " AND click_time <= %s"
params.append(f"{end_date} 23:59:59")
if site_id:
sql += " AND c.site_id = %s" if join_mode == 'full' else " AND site_id = %s"
params.append(site_id)
sql += " ORDER BY c.click_time DESC" if join_mode == 'full' else " ORDER BY click_time DESC"
if limit:
sql += f" LIMIT {limit}"
logger.info(f"查询模式: {join_mode}")
logger.info(f"执行查询: {sql[:200]}..." if len(sql) > 200 else f"执行查询: {sql}")
logger.info(f"参数: {params}")
cursor.execute(sql, params if params else None)
# 获取列名
columns = [desc[0] for desc in cursor.description]
# 获取所有记录
rows = cursor.fetchall()
# 转换为字典列表
clicks = []
for row in rows:
click_dict = {}
for idx, col in enumerate(columns):
value = row[idx]
# 处理datetime类型
if isinstance(value, datetime):
value = value.strftime('%Y-%m-%d %H:%M:%S')
click_dict[col] = value
clicks.append(click_dict)
conn.close()
logger.success(f"查询成功,获取到 {len(clicks)} 条记录")
return clicks
except Exception as e:
logger.error(f"查询点击记录失败: {str(e)}")
return []
def export_to_csv(self, clicks: list, output_file: str, encoding: str = 'utf-8-sig'):
"""
导出到CSV文件
Args:
clicks: 点击记录列表
output_file: 输出文件路径
encoding: 文件编码默认utf-8-sigExcel兼容
"""
if not clicks:
logger.warning("没有数据可导出")
return False
try:
# 确保输出目录存在
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
# 定义字段中文映射
field_mapping = {
# 点击表字段
'id': '点击ID',
'click_id': '点击ID',
'site_id': '站点ID',
'site_url': '站点URL',
'click_time': '点击时间',
'user_ip': '用户IP',
'user_agent': '浏览器标识',
'referer_url': '来源页面',
'device_type': '设备类型',
'click_count': '点击次数',
'is_valid': '是否有效',
'task_id': '任务ID',
'click_task_id': '任务ID',
'operator': '操作者',
'click_operator': '操作者',
'created_at': '创建时间',
# 站点表字段
'site_name': '站点名称',
'site_status': '站点状态',
'status': '状态',
'site_dimension': '站点维度',
'total_click_count': '总点击数',
'total_reply_count': '总回复数',
# 互动表字段
'interaction_id': '互动ID',
'interaction_type': '互动类型',
'interaction_time': '互动时间',
'interaction_status': '互动状态',
'reply_content': '回复内容',
'response_received': '是否收到回复',
'response_content': '对方回复内容',
'interaction_successful': '互动是否成功',
'is_successful': '是否成功',
'interaction_proxy_ip': '代理IP',
'proxy_ip': '代理IP',
'fingerprint_id': '浏览器指纹ID',
'error_message': '错误信息',
}
# 获取所有字段名(英文)
fieldnames = list(clicks[0].keys())
# 转换为中文字段名
chinese_fieldnames = [field_mapping.get(field, field) for field in fieldnames]
# 写入CSV
with open(output_file, 'w', newline='', encoding=encoding) as csvfile:
writer = csv.writer(csvfile)
# 写入中文表头
writer.writerow(chinese_fieldnames)
# 写入数据行
for click in clicks:
row = [click.get(field, '') for field in fieldnames]
writer.writerow(row)
logger.success(f"成功导出 {len(clicks)} 条记录到: {output_file}")
logger.info(f"文件大小: {output_path.stat().st_size / 1024:.2f} KB")
return True
except Exception as e:
logger.error(f"导出CSV失败: {str(e)}")
return False
def print_summary(self, clicks: list):
"""打印数据摘要"""
if not clicks:
return
logger.info("\n" + "=" * 70)
logger.info("数据摘要")
logger.info("=" * 70)
logger.info(f"总记录数: {len(clicks)}")
# 统计设备类型
device_stats = {}
for click in clicks:
device = click.get('device_type', 'unknown')
device_stats[device] = device_stats.get(device, 0) + 1
logger.info(f"设备类型分布:")
for device, count in device_stats.items():
logger.info(f" - {device}: {count}")
# 时间范围
if clicks:
times = [click.get('click_time') for click in clicks if click.get('click_time')]
if times:
logger.info(f"时间范围: {min(times)} ~ {max(times)}")
logger.info("=" * 70)
def main():
"""主函数"""
parser = argparse.ArgumentParser(
description='导出点击记录到CSV文件环境通过ENV环境变量控制',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
使用示例:
# 导出开发环境所有点击记录(默认,仅点击表)
python export_clicks_to_csv.py -o clicks_dev.csv
# 导出联合三表的完整数据(包含站点信息和互动记录)
python export_clicks_to_csv.py -o clicks_full.csv --join-mode full
# 导出生产环境点击记录设置ENV环境变量
$env:ENV="production"; python export_clicks_to_csv.py -o clicks_prod.csv --join-mode full
# 按日期范围导出
python export_clicks_to_csv.py -o clicks.csv --start-date 2026-01-01 --end-date 2026-01-31 --join-mode full
# 导出指定站点的记录
python export_clicks_to_csv.py -o clicks.csv --site-id 123 --join-mode full
# 限制导出数量
python export_clicks_to_csv.py -o clicks.csv --limit 1000 --join-mode full
"""
)
parser.add_argument('-o', '--output', required=True,
help='输出CSV文件路径')
parser.add_argument('--start-date', help='开始日期 YYYY-MM-DD')
parser.add_argument('--end-date', help='结束日期 YYYY-MM-DD')
parser.add_argument('--site-id', type=int, help='站点ID筛选')
parser.add_argument('--limit', type=int, help='限制导出数量')
parser.add_argument('--join-mode', choices=['simple', 'full'], default='simple',
help='查询模式: simple=仅点击表, full=联合三表(点击+站点+互动)')
parser.add_argument('--encoding', default='utf-8-sig',
help='文件编码默认utf-8-sigExcel兼容')
args = parser.parse_args()
try:
# 创建导出器
exporter = ClickExporter()
# 查询点击记录
logger.info("\n开始查询点击记录...")
clicks = exporter.get_all_clicks(
start_date=args.start_date,
end_date=args.end_date,
site_id=args.site_id,
limit=args.limit,
join_mode=args.join_mode
)
if not clicks:
logger.warning("没有找到符合条件的记录")
sys.exit(1)
# 打印摘要
exporter.print_summary(clicks)
# 导出到CSV
logger.info(f"\n开始导出到: {args.output}")
success = exporter.export_to_csv(clicks, args.output, args.encoding)
if success:
logger.success("\n导出完成!")
sys.exit(0)
else:
logger.error("\n导出失败")
sys.exit(1)
except KeyboardInterrupt:
logger.warning("\n用户中断导出")
sys.exit(130)
except Exception as e:
logger.error(f"导出失败: {str(e)}")
sys.exit(1)
if __name__ == '__main__':
main()