Files
matrix-rss-bot/bot/image_handler.py

273 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Обработка изображений: скачивание, сжатие, отправка"""
import os
import hashlib
import mimetypes
from pathlib import Path
from typing import Optional, Tuple
import logging
import aiohttp
from PIL import Image
from nio import AsyncClient, UploadResponse, RoomSendResponse
logger = logging.getLogger(__name__)
class ImageHandler:
"""Обработчик изображений для Matrix"""
def __init__(
self,
images_dir: str,
compress: bool = True,
max_size_mb: float = 0.5,
max_width: int = 1200,
max_height: int = 1200
):
self.images_dir = Path(images_dir)
self.compress = compress
self.max_size_mb = max_size_mb
self.max_width = max_width
self.max_height = max_height
self.images_dir.mkdir(parents=True, exist_ok=True)
def extract_from_entry(self, entry) -> Optional[str]:
"""Извлекает URL изображения из записи RSS"""
# 1. Проверяем стандартные поля RSS
if hasattr(entry, 'media_content') and entry.media_content:
for media in entry.media_content:
if 'url' in media and 'image' in media.get('type', ''):
return media['url']
if hasattr(entry, 'enclosures') and entry.enclosures:
for enc in entry.enclosures:
if 'image' in enc.get('type', ''):
return enc.get('href', '')
# 2. Проверяем специфичные теги (РБК, etc)
for field in ['rbc_news_image', 'image']:
if hasattr(entry, field):
img = getattr(entry, field)
if hasattr(img, 'url'):
return img.url
# 3. Проверяем thumbnail
if hasattr(entry, 'rbc_news_thumbnail'):
if hasattr(entry.rbc_news_thumbnail, 'url'):
return entry.rbc_news_thumbnail.url
# 4. Ищем в description
summary = entry.get('summary', '') or entry.get('description', '')
import re
img_patterns = [
r'<img[^>]+src=["\']([^"\']+)["\']',
r'src=["\'](https?://[^"\']+\.(jpg|jpeg|png|gif|webp))["\']',
]
for pattern in img_patterns:
match = re.search(pattern, summary, re.IGNORECASE)
if match:
return match.group(1)
return None
async def download(self, image_url: str, news_link: str) -> Optional[str]:
"""Скачивает изображение и сохраняет во временную папку"""
try:
# Создаем имя файла на основе URL новости
url_hash = hashlib.md5(news_link.encode()).hexdigest()[:12]
async with aiohttp.ClientSession() as session:
async with session.get(image_url, timeout=10) as response:
if response.status != 200:
logger.warning(f"Не удалось скачать {image_url}: статус {response.status}")
return None
content_type = response.headers.get('Content-Type', '')
file_ext = self._get_extension(content_type)
file_name = f"{url_hash}.{file_ext}"
file_path = self.images_dir / file_name
with open(file_path, 'wb') as f:
f.write(await response.read())
return str(file_path)
except Exception as e:
logger.warning(f"Ошибка скачивания изображения {image_url}: {e}")
return None
def _get_extension(self, content_type: str) -> str:
"""Определяет расширение файла по MIME типу"""
ext_map = {
'png': 'png',
'gif': 'gif',
'webp': 'webp',
'jpeg': 'jpg',
'jpg': 'jpg'
}
for key, ext in ext_map.items():
if key in content_type:
return ext
return 'jpg'
async def compress_image(self, image_path: str) -> str:
"""Сжимает изображение и возвращает путь к сжатой версии"""
if not self.compress:
return image_path
try:
compressed_path = image_path.replace('.', '_compressed.')
with Image.open(image_path) as img:
original_size = os.path.getsize(image_path) / (1024 * 1024)
original_format = img.format
has_alpha = img.mode in ('RGBA', 'LA', 'P') and 'transparency' in img.info
# Изменяем размер если нужно
if img.width > self.max_width or img.height > self.max_height:
ratio = min(self.max_width / img.width, self.max_height / img.height)
new_width = int(img.width * ratio)
new_height = int(img.height * ratio)
img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
logger.debug(f"Изменен размер: {img.width}x{img.height}")
# Выбираем формат и качество
if original_format == 'PNG' and has_alpha:
img.save(compressed_path, 'PNG', optimize=True)
elif original_format in ('JPEG', 'JPG'):
img.save(compressed_path, 'JPEG', quality=60, optimize=True)
else:
# Конвертируем в JPEG
if img.mode in ('RGBA', 'LA', 'P'):
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
background.paste(img, mask=img.split()[-1] if img.mode == 'RGBA' else None)
img = background
img.save(compressed_path, 'JPEG', quality=60, optimize=True)
compressed_size = os.path.getsize(compressed_path) / (1024 * 1024)
logger.debug(f"Сжатие: {original_size:.2f}MB → {compressed_size:.2f}MB")
return compressed_path
except Exception as e:
logger.warning(f"Не удалось сжать: {e}, отправляю оригинал")
return image_path
async def upload_and_send(
self,
client: AsyncClient,
room_id: str,
image_path: str
) -> bool:
"""Загружает изображение на сервер Matrix и отправляет"""
try:
if not os.path.exists(image_path):
logger.error(f"Файл не найден: {image_path}")
return False
# Сжимаем если нужно
compressed_path = await self.compress_image(image_path)
file_name = os.path.basename(compressed_path)
file_size = os.path.getsize(compressed_path)
mime_type = mimetypes.guess_type(compressed_path)[0] or 'image/jpeg'
# Получаем размеры
width, height = self._get_image_size(compressed_path)
# Загружаем на сервер
with open(compressed_path, 'rb') as f:
upload_response = await client.upload(
f,
content_type=mime_type,
filename=file_name
)
if isinstance(upload_response, tuple):
upload_response = upload_response[0]
if not hasattr(upload_response, 'content_uri'):
logger.error("Не удалось загрузить изображение")
return False
content_uri = upload_response.content_uri
# Формируем сообщение
content = {
"msgtype": "m.image",
"body": file_name,
"url": content_uri,
"info": {
"mimetype": mime_type,
"size": file_size
}
}
if width and height:
content["info"]["w"] = width
content["info"]["h"] = height
# Добавляем thumbnail
content["info"]["thumbnail_url"] = content_uri
content["info"]["thumbnail_info"] = {
"mimetype": mime_type,
"size": file_size,
"w": width or 800,
"h": height or 600
}
send_response = await client.room_send(
room_id,
"m.room.message",
content
)
if isinstance(send_response, tuple):
send_response = send_response[0]
# Удаляем сжатый файл если он временный
if compressed_path != image_path and os.path.exists(compressed_path):
os.remove(compressed_path)
if hasattr(send_response, 'event_id'):
logger.debug(f"Изображение отправлено: {file_name}")
return True
else:
logger.error(f"Ошибка отправки: {send_response}")
return False
except Exception as e:
logger.error(f"Ошибка при отправке изображения: {e}")
return False
def _get_image_size(self, image_path: str) -> Tuple[Optional[int], Optional[int]]:
"""Получает размеры изображения"""
try:
with Image.open(image_path) as img:
return img.size
except Exception:
return None, None
async def clean(self) -> None:
"""Очищает папку с изображениями"""
try:
if not self.images_dir.exists():
return
files = list(self.images_dir.glob('*'))
if files:
for file in files:
try:
if file.is_file():
file.unlink()
except Exception:
pass
logger.info(f"Очищено {len(files)} изображений")
except Exception as e:
logger.error(f"Ошибка очистки папки с изображениями: {e}")