#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 数据验证与短信告警集成脚本 功能: 1. 执行数据验证(JSON/CSV/数据库) 2. 如果验证失败,发送阿里云短信告警 3. 支持定时任务调度(每天9点执行) 使用方法: # 手动执行一次验证 python data_validation_with_sms.py # 指定日期验证 python data_validation_with_sms.py --date 2025-12-29 # 配置定时任务(Windows任务计划程序) python data_validation_with_sms.py --setup-schedule """ import sys import os import json from datetime import datetime, timedelta from typing import Dict, List, Optional # 添加项目根目录到路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) # 导入数据验证模块 from data_validation import DataValidator # 阿里云短信SDK导入 try: from alibabacloud_dysmsapi20170525.client import Client as Dysmsapi20170525Client from alibabacloud_credentials.client import Client as CredentialClient from alibabacloud_credentials.models import Config as CredentialConfig from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_dysmsapi20170525 import models as dysmsapi_20170525_models from alibabacloud_tea_util import models as util_models SMS_AVAILABLE = True except ImportError: print("[!] 阿里云短信SDK未安装,短信功能将不可用") print(" 安装命令: pip install alibabacloud_dysmsapi20170525") SMS_AVAILABLE = False class SMSAlertConfig: """短信告警配置""" def __init__(self): """从配置文件或环境变量加载配置""" # 尝试从配置文件加载 config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'sms_config.json') config_data = {} if os.path.exists(config_file): try: with open(config_file, 'r', encoding='utf-8') as f: config_data = json.load(f) except Exception as e: print(f"[!] 读取配置文件失败: {e}") # 阿里云访问凭据(优先使用环境变量) self.ACCESS_KEY_ID = os.environ.get( 'ALIBABA_CLOUD_ACCESS_KEY_ID', config_data.get('access_key_id', 'LTAI5tSMvnCJdqkZtCVWgh8R') ) self.ACCESS_KEY_SECRET = os.environ.get( 'ALIBABA_CLOUD_ACCESS_KEY_SECRET', config_data.get('access_key_secret', 'nyFzXyIi47peVLK4wR2qqbPezmU79W') ) # 短信签名和模板 self.SIGN_NAME = config_data.get('sign_name', '北京乐航时代科技') self.TEMPLATE_CODE = config_data.get('template_code', 'SMS_486210104') # 接收短信的手机号(多个号码用逗号分隔) self.PHONE_NUMBERS = config_data.get('phone_numbers', '13621242430') # 短信endpoint self.ENDPOINT = config_data.get('endpoint', 'dysmsapi.aliyuncs.com') @staticmethod def get_instance(): """获取配置实例(单例模式)""" if not hasattr(SMSAlertConfig, '_instance'): SMSAlertConfig._instance = SMSAlertConfig() return SMSAlertConfig._instance class DataValidationWithSMS: """数据验证与短信告警集成器""" def __init__(self, date_str: Optional[str] = None): """初始化 Args: date_str: 目标日期 (YYYY-MM-DD),默认为昨天 """ self.validator = DataValidator(date_str) self.sms_client = None self.sms_config = SMSAlertConfig.get_instance() if SMS_AVAILABLE: self.sms_client = self._create_sms_client() def _create_sms_client(self) -> Optional[Dysmsapi20170525Client]: """创建阿里云短信客户端 Returns: 短信客户端实例 """ try: credential_config = CredentialConfig( type='access_key', access_key_id=self.sms_config.ACCESS_KEY_ID, access_key_secret=self.sms_config.ACCESS_KEY_SECRET ) credential = CredentialClient(credential_config) config = open_api_models.Config( credential=credential, endpoint=self.sms_config.ENDPOINT ) return Dysmsapi20170525Client(config) except Exception as e: print(f"[X] 创建短信客户端失败: {e}") return None def send_sms_alert(self, error_code: str, error_details: str) -> bool: """发送短信告警 Args: error_code: 错误代码(如 "2222") error_details: 错误详情 Returns: 是否发送成功 """ if not self.sms_client: print(f"[X] 短信客户端未初始化,无法发送告警") return False try: # 构建短信请求 send_sms_request = dysmsapi_20170525_models.SendSmsRequest( phone_numbers=self.sms_config.PHONE_NUMBERS, sign_name=self.sms_config.SIGN_NAME, template_code=self.sms_config.TEMPLATE_CODE, template_param=json.dumps({"code": error_code}) ) runtime = util_models.RuntimeOptions() print(f"\n[短信] 正在发送告警短信...") print(f" 接收号码: {self.sms_config.PHONE_NUMBERS}") print(f" 错误代码: {error_code}") print(f" 错误详情: {error_details[:100]}...") # 发送短信 resp = self.sms_client.send_sms_with_options(send_sms_request, runtime) # 检查响应 result = resp.to_map() if result.get('body', {}).get('Code') == 'OK': print(f"[✓] 短信发送成功") print(f" 请求ID: {result.get('body', {}).get('RequestId')}") print(f" 消息ID: {result.get('body', {}).get('BizId')}") return True else: print(f"[X] 短信发送失败") print(f" 错误码: {result.get('body', {}).get('Code')}") print(f" 错误信息: {result.get('body', {}).get('Message')}") return False except Exception as e: print(f"[X] 发送短信异常: {e}") if hasattr(e, 'data') and e.data: print(f" 诊断地址: {e.data.get('Recommend')}") return False def run_validation(self, sources: List[str] = None, table: str = 'ai_statistics') -> bool: """执行数据验证 Args: sources: 数据源列表,默认 ['json', 'csv', 'database'] table: 要验证的表名 Returns: 验证是否通过 """ if sources is None: sources = ['json', 'csv', 'database'] print(f"\n{'='*70}") print(f"数据验证与短信告警") print(f"{'='*70}") print(f"验证日期: {self.validator.date_str}") print(f"验证表: {table}") print(f"数据源: {', '.join(sources)}") print(f"{'='*70}") try: # 执行验证 if table == 'ai_statistics': passed = self.validator.validate_ai_statistics(sources) elif table == 'ai_statistics_day': passed = self.validator.validate_ai_statistics_day(sources) elif table == 'ai_statistics_days': # TODO: 实现 ai_statistics_days 验证 print(f"[!] 表 {table} 的验证功能暂未实现") passed = False else: print(f"[X] 未知的表名: {table}") passed = False return passed except Exception as e: print(f"\n[X] 验证过程出错: {e}") import traceback traceback.print_exc() return False def generate_error_summary(self) -> str: """生成错误摘要信息 Returns: 错误摘要字符串 """ results = self.validator.validation_results summary_lines = [] summary_lines.append(f"日期: {self.validator.date_str}") # 顺序验证错误 order_errors = [r for r in results['顺序验证'] if not r['order_match']] if order_errors: summary_lines.append(f"顺序不一致: {len(order_errors)}个") # 交叉验证错误 cross_errors = [] for r in results['交叉验证']: if r['only_in_source1'] or r['only_in_source2'] or r['field_mismatches']: cross_errors.append(r) if cross_errors: summary_lines.append(f"数据不一致: {len(cross_errors)}个") # 统计详情 total_missing = sum(len(r['only_in_source1']) for r in cross_errors) total_extra = sum(len(r['only_in_source2']) for r in cross_errors) total_diff = sum(len(r['field_mismatches']) for r in cross_errors) if total_missing: summary_lines.append(f" 缺失记录: {total_missing}条") if total_extra: summary_lines.append(f" 多余记录: {total_extra}条") if total_diff: summary_lines.append(f" 字段差异: {total_diff}条") return '; '.join(summary_lines) def run_with_alert(self, sources: List[str] = None, table: str = 'ai_statistics') -> int: """执行验证并在失败时发送告警 Args: sources: 数据源列表 table: 要验证的表名 Returns: 退出码(0=成功,1=失败) """ # 执行验证 passed = self.run_validation(sources, table) # 创建验证报告目录 script_dir = os.path.dirname(os.path.abspath(__file__)) validation_reports_dir = os.path.join(script_dir, 'validation_reports') if not os.path.exists(validation_reports_dir): os.makedirs(validation_reports_dir) # 生成报告 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') report_file = os.path.join( validation_reports_dir, f'validation_report_{timestamp}.txt' ) self.validator.generate_report(report_file) # 判断是否需要发送告警 if not passed: print(f"\n{'='*70}") print(f"[!] 验证失败,准备发送短信告警") print(f"{'='*70}") # 生成错误摘要 error_summary = self.generate_error_summary() # 发送短信(错误代码固定为 "2222") sms_sent = self.send_sms_alert("2222", error_summary) if sms_sent: print(f"\n[✓] 告警短信已发送") else: print(f"\n[X] 告警短信发送失败") print(f"\n详细报告: {report_file}") return 1 else: print(f"\n{'='*70}") print(f"[✓] 验证通过,无需发送告警") print(f"{'='*70}") return 0 def setup_windows_task_scheduler(): """配置Windows任务计划程序(每天9点执行)""" print(f"\n{'='*70}") print(f"配置Windows任务计划程序") print(f"{'='*70}") script_path = os.path.abspath(__file__) python_path = sys.executable # 生成任务计划XML配置 task_name = "DataValidationWithSMS" print(f"\n请手动创建Windows任务计划,或使用以下PowerShell命令:\n") # PowerShell命令 ps_command = f""" # 创建任务计划 $action = New-ScheduledTaskAction -Execute '{python_path}' -Argument '{script_path}' $trigger = New-ScheduledTaskTrigger -Daily -At 9:00AM $settings = New-ScheduledTaskSettingsSet -AllowStartIfOnBatteries -DontStopIfGoingOnBatteries $principal = New-ScheduledTaskPrincipal -UserId "$env:USERNAME" -RunLevel Highest Register-ScheduledTask -TaskName "{task_name}" -Action $action -Trigger $trigger -Settings $settings -Principal $principal -Description "每天9点执行数据验证并发送短信告警" Write-Host "任务计划已创建: {task_name}" """ print(ps_command) print(f"\n或者手动配置:") print(f"1. 打开 '任务计划程序' (taskschd.msc)") print(f"2. 创建基本任务") print(f"3. 名称: {task_name}") print(f"4. 触发器: 每天 上午9:00") print(f"5. 操作: 启动程序") print(f"6. 程序: {python_path}") print(f"7. 参数: {script_path}") print(f"8. 完成") print(f"\n{'='*70}") def main(): """主函数""" import argparse parser = argparse.ArgumentParser( description='数据验证与短信告警集成脚本', formatter_class=argparse.RawDescriptionHelpFormatter ) parser.add_argument( '--date', type=str, help='目标日期 (YYYY-MM-DD),默认为昨天' ) parser.add_argument( '--source', nargs='+', choices=['json', 'csv', 'database'], default=['json', 'csv', 'database'], help='数据源列表' ) parser.add_argument( '--table', type=str, choices=['ai_statistics', 'ai_statistics_day', 'ai_statistics_days'], default='ai_statistics', help='要验证的表名' ) parser.add_argument( '--setup-schedule', action='store_true', help='配置定时任务(每天9点执行)' ) parser.add_argument( '--test-sms', action='store_true', help='测试短信发送功能' ) parser.add_argument( '--no-sms', action='store_true', help='禁用短信发送(仅验证数据)' ) args = parser.parse_args() # 配置定时任务 if args.setup_schedule: setup_windows_task_scheduler() return 0 # 测试短信 if args.test_sms: print(f"\n{'='*70}") print(f"测试短信发送功能") print(f"{'='*70}") validator = DataValidationWithSMS() success = validator.send_sms_alert( "2222", "这是一条测试短信,数据验证系统运行正常" ) return 0 if success else 1 # 执行验证 try: validator = DataValidationWithSMS(date_str=args.date) return validator.run_with_alert(args.source, args.table) except Exception as e: print(f"\n[X] 程序执行失败: {e}") import traceback traceback.print_exc() return 1 if __name__ == '__main__': sys.exit(main())