# ----------------------------------------
# UrNetwork Stats Dashboard - Refactored & Enhanced
# ----------------------------------------
# Original Author: https://github.com/techroy23/UrNetwork-Stats-Dashboard
# Refactored with Public/Private views, UI, Graphs, Installer, Auth, Dark Theme, and Provider Map.
# Enhanced with categorized, configurable features from the API and separate dashboard pages.
#
# --- main_app.py ---
# ----------------------------------------
import os
import time
import datetime
import requests
import logging
import json
from functools import wraps
from flask import (
Flask, request, render_template_string,
redirect, url_for, flash, session, g, jsonify
)
from flask_sqlalchemy import SQLAlchemy
from flask_apscheduler import APScheduler
from dateutil.parser import isoparse
import secrets
# --- Application Setup & Configuration ---
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Environment file path (relative to the current working directory)
ENV_FILE = ".env"


def load_env():
    """Load KEY=VALUE pairs from the .env file into ``os.environ``.

    Lines that start with ``#`` (after stripping) or contain no ``=`` are
    ignored. Only the first ``=`` splits key from value, so values may
    themselves contain ``=``. Values from the file overwrite variables that
    are already set in the process environment.

    Whitespace around the key is removed so ``KEY =value`` is stored under
    ``KEY`` rather than ``"KEY "``. Lines with an empty key (``=value``) are
    skipped, because an empty variable name cannot be set in the environment
    and would otherwise abort start-up. The value is kept exactly as found
    on the stripped line.
    """
    if not os.path.exists(ENV_FILE):
        return
    with open(ENV_FILE, 'r') as f:
        for line in f:
            entry = line.strip()
            if entry.startswith('#') or '=' not in entry:
                continue
            key, value = entry.split('=', 1)
            key = key.strip()
            if not key:
                # "=value" has no usable variable name.
                continue
            os.environ[key] = value


load_env()
def _env_flag(name, default="True"):
    """Return True when environment variable *name* is 'true', '1' or 't' (case-insensitive)."""
    return os.getenv(name, default).lower() in ('true', '1', 't')


class Config:
    """Flask configuration, read once at import time from the environment."""
    # NOTE(review): this turns on Flask-APScheduler's REST API. Nothing in this
    # file puts authentication in front of it - confirm that it is meant to be
    # reachable, or disable it.
    SCHEDULER_API_ENABLED = True
    SQLALCHEMY_DATABASE_URI = os.getenv("DATABASE_URL", "sqlite:///transfer_stats.db")
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    # Never fall back to a fixed, publicly known key: session cookies signed
    # with it could be forged. When no key is configured yet (before install),
    # use a random per-process key instead.
    SECRET_KEY = os.getenv("SECRET_KEY") or secrets.token_hex(24)
    UR_API_BASE = "https://api.bringyour.com"
    # --- New Feature Flags (Defaults) ---
    # These will be used to initialize the settings in the database on first run.
    ENABLE_ACCOUNT_STATS = _env_flag("ENABLE_ACCOUNT_STATS")
    ENABLE_LEADERBOARD = _env_flag("ENABLE_LEADERBOARD")
    ENABLE_DEVICE_STATS = _env_flag("ENABLE_DEVICE_STATS")
# --- Flask App Initialization ---
# Create the application object and load the settings declared on Config.
app = Flask(__name__)
app.config.from_object(Config)
# The database handle is bound to the app after the configuration is loaded,
# so it sees SQLALCHEMY_DATABASE_URI.
db = SQLAlchemy(app)
# The scheduler is only attached here; it is started in the __main__ block.
scheduler = APScheduler()
scheduler.init_app(app)
# --- Database Models ---
class Stats(db.Model):
    """Represents a snapshot of paid vs unpaid bytes at a given timestamp."""
    __tablename__ = 'stats'
    id = db.Column(db.Integer, primary_key=True)
    # Filled in by the database (server-side default) when the row is inserted.
    timestamp = db.Column(db.DateTime, server_default=db.func.now())
    # Each counter is stored twice: the raw byte value and a GB float.
    # The fetch code in this file derives the GB value as bytes / 1e9.
    paid_bytes = db.Column(db.BigInteger, nullable=False)
    paid_gb = db.Column(db.Float, nullable=False)
    unpaid_bytes = db.Column(db.BigInteger, nullable=False)
    unpaid_gb = db.Column(db.Float, nullable=False)
class Webhook(db.Model):
    """Represents a webhook URL that receives a notification after a scheduled stats sync."""
    __tablename__ = 'webhook'
    id = db.Column(db.Integer, primary_key=True)
    # Unique, so the same URL cannot be registered twice.
    url = db.Column(db.String, unique=True, nullable=False)
