# Source listing: mail-autoconfig/app.py (Python, 372 lines, 13 KiB)

from flask import Flask, request, Response, jsonify
import jinja2
import re
import time
from functools import wraps
from collections import defaultdict, deque
from werkzeug.middleware.proxy_fix import ProxyFix
# Flask application object for the mail autoconfig/autodiscover service.
app = Flask(__name__)
app.config.update(
    # Never run in debug in production
    DEBUG=False,
    TESTING=False,
    # Reject request bodies larger than this (bytes); protects when
    # Content-Length is missing/wrong
    MAX_CONTENT_LENGTH=4096,
)
# Trust one proxy (e.g. Traefik); ensure proxy overwrites X-Forwarded-For
# with real client IP
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1)
# Only allow safe HTTP methods (block TRACE, CONNECT, etc.)
ALLOWED_HTTP_METHODS = {'GET', 'POST', 'HEAD', 'OPTIONS'}
# Templates are loaded from the local "templates" directory.
_template_loader = jinja2.FileSystemLoader('templates')
env = jinja2.Environment(
    loader=_template_loader,
    # Escape all template output (XML); the .j2 extension may not match
    # select_autoescape(['xml']), so autoescaping is switched on outright.
    autoescape=True,
)
# Security configuration
# Mail domains served by this app. Every domain is reachable through both an
# "autoconfig." and an "autodiscover." hostname; ALLOWED_HOSTS holds the full
# hostnames that requests are checked against.
_SERVED_DOMAINS = (
    'mifi.holdings',
    'mifi.com.br',
    'mifi.dev',
    'mifi.ventures',
    'mifi.vix.br',
    'mifi.me',
    'blackice.vix.br',
    'fitz.guru',
    'umlautpress.com',
    'camilla-rena.com',
    'officelift.net',
    'mylocalpro.biz',
    'mylocalpro.online',
    'happybeardedcarpenter.com',
    'thenewenglandpalletguy.com',
    'dining-it.com',
)
_SERVICE_PREFIXES = ('autoconfig', 'autodiscover')
ALLOWED_HOSTS = {
    f'{prefix}.{domain}'
    for domain in _SERVED_DOMAINS
    for prefix in _SERVICE_PREFIXES
}
# Default rate-limit settings.
RATE_LIMIT_WINDOW = 60 * 60  # window length in seconds (1 hour)
RATE_LIMIT_REQUESTS = 100  # requests allowed per window
# Simple in-memory store of request timestamps, one deque per key.
# In production, use Redis or similar instead.
rate_limit_storage = defaultdict(deque)
def _prune_stale_rate_limits():
    """Delete rate-limit entries with no request inside the default window (M4).

    Keeps memory use bounded. Nothing is done while the store holds 1000
    entries or fewer. Above that, an entry is removed when its deque is empty
    or its newest timestamp is more than RATE_LIMIT_WINDOW seconds old.
    """
    if len(rate_limit_storage) <= 1000:
        return
    cutoff = time.time() - RATE_LIMIT_WINDOW
    # Collect the keys first; the dict cannot be resized while iterating it.
    expired_keys = [
        key for key, hits in rate_limit_storage.items()
        if len(hits) == 0 or hits[-1] < cutoff
    ]
    for key in expired_keys:
        del rate_limit_storage[key]
