refactor: 移除事务、简化日志、轮询改2秒、删除重建脚本

This commit is contained in:
2026-02-05 23:52:26 +08:00
parent 1013c47524
commit 1a74b42f82
6 changed files with 11 additions and 662 deletions

View File

@@ -4,7 +4,7 @@
## 功能概述
- **守护进程模式**:持续监控数据库,自动处理新数据(默认10秒轮询)
- **守护进程模式**:持续监控数据库,自动处理新数据(默认2秒轮询)
- **批量处理模式**:10张图片一个请求,多请求并发执行
- **内容审核处理**:自动识别审核失败图片,标记状态并记录原因
- **RESTful API 服务**:提供标签衍生的 HTTP 接口
@@ -90,8 +90,8 @@ export DB_PASSWORD=your-password
# 持续监控数据库,自动处理新数据
python image_tag_derive.py --daemon
# 指定轮询间隔(默认10秒)
python image_tag_derive.py --daemon --interval 10
# 指定轮询间隔(默认2秒)
python image_tag_derive.py --daemon --interval 2
# 并发配置
python image_tag_derive.py --daemon --batch-size 50 --concurrency 3
@@ -123,7 +123,7 @@ python image_tag_derive.py --id 16495 16496 16497
| 参数 | 说明 |
|------|------|
| `--daemon` | 守护模式:持续监控数据库 |
| `--interval` | 轮询间隔(秒),默认10秒 |
| `--interval` | 轮询间隔(秒),默认2秒 |
| `--limit` | 限制处理数量(测试用) |
| `--start-id` | 起始ID(断点续传) |
| `--end-id` | 结束ID |

View File

@@ -64,10 +64,6 @@ class DatabaseManager:
cursor = conn.cursor(dictionary=dictionary)
try:
yield cursor
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
cursor.close()
conn.close()

View File

