import re
import time
from collections import defaultdict, deque
from functools import wraps

import jinja2
from flask import Flask, request, Response, jsonify
from werkzeug.middleware.proxy_fix import ProxyFix

app = Flask(__name__)

# Never run in debug in production
app.config['DEBUG'] = False
app.config['TESTING'] = False

# Trust one proxy (e.g. Traefik); ensure proxy overwrites X-Forwarded-For with real client IP
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1)

# Reject request bodies larger than this (bytes); protects when Content-Length is missing/wrong
app.config['MAX_CONTENT_LENGTH'] = 4096

# Only allow safe HTTP methods (block TRACE, CONNECT, etc.)
ALLOWED_HTTP_METHODS = {'GET', 'POST', 'HEAD', 'OPTIONS'}

env = jinja2.Environment(
    loader=jinja2.FileSystemLoader('templates'),
    # Escape all template output (XML); .j2 extension may not match select_autoescape(['xml'])
    autoescape=True,
)

# Security configuration: the only Host header values the service answers for.
# Each mail domain is listed with both its autoconfig and autodiscover name.
ALLOWED_HOSTS = {
    'autoconfig.mifi.holdings', 'autodiscover.mifi.holdings',
    'autoconfig.mifi.com.br', 'autodiscover.mifi.com.br',
    'autoconfig.mifi.dev', 'autodiscover.mifi.dev',
    'autoconfig.mifi.ventures', 'autodiscover.mifi.ventures',
    'autoconfig.mifi.vix.br', 'autodiscover.mifi.vix.br',
    'autoconfig.mifi.me', 'autodiscover.mifi.me',
    'autoconfig.blackice.vix.br', 'autodiscover.blackice.vix.br',
    'autoconfig.fitz.guru', 'autodiscover.fitz.guru',
    'autoconfig.umlautpress.com', 'autodiscover.umlautpress.com',
    'autoconfig.camilla-rena.com', 'autodiscover.camilla-rena.com',
    'autoconfig.officelift.net', 'autodiscover.officelift.net',
    'autoconfig.mylocalpro.biz', 'autodiscover.mylocalpro.biz',
    'autoconfig.mylocalpro.online', 'autodiscover.mylocalpro.online',
    'autoconfig.happybeardedcarpenter.com', 'autodiscover.happybeardedcarpenter.com',
    'autoconfig.thenewenglandpalletguy.com', 'autodiscover.thenewenglandpalletguy.com',
    'autoconfig.dining-it.com', 'autodiscover.dining-it.com',
}

# Rate limiting storage (in production, use Redis or similar)
# Simple
in-memory rate limiting - use Redis in production rate_limit_storage = defaultdict(deque) RATE_LIMIT_REQUESTS = 100 # requests per window RATE_LIMIT_WINDOW = 3600 # 1 hour window def _prune_stale_rate_limits(): """Remove IPs with no requests in the current window to bound memory (M4).""" if len(rate_limit_storage) <= 1000: return current_time = time.time() stale = [ ip for ip, timestamps in rate_limit_storage.items() if not timestamps or timestamps[-1] < current_time - RATE_LIMIT_WINDOW ] for ip in stale: del rate_limit_storage[ip] def rate_limit(max_requests=RATE_LIMIT_REQUESTS, window=RATE_LIMIT_WINDOW): """Simple rate limiting decorator. Uses request.remote_addr (set by ProxyFix from trusted proxy).""" def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): client_ip = request.remote_addr or 'unknown' current_time = time.time() _prune_stale_rate_limits() # Clean old requests outside the window while (rate_limit_storage[client_ip] and rate_limit_storage[client_ip][0] < current_time - window): rate_limit_storage[client_ip].popleft() # Check if limit exceeded if len(rate_limit_storage[client_ip]) >= max_requests: return jsonify({'error': 'Rate limit exceeded'}), 429 rate_limit_storage[client_ip].append(current_time) return f(*args, **kwargs) return decorated_function return decorator def validate_host(f): """Decorator to validate Host header""" @wraps(f) def decorated_function(*args, **kwargs): host = request.headers.get('Host', '').lower() # Remove port if present if ':' in host: host = host.split(':')[0] if host not in ALLOWED_HOSTS: return jsonify({'error': 'Forbidden host'}), 403 return f(*args, **kwargs) return decorated_function def validate_request_size(max_size=1024): """Decorator to validate request size. 
POST without Content-Length is rejected (H2).""" def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): content_length = request.content_length if request.method == 'POST' and content_length is None: return jsonify({'error': 'Content-Length required for POST'}), 411 if content_length is not None and content_length > max_size: return jsonify({'error': 'Request too large'}), 413 return f(*args, **kwargs) return decorated_function return decorator def add_security_headers(response): """Add security headers to all responses""" response.headers['X-Content-Type-Options'] = 'nosniff' response.headers['X-Frame-Options'] = 'DENY' response.headers['X-XSS-Protection'] = '1; mode=block' response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin' response.headers['Content-Security-Policy'] = "default-src 'self'; style-src 'self' 'unsafe-inline'; script-src 'self'" response.headers['Permissions-Policy'] = 'geolocation=(), microphone=(), camera=()' # HSTS: enforce HTTPS (Traefik terminates TLS) response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains; preload' return response def sanitize_domain(domain): """Sanitize domain to prevent injection attacks""" # Only allow alphanumeric, dots, and hyphens sanitized = re.sub(r'[^a-zA-Z0-9.-]', '', domain) # Prevent empty or invalid domains if not sanitized or len(sanitized) > 253: return None # Basic domain validation if not re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$', sanitized): return None return sanitized # Reject unsafe HTTP methods before any route logic @app.before_request def reject_unsafe_methods(): if request.method not in ALLOWED_HTTP_METHODS: return jsonify({'error': 'Method not allowed'}), 405 # Safe error handlers: no stack traces or internal details @app.errorhandler(404) def not_found(e): return jsonify({'error': 'Not found'}), 404 @app.errorhandler(500) def internal_error(e): return jsonify({'error': 
'Internal server error'}), 500 @app.errorhandler(413) def request_entity_too_large(e): return jsonify({'error': 'Request too large'}), 413 # Register security headers middleware @app.after_request def after_request(response): return add_security_headers(response) @app.route('/') @rate_limit(max_requests=50, window=3600) # 50 requests per hour for main page @validate_host @validate_request_size(max_size=512) def index(): host = request.headers.get('Host', '').lower() if ':' in host: host = host.split(':')[0] subdomain = host.split('.', 1)[0] if '.' in host else '' domain = host.split('.', 1)[1] if '.' in host else host # Sanitize domain to prevent injection sanitized_domain = sanitize_domain(domain) if not sanitized_domain: return jsonify({'error': 'Invalid domain'}), 400 base_html = """
autoconfig.domain.com or autodiscover.domain.com