def rate_limit(max_requests=RATE_LIMIT_REQUESTS, window=RATE_LIMIT_WINDOW):
    """Build a decorator that applies a sliding-window rate limit to a view.

    Requests are counted per decorated view and per client, so each route's
    limit is independent. A single deque per client would be shared by every
    route: hits on one route would count against the others, and a route
    with a short window would trim timestamps that a route with a longer
    window still needs.

    The client is identified by request.remote_addr (set by ProxyFix from the
    trusted proxy), or 'unknown' when that is empty.

    Args:
        max_requests: Number of requests allowed per client within `window`.
        window: Length of the sliding window in seconds.

    Returns:
        A decorator. The wrapped view returns a JSON error with status 429
        once the limit is reached; otherwise it records the request and
        returns the result of the original view.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            client_ip = request.remote_addr or 'unknown'
            # Scope the counter to this view as well as to the client.
            bucket_key = (f.__name__, client_ip)
            current_time = time.time()
            _prune_stale_rate_limits()
            # Look the deque up only after pruning, so a deleted entry is
            # recreated instead of being written to while detached.
            timestamps = rate_limit_storage[bucket_key]
            # Clean old requests outside the window
            cutoff = current_time - window
            while timestamps and timestamps[0] < cutoff:
                timestamps.popleft()
            # Check if limit exceeded
            if len(timestamps) >= max_requests:
                return jsonify({'error': 'Rate limit exceeded'}), 429
            timestamps.append(current_time)
            return f(*args, **kwargs)
        return decorated_function
    return decorator
def validate_host(f):
    """Decorator that only lets a request through if its Host is allowed.

    The Host header is lower-cased and any ":port" suffix is dropped before it
    is looked up in ALLOWED_HOSTS. Other hosts get a JSON error with status
    403 and the wrapped view is not called.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        raw_host = request.headers.get('Host', '')
        # Keep only the part before the first ':' (the port, if any, follows it).
        hostname = raw_host.lower().partition(':')[0]
        if hostname in ALLOWED_HOSTS:
            return f(*args, **kwargs)
        return jsonify({'error': 'Forbidden host'}), 403
    return wrapper
