my-private-iptv-m3u/scripts/file_manager.py

186 lines
7.1 KiB
Python
Raw Normal View History

"""
File Manager - FIXED to work with correct paths from scripts folder
"""
import os
import re
import shutil
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
class FileManager:
    """Manage file operations with backup and rotation.

    The manager reads and writes the channels file named by
    ``config.channels_file`` and keeps timestamped copies of it in a
    ``backups`` directory located in the parent of this module's directory.

    ``config`` must expose:

    * ``settings`` - a mapping read with ``.get`` for the keys
      ``'create_backup'`` (default ``True``) and ``'max_backups'`` (default 5);
    * ``channels_file`` - path of the channels file.

    The channels file is a sequence of blocks separated by blank lines; each
    block is a list of ``key = value`` lines.
    """

    # Field order and fallback values used when a channel is written out.
    _CHANNEL_FIELDS = (
        ('Group', 'Uncategorized'),
        ('Stream name', 'Unknown Channel'),
        ('Logo', ''),
        ('EPG id', ''),
        ('Stream URL', ''),
    )

    def __init__(self, config):
        """Store *config* and make sure the backup directory exists."""
        self.config = config
        self.logger = logging.getLogger(__name__)

        # Backups go to <parent of this module's directory>/backups.
        script_dir = Path(__file__).parent
        root_dir = script_dir.parent
        self.backup_dir = root_dir / "backups"
        self.backup_dir.mkdir(exist_ok=True)

        self.logger.info(f"FileManager initialized - root: {root_dir}")

    def create_backup(self, file_path: str) -> Optional[Path]:
        """Create a timestamped backup of *file_path* and rotate old ones.

        Returns the path of the new backup, or ``None`` when backups are
        disabled in the settings, the file does not exist, or copying failed
        (the failure is logged, not raised).
        """
        if not self.config.settings.get('create_backup', True):
            return None

        full_path = Path(file_path)
        if not full_path.exists():
            self.logger.info(f"No file to backup: {file_path}")
            return None

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = self.backup_dir / f"{full_path.stem}_{timestamp}{full_path.suffix}"

        try:
            shutil.copy2(full_path, backup_path)
            self.logger.info(f"Created backup: {backup_path}")
            self._cleanup_old_backups(full_path.stem)
            return backup_path
        except Exception as e:
            self.logger.error(f"Failed to create backup: {e}")
            return None

    def _cleanup_old_backups(self, base_name: str):
        """Remove old backups of *base_name*, keeping only the most recent ones.

        Only files named ``<base_name>_<YYYYMMDD>_<HHMMSS>[.ext]`` are
        considered, so unrelated files that merely share the prefix are never
        deleted. Age is taken from the timestamp in the file name rather than
        from the file's mtime, because ``shutil.copy2`` copies the source
        file's mtime onto the backup.
        """
        max_backups = self.config.settings.get('max_backups', 5)
        name_pattern = re.compile(
            re.escape(base_name) + r"_(\d{8}_\d{6})(?:\..*)?"
        )

        candidates = []
        try:
            for entry in self.backup_dir.iterdir():
                match = name_pattern.fullmatch(entry.name)
                if match and entry.is_file():
                    candidates.append((match.group(1), entry.name, entry))
        except OSError as e:
            # Rotation is best-effort; a listing failure must not fail the backup.
            self.logger.warning(f"Could not list backups in {self.backup_dir}: {e}")
            return

        # Newest first: the timestamp format sorts chronologically as text.
        candidates.sort(key=lambda item: (item[0], item[1]), reverse=True)

        for _, _, old_backup in candidates[max_backups:]:
            try:
                old_backup.unlink()
                self.logger.debug(f"Removed old backup: {old_backup}")
            except Exception as e:
                self.logger.warning(f"Could not remove old backup {old_backup}: {e}")

    def load_all_channels(self) -> List[Dict]:
        """Load all channels from the channels file.

        Returns a list with one dict per non-empty block. A missing, empty or
        unreadable file yields an empty list; read errors are logged.
        """
        channels_file = self.config.channels_file
        self.logger.info(f"Attempting to load channels from: {channels_file}")

        if not os.path.exists(channels_file):
            self.logger.info(f"No channels file found at: {channels_file}")
            return []

        try:
            with open(channels_file, 'r', encoding='utf-8') as f:
                content = f.read()

            if not content.strip():
                self.logger.info(f"Channels file is empty: {channels_file}")
                return []

            channels = []
            # Blocks are separated by one or more blank (whitespace-only) lines.
            for block in re.split(r'\n\s*\n+', content.strip()):
                if not block.strip():
                    continue
                channel = self._parse_channel_block(block)
                if channel:
                    channels.append(channel)

            self.logger.info(f"Loaded {len(channels)} channels from file")
            return channels
        except Exception as e:
            self.logger.error(f"Error loading channels: {e}")
            return []

    def _parse_channel_block(self, block: str) -> Optional[Dict]:
        """Parse one block of ``key = value`` lines into a dict.

        Lines without ``=`` are ignored; only the first ``=`` splits, so
        values may themselves contain ``=``. Returns ``None`` when the block
        holds no ``key = value`` line at all.
        """
        channel_data = {}
        for line in block.strip().split('\n'):
            if '=' not in line:
                continue
            key, value = line.split('=', 1)
            channel_data[key.strip()] = value.strip()
        return channel_data if channel_data else None

    def save_channels(self, channels: List[Dict]) -> bool:
        """Overwrite the channels file with *channels*.

        A backup of the current file is attempted first (see
        ``create_backup``). Returns ``True`` on success and ``False`` when
        anything failed; failures are logged, not raised.
        """
        channels_file = self.config.channels_file
        self.logger.info(f"Attempting to save {len(channels)} channels to: {channels_file}")

        try:
            # Serialise before touching the file so that a malformed entry
            # cannot leave a truncated or half-written channels file behind.
            content = "\n\n".join(
                self._convert_to_channels_txt_block(channel) for channel in channels
            )

            self.create_backup(channels_file)

            with open(channels_file, 'w', encoding='utf-8') as f:
                f.write(content)

            self.logger.info(f"Successfully saved {len(channels)} channels to {channels_file}")

            # Verify the file was written
            if os.path.exists(channels_file):
                size = os.path.getsize(channels_file)
                self.logger.info(f"Verified: {channels_file} now has {size} bytes")

            return True
        except Exception as e:
            self.logger.error(f"Error saving channels: {e}")
            return False

    def append_channels(self, new_channels: List[Dict]) -> bool:
        """Append *new_channels* to the end of the channels file.

        The file is created if it does not exist. Returns ``True`` on success
        (including when there is nothing to append) and ``False`` when
        anything failed; failures are logged, not raised.
        """
        if not new_channels:
            self.logger.info("No new channels to append")
            return True

        channels_file = self.config.channels_file
        self.logger.info(f"Appending {len(new_channels)} channels to: {channels_file}")

        try:
            # Serialise everything up front so a malformed entry cannot cause
            # a partial append.
            payload = "\n\n".join(
                self._convert_to_channels_txt_block(channel) for channel in new_channels
            )

            # A separator is only needed when there is existing content.
            has_content = os.path.exists(channels_file) and os.path.getsize(channels_file) > 0
            if has_content:
                payload = "\n\n" + payload

            with open(channels_file, 'a', encoding='utf-8') as f:
                f.write(payload)

            self.logger.info(f"Successfully appended {len(new_channels)} channels")

            # Verify the file was updated
            if os.path.exists(channels_file):
                size = os.path.getsize(channels_file)
                self.logger.info(f"Verified: {channels_file} now has {size} bytes")

            return True
        except Exception as e:
            self.logger.error(f"Error appending channels: {e}")
            return False

    @staticmethod
    def _field_text(value, default: str) -> str:
        """Return *value* as single-line text, or *default* when it is ``None``.

        Line breaks inside a value would split or corrupt the block when the
        file is read back, so each run of CR/LF is replaced by one space.
        """
        if value is None:
            return default
        return re.sub(r'[\r\n]+', ' ', str(value)).strip()

    def _convert_to_channels_txt_block(self, channel_data: Dict) -> str:
        """Convert a channel dict to its channels.txt block.

        The block always contains the five known fields in a fixed order;
        missing (or ``None``) fields are written with their default value.
        """
        return "\n".join(
            f"{key} = {self._field_text(channel_data.get(key), default)}"
            for key, default in self._CHANNEL_FIELDS
        )