Coverage for postrfp / model / issue.py: 99%

166 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1from decimal import Decimal 

2from datetime import datetime 

3from typing import Optional, TYPE_CHECKING 

4 

5from sqlalchemy import ( 

6 ForeignKey, 

7 DateTime, 

8 Boolean, 

9 Integer, 

10 literal, 

11 select, 

12 event, 

13 Index, 

14 insert, 

15 text, 

16 func, 

17) 

18from sqlalchemy.orm import ( 

19 Mapped, 

20 Session, 

21 mapped_column, 

22 relationship, 

23 backref, 

24 DynamicMapped, 

25 object_session, 

26) 

27from sqlalchemy.ext.hybrid import hybrid_method 

28import sqlalchemy.types as types 

29from sqlalchemy.types import VARCHAR, TEXT, DOUBLE, CHAR, INTEGER, DATETIME 

30 

31 

32from .questionnaire.answering import QuestionResponseState 

33from postrfp.model.humans import Organisation, User 

34 

35from postrfp.model.exc import BusinessRuleViolation 

36from .audit import AuditEvent 

37from .notify import IssueWatchList 

38from .meta import Base, AttachmentMixin 

39 

40if TYPE_CHECKING: 

41 from sqlalchemy import ColumnElement 

42 from .questionnaire.answering import Answer 

43 from .project import Project 

44 

45 

46class IssueStatusType(types.TypeDecorator): 

47 impl = types.SMALLINT() 

48 

49 cache_ok = True 

50 

51 def process_bind_param(self, value, _dialect): 

52 try: 

53 return Issue.issue_statuses_int[value] 

54 except KeyError: 

55 vals = ",".join(Issue.issue_statuses.values()) 

56 raise KeyError(f"Status '{value}' not found in {vals}") 

57 

58 def process_result_value(self, value, _dialect): 

59 return Issue.issue_statuses[value] 

60 

61 

62class Issue(Base): 

63 __tablename__ = "issues" 

64 

65 class Status: 

66 NOT_SENT = 0 

67 OPPORTUNITY = 10 

68 ACCEPTED = 20 

69 UPDATEABLE = 25 

70 DECLINED = 30 

71 SUBMITTED = 40 

72 RETRACTED = 50 

73 

74 issue_statuses = { 

75 Status.NOT_SENT: "Not Sent", 

76 Status.OPPORTUNITY: "Opportunity", 

77 Status.ACCEPTED: "Accepted", 

78 Status.UPDATEABLE: "Updateable", 

79 Status.DECLINED: "Declined", 

80 Status.SUBMITTED: "Submitted", 

81 Status.RETRACTED: "Retracted", 

82 } 

83 

84 issue_statuses_int = {v: k for k, v in issue_statuses.items()} 

85 status_names = set(issue_statuses.values()) 

86 

87 project_id: Mapped[int] = mapped_column( 

88 Integer, 

89 ForeignKey("projects.id"), 

90 nullable=False, 

91 index=True, 

92 server_default=text("'0'"), 

93 ) 

94 respondent_id: Mapped[Optional[str]] = mapped_column( 

95 VARCHAR(length=150), 

96 ForeignKey("organisations.id", onupdate="CASCADE"), 

97 nullable=True, 

98 ) 

99 respondent_email: Mapped[Optional[str]] = mapped_column(VARCHAR(length=80)) 

100 issue_date: Mapped[Optional[datetime]] = mapped_column(DateTime) 

101 accepted_date: Mapped[Optional[datetime]] = mapped_column( 

102 DateTime 

103 ) # Note: This wasn't set before, might need adjustment if required. 

104 submitted_date: Mapped[Optional[datetime]] = mapped_column(DateTime) 

105 deadline: Mapped[Optional[datetime]] = mapped_column(DATETIME, nullable=True) 

106 use_workflow: Mapped[bool] = mapped_column( 

107 Boolean, nullable=False, default=False, server_default=text("FALSE") 

108 ) 

109 status: Mapped[str] = mapped_column( 

110 IssueStatusType, nullable=False, default="Not Sent" 

111 ) 

