Coverage for postrfp / model / issue.py: 99%
166 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1from decimal import Decimal
2from datetime import datetime
3from typing import Optional, TYPE_CHECKING
5from sqlalchemy import (
6 ForeignKey,
7 DateTime,
8 Boolean,
9 Integer,
10 literal,
11 select,
12 event,
13 Index,
14 insert,
15 text,
16 func,
17)
18from sqlalchemy.orm import (
19 Mapped,
20 Session,
21 mapped_column,
22 relationship,
23 backref,
24 DynamicMapped,
25 object_session,
26)
27from sqlalchemy.ext.hybrid import hybrid_method
28import sqlalchemy.types as types
29from sqlalchemy.types import VARCHAR, TEXT, DOUBLE, CHAR, INTEGER, DATETIME
32from .questionnaire.answering import QuestionResponseState
33from postrfp.model.humans import Organisation, User
35from postrfp.model.exc import BusinessRuleViolation
36from .audit import AuditEvent
37from .notify import IssueWatchList
38from .meta import Base, AttachmentMixin
40if TYPE_CHECKING:
41 from sqlalchemy import ColumnElement
42 from .questionnaire.answering import Answer
43 from .project import Project
46class IssueStatusType(types.TypeDecorator):
47 impl = types.SMALLINT()
49 cache_ok = True
51 def process_bind_param(self, value, _dialect):
52 try:
53 return Issue.issue_statuses_int[value]
54 except KeyError:
55 vals = ",".join(Issue.issue_statuses.values())
56 raise KeyError(f"Status '{value}' not found in {vals}")
58 def process_result_value(self, value, _dialect):
59 return Issue.issue_statuses[value]
62class Issue(Base):
63 __tablename__ = "issues"
65 class Status:
66 NOT_SENT = 0
67 OPPORTUNITY = 10
68 ACCEPTED = 20
69 UPDATEABLE = 25
70 DECLINED = 30
71 SUBMITTED = 40
72 RETRACTED = 50
74 issue_statuses = {
75 Status.NOT_SENT: "Not Sent",
76 Status.OPPORTUNITY: "Opportunity",
77 Status.ACCEPTED: "Accepted",
78 Status.UPDATEABLE: "Updateable",
79 Status.DECLINED: "Declined",
80 Status.SUBMITTED: "Submitted",
81 Status.RETRACTED: "Retracted",
82 }
84 issue_statuses_int = {v: k for k, v in issue_statuses.items()}
85 status_names = set(issue_statuses.values())
87 project_id: Mapped[int] = mapped_column(
88 Integer,
89 ForeignKey("projects.id"),
90 nullable=False,
91 index=True,
92 server_default=text("'0'"),
93 )
94 respondent_id: Mapped[Optional[str]] = mapped_column(
95 VARCHAR(length=150),
96 ForeignKey("organisations.id", onupdate="CASCADE"),
97 nullable=True,
98 )
99 respondent_email: Mapped[Optional[str]] = mapped_column(VARCHAR(length=80))
100 issue_date: Mapped[Optional[datetime]] = mapped_column(DateTime)
101 accepted_date: Mapped[Optional[datetime]] = mapped_column(
102 DateTime
103 ) # Note: This wasn't set before, might need adjustment if required.
104 submitted_date: Mapped[Optional[datetime]] = mapped_column(DateTime)
105 deadline: Mapped[Optional[datetime]] = mapped_column(DATETIME, nullable=True)
106 use_workflow: Mapped[bool] = mapped_column(
107 Boolean, nullable=False, default=False, server_default=text("FALSE")
108 )
109 status: Mapped[str] = mapped_column(
110 IssueStatusType, nullable=False, default="Not Sent"
111 )
112 feedback: Mapped[Optional[str]] = mapped_column(TEXT)
113 internal_comments: Mapped[Optional[str]] = mapped_column(TEXT)
114 award_status: Mapped[int] = mapped_column(
115 types.SMALLINT(),
116 nullable=False,
117 default=0,
118 server_default=text("'0'"),
119 )
120 label: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255))
121 selected: Mapped[bool] = mapped_column(
122 types.SMALLINT(),
123 default=False,
124 server_default=text("'0'"),
125 nullable=False,
126 )
127 reminder_sent: Mapped[Optional[datetime]] = mapped_column(DateTime)
128 winloss_exposed: Mapped[bool] = mapped_column(
129 Boolean, nullable=False, default=0, server_default=text("FALSE")
130 )
131 winloss_expiry: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
132 winloss_weightset_id: Mapped[Optional[int]] = mapped_column(
133 Integer, ForeignKey("weighting_sets.id", ondelete="SET NULL")
134 )
136 answers: DynamicMapped["Answer"] = relationship(
137 "Answer",
138 lazy="dynamic",
139 passive_deletes=True,
140 cascade_backrefs=False,
141 cascade="all, delete",
142 back_populates="issue",
143 )
145 response_states: DynamicMapped["QuestionResponseState"] = relationship(
146 "QuestionResponseState",
147 back_populates="issue",
148 lazy="dynamic",
149 passive_deletes=True,
150 )
152 project: Mapped["Project"] = relationship(
153 "Project", uselist=False, back_populates="_issues"
154 )
155 respondent: Mapped["Organisation"] = relationship(
156 "Organisation",
157 lazy="joined",
158 uselist=False,
159 backref=backref("issues", passive_deletes=True),
160 )
162 winloss_weightset = relationship("WeightingSet")
164 watchers: DynamicMapped[User] = relationship(
165 "User", secondary="issue_watch_list", lazy="dynamic", viewonly=True
166 )
167 scores: DynamicMapped["Score"] = relationship(
168 "Score", back_populates="issue", lazy="dynamic"
169 )
171 events: DynamicMapped["AuditEvent"] = relationship(
172 "AuditEvent",
173 back_populates="issue",
174 lazy="dynamic",
175 cascade_backrefs=False,
176 primaryjoin="foreign(AuditEvent.issue_id)==Issue.id",
177 )
179 watch_list: DynamicMapped["IssueWatchList"] = relationship(
180 "IssueWatchList",
181 lazy="dynamic",
182 back_populates="issue",
183 cascade="all,delete",
184 passive_deletes=True,
185 )
187 attachments: DynamicMapped["IssueAttachment"] = relationship(
188 "IssueAttachment",
189 lazy="dynamic",
190 back_populates="issue",
191 )
193 def get_attachment(self, attachment_id):
194 return self.attachments.filter(IssueAttachment.id == attachment_id).one()
196 @hybrid_method
197 def can_view_winloss(self, user) -> bool:
198 return (
199 self.winloss_exposed
200 and (self.winloss_expiry is not None)
201 and (self.winloss_expiry > datetime.now())
202 and (user.org_id == self.respondent_id)
203 )
205 @can_view_winloss.expression # type: ignore
206 def can_view_sql(self, user) -> bool:
207 return (
208 self.winloss_exposed
209 & (self.winloss_expiry is not None)
210 & (self.winloss_expiry > datetime.now()) # type: ignore
211 & (user.org_id == self.respondent_id)
212 )
214 def __repr__(self) -> str:
215 if self.respondent_id is not None:
216 return "< Issue[%s] %s (%s) >" % (self.id, self.respondent_id, self.status)
217 else:
218 return "< Issue id %s (%s) >" % (self.id, self.status)
220 @property
221 def deadline_passed(self) -> bool:
222 if not self.deadline:
223 return False
224 return datetime.now() > self.deadline
226 @classmethod
227 def scoreable_filter(cls, project: "Project") -> "ColumnElement":
228 """
229 Expression to use in query(...).filter() to limit Issues to Submitted or Updateable
230 Issues after the deadline if applicable for `project`.
231 """
232 expr = Issue.status.in_(("Submitted", "Updateable"))
233 return expr
235 def log_event(self, event_name, user, change_list: list[tuple[str, str, str]]):
236 session = object_session(self)
237 assert session is not None
238 evt = AuditEvent.create(
239 session,
240 event_name,
241 object_id=self.id,
242 user=user,
243 issue=self,
244 project=self.project,
245 change_list=change_list,
246 )
247 sess = object_session(self)
248 assert sess is not None
249 sess.add(evt)
251 def response_state_for_q(self, question_id):
252 return self.response_states.filter_by(question_instance_id=question_id).one()
254 def add_watcher(self, watching_user: User) -> bool:
255 """
256 Adds the given user to this watch list. Returns True if successful,
257 False if the user is already watching
258 """
259 if self.watch_list.filter_by(user=watching_user).count() == 0:
260 self.watch_list.append(IssueWatchList(user=watching_user))
261 return True
262 else:
263 return False
266@event.listens_for(Issue, "before_insert", propagate=True)
267def check_org_or_email(mapper, connection, issue):
268 if issue.respondent_id is None and issue.respondent_email is None:
269 m = "Issue has neither Organisation or an email address set"
270 raise BusinessRuleViolation(m)
273class Score(Base):
274 __tablename__ = "scores"
276 public_attrs = "id,question_id,issue_id,score,scoreset_id".split(",")
278 question_instance_id: Mapped[int] = mapped_column(
279 INTEGER, ForeignKey("question_instances.id"), nullable=False
280 )
281 issue_id: Mapped[int] = mapped_column(
282 INTEGER, ForeignKey("issues.id"), nullable=False
283 )
284 score: Mapped[Optional[Decimal]] = mapped_column(
285 DOUBLE(asdecimal=True), nullable=True, default=None
286 )
287 scoreset_id: Mapped[Optional[str]] = mapped_column(
288 VARCHAR(length=150), default="", nullable=True
289 )
291 issue = relationship("Issue", uselist=False, back_populates="scores")
293 question = relationship("QuestionInstance", uselist=False)
295 comments = relationship(
296 "ScoreComment", order_by="ScoreComment.comment_time", back_populates="score"
297 )
298 project = relationship(
299 "Project",
300 viewonly=True,
301 secondary="issues",
302 secondaryjoin="issues.c.project_id==projects.c.id",
303 backref=backref("scores_q", lazy="dynamic", viewonly=True),
304 )
306 def __repr__(self):
307 return "Score: %s" % self.score
309 @property
310 def question_id(self):
311 return self.question_instance_id
313 @classmethod
314 def validate_scoreset(cls, scoreset_id, project):
315 if not project.multiscored and scoreset_id not in (None, ""):
316 raise ValueError("Cannot assign scoreset project that is not multiscored")
318 @classmethod
319 def check_score_value(cls, score_value, project):
320 # None is valid as it can be used to remove a score
321 max_score = project.maximum_score
322 if score_value is not None and not 0 <= score_value <= max_score:
323 raise ValueError("Score must be between zero and %s" % max_score)
326class ScoreComment(Base):
327 __tablename__ = "score_comments"
328 __table_args__ = (
329 Index("score_comment_fulltext", "comment_text", mariadb_prefix="FULLTEXT"),
330 )
332 public_attrs = "comment_time,user_name,comment_text".split(",")
334 score_id: Mapped[int] = mapped_column(
335 INTEGER, ForeignKey("scores.id"), nullable=False
336 )
337 comment_time: Mapped[datetime] = mapped_column(
338 DATETIME(), server_default=func.utc_timestamp(), nullable=False
339 )
340 user_id: Mapped[Optional[str]] = mapped_column(
341 VARCHAR(length=150), ForeignKey("users.id"), nullable=True
342 )
343 comment_text: Mapped[str] = mapped_column(TEXT(), nullable=False)
344 type: Mapped[int] = mapped_column(
345 INTEGER, nullable=False, server_default=text("'0'")
346 )
348 score = relationship("Score", back_populates="comments")
349 user = relationship("User")
351 @property
352 def user_name(self):
353 return self.user.fullname
356class IssueAttachment(AttachmentMixin, Base):
357 __tablename__ = "issue_attachments"
359 public_attrs = (
360 "id,description,size,filename,url,respondent_name,private,date_uploaded"
361 ).split(",")
362 description: Mapped[Optional[str]] = mapped_column(VARCHAR(length=1024))
363 issue_id: Mapped[Optional[int]] = mapped_column(
364 Integer, ForeignKey("issues.id", ondelete="SET NULL")
365 )
366 date_uploaded: Mapped[datetime] = mapped_column(
367 DATETIME, server_default=func.utc_timestamp(), nullable=True
368 )
369 author_id: Mapped[Optional[str]] = mapped_column(
370 VARCHAR(length=150), ForeignKey("users.id", ondelete="SET NULL")
371 )
372 org_id: Mapped[Optional[str]] = mapped_column(
373 VARCHAR(length=150),
374 ForeignKey("organisations.id", onupdate="CASCADE", ondelete="SET NULL"),
375 )
376 private: Mapped[bool] = mapped_column(
377 Boolean, default=False, server_default=text("FALSE"), nullable=True
378 )
379 type: Mapped[str] = mapped_column(
380 CHAR(1), server_default=text("'P'"), nullable=False
381 )
383 issue = relationship(Issue, back_populates="attachments")
384 organisation = relationship(Organisation)
386 @property
387 def url(self):
388 return "/api/project/%i/issue/%i/attachment/%i/" % (
389 self.issue.project.id,
390 self.issue.id,
391 self.id,
392 )
394 @property
395 def respondent_name(self):
396 return self.organisation.name
399# ----------------------------------------------------------------
400# Event listener to update QuestionResponseState tables when Issues are added
401# ----------------------------------------------------------------
404@event.listens_for(Issue, "after_insert")
405def issue_after_insert_handler(_mapper, _connection, issue: Issue) -> None:
406 """
407 Creates QuestionResponseState records when an Issue is created using a direct INSERT...SELECT.
409 This prepares the tracking system for respondents' answers. QuestionResponseState
410 records are used to track the progress, review status and approval state of each question response.
412 Replaces the 'make_question_response_states_update' database trigger.
413 """
414 from postrfp.model import QuestionInstance
415 from postrfp.model.questionnaire.answering import (
416 ResponseStatus,
417 QuestionResponseState,
418 )
420 session = Session.object_session(issue)
421 if session is None:
422 raise RuntimeError(
423 "Cannot create QuestionResponseState: No database session available"
424 )
426 insert_stmt = insert(QuestionResponseState).from_select(
427 ["issue_id", "question_instance_id", "status"],
428 select(
429 literal(issue.id),
430 QuestionInstance.id,
431 literal(ResponseStatus.NOT_ANSWERED.value),
432 ).where(QuestionInstance.project_id == issue.project_id),
433 )
435 session.execute(insert_stmt)