Coverage for postrfp/model/issue.py: 99%

167 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1from decimal import Decimal 

2from datetime import datetime 

3from typing import Optional, TYPE_CHECKING 

4 

5from sqlalchemy import ( 

6 ForeignKey, 

7 DateTime, 

8 Boolean, 

9 Integer, 

10 literal, 

11 select, 

12 event, 

13 Index, 

14 insert, 

15 and_, 

16 or_, 

17 text, 

18 func, 

19) 

20from sqlalchemy.orm import ( 

21 Mapped, 

22 Session, 

23 mapped_column, 

24 relationship, 

25 backref, 

26 DynamicMapped, 

27 object_session, 

28) 

29from sqlalchemy.ext.hybrid import hybrid_method 

30import sqlalchemy.types as types 

31from sqlalchemy.types import VARCHAR, TEXT, DOUBLE, CHAR, INTEGER, DATETIME 

32 

33 

34from .questionnaire.answering import QuestionResponseState 

35from postrfp.model.humans import Organisation, User 

36 

37from postrfp.model.exc import BusinessRuleViolation 

38from .audit import AuditEvent 

39from .notify import IssueWatchList 

40from .meta import Base, AttachmentMixin 

41 

42if TYPE_CHECKING: 

43 from sqlalchemy import ColumnElement 

44 from .questionnaire.answering import Answer 

45 from .project import Project 

46 

47 

48class IssueStatusType(types.TypeDecorator): 

49 impl = types.SMALLINT() 

50 

51 cache_ok = True 

52 

53 def process_bind_param(self, value, _dialect): 

54 try: 

55 return Issue.issue_statuses_int[value] 

56 except KeyError: 

57 vals = ",".join(Issue.issue_statuses.values()) 

58 raise KeyError(f"Status '{value}' not found in {vals}") 

59 

60 def process_result_value(self, value, _dialect): 

61 return Issue.issue_statuses[value] 

62 

63 

64class Issue(Base): 

65 __tablename__ = "issues" 

66 

67 class Status: 

68 NOT_SENT = 0 

69 OPPORTUNITY = 10 

70 ACCEPTED = 20 

71 UPDATEABLE = 25 

72 DECLINED = 30 

73 SUBMITTED = 40 

74 RETRACTED = 50 

75 

76 issue_statuses = { 

77 Status.NOT_SENT: "Not Sent", 

78 Status.OPPORTUNITY: "Opportunity", 

79 Status.ACCEPTED: "Accepted", 

80 Status.UPDATEABLE: "Updateable", 

81 Status.DECLINED: "Declined", 

82 Status.SUBMITTED: "Submitted", 

83 Status.RETRACTED: "Retracted", 

84 } 

85 

86 issue_statuses_int = {v: k for k, v in issue_statuses.items()} 

87 status_names = set(issue_statuses.values()) 

88 

89 scoreable_statuses = (Status.SUBMITTED, Status.UPDATEABLE) 

90 

91 project_id: Mapped[int] = mapped_column( 

92 Integer, 

93 ForeignKey("projects.id"), 

94 nullable=False, 

95 index=True, 

96 server_default=text("'0'"), 

97 ) 

98 respondent_id: Mapped[Optional[str]] = mapped_column( 

99 VARCHAR(length=150), 

100 ForeignKey("organisations.id", onupdate="CASCADE"), 

101 nullable=True, 

102 ) 

103 respondent_email: Mapped[Optional[str]] = mapped_column(VARCHAR(length=80)) 

104 issue_date: Mapped[Optional[datetime]] = mapped_column(DateTime) 

105 accepted_date: Mapped[Optional[datetime]] = mapped_column( 

106 DateTime 

107 ) # Note: This wasn't set before, might need adjustment if required. 

108 submitted_date: Mapped[Optional[datetime]] = mapped_column(DateTime) 

109 deadline: Mapped[Optional[datetime]] = mapped_column(DATETIME, nullable=True) 

110 use_workflow: Mapped[bool] = mapped_column( 

111 Boolean, nullable=False, default=False, server_default=text("'0'") 

112 ) 

