diff --git a/ai_mip.zip b/ai_mip.zip deleted file mode 100644 index 0564d73..0000000 Binary files a/ai_mip.zip and /dev/null differ diff --git a/import_excel_to_db.py b/import_excel_to_db.py new file mode 100644 index 0000000..1c187bc --- /dev/null +++ b/import_excel_to_db.py @@ -0,0 +1,348 @@ +""" +从Excel文件导入URL数据到数据库 +支持开发环境(dev)和生产环境(prod) +""" + +import os +import sys +import argparse +from pathlib import Path +from datetime import datetime +from loguru import logger +import pandas as pd +from config import Config +from db_manager import SiteManager + +# 配置日志 +logger.remove() +logger.add( + sys.stdout, + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {message}", + level="INFO" +) + + +class ExcelImporter: + """Excel数据导入器""" + + def __init__(self, env: str = None): + """ + 初始化导入器 + + Args: + env: 环境标识,dev或prod,默认使用当前配置 + """ + # 如果指定了环境,临时设置环境变量 + if env: + original_env = os.getenv('ENV') + if env == 'dev': + os.environ['ENV'] = 'development' + elif env == 'prod': + os.environ['ENV'] = 'production' + else: + raise ValueError(f"无效的环境标识: {env},必须是 dev 或 prod") + + # 重新加载配置 + import importlib + import config as config_module + importlib.reload(config_module) + from config import Config as ReloadedConfig + self.config = ReloadedConfig + + # 恢复原始环境变量 + if original_env: + os.environ['ENV'] = original_env + else: + os.environ.pop('ENV', None) + else: + self.config = Config + + self.site_manager = SiteManager() + + logger.info("=" * 70) + logger.info(f"Excel数据导入器已初始化") + logger.info(f"当前环境: {self.config.ENV}") + logger.info(f"数据库配置:") + logger.info(f" - Host: {self.config.MYSQL_HOST}:{self.config.MYSQL_PORT}") + logger.info(f" - Database: {self.config.MYSQL_DATABASE}") + logger.info(f" - User: {self.config.MYSQL_USER}") + logger.info("=" * 70) + + def read_excel(self, file_path: str) -> pd.DataFrame: + """ + 读取Excel文件 + + Args: + file_path: Excel文件路径 + + Returns: + DataFrame对象 + """ + try: + if not Path(file_path).exists(): + raise FileNotFoundError(f"文件不存在: {file_path}") + + logger.info(f"正在读取Excel文件: {file_path}") + df = pd.read_excel(file_path) + + logger.success(f"成功读取Excel文件,共 {len(df)} 行数据") + logger.info(f"列名: {df.columns.tolist()}") + + return df + + except Exception as e: + logger.error(f"读取Excel文件失败: {str(e)}") + raise + + def validate_dataframe(self, df: pd.DataFrame) -> bool: + """ + 验证DataFrame数据格式 + + Args: + df: DataFrame对象 + + Returns: + 是否验证通过 + """ + # 检查必要的列 + required_columns = ['链接'] + optional_columns = ['序号', '医生', '查询词', '维度'] + + for col in required_columns: + if col not in df.columns: + logger.error(f"缺少必要列: {col}") + return False + + # 检查是否有空链接 + null_count = df['链接'].isna().sum() + if null_count > 0: + logger.warning(f"发现 {null_count} 个空链接,将被跳过") + + logger.success("数据格式验证通过") + return True + + def import_data(self, df: pd.DataFrame, + query_word: str = None, + site_dimension: str = None, + frequency: int = 1, + time_start: str = '09:00:00', + time_end: str = '21:00:00', + interval_minutes: int = 30, + dry_run: bool = False) -> dict: + """ + 导入数据到数据库 + + Args: + df: DataFrame对象 + query_word: 查询词(如果Excel中没有该列,使用此默认值) + site_dimension: 站点维度(如果Excel中没有该列,使用此默认值) + frequency: 频次 + time_start: 开始时间 + time_end: 结束时间 + interval_minutes: 执行间隔(分钟) + dry_run: 是否为试运行模式(不实际插入数据) + + Returns: + 导入结果统计 + """ + logger.info("=" * 70) + logger.info("开始导入数据") + if dry_run: + logger.warning("【试运行模式】不会实际插入数据") + logger.info("=" * 70) + + stats = { + 'total': len(df), + 'success': 0, + 'failed': 0, + 'skipped': 0, + 'duplicate': 0 + } + + for idx, row in df.iterrows(): + try: + # 获取链接 + site_url = row.get('链接', None) + + # 跳过空链接 + if pd.isna(site_url) or not site_url or str(site_url).strip() == '': + logger.warning(f"[{idx + 1}/{stats['total']}] 跳过空链接") + stats['skipped'] += 1 + continue + + site_url = str(site_url).strip() + + # 获取站点名称(医生名或链接本身) + site_name = row.get('医生', site_url) + if pd.isna(site_name): + site_name = site_url + else: + site_name = str(site_name).strip() + + # 获取查询词 + row_query_word = row.get('查询词', query_word) + if pd.isna(row_query_word): + row_query_word = query_word + else: + row_query_word = str(row_query_word).strip() + + # 获取维度 + row_dimension = row.get('维度', site_dimension) + if pd.isna(row_dimension): + row_dimension = site_dimension + else: + row_dimension = str(row_dimension).strip() + + logger.info(f"[{idx + 1}/{stats['total']}] 处理: {site_url}") + logger.info(f" - 名称: {site_name}") + logger.info(f" - 查询词: {row_query_word}") + logger.info(f" - 维度: {row_dimension}") + + # 试运行模式不实际插入 + if dry_run: + logger.info(f" ✓ [试运行] 跳过插入") + stats['success'] += 1 + continue + + # 检查是否已存在 + existing = self.site_manager.get_site_by_url(site_url) + if existing: + logger.warning(f" × 链接已存在,跳过(ID: {existing['id']})") + stats['duplicate'] += 1 + continue + + # 插入数据库 + site_id = self.site_manager.add_site( + site_url=site_url, + site_name=site_name, + site_dimension=row_dimension, + query_word=row_query_word, + frequency=frequency, + time_start=time_start, + time_end=time_end, + interval_minutes=interval_minutes + ) + + if site_id: + logger.success(f" ✓ 导入成功(ID: {site_id})") + stats['success'] += 1 + else: + logger.error(f" × 导入失败") + stats['failed'] += 1 + + except Exception as e: + logger.error(f"[{idx + 1}/{stats['total']}] 处理失败: {str(e)}") + stats['failed'] += 1 + + return stats + + def print_summary(self, stats: dict): + """打印导入结果摘要""" + logger.info("\n" + "=" * 70) + logger.info("导入完成") + logger.info("=" * 70) + logger.info(f"总计: {stats['total']} 条") + logger.info(f"成功: {stats['success']} 条") + logger.info(f"失败: {stats['failed']} 条") + logger.info(f"跳过: {stats['skipped']} 条(空链接)") + logger.info(f"重复: {stats['duplicate']} 条(已存在)") + logger.info("=" * 70) + + # 计算成功率 + processed = stats['success'] + stats['failed'] + stats['duplicate'] + if processed > 0: + success_rate = (stats['success'] / processed) * 100 + logger.info(f"成功率: {success_rate:.2f}%") + + +def main(): + """主函数""" + parser = argparse.ArgumentParser( + description='从Excel文件导入URL数据到数据库', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +使用示例: + # 导入到开发环境 + python import_excel_to_db.py -f "广告链接 1.26(962条).xlsx" -e dev + + # 导入到生产环境 + python import_excel_to_db.py -f "广告链接 1.26(962条).xlsx" -e prod + + # 指定查询词和维度 + python import_excel_to_db.py -f "广告链接.xlsx" -e dev -q "关键词" -d "医疗" + + # 试运行模式(不实际插入) + python import_excel_to_db.py -f "广告链接.xlsx" -e dev --dry-run + """ + ) + + parser.add_argument('-f', '--file', required=True, help='Excel文件路径') + parser.add_argument('-e', '--env', choices=['dev', 'prod'], required=True, + help='目标环境: dev=开发环境, prod=生产环境') + parser.add_argument('-q', '--query-word', help='查询词(默认:None)') + parser.add_argument('-d', '--dimension', help='站点维度(默认:None)') + parser.add_argument('--frequency', type=int, default=1, help='频次(默认:1)') + parser.add_argument('--time-start', default='09:00:00', help='开始时间(默认:09:00:00)') + parser.add_argument('--time-end', default='21:00:00', help='结束时间(默认:21:00:00)') + parser.add_argument('--interval', type=int, default=30, help='执行间隔分钟数(默认:30)') + parser.add_argument('--dry-run', action='store_true', help='试运行模式,不实际插入数据') + + args = parser.parse_args() + + try: + # 创建导入器 + importer = ExcelImporter(env=args.env) + + # 读取Excel + df = importer.read_excel(args.file) + + # 验证数据 + if not importer.validate_dataframe(df): + logger.error("数据验证失败,终止导入") + sys.exit(1) + + # 显示前几行数据预览 + logger.info("\n数据预览(前5行):") + logger.info("\n" + df.head().to_string()) + + # 确认导入 + if not args.dry_run: + logger.warning(f"\n即将导入 {len(df)} 条数据到【{args.env.upper()}】环境") + logger.warning(f"数据库: {importer.config.MYSQL_HOST}:{importer.config.MYSQL_PORT}/{importer.config.MYSQL_DATABASE}") + + response = input("\n确认继续?[y/N]: ") + if response.lower() != 'y': + logger.info("用户取消导入") + sys.exit(0) + + # 导入数据 + stats = importer.import_data( + df=df, + query_word=args.query_word, + site_dimension=args.dimension, + frequency=args.frequency, + time_start=args.time_start, + time_end=args.time_end, + interval_minutes=args.interval, + dry_run=args.dry_run + ) + + # 打印摘要 + importer.print_summary(stats) + + # 根据结果返回退出码 + if stats['failed'] > 0: + sys.exit(1) + else: + sys.exit(0) + + except KeyboardInterrupt: + logger.warning("\n用户中断导入") + sys.exit(130) + except Exception as e: + logger.error(f"导入失败: {str(e)}") + sys.exit(1) + + +if __name__ == '__main__': + main()