Files
baijiahao_data_crawl/data_validation_with_sms.py

442 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据验证与短信告警集成脚本
功能:
1. 执行数据验证JSON/CSV/数据库)
2. 如果验证失败,发送阿里云短信告警
3. 支持定时任务调度每天9点执行
使用方法:
# 手动执行一次验证
python data_validation_with_sms.py
# 指定日期验证
python data_validation_with_sms.py --date 2025-12-29
# 配置定时任务Windows任务计划程序
python data_validation_with_sms.py --setup-schedule
"""
import sys
import os
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
# 添加项目根目录到路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 导入数据验证模块
from data_validation import DataValidator
# Aliyun SMS SDK imports.
# The SDK is treated as optional: when any of these imports fails, the module
# still loads and SMS_AVAILABLE records that no SMS client can be created.
try:
    from alibabacloud_dysmsapi20170525.client import Client as Dysmsapi20170525Client
    from alibabacloud_credentials.client import Client as CredentialClient
    from alibabacloud_credentials.models import Config as CredentialConfig
    from alibabacloud_tea_openapi import models as open_api_models
    from alibabacloud_dysmsapi20170525 import models as dysmsapi_20170525_models
    from alibabacloud_tea_util import models as util_models
    # All SDK pieces imported successfully.
    SMS_AVAILABLE = True
except ImportError:
    # Tell the operator how to install the SDK, then continue without SMS.
    # NOTE(review): the SDK names imported above stay undefined on this path,
    # so code that references them must first check SMS_AVAILABLE.
    print("[!] 阿里云短信SDK未安装短信功能将不可用")
    print(" 安装命令: pip install alibabacloud_dysmsapi20170525")
    SMS_AVAILABLE = False
class SMSAlertConfig:
    """Settings for the Aliyun SMS alert channel.

    Values are read from ``sms_config.json`` (located next to this file unless
    another path is given). The two access-key values can additionally be
    supplied through the ``ALIBABA_CLOUD_ACCESS_KEY_ID`` and
    ``ALIBABA_CLOUD_ACCESS_KEY_SECRET`` environment variables, which take
    precedence over the file.

    SECURITY: credentials are never defaulted from source code. An access key
    pair that used to be embedded here as a fallback must be treated as leaked
    and rotated in the Aliyun console.

    Attributes:
        ACCESS_KEY_ID: Aliyun access key id, or ``''`` when not configured.
        ACCESS_KEY_SECRET: Aliyun access key secret, or ``''`` when not configured.
        SIGN_NAME: SMS signature name.
        TEMPLATE_CODE: SMS template code.
        PHONE_NUMBERS: Recipient number(s); several numbers are comma separated.
        ENDPOINT: SMS API endpoint host.
    """

    # Lazily created shared instance returned by get_instance().
    _instance = None

    def __init__(self, config_file: Optional[str] = None):
        """Load settings from the config file and the environment.

        Args:
            config_file: Path of the JSON config file. Defaults to
                ``sms_config.json`` in the directory containing this file.
        """
        if config_file is None:
            config_file = os.path.join(
                os.path.dirname(os.path.abspath(__file__)), 'sms_config.json'
            )
        config_data = self._load_config_file(config_file)

        # Credentials: a non-empty environment variable wins, then the config
        # file. There is deliberately no hard-coded fallback.
        self.ACCESS_KEY_ID = (
            os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID')
            or config_data.get('access_key_id')
            or ''
        )
        self.ACCESS_KEY_SECRET = (
            os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
            or config_data.get('access_key_secret')
            or ''
        )
        if not (self.ACCESS_KEY_ID and self.ACCESS_KEY_SECRET):
            print("[!] 未配置阿里云访问凭据,短信功能将不可用")
            print(" 请设置环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID / ALIBABA_CLOUD_ACCESS_KEY_SECRET 或 sms_config.json")

        # SMS signature and template.
        self.SIGN_NAME = config_data.get('sign_name', '北京乐航时代科技')
        self.TEMPLATE_CODE = config_data.get('template_code', 'SMS_486210104')
        # Recipient number(s), comma separated. Prefer overriding this default
        # through 'phone_numbers' in the config file.
        self.PHONE_NUMBERS = config_data.get('phone_numbers', '13621242430')
        # SMS API endpoint.
        self.ENDPOINT = config_data.get('endpoint', 'dysmsapi.aliyuncs.com')

    @staticmethod
    def _load_config_file(config_file: str) -> Dict:
        """Return the JSON object stored in *config_file*, or ``{}``.

        A missing file yields ``{}`` silently; an unreadable file, invalid JSON,
        or JSON whose top level is not an object yields ``{}`` after a warning.
        """
        if not os.path.exists(config_file):
            return {}
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                config_data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"[!] 读取配置文件失败: {e}")
            return {}
        if not isinstance(config_data, dict):
            print(f"[!] 读取配置文件失败: 顶层必须是JSON对象")
            return {}
        return config_data

    @staticmethod
    def get_instance():
        """Return the shared configuration instance, creating it on first use."""
        if SMSAlertConfig._instance is None:
            SMSAlertConfig._instance = SMSAlertConfig()
        return SMSAlertConfig._instance
class DataValidationWithSMS:
    """Run data validation and send an Aliyun SMS alert when it fails.

    Attributes:
        validator: ``DataValidator`` for the target date.
        sms_config: Shared ``SMSAlertConfig`` instance.
        sms_client: Aliyun SMS client, or ``None`` when the SDK is not
            installed, credentials are missing, or client creation failed.
    """

    def __init__(self, date_str: Optional[str] = None):
        """Create the validator and, when the SDK is available, the SMS client.

        Args:
            date_str: Target date (YYYY-MM-DD); passed through to
                ``DataValidator`` unchanged (``None`` lets it pick its default).
        """
        self.validator = DataValidator(date_str)
        self.sms_client = None
        self.sms_config = SMSAlertConfig.get_instance()
        if SMS_AVAILABLE:
            self.sms_client = self._create_sms_client()

    # The return annotation is a string on purpose: the SDK class name is
    # unbound when the optional SDK import failed, and an unquoted annotation
    # would raise NameError while this class is being defined.
    def _create_sms_client(self) -> Optional["Dysmsapi20170525Client"]:
        """Build the Aliyun SMS client from the loaded configuration.

        Returns:
            The client instance, or ``None`` if credentials are missing or the
            SDK raised while constructing it.
        """
        if not (self.sms_config.ACCESS_KEY_ID and self.sms_config.ACCESS_KEY_SECRET):
            print("[X] 创建短信客户端失败: 未配置阿里云访问凭据")
            return None
        try:
            credential_config = CredentialConfig(
                type='access_key',
                access_key_id=self.sms_config.ACCESS_KEY_ID,
                access_key_secret=self.sms_config.ACCESS_KEY_SECRET
            )
            credential = CredentialClient(credential_config)
            config = open_api_models.Config(
                credential=credential,
                endpoint=self.sms_config.ENDPOINT
            )
            return Dysmsapi20170525Client(config)
        except Exception as e:
            # Best effort: a broken SMS setup must not stop validation itself.
            print(f"[X] 创建短信客户端失败: {e}")
            return None

    def send_sms_alert(self, error_code: str, error_details: str) -> bool:
        """Send the alert SMS to the configured recipients.

        Args:
            error_code: Value sent as the template's ``code`` parameter
                (e.g. "2222").
            error_details: Human-readable details; only printed locally
                (first 100 characters), not sent in the SMS.

        Returns:
            ``True`` if the API answered with ``Code == 'OK'``, else ``False``
            (including when no client is available or an exception occurred).
        """
        if not self.sms_client:
            print(f"[X] 短信客户端未初始化,无法发送告警")
            return False
        try:
            # Build the request.
            send_sms_request = dysmsapi_20170525_models.SendSmsRequest(
                phone_numbers=self.sms_config.PHONE_NUMBERS,
                sign_name=self.sms_config.SIGN_NAME,
                template_code=self.sms_config.TEMPLATE_CODE,
                template_param=json.dumps({"code": error_code})
            )
            runtime = util_models.RuntimeOptions()
            print(f"\n[短信] 正在发送告警短信...")
            print(f" 接收号码: {self.sms_config.PHONE_NUMBERS}")
            print(f" 错误代码: {error_code}")
            print(f" 错误详情: {error_details[:100]}...")
            # Send and inspect the response body.
            resp = self.sms_client.send_sms_with_options(send_sms_request, runtime)
            body = resp.to_map().get('body') or {}
            if body.get('Code') == 'OK':
                print(f"[✓] 短信发送成功")
                print(f" 请求ID: {body.get('RequestId')}")
                print(f" 消息ID: {body.get('BizId')}")
                return True
            print(f"[X] 短信发送失败")
            print(f" 错误码: {body.get('Code')}")
            print(f" 错误信息: {body.get('Message')}")
            return False
        except Exception as e:
            print(f"[X] 发送短信异常: {e}")
            # Some SDK errors carry a 'data' mapping with a diagnostic URL.
            if hasattr(e, 'data') and e.data:
                print(f" 诊断地址: {e.data.get('Recommend')}")
            return False

    def run_validation(self, sources: List[str] = None, table: str = 'ai_statistics') -> bool:
        """Validate one table across the given data sources.

        Args:
            sources: Data sources to compare; defaults to
                ``['json', 'csv', 'database']``.
            table: Table to validate. ``ai_statistics`` and
                ``ai_statistics_day`` are supported; ``ai_statistics_days`` is
                not implemented yet and counts as a failure.

        Returns:
            ``True`` if validation passed; ``False`` on failure, on an unknown
            or unimplemented table, or if the validator raised.
        """
        if sources is None:
            sources = ['json', 'csv', 'database']
        print(f"\n{'='*70}")
        print(f"数据验证与短信告警")
        print(f"{'='*70}")
        print(f"验证日期: {self.validator.date_str}")
        print(f"验证表: {table}")
        print(f"数据源: {', '.join(sources)}")
        print(f"{'='*70}")
        try:
            if table == 'ai_statistics':
                passed = self.validator.validate_ai_statistics(sources)
            elif table == 'ai_statistics_day':
                passed = self.validator.validate_ai_statistics_day(sources)
            elif table == 'ai_statistics_days':
                # TODO: implement validation for ai_statistics_days
                print(f"[!] 表 {table} 的验证功能暂未实现")
                passed = False
            else:
                print(f"[X] 未知的表名: {table}")
                passed = False
            return passed
        except Exception as e:
            print(f"\n[X] 验证过程出错: {e}")
            import traceback
            traceback.print_exc()
            return False

    def generate_error_summary(self) -> str:
        """Summarise the validator's recorded mismatches in one line.

        Reads ``self.validator.validation_results``; a missing result section
        is treated as empty so that a summary can still be produced when
        validation aborted before filling it in.

        Returns:
            ``'; '``-joined summary starting with the validation date.
        """
        results = self.validator.validation_results
        summary_lines = [f"日期: {self.validator.date_str}"]
        # Order-check failures.
        order_errors = [r for r in results.get('顺序验证', []) if not r['order_match']]
        if order_errors:
            summary_lines.append(f"顺序不一致: {len(order_errors)}")
        # Cross-check failures: any comparison with a one-sided record or a
        # field mismatch.
        cross_errors = [
            r for r in results.get('交叉验证', [])
            if r['only_in_source1'] or r['only_in_source2'] or r['field_mismatches']
        ]
        if cross_errors:
            summary_lines.append(f"数据不一致: {len(cross_errors)}")
            total_missing = sum(len(r['only_in_source1']) for r in cross_errors)
            total_extra = sum(len(r['only_in_source2']) for r in cross_errors)
            total_diff = sum(len(r['field_mismatches']) for r in cross_errors)
            if total_missing:
                summary_lines.append(f" 缺失记录: {total_missing}")
            if total_extra:
                summary_lines.append(f" 多余记录: {total_extra}")
            if total_diff:
                summary_lines.append(f" 字段差异: {total_diff}")
        return '; '.join(summary_lines)

    def run_with_alert(self, sources: List[str] = None, table: str = 'ai_statistics') -> int:
        """Validate, write a report, and send an SMS alert on failure.

        The report is written to ``validation_reports/`` next to this file,
        named with the current timestamp.

        Args:
            sources: Data sources to compare (see ``run_validation``).
            table: Table to validate.

        Returns:
            Exit code: 0 if validation passed, 1 if it failed.
        """
        passed = self.run_validation(sources, table)
        # Ensure the report directory exists (exist_ok avoids a check-then-create race).
        script_dir = os.path.dirname(os.path.abspath(__file__))
        validation_reports_dir = os.path.join(script_dir, 'validation_reports')
        os.makedirs(validation_reports_dir, exist_ok=True)
        # Write the report.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        report_file = os.path.join(
            validation_reports_dir,
            f'validation_report_{timestamp}.txt'
        )
        self.validator.generate_report(report_file)
        if passed:
            print(f"\n{'='*70}")
            print(f"[✓] 验证通过,无需发送告警")
            print(f"{'='*70}")
            return 0
        print(f"\n{'='*70}")
        print(f"[!] 验证失败,准备发送短信告警")
        print(f"{'='*70}")
        error_summary = self.generate_error_summary()
        # The alert code sent in the SMS is fixed to "2222".
        sms_sent = self.send_sms_alert("2222", error_summary)
        if sms_sent:
            print(f"\n[✓] 告警短信已发送")
        else:
            print(f"\n[X] 告警短信发送失败")
        print(f"\n详细报告: {report_file}")
        return 1
def setup_windows_task_scheduler():
    """Print instructions for scheduling this script daily at 9:00 on Windows.

    Nothing is registered by this function: it prints a PowerShell snippet
    (and equivalent manual steps) that runs the current Python interpreter
    with this script as its argument.
    """
    print(f"\n{'='*70}")
    print(f"配置Windows任务计划程序")
    print(f"{'='*70}")
    script_path = os.path.abspath(__file__)
    python_path = sys.executable
    task_name = "DataValidationWithSMS"
    # Inside a single-quoted PowerShell literal a quote is escaped by doubling it.
    ps_python = python_path.replace("'", "''")
    ps_script = script_path.replace("'", "''")
    print(f"\n请手动创建Windows任务计划或使用以下PowerShell命令\n")
    # The script path is wrapped in double quotes inside -Argument so that a
    # path containing spaces reaches Python as a single argument.
    ps_command = f"""
