"""Regression tests for security-relevant authentication behaviour.

Covered scenarios: login rate limiting with varying ``X-Forwarded-For``
headers, invalidation of password-reset tokens, re-verification after an
e-mail change, and the cooldown of the MFA resend endpoint.
"""

import re

from fastapi.testclient import TestClient
from sqlalchemy import select
from sqlalchemy.orm import Session

from app.config import Settings
from app.database import get_engine
from app.main import create_app
from app.models import EmailServerConfig, User


def _build_settings(db_url: str, **overrides) -> Settings:
    """Return test ``Settings`` for *db_url*.

    Keyword arguments in *overrides* replace or extend the defaults below.
    """
    values = {
        "APP_ENV": "test",
        "DB_URL": db_url,
        "SESSION_SECRET": "test-secret",
        "COOKIE_SECURE": False,
        "COOKIE_SAMESITE": "lax",
        "LOGIN_RATE_LIMIT_ATTEMPTS": 5,
        "LOGIN_RATE_LIMIT_WINDOW_MINUTES": 15,
    }
    values.update(overrides)
    return Settings(**values)


def _extract_csrf(html: str) -> str:
    """Return the value of the ``csrf_token`` form field found in *html*.

    Raises ``AssertionError`` when the field is not present.
    """
    match = re.search(r'name="csrf_token" value="([^"]+)"', html)
    assert match is not None, "csrf_token field not found in HTML"
    return match.group(1)


def _find_latest_link(sent_mails: list[dict[str, str]], path_fragment: str) -> str:
    """Return the link containing *path_fragment* from the newest matching mail.

    Mails are scanned from last to first; the first body that contains an
    http(s) URL with *path_fragment* wins. Raises ``AssertionError`` when no
    mail contains such a link.
    """
    pattern = re.compile(rf"https?://[^\s]+{re.escape(path_fragment)}[^\s]*")
    for mail in reversed(sent_mails):
        match = pattern.search(mail["body"])
        if match is not None:
            return match.group(0)
    raise AssertionError(f"no mail with link containing {path_fragment!r}")


def _insert_mail_config() -> None:
    """Persist an ``EmailServerConfig`` row via the engine from ``get_engine()``.

    Registration mails are disabled and password-reset mails are enabled in
    the stored configuration.
    """
    with Session(get_engine()) as db:
        db.add(
            EmailServerConfig(
                smtp_host="smtp.test.local",
                smtp_port=587,
                from_email="noreply@test.local",
                from_name="Stundenfuchs",
                use_starttls=True,
                use_ssl=False,
                verify_tls=False,
                registration_mails_enabled=False,
                password_reset_mails_enabled=True,
            )
        )
        db.commit()


def _capture_sent_mails(monkeypatch) -> list[dict[str, str]]:
    """Replace ``app.main.send_email`` with a recorder and return its log.

    Each call to the patched function appends a dict with the keys ``to``,
    ``subject`` and ``body`` to the returned list.
    """
    sent_mails: list[dict[str, str]] = []

    def fake_send_email(*, settings, to_email: str, subject: str, text_body: str) -> None:
        sent_mails.append({"to": to_email, "subject": subject, "body": text_body})

    monkeypatch.setattr("app.main.send_email", fake_send_email)
    return sent_mails


def _request_password_reset(client: TestClient, email: str):
    """Submit the password-reset request form for *email*.

    Fetches the request page first to obtain a fresh CSRF token, then posts
    the form. Returns the response to the POST.
    """
    request_page = client.get("/password-reset/request")
    request_csrf = _extract_csrf(request_page.text)
    return client.post(
        "/password-reset/request",
        data={"email": email, "csrf_token": request_csrf},
    )


def _submit_login_form(client: TestClient, email: str, password: str):
    """Submit the HTML login form without following redirects.

    Fetches the login page first to obtain a fresh CSRF token. Returns the
    response to the POST.
    """
    login_page = client.get("/login")
    login_csrf = _extract_csrf(login_page.text)
    return client.post(
        "/login",
        data={"email": email, "password": password, "csrf_token": login_csrf},
        follow_redirects=False,
    )