def validate_request_size(max_size=1024):
    """Build a decorator that checks the declared request body size.

    A POST without Content-Length is rejected with 411 (H2). Any request whose
    Content-Length exceeds `max_size` bytes is rejected with 413. Everything
    else is passed to the wrapped view.
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            declared_length = request.content_length
            if declared_length is None:
                # No declared length: only acceptable for non-POST requests.
                if request.method == 'POST':
                    return jsonify({'error': 'Content-Length required for POST'}), 411
            elif declared_length > max_size:
                return jsonify({'error': 'Request too large'}), 413
            return f(*args, **kwargs)
        return wrapper
    return decorator
def add_security_headers(response):
    """Set the fixed security headers on `response` and return it.

    Any header of the same name already on the response is overwritten.
    """
    security_headers = {
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'DENY',
        'X-XSS-Protection': '1; mode=block',
        'Referrer-Policy': 'strict-origin-when-cross-origin',
        'Content-Security-Policy': "default-src 'self'; style-src 'self' 'unsafe-inline'; script-src 'self'",
        'Permissions-Policy': 'geolocation=(), microphone=(), camera=()',
        # HSTS: enforce HTTPS (Traefik terminates TLS)
        'Strict-Transport-Security': 'max-age=31536000; includeSubDomains; preload',
    }
    for header_name, header_value in security_headers.items():
        response.headers[header_name] = header_value
    return response
def sanitize_domain(domain):
    """Return a cleaned-up domain name, or None if it is not usable.

    Characters other than ASCII letters, digits, dots and hyphens are removed
    first. The remainder is rejected when it is empty, longer than 253
    characters, or not a dot-separated list of labels of 1-63 characters that
    each start and end with a letter or digit.
    """
    # Only allow alphanumeric, dots, and hyphens
    cleaned = re.sub(r'[^a-zA-Z0-9.-]', '', domain)
    if cleaned == '' or len(cleaned) > 253:
        return None
    # Basic domain validation
    hostname_pattern = r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'
    return cleaned if re.match(hostname_pattern, cleaned) else None
# Reject unsafe HTTP methods before any route logic
@app.before_request
def reject_unsafe_methods():
    """Answer with a 405 JSON error unless the method is in ALLOWED_HTTP_METHODS.

    Returns None for allowed methods.
    """
    if request.method in ALLOWED_HTTP_METHODS:
        return None
    return jsonify({'error': 'Method not allowed'}), 405
# Safe error handlers: no stack traces or internal details
@app.errorhandler(404)
def not_found(e):
    """Answer 404 errors with a generic JSON body; the error `e` is not exposed."""
    body = jsonify({'error': 'Not found'})
    return body, 404
@app.errorhandler(500)
def internal_error(e):
    """Answer 500 errors with a generic JSON body; the error `e` is not exposed."""
    body = jsonify({'error': 'Internal server error'})
    return body, 500
@app.errorhandler(413)
def request_entity_too_large(e):
    """Answer 413 errors with a generic JSON body; the error `e` is not exposed."""
    body = jsonify({'error': 'Request too large'})
    return body, 413
# Register security headers middleware
@app.after_request
def after_request(response):
    """Pass every outgoing response through add_security_headers."""
    secured_response = add_security_headers(response)
    return secured_response
# Skeleton of the landing page. It is filled in with str.format(), which is
# why the literal CSS braces are doubled.
_INDEX_PAGE_TEMPLATE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{title}</title>
<style>
* {{ margin: 0; padding: 0; box-sizing: border-box; }}
body {{
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
display: flex;
align-items: center;
justify-content: center;
color: #333;
}}
.container {{
background: white;
padding: 2rem;
border-radius: 12px;
box-shadow: 0 10px 30px rgba(0,0,0,0.1);
max-width: 500px;
width: 90%;
text-align: center;
}}
h1 {{
color: #2c3e50;
margin-bottom: 1rem;
font-size: 1.8rem;
}}
.service-type {{
background: #3498db;
color: white;
padding: 0.5rem 1rem;
border-radius: 20px;
display: inline-block;
margin-bottom: 1rem;
font-size: 0.9rem;
font-weight: 500;
}}
.domain {{
background: #f8f9fa;
padding: 0.5rem;
border-radius: 6px;
margin: 1rem 0;
font-family: 'Monaco', 'Courier New', monospace;
color: #495057;
}}
.endpoint {{
background: #e8f5e8;
border: 1px solid #28a745;
border-radius: 6px;
padding: 1rem;
margin: 1rem 0;
}}
.endpoint-label {{
font-weight: 600;
color: #28a745;
margin-bottom: 0.5rem;
}}
.endpoint a {{
color: #007bff;
text-decoration: none;
font-family: 'Monaco', 'Courier New', monospace;
word-break: break-all;
}}
.endpoint a:hover {{
text-decoration: underline;
}}
.status-link {{
display: inline-block;
margin-top: 1rem;
padding: 0.5rem 1rem;
background: #6c757d;
color: white;
text-decoration: none;
border-radius: 6px;
transition: background 0.3s;
}}
.status-link:hover {{
background: #545b62;
}}
.icon {{
font-size: 2rem;
margin-bottom: 1rem;
}}
</style>
</head>
<body>
<div class="container">
<div class="icon">{icon}</div>
<span class="service-type">{service_type}</span>
<h1>{title}</h1>
<div class="domain">Domain: <strong>{domain}</strong></div>
{content}
<a href="/ping" class="status-link">📊 Service Status</a>
</div>
</body>
</html>
"""