112 feedback: Mapped[Optional[str]] = mapped_column(TEXT) 

113 internal_comments: Mapped[Optional[str]] = mapped_column(TEXT) 

114 award_status: Mapped[int] = mapped_column( 

115 types.SMALLINT(), 

116 nullable=False, 

117 default=0, 

118 server_default=text("'0'"), 

119 ) 

120 label: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255)) 

121 selected: Mapped[bool] = mapped_column( 

122 types.SMALLINT(), 

123 default=False, 

124 server_default=text("'0'"), 

125 nullable=False, 

126 ) 

127 reminder_sent: Mapped[Optional[datetime]] = mapped_column(DateTime) 

128 winloss_exposed: Mapped[bool] = mapped_column( 

129 Boolean, nullable=False, default=0, server_default=text("FALSE") 

130 ) 

131 winloss_expiry: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) 

132 winloss_weightset_id: Mapped[Optional[int]] = mapped_column( 

133 Integer, ForeignKey("weighting_sets.id", ondelete="SET NULL") 

134 ) 

135 

136 answers: DynamicMapped["Answer"] = relationship( 

137 "Answer", 

138 lazy="dynamic", 

139 passive_deletes=True, 

140 cascade_backrefs=False, 

141 cascade="all, delete", 

142 back_populates="issue", 

143 ) 

144 

145 response_states: DynamicMapped["QuestionResponseState"] = relationship( 

146 "QuestionResponseState", 

147 back_populates="issue", 

148 lazy="dynamic", 

149 passive_deletes=True, 

150 ) 

151 

152 project: Mapped["Project"] = relationship( 

153 "Project", uselist=False, back_populates="_issues" 

154 ) 

155 respondent: Mapped["Organisation"] = relationship( 

156 "Organisation", 

157 lazy="joined", 

158 uselist=False, 

159 backref=backref("issues", passive_deletes=True), 

160 ) 

161 

162 winloss_weightset = relationship("WeightingSet") 

163 

164 watchers: DynamicMapped[User] = relationship( 

165 "User", secondary="issue_watch_list", lazy="dynamic", viewonly=True 

166 ) 

167 scores: DynamicMapped["Score"] = relationship( 

168 "Score", back_populates="issue", lazy="dynamic" 

169 ) 

170 

171 events: DynamicMapped["AuditEvent"] = relationship( 

172 "AuditEvent", 

173 back_populates="issue", 

174 lazy="dynamic", 

175 cascade_backrefs=False, 

176 primaryjoin="foreign(AuditEvent.issue_id)==Issue.id", 

177 ) 

178 

179 watch_list: DynamicMapped["IssueWatchList"] = relationship( 

180 "IssueWatchList", 

181 lazy="dynamic", 

182 back_populates="issue", 

183 cascade="all,delete", 

184 passive_deletes=True, 

185 ) 

186 

187 attachments: DynamicMapped["IssueAttachment"] = relationship( 

188 "IssueAttachment", 

189 lazy="dynamic", 

190 back_populates="issue", 

191 ) 

192 

193 def get_attachment(self, attachment_id): 

194 return self.attachments.filter(IssueAttachment.id == attachment_id).one() 

195 

196 @hybrid_method 

197 def can_view_winloss(self, user) -> bool: 

198 return ( 

199 self.winloss_exposed 

200 and (self.winloss_expiry is not None) 

201 and (self.winloss_expiry > datetime.now()) 

202 and (user.org_id == self.respondent_id) 

203 ) 

204 

205 @can_view_winloss.expression # type: ignore 

206 def can_view_sql(self, user) -> bool: 

207 return ( 

208 self.winloss_exposed 

209 & (self.winloss_expiry is not None) 

210 & (self.winloss_expiry > datetime.now()) # type: ignore 

211 & (user.org_id == self.respondent_id) 

212 ) 

213 

214 def __repr__(self) -> str: 

215 if self.respondent_id is not None: 

216 return "< Issue[%s] %s (%s) >" % (self.id, self.respondent_id, self.status) 

217 else: 

218 return "< Issue id %s (%s) >" % (self.id, self.status) 

219 

220 @property 

