TDD: 1, Me: 0
This commit is contained in:
132
app/routes.py
Normal file
132
app/routes.py
Normal file
@@ -0,0 +1,132 @@
|
||||
from flask import request, jsonify
|
||||
from redis_config import setup_redis
|
||||
import subprocess
|
||||
from auth import auth # Importing authentication setup
|
||||
from wol_utils import send_magic_packet # Importing the utility function
|
||||
import os
|
||||
|
||||
def setup_routes(limiter, app, redis_connection):
|
||||
|
||||
# Security headers
|
||||
@app.after_request
|
||||
def add_security_headers(response):
|
||||
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
|
||||
response.headers['X-Content-Type-Options'] = 'nosniff'
|
||||
response.headers['X-Frame-Options'] = 'DENY'
|
||||
#response.headers['Content-Security-Policy'] = "default-src 'self'; script-src 'self'; style-src 'self'; img-src 'self';"
|
||||
response.headers['Referrer-Policy'] = 'no-referrer'
|
||||
response.headers['Permissions-Policy'] = 'geolocation=(), microphone=()'
|
||||
return response
|
||||
|
||||
def ping_device(device_ip):
|
||||
try:
|
||||
output = subprocess.run(['ping', '-c', '1', '-w', '1', device_ip], check=True, capture_output=True, text=True)
|
||||
app.logger.info(output.stdout)
|
||||
return True
|
||||
except subprocess.CalledProcessError as e:
|
||||
app.logger.info(f"Device was sleeping: {str(e)}")
|
||||
return False
|
||||
|
||||
def wake_on_lan(device_name, mac, iface):
|
||||
key = request.args.get('key')
|
||||
if not key or key != app.config['SECRET_KEY']:
|
||||
app.logger.warning(f"Unauthorized attempt to access {request.path}")
|
||||
return "Unauthorized\n", 401
|
||||
try:
|
||||
device_ip_var = f"{device_name.upper()}_IP"
|
||||
device_ip = os.getenv(device_ip_var)
|
||||
if device_ip:
|
||||
if ping_device(device_ip):
|
||||
app.logger.info(f"Device {device_name} is already up. No WoL packet sent.")
|
||||
return f"Device {device_name} is already up. No WoL packet sent.\n"
|
||||
|
||||
send_magic_packet(mac_address=mac, iface=iface)
|
||||
app.logger.info(f"WoL packet sent to {device_name}!")
|
||||
return f"WoL packet sent to {device_name}!\n"
|
||||
except Exception as e:
|
||||
app.logger.error(f"Failed to send WoL packet: {str(e)}")
|
||||
return "Failed to send WoL packet\n", 500
|
||||
|
||||
def get_device_mapping():
|
||||
device_mapping = {}
|
||||
for var in os.environ:
|
||||
if var.endswith('_MAC'):
|
||||
device_name = var[:-4].lower() # Get device name from environment variable name
|
||||
iface_var = f'{device_name.upper()}_IFACE'
|
||||
iface = os.getenv(iface_var, 'eth0') # Default to eth0 if interface is not set
|
||||
device_mapping[device_name] = {"mac_var": var, "iface": iface}
|
||||
return device_mapping
|
||||
|
||||
# Dynamic route for WoL
|
||||
@app.route('/wol/<device>')
|
||||
@auth.login_required
|
||||
@limiter.limit("10 per minute")
|
||||
def wol(device):
|
||||
device_mapping = get_device_mapping()
|
||||
name = device.lower()
|
||||
if name in device_mapping:
|
||||
mac = os.getenv(device_mapping[name]["mac_var"])
|
||||
iface = device_mapping[name]["iface"]
|
||||
if mac:
|
||||
#print(device)
|
||||
#print(mac)
|
||||
#print(iface)
|
||||
return wake_on_lan(name, mac, iface)
|
||||
else:
|
||||
app.logger.warning(f"MAC address for {device} is not set in the environment variables")
|
||||
return "MAC address not found\n", 400
|
||||
else:
|
||||
app.logger.warning(f"Invalid device: {device}")
|
||||
return "Invalid device\n", 400
|
||||
|
||||
@app.route('/get_key')
|
||||
@auth.login_required
|
||||
#@limiter.limit("5 per minute")
|
||||
@limiter.exempt
|
||||
def get_key():
|
||||
return jsonify({'key': app.config["SECRET_KEY"]})
|
||||
|
||||
@app.route('/ping/<device>')
|
||||
def ping(device):
|
||||
device_ip_var = f"{device.upper()}_IP"
|
||||
device_ip = os.getenv(device_ip_var)
|
||||
if device_ip:
|
||||
if ping_device(device_ip):
|
||||
return f"Device {device.lower()} is up.\n", 200
|
||||
else:
|
||||
return f"Device {device.lower()} is down or unreachable.\n", 404
|
||||
else:
|
||||
return "Device IP not found in environment variables\n", 400
|
||||
|
||||
@app.route('/redis_status')
|
||||
@auth.login_required
|
||||
def redis_status():
|
||||
try:
|
||||
redis_connection.ping()
|
||||
app.logger.info("Redis status endpoint: Connected to Redis")
|
||||
return "Connected to Redis\n", 200
|
||||
except Exception as e:
|
||||
app.logger.error(f"Failed to connect to Redis: {str(e)}")
|
||||
return f"Failed to connect to Redis: {str(e)}\n", 500
|
||||
|
||||
@app.route('/reset_limiter')
|
||||
@auth.login_required
|
||||
@limiter.exempt
|
||||
def reset_limiter():
|
||||
key = request.args.get('key')
|
||||
if not key or key != app.config['SECRET_KEY']:
|
||||
app.logger.warning("Unauthorized attempt to reset limiter")
|
||||
return "Unauthorized\n", 401
|
||||
# Delete all rate limiter keys
|
||||
try:
|
||||
keys = redis_connection.keys("LIMITER*")
|
||||
if not keys:
|
||||
app.logger.info("No rate limiter keys to delete.")
|
||||
return "No rate limiter keys to delete.\n"
|
||||
for key in keys:
|
||||
redis_connection.delete(key)
|
||||
app.logger.info(f"Deleted {len(keys)} rate limiter keys.")
|
||||
return f"Deleted {len(keys)} rate limiter keys.\n"
|
||||
except Exception as e:
|
||||
app.logger.error(f"Failed to reset limiter: {str(e)}")
|
||||
return f"Failed to reset limiter: {str(e)}\n", 500
|
||||
Reference in New Issue
Block a user