113 status: Mapped[str] = mapped_column( 

114 IssueStatusType, nullable=False, default="Not Sent" 

115 ) 

116 feedback: Mapped[Optional[str]] = mapped_column(TEXT) 

117 internal_comments: Mapped[Optional[str]] = mapped_column(TEXT) 

118 award_status: Mapped[int] = mapped_column( 

119 types.SMALLINT(), 

120 nullable=False, 

121 default=0, 

122 server_default=text("'0'"), 

123 ) 

124 label: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255)) 

125 selected: Mapped[bool] = mapped_column( 

126 types.SMALLINT(), 

127 default=False, 

128 server_default=text("'0'"), 

129 nullable=False, 

130 ) 

131 reminder_sent: Mapped[Optional[datetime]] = mapped_column(DateTime) 

132 winloss_exposed: Mapped[bool] = mapped_column( 

133 Boolean, nullable=False, default=0, server_default=text("'0'") 

134 ) 

135 winloss_expiry: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) 

136 winloss_weightset_id: Mapped[Optional[int]] = mapped_column( 

137 Integer, ForeignKey("weighting_sets.id", ondelete="SET NULL") 

138 ) 

139 

140 answers: DynamicMapped["Answer"] = relationship( 

141 "Answer", 

142 lazy="dynamic", 

143 passive_deletes=True, 

144 cascade_backrefs=False, 

145 cascade="all, delete", 

146 back_populates="issue", 

147 ) 

148 

149 response_states: DynamicMapped["QuestionResponseState"] = relationship( 

150 "QuestionResponseState", 

151 back_populates="issue", 

152 lazy="dynamic", 

153 passive_deletes=True, 

154 ) 

155 

156 project: Mapped["Project"] = relationship( 

157 "Project", uselist=False, back_populates="_issues" 

158 ) 

159 respondent: Mapped["Organisation"] = relationship( 

160 "Organisation", 

161 lazy="joined", 

162 uselist=False, 

163 backref=backref("issues", passive_deletes=True), 

164 ) 

165 

166 winloss_weightset = relationship("WeightingSet") 

167 

168 watchers: DynamicMapped[User] = relationship( 

169 "User", secondary="issue_watch_list", lazy="dynamic", viewonly=True 

170 ) 

171 scores: DynamicMapped["Score"] = relationship( 

172 "Score", back_populates="issue", lazy="dynamic" 

173 ) 

174 

175 events: DynamicMapped["AuditEvent"] = relationship( 

176 "AuditEvent", 

177 back_populates="issue", 

178 lazy="dynamic", 

179 cascade_backrefs=False, 

180 primaryjoin="foreign(AuditEvent.issue_id)==Issue.id", 

181 ) 

182 

183 watch_list: DynamicMapped["IssueWatchList"] = relationship( 

184 "IssueWatchList", 

185 lazy="dynamic", 

186 back_populates="issue", 

187 cascade="all,delete", 

188 passive_deletes=True, 

189 ) 

190 

191 attachments: DynamicMapped["IssueAttachment"] = relationship( 

192 "IssueAttachment", 

193 lazy="dynamic", 

194 back_populates="issue", 

195 ) 

196 

197 def get_attachment(self, attachment_id): 

198 return self.attachments.filter(IssueAttachment.id == attachment_id).one() 

199 

200 @hybrid_method 

201 def can_view_winloss(self, user) -> bool: 

202 return ( 

203 self.winloss_exposed 

204 and (self.winloss_expiry is not None) 

205 and (self.winloss_expiry > datetime.now()) 

206 and (user.org_id == self.respondent_id) 

207 ) 

208 

209 @can_view_winloss.expression # type: ignore 

210 def can_view_sql(self, user) -> bool: 

211 return ( 

212 self.winloss_exposed 

213 & (self.winloss_expiry is not None) 

214 & (self.winloss_expiry > datetime.now()) # type: ignore 

215 & (user.org_id == self.respondent_id) 

216 ) 

217 

218 def __repr__(self) -> str: 

219 if self.respondent_id is not None: 

220 return "< Issue[%s] %s (%s) >" % (self.id, self.respondent_id, self.status) 

221 else: 

222 return "< Issue id %s (%s) >" % (self.id, self.status) 

