Files
ai_mip/export_clicks_to_csv.py

349 lines
13 KiB
Python
Raw Permalink Normal View History

2026-02-24 12:46:35 +08:00
"""
导出点击记录到CSV文件
支持开发环境dev和生产环境prod
"""
import os
import sys
import csv
import argparse
from pathlib import Path
from datetime import datetime
from loguru import logger
from config import Config
from db_manager import ClickManager
# 配置日志
logger.remove()
logger.add(
sys.stdout,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <level>{message}</level>",
level="INFO"
)
class ClickExporter:
"""点击记录导出器"""
def __init__(self):
"""初始化导出器"""
self.config = Config
self.click_manager = ClickManager()
logger.info("=" * 70)
logger.info(f"点击记录导出器已初始化")
logger.info(f"当前环境: {self.config.ENV}")
logger.info(f"数据库配置:")
logger.info(f" - Host: {self.config.MYSQL_HOST}:{self.config.MYSQL_PORT}")
logger.info(f" - Database: {self.config.MYSQL_DATABASE}")
logger.info(f" - User: {self.config.MYSQL_USER}")
logger.info("=" * 70)
logger.info("提示: 通过设置环境变量 ENV=production 切换到生产环境")
logger.info("=" * 70)
def get_all_clicks(self, start_date: str = None, end_date: str = None,
site_id: int = None, limit: int = None, join_mode: str = 'simple') -> list:
"""
查询点击记录
Args:
start_date: 开始日期 YYYY-MM-DD
end_date: 结束日期 YYYY-MM-DD
site_id: 站点ID筛选
limit: 限制数量
join_mode: 查询模式 simple=仅点击表, full=联合三表
Returns:
点击记录列表
"""
try:
conn = self.click_manager.get_connection()
cursor = conn.cursor()
# 根据模式构建不同的SQL查询
if join_mode == 'full':
# 联合三表查询
sql = """
SELECT
c.id as click_id,
c.site_id,
c.site_url,
c.click_time,
c.user_ip,
c.device_type,
c.task_id as click_task_id,
c.operator as click_operator,
s.site_name,
s.status as site_status,
s.site_dimension,
s.click_count as total_click_count,
s.reply_count as total_reply_count,
i.id as interaction_id,
i.interaction_type,
i.interaction_time,
i.interaction_status,
i.reply_content,
i.response_received,
i.response_content,
i.is_successful as interaction_successful,
i.proxy_ip as interaction_proxy_ip,
i.fingerprint_id,
i.error_message
FROM ai_mip_click c
LEFT JOIN ai_mip_site s ON c.site_id = s.id
LEFT JOIN ai_mip_interaction i ON c.id = i.click_id
WHERE 1=1
"""
else:
# 简单查询,仅点击表
sql = "SELECT * FROM ai_mip_click WHERE 1=1"
params = []
if start_date:
sql += " AND c.click_time >= %s" if join_mode == 'full' else " AND click_time >= %s"
params.append(f"{start_date} 00:00:00")
if end_date:
sql += " AND c.click_time <= %s" if join_mode == 'full' else " AND click_time <= %s"
params.append(f"{end_date} 23:59:59")
if site_id:
sql += " AND c.site_id = %s" if join_mode == 'full' else " AND site_id = %s"
params.append(site_id)
sql += " ORDER BY c.click_time DESC" if join_mode == 'full' else " ORDER BY click_time DESC"
if limit:
sql += f" LIMIT {limit}"
logger.info(f"查询模式: {join_mode}")
logger.info(f"执行查询: {sql[:200]}..." if len(sql) > 200 else f"执行查询: {sql}")
logger.info(f"参数: {params}")
cursor.execute(sql, params if params else None)
# 获取列名
columns = [desc[0] for desc in cursor.description]
# 获取所有记录
rows = cursor.fetchall()
# 转换为字典列表
clicks = []
for row in rows:
click_dict = {}
for idx, col in enumerate(columns):
value = row[idx]
# 处理datetime类型
if isinstance(value, datetime):
value = value.strftime('%Y-%m-%d %H:%M:%S')
click_dict[col] = value
clicks.append(click_dict)
conn.close()
logger.success(f"查询成功,获取到 {len(clicks)} 条记录")
return clicks
except Exception as e:
logger.error(f"查询点击记录失败: {str(e)}")
return []
def export_to_csv(self, clicks: list, output_file: str, encoding: str = 'utf-8-sig'):
"""
导出到CSV文件
Args:
clicks: 点击记录列表
output_file: 输出文件路径
encoding: 文件编码默认utf-8-sigExcel兼容
"""
if not clicks:
logger.warning("没有数据可导出")
return False
try:
# 确保输出目录存在
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
# 定义字段中文映射
field_mapping = {
# 点击表字段
'id': '点击ID',
'click_id': '点击ID',
'site_id': '站点ID',
'site_url': '站点URL',
'click_time': '点击时间',
'user_ip': '用户IP',
'user_agent': '浏览器标识',
'referer_url': '来源页面',
'device_type': '设备类型',
'click_count': '点击次数',
'is_valid': '是否有效',
'task_id': '任务ID',
'click_task_id': '任务ID',
'operator': '操作者',
'click_operator': '操作者',
'created_at': '创建时间',
# 站点表字段
'site_name': '站点名称',
'site_status': '站点状态',
'status': '状态',
'site_dimension': '站点维度',
'total_click_count': '总点击数',
'total_reply_count': '总回复数',
# 互动表字段
'interaction_id': '互动ID',
'interaction_type': '互动类型',
'interaction_time': '互动时间',
'interaction_status': '互动状态',
'reply_content': '回复内容',
'response_received': '是否收到回复',
'response_content': '对方回复内容',
'interaction_successful': '互动是否成功',
'is_successful': '是否成功',
'interaction_proxy_ip': '代理IP',
'proxy_ip': '代理IP',
'fingerprint_id': '浏览器指纹ID',
'error_message': '错误信息',
}
# 获取所有字段名(英文)
fieldnames = list(clicks[0].keys())
# 转换为中文字段名
chinese_fieldnames = [field_mapping.get(field, field) for field in fieldnames]
# 写入CSV
with open(output_file, 'w', newline='', encoding=encoding) as csvfile:
writer = csv.writer(csvfile)
# 写入中文表头
writer.writerow(chinese_fieldnames)
# 写入数据行
for click in clicks:
row = [click.get(field, '') for field in fieldnames]
writer.writerow(row)
logger.success(f"成功导出 {len(clicks)} 条记录到: {output_file}")
logger.info(f"文件大小: {output_path.stat().st_size / 1024:.2f} KB")
return True
except Exception as e:
logger.error(f"导出CSV失败: {str(e)}")
return False
def print_summary(self, clicks: list):
"""打印数据摘要"""
if not clicks:
return
logger.info("\n" + "=" * 70)
logger.info("数据摘要")
logger.info("=" * 70)
logger.info(f"总记录数: {len(clicks)}")
# 统计设备类型
device_stats = {}
for click in clicks:
device = click.get('device_type', 'unknown')
device_stats[device] = device_stats.get(device, 0) + 1
logger.info(f"设备类型分布:")
for device, count in device_stats.items():
logger.info(f" - {device}: {count}")
# 时间范围
if clicks:
times = [click.get('click_time') for click in clicks if click.get('click_time')]
if times:
logger.info(f"时间范围: {min(times)} ~ {max(times)}")
logger.info("=" * 70)
def main():
"""主函数"""
parser = argparse.ArgumentParser(
description='导出点击记录到CSV文件环境通过ENV环境变量控制',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
使用示例:
# 导出开发环境所有点击记录(默认,仅点击表)
python export_clicks_to_csv.py -o clicks_dev.csv
# 导出联合三表的完整数据(包含站点信息和互动记录)
python export_clicks_to_csv.py -o clicks_full.csv --join-mode full
# 导出生产环境点击记录设置ENV环境变量
$env:ENV="production"; python export_clicks_to_csv.py -o clicks_prod.csv --join-mode full
# 按日期范围导出
python export_clicks_to_csv.py -o clicks.csv --start-date 2026-01-01 --end-date 2026-01-31 --join-mode full
# 导出指定站点的记录
python export_clicks_to_csv.py -o clicks.csv --site-id 123 --join-mode full
# 限制导出数量
python export_clicks_to_csv.py -o clicks.csv --limit 1000 --join-mode full
"""
)
parser.add_argument('-o', '--output', required=True,
help='输出CSV文件路径')
parser.add_argument('--start-date', help='开始日期 YYYY-MM-DD')
parser.add_argument('--end-date', help='结束日期 YYYY-MM-DD')
parser.add_argument('--site-id', type=int, help='站点ID筛选')
parser.add_argument('--limit', type=int, help='限制导出数量')
parser.add_argument('--join-mode', choices=['simple', 'full'], default='simple',
help='查询模式: simple=仅点击表, full=联合三表(点击+站点+互动)')
parser.add_argument('--encoding', default='utf-8-sig',
help='文件编码默认utf-8-sigExcel兼容')
args = parser.parse_args()
try:
# 创建导出器
exporter = ClickExporter()
# 查询点击记录
logger.info("\n开始查询点击记录...")
clicks = exporter.get_all_clicks(
start_date=args.start_date,
end_date=args.end_date,
site_id=args.site_id,
limit=args.limit,
join_mode=args.join_mode
)
if not clicks:
logger.warning("没有找到符合条件的记录")
sys.exit(1)
# 打印摘要
exporter.print_summary(clicks)
# 导出到CSV
logger.info(f"\n开始导出到: {args.output}")
success = exporter.export_to_csv(clicks, args.output, args.encoding)
if success:
logger.success("\n导出完成!")
sys.exit(0)
else:
logger.error("\n导出失败")
sys.exit(1)
except KeyboardInterrupt:
logger.warning("\n用户中断导出")
sys.exit(130)
except Exception as e:
logger.error(f"导出失败: {str(e)}")
sys.exit(1)
if __name__ == '__main__':
main()