class Setting(db.Model):
    """Represents a key-value setting for the application, like feature flags."""
    __tablename__ = 'settings'
    # The setting name is the primary key (e.g. 'ENABLE_ACCOUNT_STATS').
    key = db.Column(db.String(50), primary_key=True)
    # Values are stored as text; boolean flags are written as str(True) / str(False).
    value = db.Column(db.String(100), nullable=False)
# --- Helper Functions ---
def get_setting(key, default=None):
    """Return the stored value for *key*, or *default* when no such row exists."""
    row = Setting.query.get(key)
    if row:
        return row.value
    return default
def get_boolean_setting(key):
    """Return True when the stored setting is 'true', '1' or 't' (case-insensitive).

    A missing setting is treated as 'False'.
    """
    truthy_spellings = ('true', '1', 't')
    raw = get_setting(key, 'False')
    return raw.lower() in truthy_spellings
def is_installed():
    """Report whether UR_USER, UR_PASS and SECRET_KEY are all set to non-empty values."""
    for name in ("UR_USER", "UR_PASS", "SECRET_KEY"):
        if not os.getenv(name):
            return False
    return True
def save_env_file(config_data):
    """Write *config_data* to the .env file and reload it into the environment.

    Each item is written as ``KEY=value`` with the key upper-cased, followed
    by the initial feature-flag defaults.

    Values come from the install form, so any value containing a line break
    is rejected: it would otherwise add extra ``KEY=value`` lines to the file.
    After writing, the file is restricted to owner read/write on a
    best-effort basis because it holds the account password in plain text.

    Returns:
        True on success; False when a value is rejected or the file cannot
        be written or re-read.
    """
    for key, value in config_data.items():
        text = str(value)
        if "\n" in text or "\r" in text:
            logging.error(f"Refusing to write {key.upper()} to .env file: value contains a line break.")
            return False
    try:
        with open(ENV_FILE, "w") as f:
            for key, value in config_data.items():
                f.write(f"{key.upper()}={value}\n")
            # Also write the initial feature flag settings
            f.write("\n# Feature Flags (can be managed in the UI after install)\n")
            f.write("ENABLE_ACCOUNT_STATS=True\n")
            f.write("ENABLE_LEADERBOARD=True\n")
            f.write("ENABLE_DEVICE_STATS=True\n")
        try:
            os.chmod(ENV_FILE, 0o600)
        except OSError as e:
            # Not fatal: the configuration itself was written successfully.
            logging.warning(f"Could not restrict permissions on .env file: {e}")
        load_env()  # Reload env after writing
        return True
    except IOError as e:
        logging.error(f"Failed to write to .env file: {e}")
        return False
def _is_retryable_error(exc):
    """Return False for HTTP client errors that a retry cannot fix."""
    response = getattr(exc, "response", None)
    if response is None:
        # Connection errors, timeouts, etc. carry no response and may be transient.
        return True
    status = response.status_code
    if 400 <= status < 500:
        # Request Timeout and Too Many Requests are the 4xx codes worth retrying.
        return status in (408, 429)
    return True


def request_with_retry(method, url, retries=3, backoff=5, timeout=30, **kwargs):
    """Issue an HTTP request, retrying transient failures.

    Args:
        method: HTTP method name passed to ``requests.request``.
        url: Target URL.
        retries: Maximum number of attempts.
        backoff: Seconds to sleep between attempts.
        timeout: Per-request timeout in seconds.
        **kwargs: Forwarded to ``requests.request`` (headers, json, ...).

    Returns:
        The response when an attempt succeeds with a non-error status,
        otherwise None. No exception is raised for request failures.

    Client errors (4xx other than 408/429), such as rejected credentials,
    are not retried: repeating the same request cannot succeed and would only
    block the caller for the backoff time.
    """
    last_exc = None
    for attempt in range(1, retries + 1):
        try:
            resp = requests.request(method, url, timeout=timeout, **kwargs)
            resp.raise_for_status()
            return resp
        except requests.exceptions.RequestException as e:
            last_exc = e
            logging.warning(
                f"[{method.upper()} {url}] attempt {attempt}/{retries} failed: {e}"
            )
            if not _is_retryable_error(e):
                logging.error(f"[{method.upper()} {url}] failed with a non-retryable error: {e}")
                return None
            if attempt < retries:
                time.sleep(backoff)
    # Instead of raising a fatal RuntimeError, return None to handle gracefully
    logging.error(f"All {retries} attempts to {method.upper()} {url} failed: {last_exc}")
    return None
