Coverage for postrfp/shared/fetch/userq.py: 98%
40 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1import logging
3from sqlalchemy import (
4 and_,
5)
6from sqlalchemy.orm import (
7 Session,
8 Query,
9)
10from sqlalchemy.orm.session import object_session
11from sqlalchemy.orm.exc import NoResultFound
13from postrfp.model import (
14 Participant,
15 IssueWatchList,
16 Issue,
17 Project,
18 ProjectWatchList,
19 User,
20 Organisation,
21 Edge,
22 RelationshipType,
23)
24from postrfp.shared.password import (
25 validate_hash,
26 dummy_argon2_hash,
27)
29log = logging.getLogger(__name__)
def user(session: Session, user_id: str) -> User:
    """
    Look up a single User by its ID.

    The lookup is delegated to ``Session.get_one``.

    @raises NoResultFound
    """
    found = session.get_one(User, user_id)
    return found
def organisation(session: Session, org_id: str) -> Organisation:
    """
    Look up a single Organisation by its ID.

    Raises
    ------
    NoResultFound if no Organisation found for the given org_id
    """
    matches_id = Organisation.id == org_id
    org_query = session.query(Organisation).filter(matches_id)
    return org_query.one()
def user_by_password(session: Session, user_id: str, password: str) -> User:
    """
    Load the User with the given ID and check the supplied password
    against the password hash stored on that User.

    Raises NoResultFound when the User has no stored password or when the
    supplied password does not validate. The initial lookup goes through
    ``user()``, so a failed lookup propagates whatever that call raises.
    """
    candidate = user(session, user_id)
    stored_hash = candidate.password

    if stored_hash:
        # validate_hash is handed the complete stored Argon2 string
        if validate_hash(password, stored_hash):
            return candidate
        raise NoResultFound("No matching user found")

    # No stored password at all: never authenticate such an account
    log.error("User %s has no password, refusing to authenticate", user_id)
    raise NoResultFound("No matching user found")
def dummy_hash(session: Session):
    """
    Run a throwaway password hash to mitigate a timing attack.

    The `session` argument is accepted but not used: the aim is to
    equalise hashing time, so no database query is issued here.
    """
    dummy_argon2_hash()
def project_users(user: User, project_id: int, restricted_users_only: bool) -> Query:
    """
    Build a Query of Users joined to Organisation, ordered by
    Organisation.id and then User.id.

    The session is obtained from ``user`` via ``object_session``, so
    ``user`` must be attached to a session.

    - If ``user.organisation.is_consultant`` is truthy, the query is joined
      to Participant and limited to rows for ``project_id``.
    - Otherwise it is limited to Users in the same organisation as ``user``.

    When ``restricted_users_only`` is truthy, only Users whose type is
    "restricted" are kept.
    """
    db = object_session(user)
    assert db is not None
    users_q = db.query(User).join(Organisation).order_by(Organisation.id, User.id)

    if not user.organisation.is_consultant:
        users_q = users_q.filter(User.organisation == user.organisation)
    else:
        users_q = users_q.join(Participant).filter(
            Participant.project_id == project_id
        )

    if restricted_users_only:
        users_q = users_q.filter(User.type == "restricted")

    return users_q
def issue_watchers(session: Session, issue: Issue) -> Query:
    """
    Build a Query over the Users whose organisation is ``issue.respondent``.

    Selected columns are user_id, email, fullname and watching_since.
    IssueWatchList is attached with an outer join on the user and this
    issue's ID, so Users without a watch list entry for the issue are
    still included.
    """
    base = session.query(
        User.id.label("user_id"),
        User.email,
        User.fullname,
        IssueWatchList.date_created.label("watching_since"),
    )
    watch_entry_matches = and_(
        IssueWatchList.user_id == User.id,
        IssueWatchList.issue_id == issue.id,
    )
    with_watch_info = base.outerjoin(IssueWatchList, watch_entry_matches)
    return with_watch_info.filter(User.organisation == issue.respondent)
def project_watchers(session: Session, project: Project) -> Query:
    """
    Build a Query over Users joined to ProjectWatchList, limited to watch
    list entries for the given project.

    Selected columns are user_id, email, fullname and watching_since.
    """
    watcher_q = session.query(
        User.id.label("user_id"),
        User.email,
        User.fullname,
        ProjectWatchList.date_created.label("watching_since"),
    )
    watcher_q = watcher_q.join(ProjectWatchList)
    return watcher_q.filter(ProjectWatchList.project_id == project.id)
def edges_for_org_query(session: Session, org_id: str) -> Query:
    """
    Build a Query of Edges joined to RelationshipType and Organisation,
    limited to relationship types whose org_id equals the given org_id.
    """
    edge_q = session.query(Edge).join(RelationshipType).join(Organisation)
    return edge_q.filter(RelationshipType.org_id == org_id)