#!/usr/bin/env python # -*- coding: utf-8 -*- """ 企业管理接口 """ from flask import Blueprint, request, jsonify import logging from datetime import datetime from auth_utils import require_auth, require_role, AuthUtils from database_config import get_db_manager, format_datetime_fields from log_utils import log_create, log_update, log_delete, log_error import uuid logger = logging.getLogger('article_server') # 创建蓝图 enterprise_bp = Blueprint('enterprise', __name__, url_prefix='/api/enterprises') @enterprise_bp.route('/list', methods=['GET']) @require_auth @require_role('admin', 'enterprise') def get_enterprises_list(): """获取企业列表(平台管理员)""" client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知')) logger.info(f"[获取企业列表] 开始处理请求, IP: {client_ip}") try: # 获取查询参数 page = int(request.args.get('page', 1)) page_size = int(request.args.get('pageSize', 20)) keyword = request.args.get('keyword', '').strip() status = request.args.get('status', '').strip() logger.info(f"[获取企业列表] 查询参数: page={page}, pageSize={page_size}, keyword={keyword}, status={status}, IP: {client_ip}") # 构建查询条件 where_conditions = ["1=1"] params = [] if keyword: where_conditions.append("(name LIKE %s OR short_name LIKE %s OR phone LIKE %s)") keyword_pattern = f"%{keyword}%" params.extend([keyword_pattern, keyword_pattern, keyword_pattern]) if status: where_conditions.append("status = %s") params.append(status) where_clause = " AND ".join(where_conditions) # 计算偏移量 offset = (page - 1) * page_size db_manager = get_db_manager() # 查询总数 count_sql = f"SELECT COUNT(*) as total FROM ai_enterprises WHERE {where_clause}" count_result = db_manager.execute_query(count_sql, params) total = count_result[0]['total'] # 查询企业列表 sql = f""" SELECT id, enterprise_ID, name, short_name, email, status, users_total, products_total, articles_total, published_total, created_at, updated_at FROM ai_enterprises WHERE {where_clause} ORDER BY created_at DESC LIMIT %s OFFSET %s """ params.extend([page_size, offset]) enterprises = db_manager.execute_query(sql, params) # 为每个企业查询管理员手机号(从ai_users表) for enterprise in enterprises: phone_sql = """ SELECT phone FROM ai_users WHERE enterprise_id = %s AND role = 'enterprise' AND is_enterprise = 1 AND status = 'active' LIMIT 1 """ phone_result = db_manager.execute_query(phone_sql, (enterprise['id'],)) enterprise['phone'] = phone_result[0]['phone'] if phone_result and phone_result[0].get('phone') else '' # 格式化日期时间字段 enterprises = format_datetime_fields(enterprises) logger.info(f"[获取企业列表] 查询成功, 总数: {total}, 当前页: {page}, 每页: {page_size}, 返回数量: {len(enterprises)}, IP: {client_ip}") return jsonify({ 'code': 200, 'message': 'success', 'data': { 'total': total, 'list': enterprises }, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[获取企业列表] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @enterprise_bp.route('/create', methods=['POST']) @require_auth @require_role('admin', 'enterprise') def create_enterprise(): """创建企业(平台管理员)""" client_ip = request.environ.get('HTTP_X_FORWARDED_FOR', request.environ.get('REMOTE_ADDR', '未知')) logger.info(f"[创建企业] 开始处理创建企业请求, IP: {client_ip}") try: data = request.get_json() if not data: logger.warning(f"[创建企业] 请求参数为空, IP: {client_ip}") return jsonify({ 'code': 400, 'message': '请求参数错误', 'data': None }), 400 logger.info(f"[创建企业] 收到创建请求, 企业名称: {data.get('name')}, 简称: {data.get('short_name')}, 手机号: {data.get('phone')}, IP: {client_ip}") # 验证必需字段 required_fields = ['name', 'short_name', 'phone', 'password'] for field in required_fields: if not data.get(field): logger.warning(f"[创建企业] 缺少必需字段: {field}, IP: {client_ip}") return jsonify({ 'code': 400, 'message': f'缺少必需字段: {field}', 'data': None }), 400 db_manager = get_db_manager() # ✅ 检查手机号是否已存在(ai_users表中role='enterprise'的记录) logger.info(f"[创建企业] 检查ai_users表中企业角色手机号是否已存在: {data['phone']}") check_user_sql = "SELECT id FROM ai_users WHERE phone = %s AND role = 'enterprise' AND is_enterprise = 1" existing_user = db_manager.execute_query(check_user_sql, (data['phone'],)) if existing_user: logger.warning(f"[创建企业] ai_users表中企业角色手机号已存在: {data['phone']}, IP: {client_ip}") return jsonify({ 'code': 409, 'message': '手机号已被使用', 'data': None }), 409 # 生成企业唯一标识 enterprise_id_str = f"ENT-{datetime.now().strftime('%Y%m%d')}-{str(uuid.uuid4())[:8].upper()}" logger.info(f"[创建企业] 生成企业ID: {enterprise_id_str}, 企业名称: {data['name']}") # 加密密码 from auth_utils import AuthUtils hashed_password = AuthUtils.hash_password(data['password']) # ✅ (1)创建企业(ai_enterprises表) logger.info(f"[创建企业] 开始插入ai_enterprises表, 企业ID: {enterprise_id_str}, 名称: {data['name']}") enterprise_sql = """ INSERT INTO ai_enterprises (enterprise_ID, name, short_name, icon, email, website, address, status, users_total, products_total, published_total, articles_total, released_month_total, linked_to_xhs_num) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ enterprise_db_id = db_manager.execute_insert(enterprise_sql, ( enterprise_id_str, data['name'], data['short_name'], data.get('icon', ''), data.get('email', ''), data.get('website', ''), data.get('address', ''), 'active', 0, 0, 0, 0, 0, 0 )) logger.info(f"[创建企业] ai_enterprises表插入成功, 数据库ID: {enterprise_db_id}, 企业ID: {enterprise_id_str}") # ✅ (2)创建企业管理员用户(ai_users表) logger.info(f"[创建企业] 开始创建企业管理员用户, 手机号: {data['phone']}, 角色: enterprise") # 用户名使用手机号 username = data['phone'] # 检查用户名是否已存在 check_username_sql = "SELECT id FROM ai_users WHERE username = %s" existing_username = db_manager.execute_query(check_username_sql, (username,)) if existing_username: logger.warning(f"[创建企业] 用户名已存在: {username}, IP: {client_ip}") return jsonify({ 'code': 409, 'message': '用户名已被使用', 'data': None }), 409 user_sql = """ INSERT INTO ai_users (enterprise_id, enterprise_name, username, password, real_name, email, phone, department, role, status, is_enterprise) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ user_db_id = db_manager.execute_insert(user_sql, ( enterprise_db_id, # enterprise_id data['name'], # enterprise_name username, # username (使用手机号) hashed_password, # password (使用传入的密码) data.get('real_name', data['short_name']), # real_name data.get('email', ''), # email data['phone'], # phone data.get('department', ''), # department 'enterprise', # role (✅ 角色为enterprise) 'active', # status 1 )) logger.info(f"[创建企业] ai_users表插入成功, 用户ID: {user_db_id}, 用户名: {username}, 角色: enterprise") logger.info(f"[创建企业] 企业创建成功, 企业DB_ID: {enterprise_db_id}, 企业ID: {enterprise_id_str}, 名称: {data['name']}, 管理员用户ID: {user_db_id}, IP: {client_ip}") # 记录业务日志 log_create( target_type='enterprise', target_id=enterprise_db_id, description=f"创建企业: {data['name']}, 同时创建企业管理员用户", request_data={**data, 'password': '***'}, # 隐藏密码 response_data={'enterprise_id': enterprise_db_id, 'enterprise_ID': enterprise_id_str, 'user_id': user_db_id} ) return jsonify({ 'code': 200, 'message': '创建成功', 'data': { 'id': enterprise_db_id, 'enterprise_ID': enterprise_id_str, 'name': data['name'], 'user_id': user_db_id, 'username': username }, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[创建企业] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @enterprise_bp.route('/', methods=['PUT']) @require_auth @require_role('admin', 'enterprise') def update_enterprise(enterprise_id): """更新企业信息(平台管理员)""" try: data = request.get_json() if not data: return jsonify({ 'code': 400, 'message': '请求参数错误', 'data': None }), 400 # 检查企业是否存在 db_manager = get_db_manager() check_sql = "SELECT id FROM ai_enterprises WHERE id = %s" existing = db_manager.execute_query(check_sql, (enterprise_id,)) if not existing: return jsonify({ 'code': 404, 'message': '企业不存在', 'data': None }), 404 # 构建更新字段 enterprise_update_fields = [] enterprise_params = [] user_password = None if 'name' in data: enterprise_update_fields.append("name = %s") enterprise_params.append(data['name']) if 'short_name' in data: enterprise_update_fields.append("short_name = %s") enterprise_params.append(data['short_name']) if 'email' in data: enterprise_update_fields.append("email = %s") enterprise_params.append(data['email']) if 'password' in data: from auth_utils import AuthUtils user_password = AuthUtils.hash_password(data['password']) if not enterprise_update_fields and not user_password: return jsonify({ 'code': 400, 'message': '没有要更新的字段', 'data': None }), 400 # 执行更新企业表 if enterprise_update_fields: enterprise_params.append(enterprise_id) sql = f"UPDATE ai_enterprises SET {', '.join(enterprise_update_fields)}, updated_at = NOW() WHERE id = %s" db_manager.execute_update(sql, enterprise_params) # 如果有密码更新,更新ai_users表中的企业管理员密码 if user_password: user_update_sql = "UPDATE ai_users SET password = %s, updated_at = NOW() WHERE enterprise_id = %s AND role = 'enterprise'" db_manager.execute_update(user_update_sql, (user_password, enterprise_id)) logger.info(f"更新企业成功: ID {enterprise_id}") # 记录业务日志 log_update( target_type='enterprise', target_id=enterprise_id, description=f"更新企业信息", request_data=data ) return jsonify({ 'code': 200, 'message': '更新成功', 'data': None, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[更新企业] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @enterprise_bp.route('//status', methods=['PUT']) @require_auth @require_role('admin', 'enterprise') def toggle_enterprise_status(enterprise_id): """切换企业状态(平台管理员)""" try: data = request.get_json() if not data or 'status' not in data: return jsonify({ 'code': 400, 'message': '缺少status参数', 'data': None }), 400 status = data['status'] if status not in ['active', 'disabled']: return jsonify({ 'code': 400, 'message': '状态值无效', 'data': None }), 400 db_manager = get_db_manager() sql = "UPDATE ai_enterprises SET status = %s, updated_at = NOW() WHERE id = %s" affected = db_manager.execute_update(sql, (status, enterprise_id)) if affected == 0: return jsonify({ 'code': 404, 'message': '企业不存在', 'data': None }), 404 logger.info(f"切换企业状态成功: ID {enterprise_id}, 状态: {status}") # 记录业务日志 log_update( target_type='enterprise', target_id=enterprise_id, description=f"切换企业状态: {status}", request_data=data ) return jsonify({ 'code': 200, 'message': '操作成功', 'data': None, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[切换企业状态] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @enterprise_bp.route('/', methods=['DELETE']) @require_auth @require_role('admin', 'enterprise') def delete_enterprise(enterprise_id): """删除企业(平台管理员)""" try: db_manager = get_db_manager() # 检查企业是否存在 check_sql = "SELECT id, name FROM ai_enterprises WHERE id = %s" existing = db_manager.execute_query(check_sql, (enterprise_id,)) if not existing: return jsonify({ 'code': 404, 'message': '企业不存在', 'data': None }), 404 # 物理删除企业(API文档要求) sql = "DELETE FROM ai_enterprises WHERE id = %s" db_manager.execute_update(sql, (enterprise_id,)) logger.info(f"删除企业成功: {existing[0]['name']}") # 记录业务日志 log_delete( target_type='enterprise', target_id=enterprise_id, description=f"删除企业: {existing[0]['name']}" ) return jsonify({ 'code': 200, 'message': '删除成功', 'data': None, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[删除企业] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @enterprise_bp.route('/stats', methods=['GET']) @require_auth @require_role('admin', 'enterprise') def get_enterprises_stats(): """获取企业统计(平台管理员)""" try: db_manager = get_db_manager() sql = """ SELECT COUNT(*) as total_count, SUM(CASE WHEN status = 'active' THEN 1 ELSE 0 END) as active_count, SUM(CASE WHEN status = 'disabled' THEN 1 ELSE 0 END) as disabled_count, SUM(users_total) as total_users, SUM(products_total) as total_products, SUM(articles_total) as total_articles, SUM(published_total) as total_published FROM ai_enterprises WHERE status != 'deleted' """ result = db_manager.execute_query(sql) stats = result[0] if result else {} logger.info("获取企业统计成功") return jsonify({ 'code': 200, 'message': 'success', 'data': stats, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[获取企业统计] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 # 企业管理员接口 @enterprise_bp.route('/info', methods=['GET']) @require_auth @require_role('admin','editor','reviewer','publisher','each_title_reviewer','enterprise') def get_enterprise_info(): """获取企业信息(企业管理员,包含密码字段)""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') user_id = current_user.get('user_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 db_manager = get_db_manager() # 查询企业信息 enterprise_sql = """ SELECT id, enterprise_ID, name, short_name, email, status, users_total, products_total, articles_total, published_total, created_at, updated_at FROM ai_enterprises WHERE id = %s """ enterprise_result = db_manager.execute_query(enterprise_sql, (enterprise_id,)) if not enterprise_result: return jsonify({ 'code': 404, 'message': '企业不存在', 'data': None }), 404 # 格式化日期时间字段 enterprise_info = format_datetime_fields(enterprise_result[0]) # 查询企业管理员的手机号(从ai_users表) user_sql = """ SELECT phone FROM ai_users WHERE enterprise_id = %s AND role = 'enterprise' AND is_enterprise = 1 AND status = 'active' LIMIT 1 """ user_result = db_manager.execute_query(user_sql, (enterprise_id,)) # 添加phone字段到企业信息中 if user_result and user_result[0].get('phone'): enterprise_info['phone'] = user_result[0]['phone'] else: enterprise_info['phone'] = '' logger.info(f"获取企业信息成功(含密码): ID {enterprise_id}") return jsonify({ 'code': 200, 'message': 'success', 'data': enterprise_info, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[获取企业信息] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @enterprise_bp.route('/info', methods=['PUT']) @require_auth @require_role('enterprise', 'admin') def update_enterprise_info(): """更新企业信息(企业管理员)""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 data = request.get_json() if not data: return jsonify({ 'code': 400, 'message': '请求参数错误', 'data': None }), 400 # 构建更新字段(企业管理员只能更新部分字段) update_fields = [] params = [] if 'short_name' in data: update_fields.append("short_name = %s") params.append(data['short_name']) if 'email' in data: update_fields.append("email = %s") params.append(data['email']) if not update_fields: return jsonify({ 'code': 400, 'message': '没有要更新的字段', 'data': None }), 400 db_manager = get_db_manager() params.append(enterprise_id) sql = f"UPDATE ai_enterprises SET {', '.join(update_fields)}, updated_at = NOW() WHERE id = %s" db_manager.execute_update(sql, params) logger.info(f"企业管理员更新企业信息成功: ID {enterprise_id}") # 记录业务日志 log_update( target_type='enterprise', target_id=enterprise_id, description=f"更新企业信息(企业管理员)", request_data=data ) return jsonify({ 'code': 200, 'message': '更新成功', 'data': None, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[更新企业信息] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500 @enterprise_bp.route('/change-password', methods=['PUT']) @require_auth @require_role('enterprise') def change_password(): """修改企业密码(企业管理员)""" try: current_user = AuthUtils.get_current_user() enterprise_id = current_user.get('enterprise_id') if not enterprise_id: return jsonify({ 'code': 400, 'message': '无法获取企业ID', 'data': None }), 400 data = request.get_json() if not data: return jsonify({ 'code': 400, 'message': '请求参数错误', 'data': None }), 400 old_password = data.get('old_password') new_password = data.get('new_password') if not old_password or not new_password: return jsonify({ 'code': 400, 'message': '旧密码和新密码不能为空', 'data': None }), 400 # 获取当前密码(从ai_users表查询企业管理员账号) db_manager = get_db_manager() user_id = current_user.get('user_id') sql = "SELECT password FROM ai_users WHERE id = %s AND role = 'enterprise'" result = db_manager.execute_query(sql, (user_id,)) if not result: return jsonify({ 'code': 404, 'message': '用户不存在', 'data': None }), 404 current_password = result[0]['password'] # 验证旧密码 if not AuthUtils.verify_password(old_password, current_password): logger.warning(f"企业修改密码失败 - 旧密码错误: 企业ID {enterprise_id}") return jsonify({ 'code': 401, 'message': '旧密码错误', 'data': None }), 401 # 更新密码(更新ai_users表中的企业管理员密码) hashed_new_password = AuthUtils.hash_password(new_password) update_sql = "UPDATE ai_users SET password = %s, updated_at = NOW() WHERE id = %s AND role = 'enterprise'" db_manager.execute_update(update_sql, (hashed_new_password, user_id)) logger.info(f"企业修改密码成功: ID {enterprise_id}") # 记录业务日志 log_update( target_type='enterprise', target_id=enterprise_id, description=f"修改企业密码", request_data={'old_password': '***', 'new_password': '***'} # 隐藏密码 ) return jsonify({ 'code': 200, 'message': '密码修改成功', 'data': None, 'timestamp': int(datetime.now().timestamp() * 1000) }) except Exception as e: logger.error(f"[修改密码] 处理请求时发生错误: {str(e)}", exc_info=True) return jsonify({ 'code': 500, 'message': '服务器内部错误', 'data': None }), 500