Coverage for postrfp/vendor/validation.py: 100%

79 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1from datetime import datetime 

2 

3 

4from postrfp.authorisation import perms, vendor_actions 

5from postrfp.authorisation.errors import AuthorisationFailures 

6from postrfp.shared.exceptions import AuthorizationFailure 

7 

8 

9def validate( 

10 effective_user, 

11 issue=None, 

12 action=perms.ISSUE_VIEW_ANSWERS, 

13 question=None, 

14 section=None, 

15 response_state=None, 

16 answer=None, 

17 bulk_import=False, 

18): 

19 validator = Validator( 

20 effective_user, 

21 issue=issue, 

22 action=action, 

23 question=question, 

24 section=section, 

25 response_state=response_state, 

26 answer=answer, 

27 bulk_import=bulk_import, 

28 ) 

29 validator.validate() 

30 

31 

32class Validator: 

33 def __init__( 

34 self, 

35 user, 

36 issue=None, 

37 action=perms.ISSUE_VIEW_ANSWERS, 

38 question=None, 

39 section=None, 

40 response_state=None, 

41 answer=None, 

42 bulk_import=False, 

43 ): 

44 self.user = user 

45 self.issue = issue 

46 self.action = action 

47 self.question = question 

48 self.section = section 

49 self.response_state = response_state 

50 self.answer = None 

51 self.bulk_import = bulk_import 

52 self.errors = AuthorisationFailures(action) 

53 self.now = datetime.now() 

54 

55 def validate(self): 

56 if self.issue is not None: 

57 self.check_visibility() 

58 if self.action: 

59 self.check_user_permissions() 

60 self.check_rules() 

61 self.raise_on_error() 

62 

63 def check_visibility(self): 

64 if not self.issue.respondent_id == self.user.organisation.id: 

65 m = "Not Permitted: {} does not belong to Respondent organisation {}" 

66 self.fail(m, self.user, self.issue.respondent_id) 

67 

68 if ( 

69 self.question is not None 

70 and self.question.project_id != self.issue.project_id 

71 ): 

72 self.fail( 

73 "Question # {} does not belong to Project {}", 

74 self.question.number, 

75 self.question.project_id, 

76 ) 

77 

78 if ( 

79 self.section is not None 

80 and self.section.project_id != self.issue.project_id 

81 ): 

82 self.fail( 

83 f"Section ID {self.section.id} does not belong to Issue ID{self.issue.id}" 

84 ) 

85 

86 def check_user_permissions(self): 

87 if not self.user.has_permission(self.action): 

88 self.fail("User {} lacks permission {}", self.user, self.action) 

89 

90 def check_rules(self): 

91 action = self.action 

92 if action == perms.ISSUE_VIEW_WINLOSS: 

93 self.authorise_winloss() 

94 

95 if action in vendor_actions.ISSUE_ACTIONS: 

96 self.check_issue_rules(self.action, self.issue) 

97 

98 def check_issue_rules(self, action, issue): 

99 if issue is None: 

100 self.fail("Issue not provided, cannot verify permission to {}", action) 

101 return 

102 if action not in vendor_actions.ISSUE_STATUS_ACTIONS[issue.status]: 

103 self.invalid_issue_status() 

104 if action in vendor_actions.ILLEGAL_AFTER_DEADLINE: 

105 if issue.deadline_passed: 

106 act = perms.title_cased(action) 

107 self.fail("Unable to {}, Deadline ({}) has passed", act, issue.deadline) 

108 if action in vendor_actions.ANSWERING_ACTIONS: 

109 self.check_answering_rules(action, issue) 

110 

111 def check_answering_rules(self, action, issue): 

112 if not issue.use_workflow or self.bulk_import: 

113 return # no special rules 

114 

115 if self.response_state is None: 

116 self.fail("response_state not provided - cannot determine permissions") 

117 

118 elif self.response_state.allocated_to != self.user.id: 

119 answer_anything = perms.ANSWER_QUESTIONS_ALLOCATED_TO_ANYONE 

120 if not self.user.has_permission(answer_anything): 

121 m = ( 

122 f"{self.user.id} has not been allocated responsibility" 

123 f" so cannot save this answer without " 

124 f' "{perms.ANSWER_QUESTIONS_ALLOCATED_TO_ANYONE}" permission' 

125 ) 

126 self.fail(m) 

127 

128 def authorise_winloss(self): 

129 issue = self.issue 

130 is_not_exposed = not issue.winloss_exposed 

131 is_expired = ( 

132 issue.winloss_expiry is not None and issue.winloss_expiry < datetime.now() 

133 ) 

134 

135 if is_not_exposed or is_expired: 

136 self.fail("WinLoss Reports not exposed for this Response") 

137 

138 def fail(self, tmpl, *args, **kwargs): 

139 msg = tmpl.format(*args, **kwargs) 

140 self.errors.auth_failure(msg) 

141 

142 def invalid_issue_status(self): 

143 self.errors.invalid_issue_status(self.issue.status) 

144 

145 def raise_on_error(self): 

146 if self.errors.has_errors: 

147 raise AuthorizationFailure(errors=self.errors)