def get_jwt_from_credentials(user, password):
    """Log in with *user* / *password* and return the JWT, or None on any failure.

    Every failure (no response, no token in the reply, unexpected payload)
    is logged and reported to the caller as None.
    """
    try:
        login_payload = {"user_auth": user, "password": password}
        response = request_with_retry(
            "post",
            f"{app.config['UR_API_BASE']}/auth/login-with-password",
            headers={"Content-Type": "application/json"},
            json=login_payload,
        )
        if not response:
            raise RuntimeError("API request failed after multiple retries.")
        body = response.json()
        jwt_token = body.get("network", {}).get("by_jwt")
        if jwt_token:
            return jwt_token
        # No token in the reply: surface whatever explanation the API gave.
        reason = body.get("message") or body.get("error") or str(body)
        raise RuntimeError(f"Login failed: {reason}")
    except Exception as e:
        logging.error(f"Could not get JWT from credentials: {e}")
        return None
def get_valid_jwt():
    """Return a JWT obtained with the credentials from the environment.

    Returns None (after logging an error) when UR_USER or UR_PASS is missing.
    A new login is performed on every call.
    """
    user = os.getenv("UR_USER")
    password = os.getenv("UR_PASS")
    if user and password:
        return get_jwt_from_credentials(user, password)
    logging.error("Cannot fetch JWT; user credentials not found in environment.")
    return None
# --- API Fetch Functions ---
def fetch_transfer_stats(jwt_token):
    """Fetch transfer statistics for the account behind *jwt_token*.

    Returns a dict with ``paid_bytes``, ``paid_gb``, ``unpaid_bytes`` and
    ``unpaid_gb`` (GB = bytes / 1e9), or None when there is no token or the
    request fails.
    """
    if not jwt_token:
        return None
    stats_url = f"{app.config['UR_API_BASE']}/transfer/stats"
    auth_header = {"Authorization": f"Bearer {jwt_token}"}
    response = request_with_retry("get", stats_url, headers=auth_header)
    if not response:
        return None
    body = response.json()
    paid_bytes = body.get("paid_bytes_provided", 0)
    unpaid_bytes = body.get("unpaid_bytes_provided", 0)
    result = {}
    result["paid_bytes"] = paid_bytes
    result["paid_gb"] = paid_bytes / 1e9
    result["unpaid_bytes"] = unpaid_bytes
    result["unpaid_gb"] = unpaid_bytes / 1e9
    return result
def fetch_payment_stats(jwt_token):
    """Return the account's payment records, or an empty list without a token or response."""
    if jwt_token:
        payments_url = f"{app.config['UR_API_BASE']}/account/payments"
        response = request_with_retry(
            "get", payments_url, headers={"Authorization": f"Bearer {jwt_token}"}
        )
        if response:
            return response.json().get("account_payments", [])
    return []
def fetch_account_details(jwt_token):
    """Collect points, referral and ranking information for the account.

    Each piece is requested separately; a failed request simply leaves its
    key ('points', 'referrals', 'ranking') out of the returned dict. Without
    a token an empty dict is returned.
    """
    if not jwt_token:
        return {}
    base_url = app.config['UR_API_BASE']
    auth = {"Authorization": f"Bearer {jwt_token}"}
    collected = {}

    points_reply = request_with_retry("get", f"{base_url}/account/points", headers=auth)
    if points_reply:
        total_points = 0
        for item in points_reply.json().get("network_points", []):
            total_points += item.get('point_value', 0)
        collected['points'] = total_points

    referral_reply = request_with_retry("get", f"{base_url}/account/referral-code", headers=auth)
    if referral_reply:
        collected['referrals'] = referral_reply.json()

    ranking_reply = request_with_retry("get", f"{base_url}/network/ranking", headers=auth)
    if ranking_reply:
        collected['ranking'] = ranking_reply.json().get('network_ranking', {})

    return collected
def fetch_leaderboard(jwt_token):
    """Return the 'earners' list from the leaderboard endpoint, or [] on any failure."""
    if jwt_token:
        response = request_with_retry(
            "post",
            f"{app.config['UR_API_BASE']}/stats/leaderboard",
            headers={"Authorization": f"Bearer {jwt_token}"},
            json={},
        )
        if response:
            return response.json().get("earners", [])
    return []
def fetch_devices(jwt_token):
    """Return the 'clients' list from the network clients endpoint, or [] on any failure."""
    if jwt_token:
        clients_url = f"{app.config['UR_API_BASE']}/network/clients"
        response = request_with_retry(
            "get", clients_url, headers={"Authorization": f"Bearer {jwt_token}"}
        )
        if response:
            return response.json().get("clients", [])
    return []