def test_login_rate_limit_ignores_spoofed_x_forwarded_for(app):
    """Varying ``X-Forwarded-For`` must not let a client dodge the login limit."""
    email = "rate-limit@example.com"
    password = "strongpasswordRate1"

    with TestClient(app) as register_client:
        register = register_client.post(
            "/auth/register",
            json={"email": email, "password": password},
        )
        assert register.status_code == 200

    with TestClient(app) as attacker:
        # NOTE(review): five attempts matches LOGIN_RATE_LIMIT_ATTEMPTS in
        # _build_settings; presumably the `app` fixture uses the same limit.
        # Confirm against the fixture definition.
        for idx in range(5):
            response = attacker.post(
                "/auth/login",
                headers={"x-forwarded-for": f"198.51.100.{idx}"},
                json={"email": email, "password": "wrong-password"},
            )
            assert response.status_code == 401

        # One more failed attempt, again with a new forwarded address.
        blocked = attacker.post(
            "/auth/login",
            headers={"x-forwarded-for": "203.0.113.77"},
            json={"email": email, "password": "wrong-password"},
        )
        assert blocked.status_code == 429


def test_password_reset_new_request_invalidates_previous_token(tmp_path, monkeypatch):
    """A second reset request must make the first reset link unusable."""
    db_path = tmp_path / "reset-rotation.db"
    app = create_app(_build_settings(f"sqlite:///{db_path}"))
    sent_mails = _capture_sent_mails(monkeypatch)
    _insert_mail_config()

    with TestClient(app) as auth_client:
        register = auth_client.post(
            "/auth/register",
            json={"email": "reset-user@example.com", "password": "strongpasswordReset1"},
        )
        assert register.status_code == 200

    with TestClient(app) as reset_client:
        first_request = _request_password_reset(reset_client, "reset-user@example.com")
        assert first_request.status_code == 200
        first_link = _find_latest_link(sent_mails, "/password-reset/confirm?token=")

        second_request = _request_password_reset(reset_client, "reset-user@example.com")
        assert second_request.status_code == 200
        second_link = _find_latest_link(sent_mails, "/password-reset/confirm?token=")

        # Fails as well if the second request sent no new mail, because the
        # lookup would then return the first link again.
        assert first_link != second_link

        first_token_page = reset_client.get(first_link)
        assert first_token_page.status_code == 400

        second_token_page = reset_client.get(second_link)
        assert second_token_page.status_code == 200
        assert 'name="token"' in second_token_page.text


def test_password_change_invalidates_existing_reset_tokens(tmp_path, monkeypatch):
    """Changing the password in settings must invalidate a pending reset link."""
    db_path = tmp_path / "reset-password-change.db"
    app = create_app(_build_settings(f"sqlite:///{db_path}"))
    sent_mails = _capture_sent_mails(monkeypatch)
    _insert_mail_config()

    password = "strongpasswordReset2"
    # Two independent clients: one holds the logged-in session, the other
    # performs the anonymous reset flow.
    with TestClient(app) as user_client, TestClient(app) as reset_client:
        register = user_client.post(
            "/auth/register",
            json={"email": "password-change@example.com", "password": password},
        )
        assert register.status_code == 200
        csrf_token = register.json()["csrf_token"]

        reset_request = _request_password_reset(reset_client, "password-change@example.com")
        assert reset_request.status_code == 200
        reset_link = _find_latest_link(sent_mails, "/password-reset/confirm?token=")

        change_password = user_client.post(
            "/settings/password",
            data={
                "current_password": password,
                "new_password": "strongpasswordReset3",
                "new_password_repeat": "strongpasswordReset3",
                "csrf_token": csrf_token,
            },
            follow_redirects=False,
        )
        assert change_password.status_code == 303
        assert change_password.headers["location"] == "/settings?msg=password_updated"

        expired_reset = reset_client.get(reset_link)
        assert expired_reset.status_code == 400


