Coverage for postrfp/shared/fetch/quesq.py: 99%

121 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1import logging 

2from typing import ( 

3 Iterable, 

4 List, 

5 Optional, 

6 Dict, 

7 Any, 

8) 

9 

10from sqlalchemy import ( 

11 select, 

12 func, 

13 or_, 

14 and_, 

15 desc, 

16) 

17from sqlalchemy.orm import ( 

18 joinedload, 

19 lazyload, 

20 Bundle, 

21 Query, 

22 Session, 

23) 

24from sqlalchemy.orm.exc import NoResultFound 

25from sqlalchemy.orm.session import object_session 

26 

27 

28from postrfp.model.questionnaire.answering import ( 

29 QuestionResponseState, 

30 ResponseStatus, 

31) 

32from postrfp.model import ( 

33 Section, 

34 QuestionInstance, 

35 QuestionDefinition, 

36 QElement, 

37 Issue, 

38 Project, 

39) 

40from postrfp.model.questionnaire.qelements import QAttachment 

41from postrfp.model.questionnaire.b36 import from_b36 

42 

43 

44log = logging.getLogger(__name__) 

45 

46 

def qelement(session: Session, q_element_id: int) -> QElement:
    """
    Look up a single QElement by primary key.

    Raises
    ------
    NoResultFound
        if the session has no QElement with the given id
    """
    found = session.get(QElement, q_element_id)
    if found is not None:
        return found
    raise NoResultFound(f"No QElement found with id {q_element_id}")

52 

53 

def question(
    session: Session, question_id: int, with_project=False
) -> QuestionInstance:
    """
    Fetch a question instance by ID.

    When with_project is true the project relationship is eager loaded with a
    joined load as part of the same query; otherwise a plain primary key
    lookup is performed.

    Raises
    ------
    NoResultFound
        if no QuestionInstance matches question_id
    """
    if not with_project:
        return session.get_one(QuestionInstance, question_id)

    # The joinedload option only needs to be applied once.
    return (
        session.query(QuestionInstance)
        .options(joinedload(QuestionInstance.project))
        .filter(QuestionInstance.id == question_id)
        .one()
    )

68 

69 

def question_of_project(project: Project, question_id: int) -> QuestionInstance:
    """
    Return the question instance with the given ID, provided it is one of the
    given project's questions.
    The caller is assumed to have checked the user's access to the project.
    Raises
    ------
    NoResultFound
    """
    matching = project.questions.filter(QuestionInstance.id == question_id)
    return matching.one()

79 

80 

def question_of_section(
    session: Session, section_id: int, question_id: int
) -> QuestionInstance:
    """
    Fetch a question instance by ID, but only when it sits in the section with
    the given ID.
    Useful once the permissions of that section have been checked.
    """
    in_section = session.query(QuestionInstance).filter(
        QuestionInstance.id == question_id,
        QuestionInstance.section_id == section_id,
    )
    return in_section.one()

96 

97 

def question_instance_by_number(
    session: Session, project_id: int, qnode_number: str
) -> QuestionInstance:
    """
    Fetch the question instance of a project whose b36_number equals qnode_number.
    """
    numbered = session.query(QuestionInstance).filter(
        QuestionInstance.b36_number == qnode_number,
        QuestionInstance.project_id == project_id,
    )
    return numbered.one()

109 

110 

# Columns selected for each question element, followed by the identifying
# fields of the question it belongs to. Order matters: it is the column order
# of the resulting bundle rows.
_EL_BUNDLE_COLUMNS = (
    QElement.id,
    QElement.label,
    QElement.row,
    QElement.col,
    QElement.colspan,
    QElement.mandatory,
    QElement.height,
    QElement.width,
    QElement.regexp,
    QElement.el_type.label("el_type"),
    QElement.rowspan,
    QElement.choices,
    QuestionInstance.id.label("question_id"),
    QuestionInstance.b36_number,
    QuestionDefinition.title,
)

ElBundle: Bundle = Bundle("element_bundle", *_EL_BUNDLE_COLUMNS, single_entity=True)

130 

# Columns selected for each question attachment (QAttachment) row.
_QATT_BUNDLE_COLUMNS = (
    QAttachment.id,
    QAttachment.element_id,
    QAttachment.size_bytes,
    QAttachment.filename,
    QAttachment.mimetype,
)

QAttBundle: Bundle = Bundle("qatt_bundle", *_QATT_BUNDLE_COLUMNS, single_entity=True)

140 

141 

def response_states(
    issue: Issue, section_id: Optional[int] = None
) -> Dict[int, QuestionResponseState]:
    """
    Map question_instance_id to the QuestionResponseState recorded for the
    given issue, optionally restricted to the questions of one section.
    """
    session = object_session(issue)
    assert session is not None

    states = (
        session.query(QuestionResponseState)
        .join(QuestionInstance)
        .filter(QuestionResponseState.issue == issue)
    )
    if section_id is not None:
        states = states.filter(QuestionInstance.section_id == section_id)

    return {state.question_instance_id: state for state in states}

