Files
ai_mip/ad_automation.py
2026-01-13 18:59:26 +08:00

220 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import random
from typing import Optional, Tuple
from playwright.sync_api import Page, ElementHandle
from loguru import logger
from config import Config
class MIPAdAutomation:
"""MIP页面广告自动化操作"""
def __init__(self, page: Page):
self.page = page
def check_and_click_ad(self, url: str) -> Tuple[bool, bool]:
"""
检查并点击广告
Args:
url: MIP页面链接
Returns:
(是否点击成功, 是否获得回复)
"""
try:
# 访问链接
logger.info(f"访问链接: {url}")
self.page.goto(url, wait_until='domcontentloaded')
# 等待页面加载
time.sleep(3)
# 检查是否存在商业广告
has_ad, ad_element = self._detect_commercial_ad()
if not has_ad:
logger.info("未检测到商业广告,跳过该链接")
return False, False
# 点击广告
logger.info("检测到商业广告,准备点击")
if not self._click_advertisement(ad_element):
logger.warning("点击广告失败")
return False, False
# 等待并检查回复
has_reply = self._wait_for_reply()
return True, has_reply
except Exception as e:
logger.error(f"处理链接异常: {str(e)}")
return False, False
finally:
# 尝试关闭当前标签页,返回主窗口
self._close_current_tab()
def _detect_commercial_ad(self) -> Tuple[bool, Optional[ElementHandle]]:
"""
检测页面是否存在商业广告
Returns:
(是否存在商业广告, 广告元素)
"""
try:
# 等待评论区加载
time.sleep(2)
# 方法1: 查找包含"广告"标识的元素
# 根据实际页面结构调整选择器
ad_selectors = [
"//div[contains(@class, 'ad') or contains(@class, 'advertisement')]",
"//div[contains(text(), '广告')]",
"//*[contains(text(), '广告')]//ancestor::div[contains(@class, 'card')]",
"//a[contains(@class, 'ad-link')]",
]
for selector in ad_selectors:
try:
elements = self.page.locator(f"xpath={selector}").all()
if elements:
# 检查元素是否可见
for elem in elements:
if elem.is_visible():
# 进一步验证是否是商业广告非AI健康管家
elem_text = elem.inner_text().lower()
if '广告' in elem_text and 'ai健康' not in elem_text:
logger.info("检测到商业广告")
return True, elem
except Exception:
continue
logger.info("未检测到商业广告")
return False, None
except Exception as e:
logger.error(f"检测广告异常: {str(e)}")
return False, None
def _click_advertisement(self, ad_element: ElementHandle) -> bool:
"""
点击广告元素
Args:
ad_element: 广告元素
Returns:
是否点击成功
"""
try:
# 获取当前页面
context = self.page.context
# 滚动到广告元素可见
ad_element.scroll_into_view_if_needed()
time.sleep(1)
# 监听新页面打开
with context.expect_page() as new_page_info:
# 点击广告
ad_element.click()
logger.info("已点击广告")
# 等待新页面
new_page = new_page_info.value
new_page.wait_for_load_state('domcontentloaded')
# 切换到新页面
self.page = new_page
logger.info("已切换到广告页面")
return True
except Exception as e:
logger.error(f"点击广告异常: {str(e)}")
return False
def _wait_for_reply(self) -> bool:
"""
等待广告主回复
Returns:
是否收到回复
"""
try:
logger.info(f"等待广告主回复(最多{Config.REPLY_WAIT_TIMEOUT}秒)")
# 检查是否已经自动发送消息
time.sleep(2)
# 等待并检查回复
start_time = time.time()
timeout = Config.REPLY_WAIT_TIMEOUT
# 根据实际页面结构调整回复检测逻辑
# 这里使用轮询方式检查是否有新消息
initial_msg_count = self._count_messages()
while time.time() - start_time < timeout:
time.sleep(2)
current_msg_count = self._count_messages()
# 如果消息数量增加,说明收到了回复
if current_msg_count > initial_msg_count:
logger.info("收到广告主回复")
return True
logger.info("未收到广告主回复(超时)")
return False
except Exception as e:
logger.error(f"等待回复异常: {str(e)}")
return False
def _count_messages(self) -> int:
"""
统计当前页面的消息数量
Returns:
消息数量
"""
try:
# 根据实际页面结构调整选择器
# 这里是示例选择器,需要根据实际情况修改
message_selectors = [
"//div[contains(@class, 'message')]",
"//div[contains(@class, 'chat-message')]",
"//div[contains(@class, 'msg-item')]",
]
for selector in message_selectors:
try:
messages = self.page.locator(f"xpath={selector}").all()
if messages:
return len(messages)
except:
continue
return 0
except Exception as e:
logger.error(f"统计消息数量异常: {str(e)}")
return 0
def _close_current_tab(self):
"""关闭当前标签页并返回主窗口"""
try:
pages = self.page.context.pages
if len(pages) > 1:
self.page.close()
self.page = pages[0]
logger.info("已关闭广告页面")
except Exception as e:
logger.error(f"关闭标签页异常: {str(e)}")
def random_delay(self, min_seconds: int = 2, max_seconds: int = 5):
"""随机延迟,模拟人工操作"""
delay = random.uniform(min_seconds, max_seconds)
time.sleep(delay)