This commit is contained in:
@@ -0,0 +1,307 @@
|
||||
import re
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.config import Settings
|
||||
from app.database import get_engine
|
||||
from app.main import create_app
|
||||
from app.models import EmailServerConfig, User
|
||||
|
||||
|
||||
def _build_settings(db_url: str, **overrides) -> Settings:
|
||||
values = {
|
||||
"APP_ENV": "test",
|
||||
"DB_URL": db_url,
|
||||
"SESSION_SECRET": "test-secret",
|
||||
"COOKIE_SECURE": False,
|
||||
"COOKIE_SAMESITE": "lax",
|
||||
"LOGIN_RATE_LIMIT_ATTEMPTS": 5,
|
||||
"LOGIN_RATE_LIMIT_WINDOW_MINUTES": 15,
|
||||
}
|
||||
values.update(overrides)
|
||||
return Settings(**values)
|
||||
|
||||
|
||||
def _extract_csrf(html: str) -> str:
|
||||
match = re.search(r'name="csrf_token" value="([^"]+)"', html)
|
||||
assert match is not None
|
||||
return match.group(1)
|
||||
|
||||
|
||||
def _find_latest_link(sent_mails: list[dict[str, str]], path_fragment: str) -> str:
|
||||
pattern = re.compile(rf"https?://[^\s]+{re.escape(path_fragment)}[^\s]*")
|
||||
for mail in reversed(sent_mails):
|
||||
match = pattern.search(mail["body"])
|
||||
if match is not None:
|
||||
return match.group(0)
|
||||
raise AssertionError(f"no mail with link containing {path_fragment!r}")
|
||||
|
||||
|
||||
def _insert_mail_config() -> None:
|
||||
with Session(get_engine()) as db:
|
||||
db.add(
|
||||
EmailServerConfig(
|
||||
smtp_host="smtp.test.local",
|
||||
smtp_port=587,
|
||||
from_email="noreply@test.local",
|
||||
from_name="Stundenfuchs",
|
||||
use_starttls=True,
|
||||
use_ssl=False,
|
||||
verify_tls=False,
|
||||
registration_mails_enabled=False,
|
||||
password_reset_mails_enabled=True,
|
||||
)
|
||||
)
|
||||
db.commit()
|
||||
|
||||
|
||||
def test_login_rate_limit_ignores_spoofed_x_forwarded_for(app):
|
||||
email = "rate-limit@example.com"
|
||||
password = "strongpasswordRate1"
|
||||
|
||||
with TestClient(app) as register_client:
|
||||
register = register_client.post(
|
||||
"/auth/register",
|
||||
json={"email": email, "password": password},
|
||||
)
|
||||
assert register.status_code == 200
|
||||
|
||||
with TestClient(app) as attacker:
|
||||
for idx in range(5):
|
||||
response = attacker.post(
|
||||
"/auth/login",
|
||||
headers={"x-forwarded-for": f"198.51.100.{idx}"},
|
||||
json={"email": email, "password": "wrong-password"},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
blocked = attacker.post(
|
||||
"/auth/login",
|
||||
headers={"x-forwarded-for": "203.0.113.77"},
|
||||
json={"email": email, "password": "wrong-password"},
|
||||
)
|
||||
assert blocked.status_code == 429
|
||||
|
||||
|
||||
def test_password_reset_new_request_invalidates_previous_token(tmp_path, monkeypatch):
|
||||
db_path = tmp_path / "reset-rotation.db"
|
||||
app = create_app(_build_settings(f"sqlite:///{db_path}"))
|
||||
sent_mails: list[dict[str, str]] = []
|
||||
|
||||
def fake_send_email(*, settings, to_email: str, subject: str, text_body: str) -> None:
|
||||
sent_mails.append({"to": to_email, "subject": subject, "body": text_body})
|
||||
|
||||
monkeypatch.setattr("app.main.send_email", fake_send_email)
|
||||
_insert_mail_config()
|
||||
|
||||
with TestClient(app) as auth_client:
|
||||
register = auth_client.post(
|
||||
"/auth/register",
|
||||
json={"email": "reset-user@example.com", "password": "strongpasswordReset1"},
|
||||
)
|
||||
assert register.status_code == 200
|
||||
|
||||
with TestClient(app) as reset_client:
|
||||
request_page = reset_client.get("/password-reset/request")
|
||||
request_csrf = _extract_csrf(request_page.text)
|
||||
first_request = reset_client.post(
|
||||
"/password-reset/request",
|
||||
data={"email": "reset-user@example.com", "csrf_token": request_csrf},
|
||||
)
|
||||
assert first_request.status_code == 200
|
||||
first_link = _find_latest_link(sent_mails, "/password-reset/confirm?token=")
|
||||
|
||||
request_page_again = reset_client.get("/password-reset/request")
|
||||
request_csrf_again = _extract_csrf(request_page_again.text)
|
||||
second_request = reset_client.post(
|
||||
"/password-reset/request",
|
||||
data={"email": "reset-user@example.com", "csrf_token": request_csrf_again},
|
||||
)
|
||||
assert second_request.status_code == 200
|
||||
second_link = _find_latest_link(sent_mails, "/password-reset/confirm?token=")
|
||||
|
||||
assert first_link != second_link
|
||||
first_token_page = reset_client.get(first_link)
|
||||
assert first_token_page.status_code == 400
|
||||
|
||||
second_token_page = reset_client.get(second_link)
|
||||
assert second_token_page.status_code == 200
|
||||
assert 'name="token"' in second_token_page.text
|
||||
|
||||
|
||||
def test_password_change_invalidates_existing_reset_tokens(tmp_path, monkeypatch):
|
||||
db_path = tmp_path / "reset-password-change.db"
|
||||
app = create_app(_build_settings(f"sqlite:///{db_path}"))
|
||||
sent_mails: list[dict[str, str]] = []
|
||||
|
||||
def fake_send_email(*, settings, to_email: str, subject: str, text_body: str) -> None:
|
||||
sent_mails.append({"to": to_email, "subject": subject, "body": text_body})
|
||||
|
||||
monkeypatch.setattr("app.main.send_email", fake_send_email)
|
||||
_insert_mail_config()
|
||||
|
||||
password = "strongpasswordReset2"
|
||||
with TestClient(app) as user_client, TestClient(app) as reset_client:
|
||||
register = user_client.post(
|
||||
"/auth/register",
|
||||
json={"email": "password-change@example.com", "password": password},
|
||||
)
|
||||
assert register.status_code == 200
|
||||
csrf_token = register.json()["csrf_token"]
|
||||
|
||||
request_page = reset_client.get("/password-reset/request")
|
||||
request_csrf = _extract_csrf(request_page.text)
|
||||
reset_request = reset_client.post(
|
||||
"/password-reset/request",
|
||||
data={"email": "password-change@example.com", "csrf_token": request_csrf},
|
||||
)
|
||||
assert reset_request.status_code == 200
|
||||
reset_link = _find_latest_link(sent_mails, "/password-reset/confirm?token=")
|
||||
|
||||
change_password = user_client.post(
|
||||
"/settings/password",
|
||||
data={
|
||||
"current_password": password,
|
||||
"new_password": "strongpasswordReset3",
|
||||
"new_password_repeat": "strongpasswordReset3",
|
||||
"csrf_token": csrf_token,
|
||||
},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert change_password.status_code == 303
|
||||
assert change_password.headers["location"] == "/settings?msg=password_updated"
|
||||
|
||||
expired_reset = reset_client.get(reset_link)
|
||||
assert expired_reset.status_code == 400
|
||||
|
||||
|
||||
def test_email_change_requires_reverification_and_clears_session(tmp_path, monkeypatch):
|
||||
db_path = tmp_path / "email-change.db"
|
||||
app = create_app(
|
||||
_build_settings(
|
||||
f"sqlite:///{db_path}",
|
||||
EMAIL_VERIFICATION_REQUIRED=True,
|
||||
)
|
||||
)
|
||||
sent_mails: list[dict[str, str]] = []
|
||||
|
||||
def fake_send_email(*, settings, to_email: str, subject: str, text_body: str) -> None:
|
||||
sent_mails.append({"to": to_email, "subject": subject, "body": text_body})
|
||||
|
||||
monkeypatch.setattr("app.main.send_email", fake_send_email)
|
||||
_insert_mail_config()
|
||||
|
||||
password = "strongpasswordVerify2"
|
||||
with TestClient(app) as client:
|
||||
register_page = client.get("/register")
|
||||
register_csrf = _extract_csrf(register_page.text)
|
||||
register_submit = client.post(
|
||||
"/register",
|
||||
data={
|
||||
"email": "verified-before-change@example.com",
|
||||
"password": password,
|
||||
"csrf_token": register_csrf,
|
||||
},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert register_submit.status_code == 303
|
||||
assert register_submit.headers["location"] == "/login?msg=email_verification_sent"
|
||||
|
||||
verify_link = _find_latest_link(sent_mails, "/verify-email?token=")
|
||||
verify_response = client.get(verify_link, follow_redirects=False)
|
||||
assert verify_response.status_code == 303
|
||||
|
||||
login_page = client.get("/login")
|
||||
login_csrf = _extract_csrf(login_page.text)
|
||||
login_submit = client.post(
|
||||
"/login",
|
||||
data={
|
||||
"email": "verified-before-change@example.com",
|
||||
"password": password,
|
||||
"csrf_token": login_csrf,
|
||||
},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert login_submit.status_code == 303
|
||||
|
||||
settings_page = client.get("/settings")
|
||||
settings_csrf = _extract_csrf(settings_page.text)
|
||||
profile_update = client.post(
|
||||
"/settings/profile",
|
||||
data={
|
||||
"email": "changed-address@example.com",
|
||||
"current_password": password,
|
||||
"csrf_token": settings_csrf,
|
||||
},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert profile_update.status_code == 303
|
||||
assert profile_update.headers["location"] == "/login?msg=email_verification_sent"
|
||||
|
||||
me_after_change = client.get("/me")
|
||||
assert me_after_change.status_code == 401
|
||||
|
||||
login_page_after_change = client.get("/login")
|
||||
login_csrf_after_change = _extract_csrf(login_page_after_change.text)
|
||||
blocked_login = client.post(
|
||||
"/login",
|
||||
data={
|
||||
"email": "changed-address@example.com",
|
||||
"password": password,
|
||||
"csrf_token": login_csrf_after_change,
|
||||
},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert blocked_login.status_code == 403
|
||||
|
||||
assert sent_mails[-1]["to"] == "changed-address@example.com"
|
||||
|
||||
with Session(get_engine()) as db:
|
||||
updated_user = db.execute(select(User).where(User.email == "changed-address@example.com")).scalar_one()
|
||||
assert updated_user.email_verified is False
|
||||
|
||||
|
||||
def test_api_mfa_resend_respects_cooldown(tmp_path, monkeypatch):
|
||||
db_path = tmp_path / "mfa-resend.db"
|
||||
app = create_app(_build_settings(f"sqlite:///{db_path}"))
|
||||
sent_mails: list[dict[str, str]] = []
|
||||
|
||||
def fake_send_email(*, settings, to_email: str, subject: str, text_body: str) -> None:
|
||||
sent_mails.append({"to": to_email, "subject": subject, "body": text_body})
|
||||
|
||||
monkeypatch.setattr("app.main.send_email", fake_send_email)
|
||||
_insert_mail_config()
|
||||
|
||||
password = "strongpasswordMfa1"
|
||||
with TestClient(app) as client:
|
||||
register = client.post(
|
||||
"/auth/register",
|
||||
json={"email": "mfa-resend@example.com", "password": password},
|
||||
)
|
||||
assert register.status_code == 200
|
||||
csrf_token = register.json()["csrf_token"]
|
||||
|
||||
enable_email_mfa = client.post(
|
||||
"/settings/mfa",
|
||||
data={
|
||||
"csrf_token": csrf_token,
|
||||
"mfa_method": "email",
|
||||
"current_password": password,
|
||||
},
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert enable_email_mfa.status_code == 303
|
||||
|
||||
logout = client.post("/auth/logout", headers={"x-csrf-token": csrf_token})
|
||||
assert logout.status_code == 200
|
||||
|
||||
login = client.post("/auth/login", json={"email": "mfa-resend@example.com", "password": password})
|
||||
assert login.status_code == 200
|
||||
assert login.json()["mfa_required"] is True
|
||||
pending_csrf = login.json()["csrf_token"]
|
||||
|
||||
resend = client.post("/auth/mfa/resend", headers={"x-csrf-token": pending_csrf})
|
||||
assert resend.status_code == 429
|
||||
assert "Bitte kurz warten" in resend.text
|
||||
Reference in New Issue
Block a user