""" AI服务封装模块 支持多种AI提供商:DeepSeek, OpenAI, Claude, 通义千问 """ from typing import Optional, Dict, Any, List import httpx import json import re def format_characters_prompt(characters: List[Dict]) -> str: """格式化角色信息为prompt文本""" if not characters: return "" text = "\n【故事角色设定】\n" for char in characters: text += f"- {char.get('name', '未知')}({char.get('role_type', '配角')}):" if char.get('gender'): text += f" {char.get('gender')}," if char.get('age_range'): text += f" {char.get('age_range')}," if char.get('appearance'): text += f" 外貌:{char.get('appearance')[:100]}," if char.get('personality'): text += f" 性格:{char.get('personality')[:100]}" text += "\n" return text class AIService: def __init__(self): from app.config import get_settings settings = get_settings() self.enabled = settings.ai_service_enabled self.provider = settings.ai_provider print(f"[AI服务初始化] enabled={self.enabled}, provider={self.provider}") # 根据提供商初始化配置 if self.provider == "deepseek": self.api_key = settings.deepseek_api_key self.base_url = settings.deepseek_base_url self.model = settings.deepseek_model print(f"[AI服务初始化] DeepSeek配置: api_key={self.api_key[:20] + '...' if self.api_key else 'None'}, base_url={self.base_url}, model={self.model}") elif self.provider == "openai": self.api_key = settings.openai_api_key self.base_url = settings.openai_base_url self.model = "gpt-3.5-turbo" elif self.provider == "claude": # Claude 需要从环境变量读取 import os self.api_key = os.getenv("CLAUDE_API_KEY", "") self.base_url = "https://api.anthropic.com" self.model = "claude-3-haiku-20240307" elif self.provider == "qwen": import os self.api_key = os.getenv("DASHSCOPE_API_KEY", "") self.base_url = "https://dashscope.aliyuncs.com/api/v1" self.model = "qwen-plus" else: self.api_key = None self.base_url = None self.model = None async def rewrite_ending( self, story_title: str, story_category: str, ending_name: str, ending_content: str, user_prompt: str, characters: List[Dict] = None ) -> Optional[Dict[str, Any]]: """ AI改写结局 :return: {"content": str, "tokens_used": int} 或 None """ if not self.enabled or not self.api_key: return None # 格式化角色信息 characters_text = format_characters_prompt(characters) if characters else "" # 构建Prompt system_prompt = f"""你是一个专业的互动故事创作专家。根据用户的改写指令,重新创作故事结局。 {characters_text} 要求: 1. 保持原故事的世界观和人物性格 2. 结局要有张力和情感冲击 3. 结局内容字数控制在200-400字 4. 为新结局取一个4-8字的新名字,体现改写后的剧情走向 5. 如果有角色设定,必须保持角色性格和外貌描述的一致性 6. 输出格式必须是JSON:{{"ending_name": "新结局名称", "content": "结局内容"}}""" user_prompt_text = f"""故事标题:{story_title} 故事分类:{story_category} 原结局名称:{ending_name} 原结局内容:{ending_content[:500]} --- 用户改写指令:{user_prompt} --- 请创作新的结局(输出JSON格式):""" try: if self.provider == "openai": return await self._call_openai(system_prompt, user_prompt_text) elif self.provider == "claude": return await self._call_claude(user_prompt_text) elif self.provider == "qwen": return await self._call_qwen(system_prompt, user_prompt_text) elif self.provider == "deepseek": return await self._call_deepseek(system_prompt, user_prompt_text) except Exception as e: print(f"AI调用失败:{e}") return None return None async def rewrite_branch( self, story_title: str, story_category: str, path_history: List[Dict[str, str]], current_content: str, user_prompt: str, characters: List[Dict] = None ) -> Optional[Dict[str, Any]]: """ AI改写中间章节,生成新的剧情分支 """ print(f"\n[rewrite_branch] ========== 开始调用 ==========") print(f"[rewrite_branch] story_title={story_title}, category={story_category}") print(f"[rewrite_branch] user_prompt={user_prompt}") print(f"[rewrite_branch] path_history长度={len(path_history)}") print(f"[rewrite_branch] current_content长度={len(current_content)}") print(f"[rewrite_branch] characters数量={len(characters) if characters else 0}") print(f"[rewrite_branch] enabled={self.enabled}, api_key存在={bool(self.api_key)}") if not self.enabled or not self.api_key: print(f"[rewrite_branch] 服务未启用或API Key为空,返回None") return None # 格式化角色信息 characters_text = format_characters_prompt(characters) if characters else "" # 构建路径历史文本 path_text = "" for i, item in enumerate(path_history, 1): path_text += f"第{i}段:{item.get('content', '')}\n" if item.get('choice'): path_text += f" → 用户选择:{item['choice']}\n" # 构建系统提示词 system_prompt_header = f"""你是一个专业的互动故事续写专家。用户正在玩一个互动故事,想要在当前位置改变剧情走向。 {characters_text} 【任务】 请从当前节点开始,创作新的剧情分支。新剧情的第一句话必须紧接当前节点最后的情节,仿佛是同一段故事的自然延续,不能有任何跳跃感。 【写作要求】 1. 第一个节点必须紧密衔接当前剧情,像是同一段话的下一句 2. 生成 4-6 个新节点,形成有层次的剧情发展(起承转合) 3. 每个节点内容 150-300 字,要分 2-3 个自然段(用\\n\\n分隔),包含:场景描写、人物对话、心理活动 4. 每个非结局节点有 2 个选项,选项要有明显的剧情差异和后果 5. 严格符合用户的改写意图,围绕用户指令展开剧情 6. 保持原故事的人物性格、语言风格和世界观 7. 如果有角色设定,必须保持角色性格和外貌的一致性 8. 对话要自然生动,描写要有画面感 【关于结局 - 极其重要!】 ★★★ 每一条分支路径的尽头必须是结局节点 ★★★ - 结局节点必须设置 "is_ending": true - 结局内容要 200-400 字,分 2-3 段,有情感冲击力 - 结局名称 4-8 字,体现剧情走向 - 如果有2个选项分支,最终必须有2个不同的结局 - 不允许出现没有结局的"死胡同"节点 - 每个结局必须有 "ending_score" 评分(0-100): - good 好结局:80-100分 - bad 坏结局:20-50分 - neutral 中立结局:50-70分 - special 特殊结局:70-90分 【内容分段示例】 "content": "他的声音在耳边响起,像是一阵温柔的风。\\n\\n\\"我喜欢你。\\"他说,目光坚定地看着你。\\n\\n你的心跳漏了一拍,一时间不知该如何回应。" """ system_prompt_json = """【输出格式】(严格JSON,不要有任何额外文字) { "nodes": { "branch_1": { "content": "新剧情第一段(150-300字)...", "speaker": "旁白", "choices": [ {"text": "选项A(5-15字)", "nextNodeKey": "branch_2a"}, {"text": "选项B(5-15字)", "nextNodeKey": "branch_2b"} ] }, "branch_2a": { "content": "...", "speaker": "旁白", "choices": [ {"text": "选项C", "nextNodeKey": "branch_ending_good"}, {"text": "选项D", "nextNodeKey": "branch_ending_bad"} ] }, "branch_2b": { "content": "...", "speaker": "旁白", "choices": [ {"text": "选项E", "nextNodeKey": "branch_ending_neutral"}, {"text": "选项F", "nextNodeKey": "branch_ending_special"} ] }, "branch_ending_good": { "content": "好结局内容(200-400字)...\\n\\n【达成结局:xxx】", "speaker": "旁白", "is_ending": true, "ending_name": "结局名称", "ending_type": "good", "ending_score": 90 }, "branch_ending_bad": { "content": "坏结局内容...\\n\\n【达成结局:xxx】", "speaker": "旁白", "is_ending": true, "ending_name": "结局名称", "ending_type": "bad", "ending_score": 40 }, "branch_ending_neutral": { "content": "中立结局...\\n\\n【达成结局:xxx】", "speaker": "旁白", "is_ending": true, "ending_name": "结局名称", "ending_type": "neutral", "ending_score": 60 }, "branch_ending_special": { "content": "特殊结局...\\n\\n【达成结局:xxx】", "speaker": "旁白", "is_ending": true, "ending_name": "结局名称", "ending_type": "special", "ending_score": 80 } }, "entryNodeKey": "branch_1" }""" system_prompt = system_prompt_header + system_prompt_json # 构建用户提示词 user_prompt_text = f"""【原故事信息】 故事标题:{story_title} 故事分类:{story_category} 【用户已走过的剧情】 {path_text} 【当前节点】 {current_content} 【用户改写指令】 {user_prompt} 请创作新的剧情分支(输出JSON格式):""" print(f"[rewrite_branch] 提示词构建完成,开始调用AI...") print(f"[rewrite_branch] provider={self.provider}") try: result = None if self.provider == "openai": print(f"[rewrite_branch] 调用 OpenAI...") result = await self._call_openai_long(system_prompt, user_prompt_text) elif self.provider == "claude": print(f"[rewrite_branch] 调用 Claude...") result = await self._call_claude(f"{system_prompt}\n\n{user_prompt_text}") elif self.provider == "qwen": print(f"[rewrite_branch] 调用 Qwen...") result = await self._call_qwen_long(system_prompt, user_prompt_text) elif self.provider == "deepseek": print(f"[rewrite_branch] 调用 DeepSeek...") result = await self._call_deepseek_long(system_prompt, user_prompt_text) print(f"[rewrite_branch] AI调用完成,result存在={result is not None}") if result and result.get("content"): print(f"[rewrite_branch] AI返回内容长度={len(result.get('content', ''))}") print(f"[rewrite_branch] AI返回内容前500字: {result.get('content', '')[:500]}") # 解析JSON响应 parsed = self._parse_branch_json(result["content"]) print(f"[rewrite_branch] JSON解析结果: parsed存在={parsed is not None}") if parsed: parsed["tokens_used"] = result.get("tokens_used", 0) print(f"[rewrite_branch] 成功! nodes数量={len(parsed.get('nodes', {}))}, tokens={parsed.get('tokens_used')}") return parsed else: print(f"[rewrite_branch] JSON解析失败!") else: print(f"[rewrite_branch] AI返回为空或无content") return None except Exception as e: print(f"[rewrite_branch] 异常: {type(e).__name__}: {e}") import traceback traceback.print_exc() return None async def continue_ending( self, story_title: str, story_category: str, ending_name: str, ending_content: str, user_prompt: str, characters: List[Dict] = None ) -> Optional[Dict[str, Any]]: """ AI续写结局,从结局开始续写新的剧情分支 """ print(f"\n[continue_ending] ========== 开始调用 ==========") print(f"[continue_ending] story_title={story_title}, category={story_category}") print(f"[continue_ending] ending_name={ending_name}") print(f"[continue_ending] user_prompt={user_prompt}") print(f"[continue_ending] characters数量={len(characters) if characters else 0}") print(f"[continue_ending] enabled={self.enabled}, api_key存在={bool(self.api_key)}") if not self.enabled or not self.api_key: print(f"[continue_ending] 服务未启用或API Key为空,返回None") return None # 格式化角色信息 characters_text = format_characters_prompt(characters) if characters else "" # 构建系统提示词 system_prompt_header = f"""你是一个专业的互动故事续写专家。用户已经到达故事的某个结局,现在想要从这个结局继续故事发展。 {characters_text} 【任务】 请从这个结局开始,创作故事的延续。新剧情必须紧接结局内容,仿佛故事并没有在此结束,而是有了新的发展。 【写作要求】 1. 第一个节点必须紧密衔接原结局,像是结局之后自然发生的事 2. 生成 4-6 个新节点,形成有层次的剧情发展(起承转合) 3. 每个节点内容 150-300 字,要分 2-3 个自然段(用\\n\\n分隔),包含:场景描写、人物对话、心理活动 4. 每个非结局节点有 2 个选项,选项要有明显的剧情差异和后果 5. 严格符合用户的续写意图,围绕用户指令展开剧情 6. 保持原故事的人物性格、语言风格和世界观 7. 如果有角色设定,必须保持角色性格和外貌的一致性 8. 对话要自然生动,描写要有画面感 【关于新结局 - 极其重要!】 ★★★ 每一条分支路径的尽头必须是新结局节点 ★★★ - 结局节点必须设置 "is_ending": true - 结局内容要 200-400 字,分 2-3 段,有情感冲击力 - 结局名称 4-8 字,体现剧情走向 - 如果有2个选项分支,最终必须有2个不同的结局 - 每个结局必须有 "ending_score" 评分(0-100) """ system_prompt_json = """【输出格式】(严格JSON,不要有任何额外文字) { "nodes": { "continue_1": { "content": "续写剧情第一段(150-300字)...", "speaker": "旁白", "choices": [ {"text": "选项A(5-15字)", "nextNodeKey": "continue_2a"}, {"text": "选项B(5-15字)", "nextNodeKey": "continue_2b"} ] }, "continue_2a": { "content": "...", "speaker": "旁白", "choices": [ {"text": "选项C", "nextNodeKey": "continue_ending_good"}, {"text": "选项D", "nextNodeKey": "continue_ending_bad"} ] }, "continue_ending_good": { "content": "新好结局内容(200-400字)...\\n\\n【达成结局:xxx】", "speaker": "旁白", "is_ending": true, "ending_name": "新结局名称", "ending_type": "good", "ending_score": 90 }, "continue_ending_bad": { "content": "新坏结局内容...\\n\\n【达成结局:xxx】", "speaker": "旁白", "is_ending": true, "ending_name": "新结局名称", "ending_type": "bad", "ending_score": 40 } }, "entryNodeKey": "continue_1" }""" system_prompt = system_prompt_header + system_prompt_json # 构建用户提示词 user_prompt_text = f"""【原故事信息】 故事标题:{story_title} 故事分类:{story_category} 【已达成的结局】 结局名称:{ending_name} 结局内容:{ending_content[:800]} 【用户续写指令】 {user_prompt} 请从这个结局开始续写新的剧情分支(输出JSON格式):""" print(f"[continue_ending] 提示词构建完成,开始调用AI...") try: result = None if self.provider == "openai": result = await self._call_openai_long(system_prompt, user_prompt_text) elif self.provider == "claude": result = await self._call_claude(f"{system_prompt}\n\n{user_prompt_text}") elif self.provider == "qwen": result = await self._call_qwen_long(system_prompt, user_prompt_text) elif self.provider == "deepseek": result = await self._call_deepseek_long(system_prompt, user_prompt_text) print(f"[continue_ending] AI调用完成,result存在={result is not None}") if result and result.get("content"): print(f"[continue_ending] AI返回内容长度={len(result.get('content', ''))}") # 解析JSON响应(复用 rewrite_branch 的解析方法) parsed = self._parse_branch_json(result["content"]) print(f"[continue_ending] JSON解析结果: parsed存在={parsed is not None}") if parsed: parsed["tokens_used"] = result.get("tokens_used", 0) print(f"[continue_ending] 成功! nodes数量={len(parsed.get('nodes', {}))}") return parsed else: print(f"[continue_ending] JSON解析失败!") return None except Exception as e: print(f"[continue_ending] 异常: {type(e).__name__}: {e}") import traceback traceback.print_exc() return None async def create_story( self, genre: str, keywords: str, protagonist: str = None, conflict: str = None, user_id: int = None ) -> Optional[Dict[str, Any]]: """ AI创作全新故事 :return: 包含完整故事结构的字典,或 None """ print(f"\n[create_story] ========== 开始创作 ==========") print(f"[create_story] genre={genre}, keywords={keywords}") print(f"[create_story] protagonist={protagonist}, conflict={conflict}") print(f"[create_story] enabled={self.enabled}, api_key存在={bool(self.api_key)}") if not self.enabled or not self.api_key: print(f"[create_story] 服务未启用或API Key为空,返回None") return None # 构建系统提示词 system_prompt = """你是一个专业的互动故事创作专家。请根据用户提供的题材和关键词,创作一个完整的互动故事。 【故事结构要求】 1. 故事要有吸引人的标题(10字以内)和简介(50-100字) 2. 创建2-3个主要角色,每个角色需要详细设定 3. 故事包含6-8个节点,形成多分支结构 4. 必须有2-4个不同类型的结局(good/bad/neutral/special) 5. 每个非结局节点有2个选项,选项要有明显的剧情差异 【角色设定要求】 每个角色需要: - name: 角色名(2-4字) - role_type: 角色类型(protagonist/antagonist/supporting) - gender: 性别(male/female) - age_range: 年龄段(youth/adult/middle_aged/elderly) - appearance: 外貌描述(50-100字,包含发型、眼睛、身材、穿着等) - personality: 性格特点(30-50字) 【节点内容要求】 - 每个节点150-300字,分2-3段(用\\n\\n分隔) - 包含场景描写、人物对话、心理活动 - 对话要自然生动,描写要有画面感 【结局要求】 - 结局内容200-400字,有情感冲击力 - 结局名称4-8字,体现剧情走向 - 结局需要评分(ending_score):good 80-100, bad 20-50, neutral 50-70, special 70-90 【输出格式】严格JSON,不要有任何额外文字: { "title": "故事标题", "description": "故事简介(50-100字)", "category": "题材分类", "characters": [ { "name": "角色名", "role_type": "protagonist", "gender": "male", "age_range": "youth", "appearance": "外貌描述...", "personality": "性格特点..." } ], "nodes": { "start": { "content": "开篇内容...", "speaker": "旁白", "choices": [ {"text": "选项A", "nextNodeKey": "node_1a"}, {"text": "选项B", "nextNodeKey": "node_1b"} ] }, "node_1a": { "content": "...", "speaker": "旁白", "choices": [...] }, "ending_good": { "content": "好结局内容...\\n\\n【达成结局:xxx】", "speaker": "旁白", "is_ending": true, "ending_name": "结局名称", "ending_type": "good", "ending_score": 90 } }, "startNodeKey": "start" }""" # 构建用户提示词 protagonist_text = f"\n主角设定:{protagonist}" if protagonist else "" conflict_text = f"\n核心冲突:{conflict}" if conflict else "" user_prompt_text = f"""请创作一个互动故事: 【题材】{genre} 【关键词】{keywords}{protagonist_text}{conflict_text} 请创作完整的故事(输出JSON格式):""" print(f"[create_story] 提示词构建完成,开始调用AI...") try: result = None if self.provider == "openai": result = await self._call_openai_long(system_prompt, user_prompt_text) elif self.provider == "claude": result = await self._call_claude(f"{system_prompt}\n\n{user_prompt_text}") elif self.provider == "qwen": result = await self._call_qwen_long(system_prompt, user_prompt_text) elif self.provider == "deepseek": result = await self._call_deepseek_long(system_prompt, user_prompt_text) print(f"[create_story] AI调用完成,result存在={result is not None}") if result and result.get("content"): print(f"[create_story] AI返回内容长度={len(result.get('content', ''))}") # 解析JSON响应 parsed = self._parse_story_json(result["content"]) print(f"[create_story] JSON解析结果: parsed存在={parsed is not None}") if parsed: parsed["tokens_used"] = result.get("tokens_used", 0) print(f"[create_story] 成功! title={parsed.get('title')}, nodes数量={len(parsed.get('nodes', {}))}") return parsed else: print(f"[create_story] JSON解析失败!") return None except Exception as e: print(f"[create_story] 异常: {type(e).__name__}: {e}") import traceback traceback.print_exc() return None def _parse_story_json(self, content: str) -> Optional[Dict]: """解析AI返回的故事JSON""" print(f"[_parse_story_json] 开始解析,内容长度={len(content)}") # 移除 markdown 代码块标记 clean_content = content.strip() if clean_content.startswith('```'): clean_content = re.sub(r'^```(?:json)?\s*', '', clean_content) clean_content = re.sub(r'\s*```$', '', clean_content) result = None # 方法1: 直接解析 try: result = json.loads(clean_content) if all(k in result for k in ['title', 'nodes', 'startNodeKey']): print(f"[_parse_story_json] 直接解析成功!") except json.JSONDecodeError as e: print(f"[_parse_story_json] 直接解析失败: {e}") result = None # 方法2: 提取JSON块 if not result: try: brace_match = re.search(r'\{[\s\S]*\}', clean_content) if brace_match: json_str = brace_match.group(0) result = json.loads(json_str) if all(k in result for k in ['title', 'nodes', 'startNodeKey']): print(f"[_parse_story_json] 花括号块解析成功!") else: result = None except json.JSONDecodeError as e: print(f"[_parse_story_json] 花括号块解析失败: {e}") # 尝试修复截断的JSON try: result = self._try_fix_story_json(json_str) if result: print(f"[_parse_story_json] JSON修复成功!") except: pass except Exception as e: print(f"[_parse_story_json] 提取解析失败: {e}") if not result: print(f"[_parse_story_json] 所有解析方法都失败了") return None # 验证并修复故事结构 result = self._validate_and_fix_story(result) return result def _validate_and_fix_story(self, story: Dict) -> Dict: """验证并修复故事结构,确保每个分支都有结局""" nodes = story.get('nodes', {}) if not nodes: return story print(f"[_validate_and_fix_story] 开始验证,节点数={len(nodes)}") # 1. 找出所有结局节点 ending_nodes = [k for k, v in nodes.items() if v.get('is_ending')] print(f"[_validate_and_fix_story] 已有结局节点: {ending_nodes}") # 2. 找出所有被引用的节点(作为 nextNodeKey) referenced_keys = set() for node_key, node_data in nodes.items(): choices = node_data.get('choices', []) if isinstance(choices, list): for choice in choices: if isinstance(choice, dict) and 'nextNodeKey' in choice: referenced_keys.add(choice['nextNodeKey']) # 3. 找出"叶子节点":没有 choices 或 choices 为空,且不是结局 leaf_nodes = [] broken_refs = [] # 引用了不存在节点的选项 for node_key, node_data in nodes.items(): choices = node_data.get('choices', []) is_ending = node_data.get('is_ending', False) # 检查 choices 中引用的节点是否存在 if isinstance(choices, list): for choice in choices: if isinstance(choice, dict): next_key = choice.get('nextNodeKey') if next_key and next_key not in nodes: broken_refs.append((node_key, next_key)) # 没有有效选项且不是结局的节点 if not is_ending and (not choices or len(choices) == 0): leaf_nodes.append(node_key) print(f"[_validate_and_fix_story] 叶子节点(无选项非结局): {leaf_nodes}") print(f"[_validate_and_fix_story] 断裂引用: {broken_refs}") # 4. 修复:将叶子节点标记为结局 for node_key in leaf_nodes: node = nodes[node_key] print(f"[_validate_and_fix_story] 修复节点 {node_key} -> 标记为结局") node['is_ending'] = True if not node.get('ending_name'): node['ending_name'] = '命运的转折' if not node.get('ending_type'): node['ending_type'] = 'neutral' if not node.get('ending_score'): node['ending_score'] = 60 # 5. 修复:处理断裂引用(选项指向不存在的节点) for node_key, missing_key in broken_refs: node = nodes[node_key] choices = node.get('choices', []) # 移除指向不存在节点的选项 valid_choices = [c for c in choices if c.get('nextNodeKey') in nodes] if len(valid_choices) == 0: # 没有有效选项了,标记为结局 print(f"[_validate_and_fix_story] 节点 {node_key} 所有选项失效 -> 标记为结局") node['is_ending'] = True node['choices'] = [] if not node.get('ending_name'): node['ending_name'] = '未知结局' if not node.get('ending_type'): node['ending_type'] = 'neutral' if not node.get('ending_score'): node['ending_score'] = 50 else: node['choices'] = valid_choices # 6. 最终检查:确保至少有一个结局 ending_count = sum(1 for v in nodes.values() if v.get('is_ending')) print(f"[_validate_and_fix_story] 修复后结局数: {ending_count}") if ending_count == 0: # 如果还是没有结局,找最后一个节点标记为结局 last_key = list(nodes.keys())[-1] print(f"[_validate_and_fix_story] 强制将最后节点 {last_key} 标记为结局") nodes[last_key]['is_ending'] = True nodes[last_key]['ending_name'] = '故事的终点' nodes[last_key]['ending_type'] = 'neutral' nodes[last_key]['ending_score'] = 60 nodes[last_key]['choices'] = [] return story def _try_fix_story_json(self, json_str: str) -> Optional[Dict]: """尝试修复不完整的故事JSON""" try: # 尝试找到最后一个完整的节点 # 查找 "nodes": { 的位置 nodes_match = re.search(r'"nodes"\s*:\s*\{', json_str) if not nodes_match: return None # 找所有看起来完整的节点(有 "content" 字段的) node_pattern = r'"(\w+)"\s*:\s*\{[^{}]*"content"[^{}]*(?:\{[^{}]*\}[^{}]*)*\}' nodes = list(re.finditer(node_pattern, json_str[nodes_match.end():])) if len(nodes) < 2: return None # 取到最后一个完整节点的位置 last_node_end = nodes_match.end() + nodes[-1].end() # 尝试提取基本信息 title_match = re.search(r'"title"\s*:\s*"([^"]+)"', json_str) desc_match = re.search(r'"description"\s*:\s*"([^"]+)"', json_str) category_match = re.search(r'"category"\s*:\s*"([^"]+)"', json_str) start_match = re.search(r'"startNodeKey"\s*:\s*"([^"]+)"', json_str) title = title_match.group(1) if title_match else "AI创作故事" description = desc_match.group(1) if desc_match else "" category = category_match.group(1) if category_match else "都市言情" startNodeKey = start_match.group(1) if start_match else "start" # 提取角色 characters = [] char_match = re.search(r'"characters"\s*:\s*\[([\s\S]*?)\]', json_str) if char_match: try: characters = json.loads('[' + char_match.group(1) + ']') except: pass # 提取节点 nodes_content = json_str[nodes_match.start():last_node_end] + '}' try: nodes_obj = json.loads('{' + nodes_content + '}') nodes_dict = nodes_obj.get('nodes', {}) except: return None if len(nodes_dict) < 2: return None result = { "title": title, "description": description, "category": category, "characters": characters, "nodes": nodes_dict, "startNodeKey": startNodeKey } print(f"[_try_fix_story_json] 修复成功! 节点数={len(nodes_dict)}") return result except Exception as e: print(f"[_try_fix_story_json] 修复失败: {e}") return None def _parse_branch_json(self, content: str) -> Optional[Dict]: """解析AI返回的分支JSON""" print(f"[_parse_branch_json] 开始解析,内容长度={len(content)}") # 移除 markdown 代码块标记 clean_content = content.strip() if clean_content.startswith('```'): # 移除开头的 ```json 或 ``` clean_content = re.sub(r'^```(?:json)?\s*', '', clean_content) # 移除结尾的 ``` clean_content = re.sub(r'\s*```$', '', clean_content) try: # 尝试直接解析 result = json.loads(clean_content) print(f"[_parse_branch_json] 直接解析成功!") return result except json.JSONDecodeError as e: print(f"[_parse_branch_json] 直接解析失败: {e}") # 尝试提取JSON块 try: # 匹配 { ... } 结构 brace_match = re.search(r'\{[\s\S]*\}', clean_content) if brace_match: json_str = brace_match.group(0) print(f"[_parse_branch_json] 找到花括号块,尝试解析...") try: result = json.loads(json_str) print(f"[_parse_branch_json] 花括号块解析成功!") return result except json.JSONDecodeError as e: print(f"[_parse_branch_json] 花括号块解析失败: {e}") # 打印错误位置附近的内容 error_pos = e.pos if hasattr(e, 'pos') else 0 start = max(0, error_pos - 100) end = min(len(json_str), error_pos + 100) print(f"[_parse_branch_json] 错误位置附近内容: ...{json_str[start:end]}...") # 尝试修复不完整的 JSON print(f"[_parse_branch_json] 尝试修复不完整的JSON...") fixed_json = self._try_fix_incomplete_json(json_str) if fixed_json: print(f"[_parse_branch_json] JSON修复成功!") return fixed_json except Exception as e: print(f"[_parse_branch_json] 提取解析异常: {e}") print(f"[_parse_branch_json] 所有解析方法都失败了") return None def _try_fix_incomplete_json(self, json_str: str) -> Optional[Dict]: """尝试修复不完整的JSON(被截断的情况)""" try: # 找到已完成的节点,截断不完整的部分 # 查找最后一个完整的节点(以 } 结尾,后面跟着逗号或闭括号) # 先找到 "nodes": { 的位置 nodes_match = re.search(r'"nodes"\s*:\s*\{', json_str) if not nodes_match: return None nodes_start = nodes_match.end() # 找所有完整的 branch 节点 branch_pattern = r'"branch_\w+"\s*:\s*\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}' branches = list(re.finditer(branch_pattern, json_str[nodes_start:])) if not branches: return None # 取最后一个完整的节点的结束位置 last_complete_end = nodes_start + branches[-1].end() # 构建修复后的 JSON # 截取到最后一个完整节点,然后补全结构 truncated = json_str[:last_complete_end] # 补全 JSON 结构 fixed = truncated + '\n },\n "entryNodeKey": "branch_1"\n}' print(f"[_try_fix_incomplete_json] 修复后的JSON长度: {len(fixed)}") result = json.loads(fixed) # 验证结果结构 if "nodes" in result and len(result["nodes"]) > 0: print(f"[_try_fix_incomplete_json] 修复后节点数: {len(result['nodes'])}") return result except Exception as e: print(f"[_try_fix_incomplete_json] 修复失败: {e}") return None async def _call_deepseek_long(self, system_prompt: str, user_prompt: str) -> Optional[Dict]: """调用 DeepSeek API (长文本版本)""" print(f"[_call_deepseek_long] 开始调用...") print(f"[_call_deepseek_long] base_url={self.base_url}") print(f"[_call_deepseek_long] model={self.model}") url = f"{self.base_url}/chat/completions" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": self.model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], "temperature": 0.85, "max_tokens": 8192 # DeepSeek 最大输出限制 } print(f"[_call_deepseek_long] system_prompt长度={len(system_prompt)}") print(f"[_call_deepseek_long] user_prompt长度={len(user_prompt)}") async with httpx.AsyncClient(timeout=300.0) as client: try: print(f"[_call_deepseek_long] 发送请求到 {url}...") response = await client.post(url, headers=headers, json=data) print(f"[_call_deepseek_long] 响应状态码: {response.status_code}") response.raise_for_status() result = response.json() print(f"[_call_deepseek_long] 响应JSON keys: {result.keys()}") if "choices" in result and len(result["choices"]) > 0: content = result["choices"][0]["message"]["content"] tokens = result.get("usage", {}).get("total_tokens", 0) print(f"[_call_deepseek_long] 成功! content长度={len(content)}, tokens={tokens}") return {"content": content.strip(), "tokens_used": tokens} else: print(f"[_call_deepseek_long] 响应异常,无choices: {result}") return None except httpx.HTTPStatusError as e: print(f"[_call_deepseek_long] HTTP错误: {e.response.status_code} - {e.response.text}") return None except httpx.TimeoutException as e: print(f"[_call_deepseek_long] 请求超时: {e}") return None except Exception as e: print(f"[_call_deepseek_long] 其他错误: {type(e).__name__}: {e}") import traceback traceback.print_exc() return None async def _call_openai_long(self, system_prompt: str, user_prompt: str) -> Optional[Dict]: """调用OpenAI API (长文本版本)""" url = f"{self.base_url}/chat/completions" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": self.model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], "temperature": 0.8, "max_tokens": 2000 } async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post(url, headers=headers, json=data) response.raise_for_status() result = response.json() content = result["choices"][0]["message"]["content"] tokens = result["usage"]["total_tokens"] return {"content": content.strip(), "tokens_used": tokens} async def _call_qwen_long(self, system_prompt: str, user_prompt: str) -> Optional[Dict]: """调用通义千问API (长文本版本)""" url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": self.model, "input": { "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ] }, "parameters": { "result_format": "message", "temperature": 0.8, "max_tokens": 2000 } } async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post(url, headers=headers, json=data) response.raise_for_status() result = response.json() content = result["output"]["choices"][0]["message"]["content"] tokens = result.get("usage", {}).get("total_tokens", 0) return {"content": content.strip(), "tokens_used": tokens} async def _call_openai(self, system_prompt: str, user_prompt: str) -> Optional[Dict]: """调用OpenAI API""" url = f"{self.base_url}/chat/completions" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": self.model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], "temperature": 0.8, "max_tokens": 500 } async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post(url, headers=headers, json=data) response.raise_for_status() result = response.json() content = result["choices"][0]["message"]["content"] tokens = result["usage"]["total_tokens"] return {"content": content.strip(), "tokens_used": tokens} async def _call_claude(self, prompt: str) -> Optional[Dict]: """调用Claude API""" url = "https://api.anthropic.com/v1/messages" headers = { "x-api-key": self.api_key, "anthropic-version": "2023-06-01", "content-type": "application/json" } data = { "model": self.model, "max_tokens": 1024, "messages": [{"role": "user", "content": prompt}] } async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post(url, headers=headers, json=data) response.raise_for_status() result = response.json() content = result["content"][0]["text"] tokens = result.get("usage", {}).get("output_tokens", 0) return {"content": content.strip(), "tokens_used": tokens} async def _call_qwen(self, system_prompt: str, user_prompt: str) -> Optional[Dict]: """调用通义千问API""" url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": self.model, "input": { "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ] }, "parameters": { "result_format": "message", "temperature": 0.8 } } async with httpx.AsyncClient(timeout=30.0) as client: response = await client.post(url, headers=headers, json=data) response.raise_for_status() result = response.json() content = result["output"]["choices"][0]["message"]["content"] tokens = result.get("usage", {}).get("total_tokens", 0) return {"content": content.strip(), "tokens_used": tokens} async def _call_deepseek(self, system_prompt: str, user_prompt: str) -> Optional[Dict]: """调用 DeepSeek API""" url = f"{self.base_url}/chat/completions" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } data = { "model": self.model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], "temperature": 0.8, "max_tokens": 500 } async with httpx.AsyncClient(timeout=30.0) as client: try: response = await client.post(url, headers=headers, json=data) response.raise_for_status() result = response.json() if "choices" in result and len(result["choices"]) > 0: content = result["choices"][0]["message"]["content"] tokens = result.get("usage", {}).get("total_tokens", 0) return {"content": content.strip(), "tokens_used": tokens} else: print(f"DeepSeek API 返回异常:{result}") return None except httpx.HTTPStatusError as e: print(f"DeepSeek HTTP 错误:{e.response.status_code} - {e.response.text}") return None except Exception as e: print(f"DeepSeek 调用失败:{e}") return None # 单例模式 ai_service = AIService()