my-private-iptv-m3u/scripts/file_manager.py
stoney420 335bbbcbeb
Some checks failed
Generate M3U Playlist / build (push) Has been cancelled
Update scripts/file_manager.py
2025-06-28 00:03:36 +02:00

186 lines
No EOL
7.1 KiB
Python

"""
File Manager - FIXED to work with correct paths from scripts folder
"""
import os
import re
import shutil
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
class FileManager:
    """Read and write the channels file, with timestamped, rotated backups.

    The channels file is plain text: one block of ``Key = Value`` lines per
    channel, with blocks separated by one or more blank lines.

    The ``config`` object passed to the constructor must expose:

    * ``settings`` -- a mapping; the keys ``create_backup`` (default ``True``)
      and ``max_backups`` (default ``5``) are read from it.
    * ``channels_file`` -- path of the channels file.
    """

    def __init__(self, config):
        """Store *config* and make sure ``<root>/backups`` exists.

        ``<root>`` is the parent of the directory containing this script.
        """
        self.config = config
        self.logger = logging.getLogger(__name__)
        # Resolve first: a relative ``__file__`` would otherwise make
        # ``.parent.parent`` collapse to the current directory.
        script_dir = Path(__file__).resolve().parent
        root_dir = script_dir.parent
        self.backup_dir = root_dir / "backups"
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        self.logger.info(f"FileManager initialized - root: {root_dir}")

    def create_backup(self, file_path: str) -> Optional[Path]:
        """Copy *file_path* into the backup directory and rotate old backups.

        The backup is named ``<stem>_<YYYYmmdd_HHMMSS><suffix>``.

        Returns:
            The path of the new backup, or ``None`` when backups are disabled
            in the settings, the source file does not exist, or the copy
            failed (failures are logged, never raised).
        """
        if not self.config.settings.get('create_backup', True):
            return None

        full_path = Path(file_path)
        if not full_path.exists():
            self.logger.info(f"No file to backup: {file_path}")
            return None

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = self.backup_dir / f"{full_path.stem}_{timestamp}{full_path.suffix}"
        try:
            shutil.copy2(full_path, backup_path)
            self.logger.info(f"Created backup: {backup_path}")
            self._cleanup_old_backups(full_path.stem)
            return backup_path
        except Exception as e:
            # Best-effort: a failed backup must not abort the caller.
            self.logger.error(f"Failed to create backup: {e}")
            return None

    def _cleanup_old_backups(self, base_name: str):
        """Delete all but the newest ``max_backups`` backups of *base_name*.

        Only files named exactly ``<base_name>_<YYYYmmdd_HHMMSS>[.<ext>]`` are
        considered, so backups of other files that merely share a prefix are
        left alone. Age is taken from the timestamp in the file name rather
        than from ``st_mtime``, because ``shutil.copy2`` copies the source
        file's modification time onto the backup.
        """
        max_backups = max(0, self.config.settings.get('max_backups', 5))
        name_pattern = re.compile(
            rf"{re.escape(base_name)}_(\d{{8}}_\d{{6}})(?:\..*)?"
        )

        stamped = []
        for candidate in self.backup_dir.iterdir():
            match = name_pattern.fullmatch(candidate.name)
            if match and candidate.is_file():
                stamped.append((match.group(1), candidate))

        # Fixed-width timestamps sort chronologically as plain strings.
        stamped.sort(key=lambda item: item[0], reverse=True)

        for _, old_backup in stamped[max_backups:]:
            try:
                old_backup.unlink()
                self.logger.debug(f"Removed old backup: {old_backup}")
            except Exception as e:
                self.logger.warning(f"Could not remove old backup {old_backup}: {e}")

    def load_all_channels(self) -> List[Dict]:
        """Load every channel block from the configured channels file.

        Returns:
            A list with one dict per parsed block. The list is empty when the
            file is missing, empty, or cannot be read (errors are logged).
        """
        channels_file = self.config.channels_file
        self.logger.info(f"Attempting to load channels from: {channels_file}")

        if not os.path.exists(channels_file):
            self.logger.info(f"No channels file found at: {channels_file}")
            return []

        try:
            with open(channels_file, 'r', encoding='utf-8') as f:
                content = f.read()

            if not content.strip():
                self.logger.info(f"Channels file is empty: {channels_file}")
                return []

            # Blocks are separated by at least one blank (or whitespace-only) line.
            channels = []
            for block in re.split(r'\n\s*\n+', content.strip()):
                if not block.strip():
                    continue
                channel = self._parse_channel_block(block)
                if channel:
                    channels.append(channel)

            self.logger.info(f"Loaded {len(channels)} channels from file")
            return channels
        except Exception as e:
            self.logger.error(f"Error loading channels: {e}")
            return []

    def _parse_channel_block(self, block: str) -> Optional[Dict]:
        """Parse one block of ``Key = Value`` lines into a dict.

        Lines without ``=`` are ignored; only the first ``=`` on a line
        separates key from value, so values may themselves contain ``=``.

        Returns:
            The parsed dict, or ``None`` if the block held no ``Key = Value``
            line at all.
        """
        channel_data = {}
        for line in block.strip().split('\n'):
            if '=' not in line:
                continue
            key, value = line.split('=', 1)
            channel_data[key.strip()] = value.strip()
        return channel_data or None

    def save_channels(self, channels: List[Dict]) -> bool:
        """Overwrite the channels file with *channels*.

        A backup of the current file is attempted first (see
        ``create_backup``).

        Returns:
            ``True`` on success, ``False`` if anything failed (logged).
        """
        channels_file = self.config.channels_file
        self.logger.info(f"Attempting to save {len(channels)} channels to: {channels_file}")
        try:
            # Serialize everything BEFORE opening the file in 'w' mode, so a
            # malformed entry cannot leave a truncated channels file behind.
            content = "\n\n".join(
                self._convert_to_channels_txt_block(channel) for channel in channels
            )

            self.create_backup(channels_file)

            with open(channels_file, 'w', encoding='utf-8') as f:
                f.write(content)
            self.logger.info(f"Successfully saved {len(channels)} channels to {channels_file}")

            if os.path.exists(channels_file):
                size = os.path.getsize(channels_file)
                self.logger.info(f"Verified: {channels_file} now has {size} bytes")
            return True
        except Exception as e:
            self.logger.error(f"Error saving channels: {e}")
            return False

    def append_channels(self, new_channels: List[Dict]) -> bool:
        """Append *new_channels* to the end of the channels file.

        A blank-line separator is written before the new blocks when the file
        already has content. No backup is taken.

        Returns:
            ``True`` on success (including when there is nothing to append),
            ``False`` if anything failed (logged).
        """
        if not new_channels:
            self.logger.info("No new channels to append")
            return True

        channels_file = self.config.channels_file
        self.logger.info(f"Appending {len(new_channels)} channels to: {channels_file}")
        try:
            # Serialize first so a malformed entry cannot cause a partial append.
            payload = "\n\n".join(
                self._convert_to_channels_txt_block(channel) for channel in new_channels
            )

            has_content = os.path.exists(channels_file) and os.path.getsize(channels_file) > 0
            if has_content:
                payload = "\n\n" + payload

            with open(channels_file, 'a', encoding='utf-8') as f:
                f.write(payload)
            self.logger.info(f"Successfully appended {len(new_channels)} channels")

            if os.path.exists(channels_file):
                size = os.path.getsize(channels_file)
                self.logger.info(f"Verified: {channels_file} now has {size} bytes")
            return True
        except Exception as e:
            self.logger.error(f"Error appending channels: {e}")
            return False

    def _convert_to_channels_txt_block(self, channel_data: Dict) -> str:
        """Render one channel dict as a ``Key = Value`` text block.

        Exactly five lines are produced, in this order: ``Group``,
        ``Stream name``, ``Logo``, ``EPG id``, ``Stream URL``. A missing key
        or a ``None`` value falls back to the field's default. Line breaks
        inside a value are collapsed to a single space, because a line break
        would split the block when the file is parsed again.
        """
        fields = (
            ('Group', 'Uncategorized'),
            ('Stream name', 'Unknown Channel'),
            ('Logo', ''),
            ('EPG id', ''),
            ('Stream URL', ''),
        )
        lines = []
        for key, default in fields:
            value = channel_data.get(key, default)
            if value is None:
                value = default
            text = re.sub(r'[\r\n]+', ' ', str(value)).strip()
            lines.append(f"{key} = {text}")
        return "\n".join(lines)