def remove_device(jwt_token, client_id):
    """Ask the API to remove the client identified by *client_id*.

    Returns:
        A ``(success, message)`` tuple. ``success`` is False when no token is
        available, the request fails, the status is not 200, or the response
        body carries an ``error`` entry.

    The ``error`` entry is handled whether it is an object with a ``message``
    field or a plain value, so an unexpected error shape is reported to the
    user instead of raising.
    """
    if not jwt_token:
        return False, "Authentication token not available."
    headers = {"Authorization": f"Bearer {jwt_token}"}
    payload = {"client_id": client_id}
    resp = request_with_retry(
        "post", f"{app.config['UR_API_BASE']}/network/remove-client",
        headers=headers, json=payload,
    )
    if not resp:
        return False, "Failed to communicate with API."
    if resp.status_code != 200:
        return False, f"API returned status {resp.status_code}."
    data = resp.json()
    error = data.get("error") if isinstance(data, dict) else None
    if error:
        if isinstance(error, dict):
            return False, error.get("message", "An unknown error occurred.")
        return False, str(error)
    return True, "Device removed successfully."
def calculate_earnings(payments):
    """Calculate total and last-30-days earnings from a list of payments.

    Only payments whose ``completed`` entry is truthy are counted. A payment
    adds to the monthly figure when its ``payment_time`` is later than 30
    days before the current UTC time.

    Args:
        payments: Iterable of payment dicts, or None / empty.

    Returns:
        ``(total_earnings, monthly_earnings)``; ``(0, 0)`` for no payments.

    A missing or null ``token_amount`` counts as 0. Timestamps without a UTC
    offset are interpreted as UTC so they can be compared with the
    timezone-aware cut-off; unparseable timestamps are logged and count
    towards the total only.
    """
    if not payments:
        return 0, 0
    total_earnings = 0
    monthly_earnings = 0
    now = datetime.datetime.now(datetime.timezone.utc)
    one_month_ago = now - datetime.timedelta(days=30)
    for payment in payments:
        if not payment.get("completed"):
            continue
        amount = payment.get("token_amount") or 0
        total_earnings += amount
        payment_time_str = payment.get("payment_time")
        if not payment_time_str:
            continue
        try:
            payment_time = isoparse(payment_time_str)
        except (ValueError, TypeError):
            logging.warning(f"Could not parse payment_time: {payment_time_str}")
            continue
        if payment_time.tzinfo is None:
            # Naive and aware datetimes cannot be compared; assume UTC.
            payment_time = payment_time.replace(tzinfo=datetime.timezone.utc)
        if payment_time > one_month_ago:
            monthly_earnings += amount
    return total_earnings, monthly_earnings
def fetch_provider_locations():
    """Return the decoded provider-locations payload, or None when the request fails.

    No authorization header is sent with this request.
    """
    locations_url = f"{app.config['UR_API_BASE']}/network/provider-locations"
    response = request_with_retry("get", locations_url)
    if response:
        return response.json()
    return None
def send_webhook_notification(stats_data):
    """Post a stats summary to every configured webhook.

    Args:
        stats_data: Dict with at least ``paid_gb`` and ``unpaid_gb``.

    The payload is a single embed with paid, unpaid and combined GB figures.
    Delivery is best-effort: a failure for one webhook is logged and the
    remaining webhooks are still attempted. The response status is checked,
    so an HTTP error from the receiver is logged as a failure and not as a
    successful send.
    """
    webhooks = Webhook.query.all()
    if not webhooks:
        return
    payload = {
        "embeds": [{
            "title": "UrNetwork Stats Update",
            "description": "New data has been synced from the UrNetwork API.",
            "color": 5814783,
            "fields": [
                {"name": "Total Paid Data", "value": f"{stats_data['paid_gb']:.3f} GB", "inline": True},
                {"name": "Total Unpaid Data", "value": f"{stats_data['unpaid_gb']:.3f} GB", "inline": True},
                {"name": "Total Data Provided", "value": f"{(stats_data['paid_gb'] + stats_data['unpaid_gb']):.3f} GB", "inline": True},
            ],
            "footer": {"text": f"Update Time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"}
        }]
    }
    for webhook in webhooks:
        try:
            resp = requests.post(webhook.url, json=payload, timeout=10)
            # HTTPError is a RequestException, so error statuses are handled below.
            resp.raise_for_status()
            logging.info(f"Sent webhook notification to {webhook.url}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Failed to send webhook to {webhook.url}: {e}")