# 创建任务计划
$action = New-ScheduledTaskAction -Execute '{ps_python}' -Argument '"{ps_script}"'
$trigger = New-ScheduledTaskTrigger -Daily -At 9:00AM
$settings = New-ScheduledTaskSettingsSet -AllowStartIfOnBatteries -DontStopIfGoingOnBatteries
$principal = New-ScheduledTaskPrincipal -UserId "$env:USERNAME" -RunLevel Highest
Register-ScheduledTask -TaskName "{task_name}" -Action $action -Trigger $trigger -Settings $settings -Principal $principal -Description "每天9点执行数据验证并发送短信告警"
Write-Host "任务计划已创建: {task_name}"
"""
    print(ps_command)
    print(f"\n或者手动配置:")
    print(f"1. 打开 '任务计划程序' (taskschd.msc)")
    print(f"2. 创建基本任务")
    print(f"3. 名称: {task_name}")
    print(f"4. 触发器: 每天 上午9:00")
    print(f"5. 操作: 启动程序")
    print(f"6. 程序: {python_path}")
    print(f"7. 参数: \"{script_path}\"")
    print(f"8. 完成")
    print(f"\n{'='*70}")
def main() -> int:
    """Command-line entry point.

    Modes, checked in this order:
      * ``--setup-schedule``: print Windows Task Scheduler instructions.
      * ``--test-sms``: send one test SMS and report whether it succeeded.
      * otherwise: validate ``--table`` for ``--date`` across ``--source`` and
        alert on failure; ``--no-sms`` suppresses the SMS.

    Returns:
        Process exit code: 0 on success, 1 on failure.
    """
    import argparse
    import traceback
    parser = argparse.ArgumentParser(
        description='数据验证与短信告警集成脚本',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        '--date',
        type=str,
        help='目标日期 (YYYY-MM-DD),默认为昨天'
    )
    parser.add_argument(
        '--source',
        nargs='+',
        choices=['json', 'csv', 'database'],
        default=['json', 'csv', 'database'],
        help='数据源列表'
    )
    parser.add_argument(
        '--table',
        type=str,
        choices=['ai_statistics', 'ai_statistics_day', 'ai_statistics_days'],
        default='ai_statistics',
        help='要验证的表名'
    )
    parser.add_argument(
        '--setup-schedule',
        action='store_true',
        help='配置定时任务每天9点执行'
    )
    parser.add_argument(
        '--test-sms',
        action='store_true',
        help='测试短信发送功能'
    )
    parser.add_argument(
        '--no-sms',
        action='store_true',
        help='禁用短信发送(仅验证数据)'
    )
    args = parser.parse_args()

    # Scheduler instructions only; no validation is run.
    if args.setup_schedule:
        setup_windows_task_scheduler()
        return 0

    # SMS smoke test.
    if args.test_sms:
        print(f"\n{'='*70}")
        print(f"测试短信发送功能")
        print(f"{'='*70}")
        validator = DataValidationWithSMS()
        success = validator.send_sms_alert(
            "2222",
            "这是一条测试短信,数据验证系统运行正常"
        )
        return 0 if success else 1

    # Normal run: validate and alert.
    try:
        validator = DataValidationWithSMS(date_str=args.date)
        if args.no_sms:
            # Honour --no-sms: without a client, send_sms_alert() returns
            # False without contacting the SMS service.
            print("[!] 已指定 --no-sms,本次不发送告警短信")
            validator.sms_client = None
        return validator.run_with_alert(args.source, args.table)
    except Exception as e:
        print(f"\n[X] 程序执行失败: {e}")
        traceback.print_exc()
        return 1
if __name__ == '__main__':
sys.exit(main())