442 lines
15 KiB
Python
442 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
数据验证与短信告警集成脚本
|
||
|
||
功能:
|
||
1. 执行数据验证(JSON/CSV/数据库)
|
||
2. 如果验证失败,发送阿里云短信告警
|
||
3. 支持定时任务调度(每天9点执行)
|
||
|
||
使用方法:
|
||
# 手动执行一次验证
|
||
python data_validation_with_sms.py
|
||
|
||
# 指定日期验证
|
||
python data_validation_with_sms.py --date 2025-12-29
|
||
|
||
# 配置定时任务(Windows任务计划程序)
|
||
python data_validation_with_sms.py --setup-schedule
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import json
|
||
from datetime import datetime, timedelta
|
||
from typing import Dict, List, Optional
|
||
|
||
# 添加项目根目录到路径
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
# 导入数据验证模块
|
||
from data_validation import DataValidator
|
||
|
||
# 阿里云短信SDK导入
|
||
try:
|
||
from alibabacloud_dysmsapi20170525.client import Client as Dysmsapi20170525Client
|
||
from alibabacloud_credentials.client import Client as CredentialClient
|
||
from alibabacloud_credentials.models import Config as CredentialConfig
|
||
from alibabacloud_tea_openapi import models as open_api_models
|
||
from alibabacloud_dysmsapi20170525 import models as dysmsapi_20170525_models
|
||
from alibabacloud_tea_util import models as util_models
|
||
SMS_AVAILABLE = True
|
||
except ImportError:
|
||
print("[!] 阿里云短信SDK未安装,短信功能将不可用")
|
||
print(" 安装命令: pip install alibabacloud_dysmsapi20170525")
|
||
SMS_AVAILABLE = False
|
||
|
||
|
||
class SMSAlertConfig:
|
||
"""短信告警配置"""
|
||
|
||
def __init__(self):
|
||
"""从配置文件或环境变量加载配置"""
|
||
# 尝试从配置文件加载
|
||
config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'sms_config.json')
|
||
config_data = {}
|
||
|
||
if os.path.exists(config_file):
|
||
try:
|
||
with open(config_file, 'r', encoding='utf-8') as f:
|
||
config_data = json.load(f)
|
||
except Exception as e:
|
||
print(f"[!] 读取配置文件失败: {e}")
|
||
|
||
# 阿里云访问凭据(优先使用环境变量)
|
||
self.ACCESS_KEY_ID = os.environ.get(
|
||
'ALIBABA_CLOUD_ACCESS_KEY_ID',
|
||
config_data.get('access_key_id', 'LTAI5tSMvnCJdqkZtCVWgh8R')
|
||
)
|
||
self.ACCESS_KEY_SECRET = os.environ.get(
|
||
'ALIBABA_CLOUD_ACCESS_KEY_SECRET',
|
||
config_data.get('access_key_secret', 'nyFzXyIi47peVLK4wR2qqbPezmU79W')
|
||
)
|
||
|
||
# 短信签名和模板
|
||
self.SIGN_NAME = config_data.get('sign_name', '北京乐航时代科技')
|
||
self.TEMPLATE_CODE = config_data.get('template_code', 'SMS_486210104')
|
||
|
||
# 接收短信的手机号(多个号码用逗号分隔)
|
||
self.PHONE_NUMBERS = config_data.get('phone_numbers', '13621242430')
|
||
|
||
# 短信endpoint
|
||
self.ENDPOINT = config_data.get('endpoint', 'dysmsapi.aliyuncs.com')
|
||
|
||
@staticmethod
|
||
def get_instance():
|
||
"""获取配置实例(单例模式)"""
|
||
if not hasattr(SMSAlertConfig, '_instance'):
|
||
SMSAlertConfig._instance = SMSAlertConfig()
|
||
return SMSAlertConfig._instance
|
||
|
||
|
||
class DataValidationWithSMS:
|
||
"""数据验证与短信告警集成器"""
|
||
|
||
def __init__(self, date_str: Optional[str] = None):
|
||
"""初始化
|
||
|
||
Args:
|
||
date_str: 目标日期 (YYYY-MM-DD),默认为昨天
|
||
"""
|
||
self.validator = DataValidator(date_str)
|
||
self.sms_client = None
|
||
self.sms_config = SMSAlertConfig.get_instance()
|
||
|
||
if SMS_AVAILABLE:
|
||
self.sms_client = self._create_sms_client()
|
||
|
||
def _create_sms_client(self) -> Optional[Dysmsapi20170525Client]:
|
||
"""创建阿里云短信客户端
|
||
|
||
Returns:
|
||
短信客户端实例
|
||
"""
|
||
try:
|
||
credential_config = CredentialConfig(
|
||
type='access_key',
|
||
access_key_id=self.sms_config.ACCESS_KEY_ID,
|
||
access_key_secret=self.sms_config.ACCESS_KEY_SECRET
|
||
)
|
||
credential = CredentialClient(credential_config)
|
||
config = open_api_models.Config(
|
||
credential=credential,
|
||
endpoint=self.sms_config.ENDPOINT
|
||
)
|
||
return Dysmsapi20170525Client(config)
|
||
except Exception as e:
|
||
print(f"[X] 创建短信客户端失败: {e}")
|
||
return None
|
||
|
||
def send_sms_alert(self, error_code: str, error_details: str) -> bool:
|
||
"""发送短信告警
|
||
|
||
Args:
|
||
error_code: 错误代码(如 "2222")
|
||
error_details: 错误详情
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
if not self.sms_client:
|
||
print(f"[X] 短信客户端未初始化,无法发送告警")
|
||
return False
|
||
|
||
try:
|
||
# 构建短信请求
|
||
send_sms_request = dysmsapi_20170525_models.SendSmsRequest(
|
||
phone_numbers=self.sms_config.PHONE_NUMBERS,
|
||
sign_name=self.sms_config.SIGN_NAME,
|
||
template_code=self.sms_config.TEMPLATE_CODE,
|
||
template_param=json.dumps({"code": error_code})
|
||
)
|
||
|
||
runtime = util_models.RuntimeOptions()
|
||
|
||
print(f"\n[短信] 正在发送告警短信...")
|
||
print(f" 接收号码: {self.sms_config.PHONE_NUMBERS}")
|
||
print(f" 错误代码: {error_code}")
|
||
print(f" 错误详情: {error_details[:100]}...")
|
||
|
||
# 发送短信
|
||
resp = self.sms_client.send_sms_with_options(send_sms_request, runtime)
|
||
|
||
# 检查响应
|
||
result = resp.to_map()
|
||
if result.get('body', {}).get('Code') == 'OK':
|
||
print(f"[✓] 短信发送成功")
|
||
print(f" 请求ID: {result.get('body', {}).get('RequestId')}")
|
||
print(f" 消息ID: {result.get('body', {}).get('BizId')}")
|
||
return True
|
||
else:
|
||
print(f"[X] 短信发送失败")
|
||
print(f" 错误码: {result.get('body', {}).get('Code')}")
|
||
print(f" 错误信息: {result.get('body', {}).get('Message')}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f"[X] 发送短信异常: {e}")
|
||
if hasattr(e, 'data') and e.data:
|
||
print(f" 诊断地址: {e.data.get('Recommend')}")
|
||
return False
|
||
|
||
def run_validation(self, sources: List[str] = None, table: str = 'ai_statistics') -> bool:
|
||
"""执行数据验证
|
||
|
||
Args:
|
||
sources: 数据源列表,默认 ['json', 'csv', 'database']
|
||
table: 要验证的表名
|
||
|
||
Returns:
|
||
验证是否通过
|
||
"""
|
||
if sources is None:
|
||
sources = ['json', 'csv', 'database']
|
||
|
||
print(f"\n{'='*70}")
|
||
print(f"数据验证与短信告警")
|
||
print(f"{'='*70}")
|
||
print(f"验证日期: {self.validator.date_str}")
|
||
print(f"验证表: {table}")
|
||
print(f"数据源: {', '.join(sources)}")
|
||
print(f"{'='*70}")
|
||
|
||
try:
|
||
# 执行验证
|
||
if table == 'ai_statistics':
|
||
passed = self.validator.validate_ai_statistics(sources)
|
||
elif table == 'ai_statistics_day':
|
||
passed = self.validator.validate_ai_statistics_day(sources)
|
||
elif table == 'ai_statistics_days':
|
||
# TODO: 实现 ai_statistics_days 验证
|
||
print(f"[!] 表 {table} 的验证功能暂未实现")
|
||
passed = False
|
||
else:
|
||
print(f"[X] 未知的表名: {table}")
|
||
passed = False
|
||
|
||
return passed
|
||
|
||
except Exception as e:
|
||
print(f"\n[X] 验证过程出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
def generate_error_summary(self) -> str:
|
||
"""生成错误摘要信息
|
||
|
||
Returns:
|
||
错误摘要字符串
|
||
"""
|
||
results = self.validator.validation_results
|
||
|
||
summary_lines = []
|
||
summary_lines.append(f"日期: {self.validator.date_str}")
|
||
|
||
# 顺序验证错误
|
||
order_errors = [r for r in results['顺序验证'] if not r['order_match']]
|
||
if order_errors:
|
||
summary_lines.append(f"顺序不一致: {len(order_errors)}个")
|
||
|
||
# 交叉验证错误
|
||
cross_errors = []
|
||
for r in results['交叉验证']:
|
||
if r['only_in_source1'] or r['only_in_source2'] or r['field_mismatches']:
|
||
cross_errors.append(r)
|
||
|
||
if cross_errors:
|
||
summary_lines.append(f"数据不一致: {len(cross_errors)}个")
|
||
|
||
# 统计详情
|
||
total_missing = sum(len(r['only_in_source1']) for r in cross_errors)
|
||
total_extra = sum(len(r['only_in_source2']) for r in cross_errors)
|
||
total_diff = sum(len(r['field_mismatches']) for r in cross_errors)
|
||
|
||
if total_missing:
|
||
summary_lines.append(f" 缺失记录: {total_missing}条")
|
||
if total_extra:
|
||
summary_lines.append(f" 多余记录: {total_extra}条")
|
||
if total_diff:
|
||
summary_lines.append(f" 字段差异: {total_diff}条")
|
||
|
||
return '; '.join(summary_lines)
|
||
|
||
def run_with_alert(self, sources: List[str] = None, table: str = 'ai_statistics') -> int:
|
||
"""执行验证并在失败时发送告警
|
||
|
||
Args:
|
||
sources: 数据源列表
|
||
table: 要验证的表名
|
||
|
||
Returns:
|
||
退出码(0=成功,1=失败)
|
||
"""
|
||
# 执行验证
|
||
passed = self.run_validation(sources, table)
|
||
|
||
# 创建验证报告目录
|
||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||
validation_reports_dir = os.path.join(script_dir, 'validation_reports')
|
||
if not os.path.exists(validation_reports_dir):
|
||
os.makedirs(validation_reports_dir)
|
||
|
||
# 生成报告
|
||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||
report_file = os.path.join(
|
||
validation_reports_dir,
|
||
f'validation_report_{timestamp}.txt'
|
||
)
|
||
self.validator.generate_report(report_file)
|
||
|
||
# 判断是否需要发送告警
|
||
if not passed:
|
||
print(f"\n{'='*70}")
|
||
print(f"[!] 验证失败,准备发送短信告警")
|
||
print(f"{'='*70}")
|
||
|
||
# 生成错误摘要
|
||
error_summary = self.generate_error_summary()
|
||
|
||
# 发送短信(错误代码固定为 "2222")
|
||
sms_sent = self.send_sms_alert("2222", error_summary)
|
||
|
||
if sms_sent:
|
||
print(f"\n[✓] 告警短信已发送")
|
||
else:
|
||
print(f"\n[X] 告警短信发送失败")
|
||
|
||
print(f"\n详细报告: {report_file}")
|
||
return 1
|
||
else:
|
||
print(f"\n{'='*70}")
|
||
print(f"[✓] 验证通过,无需发送告警")
|
||
print(f"{'='*70}")
|
||
return 0
|
||
|
||
|
||
def setup_windows_task_scheduler():
|
||
"""配置Windows任务计划程序(每天9点执行)"""
|
||
print(f"\n{'='*70}")
|
||
print(f"配置Windows任务计划程序")
|
||
print(f"{'='*70}")
|
||
|
||
script_path = os.path.abspath(__file__)
|
||
python_path = sys.executable
|
||
|
||
# 生成任务计划XML配置
|
||
task_name = "DataValidationWithSMS"
|
||
|
||
print(f"\n请手动创建Windows任务计划,或使用以下PowerShell命令:\n")
|
||
|
||
# PowerShell命令
|
||
ps_command = f"""
|
||
# 创建任务计划
|
||
$action = New-ScheduledTaskAction -Execute '{python_path}' -Argument '{script_path}'
|
||
$trigger = New-ScheduledTaskTrigger -Daily -At 9:00AM
|
||
$settings = New-ScheduledTaskSettingsSet -AllowStartIfOnBatteries -DontStopIfGoingOnBatteries
|
||
$principal = New-ScheduledTaskPrincipal -UserId "$env:USERNAME" -RunLevel Highest
|
||
|
||
Register-ScheduledTask -TaskName "{task_name}" -Action $action -Trigger $trigger -Settings $settings -Principal $principal -Description "每天9点执行数据验证并发送短信告警"
|
||
|
||
Write-Host "任务计划已创建: {task_name}"
|
||
"""
|
||
|
||
print(ps_command)
|
||
|
||
print(f"\n或者手动配置:")
|
||
print(f"1. 打开 '任务计划程序' (taskschd.msc)")
|
||
print(f"2. 创建基本任务")
|
||
print(f"3. 名称: {task_name}")
|
||
print(f"4. 触发器: 每天 上午9:00")
|
||
print(f"5. 操作: 启动程序")
|
||
print(f"6. 程序: {python_path}")
|
||
print(f"7. 参数: {script_path}")
|
||
print(f"8. 完成")
|
||
|
||
print(f"\n{'='*70}")
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
import argparse
|
||
|
||
parser = argparse.ArgumentParser(
|
||
description='数据验证与短信告警集成脚本',
|
||
formatter_class=argparse.RawDescriptionHelpFormatter
|
||
)
|
||
|
||
parser.add_argument(
|
||
'--date',
|
||
type=str,
|
||
help='目标日期 (YYYY-MM-DD),默认为昨天'
|
||
)
|
||
|
||
parser.add_argument(
|
||
'--source',
|
||
nargs='+',
|
||
choices=['json', 'csv', 'database'],
|
||
default=['json', 'csv', 'database'],
|
||
help='数据源列表'
|
||
)
|
||
|
||
parser.add_argument(
|
||
'--table',
|
||
type=str,
|
||
choices=['ai_statistics', 'ai_statistics_day', 'ai_statistics_days'],
|
||
default='ai_statistics',
|
||
help='要验证的表名'
|
||
)
|
||
|
||
parser.add_argument(
|
||
'--setup-schedule',
|
||
action='store_true',
|
||
help='配置定时任务(每天9点执行)'
|
||
)
|
||
|
||
parser.add_argument(
|
||
'--test-sms',
|
||
action='store_true',
|
||
help='测试短信发送功能'
|
||
)
|
||
|
||
parser.add_argument(
|
||
'--no-sms',
|
||
action='store_true',
|
||
help='禁用短信发送(仅验证数据)'
|
||
)
|
||
|
||
args = parser.parse_args()
|
||
|
||
# 配置定时任务
|
||
if args.setup_schedule:
|
||
setup_windows_task_scheduler()
|
||
return 0
|
||
|
||
# 测试短信
|
||
if args.test_sms:
|
||
print(f"\n{'='*70}")
|
||
print(f"测试短信发送功能")
|
||
print(f"{'='*70}")
|
||
|
||
validator = DataValidationWithSMS()
|
||
success = validator.send_sms_alert(
|
||
"2222",
|
||
"这是一条测试短信,数据验证系统运行正常"
|
||
)
|
||
return 0 if success else 1
|
||
|
||
# 执行验证
|
||
try:
|
||
validator = DataValidationWithSMS(date_str=args.date)
|
||
return validator.run_with_alert(args.source, args.table)
|
||
except Exception as e:
|
||
print(f"\n[X] 程序执行失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return 1
|
||
|
||
|
||
if __name__ == '__main__':
|
||
sys.exit(main())
|