Files
stundenfuchs/tests/test_security_regressions.py
maddin 6fbd1bb3c2
CI / checks (push) Has been cancelled
chore: initialize public repository
2026-03-22 12:57:09 +00:00

308 lines
11 KiB
Python

import re
from fastapi.testclient import TestClient
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.config import Settings
from app.database import get_engine
from app.main import create_app
from app.models import EmailServerConfig, User
def _build_settings(db_url: str, **overrides) -> Settings:
values = {
"APP_ENV": "test",
"DB_URL": db_url,
"SESSION_SECRET": "test-secret",
"COOKIE_SECURE": False,
"COOKIE_SAMESITE": "lax",
"LOGIN_RATE_LIMIT_ATTEMPTS": 5,
"LOGIN_RATE_LIMIT_WINDOW_MINUTES": 15,
}
values.update(overrides)
return Settings(**values)
def _extract_csrf(html: str) -> str:
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
assert match is not None
return match.group(1)
def _find_latest_link(sent_mails: list[dict[str, str]], path_fragment: str) -> str:
pattern = re.compile(rf"https?://[^\s]+{re.escape(path_fragment)}[^\s]*")
for mail in reversed(sent_mails):
match = pattern.search(mail["body"])
if match is not None:
return match.group(0)
raise AssertionError(f"no mail with link containing {path_fragment!r}")
def _insert_mail_config() -> None:
with Session(get_engine()) as db:
db.add(
EmailServerConfig(
smtp_host="smtp.test.local",
smtp_port=587,
from_email="noreply@test.local",
from_name="Stundenfuchs",
use_starttls=True,
use_ssl=False,
verify_tls=False,
registration_mails_enabled=False,
password_reset_mails_enabled=True,
)
)
db.commit()
def test_login_rate_limit_ignores_spoofed_x_forwarded_for(app):
email = "rate-limit@example.com"
password = "strongpasswordRate1"
with TestClient(app) as register_client:
register = register_client.post(
"/auth/register",
json={"email": email, "password": password},
)
assert register.status_code == 200
with TestClient(app) as attacker:
for idx in range(5):
response = attacker.post(
"/auth/login",
headers={"x-forwarded-for": f"198.51.100.{idx}"},
json={"email": email, "password": "wrong-password"},
)
assert response.status_code == 401
blocked = attacker.post(
"/auth/login",
headers={"x-forwarded-for": "203.0.113.77"},
json={"email": email, "password": "wrong-password"},
)
assert blocked.status_code == 429
def test_password_reset_new_request_invalidates_previous_token(tmp_path, monkeypatch):
db_path = tmp_path / "reset-rotation.db"
app = create_app(_build_settings(f"sqlite:///{db_path}"))
sent_mails: list[dict[str, str]] = []
def fake_send_email(*, settings, to_email: str, subject: str, text_body: str) -> None:
sent_mails.append({"to": to_email, "subject": subject, "body": text_body})
monkeypatch.setattr("app.main.send_email", fake_send_email)
_insert_mail_config()
with TestClient(app) as auth_client:
register = auth_client.post(
"/auth/register",
json={"email": "reset-user@example.com", "password": "strongpasswordReset1"},
)
assert register.status_code == 200
with TestClient(app) as reset_client:
request_page = reset_client.get("/password-reset/request")
request_csrf = _extract_csrf(request_page.text)
first_request = reset_client.post(
"/password-reset/request",
data={"email": "reset-user@example.com", "csrf_token": request_csrf},
)
assert first_request.status_code == 200
first_link = _find_latest_link(sent_mails, "/password-reset/confirm?token=")
request_page_again = reset_client.get("/password-reset/request")
request_csrf_again = _extract_csrf(request_page_again.text)
second_request = reset_client.post(
"/password-reset/request",
data={"email": "reset-user@example.com", "csrf_token": request_csrf_again},
)
assert second_request.status_code == 200
second_link = _find_latest_link(sent_mails, "/password-reset/confirm?token=")
assert first_link != second_link
first_token_page = reset_client.get(first_link)
assert first_token_page.status_code == 400
second_token_page = reset_client.get(second_link)
assert second_token_page.status_code == 200
assert 'name="token"' in second_token_page.text
def test_password_change_invalidates_existing_reset_tokens(tmp_path, monkeypatch):
db_path = tmp_path / "reset-password-change.db"
app = create_app(_build_settings(f"sqlite:///{db_path}"))
sent_mails: list[dict[str, str]] = []
def fake_send_email(*, settings, to_email: str, subject: str, text_body: str) -> None:
sent_mails.append({"to": to_email, "subject": subject, "body": text_body})
monkeypatch.setattr("app.main.send_email", fake_send_email)
_insert_mail_config()
password = "strongpasswordReset2"
with TestClient(app) as user_client, TestClient(app) as reset_client:
register = user_client.post(
"/auth/register",
json={"email": "password-change@example.com", "password": password},
)
assert register.status_code == 200
csrf_token = register.json()["csrf_token"]
request_page = reset_client.get("/password-reset/request")
request_csrf = _extract_csrf(request_page.text)
reset_request = reset_client.post(
"/password-reset/request",
data={"email": "password-change@example.com", "csrf_token": request_csrf},
)
assert reset_request.status_code == 200
reset_link = _find_latest_link(sent_mails, "/password-reset/confirm?token=")
change_password = user_client.post(
"/settings/password",
data={
"current_password": password,
"new_password": "strongpasswordReset3",
"new_password_repeat": "strongpasswordReset3",
"csrf_token": csrf_token,
},
follow_redirects=False,
)
assert change_password.status_code == 303
assert change_password.headers["location"] == "/settings?msg=password_updated"
expired_reset = reset_client.get(reset_link)
assert expired_reset.status_code == 400
def test_email_change_requires_reverification_and_clears_session(tmp_path, monkeypatch):
db_path = tmp_path / "email-change.db"
app = create_app(
_build_settings(
f"sqlite:///{db_path}",
EMAIL_VERIFICATION_REQUIRED=True,
)
)
sent_mails: list[dict[str, str]] = []
def fake_send_email(*, settings, to_email: str, subject: str, text_body: str) -> None:
sent_mails.append({"to": to_email, "subject": subject, "body": text_body})
monkeypatch.setattr("app.main.send_email", fake_send_email)
_insert_mail_config()
password = "strongpasswordVerify2"
with TestClient(app) as client:
register_page = client.get("/register")
register_csrf = _extract_csrf(register_page.text)
register_submit = client.post(
"/register",
data={
"email": "verified-before-change@example.com",
"password": password,
"csrf_token": register_csrf,
},
follow_redirects=False,
)
assert register_submit.status_code == 303
assert register_submit.headers["location"] == "/login?msg=email_verification_sent"
verify_link = _find_latest_link(sent_mails, "/verify-email?token=")
verify_response = client.get(verify_link, follow_redirects=False)
assert verify_response.status_code == 303
login_page = client.get("/login")
login_csrf = _extract_csrf(login_page.text)
login_submit = client.post(
"/login",
data={
"email": "verified-before-change@example.com",
"password": password,
"csrf_token": login_csrf,
},
follow_redirects=False,
)
assert login_submit.status_code == 303
settings_page = client.get("/settings")
settings_csrf = _extract_csrf(settings_page.text)
profile_update = client.post(
"/settings/profile",
data={
"email": "changed-address@example.com",
"current_password": password,
"csrf_token": settings_csrf,
},
follow_redirects=False,
)
assert profile_update.status_code == 303
assert profile_update.headers["location"] == "/login?msg=email_verification_sent"
me_after_change = client.get("/me")
assert me_after_change.status_code == 401
login_page_after_change = client.get("/login")
login_csrf_after_change = _extract_csrf(login_page_after_change.text)
blocked_login = client.post(
"/login",
data={
"email": "changed-address@example.com",
"password": password,
"csrf_token": login_csrf_after_change,
},
follow_redirects=False,
)
assert blocked_login.status_code == 403
assert sent_mails[-1]["to"] == "changed-address@example.com"
with Session(get_engine()) as db:
updated_user = db.execute(select(User).where(User.email == "changed-address@example.com")).scalar_one()
assert updated_user.email_verified is False
def test_api_mfa_resend_respects_cooldown(tmp_path, monkeypatch):
db_path = tmp_path / "mfa-resend.db"
app = create_app(_build_settings(f"sqlite:///{db_path}"))
sent_mails: list[dict[str, str]] = []
def fake_send_email(*, settings, to_email: str, subject: str, text_body: str) -> None:
sent_mails.append({"to": to_email, "subject": subject, "body": text_body})
monkeypatch.setattr("app.main.send_email", fake_send_email)
_insert_mail_config()
password = "strongpasswordMfa1"
with TestClient(app) as client:
register = client.post(
"/auth/register",
json={"email": "mfa-resend@example.com", "password": password},
)
assert register.status_code == 200
csrf_token = register.json()["csrf_token"]
enable_email_mfa = client.post(
"/settings/mfa",
data={
"csrf_token": csrf_token,
"mfa_method": "email",
"current_password": password,
},
follow_redirects=False,
)
assert enable_email_mfa.status_code == 303
logout = client.post("/auth/logout", headers={"x-csrf-token": csrf_token})
assert logout.status_code == 200
login = client.post("/auth/login", json={"email": "mfa-resend@example.com", "password": password})
assert login.status_code == 200
assert login.json()["mfa_required"] is True
pending_csrf = login.json()["csrf_token"]
resend = client.post("/auth/mfa/resend", headers={"x-csrf-token": pending_csrf})
assert resend.status_code == 429
assert "Bitte kurz warten" in resend.text