Harden autoconfig and sanitize input
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2025-09-28 12:42:26 -03:00
parent f643efb220
commit a0f148c3ef
4 changed files with 318 additions and 31 deletions

172
app.py
View File

@@ -1,5 +1,10 @@
from flask import Flask, request, Response
from flask import Flask, request, Response, jsonify
import jinja2
import re
import html
import time
from functools import wraps
from collections import defaultdict, deque
app = Flask(__name__)
env = jinja2.Environment(
@@ -7,10 +12,130 @@ env = jinja2.Environment(
autoescape=jinja2.select_autoescape(['xml'])
)
# Security configuration
ALLOWED_HOSTS = {
'autoconfig.mifi.holdings', 'autodiscover.mifi.holdings',
'autoconfig.mifi.com.br', 'autodiscover.mifi.com.br',
'autoconfig.mifi.dev', 'autodiscover.mifi.dev',
'autoconfig.mifi.ventures', 'autodiscover.mifi.ventures',
'autoconfig.mifi.vix.br', 'autodiscover.mifi.vix.br',
'autoconfig.mifi.me', 'autodiscover.mifi.me',
'autoconfig.blackice.vix.br', 'autodiscover.blackice.vix.br',
'autoconfig.fitz.guru', 'autodiscover.fitz.guru',
'autoconfig.umlautpress.com', 'autodiscover.umlautpress.com',
'autoconfig.camilla-rena.com', 'autodiscover.camilla-rena.com',
'autoconfig.officelift.net', 'autodiscover.officelift.net',
'autoconfig.mylocalpro.biz', 'autodiscover.mylocalpro.biz',
'autoconfig.mylocalpro.online', 'autodiscover.mylocalpro.online',
'autoconfig.happybeardedcarpenter.com', 'autodiscover.happybeardedcarpenter.com',
'autoconfig.thenewenglandpalletguy.com', 'autodiscover.thenewenglandpalletguy.com',
'autoconfig.dining-it.com', 'autodiscover.dining-it.com'
}
# Rate limiting storage (in production, use Redis or similar)
# Simple in-memory rate limiting - use Redis in production
rate_limit_storage = defaultdict(deque)
RATE_LIMIT_REQUESTS = 100 # requests per window
RATE_LIMIT_WINDOW = 3600 # 1 hour window
def rate_limit(max_requests=RATE_LIMIT_REQUESTS, window=RATE_LIMIT_WINDOW):
"""Simple rate limiting decorator"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# Get client IP
client_ip = request.headers.get('X-Forwarded-For', request.remote_addr)
if client_ip:
client_ip = client_ip.split(',')[0].strip()
current_time = time.time()
# Clean old requests outside the window
while (rate_limit_storage[client_ip] and
rate_limit_storage[client_ip][0] < current_time - window):
rate_limit_storage[client_ip].popleft()
# Check if limit exceeded
if len(rate_limit_storage[client_ip]) >= max_requests:
return jsonify({'error': 'Rate limit exceeded'}), 429
# Add current request
rate_limit_storage[client_ip].append(current_time)
return f(*args, **kwargs)
return decorated_function
return decorator
def validate_host(f):
"""Decorator to validate Host header"""
@wraps(f)
def decorated_function(*args, **kwargs):
host = request.headers.get('Host', '').lower()
# Remove port if present
if ':' in host:
host = host.split(':')[0]
if host not in ALLOWED_HOSTS:
return jsonify({'error': 'Forbidden host'}), 403
return f(*args, **kwargs)
return decorated_function
def validate_request_size(max_size=1024):
"""Decorator to validate request size"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
content_length = request.content_length
if content_length and content_length > max_size:
return jsonify({'error': 'Request too large'}), 413
return f(*args, **kwargs)
return decorated_function
return decorator
def add_security_headers(response):
"""Add security headers to all responses"""
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
response.headers['Content-Security-Policy'] = "default-src 'self'; style-src 'self' 'unsafe-inline'; script-src 'self'"
return response
def sanitize_domain(domain):
"""Sanitize domain to prevent injection attacks"""
# Only allow alphanumeric, dots, and hyphens
sanitized = re.sub(r'[^a-zA-Z0-9.-]', '', domain)
# Prevent empty or invalid domains
if not sanitized or len(sanitized) > 253:
return None
# Basic domain validation
if not re.match(r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$', sanitized):
return None
return sanitized
# Register security headers middleware
@app.after_request
def after_request(response):
return add_security_headers(response)
@app.route('/')
@rate_limit(max_requests=50, window=3600) # 50 requests per hour for main page
@validate_host
@validate_request_size(max_size=512)
def index():
subdomain = request.host.split('.', 1)[0]
domain = request.host.split('.', 1)[1] if '.' in request.host else request.host
host = request.headers.get('Host', '').lower()
if ':' in host:
host = host.split(':')[0]
subdomain = host.split('.', 1)[0] if '.' in host else ''
domain = host.split('.', 1)[1] if '.' in host else host
# Sanitize domain to prevent injection
sanitized_domain = sanitize_domain(domain)
if not sanitized_domain:
return jsonify({'error': 'Invalid domain'}), 400
base_html = """
<!DOCTYPE html>
@@ -120,11 +245,11 @@ def index():
title="Mail Autoconfig Service",
service_type="Mozilla Thunderbird",
icon="🔧",
domain=domain,
domain=sanitized_domain,
content=f'''
<div class="endpoint">
<div class="endpoint-label">Thunderbird Autoconfig Endpoint:</div>
<a href="/mail/config-v1.1.xml">https://{request.host}/mail/config-v1.1.xml</a>
<a href="/mail/config-v1.1.xml">https://{host}/mail/config-v1.1.xml</a>
</div>
'''
)
@@ -133,11 +258,11 @@ def index():
title="Mail Autodiscover Service",
service_type="Microsoft Outlook",
icon="🔍",
domain=domain,
domain=sanitized_domain,
content=f'''
<div class="endpoint">
<div class="endpoint-label">Outlook Autodiscover Endpoint:</div>
<a href="/Autodiscover/Autodiscover.xml">https://{request.host}/Autodiscover/Autodiscover.xml</a>
<a href="/Autodiscover/Autodiscover.xml">https://{host}/Autodiscover/Autodiscover.xml</a>
</div>
'''
)
@@ -156,17 +281,42 @@ def index():
), 400
@app.route('/ping')
@rate_limit(max_requests=10, window=60) # 10 requests per minute for health check
def ping():
return "✅ Mail Autoconfig Service is running."
@app.route('/mail/config-v1.1.xml')
@rate_limit(max_requests=20, window=3600) # 20 requests per hour for config
@validate_host
@validate_request_size(max_size=1024)
def thunderbird_config():
domain = request.host.split('.', 1)[1]
xml = env.get_template('config-v1.1.xml.j2').render(DOMAIN=domain)
host = request.headers.get('Host', '').lower()
if ':' in host:
host = host.split(':')[0]
domain = host.split('.', 1)[1] if '.' in host else host
sanitized_domain = sanitize_domain(domain)
if not sanitized_domain:
return jsonify({'error': 'Invalid domain'}), 400
xml = env.get_template('config-v1.1.xml.j2').render(DOMAIN=sanitized_domain)
return Response(xml, mimetype='application/xml')
@app.route('/Autodiscover/Autodiscover.xml', methods=['POST','GET'])
@rate_limit(max_requests=20, window=3600) # 20 requests per hour for autodiscover
@validate_host
@validate_request_size(max_size=2048)
def outlook_autodiscover():
domain = request.host.split('.', 1)[1]
xml = env.get_template('Autodiscover.xml.j2').render(DOMAIN=domain)
host = request.headers.get('Host', '').lower()
if ':' in host:
host = host.split(':')[0]
domain = host.split('.', 1)[1] if '.' in host else host
sanitized_domain = sanitize_domain(domain)
if not sanitized_domain:
return jsonify({'error': 'Invalid domain'}), 400
xml = env.get_template('Autodiscover.xml.j2').render(DOMAIN=sanitized_domain)
return Response(xml, mimetype='text/xml')