my-private-iptv-m3u/scripts/file_manager.py
stoney420 65a31644be
Some checks failed
📺 Generate M3U Playlist / build (push) Has been cancelled
Add scripts/file_manager.py
2025-06-27 23:30:14 +02:00

125 lines
No EOL
4.6 KiB
Python

"""
File Manager - Handles file operations, backups, and channel loading
"""
import os
import re
import shutil
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
class FileManager:
"""Manage file operations with backup and rotation."""
def __init__(self, config):
self.config = config
self.logger = logging.getLogger(__name__)
self.backup_dir = Path("backups")
self.backup_dir.mkdir(exist_ok=True)
def create_backup(self, file_path: str) -> Optional[Path]:
"""Create timestamped backup with rotation."""
if not self.config.settings.get('create_backup', True):
return None
file_path = Path(file_path)
if not file_path.exists():
return None
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_path = self.backup_dir / f"{file_path.stem}_{timestamp}{file_path.suffix}"
try:
shutil.copy2(file_path, backup_path)
self.logger.info(f"Created backup: {backup_path}")
self._cleanup_old_backups(file_path.stem)
return backup_path
except Exception as e:
self.logger.error(f"Failed to create backup: {e}")
return None
def _cleanup_old_backups(self, base_name: str):
"""Remove old backups, keeping only the most recent ones."""
max_backups = self.config.settings.get('max_backups', 5)
backup_files = sorted(
[f for f in self.backup_dir.glob(f"{base_name}_*") if f.is_file()],
key=lambda x: x.stat().st_mtime,
reverse=True
)
for old_backup in backup_files[max_backups:]:
try:
old_backup.unlink()
self.logger.debug(f"Removed old backup: {old_backup}")
except Exception as e:
self.logger.warning(f"Could not remove old backup {old_backup}: {e}")
def load_all_channels(self) -> List[Dict]:
"""Load all channels from the channels file."""
if not os.path.exists(self.config.channels_file):
self.logger.info("No channels.txt file found")
return []
try:
with open(self.config.channels_file, 'r', encoding='utf-8') as f:
content = f.read()
channel_blocks = re.split(r'\n\s*\n+', content.strip())
channels = []
for block in channel_blocks:
if block.strip():
channel = self._parse_channel_block(block)
if channel:
channels.append(channel)
self.logger.info(f"Loaded {len(channels)} channels from file")
return channels
except Exception as e:
self.logger.error(f"Error loading channels: {e}")
return []
def _parse_channel_block(self, block: str) -> Optional[Dict]:
"""Parse a channel block from channels.txt."""
channel_data = {}
lines = block.strip().split('\n')
for line in lines:
if '=' in line:
key, value = line.split('=', 1)
channel_data[key.strip()] = value.strip()
return channel_data if channel_data else None
def save_channels(self, channels: List[Dict]) -> bool:
"""Save channels to the channels.txt file."""
try:
# Create backup first
self.create_backup(self.config.channels_file)
with open(self.config.channels_file, 'w', encoding='utf-8') as f:
for i, channel in enumerate(channels):
if i > 0:
f.write("\n\n")
f.write(self._convert_to_channels_txt_block(channel))
self.logger.info(f"Saved {len(channels)} channels to file")
return True
except Exception as e:
self.logger.error(f"Error saving channels: {e}")
return False
def _convert_to_channels_txt_block(self, channel_data: Dict) -> str:
"""Convert to channels.txt format."""
block = []
block.append(f"Group = {channel_data.get('Group', 'Uncategorized')}")
block.append(f"Stream name = {channel_data.get('Stream name', 'Unknown Channel')}")
block.append(f"Logo = {channel_data.get('Logo', '')}")
block.append(f"EPG id = {channel_data.get('EPG id', '')}")
block.append(f"Stream URL = {channel_data.get('Stream URL', '')}")
return "\n".join(block)