# --- Authentication Decorator ---
def login_required(f):
    """Decorator: run the view only for logged-in sessions.

    Anonymous visitors get a flash message and are redirected to the login
    page, with the requested URL passed along as ``next``.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if session.get('logged_in'):
            return f(*args, **kwargs)
        flash("You must be logged in to view this page.", "error")
        return redirect(url_for('login', next=request.url))
    return guarded_view
# --- Scheduled Job ---
@scheduler.task(id="log_stats_job", trigger="cron", minute="0,15,30,45")
def log_stats_job():
    """Scheduled job: fetch transfer stats, store a snapshot, notify webhooks.

    Runs at minutes 0, 15, 30 and 45 of every hour. It is skipped while the
    application is not installed, and any failure is logged, not raised.
    """
    with app.app_context():
        if not is_installed():
            logging.warning("log_stats_job skipped: Application is not installed.")
            return
        logging.info("Running scheduled stats fetch...")
        try:
            token = get_valid_jwt()
            if not token:
                raise RuntimeError("Scheduled job could not authenticate with API.")
            stats_data = fetch_transfer_stats(token)
            if not stats_data:
                raise RuntimeError("Scheduled job could not fetch transfer stats.")
            snapshot = Stats(
                paid_bytes=stats_data["paid_bytes"],
                paid_gb=stats_data["paid_gb"],
                unpaid_bytes=stats_data["unpaid_bytes"],
                unpaid_gb=stats_data["unpaid_gb"],
            )
            db.session.add(snapshot)
            db.session.commit()
            logging.info(f"Logged new stats entry at {snapshot.timestamp}")
            send_webhook_notification(stats_data)
        except Exception as e:
            logging.error(f"Scheduled job 'log_stats_job' failed: {e}")
# --- HTML Templates ---
# Shared page shell. render_page() renders the page body first and passes the
# resulting HTML in as `content`; it is marked safe so it is not escaped a
# second time. Every Jinja block opened here is closed, so the template
# compiles.
LAYOUT_TEMPLATE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>{{ title }} - UrNetwork Stats</title>
</head>
<body>
<h1>{{ title }}</h1>
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
<div class="flash flash-{{ category }}">{{ message }}</div>
{% endfor %}
{% endif %}
{% endwith %}
{{ content|safe }}
</body>
</html>
"""
# --- Template Rendering Helper ---
# GeoJSON country outlines, downloaded once when this module is imported.
# The value stays None when the download fails; render_page() passes it to
# the public dashboard as `world_map_data`.
# NOTE(review): this is a blocking network call (up to 15 s) at import time.
WORLD_MAP_DATA = None
try:
    map_url = "https://d2ad6b4ur7yvpq.cloudfront.net/naturalearth-3.3.0/ne_110m_admin_0_countries.geojson"
    response = requests.get(map_url, timeout=15)
    response.raise_for_status()
    WORLD_MAP_DATA = response.json()
    logging.info("Successfully loaded world map GeoJSON data for embedding.")
except Exception as e:
    # Any failure (network, HTTP status, invalid JSON) only disables the map.
    logging.error(f"Could not download and cache world map data. Map will be disabled. Error: {e}")
def render_page(template_content, **context):
    """Render *template_content* and wrap the result in the shared layout.

    Extra context is added first: the cached world map data for the public
    dashboard, and the feature-flag settings for logged-in sessions. The
    rendered page is then handed to the layout as ``content``.
    """
    if context.get('is_public_dashboard'):
        context['world_map_data'] = WORLD_MAP_DATA
    if session.get('logged_in'):
        flag_names = ('ENABLE_ACCOUNT_STATS', 'ENABLE_DEVICE_STATS')
        context['settings'] = {name: get_boolean_setting(name) for name in flag_names}
    page_html = render_template_string(template_content, **context)
    return render_template_string(LAYOUT_TEMPLATE, content=page_html, **context)
# --- Middleware and Before Request Handlers ---
@app.before_request
def check_installation_and_init():
    """Per-request gate: force installation first, seed settings, set ``g.now``.

    Requests other than the installer and static files are redirected to the
    installer until the app is installed. Once installed, the settings table
    is filled with the configured feature-flag defaults the first time it is
    found empty.
    """
    endpoint = request.endpoint
    if endpoint not in ('install', 'static') and not is_installed():
        return redirect(url_for('install'))
    if is_installed() and endpoint != 'static':
        with app.app_context():
            if Setting.query.first() is None:
                logging.info("First run after install: Initializing settings in database.")
                flag_names = ('ENABLE_ACCOUNT_STATS', 'ENABLE_LEADERBOARD', 'ENABLE_DEVICE_STATS')
                initial_settings = [
                    Setting(key=name, value=str(app.config[name])) for name in flag_names
                ]
                db.session.bulk_save_objects(initial_settings)
                db.session.commit()
    g.now = datetime.datetime.utcnow()
@app.context_processor
def inject_layout_vars():
    """Expose the request timestamp stored on ``g`` to every template as ``now``."""
    return dict(now=g.now)