157 

158 

def answered_questions(issue: Issue, section_id: int) -> Iterable[Dict[str, Any]]:
    """
    Return an iterable of question dictionaries for one section, with the
    issue's answers, the question attachments and the response states merged in.
    """
    session = object_session(issue)
    assert session is not None

    in_section = QuestionInstance.section_id == section_id

    # element_id -> answer given by this issue within the section
    section_answers = issue.answers.join(QuestionInstance).filter(
        QuestionInstance.section_id == section_id
    )
    answer_lookup = {ans.element_id: ans.answer for ans in section_answers}

    # element_id -> attachment details for the section's elements
    attachment_rows = (
        session.query(QAttBundle)
        .join(QElement)
        .join(QuestionDefinition)
        .join(QuestionInstance)
        .filter(in_section)
    )
    qatt_lookup = {att.element_id: att._asdict() for att in attachment_rows}

    response_state_lookup = response_states(issue, section_id=section_id)

    # Elements ordered so that rows of one question arrive together, row by row
    element_rows = (
        session.query(ElBundle)
        .join(QuestionDefinition)
        .join(QuestionInstance)
        .filter(in_section)
        .order_by(
            QuestionInstance.b36_number,
            QuestionInstance.position,
            QElement.row,
            QElement.col,
        )
    )

    return iter_quick_questions(
        element_rows, answer_lookup, qatt_lookup, response_state_lookup
    )

196 

197 

def iter_quick_questions(
    elbundle_query: Query,
    answer_lookup: Dict[int, Any],
    qatt_lookup: Dict[int, Any],
    response_state_lookup: Dict[int, QuestionResponseState],
) -> Iterable[Dict[str, Any]]:
    """
    Assemble question dictionaries from a stream of ElBundle rows

    Rows belonging to one question are expected to arrive consecutively; a
    question is yielded when the first row of the next question is seen, and
    the final question is yielded after the loop.

    This is about twice as fast as approaches going via the ORM
    """
    answerable_types = {"TX", "CB", "CR", "CC", "AT"}
    labelled_types = {"LB", "QA", "CB"}

    question_dict = None
    question_id = None
    last_row = None

    for el in elbundle_query:
        kind = el.el_type

        if el.question_id != question_id:
            # A new question begins: emit the finished one (if any) first
            if question_dict is not None:
                yield question_dict

            question_id = el.question_id
            rows: list[list[dict]] = []
            last_row = -1
            question_dict = {
                "title": el.title,
                "id": question_id,
                "number": from_b36(el.b36_number),
                "elements": rows,
                "response_state": response_state_lookup[question_id].as_dict(),
            }

        # Open a new row list whenever the element's row index increases
        if el.row > last_row:
            rows.append([])
        last_row = el.row

        el_dict = {
            "id": el.id,
            "el_type": el.el_type,
            "colspan": el.colspan,
            "rowspan": el.rowspan,
        }

        if kind in labelled_types:
            el_dict["label"] = el.label
        if kind == "QA" and el.id in qatt_lookup:
            el_dict["attachment"] = qatt_lookup[el.id]

        # NOTE(review): block nesting here was reconstructed from a listing
        # with flattened indentation - confirm that "mandatory" is meant to be
        # emitted for answerable elements only.
        if kind in answerable_types:
            el_dict["answer"] = answer_lookup.get(el.id, None)

            if kind in ("CR", "CC"):
                el_dict["choices"] = [{"label": c["label"]} for c in el.choices]
            elif kind == "TX":
                el_dict["height"] = el.height
                el_dict["width"] = el.width
                el_dict["regexp"] = el.regexp

            if kind != "CB":
                el_dict["mandatory"] = el.mandatory

        rows[-1].append(el_dict)

    if question_dict is not None:
        yield question_dict  # the last question has no successor to trigger it

273 

274 

def questionnaire_stats(session: Session, project_id: int) -> Dict[str, Any]:
    """
    Count the questions, sections and elements (grouped by element type) of a
    project.

    The "elements" mapping additionally holds an "answerable_elements" total,
    summed over the types listed in QElement.answerable_types.
    """
    per_type_rows = (
        session.query(QElement.el_type, func.count(QElement.id))
        .join(QuestionDefinition)
        .join(QuestionInstance)
        .filter(QuestionInstance.project_id == project_id)
        .group_by(QElement.el_type)
    )

    el_counts: dict[str, int] = {el_type: total for el_type, total in per_type_rows}
    el_counts["answerable_elements"] = sum(
        el_counts.get(answerable, 0) for answerable in QElement.answerable_types
    )

    question_total = (
        session.query(QuestionInstance.id).filter_by(project_id=project_id).count()
    )
    section_total = session.query(Section.id).filter_by(project_id=project_id).count()

    return {
        "questions": question_total,
        "sections": section_total,
        "elements": el_counts,
    }

