# ----------------------------------------
# UrNetwork Stats Dashboard - Refactored
# ----------------------------------------
# Original Author: https://github.com/techroy23/UrNetwork-Stats-Dashboard
# Refactored with Public/Private views, UI, Graphs, Installer, Auth, Dark Theme, and Provider Map.
#
# --- main_app.py ---
# ----------------------------------------
import datetime
import hmac
import json
import logging
import os
import time
from functools import wraps
from urllib.parse import urljoin, urlparse

import requests
from dateutil.parser import isoparse
from flask import (
    Flask, request, render_template_string, redirect, url_for,
    flash, session, g, jsonify
)
from flask_apscheduler import APScheduler
from flask_sqlalchemy import SQLAlchemy

# --- Application Setup & Configuration ---

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Environment file path
ENV_FILE = ".env"


def load_env():
    """Load KEY=VALUE pairs from the .env file into ``os.environ``.

    Lines without ``=``, comment lines starting with ``#`` and lines with an
    empty key are ignored. Values are taken verbatim (after stripping the
    surrounding whitespace of the line), matching what ``save_env_file`` writes.
    """
    if not os.path.exists(ENV_FILE):
        return
    with open(ENV_FILE, 'r', encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            key, value = line.split('=', 1)
            key = key.strip()
            if not key:
                # An empty variable name cannot be stored in os.environ.
                continue
            os.environ[key] = value


load_env()


class Config:
    """Flask configuration."""
    SCHEDULER_API_ENABLED = True
    SQLALCHEMY_DATABASE_URI = os.getenv("DATABASE_URL", "sqlite:///transfer_stats.db")
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    # The default is only a placeholder for the pre-install phase; the
    # installer generates a real key and activates it immediately.
    SECRET_KEY = os.getenv("SECRET_KEY", "default-secret-key-for-initial-setup")
    UR_API_BASE = "https://api.bringyour.com"


# --- Flask App Initialization ---
app = Flask(__name__)
app.config.from_object(Config)
db = SQLAlchemy(app)
scheduler = APScheduler()
scheduler.init_app(app)


# --- Database Models ---
class Stats(db.Model):
    """
    Represents a snapshot of paid vs unpaid bytes at a given timestamp.
    """
    __tablename__ = 'stats'
    id = db.Column(db.Integer, primary_key=True)
    timestamp = db.Column(db.DateTime, server_default=db.func.now())
    paid_bytes = db.Column(db.BigInteger, nullable=False)
    paid_gb = db.Column(db.Float, nullable=False)
    unpaid_bytes = db.Column(db.BigInteger, nullable=False)
    unpaid_gb = db.Column(db.Float, nullable=False)


class Webhook(db.Model):
    """Represents a webhook URL."""
    __tablename__ = 'webhook'
    id = db.Column(db.Integer, primary_key=True)
    url = db.Column(db.String, unique=True, nullable=False)


# --- Helper Functions ---
def is_installed():
    """Check if the application has been configured.

    Returns True only when UR_USER, UR_PASS and SECRET_KEY are all set to
    non-empty values in the process environment.
    """
    return all(os.getenv(var) for var in ["UR_USER", "UR_PASS", "SECRET_KEY"])


def save_env_file(config_data):
    """Save configuration data to the .env file and reload the environment.

    Args:
        config_data: Mapping of setting name to value; names are upper-cased
            when written.

    Returns:
        True on success, False if a value is not representable on a single
        line or the file could not be written.
    """
    # A newline inside a value would start a new KEY=VALUE entry when the
    # file is read back, letting form input inject arbitrary variables.
    for key, value in config_data.items():
        text = str(value)
        if "\n" in text or "\r" in text:
            logging.error(f"Refusing to write multi-line value for {key} to .env file")
            return False
    try:
        with open(ENV_FILE, "w") as f:
            for key, value in config_data.items():
                f.write(f"{key.upper()}={value}\n")
        load_env()  # Reload env after writing
        return True
    except IOError as e:
        logging.error(f"Failed to write to .env file: {e}")
        return False


def request_with_retry(method, url, retries=3, backoff=5, timeout=30, **kwargs):
    """Issue an HTTP request with retries.

    Args:
        method: HTTP method name passed to ``requests.request``.
        url: Target URL.
        retries: Maximum number of attempts.
        backoff: Seconds to sleep between failed attempts.
        timeout: Per-request timeout in seconds.
        **kwargs: Forwarded to ``requests.request``.

    Returns:
        The successful ``requests.Response`` (status already checked).

    Raises:
        RuntimeError: When every attempt failed; chained to the last error.
    """
    last_exc = None
    for attempt in range(1, retries + 1):
        try:
            resp = requests.request(method, url, timeout=timeout, **kwargs)
            resp.raise_for_status()
            return resp
        except requests.exceptions.RequestException as e:
            last_exc = e
            logging.warning(
                f"[{method.upper()} {url}] attempt {attempt}/{retries} failed: {e}"
            )
            if attempt < retries:
                time.sleep(backoff)
    raise RuntimeError(
        f"All {retries} attempts to {method.upper()} {url} failed: {last_exc}"
    ) from last_exc


def get_jwt_from_credentials(user, password):
    """Fetch a new JWT token using username and password.

    Returns:
        The token string, or None when login fails for any reason (the
        failure is logged).
    """
    try:
        resp = request_with_retry(
            "post",
            f"{app.config['UR_API_BASE']}/auth/login-with-password",
            headers={"Content-Type": "application/json"},
            json={"user_auth": user, "password": password},
        )
        data = resp.json()
        # "network" may be present but null in an error response.
        token = (data.get("network") or {}).get("by_jwt")
        if not token:
            err = data.get("message") or data.get("error") or str(data)
            raise RuntimeError(f"Login failed: {err}")
        return token
    except Exception as e:
        logging.error(f"Could not get JWT from credentials: {e}")
        return None


def get_valid_jwt():
    """Gets a valid JWT for API calls using the credentials in the environment.

    Returns:
        The token string, or None if credentials are missing or login fails.
    """
    user = os.getenv("UR_USER")
    password = os.getenv("UR_PASS")
    if not user or not password:
        logging.error("Cannot fetch JWT; user credentials not found in environment.")
        return None
    return get_jwt_from_credentials(user, password)


def fetch_transfer_stats(jwt_token):
    """Retrieve transfer statistics using the provided JWT.

    Returns:
        Dict with ``paid_bytes``, ``paid_gb``, ``unpaid_bytes`` and
        ``unpaid_gb`` (GB computed as bytes / 1e9).

    Raises:
        RuntimeError: If the API request fails after all retries.
    """
    headers = {"Authorization": f"Bearer {jwt_token}", "Accept": "*/*"}
    resp = request_with_retry("get", f"{app.config['UR_API_BASE']}/transfer/stats", headers=headers)
    data = resp.json()
    # Treat missing *and* null counters as zero so the division cannot fail.
    paid = data.get("paid_bytes_provided") or 0
    unpaid = data.get("unpaid_bytes_provided") or 0
    return {
        "paid_bytes": paid,
        "paid_gb": paid / 1e9,
        "unpaid_bytes": unpaid,
        "unpaid_gb": unpaid / 1e9
    }


def fetch_payment_stats(jwt_token):
    """Retrieve account payment statistics using the provided JWT.

    Returns:
        The list under ``account_payments`` in the API response, or an empty
        list when no token is given or the request fails.
    """
    if not jwt_token:
        return []
    try:
        headers = {"Authorization": f"Bearer {jwt_token}", "Accept": "*/*"}
        resp = request_with_retry("get", f"{app.config['UR_API_BASE']}/account/payments", headers=headers)
        return resp.json().get("account_payments") or []
    except Exception as e:
        logging.error(f"Failed to fetch payment stats: {e}")
        return []


def calculate_earnings(payments):
    """Calculate total and monthly earnings from a list of payments.

    Only payments flagged ``completed`` are counted. "Monthly" means the
    payment time lies within the last 30 days.

    Returns:
        Tuple ``(total_earnings, monthly_earnings)``.
    """
    if not payments:
        return 0, 0

    total_earnings = 0
    monthly_earnings = 0
    now = datetime.datetime.now(datetime.timezone.utc)
    one_month_ago = now - datetime.timedelta(days=30)

    for payment in payments:
        if not payment.get("completed"):
            continue
        amount = payment.get("token_amount") or 0  # null amounts count as zero
        total_earnings += amount

        payment_time_str = payment.get("payment_time")
        if not payment_time_str:
            continue
        try:
            payment_time = isoparse(payment_time_str)
        except (ValueError, TypeError):
            logging.warning(f"Could not parse payment_time: {payment_time_str}")
            continue
        if payment_time.tzinfo is None:
            # Comparing naive with aware datetimes raises TypeError; a
            # timestamp without an offset is interpreted as UTC.
            payment_time = payment_time.replace(tzinfo=datetime.timezone.utc)
        if payment_time > one_month_ago:
            monthly_earnings += amount

    return total_earnings, monthly_earnings


def fetch_provider_locations():
    """Retrieve provider locations from the public API.

    Returns:
        The decoded JSON response, or None on failure (logged).
    """
    try:
        resp = request_with_retry("get", f"{app.config['UR_API_BASE']}/network/provider-locations")
        return resp.json()
    except Exception as e:
        logging.error(f"Failed to fetch provider locations: {e}")
        return None


def send_webhook_notification(stats_data):
    """Sends a notification to all configured webhooks.

    Delivery is best effort: a failure for one webhook is logged and does not
    prevent delivery to the others.

    Args:
        stats_data: Dict as returned by ``fetch_transfer_stats``.
    """
    webhooks = Webhook.query.all()
    if not webhooks:
        return

    # A more generic payload that might work for Discord, Slack, etc.
    payload = {
        "embeds": [{
            "title": "UrNetwork Stats Update",
            "description": "New data has been synced from the UrNetwork API.",
            "color": 5814783,  # A nice blue color
            "fields": [
                {"name": "Total Paid Data", "value": f"{stats_data['paid_gb']:.3f} GB", "inline": True},
                {"name": "Total Unpaid Data", "value": f"{stats_data['unpaid_gb']:.3f} GB", "inline": True},
                {"name": "Total Data Provided", "value": f"{(stats_data['paid_gb'] + stats_data['unpaid_gb']):.3f} GB", "inline": True},
            ],
            "footer": {"text": f"Update Time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"}
        }]
    }

    for webhook in webhooks:
        try:
            resp = requests.post(webhook.url, json=payload, timeout=10)
            # Without this an HTTP 4xx/5xx reply would be logged as success.
            resp.raise_for_status()
            logging.info(f"Sent webhook notification to {webhook.url}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Failed to send webhook to {webhook.url}: {e}")


def _store_stats_snapshot(jwt_token):
    """Fetch the current transfer stats and persist them as a ``Stats`` row.

    Returns:
        Tuple ``(entry, stats_data)`` with the committed row and the raw dict.
    """
    stats_data = fetch_transfer_stats(jwt_token)
    entry = Stats(
        paid_bytes=stats_data["paid_bytes"],
        paid_gb=stats_data["paid_gb"],
        unpaid_bytes=stats_data["unpaid_bytes"],
        unpaid_gb=stats_data["unpaid_gb"]
    )
    db.session.add(entry)
    db.session.commit()
    return entry, stats_data


def _total_gb(entry):
    """Return paid + unpaid gigabytes of a ``Stats`` row."""
    return entry.paid_gb + entry.unpaid_gb


def _deltas_newest_first(desc_entries):
    """Return the per-row change in total GB for rows ordered newest first.

    Element ``i`` is the difference between row ``i`` and the next older row;
    the oldest row has no predecessor and gets ``None``.
    """
    deltas = [
        _total_gb(current) - _total_gb(older)
        for current, older in zip(desc_entries, desc_entries[1:])
    ]
    if desc_entries:
        deltas.append(None)
    return deltas


def _is_safe_redirect_target(target):
    """Return True if ``target`` resolves to an http(s) URL on this host."""
    if not target:
        return False
    own = urlparse(request.host_url)
    resolved = urlparse(urljoin(request.host_url, target))
    return resolved.scheme in ("http", "https") and resolved.netloc == own.netloc


def _credentials_match(username, password):
    """Compare submitted credentials with UR_USER / UR_PASS in constant time."""
    expected_user = os.getenv('UR_USER')
    expected_pass = os.getenv('UR_PASS')
    if None in (username, password, expected_user, expected_pass):
        return False
    # Encode to bytes: compare_digest only accepts ASCII-only str objects.
    user_ok = hmac.compare_digest(username.encode('utf-8'), expected_user.encode('utf-8'))
    pass_ok = hmac.compare_digest(password.encode('utf-8'), expected_pass.encode('utf-8'))
    return user_ok and pass_ok


# --- Authentication Decorator ---
def login_required(f):
    """Decorator redirecting to the login page unless the session is logged in."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not session.get('logged_in'):
            flash("You must be logged in to view this page.", "error")
            return redirect(url_for('login', next=request.url))
        return f(*args, **kwargs)
    return decorated_function


# --- Scheduled Job ---
@scheduler.task(id="log_stats_job", trigger="cron", minute="0,15,30,45")
def log_stats_job():
    """Scheduled job to fetch and store stats every 15 minutes."""
    with app.app_context():
        if not is_installed():
            logging.warning("log_stats_job skipped: Application is not installed.")
            return

        logging.info("Running scheduled stats fetch...")
        try:
            jwt = get_valid_jwt()
            if not jwt:
                raise RuntimeError("Scheduled job could not authenticate with API.")

            entry, stats_data = _store_stats_snapshot(jwt)
            logging.info(f"Logged new stats entry at {entry.timestamp}")
            send_webhook_notification(stats_data)
        except Exception as e:
            # Leave the session usable for the next run after a failed commit.
            db.session.rollback()
            logging.error(f"Scheduled job 'log_stats_job' failed: {e}")


# --- HTML Templates ---
# NOTE(review): the template bodies below are runtime strings and are kept
# exactly as found; they look like they lost their HTML markup somewhere
# upstream - verify against the original repository.
LAYOUT_TEMPLATE = """ {{ title }} - UrNetwork Stats

{{ title }}

{% with messages = get_flashed_messages(with_categories=true) %} {% if messages %} {% for category, message in messages %} {% endfor %} {% endif %} {% endwith %} {{ content | safe }}
"""

INSTALL_TEMPLATE = """

Initial Application Setup

Please provide your UrNetwork credentials to configure the dashboard.

"""

LOGIN_TEMPLATE = """

Sign in to your dashboard

"""

PUBLIC_DASHBOARD_TEMPLATE = """

Monitoring for updates...

Total Paid Data
{{ "%.3f"|format(latest.paid_gb if latest else 0) }} GB
Total Unpaid Data
{{ "%.3f"|format(latest.unpaid_gb if latest else 0) }} GB
Earnings (Last 30 Days)
${{ "%.2f"|format(monthly_earnings) }}
Last Update
{{ latest.timestamp.strftime('%H:%M') if latest else 'N/A' }}

Total Data Provided Over Time (GB)

{% if chart_data.labels %} {% else %}

Not enough data to display a chart. Check back later.

{% endif %}

Provider stats refresh in: 02:00

Provider Locations

"""

PRIVATE_DASHBOARD_TEMPLATE = """

Monitoring for updates...

Total Paid Data
{{ "%.3f"|format(latest.paid_gb if latest else 0) }} GB
Total Unpaid Data
{{ "%.3f"|format(latest.unpaid_gb if latest else 0) }} GB
Total Earnings
${{ "%.2f"|format(total_earnings) }}
Last Update
{{ latest.timestamp.strftime('%H:%M') if latest else 'N/A' }}

Paid vs. Unpaid Data (GB)

Data Change per Interval (GB)

{% if chart_data.deltas and chart_data.deltas|length > 0 %} {% else %}

Not enough data for this chart.

{% endif %}

Webhook Management

Detailed History

{% for row in rows %} {% else %} {% endfor %}
Timestamp Paid (GB) Unpaid (GB) Change (GB)
{{ row.ts_str }} {{ "%.3f"|format(row.e.paid_gb) }} {{ "%.3f"|format(row.e.unpaid_gb) }} {% if row.delta_gb is not none %}{{ '%+.3f'|format(row.delta_gb) }}{% else %}N/A{% endif %}
No data available.
"""

# --- Template Rendering Helper ---

# Load world map data once at startup
WORLD_MAP_DATA = None
try:
    map_url = "https://d2ad6b4ur7yvpq.cloudfront.net/naturalearth-3.3.0/ne_110m_admin_0_countries.geojson"
    response = requests.get(map_url, timeout=15)
    response.raise_for_status()
    WORLD_MAP_DATA = response.json()
    logging.info("Successfully loaded world map GeoJSON data for embedding.")
except Exception as e:
    logging.error(f"Could not download and cache world map data. Map will be disabled. Error: {e}")


def render_page(template_content, **context):
    """Renders a page by injecting its content into the main layout.

    The page template is rendered first and passed to LAYOUT_TEMPLATE as
    ``content``. When ``is_public_dashboard`` is truthy in the context the
    cached world map data is added as ``world_map_data``.
    """
    if context.get('is_public_dashboard'):
        context['world_map_data'] = WORLD_MAP_DATA

    content = render_template_string(template_content, **context)
    return render_template_string(LAYOUT_TEMPLATE, content=content, **context)


def _utc_now_naive():
    """Return the current UTC time as a naive datetime.

    Same value as the deprecated ``datetime.utcnow()``.
    """
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)


# --- Middleware and Before Request Handlers ---
@app.before_request
def check_installation():
    """Before each request, check if the app is installed.

    Requests to anything but the installer, static files and the two JSON
    endpoints are redirected to the installer until setup is complete.
    """
    g.now = _utc_now_naive()
    if request.endpoint not in ['install', 'static', 'get_locations', 'get_transfer_data'] and not is_installed():
        return redirect(url_for('install'))


@app.context_processor
def inject_layout_vars():
    """Inject variables into all templates for access."""
    # Fall back to a fresh timestamp if no before_request handler set g.now.
    now = g.get("now")
    if now is None:
        now = _utc_now_naive()
    return {"now": now}


# --- Routes ---
@app.route('/install', methods=['GET', 'POST'])
def install():
    """First-run setup: verify the UrNetwork credentials and write the .env file."""
    if is_installed():
        flash("Application is already installed.", "info")
        return redirect(url_for('public_dashboard'))

    if request.method == 'POST':
        user = request.form.get('ur_user')
        password = request.form.get('ur_pass')

        if not user or not password:
            flash("Username and Password are required.", "error")
            return render_page(INSTALL_TEMPLATE, title="Setup")

        jwt = get_jwt_from_credentials(user, password)
        if not jwt:
            flash("Could not verify credentials with UrNetwork API. Please check them and try again.", "error")
            return render_page(INSTALL_TEMPLATE, title="Setup")

        config = {
            "UR_USER": user,
            "UR_PASS": password,
            "SECRET_KEY": os.urandom(24).hex()
        }

        if save_env_file(config):
            # Config was read at import time, so without this the session
            # cookie would keep being signed with the public default key
            # until the process is restarted.
            app.config["SECRET_KEY"] = config["SECRET_KEY"]
            with app.app_context():
                db.create_all()
            session['logged_in'] = True
            flash("Installation successful! Your dashboard is now configured.", "info")
            return redirect(url_for('private_dashboard'))
        else:
            flash("A server error occurred while saving the configuration file.", "error")

    return render_page(INSTALL_TEMPLATE, title="Setup")


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Log the owner in with the UrNetwork credentials stored in the environment."""
    if session.get('logged_in'):
        return redirect(url_for('private_dashboard'))

    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')

        if _credentials_match(username, password):
            session['logged_in'] = True
            flash('You have been logged in successfully.', 'info')
            next_url = request.args.get('next')
            # Only follow "next" when it stays on this host; an attacker
            # could otherwise craft a login link that redirects elsewhere.
            if not _is_safe_redirect_target(next_url):
                next_url = None
            return redirect(next_url or url_for('private_dashboard'))
        else:
            flash('Invalid username or password.', 'error')

    return render_page(LOGIN_TEMPLATE, title="Login")


@app.route('/logout')
def logout():
    """Clear the login flag from the session and return to the public page."""
    session.pop('logged_in', None)
    flash('You have been logged out.', 'info')
    return redirect(url_for('public_dashboard'))


@app.route('/')
def public_dashboard():
    """Display the public-facing dashboard with summary stats, map, and a graph."""
    entries = Stats.query.order_by(Stats.timestamp.asc()).all()
    latest = entries[-1] if entries else None

    jwt = get_valid_jwt()
    payments = fetch_payment_stats(jwt)
    _, monthly_earnings = calculate_earnings(payments)

    chart_data = {
        "labels": [e.timestamp.strftime('%m-%d %H:%M') for e in entries],
        "data": [e.paid_gb + e.unpaid_gb for e in entries]
    }
    latest_timestamp = latest.timestamp.isoformat() if latest else None

    return render_page(PUBLIC_DASHBOARD_TEMPLATE, title="Public Dashboard",
                       latest=latest,
                       monthly_earnings=monthly_earnings,
                       chart_data=chart_data,
                       is_public_dashboard=True,
                       latest_timestamp=latest_timestamp)


@app.route('/api/locations')
def get_locations():
    """API endpoint to proxy provider locations, simplifying client-side fetching."""
    data = fetch_provider_locations()
    if data:
        return jsonify(data)
    return jsonify({"error": "Failed to fetch location data"}), 500


@app.route('/api/transfer_data')
def get_transfer_data():
    """API endpoint to get all transfer stats for dynamic updates.

    NOTE(review): this endpoint is reachable without login and its response
    includes the private history and total earnings. Confirm whether the
    public page needs those fields before restricting them.
    """
    entries = Stats.query.order_by(Stats.timestamp.asc()).all()
    if not entries:
        return jsonify({"status": "no_data"})

    latest_entry = entries[-1]
    latest_timestamp = latest_entry.timestamp.isoformat()

    jwt = get_valid_jwt()
    payments = fetch_payment_stats(jwt)
    total_earnings, monthly_earnings = calculate_earnings(payments)

    # Data for Public View
    public_stats = {
        "paid_gb": "%.3f" % latest_entry.paid_gb,
        "unpaid_gb": "%.3f" % latest_entry.unpaid_gb,
        "last_update": latest_entry.timestamp.strftime('%H:%M')
    }
    public_chart_data = {
        "labels": [e.timestamp.strftime('%m-%d %H:%M') for e in entries],
        "data": [(e.paid_gb + e.unpaid_gb) for e in entries]
    }

    # Data for Private View
    desc_entries = sorted(entries, key=lambda x: x.timestamp, reverse=True)
    private_rows = [
        {
            "ts_str": e.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            "paid_gb": e.paid_gb,
            "unpaid_gb": e.unpaid_gb,
            "delta_gb": delta_gb
        }
        for e, delta_gb in zip(desc_entries, _deltas_newest_first(desc_entries))
    ]

    rev_entries = list(reversed(desc_entries))
    # Chart wants the changes oldest-first, without the undefined first one.
    deltas = [r['delta_gb'] for r in reversed(private_rows) if r['delta_gb'] is not None]
    private_chart_data = {
        "labels": [e.timestamp.strftime('%m-%d %H:%M') for e in rev_entries],
        "paid_gb": [e.paid_gb for e in rev_entries],
        "unpaid_gb": [e.unpaid_gb for e in rev_entries],
        "deltas": deltas
    }

    return jsonify({
        "status": "ok",
        "latest_timestamp": latest_timestamp,
        "public_stats": public_stats,
        "public_chart_data": public_chart_data,
        "monthly_earnings": monthly_earnings,
        "total_earnings": total_earnings,
        "private_rows": private_rows,
        "private_chart_data": private_chart_data
    })


@app.route('/dashboard')
@login_required
def private_dashboard():
    """Display the detailed private dashboard for the owner."""
    entries = Stats.query.order_by(Stats.timestamp.desc()).all()
    latest = entries[0] if entries else None

    jwt = get_valid_jwt()
    payments = fetch_payment_stats(jwt)
    total_earnings, _ = calculate_earnings(payments)
    webhooks = Webhook.query.all()

    rows = [
        {
            "e": e,
            "ts_str": e.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            "delta_gb": delta_gb
        }
        for e, delta_gb in zip(entries, _deltas_newest_first(entries))
    ]

    rev_entries = list(reversed(entries))
    deltas = [r['delta_gb'] for r in reversed(rows) if r['delta_gb'] is not None]
    chart_data = {
        "labels": [e.timestamp.strftime('%m-%d %H:%M') for e in rev_entries],
        "paid_gb": [e.paid_gb for e in rev_entries],
        "unpaid_gb": [e.unpaid_gb for e in rev_entries],
        "deltas": deltas
    }
    latest_timestamp = entries[0].timestamp.isoformat() if entries else None

    return render_page(
        PRIVATE_DASHBOARD_TEMPLATE, title="Owner Dashboard",
        rows=rows, latest=latest, total_earnings=total_earnings,
        chart_data=chart_data, latest_timestamp=latest_timestamp,
        webhooks=webhooks
    )


@app.route("/trigger", methods=["POST"])
@login_required
def trigger_fetch():
    """Manually trigger a stats fetch."""
    try:
        jwt = get_valid_jwt()
        if not jwt:
            raise RuntimeError("Failed to get a valid API token.")

        _store_stats_snapshot(jwt)
        flash("Successfully fetched and saved latest stats.", "info")
    except Exception as e:
        # A failed commit leaves the session in a broken state until rollback.
        db.session.rollback()
        logging.error(f"Manual fetch failed: {e}")
        flash(f"Failed to fetch stats: {e}", "error")
    return redirect(url_for('private_dashboard'))


@app.route("/clear", methods=["POST"])
@login_required
def clear_db():
    """Delete all stats records from the database."""
    try:
        num_rows_deleted = db.session.query(Stats).delete()
        db.session.commit()
        flash(f"Successfully cleared {num_rows_deleted} records from the database.", "info")
    except Exception as e:
        db.session.rollback()
        logging.error(f"Failed to clear database: {e}")
        flash("An error occurred while clearing the database.", "error")
    return redirect(url_for('private_dashboard'))


@app.route("/webhooks/add", methods=["POST"])
@login_required
def add_webhook():
    """Add a new webhook."""
    url = request.form.get("webhook_url")
    if not url:
        flash("Webhook URL cannot be empty.", "error")
        return redirect(url_for('private_dashboard'))

    # Basic URL validation
    if not url.startswith(("http://", "https://")):
        flash("Invalid webhook URL.", "error")
        return redirect(url_for('private_dashboard'))

    try:
        existing = Webhook.query.filter_by(url=url).first()
        if existing:
            flash("This webhook URL is already registered.", "error")
        else:
            db.session.add(Webhook(url=url))
            db.session.commit()
            flash("Webhook added successfully.", "info")
    except Exception as e:
        # E.g. a concurrent insert tripping the unique constraint.
        db.session.rollback()
        logging.error(f"Failed to add webhook: {e}")
        flash("An error occurred while saving the webhook.", "error")
    return redirect(url_for('private_dashboard'))


# The URL variable is required: without it Flask calls the view with no
# arguments and the request fails with a TypeError.
@app.route("/webhooks/delete/<int:webhook_id>", methods=["POST"])
@login_required
def delete_webhook(webhook_id):
    """Delete a webhook."""
    webhook = Webhook.query.get(webhook_id)
    if webhook:
        db.session.delete(webhook)
        db.session.commit()
        flash("Webhook deleted successfully.", "info")
    else:
        flash("Webhook not found.", "error")
    return redirect(url_for('private_dashboard'))


# --- Main Entry Point ---
if __name__ == "__main__":
    with app.app_context():
        if is_installed():
            db.create_all()
    scheduler.start()
    app.run(host="0.0.0.0", port=90, debug=False)