def test_email_change_requires_reverification_and_clears_session(tmp_path, monkeypatch):
    """Changing the e-mail must end the session and require re-verification."""
    db_path = tmp_path / "email-change.db"
    app = create_app(
        _build_settings(
            f"sqlite:///{db_path}",
            EMAIL_VERIFICATION_REQUIRED=True,
        )
    )
    sent_mails = _capture_sent_mails(monkeypatch)
    _insert_mail_config()

    password = "strongpasswordVerify2"
    with TestClient(app) as client:
        # Register through the HTML form and verify the original address.
        register_page = client.get("/register")
        register_csrf = _extract_csrf(register_page.text)
        register_submit = client.post(
            "/register",
            data={
                "email": "verified-before-change@example.com",
                "password": password,
                "csrf_token": register_csrf,
            },
            follow_redirects=False,
        )
        assert register_submit.status_code == 303
        assert register_submit.headers["location"] == "/login?msg=email_verification_sent"

        verify_link = _find_latest_link(sent_mails, "/verify-email?token=")
        verify_response = client.get(verify_link, follow_redirects=False)
        assert verify_response.status_code == 303

        login_submit = _submit_login_form(
            client, "verified-before-change@example.com", password
        )
        assert login_submit.status_code == 303

        # Change the address from the settings page.
        settings_page = client.get("/settings")
        settings_csrf = _extract_csrf(settings_page.text)
        profile_update = client.post(
            "/settings/profile",
            data={
                "email": "changed-address@example.com",
                "current_password": password,
                "csrf_token": settings_csrf,
            },
            follow_redirects=False,
        )
        assert profile_update.status_code == 303
        assert profile_update.headers["location"] == "/login?msg=email_verification_sent"

        # The existing session must no longer be accepted.
        me_after_change = client.get("/me")
        assert me_after_change.status_code == 401

        # Logging in with the new, still unverified address is refused.
        blocked_login = _submit_login_form(client, "changed-address@example.com", password)
        assert blocked_login.status_code == 403
        assert sent_mails[-1]["to"] == "changed-address@example.com"

        with Session(get_engine()) as db:
            updated_user = db.execute(
                select(User).where(User.email == "changed-address@example.com")
            ).scalar_one()
            assert updated_user.email_verified is False


def test_api_mfa_resend_respects_cooldown(tmp_path, monkeypatch):
    """An immediate MFA resend after login must be rejected with 429."""
    db_path = tmp_path / "mfa-resend.db"
    app = create_app(_build_settings(f"sqlite:///{db_path}"))
    # The recorded mails are not inspected here; the patch only replaces
    # app.main.send_email with the recorder for the duration of the test.
    _capture_sent_mails(monkeypatch)
    _insert_mail_config()

    password = "strongpasswordMfa1"
    with TestClient(app) as client:
        register = client.post(
            "/auth/register",
            json={"email": "mfa-resend@example.com", "password": password},
        )
        assert register.status_code == 200
        csrf_token = register.json()["csrf_token"]

        enable_email_mfa = client.post(
            "/settings/mfa",
            data={
                "csrf_token": csrf_token,
                "mfa_method": "email",
                "current_password": password,
            },
            follow_redirects=False,
        )
        assert enable_email_mfa.status_code == 303

        logout = client.post("/auth/logout", headers={"x-csrf-token": csrf_token})
        assert logout.status_code == 200

        login = client.post(
            "/auth/login",
            json={"email": "mfa-resend@example.com", "password": password},
        )
        assert login.status_code == 200
        assert login.json()["mfa_required"] is True
        pending_csrf = login.json()["csrf_token"]

        # NOTE(review): presumably the login above already sent a code and
        # started the cooldown, which is why the resend is refused. Confirm
        # against the endpoint implementation.
        resend = client.post("/auth/mfa/resend", headers={"x-csrf-token": pending_csrf})
        assert resend.status_code == 429
        assert "Bitte kurz warten" in resend.text