#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Dashboard (workbench) API endpoints.

Exposes three read-only endpoints under ``/api/dashboard``:

* ``/overview``          - enterprise totals and month-over-month publish change
* ``/recent-publishes``  - most recent publish records with a relative time label
* ``/hot-products``      - products ranked by number of publish records
"""
from flask import Blueprint, request, jsonify
import logging
from datetime import datetime, timedelta

from auth_utils import require_auth, AuthUtils
from database_config import get_db_manager, format_datetime_fields

logger = logging.getLogger('article_server')

# Blueprint grouping all dashboard routes.
dashboard_bp = Blueprint('dashboard', __name__, url_prefix='/api/dashboard')

# Upper bound for the client-supplied ``limit`` query parameter, so a single
# request cannot ask the database for an unbounded number of rows.
_MAX_LIMIT = 100

# Format used when ``publish_time`` comes back from the database as a string.
_PUBLISH_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def _error_response(code, message):
    """Build the JSON error envelope and return it with HTTP status ``code``."""
    return jsonify({
        'code': code,
        'message': message,
        'data': None
    }), code


def _success_response(data):
    """Build the JSON success envelope (HTTP 200) with a millisecond timestamp."""
    return jsonify({
        'code': 200,
        'message': 'success',
        'data': data,
        'timestamp': int(datetime.now().timestamp() * 1000)
    })


def _get_enterprise_id():
    """Return the current user's ``enterprise_id``, or ``None`` if unavailable.

    A missing/empty current user is treated the same as a missing enterprise
    id so callers can answer with a 400 instead of crashing on ``None.get``.
    """
    current_user = AuthUtils.get_current_user()
    if not current_user:
        return None
    return current_user.get('enterprise_id')


def _parse_limit(default):
    """Read the ``limit`` query parameter as a safe row count.

    Falls back to ``default`` when the parameter is absent or not an integer,
    and clamps the value to ``[0, _MAX_LIMIT]`` because it is passed to a SQL
    ``LIMIT`` clause, where a negative value is invalid.
    """
    raw = request.args.get('limit')
    if raw is None:
        return default
    try:
        value = int(raw)
    except (TypeError, ValueError):
        return default
    return max(0, min(value, _MAX_LIMIT))


def _format_change(current, previous):
    """Format the month-over-month change as a signed percentage string.

    Args:
        current: publish count for the current month.
        previous: publish count for the previous month.

    Returns:
        ``"+25%"`` / ``"-50%"`` / ``"+0%"`` when ``previous`` is positive.
        With no baseline (``previous`` is 0) the result is ``"+100%"`` if
        anything was published this month, otherwise ``"0%"``.
    """
    if previous > 0:
        percent = int((current - previous) * 100 / previous)
        # ``+d`` emits exactly one sign; a hard-coded "+" prefix would render
        # decreases as "+-50%".
        return f"{percent:+d}%"
    return "+100%" if current > 0 else "0%"


def _relative_time(publish_time):
    """Return a human-readable "time ago" label for ``publish_time``.

    Args:
        publish_time: a ``datetime`` or a ``'%Y-%m-%d %H:%M:%S'`` string.

    Returns:
        ``"N天前"``, ``"N小时前"``, ``"N分钟前"`` or ``"刚刚"``. Times less
        than a minute old, or in the future, are reported as ``"刚刚"``.
        An empty string is returned when the value is missing or cannot be
        parsed, so one bad row does not fail the whole request.
    """
    if isinstance(publish_time, str):
        try:
            publish_time = datetime.strptime(publish_time, _PUBLISH_TIME_FORMAT)
        except ValueError:
            return ""
    if not isinstance(publish_time, datetime):
        return ""

    # Match the awareness of the stored value: naive values are compared with
    # naive local time, aware values with "now" in the same timezone.
    elapsed = datetime.now(tz=publish_time.tzinfo) - publish_time

    # total_seconds() is signed, unlike the ``days``/``seconds`` attributes,
    # which are misleading for negative deltas (days == -1, seconds large).
    seconds = int(elapsed.total_seconds())
    if seconds < 60:
        return "刚刚"
    if elapsed.days > 0:
        return f"{elapsed.days}天前"
    if seconds >= 3600:
        return f"{seconds // 3600}小时前"
    return f"{seconds // 60}分钟前"


@dashboard_bp.route('/overview', methods=['GET'])
@require_auth
def get_dashboard_overview():
    """Return the dashboard overview for the current user's enterprise.

    Responds with the enterprise totals, the number of records published
    this month and the change relative to last month. Returns 400 when the
    enterprise id cannot be determined, 404 when the enterprise row does not
    exist and 500 on any unexpected error.
    """
    try:
        enterprise_id = _get_enterprise_id()
        if not enterprise_id:
            return _error_response(400, '无法获取企业ID')

        db_manager = get_db_manager()

        # Aggregate counters stored on the enterprise row.
        enterprise_sql = """
            SELECT users_total, products_total, articles_total, published_total
            FROM ai_enterprises
            WHERE id = %s
        """
        enterprise_result = db_manager.execute_query(enterprise_sql, (enterprise_id,))

        if not enterprise_result:
            return _error_response(404, '企业不存在')

        stats = enterprise_result[0]

        # Number of records published in the current calendar month.
        month_sql = """
            SELECT COUNT(*) as month_published
            FROM ai_article_published_records
            WHERE enterprise_id = %s
            AND YEAR(publish_time) = YEAR(CURDATE())
            AND MONTH(publish_time) = MONTH(CURDATE())
        """
        month_result = db_manager.execute_query(month_sql, (enterprise_id,))
        month_published = month_result[0]['month_published'] if month_result else 0

        # Number of records published in the previous calendar month
        # (baseline for the growth rate).
        last_month_sql = """
            SELECT COUNT(*) as last_month_published
            FROM ai_article_published_records
            WHERE enterprise_id = %s
            AND YEAR(publish_time) = YEAR(DATE_SUB(CURDATE(), INTERVAL 1 MONTH))
            AND MONTH(publish_time) = MONTH(DATE_SUB(CURDATE(), INTERVAL 1 MONTH))
        """
        last_month_result = db_manager.execute_query(last_month_sql, (enterprise_id,))
        last_month_published = (
            last_month_result[0]['last_month_published'] if last_month_result else 0
        )

        published_change = _format_change(month_published, last_month_published)

        logger.info("获取工作台概览成功")

        return _success_response({
            'stats': {
                'users_total': stats['users_total'],
                'users_change': '+0',
                'products_total': stats['products_total'],
                'products_change': '+0',
                'articles_total': stats['articles_total'],
                'articles_change': '+0',
                'published_month': month_published,
                'published_change': published_change
            }
        })

    except Exception as e:
        # Top-level boundary: log with traceback and hide details from the client.
        logger.exception("[获取工作台概览] 处理请求时发生错误: %s", e)
        return _error_response(500, '服务器内部错误')


@dashboard_bp.route('/recent-publishes', methods=['GET'])
@require_auth
def get_recent_publishes():
    """Return the most recent publish records for the current enterprise.

    Query parameters:
        limit: maximum number of records (default 5, clamped to
            ``[0, _MAX_LIMIT]``; invalid values fall back to the default).

    Each record is extended with a ``time`` field holding a relative time
    label derived from ``publish_time``. Returns 400 when the enterprise id
    cannot be determined and 500 on any unexpected error.
    """
    try:
        enterprise_id = _get_enterprise_id()
        if not enterprise_id:
            return _error_response(400, '无法获取企业ID')

        limit = _parse_limit(5)

        db_manager = get_db_manager()

        # Latest publish records joined with the publishing user and product.
        sql = """
            SELECT
                r.id,
                r.publish_time,
                u.real_name as employee_name,
                p.name as product_name,
                'success' as status
            FROM ai_article_published_records r
            LEFT JOIN ai_users u ON r.created_user_id = u.id
            LEFT JOIN ai_products p ON r.product_id = p.id
            WHERE r.enterprise_id = %s
            ORDER BY r.publish_time DESC
            LIMIT %s
        """
        # ``or []`` keeps the loop below safe if the query yields nothing.
        records = db_manager.execute_query(sql, (enterprise_id, limit)) or []

        # Attach the relative time label to every record.
        for record in records:
            record['time'] = _relative_time(record['publish_time'])

        logger.info("获取最近发布成功")

        return _success_response({'list': records})

    except Exception as e:
        # Top-level boundary: log with traceback and hide details from the client.
        logger.exception("[获取最近发布] 处理请求时发生错误: %s", e)
        return _error_response(500, '服务器内部错误')


@dashboard_bp.route('/hot-products', methods=['GET'])
@require_auth
def get_hot_products():
    """Return the products with the most publish records for the enterprise.

    Query parameters:
        limit: maximum number of products (default 4, clamped to
            ``[0, _MAX_LIMIT]``; invalid values fall back to the default).

    Each row carries the product id and name, its publish count and that
    count as a rounded percentage of all of the enterprise's publish records.
    Returns 400 when the enterprise id cannot be determined and 500 on any
    unexpected error.
    """
    try:
        enterprise_id = _get_enterprise_id()
        if not enterprise_id:
            return _error_response(400, '无法获取企业ID')

        limit = _parse_limit(4)

        db_manager = get_db_manager()

        # Publish count per product plus its share of the enterprise total.
        sql = """
            SELECT
                r.product_id,
                p.name as product_name,
                COUNT(*) as publishes,
                ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM ai_article_published_records WHERE enterprise_id = %s), 0) as percentage
            FROM ai_article_published_records r
            LEFT JOIN ai_products p ON r.product_id = p.id
            WHERE r.enterprise_id = %s
            GROUP BY r.product_id, p.name
            ORDER BY publishes DESC
            LIMIT %s
        """
        products = db_manager.execute_query(sql, (enterprise_id, enterprise_id, limit))

        logger.info("获取热门产品成功")

        return _success_response({'list': products})

    except Exception as e:
        # Top-level boundary: log with traceback and hide details from the client.
        logger.exception("[获取热门产品] 处理请求时发生错误: %s", e)
        return _error_response(500, '服务器内部错误')