# --- Routes ---
@app.route('/install', methods=['GET', 'POST'])
def install():
    """First-run installer: verify credentials, write .env, create tables.

    GET shows the setup form. POST checks the submitted credentials against
    the API, stores them with a freshly generated SECRET_KEY, creates the
    database tables and logs the installer in.

    The new key is also applied to the running app. Without that, the session
    created here would be signed with whatever key the process started with,
    not with the key written to the .env file.
    """
    if is_installed():
        flash("Application is already installed.", "info")
        return redirect(url_for('public_dashboard'))
    if request.method == 'POST':
        user = request.form.get('ur_user')
        password = request.form.get('ur_pass')
        if not user or not password:
            flash("Username and Password are required.", "error")
            return render_page(INSTALL_TEMPLATE, title="Setup")
        jwt = get_jwt_from_credentials(user, password)
        if not jwt:
            flash("Could not verify credentials with UrNetwork API. Please check them and try again.", "error")
            return render_page(INSTALL_TEMPLATE, title="Setup")
        config_data = {
            "UR_USER": user,
            "UR_PASS": password,
            "SECRET_KEY": secrets.token_hex(24)
        }
        if save_env_file(config_data):
            # Start signing sessions with the generated key immediately.
            app.config['SECRET_KEY'] = config_data["SECRET_KEY"]
            with app.app_context():
                db.create_all()
            session['logged_in'] = True
            flash("Installation successful! Your dashboard is now configured.", "info")
            return redirect(url_for('private_dashboard'))
        else:
            flash("A server error occurred while saving the configuration file.", "error")
    return render_page(INSTALL_TEMPLATE, title="Setup")
def _is_safe_redirect_target(target):
    """Return True when *target* resolves to an http(s) URL on this host."""
    from urllib.parse import urljoin, urlparse

    if not target or '\\' in target:
        # Browsers may treat a backslash as a slash, turning "/\\host" into "//host".
        return False
    own_host = urlparse(request.host_url)
    resolved = urlparse(urljoin(request.host_url, target))
    return resolved.scheme in ('http', 'https') and resolved.netloc == own_host.netloc


