1746 lines
79 KiB
Python
1746 lines
79 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
"""
|
|||
|
|
数据整合导出工具
|
|||
|
|
从 bjh_integrated_data.json 中读取数据并导出为三个数据库表对应的CSV文件:
|
|||
|
|
1. ai_statistics.csv - 账号汇总统计表
|
|||
|
|
2. ai_statistics_day.csv - 每日明细统计表
|
|||
|
|
3. ai_statistics_days.csv - 核心指标统计表(含发文量、收益、环比)
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import sys
|
|||
|
|
import os
|
|||
|
|
import json
|
|||
|
|
import csv
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import Dict, List, Optional
|
|||
|
|
from decimal import Decimal
|
|||
|
|
|
|||
|
|
# 导入数据库配置
|
|||
|
|
try:
|
|||
|
|
from database_config import DatabaseManager, DB_CONFIG
|
|||
|
|
except ImportError:
|
|||
|
|
DatabaseManager = None
|
|||
|
|
DB_CONFIG = None
|
|||
|
|
|
|||
|
|
# 设置UTF-8编码
|
|||
|
|
# Force UTF-8 on Windows consoles so Chinese log output is not garbled.
if sys.platform == 'win32':
    import io
    for _name in ('stdout', 'stderr'):
        _stream = getattr(sys, _name)
        if not isinstance(_stream, io.TextIOWrapper) or _stream.encoding != 'utf-8':
            setattr(sys, _name, io.TextIOWrapper(_stream.buffer, encoding='utf-8'))
|
|||
|
|
|
|||
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
from log_config import setup_export_csv_logger
|
|||
|
|
except ImportError:
|
|||
|
|
# 如果没有日志模块,使用print替代
|
|||
|
|
def setup_export_csv_logger():
    """Fallback logger factory used when log_config is unavailable.

    Returns:
        An object with info/error (printed to stdout) and debug (dropped).
    """
    class DummyLogger:
        def info(self, msg):
            print(msg)

        def error(self, msg):
            print(msg)

        def debug(self, msg):
            # Debug output is intentionally discarded in the fallback.
            pass

    return DummyLogger()
|
|||
|
|
|
|||
|
|
|
|||
|
|
class DataExporter:
|
|||
|
|
"""数据导出器"""
|
|||
|
|
|
|||
|
|
def __init__(self, use_database: bool = False, db_config: Optional[Dict] = None):
    """Initialize the exporter.

    Args:
        use_database: True = insert straight into the database,
            False = export CSV files.
        db_config: database configuration; defaults to
            database_config.DB_CONFIG when omitted.
    """
    # Set up logging first so later steps can report problems.
    self.logger = setup_export_csv_logger()

    self.script_dir = os.path.dirname(os.path.abspath(__file__))
    # Consolidated input file produced upstream.
    self.integrated_file = os.path.join(self.script_dir, "bjh_integrated_data.json")

    # One output CSV per target database table.
    self.output_ai_statistics = os.path.join(self.script_dir, "ai_statistics.csv")
    self.output_ai_statistics_day = os.path.join(self.script_dir, "ai_statistics_day.csv")
    self.output_ai_statistics_days = os.path.join(self.script_dir, "ai_statistics_days.csv")

    # Database mode setup.
    self.use_database = use_database
    self.db_manager = None
    if use_database:
        if DatabaseManager is None:
            print("[X] 数据库模块未安装,请检查 database_config.py")
            self.use_database = False
        else:
            self.db_manager = DatabaseManager(db_config)
            print("[配置] 已启用数据库模式")
    else:
        # CSV mode still benefits from a connection for author_id lookups.
        if DatabaseManager is not None:
            try:
                self.db_manager = DatabaseManager(db_config)
                print("[配置] CSV模式 - 已连接数据库以查询author_id")
            except Exception as e:
                print(f"[!] 数据库连接失败,author_id将设为0: {e}")
                self.db_manager = None

    # author_name -> author_id lookup cache.
    self.author_id_cache = {}
|
|||
|
|
|
|||
|
|
def get_author_id(self, author_name: str) -> int:
    """Resolve an author name to its database id.

    Args:
        author_name: display name of the author.

    Returns:
        The matching ai_authors.id, or 0 when not found / unavailable.
    """
    # Cache hit (misses are cached as 0 as well).
    cached = self.author_id_cache.get(author_name)
    if cached is not None:
        return cached

    # Without a connection there is nothing to look up.
    if not self.db_manager:
        return 0

    try:
        sql = "SELECT id FROM ai_authors WHERE author_name = %s AND channel = 1 LIMIT 1"
        row = self.db_manager.execute_query(sql, (author_name,), fetch_one=True)
        author_id = row.get('id', 0) if row else 0
        # Remember the outcome (including misses) to avoid repeat queries.
        self.author_id_cache[author_name] = author_id
        return author_id
    except Exception as e:
        print(f" [!] 查询author_id失败: {e}")
        return 0
|
|||
|
|
|
|||
|
|
def query_article_count_from_db(self, author_id: int, stat_date: str) -> Dict:
    """Query article counters for one author from the ai_articles table.

    Args:
        author_id: author primary key (ai_authors.id).
        stat_date: statistics date, formatted YYYY-MM-DD.

    Returns:
        Dict with 'daily_published_count' (articles published on stat_date),
        'cumulative_published_count' (month-to-date) and
        'total_submission_count'. All zeros when the database is unavailable
        or the author id is invalid.
    """
    result = {
        'daily_published_count': 0,
        'cumulative_published_count': 0,
        'total_submission_count': 0
    }

    if not self.db_manager or author_id == 0:
        print(f" [数据库] 未连接或author_id无效,无法查询文章数据")
        return result

    try:
        from datetime import datetime

        # Parse and normalize the statistics date.
        target_date = datetime.strptime(stat_date, '%Y-%m-%d')
        stat_date_str = target_date.strftime('%Y-%m-%d')

        # First day of the month containing stat_date.
        month_first_day = target_date.replace(day=1).strftime('%Y-%m-%d')

        # Articles published exactly on stat_date.
        daily_sql = """
            SELECT COUNT(*) as count
            FROM ai_articles
            WHERE author_id = %s
            AND DATE(publish_time) = %s
            AND status = 'published'
            AND channel = 1
        """

        daily_result = self.db_manager.execute_query(
            daily_sql,
            (author_id, stat_date_str),
            fetch_one=True,
            dict_cursor=True
        )

        if daily_result:
            result['daily_published_count'] = int(daily_result.get('count', 0) or 0)
            print(f" [数据库] 单日发文量 ({stat_date_str}): {result['daily_published_count']}")

        # Month-to-date published articles (first of month .. stat_date).
        cumulative_sql = """
            SELECT COUNT(*) as count
            FROM ai_articles
            WHERE author_id = %s
            AND DATE(publish_time) >= %s
            AND DATE(publish_time) <= %s
            AND status = 'published'
            AND channel = 1
        """

        cumulative_result = self.db_manager.execute_query(
            cumulative_sql,
            (author_id, month_first_day, stat_date_str),
            fetch_one=True,
            dict_cursor=True
        )

        if cumulative_result:
            result['cumulative_published_count'] = int(cumulative_result.get('count', 0) or 0)
            print(f" [数据库] 累计发文量 ({month_first_day}至{stat_date_str}): {result['cumulative_published_count']}")

        # Submission count for ai_statistics_day: the previous version issued
        # a third query byte-identical to daily_sql (same status='published'
        # filter), so reuse the daily result instead of a second round-trip.
        # NOTE(review): the old comment claimed this should count articles of
        # ALL statuses; if that is the real intent, the status filter must be
        # dropped here - confirm against the ai_statistics_day consumers.
        if daily_result:
            result['total_submission_count'] = result['daily_published_count']
            print(f" [数据库] 投稿量 ({stat_date_str}): {result['total_submission_count']}")

        return result

    except Exception as e:
        print(f" [!] 从数据库查询文章数据失败: {e}")
        import traceback
        traceback.print_exc()
        return result
|
|||
|
|
|
|||
|
|
def calculate_weekly_article_count_from_db(self, author_id: int, stat_date: str) -> int:
    """Count articles published during the Mon-Sun week containing stat_date.

    Args:
        author_id: author primary key (ai_authors.id).
        stat_date: statistics date, formatted YYYY-MM-DD.

    Returns:
        Number of published articles in that week, 0 on failure.
    """
    if not self.db_manager or author_id == 0:
        print(f" [数据库] 未连接或author_id无效,无法计算当周发文量")
        return 0

    try:
        from datetime import datetime, timedelta

        day = datetime.strptime(stat_date, '%Y-%m-%d')

        # weekday(): Monday == 0 ... Sunday == 6.
        week_start = day - timedelta(days=day.weekday())
        week_end = week_start + timedelta(days=6)
        monday_str = week_start.strftime('%Y-%m-%d')
        sunday_str = week_end.strftime('%Y-%m-%d')

        sql = """
            SELECT COUNT(*) as count
            FROM ai_articles
            WHERE author_id = %s
            AND DATE(publish_time) >= %s
            AND DATE(publish_time) <= %s
            AND status = 'published'
            AND channel = 1
        """

        row = self.db_manager.execute_query(
            sql,
            (author_id, monday_str, sunday_str),
            fetch_one=True,
            dict_cursor=True
        )

        if not row:
            print(f" [数据库] 未找到当周数据")
            return 0

        weekly_count = int(row.get('count', 0) or 0)
        print(f" [数据库] 当周发文量 ({monday_str} 至 {sunday_str}): {weekly_count}")
        return weekly_count

    except Exception as e:
        print(f" [!] 从数据库计算当周发文量失败: {e}")
        return 0
|
|||
|
|
|
|||
|
|
def calculate_weekly_revenue_from_db(self, author_id: int, stat_date: str) -> float:
    """Sum day_revenue over the Mon-Sun week containing stat_date.

    Aggregates the day_revenue column of ai_statistics_days.

    Args:
        author_id: author primary key (ai_authors.id).
        stat_date: statistics date, formatted YYYY-MM-DD.

    Returns:
        Total weekly revenue, 0.0 on failure or when no rows exist.
    """
    if not self.db_manager or author_id == 0:
        print(f" [数据库] 未连接或author_id无效,无法计算当周收益")
        return 0.0

    try:
        from datetime import datetime, timedelta

        day = datetime.strptime(stat_date, '%Y-%m-%d')

        # weekday(): Monday == 0 ... Sunday == 6.
        week_start = day - timedelta(days=day.weekday())
        week_end = week_start + timedelta(days=6)
        monday_str = week_start.strftime('%Y-%m-%d')
        sunday_str = week_end.strftime('%Y-%m-%d')

        print(f" [调试] 目标日期: {stat_date}, 周一: {monday_str}, 周日: {sunday_str}")

        sql = """
            SELECT SUM(day_revenue) as weekly_total, COUNT(*) as day_count
            FROM ai_statistics_days
            WHERE author_id = %s
            AND stat_date >= %s
            AND stat_date <= %s
            AND channel = 1
        """

        result = self.db_manager.execute_query(
            sql,
            (author_id, monday_str, sunday_str),
            fetch_one=True,
            dict_cursor=True
        )

        print(f" [调试] 查询结果: {result}")

        # SUM() yields NULL when no rows matched.
        if not result or result.get('weekly_total') is None:
            print(f" [数据库] 未找到当周数据 ({monday_str} 至 {sunday_str}),返回0")
            return 0.0

        weekly_total = float(result['weekly_total'] or 0)
        day_count = int(result.get('day_count', 0) or 0)
        print(f" [数据库] 当周收益 ({monday_str} 至 {sunday_str}): ¥{weekly_total:.2f} (基于{day_count}天的数据)")
        return weekly_total

    except Exception as e:
        print(f" [!] 从数据库计算当周收益失败: {e}")
        return 0.0
|
|||
|
|
|
|||
|
|
def calculate_last_week_revenue_from_db(self, author_id: int, stat_date: str) -> float:
    """Sum day_revenue over the previous Mon-Sun week relative to stat_date.

    Args:
        author_id: author primary key (ai_authors.id).
        stat_date: statistics date, formatted YYYY-MM-DD.

    Returns:
        Total revenue for last week, 0.0 on failure or when no rows exist.
    """
    if not self.db_manager or author_id == 0:
        return 0.0

    try:
        from datetime import datetime, timedelta

        day = datetime.strptime(stat_date, '%Y-%m-%d')

        # This week's Monday, then shift one week back.
        this_monday = day - timedelta(days=day.weekday())
        last_monday = this_monday - timedelta(days=7)
        last_sunday = last_monday + timedelta(days=6)

        last_monday_str = last_monday.strftime('%Y-%m-%d')
        last_sunday_str = last_sunday.strftime('%Y-%m-%d')

        sql = """
            SELECT SUM(day_revenue) as weekly_total
            FROM ai_statistics_days
            WHERE author_id = %s
            AND stat_date >= %s
            AND stat_date <= %s
            AND channel = 1
        """

        result = self.db_manager.execute_query(
            sql,
            (author_id, last_monday_str, last_sunday_str),
            fetch_one=True,
            dict_cursor=True
        )

        # SUM() yields NULL when no rows matched.
        if not result or result.get('weekly_total') is None:
            print(f" [数据库] 未找到上周数据 ({last_monday_str} 至 {last_sunday_str})")
            return 0.0

        last_week_total = float(result['weekly_total'] or 0)
        print(f" [数据库] 上周收益 ({last_monday_str} 至 {last_sunday_str}): ¥{last_week_total:.2f}")
        return last_week_total

    except Exception as e:
        print(f" [!] 从数据库计算上周收益失败: {e}")
        return 0.0
|
|||
|
|
|
|||
|
|
def calculate_last_month_revenue_from_db(self, author_id: int, stat_date: str) -> float:
    """Sum day_revenue over the calendar month preceding stat_date's month.

    The previous implementation pulled in the third-party
    dateutil.relativedelta just for this arithmetic; the same result is
    obtained with the standard library (and removes a hard crash when
    dateutil is not installed): first-of-month minus one day lands on the
    last day of the previous month.

    Args:
        author_id: author primary key (ai_authors.id).
        stat_date: statistics date, formatted YYYY-MM-DD.

    Returns:
        Total revenue for last month, 0.0 on failure or when no rows exist.
    """
    if not self.db_manager or author_id == 0:
        return 0.0

    try:
        from datetime import datetime, timedelta

        target_date = datetime.strptime(stat_date, '%Y-%m-%d')

        # Last day of the previous month = first of this month minus 1 day.
        last_month_last = target_date.replace(day=1) - timedelta(days=1)
        # First day of the previous month.
        last_month_first = last_month_last.replace(day=1)

        last_month_first_str = last_month_first.strftime('%Y-%m-%d')
        last_month_last_str = last_month_last.strftime('%Y-%m-%d')

        sql = """
            SELECT SUM(day_revenue) as monthly_total
            FROM ai_statistics_days
            WHERE author_id = %s
            AND stat_date >= %s
            AND stat_date <= %s
            AND channel = 1
        """

        result = self.db_manager.execute_query(
            sql,
            (author_id, last_month_first_str, last_month_last_str),
            fetch_one=True,
            dict_cursor=True
        )

        # SUM() yields NULL when no rows matched.
        if result and result.get('monthly_total') is not None:
            last_month_total = float(result['monthly_total'] or 0)
            print(f" [数据库] 上月收益 ({last_month_first_str} 至 {last_month_last_str}): ¥{last_month_total:.2f}")
            return last_month_total
        else:
            print(f" [数据库] 未找到上月数据 ({last_month_first_str} 至 {last_month_last_str})")
            return 0.0

    except Exception as e:
        print(f" [!] 从数据库计算上月收益失败: {e}")
        return 0.0
|
|||
|
|
|
|||
|
|
def load_integrated_data(self) -> List[Dict]:
    """Load the consolidated data file bjh_integrated_data.json.

    Returns:
        Parsed list of per-account dicts; empty list on any failure.
    """
    try:
        if not os.path.exists(self.integrated_file):
            print(f"[!] 未找到整合数据文件: {self.integrated_file}")
            return []

        with open(self.integrated_file, 'r', encoding='utf-8') as fh:
            payload = json.load(fh)
        print(f"[OK] 加载整合数据文件: {self.integrated_file}")
        print(f" 共 {len(payload)} 个账号")
        return payload

    except Exception as e:
        print(f"[X] 加载整合数据失败: {e}")
        import traceback
        traceback.print_exc()
        return []
|
|||
|
|
|
|||
|
|
def extract_total_metrics(self, account_data: Dict) -> Dict:
    """Extract account-level summary metrics (for the ai_statistics table).

    Args:
        account_data: consolidated data for one account.

    Returns:
        Dict of counters/rates, all zero when the payload is missing or bad.
    """
    # (total_info key, output key, caster) for each exported field.
    field_map = [
        ('publish_count', 'submission_count', int),
        ('view_count', 'read_count', int),
        ('comment_count', 'comment_count', int),
        ('comment_rate', 'comment_rate', float),
        ('likes_count', 'like_count', int),
        ('likes_rate', 'like_rate', float),
        ('collect_count', 'favorite_count', int),
        ('collect_rate', 'favorite_rate', float),
        ('share_count', 'share_count', int),
        ('share_rate', 'share_rate', float),
        ('pic_slide_rate', 'slide_ratio', float),
        ('disp_pv', 'baidu_search_volume', int),
    ]
    metrics = {dst: caster(0) for _, dst, caster in field_map}

    try:
        api_list = account_data.get('analytics', {}).get('apis', [])
        if api_list:
            payload = api_list[0].get('data', {})
            if payload.get('errno') == 0:
                total_info = payload.get('data', {}).get('total_info', {})
                if total_info:
                    for src, dst, caster in field_map:
                        metrics[dst] = caster(total_info.get(src, 0) or 0)
    except Exception as e:
        print(f" [!] 提取汇总指标失败: {e}")

    return metrics
|
|||
|
|
|
|||
|
|
def extract_income_metrics(self, account_data: Dict, account_id: str = '') -> Dict:
    """Extract revenue metrics from the consolidated account data.

    Notes:
        * weekly_revenue and both growth rates are left at 0 here; they are
          derived from the database later in export_ai_statistics_days.
        * monthly_revenue comes from currentMonth (current calendar month).
        * day_revenue comes from yesterday (the day's revenue).

    Args:
        account_data: consolidated data for one account.
        account_id: account id (kept for interface compatibility).

    Returns:
        Dict with day/week/month revenue and WoW/MoM growth rates.
    """
    metrics = {
        'weekly_revenue': 0.0,        # computed from the DB at export time
        'monthly_revenue': 0.0,
        'day_revenue': 0.0,           # revenue for the day
        'revenue_wow_growth_rate': 0.0,  # computed from the DB at export time
        'revenue_mom_growth_rate': 0.0,  # computed from the DB at export time
    }

    try:
        income = account_data.get('income', {})
        if income and income.get('errno') == 0:
            payload = income.get('data', {}).get('income', {})

            # Day revenue from the "yesterday" bucket.
            yesterday = payload.get('yesterday', {})
            if yesterday:
                metrics['day_revenue'] = float(yesterday.get('income', 0) or 0)
                print(f" 当日收益: ¥{metrics['day_revenue']:.2f}")

            # Weekly revenue and growth rates come from the DB later.
            print(f" 当周收益: 将从数据库计算")
            print(f" 环比增长率: 将从数据库计算")

            # Current calendar month revenue.
            current_month = payload.get('currentMonth', {})
            if current_month:
                metrics['monthly_revenue'] = float(current_month.get('income', 0) or 0)

    except Exception as e:
        print(f" [!] 提取收入指标失败: {e}")

    return metrics
|
|||
|
|
|
|||
|
|
def export_ai_statistics(self, integrated_data: List[Dict]) -> bool:
    """Export account-level summary rows (ai_statistics table) to CSV.

    Args:
        integrated_data: list of consolidated per-account dicts.

    Returns:
        True when at least one row was written, False otherwise.
    """
    try:
        print("\n" + "="*70)
        print("开始导出 ai_statistics 表数据")
        print("="*70)

        rows = []

        for account in integrated_data:
            account_id = account.get('account_id', '')
            if not account_id:
                continue

            print(f"\n处理账号: {account_id}")

            author_id = self.get_author_id(account_id)
            metrics = self.extract_total_metrics(account)

            # Default to today; prefer the API-reported latest_event_day.
            latest_date = datetime.now().strftime('%Y-%m-%d')
            api_list = account.get('analytics', {}).get('apis', [])
            if api_list:
                payload = api_list[0].get('data', {})
                if payload.get('errno') == 0:
                    event_day = payload.get('data', {}).get('latest_event_day', '')
                    if event_day:
                        # Reformat 20251216 -> 2025-12-16.
                        raw = str(event_day)
                        latest_date = f"{raw[0:4]}-{raw[4:6]}-{raw[6:8]}"

            row = {
                'author_id': author_id,      # resolved via the database
                'author_name': account_id,
                'channel': 1,                # 1 = baidu
                'date': latest_date,
                'submission_count': metrics['submission_count'],
                'read_count': metrics['read_count'],
                'comment_count': metrics['comment_count'],
                'comment_rate': f"{metrics['comment_rate']:.4f}",
                'like_count': metrics['like_count'],
                'like_rate': f"{metrics['like_rate']:.4f}",
                'favorite_count': metrics['favorite_count'],
                'favorite_rate': f"{metrics['favorite_rate']:.4f}",
                'share_count': metrics['share_count'],
                'share_rate': f"{metrics['share_rate']:.4f}",
                'slide_ratio': f"{metrics['slide_ratio']:.4f}",
                'baidu_search_volume': metrics['baidu_search_volume'],
            }

            rows.append(row)
            print(f" 投稿量: {row['submission_count']}")
            print(f" 阅读量: {row['read_count']}")

        if not rows:
            print("\n[!] 没有数据可导出")
            return False

        fieldnames = [
            'author_id', 'author_name', 'channel', 'date',
            'submission_count', 'read_count', 'comment_count', 'comment_rate',
            'like_count', 'like_rate', 'favorite_count', 'favorite_rate',
            'share_count', 'share_rate', 'slide_ratio', 'baidu_search_volume'
        ]

        # utf-8-sig so Excel opens the Chinese headers/content correctly.
        with open(self.output_ai_statistics, 'w', encoding='utf-8-sig', newline='') as fh:
            writer = csv.DictWriter(fh, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)

        print(f"\n{'='*70}")
        print(f"[OK] ai_statistics 表数据已导出到: {self.output_ai_statistics}")
        print(f" 共 {len(rows)} 条记录")
        print(f"{'='*70}")
        return True

    except Exception as e:
        print(f"\n[X] 导出 ai_statistics 失败: {e}")
        import traceback
        traceback.print_exc()
        return False
|
|||
|
|
|
|||
|
|
def export_ai_statistics_day(self, integrated_data: List[Dict], use_db_article_count: bool = True) -> bool:
    """Export the latest-day detail rows (ai_statistics_day table) to CSV.

    Args:
        integrated_data: list of consolidated per-account dicts.
        use_db_article_count: when True and the author id resolves, prefer
            the ai_articles query for the submission count over the API value.

    Returns:
        True when at least one row was written, False otherwise.
    """
    try:
        print("\n" + "="*70)
        print("开始导出 ai_statistics_day 表数据")
        print("="*70)

        rows = []

        for account in integrated_data:
            account_id = account.get('account_id', '')
            if not account_id:
                continue

            print(f"\n处理账号: {account_id}")

            author_id = self.get_author_id(account_id)

            # Pull the per-day record list out of the first analytics API.
            api_list = account.get('analytics', {}).get('apis', [])
            day_records = []
            if api_list:
                payload = api_list[0].get('data', {})
                if payload.get('errno') == 0:
                    day_records = payload.get('data', {}).get('list', [])

            if not day_records:
                print(f" 未找到每日数据")
                continue

            print(f" 找到 {len(day_records)} 天的数据")

            # Only the most recent day is exported (ascending sort, take last).
            ordered = sorted(day_records, key=lambda rec: rec.get('event_day', ''))
            latest = ordered[-1] if ordered else None

            if not latest:
                print(f" 未找到有效的最新数据")
                continue

            event_day = latest.get('event_day', '')
            if event_day:
                # Reformat 20251216 -> 2025-12-16.
                raw = str(event_day)
                stat_date = f"{raw[0:4]}-{raw[4:6]}-{raw[6:8]}"

                # Prefer the database-derived submission count when possible.
                submission_count = int(latest.get('publish_count', 0) or 0)
                if use_db_article_count and author_id > 0:
                    article_data = self.query_article_count_from_db(author_id, stat_date)
                    submission_count = article_data['total_submission_count']
                    print(f" [使用数据库] 投稿量: {submission_count}")
                else:
                    print(f" [使用API] 投稿量: {submission_count}")

                rows.append({
                    'author_id': author_id,
                    'author_name': account_id,
                    'channel': 1,
                    'stat_date': stat_date,
                    'total_submission_count': submission_count,
                    'total_read_count': int(latest.get('view_count', 0) or 0),
                    'total_comment_count': int(latest.get('comment_count', 0) or 0),
                    'total_like_count': int(latest.get('likes_count', 0) or 0),
                    'total_favorite_count': int(latest.get('collect_count', 0) or 0),
                    'total_share_count': int(latest.get('share_count', 0) or 0),
                    'avg_comment_rate': f"{float(latest.get('comment_rate', 0) or 0):.4f}",
                    'avg_like_rate': f"{float(latest.get('likes_rate', 0) or 0):.4f}",
                    'avg_favorite_rate': f"{float(latest.get('collect_rate', 0) or 0):.4f}",
                    'avg_share_rate': f"{float(latest.get('share_rate', 0) or 0):.4f}",
                    'avg_slide_ratio': f"{float(latest.get('pic_slide_rate', 0) or 0):.4f}",
                    'total_baidu_search_volume': int(latest.get('disp_pv', 0) or 0),
                })
                print(f" 最新日期: {stat_date}")

        if not rows:
            print("\n[!] 没有数据可导出")
            return False

        fieldnames = [
            'author_id', 'author_name', 'channel', 'stat_date',
            'total_submission_count', 'total_read_count', 'total_comment_count',
            'total_like_count', 'total_favorite_count', 'total_share_count',
            'avg_comment_rate', 'avg_like_rate', 'avg_favorite_rate',
            'avg_share_rate', 'avg_slide_ratio', 'total_baidu_search_volume'
        ]

        # utf-8-sig so Excel opens the Chinese content correctly.
        with open(self.output_ai_statistics_day, 'w', encoding='utf-8-sig', newline='') as fh:
            writer = csv.DictWriter(fh, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)

        print(f"\n{'='*70}")
        print(f"[OK] ai_statistics_day 表数据已导出到: {self.output_ai_statistics_day}")
        print(f" 共 {len(rows)} 条记录")
        print(f"{'='*70}")
        return True

    except Exception as e:
        print(f"\n[X] 导出 ai_statistics_day 失败: {e}")
        import traceback
        traceback.print_exc()
        return False
|
|||
|
|
|
|||
|
|
def export_ai_statistics_days(self, integrated_data: List[Dict], use_db_article_count: bool = True, use_db_weekly_revenue: bool = True) -> bool:
    """Export core metric rows (counts, revenue, growth) for ai_statistics_days.

    Notes:
        * daily/cumulative published counts prefer ai_articles queries over
          the API figures when the database is reachable.
        * weekly revenue = DB week-to-date revenue + today's API revenue.
        * WoW / MoM growth rates are derived from DB aggregates and stay 0
          when the previous period has no data.

    Args:
        integrated_data: list of consolidated per-account dicts.
        use_db_article_count: prefer ai_articles queries for article counts.
        use_db_weekly_revenue: prefer DB aggregation for weekly revenue.

    Returns:
        True when at least one row was written, False otherwise.
    """
    try:
        print("\n" + "="*70)
        print("开始导出 ai_statistics_days 表数据")
        print("="*70)

        rows = []

        for account in integrated_data:
            account_id = account.get('account_id', '')
            if not account_id:
                continue

            print(f"\n处理账号: {account_id}")

            author_id = self.get_author_id(account_id)

            # Pull the per-day record list out of the first analytics API.
            api_list = account.get('analytics', {}).get('apis', [])
            day_records = []
            if api_list:
                payload = api_list[0].get('data', {})
                if payload.get('errno') == 0:
                    day_records = payload.get('data', {}).get('list', [])

            # Revenue figures from the API payload (weekly/growth filled below).
            income_metrics = self.extract_income_metrics(account, account_id)

            if day_records:
                print(f" 找到 {len(day_records)} 天的数据")

                ordered = sorted(day_records, key=lambda rec: rec.get('event_day', ''))

                # API-based cumulative count; overridden by the DB when available.
                cumulative_count = sum(int(rec.get('publish_count', 0) or 0) for rec in ordered)

                # Only the most recent day is exported.
                latest = ordered[-1] if ordered else None

                if latest:
                    event_day = latest.get('event_day', '')
                    if event_day:
                        # Reformat 20251216 -> 2025-12-16.
                        raw = str(event_day)
                        stat_date = f"{raw[0:4]}-{raw[4:6]}-{raw[6:8]}"

                        # Prefer database-derived article counts.
                        if use_db_article_count and author_id > 0:
                            article_data = self.query_article_count_from_db(author_id, stat_date)
                            daily_published = article_data['daily_published_count']
                            cumulative_count = article_data['cumulative_published_count']
                            print(f" [使用数据库] 文章数据: 单日={daily_published}, 累计={cumulative_count}")
                        else:
                            daily_published = int(latest.get('publish_count', 0) or 0)
                            print(f" [使用API] 文章数据: 单日={daily_published}, 累计={cumulative_count}")

                        if use_db_weekly_revenue and author_id > 0:
                            # Week-to-date revenue already stored in the DB
                            # (today's row has not been imported yet).
                            weekly_revenue_db = self.calculate_weekly_revenue_from_db(author_id, stat_date)
                            day_revenue = income_metrics['day_revenue']
                            weekly_revenue_total = weekly_revenue_db + day_revenue
                            income_metrics['weekly_revenue'] = weekly_revenue_total
                            print(f" [数据库] 本周已有收益: ¥{weekly_revenue_db:.2f}")
                            print(f" [API] 当日新增收益: ¥{day_revenue:.2f}")
                            print(f" [计算] 当周总收益: ¥{weekly_revenue_total:.2f}")

                            # Week-over-week growth: this week vs last week.
                            last_week_revenue = self.calculate_last_week_revenue_from_db(author_id, stat_date)
                            if last_week_revenue > 0:
                                income_metrics['revenue_wow_growth_rate'] = (weekly_revenue_total - last_week_revenue) / last_week_revenue
                                print(f" [计算] 周环比: {income_metrics['revenue_wow_growth_rate']:.2%} (本周¥{weekly_revenue_total:.2f} vs 上周¥{last_week_revenue:.2f})")
                            else:
                                print(f" [计算] 周环比: 无法计算(上周没有数据)")

                            # Month-over-month growth: this month vs last month.
                            last_month_revenue = self.calculate_last_month_revenue_from_db(author_id, stat_date)
                            monthly_revenue = income_metrics['monthly_revenue']
                            if last_month_revenue > 0:
                                income_metrics['revenue_mom_growth_rate'] = (monthly_revenue - last_month_revenue) / last_month_revenue
                                print(f" [计算] 月环比: {income_metrics['revenue_mom_growth_rate']:.2%} (当月¥{monthly_revenue:.2f} vs 上月¥{last_month_revenue:.2f})")
                            else:
                                print(f" [计算] 月环比: 无法计算(上月没有数据)")
                        else:
                            # Without DB support, weekly revenue falls back to
                            # just the day's revenue.
                            income_metrics['weekly_revenue'] = income_metrics['day_revenue']
                            print(f" [跳过数据库] 当周收益 = 当日收益: ¥{income_metrics['day_revenue']:.2f}")

                        rows.append({
                            'author_id': author_id,
                            'author_name': account_id,
                            'channel': 1,
                            'stat_date': stat_date,
                            'daily_published_count': daily_published,
                            'cumulative_published_count': cumulative_count,
                            'day_revenue': f"{income_metrics['day_revenue']:.2f}",
                            'monthly_revenue': f"{income_metrics['monthly_revenue']:.2f}",
                            'weekly_revenue': f"{income_metrics['weekly_revenue']:.2f}",
                            'revenue_mom_growth_rate': f"{income_metrics['revenue_mom_growth_rate']:.6f}",
                            'revenue_wow_growth_rate': f"{income_metrics['revenue_wow_growth_rate']:.6f}",
                        })
                        print(f" 最新日期: {stat_date}")
                        print(f" 累计发文量: {cumulative_count}")
                else:
                    print(f" 未找到有效的最新数据")
            else:
                # No per-day data: still emit a row carrying the revenue data.
                rows.append({
                    'author_id': author_id,
                    'author_name': account_id,
                    'channel': 1,
                    'stat_date': datetime.now().strftime('%Y-%m-%d'),
                    'daily_published_count': 0,
                    'cumulative_published_count': 0,
                    'day_revenue': f"{income_metrics['day_revenue']:.2f}",
                    'monthly_revenue': f"{income_metrics['monthly_revenue']:.2f}",
                    'weekly_revenue': f"{income_metrics['weekly_revenue']:.2f}",
                    'revenue_mom_growth_rate': f"{income_metrics['revenue_mom_growth_rate']:.6f}",
                    'revenue_wow_growth_rate': f"{income_metrics['revenue_wow_growth_rate']:.6f}",
                })
                print(f" 未找到每日数据,使用默认值")

        if not rows:
            print("\n[!] 没有数据可导出")
            return False

        fieldnames = [
            'author_id', 'author_name', 'channel', 'stat_date',
            'daily_published_count', 'cumulative_published_count',
            'day_revenue',
            'monthly_revenue', 'weekly_revenue',
            'revenue_mom_growth_rate', 'revenue_wow_growth_rate'
        ]

        # utf-8-sig so Excel opens the Chinese content correctly.
        with open(self.output_ai_statistics_days, 'w', encoding='utf-8-sig', newline='') as fh:
            writer = csv.DictWriter(fh, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)

        print(f"\n{'='*70}")
        print(f"[OK] ai_statistics_days 表数据已导出到: {self.output_ai_statistics_days}")
        print(f" 共 {len(rows)} 条记录")
        print(f"{'='*70}")
        return True

    except Exception as e:
        print(f"\n[X] 导出 ai_statistics_days 失败: {e}")
        import traceback
        traceback.print_exc()
        return False
|
|||
|
|
|
|||
|
|
def merge_and_export(self, analytics_data: Dict, income_data: Dict) -> bool:
    """[Deprecated] Merge analytics and income data and export them as one CSV.

    The previous implementation collected account ids, merged per-account
    publish/income metrics and built the CSV column layout, but the write
    step was removed and replaced with a "旧方法已废弃" (old method
    deprecated) marker, so the method always returned False after doing —
    and printing — a large amount of wasted work. It is now an explicit
    deprecation stub with the same signature and return value.
    Use export_all_tables() instead.

    Args:
        analytics_data: publishing statistics (list of per-account dicts, or
            dict keyed by account id).
        income_data: income data keyed by account id.

    Returns:
        bool: always False (method is deprecated).
    """
    print("\n[!] merge_and_export() 已废弃,请改用 export_all_tables()")
    return False
def export_all_tables(self) -> bool:
    """Export (or insert) the three statistics tables from the integrated JSON.

    Depending on self.use_database the data loaded by
    load_integrated_data() is either inserted straight into the database
    tables (ai_statistics, ai_statistics_day, ai_statistics_days) or
    written out as the corresponding CSV files.

    Returns:
        bool: True only when all three table exports/inserts succeed.
    """
    banner = "=" * 70
    if self.use_database:
        title = "从 bjh_integrated_data.json 导入数据到数据库"
    else:
        title = "从 bjh_integrated_data.json 导出数据到三个表"
    print("\n" + banner)
    print(title)
    print(banner)

    # Load the merged source data; bail out early when nothing is available.
    integrated_data = self.load_integrated_data()
    if not integrated_data:
        print("\n[X] 没有可用的数据")
        return False

    # Pick the three per-table handlers for the selected mode.
    if self.use_database:
        handlers = (
            self.insert_ai_statistics,
            self.insert_ai_statistics_day,
            self.insert_ai_statistics_days,
        )
    else:
        handlers = (
            self.export_ai_statistics,
            self.export_ai_statistics_day,
            self.export_ai_statistics_days,
        )

    # Run every handler (no short-circuit: each table is always attempted).
    results = [handler(integrated_data) for handler in handlers]
    return all(results)
def export_daily_data(self, analytics_data: Dict) -> bool:
    """[Deprecated] Export per-day detail rows (ai_statistics_day) to CSV.

    The previous implementation extracted every account's daily metric list
    from the nested api_data payload and built the full Chinese-labelled
    column layout, but the actual CSV write was removed and replaced with a
    "旧方法已废弃" marker, so it always returned False after doing wasted
    work. It is now an explicit deprecation stub with the same signature
    and return value. Use export_all_tables() instead.

    Args:
        analytics_data: publishing statistics (list of per-account dicts, or
            dict keyed by account id).

    Returns:
        bool: always False (method is deprecated).
    """
    print("\n[!] export_daily_data() 已废弃,请改用 export_all_tables()")
    return False
def export_core_metrics(self, analytics_data: Dict, income_data: Dict) -> bool:
    """[Deprecated] Export core metrics (ai_statistics_days) to CSV.

    The previous implementation computed per-account daily/cumulative
    publish counts and merged weekly/monthly revenue and growth rates, but
    the CSV write was removed and replaced with a "旧方法已废弃" marker, so
    it always returned False after doing wasted work. It is now an explicit
    deprecation stub with the same signature and return value.
    Use export_all_tables() instead.

    Args:
        analytics_data: publishing statistics (list of per-account dicts, or
            dict keyed by account id).
        income_data: income data keyed by account id.

    Returns:
        bool: always False (method is deprecated).
    """
    print("\n[!] export_core_metrics() 已废弃,请改用 export_all_tables()")
    return False
def insert_ai_statistics(self, integrated_data: List[Dict]) -> bool:
    """Insert account-summary statistics into the ai_statistics table.

    One record per account is upserted via db_manager.insert_or_update,
    keyed on (author_name, channel).

    Args:
        integrated_data: list of per-account dicts, each expected to carry
            'account_id' and an 'analytics' payload with an 'apis' list.

    Returns:
        bool: True if at least one record was inserted/updated.
    """
    try:
        print("\n" + "="*70)
        print("开始插入 ai_statistics 表数据")
        print("="*70)

        # Database mode requires an initialized manager.
        if not self.db_manager:
            print("[X] 数据库管理器未初始化")
            return False

        success_count = 0

        for account_data in integrated_data:
            account_id = account_data.get('account_id', '')
            if not account_id:
                # Skip entries that carry no account id.
                continue

            print(f"\n处理账号: {account_id}")

            # Resolve the numeric author id for this account name.
            author_id = self.get_author_id(account_id)

            # Aggregate summary metrics for this account.
            metrics = self.extract_total_metrics(account_data)

            # Stat date defaults to today; prefer latest_event_day (YYYYMMDD)
            # from the first API payload when it reports success (errno == 0).
            latest_date = datetime.now().strftime('%Y-%m-%d')
            analytics = account_data.get('analytics', {})
            if analytics:
                apis = analytics.get('apis', [])
                if apis and len(apis) > 0:
                    api_data = apis[0].get('data', {})
                    if api_data.get('errno') == 0:
                        data_info = api_data.get('data', {})
                        latest_event_day = data_info.get('latest_event_day', '')
                        if latest_event_day:
                            # Reformat YYYYMMDD -> YYYY-MM-DD.
                            date_str = str(latest_event_day)
                            latest_date = f"{date_str[0:4]}-{date_str[4:6]}-{date_str[6:8]}"

            # slide_ratio column is decimal(5,4), so the value must fit 0-9.9999.
            slide_ratio_value = float(metrics['slide_ratio'])
            # Values above 10 are assumed to be percentage-formatted; scale down.
            if slide_ratio_value > 10:
                slide_ratio_value = slide_ratio_value / 100
            # Clamp to the column maximum.
            slide_ratio_value = min(slide_ratio_value, 9.9999)

            # Build the database record.
            record = {
                'author_id': author_id,  # author id resolved from the database
                'author_name': account_id,
                'channel': 1,
                'date': latest_date,  # ai_statistics uses 'date', not 'stat_date'
                'submission_count': metrics['submission_count'],
                'read_count': metrics['read_count'],
                'comment_count': metrics['comment_count'],
                'comment_rate': metrics['comment_rate'],
                'like_count': metrics['like_count'],
                'like_rate': metrics['like_rate'],
                'favorite_count': metrics['favorite_count'],
                'favorite_rate': metrics['favorite_rate'],
                'share_count': metrics['share_count'],
                'share_rate': metrics['share_rate'],
                'slide_ratio': slide_ratio_value,  # clamped above for decimal(5,4)
                'baidu_search_volume': metrics['baidu_search_volume'],
            }

            # Upsert; a failure on one account does not stop the rest.
            try:
                self.db_manager.insert_or_update(
                    table='ai_statistics',
                    data=record,
                    unique_keys=['author_name', 'channel']  # author_name is the unique key
                )
                success_count += 1
                print(f" [OK] 插入成功")
            except Exception as e:
                print(f" [X] 插入失败: {e}")

        print(f"\n{'='*70}")
        print(f"[OK] ai_statistics 表数据插入完成")
        print(f" 成功 {success_count}/{len(integrated_data)} 条记录")
        print(f"{'='*70}")
        return success_count > 0
    except Exception as e:
        print(f"\n[X] 插入 ai_statistics 失败: {e}")
        import traceback
        traceback.print_exc()
        return False
def insert_ai_statistics_day(self, integrated_data: List[Dict], use_db_article_count: bool = True) -> bool:
    """Insert the latest-day detail row per account into ai_statistics_day.

    Only the most recent day (by event_day) of each account's daily metric
    list is upserted, keyed on (author_name, channel).

    Fix: total_baidu_search_volume was previously populated from 'disp_pv'
    (曝光量/exposure); elsewhere in this file 百度搜索量 is mapped to
    'search_click_pv' and 曝光量 to 'disp_pv', so the search-click field is
    used here for consistency.

    Args:
        integrated_data: list of per-account dicts with 'account_id' and an
            'analytics' payload containing the per-day metric list.
        use_db_article_count: when True (default) and the author id is known,
            the submission count is read from the database instead of the API.

    Returns:
        bool: True if at least one record was inserted/updated.
    """
    try:
        print("\n" + "="*70)
        print("开始插入 ai_statistics_day 表数据")
        print("="*70)

        # Database mode requires an initialized manager.
        if not self.db_manager:
            print("[X] 数据库管理器未初始化")
            return False

        success_count = 0

        for account_data in integrated_data:
            account_id = account_data.get('account_id', '')
            if not account_id:
                continue

            print(f"\n处理账号: {account_id}")

            # Resolve the numeric author id for this account name.
            author_id = self.get_author_id(account_id)

            analytics = account_data.get('analytics', {})
            apis = analytics.get('apis', [])

            if apis and len(apis) > 0:
                api_data = apis[0].get('data', {})
                if api_data.get('errno') == 0:
                    daily_list = api_data.get('data', {}).get('list', [])

                    if daily_list:
                        print(f" 找到 {len(daily_list)} 天的数据")

                        # Keep only the most recent day (event_day is YYYYMMDD).
                        daily_list_sorted = sorted(daily_list, key=lambda x: x.get('event_day', ''))
                        latest_day_data = daily_list_sorted[-1] if daily_list_sorted else None

                        if latest_day_data:
                            event_day = latest_day_data.get('event_day', '')
                            if event_day:
                                # Reformat YYYYMMDD -> YYYY-MM-DD.
                                date_str = str(event_day)
                                formatted_date = f"{date_str[0:4]}-{date_str[4:6]}-{date_str[6:8]}"

                                # Prefer the submission count from the database.
                                total_submission_count = int(latest_day_data.get('publish_count', 0) or 0)
                                if use_db_article_count and author_id > 0:
                                    article_data = self.query_article_count_from_db(author_id, formatted_date)
                                    total_submission_count = article_data['total_submission_count']
                                    print(f" [使用数据库] 投稿量: {total_submission_count}")
                                else:
                                    print(f" [使用API] 投稿量: {total_submission_count}")

                                # avg_slide_ratio column is decimal(5,4): must fit 0-9.9999.
                                slide_ratio_value = float(latest_day_data.get('pic_slide_rate', 0) or 0)
                                # Values above 10 are assumed to be percentage-formatted.
                                if slide_ratio_value > 10:
                                    slide_ratio_value = slide_ratio_value / 100
                                slide_ratio_value = min(slide_ratio_value, 9.9999)

                                record = {
                                    'author_id': author_id,
                                    'author_name': account_id,
                                    'channel': 1,
                                    'stat_date': formatted_date,
                                    'total_submission_count': total_submission_count,
                                    'total_read_count': int(latest_day_data.get('view_count', 0) or 0),
                                    'total_comment_count': int(latest_day_data.get('comment_count', 0) or 0),
                                    'total_like_count': int(latest_day_data.get('likes_count', 0) or 0),
                                    'total_favorite_count': int(latest_day_data.get('collect_count', 0) or 0),
                                    'total_share_count': int(latest_day_data.get('share_count', 0) or 0),
                                    'avg_comment_rate': float(latest_day_data.get('comment_rate', 0) or 0),
                                    'avg_like_rate': float(latest_day_data.get('likes_rate', 0) or 0),
                                    'avg_favorite_rate': float(latest_day_data.get('collect_rate', 0) or 0),
                                    'avg_share_rate': float(latest_day_data.get('share_rate', 0) or 0),
                                    'avg_slide_ratio': slide_ratio_value,
                                    # Baidu search volume comes from search_click_pv
                                    # (disp_pv is the exposure/曝光量 metric).
                                    'total_baidu_search_volume': int(latest_day_data.get('search_click_pv', 0) or 0),
                                }

                                # Upsert; a failure on one account does not stop the rest.
                                try:
                                    self.db_manager.insert_or_update(
                                        table='ai_statistics_day',
                                        data=record,
                                        unique_keys=['author_name', 'channel']  # author_name is the unique key
                                    )
                                    success_count += 1
                                    print(f" [OK] 插入最新一天: {formatted_date}")
                                except Exception as e:
                                    print(f" [X] 插入失败: {e}")
                    else:
                        print(f" 未找到每日数据")

        print(f"\n{'='*70}")
        print(f"[OK] ai_statistics_day 表数据插入完成")
        print(f" 成功 {success_count} 条记录")
        print(f"{'='*70}")
        return success_count > 0
    except Exception as e:
        print(f"\n[X] 插入 ai_statistics_day 失败: {e}")
        import traceback
        traceback.print_exc()
        return False
def insert_ai_statistics_days(self, integrated_data: List[Dict]) -> bool:
    """Insert core-metric rows (publish counts + revenue) into ai_statistics_days.

    For each account the daily list is summed into a cumulative publish
    count, and only the latest day's row (with revenue figures attached) is
    upserted, keyed on (author_name, channel).

    Args:
        integrated_data: list of per-account dicts with 'account_id' and an
            'analytics' payload containing the per-day metric list.

    Returns:
        bool: True if at least one record was inserted/updated.
    """
    try:
        print("\n" + "="*70)
        print("开始插入 ai_statistics_days 表数据")
        print("="*70)

        # Database mode requires an initialized manager.
        if not self.db_manager:
            print("[X] 数据库管理器未初始化")
            return False

        success_count = 0

        for account_data in integrated_data:
            account_id = account_data.get('account_id', '')
            if not account_id:
                continue

            print(f"\n处理账号: {account_id}")

            # Resolve the numeric author id for this account name.
            author_id = self.get_author_id(account_id)

            # Extract revenue metrics for this account.
            # NOTE(review): called with (account_data, account_id) here but
            # with a single argument elsewhere in this file — confirm the
            # method's signature supports both call shapes.
            income_metrics = self.extract_income_metrics(account_data, account_id)

            analytics = account_data.get('analytics', {})
            apis = analytics.get('apis', [])

            if apis and len(apis) > 0:
                api_data = apis[0].get('data', {})
                if api_data.get('errno') == 0:
                    daily_list = api_data.get('data', {}).get('list', [])

                    if daily_list:
                        print(f" 找到 {len(daily_list)} 天的数据")

                        # Sort chronologically by event_day (YYYYMMDD strings).
                        daily_list_sorted = sorted(daily_list, key=lambda x: x.get('event_day', ''))

                        # Cumulative publish count over all listed days.
                        cumulative_count = 0
                        for day_data in daily_list_sorted:
                            daily_published = int(day_data.get('publish_count', 0) or 0)
                            cumulative_count += daily_published

                        # Only the most recent day is inserted.
                        latest_day_data = daily_list_sorted[-1]
                        event_day = latest_day_data.get('event_day', '')

                        if event_day:
                            # Reformat YYYYMMDD -> YYYY-MM-DD.
                            date_str = str(event_day)
                            formatted_date = f"{date_str[0:4]}-{date_str[4:6]}-{date_str[6:8]}"

                            # Publish count for the latest day alone.
                            daily_published = int(latest_day_data.get('publish_count', 0) or 0)

                            record = {
                                'author_id': author_id,
                                'author_name': account_id,
                                'channel': 1,
                                'stat_date': formatted_date,
                                'daily_published_count': daily_published,
                                'cumulative_published_count': cumulative_count,
                                'day_revenue': Decimal(str(income_metrics['day_revenue'])),  # daily revenue (newer field)
                                'monthly_revenue': Decimal(str(income_metrics['monthly_revenue'])),
                                'weekly_revenue': Decimal(str(income_metrics['weekly_revenue'])),
                                'revenue_mom_growth_rate': Decimal(str(income_metrics['revenue_mom_growth_rate'])),
                                'revenue_wow_growth_rate': Decimal(str(income_metrics['revenue_wow_growth_rate'])),
                            }

                            # Upsert; a failure on one account does not stop the rest.
                            try:
                                self.db_manager.insert_or_update(
                                    table='ai_statistics_days',
                                    data=record,
                                    unique_keys=['author_name', 'channel']
                                )
                                success_count += 1
                                print(f" [OK] 插入最新一天: {formatted_date}, 累计发文: {cumulative_count}")
                            except Exception as e:
                                print(f" [X] 插入失败: {e}")
                    else:
                        print(f" 未找到每日数据")

        print(f"\n{'='*70}")
        print(f"[OK] ai_statistics_days 表数据插入完成")
        print(f" 成功 {success_count} 条记录")
        print(f"{'='*70}")
        return success_count > 0
    except Exception as e:
        print(f"\n[X] 插入 ai_statistics_days 失败: {e}")
        import traceback
        traceback.print_exc()
        return False
def main():
    """Interactive entry point: choose CSV export or direct DB import, then run it."""
    sep = "=" * 70
    print("\n" + sep)
    print("百家号数据导出工具 - 从 bjh_integrated_data.json 导出")
    print(sep)

    # Ask the operator which output mode to use (CSV files or database).
    print("\n请选择导出模式:")
    print(" 1. 导出CSV文件")
    print(" 2. 直接插入数据库")

    mode = input("\n输入选项 (1/2, 默认1): ").strip() or '1'
    use_db = mode == '2'

    exporter = DataExporter(use_database=use_db)

    if use_db:
        print("\n将从 bjh_integrated_data.json 导入数据到数据库的三个表:")
        print(" 1. ai_statistics - 账号汇总统计表")
        print(" 2. ai_statistics_day - 每日明细统计表")
        print(" 3. ai_statistics_days - 核心指标统计表(含发文量、收益、环比)")
    else:
        print("\n将从 bjh_integrated_data.json 导出以下三个CSV文件:")
        print(" 1. ai_statistics.csv - 账号汇总统计表")
        print(" 2. ai_statistics_day.csv - 每日明细统计表")
        print(" 3. ai_statistics_days.csv - 核心指标统计表(含发文量、收益、环比)")
    print(sep)

    # Require explicit confirmation before touching files or the database.
    confirm = input("\n是否继续? (y/n): ").strip().lower()
    if confirm != 'y':
        print("\n已取消")
        return
    exporter.export_all_tables()

    print("\n" + sep)
    print("完成")
    print(sep + "\n")
# Script entry point: run the interactive exporter only when executed directly.
if __name__ == '__main__':
    main()