Coverage for postrfp/vendor/validation.py: 100%
79 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1from datetime import datetime
4from postrfp.authorisation import perms, vendor_actions
5from postrfp.authorisation.errors import AuthorisationFailures
6from postrfp.shared.exceptions import AuthorizationFailure
def validate(
    effective_user,
    issue=None,
    action=perms.ISSUE_VIEW_ANSWERS,
    question=None,
    section=None,
    response_state=None,
    answer=None,
    bulk_import=False,
):
    """Run a one-shot authorisation check for ``effective_user``.

    Convenience wrapper: builds a ``Validator`` from the given arguments and
    immediately calls its ``validate()`` method. Returns ``None``; any
    exception raised by ``Validator.validate()`` propagates to the caller.
    """
    # Every optional argument is forwarded under its own name.
    options = {
        "issue": issue,
        "action": action,
        "question": question,
        "section": section,
        "response_state": response_state,
        "answer": answer,
        "bulk_import": bulk_import,
    }
    Validator(effective_user, **options).validate()
class Validator:
    """Accumulate authorisation failures for a user acting on an issue.

    ``validate()`` runs the visibility, permission and rule checks in turn.
    Each failed check is recorded on ``self.errors`` (an
    ``AuthorisationFailures`` built for ``action``); once all checks have run,
    ``AuthorizationFailure`` is raised if anything was recorded.
    """

    def __init__(
        self,
        user,
        issue=None,
        action=perms.ISSUE_VIEW_ANSWERS,
        question=None,
        section=None,
        response_state=None,
        answer=None,
        bulk_import=False,
    ):
        """Store the check inputs and create an empty failure collector."""
        self.user = user
        self.issue = issue
        self.action = action
        self.question = question
        self.section = section
        self.response_state = response_state
        # Keep the caller's value; previously this was hard-coded to None,
        # silently dropping the ``answer`` argument.
        self.answer = answer
        self.bulk_import = bulk_import
        self.errors = AuthorisationFailures(action)
        # Single timestamp reused by time-based checks in this validation run.
        self.now = datetime.now()

    def validate(self):
        """Run all applicable checks, then raise if any failure was recorded.

        Raises:
            AuthorizationFailure: if at least one check recorded an error.
        """
        if self.issue is not None:
            self.check_visibility()
        if self.action:
            self.check_user_permissions()
            self.check_rules()
        self.raise_on_error()

    def check_visibility(self):
        """Check that the issue belongs to the user's organisation and that
        any question/section supplied belongs to the issue's project.

        Expects ``self.issue`` to be set (``validate()`` guarantees this).
        """
        if self.issue.respondent_id != self.user.organisation.id:
            m = "Not Permitted: {} does not belong to Respondent organisation {}"
            self.fail(m, self.user, self.issue.respondent_id)

        if (
            self.question is not None
            and self.question.project_id != self.issue.project_id
        ):
            self.fail(
                "Question # {} does not belong to Project {}",
                self.question.number,
                self.question.project_id,
            )

        if (
            self.section is not None
            and self.section.project_id != self.issue.project_id
        ):
            # Already rendered; fail() passes argument-less messages through
            # verbatim instead of formatting them a second time.
            self.fail(
                f"Section ID {self.section.id} does not belong to Issue ID{self.issue.id}"
            )

    def check_user_permissions(self):
        """Record a failure if the user lacks the permission for the action."""
        if not self.user.has_permission(self.action):
            self.fail("User {} lacks permission {}", self.user, self.action)

    def check_rules(self):
        """Dispatch to the action-specific rule checks."""
        action = self.action
        if action == perms.ISSUE_VIEW_WINLOSS:
            self.authorise_winloss()

        if action in vendor_actions.ISSUE_ACTIONS:
            self.check_issue_rules(self.action, self.issue)

    def check_issue_rules(self, action, issue):
        """Check status, deadline and answering rules for an issue action.

        Records a failure and stops early when ``issue`` is None, because the
        remaining checks all read attributes of the issue.
        """
        if issue is None:
            self.fail("Issue not provided, cannot verify permission to {}", action)
            return
        if action not in vendor_actions.ISSUE_STATUS_ACTIONS[issue.status]:
            self.invalid_issue_status()
        if action in vendor_actions.ILLEGAL_AFTER_DEADLINE:
            if issue.deadline_passed:
                act = perms.title_cased(action)
                self.fail("Unable to {}, Deadline ({}) has passed", act, issue.deadline)
        if action in vendor_actions.ANSWERING_ACTIONS:
            self.check_answering_rules(action, issue)

    def check_answering_rules(self, action, issue):
        """Check workflow allocation rules for answering actions.

        No rules apply when the issue does not use workflow or when this is a
        bulk import. Otherwise the response state must be supplied, and must
        be allocated to the user unless the user holds the
        ``ANSWER_QUESTIONS_ALLOCATED_TO_ANYONE`` permission.
        """
        if not issue.use_workflow or self.bulk_import:
            return  # no special rules

        if self.response_state is None:
            self.fail("response_state not provided - cannot determine permissions")

        elif self.response_state.allocated_to != self.user.id:
            answer_anything = perms.ANSWER_QUESTIONS_ALLOCATED_TO_ANYONE
            if not self.user.has_permission(answer_anything):
                m = (
                    f"{self.user.id} has not been allocated responsibility"
                    f" so cannot save this answer without "
                    f' "{perms.ANSWER_QUESTIONS_ALLOCATED_TO_ANYONE}" permission'
                )
                self.fail(m)

    def authorise_winloss(self):
        """Record a failure unless win/loss reports are exposed and unexpired.

        A missing issue is recorded as a failure rather than being allowed to
        raise ``AttributeError``, so the check fails closed.
        """
        issue = self.issue
        if issue is None:
            self.fail(
                "Issue not provided, cannot verify permission to {}", self.action
            )
            return

        is_not_exposed = not issue.winloss_exposed
        # Compare against the timestamp captured at construction so every
        # time-based decision in this run uses the same instant.
        is_expired = (
            issue.winloss_expiry is not None and issue.winloss_expiry < self.now
        )

        if is_not_exposed or is_expired:
            self.fail("WinLoss Reports not exposed for this Response")

    def fail(self, tmpl, *args, **kwargs):
        """Record an authorisation failure message.

        ``tmpl`` is rendered with ``str.format`` only when format arguments
        are supplied. A message passed without arguments is recorded verbatim,
        so text already built with an f-string is not formatted a second time
        (which would break on any literal brace in an interpolated value).
        """
        msg = tmpl.format(*args, **kwargs) if (args or kwargs) else tmpl
        self.errors.auth_failure(msg)

    def invalid_issue_status(self):
        """Record that the issue's current status does not allow the action."""
        self.errors.invalid_issue_status(self.issue.status)

    def raise_on_error(self):
        """Raise ``AuthorizationFailure`` if any failure has been recorded."""
        if self.errors.has_errors:
            raise AuthorizationFailure(errors=self.errors)