223 

224 @property 

225 def deadline_passed(self) -> bool: 

226 if not self.deadline: 

227 return False 

228 return datetime.now() > self.deadline 

229 

230 @classmethod 

231 def scoreable_filter(cls, project: "Project") -> "ColumnElement": 

232 """ 

233 Expression to use in query(...).filter() to limit Issues to Submitted or Updateable 

234 Issues after the deadline if applicable for `project`. 

235 """ 

236 expr = Issue.status.in_(("Submitted", "Updateable")) 

237 return expr 

238 

239 def log_event(self, event_name, user, change_list: list[tuple[str, str, str]]): 

240 session = object_session(self) 

241 assert session is not None 

242 evt = AuditEvent.create( 

243 session, 

244 event_name, 

245 object_id=self.id, 

246 user=user, 

247 issue=self, 

248 project=self.project, 

249 change_list=change_list, 

250 ) 

251 sess = object_session(self) 

252 assert sess is not None 

253 sess.add(evt) 

254 

255 def response_state_for_q(self, question_id): 

256 return self.response_states.filter_by(question_instance_id=question_id).one() 

257 

258 def add_watcher(self, watching_user: User) -> bool: 

259 """ 

260 Adds the given user to this watch list. Returns True if successful, 

261 False if the user is already watching 

262 """ 

263 if self.watch_list.filter_by(user=watching_user).count() == 0: 

264 self.watch_list.append(IssueWatchList(user=watching_user)) 

265 return True 

266 else: 

267 return False 

268 

269 

270@event.listens_for(Issue, "before_insert", propagate=True) 

271def check_org_or_email(mapper, connection, issue): 

272 if issue.respondent_id is None and issue.respondent_email is None: 

273 m = "Issue has neither Organisation or an email address set" 

274 raise BusinessRuleViolation(m) 

275 

276 

277class Score(Base): 

278 __tablename__ = "scores" 

279 

280 public_attrs = "id,question_id,issue_id,score,scoreset_id".split(",") 

281 

282 question_instance_id: Mapped[int] = mapped_column( 

283 INTEGER, ForeignKey("question_instances.id"), nullable=False 

284 ) 

285 issue_id: Mapped[int] = mapped_column( 

286 INTEGER, ForeignKey("issues.id"), nullable=False 

287 ) 

288 score: Mapped[Optional[Decimal]] = mapped_column( 

289 DOUBLE(asdecimal=True), nullable=True, default=None 

290 ) 

291 scoreset_id: Mapped[Optional[str]] = mapped_column( 

292 VARCHAR(length=150), default="", nullable=True 

293 ) 

294 

295 issue = relationship("Issue", uselist=False, back_populates="scores") 

296 

297 question = relationship("QuestionInstance", uselist=False) 

298 

299 comments = relationship( 

300 "ScoreComment", order_by="ScoreComment.comment_time", back_populates="score" 

301 ) 

302 project = relationship( 

303 "Project", 

304 viewonly=True, 

305 secondary="issues", 

306 secondaryjoin="issues.c.project_id==projects.c.id", 

307 backref=backref("scores_q", lazy="dynamic", viewonly=True), 

308 ) 

309 

310 def __repr__(self): 

311 return "Score: %s" % self.score 

312 

313 @property 

314 def question_id(self): 

315 return self.question_instance_id 

316 

317 @classmethod 

318 def validate_scoreset(cls, scoreset_id, project): 

319 if not project.multiscored and scoreset_id not in (None, ""): 

320 raise ValueError("Cannot assign scoreset project that is not multiscored") 

321 

322 @classmethod 

323 def check_score_value(cls, score_value, project): 

324 # None is valid as it can be used to remove a score 

325 max_score = project.maximum_score 

326 if score_value is not None and not 0 <= score_value <= max_score: 

327 raise ValueError("Score must be between zero and %s" % max_score) 

328 

329 

330class ScoreComment(Base): 

331 __tablename__ = "score_comments" 

332 __table_args__ = ( 

333 Index("score_comment_fulltext", "comment_text", mariadb_prefix="FULLTEXT"), 

334 ) 

335 

