Coverage for postrfp/shared/fetch/userq.py: 98%

40 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1import logging 

2 

3from sqlalchemy import ( 

4 and_, 

5) 

6from sqlalchemy.orm import ( 

7 Session, 

8 Query, 

9) 

10from sqlalchemy.orm.session import object_session 

11from sqlalchemy.orm.exc import NoResultFound 

12 

13from postrfp.model import ( 

14 Participant, 

15 IssueWatchList, 

16 Issue, 

17 Project, 

18 ProjectWatchList, 

19 User, 

20 Organisation, 

21 Edge, 

22 RelationshipType, 

23) 

24from postrfp.shared.password import ( 

25 validate_hash, 

26 dummy_argon2_hash, 

27) 

28 

29log = logging.getLogger(__name__) 

30 

31 

32def user(session: Session, user_id: str) -> User: 

33 """ 

34 Fetch a User by ID 

35 @raises NoResultFound 

36 """ 

37 return session.get_one(User, user_id) 

38 

39 

40def organisation(session: Session, org_id: str) -> Organisation: 

41 """ 

42 Fetch an Organisation by ID, 

43 

44 Raises 

45 ------ 

46 NoResultFound if no Organisation found for the given org_id 

47 """ 

48 return session.query(Organisation).filter(Organisation.id == org_id).one() 

49 

50 

51def user_by_password(session: Session, user_id: str, password: str) -> User: 

52 """ 

53 Fetch a User by ID and validate the given password. 

54 Raises NoResultFound if the password is empty or invalid. 

55 """ 

56 unauth_user = user(session, user_id) 

57 if not unauth_user.password: 

58 log.error("User %s has no password, refusing to authenticate", user_id) 

59 raise NoResultFound("No matching user found") 

60 # validate_hash now expects the full Argon2 string 

61 if not validate_hash(password, unauth_user.password): 

62 raise NoResultFound("No matching user found") 

63 return unauth_user 

64 

65 

66def dummy_hash(session: Session): 

67 """ 

68 Perform a dummy password hash operation 

69 to mitigate a timing attack. 

70 """ 

71 # Perform a dummy Argon2 hash directly 

72 dummy_argon2_hash() 

73 # Optionally, still perform a quick DB query if desired, but the primary 

74 # goal is to equalize the hashing time. 

75 # session.query(User.id).first() # Example: minimal DB interaction 

76 

77 

78def project_users(user: User, project_id: int, restricted_users_only: bool) -> Query: 

79 session = object_session(user) 

80 assert session is not None 

81 uq = session.query(User).join(Organisation).order_by(Organisation.id, User.id) 

82 

83 if user.organisation.is_consultant: 

84 uq = uq.join(Participant).filter(Participant.project_id == project_id) 

85 else: 

86 uq = uq.filter(User.organisation == user.organisation) 

87 

88 if restricted_users_only: 

89 uq = uq.filter(User.type == "restricted") 

90 

91 return uq 

92 

93 

94def issue_watchers(session: Session, issue: Issue) -> Query: 

95 cols = ( 

96 User.id.label("user_id"), 

97 User.email, 

98 User.fullname, 

99 IssueWatchList.date_created.label("watching_since"), 

100 ) 

101 return ( 

102 session.query(*cols) 

103 .outerjoin( 

104 IssueWatchList, 

105 and_( 

106 IssueWatchList.user_id == User.id, IssueWatchList.issue_id == issue.id 

107 ), 

108 ) 

109 .filter(User.organisation == issue.respondent) 

110 ) 

111 

112 

113def project_watchers(session: Session, project: Project) -> Query: 

114 cols = ( 

115 User.id.label("user_id"), 

116 User.email, 

117 User.fullname, 

118 ProjectWatchList.date_created.label("watching_since"), 

119 ) 

120 return ( 

121 session.query(*cols) 

122 .join(ProjectWatchList) 

123 .filter(ProjectWatchList.project_id == project.id) 

124 ) 

125 

126 

127def edges_for_org_query(session: Session, org_id: str) -> Query: 

128 return ( 

129 session.query(Edge) 

130 .join(RelationshipType) 

131 .join(Organisation) 

132 .filter(RelationshipType.org_id == org_id) 

133 )