diff --git a/scripts/health_checker.py b/scripts/health_checker.py index 8e9713b..ed5a1b8 100644 --- a/scripts/health_checker.py +++ b/scripts/health_checker.py @@ -1,597 +1,248 @@ #!/usr/bin/env python3 """ -Repository Health Monitor - Keeps the repository clean and organized +Health Checker - Simple URL health checking for IPTV channels """ -import os -import shutil import logging -import json -from datetime import datetime, timedelta -from pathlib import Path -from typing import Dict, List, Tuple -import subprocess -import gzip +import requests +import concurrent.futures +from typing import Dict, List, Optional +import time -class RepoHealthMonitor: - """Monitor and maintain repository cleanliness and organization.""" +class HealthChecker: + """Simple health checker for IPTV channel URLs.""" - def __init__(self, config=None): + def __init__(self, config): self.config = config self.logger = logging.getLogger(__name__) - self.root_path = Path.cwd() + self.timeout = config.settings.get('health_check_timeout', 5) + self.max_workers = config.settings.get('max_workers', 4) - # Define cleanup rules - self.cleanup_rules = { - 'temp_files': { - 'patterns': ['*_temp*', '*.tmp', '*~', '*.backup.*'], - 'max_age_days': 1, - 'action': 'delete' - }, - 'old_logs': { - 'patterns': ['*.log'], - 'max_age_days': 7, - 'action': 'archive', - 'keep_recent': 5 - }, - 'old_backups': { - 'patterns': ['backups/*.txt'], - 'max_age_days': 30, - 'action': 'compress' - }, - 'large_files': { - 'max_size_mb': 50, - 'action': 'warn' - }, - 'python_cache': { - 'patterns': ['__pycache__', '*.pyc', '*.pyo'], - 'action': 'delete' - } - } - - def run_health_check(self) -> Dict: - """Run comprehensive repository health check.""" - self.logger.info("๐Ÿ” Starting repository health check...") - - health_report = { - 'timestamp': datetime.now().isoformat(), - 'repository_size': self._calculate_repo_size(), - 'file_counts': self._count_files_by_type(), - 'issues_found': [], - 'cleanup_suggestions': [], - 'space_analysis': self._analyze_disk_usage(), - 'organization_score': 0 - } - - # Check various aspects - health_report.update({ - 'temp_files': self._check_temp_files(), - 'log_files': self._check_log_files(), - 'backup_files': self._check_backup_files(), - 'large_files': self._check_large_files(), - 'python_artifacts': self._check_python_artifacts(), - 'git_status': self._check_git_status() - }) - - # Calculate organization score - health_report['organization_score'] = self._calculate_organization_score(health_report) - - # Generate suggestions - health_report['cleanup_suggestions'] = self._generate_cleanup_suggestions(health_report) - - self.logger.info(f"๐Ÿ“Š Health check complete. Organization score: {health_report['organization_score']}/100") - return health_report - - def auto_cleanup(self, dry_run: bool = False) -> Dict: - """Automatically clean up repository based on rules.""" - self.logger.info(f"๐Ÿงน Starting auto-cleanup (dry_run={dry_run})...") - - cleanup_results = { - 'files_deleted': [], - 'files_archived': [], - 'files_compressed': [], - 'space_freed_mb': 0, - 'errors': [] - } + def check_single_url(self, url: str) -> Dict: + """Check a single URL for accessibility.""" + start_time = time.time() try: - # Clean temp files - cleanup_results.update(self._cleanup_temp_files(dry_run)) + response = requests.head( + url, + timeout=self.timeout, + allow_redirects=True, + headers={'User-Agent': 'IPTV-Health-Checker/1.0'} + ) - # Archive old logs - cleanup_results.update(self._archive_old_logs(dry_run)) - - # Compress old backups - cleanup_results.update(self._compress_old_backups(dry_run)) - - # Remove Python cache - cleanup_results.update(self._cleanup_python_cache(dry_run)) - - # Organize files - cleanup_results.update(self._organize_files(dry_run)) - - except Exception as e: - self.logger.error(f"Error during auto-cleanup: {e}") - cleanup_results['errors'].append(str(e)) - - self.logger.info(f"โœ… Auto-cleanup complete. Space freed: {cleanup_results['space_freed_mb']:.2f} MB") - return cleanup_results - - def _calculate_repo_size(self) -> Dict: - """Calculate repository size breakdown.""" - sizes = { - 'total_mb': 0, - 'by_directory': {}, - 'by_extension': {} - } - - for root, dirs, files in os.walk(self.root_path): - # Skip .git directory - if '.git' in root: - continue - - dir_size = 0 - for file in files: - file_path = Path(root) / file - try: - file_size = file_path.stat().st_size - dir_size += file_size - - # Track by extension - ext = file_path.suffix.lower() - if ext: - sizes['by_extension'][ext] = sizes['by_extension'].get(ext, 0) + file_size - - except (OSError, FileNotFoundError): - continue - - if dir_size > 0: - rel_dir = str(Path(root).relative_to(self.root_path)) - sizes['by_directory'][rel_dir] = dir_size / (1024 * 1024) # Convert to MB - sizes['total_mb'] += dir_size / (1024 * 1024) - - return sizes - - def _count_files_by_type(self) -> Dict: - """Count files by type and directory.""" - counts = { - 'total_files': 0, - 'by_extension': {}, - 'by_directory': {} - } - - for root, dirs, files in os.walk(self.root_path): - if '.git' in root: - continue - - rel_dir = str(Path(root).relative_to(self.root_path)) - counts['by_directory'][rel_dir] = len(files) - counts['total_files'] += len(files) - - for file in files: - ext = Path(file).suffix.lower() - if ext: - counts['by_extension'][ext] = counts['by_extension'].get(ext, 0) + 1 - - return counts - - def _check_temp_files(self) -> Dict: - """Check for temporary files that should be cleaned.""" - temp_files = [] - - for pattern in self.cleanup_rules['temp_files']['patterns']: - for file_path in self.root_path.rglob(pattern): - if file_path.is_file() and '.git' not in str(file_path): - age_days = (datetime.now() - datetime.fromtimestamp(file_path.stat().st_mtime)).days - temp_files.append({ - 'path': str(file_path.relative_to(self.root_path)), - 'size_mb': file_path.stat().st_size / (1024 * 1024), - 'age_days': age_days - }) - - return { - 'count': len(temp_files), - 'files': temp_files, - 'total_size_mb': sum(f['size_mb'] for f in temp_files) - } - - def _check_log_files(self) -> Dict: - """Check log file status and organization.""" - log_files = [] - reports_dir = self.root_path / 'reports' - - # Check root log files - for log_file in self.root_path.glob('*.log'): - age_days = (datetime.now() - datetime.fromtimestamp(log_file.stat().st_mtime)).days - log_files.append({ - 'path': str(log_file.relative_to(self.root_path)), - 'size_mb': log_file.stat().st_size / (1024 * 1024), - 'age_days': age_days, - 'location': 'root', - 'should_move': True - }) - - # Check reports directory - if reports_dir.exists(): - for log_file in reports_dir.rglob('*.log'): - age_days = (datetime.now() - datetime.fromtimestamp(log_file.stat().st_mtime)).days - log_files.append({ - 'path': str(log_file.relative_to(self.root_path)), - 'size_mb': log_file.stat().st_size / (1024 * 1024), - 'age_days': age_days, - 'location': 'reports', - 'should_move': False - }) - - return { - 'count': len(log_files), - 'files': log_files, - 'misplaced_count': sum(1 for f in log_files if f['should_move']), - 'total_size_mb': sum(f['size_mb'] for f in log_files) - } - - def _check_backup_files(self) -> Dict: - """Check backup file organization and compression opportunities.""" - backups = [] - backup_dir = self.root_path / 'backups' - - if backup_dir.exists(): - for backup_file in backup_dir.rglob('*'): - if backup_file.is_file(): - age_days = (datetime.now() - datetime.fromtimestamp(backup_file.stat().st_mtime)).days - is_compressed = backup_file.suffix in ['.gz', '.zip', '.tar.gz'] - - backups.append({ - 'path': str(backup_file.relative_to(self.root_path)), - 'size_mb': backup_file.stat().st_size / (1024 * 1024), - 'age_days': age_days, - 'is_compressed': is_compressed, - 'should_compress': age_days > 7 and not is_compressed - }) - - return { - 'count': len(backups), - 'files': backups, - 'compression_candidates': sum(1 for b in backups if b['should_compress']), - 'total_size_mb': sum(b['size_mb'] for b in backups) - } - - def _check_large_files(self) -> Dict: - """Check for unusually large files.""" - large_files = [] - max_size_bytes = self.cleanup_rules['large_files']['max_size_mb'] * 1024 * 1024 - - for root, dirs, files in os.walk(self.root_path): - if '.git' in root: - continue - - for file in files: - file_path = Path(root) / file - try: - if file_path.stat().st_size > max_size_bytes: - large_files.append({ - 'path': str(file_path.relative_to(self.root_path)), - 'size_mb': file_path.stat().st_size / (1024 * 1024), - 'type': file_path.suffix.lower() - }) - except (OSError, FileNotFoundError): - continue - - return { - 'count': len(large_files), - 'files': large_files, - 'total_size_mb': sum(f['size_mb'] for f in large_files) - } - - def _check_python_artifacts(self) -> Dict: - """Check for Python cache and compiled files.""" - artifacts = [] - - # Find __pycache__ directories - for pycache_dir in self.root_path.rglob('__pycache__'): - if pycache_dir.is_dir(): - size = sum(f.stat().st_size for f in pycache_dir.rglob('*') if f.is_file()) - artifacts.append({ - 'path': str(pycache_dir.relative_to(self.root_path)), - 'type': 'directory', - 'size_mb': size / (1024 * 1024) - }) - - # Find .pyc and .pyo files - for pyc_file in self.root_path.rglob('*.py[co]'): - artifacts.append({ - 'path': str(pyc_file.relative_to(self.root_path)), - 'type': 'file', - 'size_mb': pyc_file.stat().st_size / (1024 * 1024) - }) - - return { - 'count': len(artifacts), - 'files': artifacts, - 'total_size_mb': sum(a['size_mb'] for a in artifacts) - } - - def _check_git_status(self) -> Dict: - """Check git repository status.""" - try: - # Check for untracked files - result = subprocess.run(['git', 'status', '--porcelain'], - capture_output=True, text=True, cwd=self.root_path) - - untracked = [] - modified = [] - - for line in result.stdout.strip().split('\n'): - if line: - status, filename = line[:2], line[3:] - if status.strip() == '??': - untracked.append(filename) - elif status.strip(): - modified.append(filename) + response_time = time.time() - start_time return { - 'untracked_files': untracked, - 'modified_files': modified, - 'is_clean': len(untracked) == 0 and len(modified) == 0 + 'url': url, + 'status': 'healthy' if response.status_code < 400 else 'unhealthy', + 'status_code': response.status_code, + 'response_time': round(response_time, 2), + 'error': None } - except subprocess.CalledProcessError: - return {'error': 'Not a git repository or git not available'} - - def _calculate_organization_score(self, health_report: Dict) -> int: - """Calculate a repository organization score (0-100).""" - score = 100 - - # Deduct points for issues - if health_report['temp_files']['count'] > 0: - score -= min(20, health_report['temp_files']['count'] * 2) - - if health_report['log_files']['misplaced_count'] > 0: - score -= min(15, health_report['log_files']['misplaced_count'] * 5) - - if health_report['backup_files']['compression_candidates'] > 0: - score -= min(10, health_report['backup_files']['compression_candidates'] * 3) - - if health_report['python_artifacts']['count'] > 0: - score -= min(10, health_report['python_artifacts']['count']) - - if health_report['large_files']['count'] > 0: - score -= min(15, health_report['large_files']['count'] * 5) - - # Check git status - git_status = health_report.get('git_status', {}) - if not git_status.get('is_clean', True): - score -= 10 - - return max(0, score) - - def _generate_cleanup_suggestions(self, health_report: Dict) -> List[str]: - """Generate specific cleanup suggestions based on health report.""" - suggestions = [] - - if health_report['temp_files']['count'] > 0: - suggestions.append(f"๐Ÿ—‘๏ธ Remove {health_report['temp_files']['count']} temporary files ({health_report['temp_files']['total_size_mb']:.1f} MB)") - - if health_report['log_files']['misplaced_count'] > 0: - suggestions.append(f"๐Ÿ“ Move {health_report['log_files']['misplaced_count']} log files to reports/ directory") - - if health_report['backup_files']['compression_candidates'] > 0: - suggestions.append(f"๐Ÿ—œ๏ธ Compress {health_report['backup_files']['compression_candidates']} old backup files") - - if health_report['python_artifacts']['count'] > 0: - suggestions.append(f"๐Ÿ Remove Python cache artifacts ({health_report['python_artifacts']['total_size_mb']:.1f} MB)") - - if health_report['large_files']['count'] > 0: - suggestions.append(f"๐Ÿ“ Review {health_report['large_files']['count']} large files for archival") - - git_status = health_report.get('git_status', {}) - if git_status.get('untracked_files'): - suggestions.append(f"๐Ÿ“ Add {len(git_status['untracked_files'])} untracked files to .gitignore or commit them") - - return suggestions - - def _analyze_disk_usage(self) -> Dict: - """Analyze disk usage patterns.""" - try: - total, used, free = shutil.disk_usage(self.root_path) - return { - 'total_gb': total / (1024**3), - 'used_gb': used / (1024**3), - 'free_gb': free / (1024**3), - 'usage_percent': (used / total) * 100 - } - except Exception as e: - return {'error': str(e)} - - def _cleanup_temp_files(self, dry_run: bool) -> Dict: - """Clean up temporary files.""" - results = {'temp_files_deleted': []} - - for pattern in self.cleanup_rules['temp_files']['patterns']: - for file_path in self.root_path.rglob(pattern): - if file_path.is_file() and '.git' not in str(file_path): - if not dry_run: - try: - file_path.unlink() - results['temp_files_deleted'].append(str(file_path.relative_to(self.root_path))) - except Exception as e: - self.logger.warning(f"Could not delete {file_path}: {e}") - else: - results['temp_files_deleted'].append(str(file_path.relative_to(self.root_path))) - - return results - - def _archive_old_logs(self, dry_run: bool) -> Dict: - """Archive old log files.""" - results = {'logs_archived': []} - - # Create reports/logs directory if it doesn't exist - logs_dir = self.root_path / 'reports' / 'logs' - if not dry_run: - logs_dir.mkdir(parents=True, exist_ok=True) - - # Move log files from root to reports/logs - for log_file in self.root_path.glob('*.log'): - new_path = logs_dir / log_file.name - if not dry_run: - try: - shutil.move(str(log_file), str(new_path)) - results['logs_archived'].append(str(log_file.relative_to(self.root_path))) - except Exception as e: - self.logger.warning(f"Could not move {log_file}: {e}") - else: - results['logs_archived'].append(str(log_file.relative_to(self.root_path))) - - return results - - def _compress_old_backups(self, dry_run: bool) -> Dict: - """Compress old backup files.""" - results = {'backups_compressed': []} - backup_dir = self.root_path / 'backups' - - if backup_dir.exists(): - cutoff_date = datetime.now() - timedelta(days=7) - for backup_file in backup_dir.glob('*.txt'): - file_date = datetime.fromtimestamp(backup_file.stat().st_mtime) - if file_date < cutoff_date: - if not dry_run: - try: - # Compress with gzip - with open(backup_file, 'rb') as f_in: - with gzip.open(f"{backup_file}.gz", 'wb') as f_out: - shutil.copyfileobj(f_in, f_out) - backup_file.unlink() - results['backups_compressed'].append(str(backup_file.relative_to(self.root_path))) - except Exception as e: - self.logger.warning(f"Could not compress {backup_file}: {e}") - else: - results['backups_compressed'].append(str(backup_file.relative_to(self.root_path))) - - return results + except requests.exceptions.Timeout: + return { + 'url': url, + 'status': 'timeout', + 'status_code': None, + 'response_time': self.timeout, + 'error': 'Request timeout' + } + + except requests.exceptions.ConnectionError: + return { + 'url': url, + 'status': 'unreachable', + 'status_code': None, + 'response_time': time.time() - start_time, + 'error': 'Connection error' + } + + except Exception as e: + return { + 'url': url, + 'status': 'error', + 'status_code': None, + 'response_time': time.time() - start_time, + 'error': str(e) + } - def _cleanup_python_cache(self, dry_run: bool) -> Dict: - """Remove Python cache files and directories.""" - results = {'python_cache_removed': []} + def check_channel_health(self, channel: Dict) -> Dict: + """Check health of a single channel.""" + url = channel.get('Stream URL', '') - # Remove __pycache__ directories - for pycache_dir in self.root_path.rglob('__pycache__'): - if pycache_dir.is_dir(): - if not dry_run: - try: - shutil.rmtree(pycache_dir) - results['python_cache_removed'].append(str(pycache_dir.relative_to(self.root_path))) - except Exception as e: - self.logger.warning(f"Could not remove {pycache_dir}: {e}") - else: - results['python_cache_removed'].append(str(pycache_dir.relative_to(self.root_path))) + if not url: + return { + 'channel_name': channel.get('Stream name', 'Unknown'), + 'url': '', + 'status': 'no_url', + 'status_code': None, + 'response_time': 0, + 'error': 'No URL provided' + } - # Remove .pyc and .pyo files - for pyc_file in self.root_path.rglob('*.py[co]'): - if not dry_run: + result = self.check_single_url(url) + result['channel_name'] = channel.get('Stream name', 'Unknown') + + return result + + def batch_health_check(self, channels: List[Dict]) -> Dict: + """Perform batch health check on multiple channels.""" + if not self.config.settings.get('enable_health_check', False): + self.logger.info("Health checking is disabled") + return {'enabled': False, 'results': []} + + self.logger.info(f"Starting health check for {len(channels)} channels...") + + start_time = time.time() + results = [] + + # Use ThreadPoolExecutor for concurrent checks + with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: + # Submit all health check tasks + future_to_channel = { + executor.submit(self.check_channel_health, channel): channel + for channel in channels + } + + # Collect results as they complete + for future in concurrent.futures.as_completed(future_to_channel): try: - pyc_file.unlink() - results['python_cache_removed'].append(str(pyc_file.relative_to(self.root_path))) + result = future.result() + results.append(result) except Exception as e: - self.logger.warning(f"Could not remove {pyc_file}: {e}") - else: - results['python_cache_removed'].append(str(pyc_file.relative_to(self.root_path))) + channel = future_to_channel[future] + self.logger.error(f"Health check failed for {channel.get('Stream name', 'Unknown')}: {e}") + results.append({ + 'channel_name': channel.get('Stream name', 'Unknown'), + 'url': channel.get('Stream URL', ''), + 'status': 'error', + 'status_code': None, + 'response_time': 0, + 'error': str(e) + }) - return results + total_time = time.time() - start_time + + # Generate summary statistics + summary = self._generate_health_summary(results, total_time) + + self.logger.info(f"Health check completed in {total_time:.1f}s: " + f"{summary['healthy']}/{summary['total']} channels healthy") + + return { + 'enabled': True, + 'results': results, + 'summary': summary, + 'total_time': total_time + } - def _organize_files(self, dry_run: bool) -> Dict: - """Organize files into proper directories.""" - results = {'files_organized': []} + def _generate_health_summary(self, results: List[Dict], total_time: float) -> Dict: + """Generate summary statistics from health check results.""" + total = len(results) + healthy = sum(1 for r in results if r['status'] == 'healthy') + unhealthy = sum(1 for r in results if r['status'] == 'unhealthy') + timeout = sum(1 for r in results if r['status'] == 'timeout') + unreachable = sum(1 for r in results if r['status'] == 'unreachable') + errors = sum(1 for r in results if r['status'] == 'error') + no_url = sum(1 for r in results if r['status'] == 'no_url') - # Create proper directory structure - directories = [ - 'reports/logs', - 'reports/archive', - 'backups/compressed', - 'templates' - ] + # Calculate average response time for successful checks + successful_times = [r['response_time'] for r in results if r['status'] == 'healthy'] + avg_response_time = sum(successful_times) / len(successful_times) if successful_times else 0 - if not dry_run: - for directory in directories: - (self.root_path / directory).mkdir(parents=True, exist_ok=True) - - return results + return { + 'total': total, + 'healthy': healthy, + 'unhealthy': unhealthy, + 'timeout': timeout, + 'unreachable': unreachable, + 'errors': errors, + 'no_url': no_url, + 'health_percentage': round((healthy / total * 100) if total > 0 else 0, 1), + 'avg_response_time': round(avg_response_time, 2), + 'total_check_time': round(total_time, 1) + } - def save_health_report(self, health_report: Dict, filename: str = None) -> Path: - """Save health report to file.""" + def get_unhealthy_channels(self, health_results: Dict) -> List[Dict]: + """Get list of unhealthy channels for reporting.""" + if not health_results.get('enabled', False): + return [] + + unhealthy = [] + for result in health_results.get('results', []): + if result['status'] != 'healthy': + unhealthy.append({ + 'name': result['channel_name'], + 'url': result['url'], + 'status': result['status'], + 'error': result.get('error', 'Unknown error') + }) + + return unhealthy + + def save_health_report(self, health_results: Dict, filename: str = None) -> Optional[str]: + """Save health check results to a file.""" + if not health_results.get('enabled', False): + return None + + import json + from datetime import datetime + from pathlib import Path + if filename is None: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') - filename = f'repo_health_{timestamp}.json' + filename = f'health_check_{timestamp}.json' - reports_dir = self.root_path / 'reports' + reports_dir = Path('reports') reports_dir.mkdir(exist_ok=True) report_path = reports_dir / filename try: + # Prepare report data + report_data = { + 'timestamp': datetime.now().isoformat(), + 'summary': health_results['summary'], + 'unhealthy_channels': self.get_unhealthy_channels(health_results), + 'total_time': health_results['total_time'] + } + with open(report_path, 'w', encoding='utf-8') as f: - json.dump(health_report, f, indent=2, default=str) + json.dump(report_data, f, indent=2) self.logger.info(f"Health report saved to: {report_path}") - return report_path + return str(report_path) + except Exception as e: self.logger.error(f"Could not save health report: {e}") return None -def main(): - """Command line interface for repository health monitoring.""" - import argparse +# Simple fallback for when requests is not available +class SimpleHealthChecker: + """Fallback health checker that doesn't require external dependencies.""" - parser = argparse.ArgumentParser(description='IPTV Repository Health Monitor') - parser.add_argument('--check', action='store_true', help='Run health check') - parser.add_argument('--cleanup', action='store_true', help='Run auto cleanup') - parser.add_argument('--dry-run', action='store_true', help='Dry run (no actual changes)') - parser.add_argument('--save-report', action='store_true', help='Save health report to file') + def __init__(self, config): + self.config = config + self.logger = logging.getLogger(__name__) - args = parser.parse_args() - - # Setup logging - logging.basicConfig( - level=logging.INFO, - format='[%(asctime)s] %(levelname)s: %(message)s' - ) - - monitor = RepoHealthMonitor() - - if args.check or args.save_report: - health_report = monitor.run_health_check() - - print(f"\n๐Ÿ“Š Repository Health Report") - print(f"Organization Score: {health_report['organization_score']}/100") - print(f"Total Size: {health_report['repository_size']['total_mb']:.1f} MB") - print(f"Total Files: {health_report['file_counts']['total_files']}") - - if health_report['cleanup_suggestions']: - print("\n๐Ÿ”ง Cleanup Suggestions:") - for suggestion in health_report['cleanup_suggestions']: - print(f" {suggestion}") - else: - print("\nโœ… Repository is well organized!") - - if args.save_report: - monitor.save_health_report(health_report) - - if args.cleanup: - cleanup_results = monitor.auto_cleanup(dry_run=args.dry_run) - - if args.dry_run: - print("\n๐Ÿงช Dry Run Results:") - else: - print("\n๐Ÿงน Cleanup Results:") - - for key, items in cleanup_results.items(): - if isinstance(items, list) and items: - print(f" {key}: {len(items)} items") - for item in items[:5]: # Show first 5 - print(f" - {item}") - if len(items) > 5: - print(f" ... and {len(items) - 5} more") + def batch_health_check(self, channels: List[Dict]) -> Dict: + """Fallback that skips health checking.""" + self.logger.info("Health checking disabled (requests library not available)") + return { + 'enabled': False, + 'results': [], + 'summary': {'total': len(channels), 'healthy': 0, 'health_percentage': 0}, + 'total_time': 0 + } -if __name__ == "__main__": - main() \ No newline at end of file +# Try to use the full health checker, fall back to simple one if requests isn't available +try: + import requests + # If requests is available, use the full HealthChecker +except ImportError: + # If requests is not available, use the fallback + class HealthChecker(SimpleHealthChecker): + pass \ No newline at end of file