221 def deadline_passed(self) -> bool: 

222 if not self.deadline: 

223 return False 

224 return datetime.now() > self.deadline 

225 

226 @classmethod 

227 def scoreable_filter(cls, project: "Project") -> "ColumnElement": 

228 """ 

229 Expression to use in query(...).filter() to limit Issues to Submitted or Updateable 

230 Issues after the deadline if applicable for `project`. 

231 """ 

232 expr = Issue.status.in_(("Submitted", "Updateable")) 

233 return expr 

234 

235 def log_event(self, event_name, user, change_list: list[tuple[str, str, str]]): 

236 session = object_session(self) 

237 assert session is not None 

238 evt = AuditEvent.create( 

239 session, 

240 event_name, 

241 object_id=self.id, 

242 user=user, 

243 issue=self, 

244 project=self.project, 

245 change_list=change_list, 

246 ) 

247 sess = object_session(self) 

248 assert sess is not None 

249 sess.add(evt) 

250 

251 def response_state_for_q(self, question_id): 

252 return self.response_states.filter_by(question_instance_id=question_id).one() 

253 

254 def add_watcher(self, watching_user: User) -> bool: 

255 """ 

256 Adds the given user to this watch list. Returns True if successful, 

257 False if the user is already watching 

258 """ 

259 if self.watch_list.filter_by(user=watching_user).count() == 0: 

260 self.watch_list.append(IssueWatchList(user=watching_user)) 

261 return True 

262 else: 

263 return False 

264 

265 

266@event.listens_for(Issue, "before_insert", propagate=True) 

267def check_org_or_email(mapper, connection, issue): 

268 if issue.respondent_id is None and issue.respondent_email is None: 

269 m = "Issue has neither Organisation or an email address set" 

270 raise BusinessRuleViolation(m) 

271 

272 

273class Score(Base): 

274 __tablename__ = "scores" 

275 

276 public_attrs = "id,question_id,issue_id,score,scoreset_id".split(",") 

277 

278 question_instance_id: Mapped[int] = mapped_column( 

279 INTEGER, ForeignKey("question_instances.id"), nullable=False 

280 ) 

281 issue_id: Mapped[int] = mapped_column( 

282 INTEGER, ForeignKey("issues.id"), nullable=False 

283 ) 

284 score: Mapped[Optional[Decimal]] = mapped_column( 

285 DOUBLE(asdecimal=True), nullable=True, default=None 

286 ) 

287 scoreset_id: Mapped[Optional[str]] = mapped_column( 

288 VARCHAR(length=150), default="", nullable=True 

289 ) 

290 

291 issue = relationship("Issue", uselist=False, back_populates="scores") 

292 

293 question = relationship("QuestionInstance", uselist=False) 

294 

295 comments = relationship( 

296 "ScoreComment", order_by="ScoreComment.comment_time", back_populates="score" 

297 ) 

298 project = relationship( 

299 "Project", 

300 viewonly=True, 

301 secondary="issues", 

302 secondaryjoin="issues.c.project_id==projects.c.id", 

303 backref=backref("scores_q", lazy="dynamic", viewonly=True), 

304 ) 

305 

306 def __repr__(self): 

307 return "Score: %s" % self.score 

308 

309 @property 

310 def question_id(self): 

311 return self.question_instance_id 

312 

313 @classmethod 

314 def validate_scoreset(cls, scoreset_id, project): 

315 if not project.multiscored and scoreset_id not in (None, ""): 

316 raise ValueError("Cannot assign scoreset project that is not multiscored") 

317 

318 @classmethod 

319 def check_score_value(cls, score_value, project): 

320 # None is valid as it can be used to remove a score 

321 max_score = project.maximum_score 

322 if score_value is not None and not 0 <= score_value <= max_score: 

323 raise ValueError("Score must be between zero and %s" % max_score) 

324 

325 

326class ScoreComment(Base): 

327 __tablename__ = "score_comments" 

328 __table_args__ = ( 

329 Index("score_comment_fulltext", "comment_text", mariadb_prefix="FULLTEXT"), 

330 ) 

331 

