220 lines
7.3 KiB
Python
220 lines
7.3 KiB
Python
|
|
import time
|
|||
|
|
import random
|
|||
|
|
from typing import Optional, Tuple
|
|||
|
|
from playwright.sync_api import Page, ElementHandle
|
|||
|
|
from loguru import logger
|
|||
|
|
from config import Config
|
|||
|
|
|
|||
|
|
|
|||
|
|
class MIPAdAutomation:
|
|||
|
|
"""MIP页面广告自动化操作"""
|
|||
|
|
|
|||
|
|
def __init__(self, page: Page):
|
|||
|
|
self.page = page
|
|||
|
|
|
|||
|
|
def check_and_click_ad(self, url: str) -> Tuple[bool, bool]:
|
|||
|
|
"""
|
|||
|
|
检查并点击广告
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
url: MIP页面链接
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
(是否点击成功, 是否获得回复)
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
# 访问链接
|
|||
|
|
logger.info(f"访问链接: {url}")
|
|||
|
|
self.page.goto(url, wait_until='domcontentloaded')
|
|||
|
|
|
|||
|
|
# 等待页面加载
|
|||
|
|
time.sleep(3)
|
|||
|
|
|
|||
|
|
# 检查是否存在商业广告
|
|||
|
|
has_ad, ad_element = self._detect_commercial_ad()
|
|||
|
|
|
|||
|
|
if not has_ad:
|
|||
|
|
logger.info("未检测到商业广告,跳过该链接")
|
|||
|
|
return False, False
|
|||
|
|
|
|||
|
|
# 点击广告
|
|||
|
|
logger.info("检测到商业广告,准备点击")
|
|||
|
|
if not self._click_advertisement(ad_element):
|
|||
|
|
logger.warning("点击广告失败")
|
|||
|
|
return False, False
|
|||
|
|
|
|||
|
|
# 等待并检查回复
|
|||
|
|
has_reply = self._wait_for_reply()
|
|||
|
|
|
|||
|
|
return True, has_reply
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"处理链接异常: {str(e)}")
|
|||
|
|
return False, False
|
|||
|
|
finally:
|
|||
|
|
# 尝试关闭当前标签页,返回主窗口
|
|||
|
|
self._close_current_tab()
|
|||
|
|
|
|||
|
|
def _detect_commercial_ad(self) -> Tuple[bool, Optional[ElementHandle]]:
|
|||
|
|
"""
|
|||
|
|
检测页面是否存在商业广告
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
(是否存在商业广告, 广告元素)
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
# 等待评论区加载
|
|||
|
|
time.sleep(2)
|
|||
|
|
|
|||
|
|
# 方法1: 查找包含"广告"标识的元素
|
|||
|
|
# 根据实际页面结构调整选择器
|
|||
|
|
ad_selectors = [
|
|||
|
|
"//div[contains(@class, 'ad') or contains(@class, 'advertisement')]",
|
|||
|
|
"//div[contains(text(), '广告')]",
|
|||
|
|
"//*[contains(text(), '广告')]//ancestor::div[contains(@class, 'card')]",
|
|||
|
|
"//a[contains(@class, 'ad-link')]",
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
for selector in ad_selectors:
|
|||
|
|
try:
|
|||
|
|
elements = self.page.locator(f"xpath={selector}").all()
|
|||
|
|
if elements:
|
|||
|
|
# 检查元素是否可见
|
|||
|
|
for elem in elements:
|
|||
|
|
if elem.is_visible():
|
|||
|
|
# 进一步验证是否是商业广告(非AI健康管家)
|
|||
|
|
elem_text = elem.inner_text().lower()
|
|||
|
|
if '广告' in elem_text and 'ai健康' not in elem_text:
|
|||
|
|
logger.info("检测到商业广告")
|
|||
|
|
return True, elem
|
|||
|
|
except Exception:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
logger.info("未检测到商业广告")
|
|||
|
|
return False, None
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"检测广告异常: {str(e)}")
|
|||
|
|
return False, None
|
|||
|
|
|
|||
|
|
def _click_advertisement(self, ad_element: ElementHandle) -> bool:
|
|||
|
|
"""
|
|||
|
|
点击广告元素
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
ad_element: 广告元素
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
是否点击成功
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
# 获取当前页面
|
|||
|
|
context = self.page.context
|
|||
|
|
|
|||
|
|
# 滚动到广告元素可见
|
|||
|
|
ad_element.scroll_into_view_if_needed()
|
|||
|
|
time.sleep(1)
|
|||
|
|
|
|||
|
|
# 监听新页面打开
|
|||
|
|
with context.expect_page() as new_page_info:
|
|||
|
|
# 点击广告
|
|||
|
|
ad_element.click()
|
|||
|
|
logger.info("已点击广告")
|
|||
|
|
|
|||
|
|
# 等待新页面
|
|||
|
|
new_page = new_page_info.value
|
|||
|
|
new_page.wait_for_load_state('domcontentloaded')
|
|||
|
|
|
|||
|
|
# 切换到新页面
|
|||
|
|
self.page = new_page
|
|||
|
|
logger.info("已切换到广告页面")
|
|||
|
|
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"点击广告异常: {str(e)}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def _wait_for_reply(self) -> bool:
|
|||
|
|
"""
|
|||
|
|
等待广告主回复
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
是否收到回复
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
logger.info(f"等待广告主回复(最多{Config.REPLY_WAIT_TIMEOUT}秒)")
|
|||
|
|
|
|||
|
|
# 检查是否已经自动发送消息
|
|||
|
|
time.sleep(2)
|
|||
|
|
|
|||
|
|
# 等待并检查回复
|
|||
|
|
start_time = time.time()
|
|||
|
|
timeout = Config.REPLY_WAIT_TIMEOUT
|
|||
|
|
|
|||
|
|
# 根据实际页面结构调整回复检测逻辑
|
|||
|
|
# 这里使用轮询方式检查是否有新消息
|
|||
|
|
initial_msg_count = self._count_messages()
|
|||
|
|
|
|||
|
|
while time.time() - start_time < timeout:
|
|||
|
|
time.sleep(2)
|
|||
|
|
current_msg_count = self._count_messages()
|
|||
|
|
|
|||
|
|
# 如果消息数量增加,说明收到了回复
|
|||
|
|
if current_msg_count > initial_msg_count:
|
|||
|
|
logger.info("收到广告主回复")
|
|||
|
|
return True
|
|||
|
|
|
|||
|
|
logger.info("未收到广告主回复(超时)")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"等待回复异常: {str(e)}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def _count_messages(self) -> int:
|
|||
|
|
"""
|
|||
|
|
统计当前页面的消息数量
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
消息数量
|
|||
|
|
"""
|
|||
|
|
try:
|
|||
|
|
# 根据实际页面结构调整选择器
|
|||
|
|
# 这里是示例选择器,需要根据实际情况修改
|
|||
|
|
message_selectors = [
|
|||
|
|
"//div[contains(@class, 'message')]",
|
|||
|
|
"//div[contains(@class, 'chat-message')]",
|
|||
|
|
"//div[contains(@class, 'msg-item')]",
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
for selector in message_selectors:
|
|||
|
|
try:
|
|||
|
|
messages = self.page.locator(f"xpath={selector}").all()
|
|||
|
|
if messages:
|
|||
|
|
return len(messages)
|
|||
|
|
except:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
return 0
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"统计消息数量异常: {str(e)}")
|
|||
|
|
return 0
|
|||
|
|
|
|||
|
|
def _close_current_tab(self):
|
|||
|
|
"""关闭当前标签页并返回主窗口"""
|
|||
|
|
try:
|
|||
|
|
pages = self.page.context.pages
|
|||
|
|
if len(pages) > 1:
|
|||
|
|
self.page.close()
|
|||
|
|
self.page = pages[0]
|
|||
|
|
logger.info("已关闭广告页面")
|
|||
|
|
except Exception as e:
|
|||
|
|
logger.error(f"关闭标签页异常: {str(e)}")
|
|||
|
|
|
|||
|
|
def random_delay(self, min_seconds: int = 2, max_seconds: int = 5):
|
|||
|
|
"""随机延迟,模拟人工操作"""
|
|||
|
|
delay = random.uniform(min_seconds, max_seconds)
|
|||
|
|
time.sleep(delay)
|