@@ -497,13 +497,11 @@ def run_daemon(batch_size=None, concurrency=None, interval=10):
logger.info(f"并发数: {concurrency or settings.tag_derive.concurrency}")
logger.info("=" * 60)
round_count = 0
total_success = 0
total_failed = 0
while running:
round_count += 1
logger.info(f"[{round_count}] 检查待处理数据...")
logger.info("检查待处理数据...")
try:
results = batch_derive_tags(
@@ -516,12 +514,12 @@ def run_daemon(batch_size=None, concurrency=None, interval=10):
failed = len(results) - success
total_success += success
total_failed += failed
logger.info(f"[{round_count}] 处理完成: 成功 {success}, 失败 {failed}")
logger.info(f"处理完成: 成功 {success}, 失败 {failed}")
else:
logger.info(f"[{round_count}] 没有待处理的数据")
logger.info("没有待处理的数据")
except Exception as e:
logger.error(f"[{round_count}] 处理异常: {e}")
logger.error(f"处理异常: {e}")
if running:
logger.info(f"等待 {interval} 秒后继续...")
@@ -533,7 +531,7 @@ def run_daemon(batch_size=None, concurrency=None, interval=10):
logger.info("=" * 60)
logger.info("服务已停止")
logger.info(f"统计: 共运行 {round_count} 次, 成功 {total_success} 条, 失败 {total_failed}")
logger.info(f"统计: 成功 {total_success} 条, 失败 {total_failed}")
logger.info("=" * 60)
@@ -548,7 +546,7 @@ def main():
parser.add_argument('--id', type=int, nargs='+', default=None, help='指定ID只处理这些ID可指定多个')
parser.add_argument('--limit', type=int, default=None, help='限制处理的总数量(用于测试,如 --limit 10')
parser.add_argument('--daemon', action='store_true', help='守护模式:持续监控数据库,自动处理新数据')
parser.add_argument('--interval', type=int, default=10, help='守护模式轮询间隔(秒),默认10')
parser.add_argument('--interval', type=int, default=2, help='守护模式轮询间隔(秒),默认2')
args = parser.parse_args()
batch_size = args.batch_size or settings.tag_derive.batch_size

View File

@@ -1,310 +0,0 @@
# -*- coding: utf-8 -*-
"""
数据库重建脚本
根据 ai_article.sql 重建数据库结构,从 ai_image_tags.txt 导入数据
"""
import mysql.connector
import os
# 数据库配置
DB_CONFIG = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "liang20020523",
"charset": "utf8mb4"
}
DATABASE_NAME = "ai_article"
SQL_FILE = "ai_article.sql"
DATA_FILE = "ai_image_tags.txt"
def get_connection(with_database=False):
"""获取数据库连接"""
config = DB_CONFIG.copy()
if with_database:
config["database"] = DATABASE_NAME
return mysql.connector.connect(**config)
def rebuild_database_structure():
"""重建数据库结构"""
print("=" * 60)
print("步骤1: 重建数据库结构")
print("=" * 60)
# 读取SQL文件
sql_path = os.path.join(os.path.dirname(__file__), SQL_FILE)
print(f"读取SQL文件: {sql_path}")
with open(sql_path, "r", encoding="utf-8") as f:
sql_content = f.read()
# 连接MySQL不指定数据库
conn = get_connection(with_database=False)
cursor = conn.cursor()
try:
# 删除并重新创建数据库
print(f"\n删除数据库 {DATABASE_NAME}(如果存在)...")
cursor.execute(f"DROP DATABASE IF EXISTS `{DATABASE_NAME}`")
print(f"创建数据库 {DATABASE_NAME}...")
cursor.execute(f"CREATE DATABASE `{DATABASE_NAME}` CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci")
print(f"切换到数据库 {DATABASE_NAME}...")
cursor.execute(f"USE `{DATABASE_NAME}`")
# 分割并执行SQL语句
print("\n执行SQL脚本...")
# 移除注释并分割SQL语句
statements = []
current_statement = ""
in_comment = False
for line in sql_content.split("\n"):
stripped = line.strip()
# 跳过空行
if not stripped:
continue
# 跳过单行注释
if stripped.startswith("--"):
continue
# 处理多行注释开始
if stripped.startswith("/*"):
in_comment = True
continue
# 处理多行注释结束
if "*/" in stripped:
in_comment = False
continue
# 跳过注释中的内容
if in_comment:
continue
current_statement += line + "\n"
# 检查语句是否结束
if stripped.endswith(";"):
statements.append(current_statement.strip())
current_statement = ""
# 执行每条SQL语句
success_count = 0
error_count = 0
for i, stmt in enumerate(statements):
if not stmt or stmt.strip() == ";":
continue
try:
cursor.execute(stmt)
conn.commit()
success_count += 1
# 打印表创建信息
if "CREATE TABLE" in stmt.upper():
table_name = stmt.split("`")[1] if "`" in stmt else "unknown"
print(f" ✓ 创建表: {table_name}")
except mysql.connector.Error as e:
error_count += 1
# 只打印关键错误
if "already exists" not in str(e).lower():
print(f" ✗ SQL执行错误: {str(e)[:100]}")
print(f"\nSQL执行完成: 成功 {success_count} 条, 失败 {error_count}")
finally:
cursor.close()
conn.close()
def import_image_tags_data():
"""从 ai_image_tags.txt 导入数据"""
print("\n" + "=" * 60)
print("步骤2: 导入 ai_image_tags 数据")
print("=" * 60)
data_path = os.path.join(os.path.dirname(__file__), DATA_FILE)
print(f"读取数据文件: {data_path}")
if not os.path.exists(data_path):
print(f"数据文件不存在: {data_path}")
return
with open(data_path, "r", encoding="utf-8") as f:
lines = f.readlines()
# 解析数据
# 文件格式第9行是列头从第10行开始是数据
data_rows = []
header_line = None
for i, line in enumerate(lines):
stripped = line.strip()
if not stripped:
continue
# 找到列头行(包含 id, image_id 等)
if stripped.startswith("id\t"):
header_line = stripped
print(f"找到列头(第{i+1}行): {stripped[:80]}...")
continue
# 跳过非数据行
if header_line is None:
continue
# 解析数据行(以数字开头)
parts = stripped.split("\t")
if len(parts) >= 10 and parts[0].isdigit():
data_rows.append(parts)
print(f"解析到 {len(data_rows)} 条数据")
if not data_rows:
print("没有数据需要导入")
return
# 连接数据库
conn = get_connection(with_database=True)
cursor = conn.cursor()
try:
# 禁用外键检查
cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
print("已禁用外键检查")
# 插入数据
insert_sql = """
INSERT INTO ai_image_tags
(id, image_id, image_name, image_url, image_thumb_url, tag_id, tag_name,
default_tag_id, default_tag_name, keywords_id, keywords_name,
department_id, department_name, image_source, created_user_id,
created_at, updated_at, image_attached_article_count, status, blocking_reason)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
success_count = 0
error_count = 0
for row in data_rows:
try:
# 处理数据,确保长度匹配
while len(row) < 20:
row.append("")
# 转换数据类型
values = (
int(row[0]), # id
int(row[1]), # image_id
row[2], # image_name
row[3], # image_url
row[4], # image_thumb_url
int(row[5]), # tag_id
row[6], # tag_name
int(row[7]) if row[7] else 0, # default_tag_id
row[8], # default_tag_name
int(row[9]), # keywords_id
row[10], # keywords_name
int(row[11]), # department_id
row[12], # department_name
int(row[13]) if row[13] else 1, # image_source
int(row[14]) if row[14] else 0, # created_user_id
row[15], # created_at
row[16], # updated_at
int(row[17]) if row[17] else 0, # image_attached_article_count
row[18] if row[18] else "draft", # status
row[19] if len(row) > 19 else "" # blocking_reason
)
cursor.execute(insert_sql, values)
success_count += 1
except Exception as e:
error_count += 1
if error_count <= 3:
print(f" 插入错误 (id={row[0]}): {e}")
conn.commit()
print(f"\n数据导入完成: 成功 {success_count} 条, 失败 {error_count}")
# 恢复外键检查
cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
print("已恢复外键检查")
finally:
cursor.close()
conn.close()
def verify_database():
"""验证数据库"""
print("\n" + "=" * 60)
print("步骤3: 验证数据库")
print("=" * 60)
conn = get_connection(with_database=True)
cursor = conn.cursor()
try:
# 检查表数量
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
print(f"\n数据库中共有 {len(tables)} 张表:")
for t in tables[:10]:
print(f" - {t[0]}")
if len(tables) > 10:
print(f" ... 还有 {len(tables) - 10} 张表")
# 检查 ai_image_tags 表数据
cursor.execute("SELECT COUNT(*) FROM ai_image_tags")
count = cursor.fetchone()[0]
print(f"\nai_image_tags 表共有 {count} 条记录")
if count > 0:
cursor.execute("SELECT id, tag_name, department_name FROM ai_image_tags LIMIT 3")
rows = cursor.fetchall()
print("示例数据:")
for row in rows:
print(f" ID: {row[0]}, 标签: {row[1][:30]}..., 科室: {row[2]}")
finally:
cursor.close()
conn.close()
def main():
print("\n" + "=" * 60)
print(" 数据库重建脚本")
print("=" * 60)
print(f"数据库: {DATABASE_NAME}")
print(f"SQL文件: {SQL_FILE}")
print(f"数据文件: {DATA_FILE}")
print("=" * 60)
# 确认操作
confirm = input("\n警告: 此操作将删除并重建数据库,所有数据将丢失!\n确认继续? (输入 'yes' 确认): ")
if confirm.lower() != "yes":
print("操作已取消")
return
# 执行重建
rebuild_database_structure()
import_image_tags_data()
verify_database()
print("\n" + "=" * 60)
print("数据库重建完成!")
print("=" * 60)
if __name__ == "__main__":
main()

View File

@@ -1,335 +0,0 @@
# -*- coding: utf-8 -*-
"""
数据库恢复脚本
根据 ai_article.sql 重建数据库结构,从 backup_data.json 恢复数据
"""
import mysql.connector
import json
import os
# 数据库配置
DB_CONFIG = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "liang20020523",
"charset": "utf8mb4"
}
DATABASE_NAME = "ai_article"
SQL_FILE = "ai_article.sql"
BACKUP_FILE = "backup_data.json"
def get_connection(with_database=False):
"""获取数据库连接"""
config = DB_CONFIG.copy()
if with_database:
config["database"] = DATABASE_NAME
return mysql.connector.connect(**config)
def rebuild_database_structure():
"""重建数据库结构"""
print("=" * 60)
print("步骤1: 重建数据库结构")
print("=" * 60)
sql_path = os.path.join(os.path.dirname(__file__), SQL_FILE)
print(f"读取SQL文件: {sql_path}")
with open(sql_path, "r", encoding="utf-8") as f:
sql_content = f.read()
conn = get_connection(with_database=False)
cursor = conn.cursor()
try:
print(f"\n删除数据库 {DATABASE_NAME}(如果存在)...")
cursor.execute(f"DROP DATABASE IF EXISTS `{DATABASE_NAME}`")
print(f"创建数据库 {DATABASE_NAME}...")
cursor.execute(f"CREATE DATABASE `{DATABASE_NAME}` CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci")
print(f"切换到数据库 {DATABASE_NAME}...")
cursor.execute(f"USE `{DATABASE_NAME}`")
print("\n执行SQL脚本...")
statements = []
current_statement = ""
in_comment = False
for line in sql_content.split("\n"):
stripped = line.strip()
if not stripped:
continue
if stripped.startswith("--"):
continue
if stripped.startswith("/*"):
in_comment = True
continue
if "*/" in stripped:
in_comment = False
continue
if in_comment:
continue
current_statement += line + "\n"
if stripped.endswith(";"):
statements.append(current_statement.strip())
current_statement = ""
success_count = 0
table_count = 0
for stmt in statements:
if not stmt or stmt.strip() == ";":
continue
try:
cursor.execute(stmt)
conn.commit()
success_count += 1
if "CREATE TABLE" in stmt.upper():
table_name = stmt.split("`")[1] if "`" in stmt else "unknown"
print(f" ✓ 创建表: {table_name}")
table_count += 1
except mysql.connector.Error as e:
if "already exists" not in str(e).lower():
print(f" ✗ SQL执行错误: {str(e)[:80]}")
print(f"\n表结构创建完成: {table_count} 张表")
finally:
cursor.close()
conn.close()
def restore_data():
"""从备份文件恢复数据"""
print("\n" + "=" * 60)
print("步骤2: 恢复表数据")
print("=" * 60)
backup_path = os.path.join(os.path.dirname(__file__), BACKUP_FILE)
print(f"读取备份文件: {backup_path}")
if not os.path.exists(backup_path):
print(f"备份文件不存在: {backup_path}")
return
with open(backup_path, "r", encoding="utf-8") as f:
data = json.load(f)
conn = get_connection(with_database=True)
cursor = conn.cursor()
try:
# 禁用外键检查
cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
print("已禁用外键检查\n")
# 恢复 ai_tags
tags = data.get("ai_tags", [])
print(f"恢复 ai_tags 表 ({len(tags)} 条)...")
if tags:
for tag in tags:
columns = ", ".join(f"`{k}`" for k in tag.keys())
placeholders = ", ".join(["%s"] * len(tag))
sql = f"INSERT INTO ai_tags ({columns}) VALUES ({placeholders})"
try:
cursor.execute(sql, list(tag.values()))
except mysql.connector.Error as e:
print(f" 插入错误: {e}")
conn.commit()
print(f" ✓ ai_tags 恢复完成")
# 恢复 ai_image_tags
image_tags = data.get("ai_image_tags", [])
print(f"恢复 ai_image_tags 表 ({len(image_tags)} 条)...")
# 先插入一条URL错误的测试数据id=1第一条
error_url_data = {
"id": 1,
"image_id": 99999,
"image_name": "error_test_image.png",
"image_url": "https://invalid-url-test/not-exist/error_image.png",
"image_thumb_url": "https://invalid-url-test/not-exist/error_image_thumb.png",
"tag_id": 12679,
"tag_name": "#测试错误URL#",
"default_tag_id": 0,
"default_tag_name": "",
"keywords_id": 186,
"keywords_name": "测试",
"department_id": 11,
"department_name": "妇科",
"image_source": 1,
"created_user_id": 0,
"created_at": "2025-01-01T00:00:00",
"updated_at": "2025-01-01T00:00:00",
"image_attached_article_count": 0,
"status": "draft",
"blocking_reason": "",
"similarity": "draft",
"similarity_image_tags_id": 0,
"similarity score": 0.0
}
columns = ", ".join(f"`{k}`" for k in error_url_data.keys())
placeholders = ", ".join(["%s"] * len(error_url_data))
sql = f"INSERT INTO ai_image_tags ({columns}) VALUES ({placeholders})"
try:
cursor.execute(sql, list(error_url_data.values()))
conn.commit()
print(f" ✓ 已插入URL错误的测试数据 (id=1)")
except mysql.connector.Error as e:
print(f" 插入错误URL测试数据失败: {e}")
if image_tags:
success = 0
for item in image_tags:
columns = ", ".join(f"`{k}`" for k in item.keys())
placeholders = ", ".join(["%s"] * len(item))
sql = f"INSERT INTO ai_image_tags ({columns}) VALUES ({placeholders})"
try:
cursor.execute(sql, list(item.values()))
success += 1
except mysql.connector.Error as e:
print(f" 插入错误 (id={item.get('id')}): {e}")
conn.commit()
print(f" ✓ ai_image_tags 恢复完成 ({success} 条)")
# 恢复外键检查
cursor.execute("SET FOREIGN_KEY_CHECKS = 1")
print("\n已恢复外键检查")
finally:
cursor.close()
conn.close()
def verify_database():
"""验证数据库"""
print("\n" + "=" * 60)
print("步骤3: 验证数据库")
print("=" * 60)
conn = get_connection(with_database=True)
cursor = conn.cursor()
try:
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
print(f"\n数据库共有 {len(tables)} 张表")
cursor.execute("SELECT COUNT(*) FROM ai_tags")
count = cursor.fetchone()[0]
print(f"ai_tags 表: {count} 条记录")
cursor.execute("SELECT COUNT(*) FROM ai_image_tags")
count = cursor.fetchone()[0]
print(f"ai_image_tags 表: {count} 条记录")
# 显示示例数据
cursor.execute("SELECT id, tag_name, department FROM ai_tags LIMIT 3")
rows = cursor.fetchall()
if rows:
print("\nai_tags 示例:")
for row in rows:
print(f" ID: {row[0]}, 标签: {row[1][:40]}..., 科室: {row[2]}")
finally:
cursor.close()
conn.close()
def backup_current_data():
"""备份当前数据到 backup_data.json"""
print("=" * 60)
print("备份当前数据")
print("=" * 60)
conn = get_connection(with_database=True)
cursor = conn.cursor(dictionary=True)
try:
# 导出 ai_tags
cursor.execute('SELECT * FROM ai_tags')
tags = cursor.fetchall()
print(f"ai_tags: {len(tags)}")
# 导出 ai_image_tags
cursor.execute('SELECT * FROM ai_image_tags')
image_tags = cursor.fetchall()
print(f"ai_image_tags: {len(image_tags)}")
# 转换datetime为字符串
def convert_datetime(obj):
for key, value in obj.items():
if hasattr(value, 'isoformat'):
obj[key] = value.isoformat()
return obj
tags = [convert_datetime(t) for t in tags]
image_tags = [convert_datetime(t) for t in image_tags]
# 保存为JSON
backup_path = os.path.join(os.path.dirname(__file__), BACKUP_FILE)
data = {'ai_tags': tags, 'ai_image_tags': image_tags}
with open(backup_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"\n数据已保存到 {backup_path}")
finally:
cursor.close()
conn.close()
def main():
print("\n" + "=" * 60)
print(" 数据库恢复脚本")
print("=" * 60)
print(f"数据库: {DATABASE_NAME}")
print(f"SQL文件: {SQL_FILE}")
print(f"备份文件: {BACKUP_FILE}")
print("=" * 60)
print("\n请选择操作:")
print(" 1. 完整恢复 (重建结构 + 恢复数据)")
print(" 2. 仅恢复数据 (保留现有结构)")
print(" 3. 备份当前数据")
print(" 0. 取消")
choice = input("\n请输入选项 (0-3): ").strip()
if choice == "1":
confirm = input("\n警告: 此操作将删除并重建数据库!确认? (输入 'yes'): ")
if confirm.lower() == "yes":
rebuild_database_structure()
restore_data()
verify_database()
print("\n" + "=" * 60)
print("数据库恢复完成!")
print("=" * 60)
else:
print("操作已取消")
elif choice == "2":
restore_data()
verify_database()
print("\n数据恢复完成!")
elif choice == "3":
backup_current_data()
print("\n备份完成!")
else:
print("操作已取消")
if __name__ == "__main__":
main()

View File

@@ -35,7 +35,7 @@ get_all_pids() {
}
# 循环间隔(秒)
LOOP_INTERVAL=10
LOOP_INTERVAL=2
# 启动服务(带进程数量控制)
start_single() {