Files
matrix-rss-bot/bot/rss_bot.py

246 lines
9.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Основная логика RSS бота"""
import asyncio
import feedparser
import re
from datetime import datetime
from email.utils import parsedate_to_datetime
from typing import List, Dict, Any, Optional
import logging
from nio import AsyncClient
from .config import Config
from .history_manager import HistoryManager
from .image_handler import ImageHandler
logger = logging.getLogger(__name__)
class RSSNewsBot:
"""Основной класс бота"""
def __init__(self, config: Config):
self.config = config
self.client = AsyncClient(config.homeserver, config.bot_user_id)
self.client.access_token = config.access_token
self.history = HistoryManager(
config.history_file,
config.history_days,
config.max_history_size
)
self.image_handler = ImageHandler(
config.images_dir,
config.compress_images,
config.max_image_size_mb,
config.max_image_width,
config.max_image_height
)
self.cycle_counter = 0
async def fetch_rss(self, url: str, source_name: str, room_id: str) -> List[Dict[str, Any]]:
"""Загружает и парсит RSS-ленту"""
try:
feed = feedparser.parse(url)
if feed.bozo:
logger.warning(f"Ошибка парсинга {source_name}: {feed.bozo_exception}")
return []
entries_with_metadata = []
for entry in feed.entries:
image_url = self.image_handler.extract_from_entry(entry)
entry_with_meta = {
'title': entry.get('title', 'Без заголовка'),
'link': entry.get('link', ''),
'summary': entry.get('summary', ''),
'published': entry.get('published', ''),
'source': source_name,
'room_id': room_id,
'image_url': image_url
}
entries_with_metadata.append(entry_with_meta)
return entries_with_metadata
except Exception as e:
logger.error(f"Ошибка при загрузке {source_name}: {e}")
return []
def format_news_message(self, entry: Dict[str, Any]) -> tuple[str, str]:
"""Форматирует новость с красивым HTML"""
title = entry.get('title', 'Без заголовка')
link = entry.get('link', '')
summary = entry.get('summary', '')
published = entry.get('published', '')
# Очищаем HTML теги
summary = re.sub(r'<[^>]+>', '', summary)
if len(summary) > 300:
summary = summary[:300] + ""
# Форматируем дату
try:
pub_date = parsedate_to_datetime(published)
formatted_date = pub_date.strftime("%d %B %Y, %H:%M")
except Exception:
formatted_date = published
# HTML версия
html_message = f"""<b>📰 {title}</b><br/>
<br/>
{summary}<br/>
<br/>
🕒 <i>{formatted_date}</i><br/>
🔗 <a href="{link}">Читать полностью</a><br/>"""
# Plain text версия
plain_message = f"📰 {title}\n\n{summary}\n\n🕒 {formatted_date}\n🔗 {link}"
return plain_message, html_message
async def send_news(self, room_id: str, entry: Dict[str, Any]) -> bool:
"""Отправляет новость с изображением (если есть)"""
title = entry.get('title', 'Без заголовка')
image_url = entry.get('image_url')
link = entry.get('link', '')
# Отправляем изображение если есть
if image_url:
logger.debug(f"Найдено изображение: {image_url[:80]}...")
image_path = await self.image_handler.download(image_url, link)
if image_path:
success = await self.image_handler.upload_and_send(
self.client, room_id, image_path
)
if success:
logger.debug("Изображение отправлено")
else:
logger.warning("Не удалось отправить изображение")
# Отправляем текст новости
plain_message, html_message = self.format_news_message(entry)
retries = 3
while retries > 0:
try:
response = await self.client.room_send(
room_id=room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": plain_message,
"format": "org.matrix.custom.html",
"formatted_body": html_message
}
)
if isinstance(response, tuple):
response = response[0]
if hasattr(response, 'event_id'):
logger.debug(f"Текст отправлен: {title[:50]}")
return True
except Exception as e:
error_msg = str(e).lower()
if "429" in error_msg or "ratelimit" in error_msg:
await asyncio.sleep(15)
retries -= 1
else:
logger.error(f"Ошибка отправки: {e}")
return False
return False
async def check_and_send(self) -> None:
"""Основная логика: проверяем все ленты и отправляем новое"""
self.cycle_counter += 1
logger.info(f"Цикл #{self.cycle_counter}")
news_by_room: Dict[str, List[Dict[str, Any]]] = {}
for source in self.config.sources:
logger.debug(f"Проверяю: {source['name']}")
entries = await self.fetch_rss(
source["url"],
source["name"],
source["room_id"]
)
new_entries = []
for entry in entries:
link = entry.get('link', '')
if link and not self.history.is_already_sent(link):
room_id = entry.get('room_id')
if room_id:
try:
published = entry.get('published', '')
pub_date = parsedate_to_datetime(published) if published else datetime.now()
entry['timestamp'] = pub_date
except Exception:
entry['timestamp'] = datetime.now()
new_entries.append(entry)
if new_entries:
logger.info(f"Найдено {len(new_entries)} новых в {source['name']}")
for entry in new_entries:
room_id = entry.get('room_id')
if room_id not in news_by_room:
news_by_room[room_id] = []
news_by_room[room_id].append(entry)
else:
logger.debug(f"Новых нет в {source['name']}")
# Сортируем и отправляем
if news_by_room:
for room_id, news_list in news_by_room.items():
news_list.sort(key=lambda x: x['timestamp'])
logger.info(f"Отправка {len(news_list)} новостей в комнату")
for i, entry in enumerate(news_list, 1):
title = entry.get('title', '')[:50]
logger.info(f"[{i}/{len(news_list)}]: {title}")
success = await self.send_news(room_id, entry)
if success:
self.history.add(entry.get('link', ''), title)
if i < len(news_list):
await asyncio.sleep(self.config.delay_between_posts)
# Сохраняем историю
self.history.save()
# Периодическая очистка
if self.cycle_counter % self.config.cleanup_images_every == 0:
await self.image_handler.clean()
async def run(self) -> None:
"""Запускает бота"""
logger.info("Запускаем RSS-бота...")
try:
await self.client.sync(timeout=3000)
logger.info("Соединение с Matrix установлено")
except Exception as e:
logger.warning(f"Ошибка при подключении: {e}")
logger.info("Бот запущен!")
while True:
start_time = asyncio.get_event_loop().time()
try:
await self.check_and_send()
except Exception as e:
logger.exception(f"Ошибка в основном цикле: {e}")
elapsed = asyncio.get_event_loop().time() - start_time
wait_time = max(0, self.config.check_interval - elapsed)
if wait_time > 0:
await asyncio.sleep(wait_time)