def _constant_time_equal(supplied, expected):
    """Compare two strings without leaking timing; missing values never match."""
    if not supplied or not expected:
        return False
    return secrets.compare_digest(supplied.encode('utf-8'), expected.encode('utf-8'))


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Owner login against the credentials stored in the environment.

    On success the session is marked as logged in and the user is sent to
    the ``next`` URL if it points back to this host, otherwise to the private
    dashboard. An off-site ``next`` value is ignored, so the login page
    cannot be used as an open redirect.
    """
    if session.get('logged_in'):
        return redirect(url_for('private_dashboard'))
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        # Evaluate both checks so timing does not reveal which one failed.
        user_ok = _constant_time_equal(username, os.getenv('UR_USER'))
        pass_ok = _constant_time_equal(password, os.getenv('UR_PASS'))
        if user_ok and pass_ok:
            session['logged_in'] = True
            flash('You have been logged in successfully.', 'info')
            next_url = request.args.get('next')
            if not _is_safe_redirect_target(next_url):
                next_url = None
            return redirect(next_url or url_for('private_dashboard'))
        else:
            flash('Invalid username or password.', 'error')
    return render_page(LOGIN_TEMPLATE, title="Login")
@app.route('/logout')
def logout():
    """Clear the login marker from the session and return to the public dashboard."""
    session.pop('logged_in', None)
    flash('You have been logged out.', 'info')
    landing_page = url_for('public_dashboard')
    return redirect(landing_page)
@app.route('/')
def public_dashboard():
    """Render the public dashboard: latest totals, monthly earnings and a history chart."""
    history = Stats.query.order_by(Stats.timestamp.asc()).all()
    newest = history[-1] if history else None

    token = get_valid_jwt()
    payments = fetch_payment_stats(token) if token else []
    monthly_earnings = calculate_earnings(payments)[1]

    # One chart point per stored snapshot, oldest first.
    labels = []
    totals = []
    for snapshot in history:
        labels.append(snapshot.timestamp.strftime('%m-%d %H:%M'))
        totals.append(snapshot.paid_gb + snapshot.unpaid_gb)

    if newest:
        latest_timestamp = newest.timestamp.isoformat()
    else:
        latest_timestamp = None

    return render_page(
        PUBLIC_DASHBOARD_TEMPLATE,
        title="Public Dashboard",
        latest=newest,
        monthly_earnings=monthly_earnings,
        chart_data={"labels": labels, "data": totals},
        is_public_dashboard=True,
        latest_timestamp=latest_timestamp,
    )
@app.route('/api/locations')
def get_locations():
    """Proxy the provider-locations payload; respond with HTTP 500 when it cannot be fetched."""
    locations = fetch_provider_locations()
    if not locations:
        return jsonify({"error": "Failed to fetch location data"}), 500
    return jsonify(locations)
@app.route('/api/transfer_data')
def get_transfer_data():
    """JSON endpoint polled by the public page.

    Responds with ``{"status": "no_data"}`` while no snapshot exists;
    otherwise with the latest figures, the full chart series and the
    monthly earnings.
    """
    history = Stats.query.order_by(Stats.timestamp.asc()).all()
    if not history:
        return jsonify({"status": "no_data"})
    newest = history[-1]

    token = get_valid_jwt()
    payments = fetch_payment_stats(token) if token else []
    monthly_earnings = calculate_earnings(payments)[1]

    public_stats = {
        "paid_gb": "%.3f" % newest.paid_gb,
        "unpaid_gb": "%.3f" % newest.unpaid_gb,
        "last_update": newest.timestamp.strftime('%H:%M'),
    }
    chart_labels = []
    chart_values = []
    for snapshot in history:
        chart_labels.append(snapshot.timestamp.strftime('%m-%d %H:%M'))
        chart_values.append(snapshot.paid_gb + snapshot.unpaid_gb)

    return jsonify({
        "status": "ok",
        "latest_timestamp": newest.timestamp.isoformat(),
        "public_stats": public_stats,
        "public_chart_data": {"labels": chart_labels, "data": chart_values},
        "monthly_earnings": monthly_earnings,
    })
# --- Private Dashboard Routes ---
@app.route('/dashboard')
@login_required
def private_dashboard():
    """Render the owner overview: snapshot table with deltas, total earnings and charts."""
    entries = Stats.query.order_by(Stats.timestamp.desc()).all()
    latest = entries[0] if entries else None

    token = get_valid_jwt()
    payments = fetch_payment_stats(token) if token else []
    total_earnings = calculate_earnings(payments)[0]

    # Entries are newest first, so each one is paired with the next (older)
    # entry; the oldest entry has nothing to compare against.
    rows = []
    older_neighbours = entries[1:] + [None]
    for current, previous in zip(entries, older_neighbours):
        if previous is None:
            change = None
        else:
            change = (current.paid_gb + current.unpaid_gb) - (previous.paid_gb + previous.unpaid_gb)
        rows.append({
            "e": current,
            "ts_str": current.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
            "delta_gb": change,
        })

    # Charts run oldest to newest.
    chronological = entries[::-1]
    deltas = []
    for row in reversed(rows):
        if row['delta_gb'] is not None:
            deltas.append(row['delta_gb'])

    chart_data = {
        "labels": [e.timestamp.strftime('%m-%d %H:%M') for e in chronological],
        "paid_gb": [e.paid_gb for e in chronological],
        "unpaid_gb": [e.unpaid_gb for e in chronological],
        "deltas": deltas,
    }
    return render_page(
        PRIVATE_DASHBOARD_TEMPLATE, title="Owner Dashboard - Overview",
        rows=rows, latest=latest, total_earnings=total_earnings, chart_data=chart_data
    )
@app.route('/dashboard/account')
@login_required
def private_account_page():
    """Render account details and the leaderboard, if the feature is enabled."""
    if not get_boolean_setting('ENABLE_ACCOUNT_STATS'):
        flash("This feature is currently disabled. You can enable it in the settings.", "warning")
        return redirect(url_for('private_dashboard'))

    token = get_valid_jwt()
    if token:
        account_details = fetch_account_details(token)
        leaderboard = fetch_leaderboard(token)
    else:
        account_details = {}
        leaderboard = []

    return render_page(
        ACCOUNT_PAGE_TEMPLATE, title="Account & Leaderboard",
        account_details=account_details, leaderboard=leaderboard
    )
@app.route('/dashboard/devices')
@login_required
def private_devices_page():
    """Render the device list, if the feature is enabled.

    Each device dict gains a ``provide_mode_str`` label derived from its
    numeric ``provide_mode``; unrecognised values are labelled 'Unknown'.
    """
    if not get_boolean_setting('ENABLE_DEVICE_STATS'):
        flash("This feature is currently disabled. You can enable it in the settings.", "warning")
        return redirect(url_for('private_dashboard'))

    token = get_valid_jwt()
    devices = fetch_devices(token) if token else []

    mode_labels = {
        -1: "Default",
        0: "None",
        1: "Network",
        2: "Friends & Family",
        3: "Public",
        4: "Stream",
    }
    for device in devices:
        mode = device.get('provide_mode')
        device['provide_mode_str'] = mode_labels.get(mode, 'Unknown')

    return render_page(DEVICES_PAGE_TEMPLATE, title="Device Management", devices=devices)
# The URL rule must declare <client_id>; otherwise Flask calls the view
# without its required argument and the request fails.
@app.route('/dashboard/devices/remove/<client_id>', methods=["POST"])
@login_required
def private_remove_device(client_id):
    """Remove the device identified by *client_id* and return to the device list.

    The outcome is reported to the user through a flash message.
    """
    jwt = get_valid_jwt()
    success, message = remove_device(jwt, client_id)
    if success:
        flash(message, "info")
    else:
        flash(f"Failed to remove device: {message}", "error")
    return redirect(url_for('private_devices_page'))
@app.route('/settings', methods=['GET', 'POST'])
@login_required
def settings():
    """Show and save the feature-flag settings.

    On POST each flag is stored as ``str(True)`` when its form field equals
    'on' and ``str(False)`` otherwise. A flag with no database row yet is
    created, so the success message is never shown for a change that was
    silently dropped.
    """
    if request.method == 'POST':
        settings_map = {
            'ENABLE_ACCOUNT_STATS': 'enable_account_stats',
            'ENABLE_DEVICE_STATS': 'enable_device_stats',
        }
        for key, form_key in settings_map.items():
            new_value = str(request.form.get(form_key) == 'on')
            setting = Setting.query.get(key)
            if setting:
                setting.value = new_value
            else:
                db.session.add(Setting(key=key, value=new_value))
        db.session.commit()
        flash("Settings saved successfully.", "info")
        return redirect(url_for('settings'))
    return render_page(SETTINGS_TEMPLATE, title="Settings")
@app.route("/trigger", methods=["POST"])
@login_required
def trigger_fetch():
    """Manually fetch the current transfer stats and store them as a new snapshot.

    Any failure is logged and shown to the user as a flash message. The
    database session is rolled back on failure, matching clear_db(), so a
    failed commit does not leave the session in an unusable state.
    """
    try:
        jwt = get_valid_jwt()
        if not jwt:
            raise RuntimeError("Failed to get a valid API token.")
        stats_data = fetch_transfer_stats(jwt)
        if not stats_data:
            raise RuntimeError("Failed to fetch stats from API.")
        entry = Stats(
            paid_bytes=stats_data["paid_bytes"], paid_gb=stats_data["paid_gb"],
            unpaid_bytes=stats_data["unpaid_bytes"], unpaid_gb=stats_data["unpaid_gb"]
        )
        db.session.add(entry)
        db.session.commit()
        flash("Successfully fetched and saved latest stats.", "info")
    except Exception as e:
        db.session.rollback()
        logging.error(f"Manual fetch failed: {e}")
        flash(f"Failed to fetch stats: {e}", "error")
    return redirect(url_for('private_dashboard'))
@app.route("/clear", methods=["POST"])
@login_required
def clear_db():
    """Delete every stats snapshot and report how many rows were removed."""
    try:
        deleted_count = db.session.query(Stats).delete()
        db.session.commit()
    except Exception as e:
        logging.error(f"Failed to clear database: {e}")
        flash("An error occurred while clearing the database.", "error")
        db.session.rollback()
    else:
        flash(f"Successfully cleared {deleted_count} records from the database.", "info")
    return redirect(url_for('private_dashboard'))
@app.route("/webhooks/add", methods=["POST"])
@login_required
def add_webhook():
    """Register a webhook URL submitted from the dashboard.

    The URL must start with http:// or https:// and must not already be
    registered; the outcome is reported through a flash message.
    """
    url = request.form.get("webhook_url")
    looks_like_http_url = bool(url) and url.startswith(("http://", "https://"))
    if not looks_like_http_url:
        flash("A valid Webhook URL is required.", "error")
    elif Webhook.query.filter_by(url=url).first():
        flash("This webhook URL is already registered.", "error")
    else:
        db.session.add(Webhook(url=url))
        db.session.commit()
        flash("Webhook added successfully.", "info")
    return redirect(url_for('private_dashboard'))
# The URL rule must declare <int:webhook_id>; otherwise Flask calls the view
# without its required argument and the request fails.
@app.route("/webhooks/delete/<int:webhook_id>", methods=["POST"])
@login_required
def delete_webhook(webhook_id):
    """Delete the webhook with primary key *webhook_id*.

    A flash message reports whether the webhook was deleted or not found.
    """
    webhook = Webhook.query.get(webhook_id)
    if webhook:
        db.session.delete(webhook)
        db.session.commit()
        flash("Webhook deleted successfully.", "info")
    else:
        flash("Webhook not found.", "error")
    return redirect(url_for('private_dashboard'))
# --- Main Entry Point ---
if __name__ == "__main__":
    # Make sure all tables exist before the scheduler or any request touches them.
    with app.app_context():
        db.create_all()
    scheduler.start()
    # Port 90 remains the default. Ports below 1024 need elevated privileges
    # on most systems, so the PORT environment variable can override it.
    app.run(host="0.0.0.0", port=int(os.getenv("PORT", "90")), debug=False)