332 public_attrs = "comment_time,user_name,comment_text".split(",") 

333 

334 score_id: Mapped[int] = mapped_column( 

335 INTEGER, ForeignKey("scores.id"), nullable=False 

336 ) 

337 comment_time: Mapped[datetime] = mapped_column( 

338 DATETIME(), server_default=func.utc_timestamp(), nullable=False 

339 ) 

340 user_id: Mapped[Optional[str]] = mapped_column( 

341 VARCHAR(length=150), ForeignKey("users.id"), nullable=True 

342 ) 

343 comment_text: Mapped[str] = mapped_column(TEXT(), nullable=False) 

344 type: Mapped[int] = mapped_column( 

345 INTEGER, nullable=False, server_default=text("'0'") 

346 ) 

347 

348 score = relationship("Score", back_populates="comments") 

349 user = relationship("User") 

350 

351 @property 

352 def user_name(self): 

353 return self.user.fullname 

354 

355 

356class IssueAttachment(AttachmentMixin, Base): 

357 __tablename__ = "issue_attachments" 

358 

359 public_attrs = ( 

360 "id,description,size,filename,url,respondent_name,private,date_uploaded" 

361 ).split(",") 

362 description: Mapped[Optional[str]] = mapped_column(VARCHAR(length=1024)) 

363 issue_id: Mapped[Optional[int]] = mapped_column( 

364 Integer, ForeignKey("issues.id", ondelete="SET NULL") 

365 ) 

366 date_uploaded: Mapped[datetime] = mapped_column( 

367 DATETIME, server_default=func.utc_timestamp(), nullable=True 

368 ) 

369 author_id: Mapped[Optional[str]] = mapped_column( 

370 VARCHAR(length=150), ForeignKey("users.id", ondelete="SET NULL") 

371 ) 

372 org_id: Mapped[Optional[str]] = mapped_column( 

373 VARCHAR(length=150), 

374 ForeignKey("organisations.id", onupdate="CASCADE", ondelete="SET NULL"), 

375 ) 

376 private: Mapped[bool] = mapped_column( 

377 Boolean, default=False, server_default=text("FALSE"), nullable=True 

378 ) 

379 type: Mapped[str] = mapped_column( 

380 CHAR(1), server_default=text("'P'"), nullable=False 

381 ) 

382 

383 issue = relationship(Issue, back_populates="attachments") 

384 organisation = relationship(Organisation) 

385 

386 @property 

387 def url(self): 

388 return "/api/project/%i/issue/%i/attachment/%i/" % ( 

389 self.issue.project.id, 

390 self.issue.id, 

391 self.id, 

392 ) 

393 

394 @property 

395 def respondent_name(self): 

396 return self.organisation.name 

397 

398 

399# ---------------------------------------------------------------- 

400# Event listener to update QuestionResponseState tables when Issues are added 

401# ---------------------------------------------------------------- 

402 

403 

404@event.listens_for(Issue, "after_insert") 

405def issue_after_insert_handler(_mapper, _connection, issue: Issue) -> None: 

406 """ 

407 Creates QuestionResponseState records when an Issue is created using a direct INSERT...SELECT. 

408 

409 This prepares the tracking system for respondents' answers. QuestionResponseState 

410 records are used to track the progress, review status and approval state of each question response. 

411 

412 Replaces the 'make_question_response_states_update' database trigger. 

413 """ 

414 from postrfp.model import QuestionInstance 

415 from postrfp.model.questionnaire.answering import ( 

416 ResponseStatus, 

417 QuestionResponseState, 

418 ) 

419 

420 session = Session.object_session(issue) 

421 if session is None: 

422 raise RuntimeError( 

423 "Cannot create QuestionResponseState: No database session available" 

424 ) 

425 

426 insert_stmt = insert(QuestionResponseState).from_select( 

427 ["issue_id", "question_instance_id", "status"], 

428 select( 

429 literal(issue.id), 

430 QuestionInstance.id, 

431 literal(ResponseStatus.NOT_ANSWERED.value), 

432 ).where(QuestionInstance.project_id == issue.project_id), 

433 ) 

434 

435 session.execute(insert_stmt)