336 public_attrs = "comment_time,user_name,comment_text".split(",") 

337 

338 score_id: Mapped[int] = mapped_column( 

339 INTEGER, ForeignKey("scores.id"), nullable=False 

340 ) 

341 comment_time: Mapped[datetime] = mapped_column( 

342 DATETIME(), server_default=func.utc_timestamp(), nullable=False 

343 ) 

344 user_id: Mapped[Optional[str]] = mapped_column( 

345 VARCHAR(length=150), ForeignKey("users.id"), nullable=True 

346 ) 

347 comment_text: Mapped[str] = mapped_column(TEXT(), nullable=False) 

348 type: Mapped[int] = mapped_column( 

349 INTEGER, nullable=False, server_default=text("'0'") 

350 ) 

351 

352 score = relationship("Score", back_populates="comments") 

353 user = relationship("User") 

354 

355 @property 

356 def user_name(self): 

357 return self.user.fullname 

358 

359 

360class IssueAttachment(AttachmentMixin, Base): 

361 __tablename__ = "issue_attachments" 

362 

363 public_attrs = ( 

364 "id,description,size,filename,url,respondent_name,private,date_uploaded" 

365 ).split(",") 

366 description: Mapped[Optional[str]] = mapped_column(VARCHAR(length=1024)) 

367 issue_id: Mapped[Optional[int]] = mapped_column( 

368 Integer, ForeignKey("issues.id", ondelete="SET NULL") 

369 ) 

370 date_uploaded: Mapped[datetime] = mapped_column( 

371 DATETIME, server_default=func.utc_timestamp(), nullable=True 

372 ) 

373 author_id: Mapped[Optional[str]] = mapped_column( 

374 VARCHAR(length=150), ForeignKey("users.id", ondelete="SET NULL") 

375 ) 

376 org_id: Mapped[Optional[str]] = mapped_column( 

377 VARCHAR(length=150), 

378 ForeignKey("organisations.id", onupdate="CASCADE", ondelete="SET NULL"), 

379 ) 

380 private: Mapped[bool] = mapped_column( 

381 Boolean, default=False, server_default=text("'0'"), nullable=True 

382 ) 

383 type: Mapped[str] = mapped_column( 

384 CHAR(1), server_default=text("'P'"), nullable=False 

385 ) 

386 

387 issue = relationship(Issue, back_populates="attachments") 

388 organisation = relationship(Organisation) 

389 

390 @property 

391 def url(self): 

392 return "/api/project/%i/issue/%i/attachment/%i/" % ( 

393 self.issue.project.id, 

394 self.issue.id, 

395 self.id, 

396 ) 

397 

398 @property 

399 def respondent_name(self): 

400 return self.organisation.name 

401 

402 

403# ---------------------------------------------------------------- 

404# Event listener to update QuestionResponseState tables when Issues are added 

405# ---------------------------------------------------------------- 

406 

407 

408@event.listens_for(Issue, "after_insert") 

409def issue_after_insert_handler(_mapper, _connection, issue: Issue) -> None: 

410 """ 

411 Creates QuestionResponseState records when an Issue is created using a direct INSERT...SELECT. 

412 

413 This prepares the tracking system for respondents' answers. QuestionResponseState 

414 records are used to track the progress, review status and approval state of each question response. 

415 

416 Replaces the 'make_question_response_states_update' database trigger. 

417 """ 

418 from postrfp.model import QuestionInstance 

419 from postrfp.model.questionnaire.answering import ( 

420 ResponseStatus, 

421 QuestionResponseState, 

422 ) 

423 

424 session = Session.object_session(issue) 

425 if session is None: 

426 raise RuntimeError( 

427 "Cannot create QuestionResponseState: No database session available" 

428 ) 

429 

430 insert_stmt = insert(QuestionResponseState).from_select( 

431 ["issue_id", "question_instance_id", "status"], 

432 select( 

433 literal(issue.id), 

434 QuestionInstance.id, 

435 literal(ResponseStatus.NOT_ANSWERED.value), 

436 ).where(QuestionInstance.project_id == issue.project_id), 

437 ) 

438 

439 session.execute(insert_stmt)