133 lines
5.4 KiB
Python
133 lines
5.4 KiB
Python
import hmac
import os
import subprocess

from flask import request, jsonify

from auth import auth  # Importing authentication setup
from redis_config import setup_redis
from wol_utils import send_magic_packet  # Importing the utility function
|
|
|
|
def setup_routes(limiter, app, redis_connection):
    """Register the Wake-on-LAN HTTP routes and response hooks on ``app``.

    Routes registered:
        /wol/<device>    send a magic packet unless the device answers ping
        /get_key         return the configured SECRET_KEY as JSON
        /ping/<device>   report whether the device answers ping
        /redis_status    report whether ``redis_connection`` answers PING
        /reset_limiter   delete every Redis key matching ``LIMITER*``

    Devices are discovered from environment variables: ``<NAME>_MAC``
    (required), ``<NAME>_IFACE`` (optional, defaults to ``eth0``) and
    ``<NAME>_IP`` (optional, enables the reachability check).

    Args:
        limiter: Object providing ``limit(...)`` and ``exempt`` decorators.
            # presumably a Flask-Limiter ``Limiter`` -- confirm with caller
        app: Flask application the routes are attached to.
        redis_connection: Client exposing ``ping``, ``scan_iter`` and
            ``delete``.  # presumably a redis-py client -- confirm
    """
    # Hard upper bound for the ping subprocess. `ping -w 1` normally ends
    # after one second; this cap only matters if the process hangs anyway.
    ping_timeout_seconds = 5

    # Security headers
    @app.after_request
    def add_security_headers(response):
        """Attach the fixed set of security headers to every response."""
        response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
        response.headers['X-Content-Type-Options'] = 'nosniff'
        response.headers['X-Frame-Options'] = 'DENY'
        # NOTE: a Content-Security-Policy header was drafted but is disabled:
        #response.headers['Content-Security-Policy'] = "default-src 'self'; script-src 'self'; style-src 'self'; img-src 'self';"
        response.headers['Referrer-Policy'] = 'no-referrer'
        response.headers['Permissions-Policy'] = 'geolocation=(), microphone=()'
        return response

    def _has_valid_key():
        """Return True if the request's ``key`` argument matches SECRET_KEY.

        The comparison is constant-time so response timing does not reveal
        how many leading characters of a guess were correct.
        """
        supplied = request.args.get('key')
        if not supplied:
            return False
        expected = app.config['SECRET_KEY']
        if not isinstance(expected, str):
            # A str query argument can never equal a bytes/None secret.
            return False
        # Compare as bytes: compare_digest rejects non-ASCII str operands.
        return hmac.compare_digest(supplied.encode('utf-8'),
                                   expected.encode('utf-8'))

    def ping_device(device_ip):
        """Return True if ``device_ip`` answers a single ping, else False.

        Any failure to run or finish ``ping`` (non-zero exit, timeout,
        missing executable) is logged and reported as "not reachable".
        """
        try:
            result = subprocess.run(
                ['ping', '-c', '1', '-w', '1', device_ip],
                check=True,
                capture_output=True,
                text=True,
                timeout=ping_timeout_seconds,
            )
        except subprocess.CalledProcessError as e:
            app.logger.info(f"Device was sleeping: {str(e)}")
            return False
        except (subprocess.TimeoutExpired, OSError) as e:
            # OSError covers a missing or non-executable `ping` binary.
            app.logger.warning(f"Ping to {device_ip} could not be completed: {e}")
            return False
        app.logger.info(result.stdout)
        return True

    def wake_on_lan(device_name, mac, iface):
        """Send a magic packet to ``mac`` via ``iface`` for ``device_name``.

        Requires a valid ``key`` query argument. When ``<DEVICE_NAME>_IP``
        is set and that address answers ping, no packet is sent.

        Returns:
            A Flask view return value: a message string on success, or a
            ``(message, status)`` tuple with 401 or 500 on failure.
        """
        if not _has_valid_key():
            app.logger.warning(f"Unauthorized attempt to access {request.path}")
            return "Unauthorized\n", 401
        try:
            device_ip = os.getenv(f"{device_name.upper()}_IP")
            if device_ip and ping_device(device_ip):
                app.logger.info(f"Device {device_name} is already up. No WoL packet sent.")
                return f"Device {device_name} is already up. No WoL packet sent.\n"

            send_magic_packet(mac_address=mac, iface=iface)
            app.logger.info(f"WoL packet sent to {device_name}!")
            return f"WoL packet sent to {device_name}!\n"
        except Exception as e:
            # Route boundary: send_magic_packet's failure modes are not
            # visible here, so report any error as a 500.
            app.logger.error(f"Failed to send WoL packet: {str(e)}")
            return "Failed to send WoL packet\n", 500

    def get_device_mapping():
        """Build ``{device_name: {"mac_var", "iface"}}`` from the environment.

        Every variable ending in ``_MAC`` defines a device; its lower-cased
        prefix is the device name. The interface comes from
        ``<NAME>_IFACE`` and falls back to ``eth0`` when that variable is
        unset or empty.
        """
        suffix = '_MAC'
        device_mapping = {}
        for var in os.environ:
            if not var.endswith(suffix):
                continue
            device_name = var[:-len(suffix)].lower()
            if not device_name:
                # A variable named exactly "_MAC" names no device.
                continue
            # `or` (not a getenv default) so an empty value also falls back.
            iface = os.getenv(f'{device_name.upper()}_IFACE') or 'eth0'
            device_mapping[device_name] = {"mac_var": var, "iface": iface}
        return device_mapping

    # Dynamic route for WoL
    @app.route('/wol/<device>')
    @auth.login_required
    @limiter.limit("10 per minute")
    def wol(device):
        """Wake ``device`` if it is configured through ``<DEVICE>_MAC``."""
        name = device.lower()
        entry = get_device_mapping().get(name)
        if entry is None:
            app.logger.warning(f"Invalid device: {device}")
            return "Invalid device\n", 400

        mac = os.getenv(entry["mac_var"])
        if not mac:
            app.logger.warning(f"MAC address for {device} is not set in the environment variables")
            return "MAC address not found\n", 400

        return wake_on_lan(name, mac, entry["iface"])

    @app.route('/get_key')
    @auth.login_required
    #@limiter.limit("5 per minute")
    @limiter.exempt
    def get_key():
        """Return the configured SECRET_KEY as ``{"key": ...}``.

        NOTE(review): this hands the application's SECRET_KEY to any
        authenticated client and is exempt from rate limiting -- confirm
        that the same value is not also relied on for anything else.
        """
        return jsonify({'key': app.config["SECRET_KEY"]})

    @app.route('/ping/<device>')
    def ping(device):
        """Report whether the address in ``<DEVICE>_IP`` answers ping.

        NOTE(review): unlike the other routes this one has no
        ``auth.login_required`` -- confirm that is intentional.
        """
        device_ip = os.getenv(f"{device.upper()}_IP")
        if not device_ip:
            return "Device IP not found in environment variables\n", 400
        if ping_device(device_ip):
            return f"Device {device.lower()} is up.\n", 200
        return f"Device {device.lower()} is down or unreachable.\n", 404

    @app.route('/redis_status')
    @auth.login_required
    def redis_status():
        """Report whether ``redis_connection`` answers a PING."""
        try:
            redis_connection.ping()
            app.logger.info("Redis status endpoint: Connected to Redis")
            return "Connected to Redis\n", 200
        except Exception as e:
            app.logger.error(f"Failed to connect to Redis: {str(e)}")
            return f"Failed to connect to Redis: {str(e)}\n", 500

    @app.route('/reset_limiter')
    @auth.login_required
    @limiter.exempt
    def reset_limiter():
        """Delete every Redis key matching ``LIMITER*``.

        Requires a valid ``key`` query argument in addition to login.
        """
        if not _has_valid_key():
            app.logger.warning("Unauthorized attempt to reset limiter")
            return "Unauthorized\n", 401

        # Delete all rate limiter keys
        try:
            # SCAN walks the keyspace incrementally instead of blocking the
            # server like KEYS; it may repeat a key, hence the set().
            limiter_keys = list(set(redis_connection.scan_iter(match="LIMITER*")))
            if not limiter_keys:
                app.logger.info("No rate limiter keys to delete.")
                return "No rate limiter keys to delete.\n"
            for limiter_key in limiter_keys:
                redis_connection.delete(limiter_key)
            app.logger.info(f"Deleted {len(limiter_keys)} rate limiter keys.")
            return f"Deleted {len(limiter_keys)} rate limiter keys.\n"
        except Exception as e:
            app.logger.error(f"Failed to reset limiter: {str(e)}")
            return f"Failed to reset limiter: {str(e)}\n", 500