64 lines
1.7 KiB
Python
64 lines
1.7 KiB
Python
from datetime import datetime, timedelta, timezone
|
|
import secrets
|
|
|
|
from passlib.context import CryptContext
|
|
from sqlalchemy import func, select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models import LoginAttempt, User
|
|
|
|
|
|
# Module-wide passlib context used by hash_password() and verify_password().
# argon2 is the only configured scheme.
# NOTE(review): per the passlib docs, deprecated="auto" marks every scheme
# except the default as deprecated — confirm if more schemes are ever added.
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
|
|
|
|
|
|
def hash_password(password: str) -> str:
    """Return the hash of *password* produced by the module's ``pwd_context``.

    Args:
        password: The plaintext password to hash.

    Returns:
        The encoded hash string returned by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
|
|
|
|
|
|
def verify_password(password: str, password_hash: str) -> bool:
    """Check *password* against a previously stored hash.

    Args:
        password: The plaintext password supplied by the caller.
        password_hash: The stored hash to verify against.

    Returns:
        Whatever ``pwd_context.verify`` reports for the pair.
    """
    matches = pwd_context.verify(password, password_hash)
    return matches
|
|
|
|
|
|
def new_csrf_token() -> str:
    """Return a fresh URL-safe random token.

    The token is generated by ``secrets.token_urlsafe`` from 32 random bytes.
    """
    token = secrets.token_urlsafe(32)
    return token
|
|
|
|
|
|
def register_failed_attempt(db: Session, email: str, ip_address: str) -> None:
    """Store a failed login attempt and commit it.

    Args:
        db: Session used to add the row and commit.
        email: Email the login was attempted with; stored lower-cased.
        ip_address: Address the attempt came from; stored as given.
    """
    attempt = LoginAttempt(
        email=email.lower(),
        ip_address=ip_address,
        success=False,
    )
    db.add(attempt)
    db.commit()
|
|
|
|
|
|
def register_successful_attempt(db: Session, email: str, ip_address: str) -> None:
    """Store a successful login attempt and commit it.

    Args:
        db: Session used to add the row and commit.
        email: Email the login was performed with; stored lower-cased.
        ip_address: Address the attempt came from; stored as given.
    """
    attempt = LoginAttempt(
        email=email.lower(),
        ip_address=ip_address,
        success=True,
    )
    db.add(attempt)
    db.commit()
|
|
|
|
|
|
def is_login_blocked(
    db: Session,
    email: str,
    ip_address: str,
    max_attempts: int,
    window_minutes: int,
) -> tuple[bool, int]:
    """Report whether recent failed logins reach the allowed maximum.

    Counts ``LoginAttempt`` rows that are unsuccessful, were created at or
    after ``now - window_minutes``, and match either the lower-cased email
    or the IP address.

    Args:
        db: Session used to run the count query.
        email: Email being checked; compared lower-cased.
        ip_address: Address being checked.
        max_attempts: Number of failures at which the login counts as blocked.
        window_minutes: Size of the look-back window, in minutes.

    Returns:
        ``(True, window_minutes)`` when the failure count is at least
        ``max_attempts``, otherwise ``(False, 0)``.
    """
    window_start = datetime.now(timezone.utc) - timedelta(minutes=window_minutes)

    # A failure counts if it matches the email OR the IP address.
    same_email = LoginAttempt.email == email.lower()
    same_ip = LoginAttempt.ip_address == ip_address

    recent_failures = select(func.count(LoginAttempt.id)).where(
        LoginAttempt.success.is_(False),
        LoginAttempt.created_at >= window_start,
        same_email | same_ip,
    )
    failures = db.execute(recent_failures).scalar_one()

    return (True, window_minutes) if failures >= max_attempts else (False, 0)
|
|
|
|
|
|
def find_user_by_email(db: Session, email: str) -> User | None:
    """Look up a user whose ``email`` equals the lower-cased *email*.

    Args:
        db: Session used to run the query.
        email: Email to search for; lower-cased before comparison.

    Returns:
        The matching ``User``, or ``None`` when no row matches.
        NOTE(review): per the SQLAlchemy docs, ``scalar_one_or_none`` raises
        if more than one row matches — confirm ``User.email`` is unique.
    """
    normalized = email.lower()
    query = select(User).where(User.email == normalized)
    result = db.execute(query)
    return result.scalar_one_or_none()
|