298 

299 

def unanswered_mandatory(issue: Issue) -> Query:
    """
    Build a query of distinct QuestionInstance (id, b36_number) pairs for the
    questions of the issue's project that have a mandatory element and whose
    response state for this issue is NOT_ANSWERED.
    """
    session = object_session(issue)
    assert session is not None

    candidates = (
        session.query(QuestionInstance.id, QuestionInstance.b36_number)
        .join(QuestionDefinition)
        .join(QElement)
        .join(QuestionResponseState)
    )
    return candidates.filter(
        QElement.mandatory == 1,
        QuestionInstance.project == issue.project,
        QuestionResponseState.status == ResponseStatus.NOT_ANSWERED,
        QuestionResponseState.issue_id == issue.id,
    ).distinct()

318 

319 

def _question_ids_q(
    session: Session, target_project_id: int, sec_number: Optional[str] = None
) -> Query:
    """
    Build a query selecting the question_def_id of every QuestionInstance in
    the target (destination) project, optionally limited to those whose
    b36_number starts with sec_number.
    """
    def_ids = session.query(QuestionInstance.question_def_id).filter(
        QuestionInstance.project_id == target_project_id
    )
    if sec_number is None:
        return def_ids
    return def_ids.filter(QuestionInstance.b36_number.startswith(str(sec_number)))

330 

331 

def importable_answers(
    session: Session, target_issue: Issue, sec_number: Optional[str] = None
) -> Query:
    """
    Build a query listing the other issues of the same respondent whose
    answers could be imported into target_issue.

    Each row carries issue_id, issue_date, submitted_date, the project title
    and question_count: the number of ANSWERED or APPROVED response states for
    questions sharing a question definition with the target issue's project
    (limited to question numbers starting with sec_number when given).
    Only issues with status Accepted, Submitted or Updateable are included, and
    rows are ordered by question_count, highest first.
    """
    target_def_ids = (
        _question_ids_q(session, target_issue.project_id, sec_number=sec_number)
        .subquery()
        .alias()
    )
    importable_statuses = [ResponseStatus.ANSWERED, ResponseStatus.APPROVED]

    per_issue = session.query(
        Issue.id.label("issue_id"),
        Issue.issue_date,
        Issue.submitted_date,
        Project.title,
        func.count(QuestionResponseState.id).label("question_count"),
    )
    return (
        per_issue.join(QuestionResponseState.question_instance)
        .join(Issue)
        .join(Project)
        .join(
            target_def_ids,
            target_def_ids.c.question_def_id == QuestionInstance.question_def_id,
        )
        .filter(
            Issue.respondent_id == target_issue.respondent_id,
            Issue.id != target_issue.id,
            Issue.status.in_(["Accepted", "Submitted", "Updateable"]),
            QuestionResponseState.status.in_(importable_statuses),
        )
        .group_by(Issue.id)
        .order_by(desc("question_count"))
    )

370 

371 

def duplicated_qdefs(
    session: Session,
    destination_project_id: int,
    source_project_id: int,
    src_sections: List[Section],
    src_questions: List[QuestionInstance],
) -> List[QuestionInstance]:
    """
    Returns a list of QuestionInstance objects belonging to the Project given by
    destination_project_id and whose QuestionDefinition is shared with QuestionInstances in
    src_questions or src_sections

    Source questions are selected either by ID (src_questions) or by having a
    b36_number that starts with the b36_number of one of src_sections within
    the source project. When neither src_sections nor src_questions is given
    there is nothing to compare against and an empty list is returned.
    """
    # Find QuestionDefinition ids for all questions to be imported from the source project
    source_filters = []
    if src_sections:
        # One alternation of anchored prefixes, matched with the database's REGEXP operator
        prefix_regex = "|".join([f"^{s.b36_number}" for s in src_sections])
        source_filters.append(
            and_(
                QuestionInstance.b36_number.op("REGEXP")(prefix_regex),
                QuestionInstance.project_id == source_project_id,
            )
        )

    if src_questions:
        source_filters.append(
            QuestionInstance.id.in_([q.id for q in src_questions])
        )

    if not source_filters:
        # or_() without arguments is deprecated in SQLAlchemy and renders as a
        # blank expression; the sub-select would then have no WHERE clause and
        # every question of the destination project would be reported.
        return []

    source_qids = select(QuestionInstance.question_def_id).where(
        or_(*source_filters)
    )

    # Find QuestionInstances in the destination project that share the same QuestionDefinitions
    # as in the source project
    shared_in_destination = (
        session.query(QuestionInstance)
        .filter(QuestionInstance.question_def_id.in_(source_qids))
        .filter(QuestionInstance.project_id == destination_project_id)
        .options(lazyload(QuestionInstance.question_def))
    )

    return shared_in_destination.all()