31 lines
1.1 KiB
Python
31 lines
1.1 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
"""信号冷却过滤模块
|
|||
|
|
- 避免同一标的在冷却期内重复产生同方向信号
|
|||
|
|
- 过滤后附加 generated_at_utc 字段(UTC ISO)
|
|||
|
|
"""
|
|||
|
|
from datetime import datetime, timezone, timedelta
|
|||
|
|
from typing import List, Dict, Any
|
|||
|
|
|
|||
|
|
class SignalCooldownFilter:
|
|||
|
|
def __init__(self, cooldown_minutes: int = 30):
|
|||
|
|
self.cooldown = timedelta(minutes=cooldown_minutes)
|
|||
|
|
# key: (symbol, direction) -> last datetime
|
|||
|
|
self.last_time: Dict[tuple, datetime] = {}
|
|||
|
|
|
|||
|
|
def filter(self, signals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
|||
|
|
now = datetime.now(timezone.utc)
|
|||
|
|
accepted = []
|
|||
|
|
for s in signals:
|
|||
|
|
symbol = s.get('symbol')
|
|||
|
|
direction = s.get('type') or s.get('direction')
|
|||
|
|
key = (symbol, direction)
|
|||
|
|
lt = self.last_time.get(key)
|
|||
|
|
if lt is not None and now - lt < self.cooldown:
|
|||
|
|
continue
|
|||
|
|
# Accept
|
|||
|
|
self.last_time[key] = now
|
|||
|
|
s['direction'] = direction # normalize
|
|||
|
|
s['generated_at_utc'] = now.strftime('%Y-%m-%dT%H:%M:%SZ')
|
|||
|
|
accepted.append(s)
|
|||
|
|
return accepted
|