@app.route('/')
@rate_limit(max_requests=50, window=3600)  # 50 requests per hour for main page
@validate_host
@validate_request_size(max_size=512)
def index():
    """Render the HTML landing page for the requesting hostname.

    The first label of the Host header picks the variant: "autoconfig" shows
    the Thunderbird endpoint and "autodiscover" shows the Outlook endpoint.
    The rest of the host is displayed as the mail domain.

    Returns:
        The page HTML for the two known subdomains, an HTML error page with
        status 400 for any other subdomain, or a JSON error with status 400
        when the domain part is rejected by sanitize_domain.
    """
    # Lower-case the host and drop any ":port" suffix.
    host = request.headers.get('Host', '').lower().partition(':')[0]
    subdomain, dot, remainder = host.partition('.')
    if dot:
        domain = remainder
    else:
        # No dot at all: there is no subdomain and the whole host is the domain.
        subdomain, domain = '', host

    # Sanitize domain to prevent injection
    sanitized_domain = sanitize_domain(domain)
    if not sanitized_domain:
        return jsonify({'error': 'Invalid domain'}), 400

    if subdomain == 'autoconfig':
        page_fields = {
            'title': "Mail Autoconfig Service",
            'service_type': "Mozilla Thunderbird",
            'icon': "🔧",
            'domain': sanitized_domain,
            'content': f'''
<div class="endpoint">
<div class="endpoint-label">Thunderbird Autoconfig Endpoint:</div>
<a href="/mail/config-v1.1.xml">https://{host}/mail/config-v1.1.xml</a>
</div>
''',
        }
    elif subdomain == 'autodiscover':
        page_fields = {
            'title': "Mail Autodiscover Service",
            'service_type': "Microsoft Outlook",
            'icon': "🔍",
            'domain': sanitized_domain,
            'content': f'''
<div class="endpoint">
<div class="endpoint-label">Outlook Autodiscover Endpoint:</div>
<a href="/Autodiscover/Autodiscover.xml">https://{host}/Autodiscover/Autodiscover.xml</a>
</div>
''',
        }
    else:
        # Unknown subdomain: same page skeleton, but as an error with status 400.
        error_page = _INDEX_PAGE_TEMPLATE.format(
            title="Mail Configuration Service",
            service_type="Error",
            icon="",
            domain="Invalid",
            content='''
<div style="color: #dc3545; padding: 1rem;">
<strong>Invalid subdomain!</strong><br>
Please use <code>autoconfig.domain.com</code> or <code>autodiscover.domain.com</code>
</div>
''',
        )
        return error_page, 400

    return _INDEX_PAGE_TEMPLATE.format(**page_fields)
@app.route('/ping')
@rate_limit(max_requests=10, window=60)  # 10 requests per minute for health check
@validate_host
def ping():
    """Health check endpoint: return a fixed status message."""
    status_message = "✅ Mail Autoconfig Service is running."
    return status_message
@app.route('/mail/config-v1.1.xml')
@rate_limit(max_requests=20, window=3600)  # 20 requests per hour for config
@validate_host
@validate_request_size(max_size=1024)
def thunderbird_config():
    """Serve the Thunderbird autoconfig XML for the requesting host's domain.

    The domain is the Host header without its port and first label (or the
    whole host when it contains no dot). It is passed to the
    'config-v1.1.xml.j2' template as DOMAIN.

    Returns:
        An 'application/xml' response, or a JSON error with status 400 when
        the domain is rejected by sanitize_domain.
    """
    hostname = request.headers.get('Host', '').lower().partition(':')[0]
    _, dot, remainder = hostname.partition('.')
    domain = remainder if dot else hostname
    sanitized_domain = sanitize_domain(domain)
    if sanitized_domain:
        template = env.get_template('config-v1.1.xml.j2')
        body = template.render(DOMAIN=sanitized_domain)
        return Response(body, mimetype='application/xml')
    return jsonify({'error': 'Invalid domain'}), 400
@app.route('/Autodiscover/Autodiscover.xml', methods=['POST', 'GET'])
@rate_limit(max_requests=20, window=3600)  # 20 requests per hour for autodiscover
@validate_host
@validate_request_size(max_size=2048)
def outlook_autodiscover():
    """Serve the Outlook Autodiscover XML for the requesting host's domain.

    Accepts GET and POST. The domain is the Host header without its port and
    first label (or the whole host when it contains no dot). It is passed to
    the 'Autodiscover.xml.j2' template as DOMAIN; the request body is not read.

    Returns:
        A 'text/xml' response, or a JSON error with status 400 when the
        domain is rejected by sanitize_domain.
    """
    hostname = request.headers.get('Host', '').lower().partition(':')[0]
    _, dot, remainder = hostname.partition('.')
    domain = remainder if dot else hostname
    sanitized_domain = sanitize_domain(domain)
    if sanitized_domain:
        template = env.get_template('Autodiscover.xml.j2')
        body = template.render(DOMAIN=sanitized_domain)
        return Response(body, mimetype='text/xml')
    return jsonify({'error': 'Invalid domain'}), 400