""" 导出点击记录到CSV文件 支持开发环境(dev)和生产环境(prod) """ import os import sys import csv import argparse from pathlib import Path from datetime import datetime from loguru import logger from config import Config from db_manager import ClickManager # 配置日志 logger.remove() logger.add( sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}", level="INFO" ) class ClickExporter: """点击记录导出器""" def __init__(self): """初始化导出器""" self.config = Config self.click_manager = ClickManager() logger.info("=" * 70) logger.info(f"点击记录导出器已初始化") logger.info(f"当前环境: {self.config.ENV}") logger.info(f"数据库配置:") logger.info(f" - Host: {self.config.MYSQL_HOST}:{self.config.MYSQL_PORT}") logger.info(f" - Database: {self.config.MYSQL_DATABASE}") logger.info(f" - User: {self.config.MYSQL_USER}") logger.info("=" * 70) logger.info("提示: 通过设置环境变量 ENV=production 切换到生产环境") logger.info("=" * 70) def get_all_clicks(self, start_date: str = None, end_date: str = None, site_id: int = None, limit: int = None, join_mode: str = 'simple') -> list: """ 查询点击记录 Args: start_date: 开始日期 YYYY-MM-DD end_date: 结束日期 YYYY-MM-DD site_id: 站点ID筛选 limit: 限制数量 join_mode: 查询模式 simple=仅点击表, full=联合三表 Returns: 点击记录列表 """ try: conn = self.click_manager.get_connection() cursor = conn.cursor() # 根据模式构建不同的SQL查询 if join_mode == 'full': # 联合三表查询 sql = """ SELECT c.id as click_id, c.site_id, c.site_url, c.click_time, c.user_ip, c.device_type, c.task_id as click_task_id, c.operator as click_operator, s.site_name, s.status as site_status, s.site_dimension, s.click_count as total_click_count, s.reply_count as total_reply_count, i.id as interaction_id, i.interaction_type, i.interaction_time, i.interaction_status, i.reply_content, i.response_received, i.response_content, i.is_successful as interaction_successful, i.proxy_ip as interaction_proxy_ip, i.fingerprint_id, i.error_message FROM ai_mip_click c LEFT JOIN ai_mip_site s ON c.site_id = s.id LEFT JOIN ai_mip_interaction i ON c.id = i.click_id WHERE 1=1 """ else: # 简单查询,仅点击表 sql = "SELECT * FROM ai_mip_click WHERE 1=1" params = [] if start_date: sql += " AND c.click_time >= %s" if join_mode == 'full' else " AND click_time >= %s" params.append(f"{start_date} 00:00:00") if end_date: sql += " AND c.click_time <= %s" if join_mode == 'full' else " AND click_time <= %s" params.append(f"{end_date} 23:59:59") if site_id: sql += " AND c.site_id = %s" if join_mode == 'full' else " AND site_id = %s" params.append(site_id) sql += " ORDER BY c.click_time DESC" if join_mode == 'full' else " ORDER BY click_time DESC" if limit: sql += f" LIMIT {limit}" logger.info(f"查询模式: {join_mode}") logger.info(f"执行查询: {sql[:200]}..." if len(sql) > 200 else f"执行查询: {sql}") logger.info(f"参数: {params}") cursor.execute(sql, params if params else None) # 获取列名 columns = [desc[0] for desc in cursor.description] # 获取所有记录 rows = cursor.fetchall() # 转换为字典列表 clicks = [] for row in rows: click_dict = {} for idx, col in enumerate(columns): value = row[idx] # 处理datetime类型 if isinstance(value, datetime): value = value.strftime('%Y-%m-%d %H:%M:%S') click_dict[col] = value clicks.append(click_dict) conn.close() logger.success(f"查询成功,获取到 {len(clicks)} 条记录") return clicks except Exception as e: logger.error(f"查询点击记录失败: {str(e)}") return [] def export_to_csv(self, clicks: list, output_file: str, encoding: str = 'utf-8-sig'): """ 导出到CSV文件 Args: clicks: 点击记录列表 output_file: 输出文件路径 encoding: 文件编码,默认utf-8-sig(Excel兼容) """ if not clicks: logger.warning("没有数据可导出") return False try: # 确保输出目录存在 output_path = Path(output_file) output_path.parent.mkdir(parents=True, exist_ok=True) # 定义字段中文映射 field_mapping = { # 点击表字段 'id': '点击ID', 'click_id': '点击ID', 'site_id': '站点ID', 'site_url': '站点URL', 'click_time': '点击时间', 'user_ip': '用户IP', 'user_agent': '浏览器标识', 'referer_url': '来源页面', 'device_type': '设备类型', 'click_count': '点击次数', 'is_valid': '是否有效', 'task_id': '任务ID', 'click_task_id': '任务ID', 'operator': '操作者', 'click_operator': '操作者', 'created_at': '创建时间', # 站点表字段 'site_name': '站点名称', 'site_status': '站点状态', 'status': '状态', 'site_dimension': '站点维度', 'total_click_count': '总点击数', 'total_reply_count': '总回复数', # 互动表字段 'interaction_id': '互动ID', 'interaction_type': '互动类型', 'interaction_time': '互动时间', 'interaction_status': '互动状态', 'reply_content': '回复内容', 'response_received': '是否收到回复', 'response_content': '对方回复内容', 'interaction_successful': '互动是否成功', 'is_successful': '是否成功', 'interaction_proxy_ip': '代理IP', 'proxy_ip': '代理IP', 'fingerprint_id': '浏览器指纹ID', 'error_message': '错误信息', } # 获取所有字段名(英文) fieldnames = list(clicks[0].keys()) # 转换为中文字段名 chinese_fieldnames = [field_mapping.get(field, field) for field in fieldnames] # 写入CSV with open(output_file, 'w', newline='', encoding=encoding) as csvfile: writer = csv.writer(csvfile) # 写入中文表头 writer.writerow(chinese_fieldnames) # 写入数据行 for click in clicks: row = [click.get(field, '') for field in fieldnames] writer.writerow(row) logger.success(f"成功导出 {len(clicks)} 条记录到: {output_file}") logger.info(f"文件大小: {output_path.stat().st_size / 1024:.2f} KB") return True except Exception as e: logger.error(f"导出CSV失败: {str(e)}") return False def print_summary(self, clicks: list): """打印数据摘要""" if not clicks: return logger.info("\n" + "=" * 70) logger.info("数据摘要") logger.info("=" * 70) logger.info(f"总记录数: {len(clicks)}") # 统计设备类型 device_stats = {} for click in clicks: device = click.get('device_type', 'unknown') device_stats[device] = device_stats.get(device, 0) + 1 logger.info(f"设备类型分布:") for device, count in device_stats.items(): logger.info(f" - {device}: {count} 条") # 时间范围 if clicks: times = [click.get('click_time') for click in clicks if click.get('click_time')] if times: logger.info(f"时间范围: {min(times)} ~ {max(times)}") logger.info("=" * 70) def main(): """主函数""" parser = argparse.ArgumentParser( description='导出点击记录到CSV文件(环境通过ENV环境变量控制)', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 使用示例: # 导出开发环境所有点击记录(默认,仅点击表) python export_clicks_to_csv.py -o clicks_dev.csv # 导出联合三表的完整数据(包含站点信息和互动记录) python export_clicks_to_csv.py -o clicks_full.csv --join-mode full # 导出生产环境点击记录(设置ENV环境变量) $env:ENV="production"; python export_clicks_to_csv.py -o clicks_prod.csv --join-mode full # 按日期范围导出 python export_clicks_to_csv.py -o clicks.csv --start-date 2026-01-01 --end-date 2026-01-31 --join-mode full # 导出指定站点的记录 python export_clicks_to_csv.py -o clicks.csv --site-id 123 --join-mode full # 限制导出数量 python export_clicks_to_csv.py -o clicks.csv --limit 1000 --join-mode full """ ) parser.add_argument('-o', '--output', required=True, help='输出CSV文件路径') parser.add_argument('--start-date', help='开始日期 YYYY-MM-DD') parser.add_argument('--end-date', help='结束日期 YYYY-MM-DD') parser.add_argument('--site-id', type=int, help='站点ID筛选') parser.add_argument('--limit', type=int, help='限制导出数量') parser.add_argument('--join-mode', choices=['simple', 'full'], default='simple', help='查询模式: simple=仅点击表, full=联合三表(点击+站点+互动)') parser.add_argument('--encoding', default='utf-8-sig', help='文件编码,默认utf-8-sig(Excel兼容)') args = parser.parse_args() try: # 创建导出器 exporter = ClickExporter() # 查询点击记录 logger.info("\n开始查询点击记录...") clicks = exporter.get_all_clicks( start_date=args.start_date, end_date=args.end_date, site_id=args.site_id, limit=args.limit, join_mode=args.join_mode ) if not clicks: logger.warning("没有找到符合条件的记录") sys.exit(1) # 打印摘要 exporter.print_summary(clicks) # 导出到CSV logger.info(f"\n开始导出到: {args.output}") success = exporter.export_to_csv(clicks, args.output, args.encoding) if success: logger.success("\n导出完成!") sys.exit(0) else: logger.error("\n导出失败") sys.exit(1) except KeyboardInterrupt: logger.warning("\n用户中断导出") sys.exit(130) except Exception as e: logger.error(f"导出失败: {str(e)}") sys.exit(1) if __name__ == '__main__': main()