Files
WOL-LY/app/routes.py
2024-12-13 18:08:44 +02:00

133 lines
5.4 KiB
Python

from flask import request, jsonify
from redis_config import setup_redis
import subprocess
from auth import auth # Importing authentication setup
from wol_utils import send_magic_packet # Importing the utility function
import os
def setup_routes(limiter, app, redis_connection):
# Security headers
@app.after_request
def add_security_headers(response):
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
#response.headers['Content-Security-Policy'] = "default-src 'self'; script-src 'self'; style-src 'self'; img-src 'self';"
response.headers['Referrer-Policy'] = 'no-referrer'
response.headers['Permissions-Policy'] = 'geolocation=(), microphone=()'
return response
def ping_device(device_ip):
try:
output = subprocess.run(['ping', '-c', '1', '-w', '1', device_ip], check=True, capture_output=True, text=True)
app.logger.info(output.stdout)
return True
except subprocess.CalledProcessError as e:
app.logger.info(f"Device was sleeping: {str(e)}")
return False
def wake_on_lan(device_name, mac, iface):
key = request.args.get('key')
if not key or key != app.config['SECRET_KEY']:
app.logger.warning(f"Unauthorized attempt to access {request.path}")
return "Unauthorized\n", 401
try:
device_ip_var = f"{device_name.upper()}_IP"
device_ip = os.getenv(device_ip_var)
if device_ip:
if ping_device(device_ip):
app.logger.info(f"Device {device_name} is already up. No WoL packet sent.")
return f"Device {device_name} is already up. No WoL packet sent.\n"
send_magic_packet(mac_address=mac, iface=iface)
app.logger.info(f"WoL packet sent to {device_name}!")
return f"WoL packet sent to {device_name}!\n"
except Exception as e:
app.logger.error(f"Failed to send WoL packet: {str(e)}")
return "Failed to send WoL packet\n", 500
def get_device_mapping():
device_mapping = {}
for var in os.environ:
if var.endswith('_MAC'):
device_name = var[:-4].lower() # Get device name from environment variable name
iface_var = f'{device_name.upper()}_IFACE'
iface = os.getenv(iface_var, 'eth0') # Default to eth0 if interface is not set
device_mapping[device_name] = {"mac_var": var, "iface": iface}
return device_mapping
# Dynamic route for WoL
@app.route('/wol/<device>')
@auth.login_required
@limiter.limit("10 per minute")
def wol(device):
device_mapping = get_device_mapping()
name = device.lower()
if name in device_mapping:
mac = os.getenv(device_mapping[name]["mac_var"])
iface = device_mapping[name]["iface"]
if mac:
#print(device)
#print(mac)
#print(iface)
return wake_on_lan(name, mac, iface)
else:
app.logger.warning(f"MAC address for {device} is not set in the environment variables")
return "MAC address not found\n", 400
else:
app.logger.warning(f"Invalid device: {device}")
return "Invalid device\n", 400
@app.route('/get_key')
@auth.login_required
#@limiter.limit("5 per minute")
@limiter.exempt
def get_key():
return jsonify({'key': app.config["SECRET_KEY"]})
@app.route('/ping/<device>')
def ping(device):
device_ip_var = f"{device.upper()}_IP"
device_ip = os.getenv(device_ip_var)
if device_ip:
if ping_device(device_ip):
return f"Device {device.lower()} is up.\n", 200
else:
return f"Device {device.lower()} is down or unreachable.\n", 404
else:
return "Device IP not found in environment variables\n", 400
@app.route('/redis_status')
@auth.login_required
def redis_status():
try:
redis_connection.ping()
app.logger.info("Redis status endpoint: Connected to Redis")
return "Connected to Redis\n", 200
except Exception as e:
app.logger.error(f"Failed to connect to Redis: {str(e)}")
return f"Failed to connect to Redis: {str(e)}\n", 500
@app.route('/reset_limiter')
@auth.login_required
@limiter.exempt
def reset_limiter():
key = request.args.get('key')
if not key or key != app.config['SECRET_KEY']:
app.logger.warning("Unauthorized attempt to reset limiter")
return "Unauthorized\n", 401
# Delete all rate limiter keys
try:
keys = redis_connection.keys("LIMITER*")
if not keys:
app.logger.info("No rate limiter keys to delete.")
return "No rate limiter keys to delete.\n"
for key in keys:
redis_connection.delete(key)
app.logger.info(f"Deleted {len(keys)} rate limiter keys.")
return f"Deleted {len(keys)} rate limiter keys.\n"
except Exception as e:
app.logger.error(f"Failed to reset limiter: {str(e)}")
return f"Failed to reset limiter: {str(e)}\n", 500