Coverage for postrfp/shared/fetch/scoreq.py: 95%
123 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1"""
2Functions which fetch Score objects from the database
3"""
5import logging
6from operator import itemgetter
7from typing import Any, TYPE_CHECKING, Union, NamedTuple
8from decimal import Decimal
10from sqlalchemy.orm.exc import NoResultFound
12from sqlalchemy import literal, func, case, and_, cast, INTEGER, text, tuple_
13from sqlalchemy.orm import Session, Query
14from sqlalchemy.orm.session import object_session
16if TYPE_CHECKING:
17 from sqlalchemy import Alias, Subquery
18 from decimal import Decimal
20from postrfp.model.questionnaire.b36 import from_b36
21from postrfp.model import (
22 User,
23 Project,
24 Section,
25 Issue,
26 Score,
27 QuestionInstance,
28 SectionPermission,
29 Participant,
30 TotalWeighting,
31)
32from postrfp.templates import get_template
33from postrfp.authorisation import perms
36log = logging.getLogger(__name__)
39class ScoreTuple(NamedTuple):
40 issue_id: int
41 score_value: Decimal | None = None
42 scoreset_id: str = ""
43 comment: str | None = None
46def scores(
47 session: Session,
48 project: Project,
49 section: Section,
50 scoreset_id: str,
51 user: User,
52 filter_by,
53) -> list[dict[str, Union[str, int, "Decimal"]]]:
54 Score.validate_scoreset(scoreset_id, project)
55 score_perm = get_permission_for_scoreset(user, scoreset_id)
56 user.check_permission(score_perm)
58 sq = (
59 session.query(
60 QuestionInstance.id.label("question_id"),
61 Issue.id.label("issue_id"),
62 literal(scoreset_id).label("scoreset_id"),
63 Score.score.label("score"),
64 )
65 .join(Section)
66 .join(Project, Section.project)
67 .join(Issue)
68 .outerjoin(
69 Score,
70 and_(
71 Score.scoreset_id == scoreset_id,
72 QuestionInstance.id == Score.question_instance_id,
73 Issue.id == Score.issue_id,
74 ),
75 )
76 .filter(Project.id == Issue.project_id, Section.id == section.id, filter_by)
77 .order_by(QuestionInstance.id.asc(), Issue.id.asc())
78 )
79 return [s._asdict() for s in sq]
82def or_create_score(
83 session: Session,
84 project: Project,
85 question: QuestionInstance,
86 score_doc,
87) -> tuple[Score, bool]:
88 """
89 Get issue and question required to access score
90 Do required validation and permission checks
91 """
92 issue_id, scoreset_id = score_doc.issue_id, score_doc.scoreset_id
93 if scoreset_id is None:
94 scoreset_id = ""
96 Score.validate_scoreset(scoreset_id, project)
98 issue = project.get_issue(issue_id)
100 # Fetch or create score object
101 created = False
102 try:
103 score = (
104 session.query(Score)
105 .filter(
106 Score.question_instance_id == question.id,
107 Score.issue_id == issue.id,
108 Score.scoreset_id == scoreset_id,
109 )
110 .one()
111 )
112 except NoResultFound:
113 score = Score(issue=issue, question=question, scoreset_id=scoreset_id)
114 session.add(score)
115 created = True
117 return score, created
120def or_create_scores_batch(
121 session: Session,
122 project: Project,
123 score_requests: list[tuple[QuestionInstance, ScoreTuple]],
124) -> list[tuple[Score, bool]]:
125 """
126 Batch version of or_create_score for better performance.
128 Args:
129 session: Database session
130 project: Project object
131 score_requests: List of (question, score_doc) tuples
133 Returns:
134 List of (score, created) tuples in the same order as input
135 """
136 if not score_requests:
137 return []
139 # Normalize scoreset_ids and validate
140 normalized_requests = []
141 for question, score_doc in score_requests:
142 scoreset_id = score_doc.scoreset_id or ""
143 Score.validate_scoreset(scoreset_id, project)
144 issue = project.get_issue(score_doc.issue_id)
145 normalized_requests.append((question, issue, scoreset_id, score_doc))
147 # Build keys for a single fetch of existing scores
148 keys = [
149 (question.id, issue.id, scoreset_id)
150 for (question, issue, scoreset_id, _sd) in normalized_requests
151 ]
153 # Fetch all existing scores
154 existing_scores = {}
155 if keys:
156 existing_q = session.query(Score).filter(
157 tuple_(Score.question_instance_id, Score.issue_id, Score.scoreset_id).in_(
158 keys
159 )
160 )
161 for score in existing_q:
162 key = (score.question_instance_id, score.issue_id, score.scoreset_id)
163 existing_scores[key] = score
165 # Build results, creating new scores where needed
166 results = []
167 new_scores = []
168 for question, issue, scoreset_id, score_doc in normalized_requests:
169 key = (question.id, issue.id, scoreset_id)
170 if key in existing_scores:
171 results.append((existing_scores[key], False))
172 else:
173 new_score = Score(issue=issue, question=question, scoreset_id=scoreset_id)
174 new_scores.append(new_score)
175 results.append((new_score, True))
177 if new_scores:
178 session.add_all(new_scores)
180 return results
183def scoring_data(project: Project, scoreset_id: str = "") -> Query:
184 """
185 Returns a list of a score records for the given project and scoring set.
187 Each dict has keys: score, number, issue_id
189 """
190 session = object_session(project)
191 assert session is not None
192 q = (
193 session.query(
194 Score.score,
195 QuestionInstance.id.label("question_id"),
196 QuestionInstance.b36_number,
197 Score.issue_id,
198 )
199 .join(QuestionInstance)
200 .filter(QuestionInstance.project == project, Score.scoreset_id == scoreset_id)
201 )
203 return q
206def score_totals_by_project(
207 session: Session, org_id: str, project_ids: list[int]
208) -> Query:
209 project_ids_q = (
210 session.query(Participant.project_id)
211 .filter(Participant.org_id == org_id)
212 .filter(Participant.project_id.in_(project_ids))
213 .distinct()
214 )
216 return (
217 session.query(
218 func.sum(Score.score * TotalWeighting.absolute_weight).label(
219 "total_weighted_score"
220 ),
221 Project.title.label("project_title"),
222 Project.org_id.label("project_owner"),
223 Project.date_published,
224 Project.id.label("project_id"),
225 Issue.respondent_id,
226 )
227 .join(
228 TotalWeighting,
229 Score.question_instance_id == TotalWeighting.question_instance_id,
230 )
231 .join(Issue)
232 .join(Project)
233 .filter(
234 Score.scoreset_id == "",
235 TotalWeighting.weighting_set_id == Project.default_weighting_set_id,
236 TotalWeighting.section_id == 0,
237 Project.id.in_(project_ids_q),
238 )
239 .group_by(Project.id, Score.issue_id)
240 .order_by(Project.id.desc(), text("total_weighted_score desc"))
241 )
244def score_gaps(
245 issue: Issue,
246 weighting_set_id: int | None = None,
247 expose_weights: bool = True,
248 show_gap_value: bool = True,
249 debug: bool = False,
250) -> list[dict[str, Any]]:
251 # Resolve None weightset_id to the actual default weighting set ID
252 if weighting_set_id is None:
253 session = object_session(issue)
254 assert session is not None
255 project = session.query(Project).filter(Project.id == issue.project_id).one()
256 weighting_set_id = project.get_or_create_default_weighting_set_id()
258 tmpl = get_template("sql/score_gap.sql")
259 sql_script = text(
260 tmpl.render(
261 expose_weights=expose_weights, show_gap_value=show_gap_value, debug=debug
262 )
263 )
265 params = {
266 "project_id": issue.project_id,
267 "issue_id": issue.id,
268 "weighting_set_id": weighting_set_id,
269 }
271 res: list[dict[str, Any]] = []
272 session = object_session(issue)
274 assert session is not None
276 if show_gap_value:
277 for q_row in session.execute(sql_script, params):
278 q_dict = dict(q_row._mapping)
279 q_dict["number"] = from_b36(q_row.b36_number)
280 res.append(q_dict)
281 else:
282 # Don't show a numerical value for the gap, just =, - or +
283 for q_row in session.execute(sql_script, params):
284 q_dict = dict(q_row._mapping)
285 q_dict["number"] = from_b36(q_row.b36_number)
286 score_gap = q_dict["score_gap"]
287 if score_gap is None:
288 q_dict["score_gap"] = "=="
289 elif score_gap > 0:
290 q_dict["score_gap"] = "+"
291 elif score_gap == 0:
292 q_dict["score_gap"] = "=="
293 else:
294 # score_gap < 1
295 q_dict["score_gap"] = "-"
296 res.append(q_dict)
298 res.sort(key=itemgetter("number"))
300 return res
303def question_scoresummary(
304 session: Session,
305 user: User,
306 project: Project,
307 section: Section,
308 scoreset_id: str = "",
309) -> Query:
310 # Questions
311 questions = (
312 session.query(
313 QuestionInstance.id.label("question_id"),
314 Issue.id.label("issue_id"),
315 literal(scoreset_id).label("scoreset_id"),
316 )
317 .join(Section)
318 .join(Project, Section.project)
319 .join(Issue)
320 .outerjoin(
321 Score,
322 and_(
323 Score.scoreset_id == scoreset_id,
324 QuestionInstance.id == Score.question_instance_id,
325 Issue.id == Score.issue_id,
326 ),
327 )
328 .filter(
329 Issue.project == project,
330 Issue.scoreable_filter(project),
331 Section.id == section.id,
332 )
333 )
335 if user.is_restricted:
336 query = questions.outerjoin(
337 SectionPermission,
338 and_(
339 SectionPermission.user == user,
340 Section.id == SectionPermission.section_id,
341 ),
342 ).add_columns(
343 case(
344 (SectionPermission.section_id != None, Score.score), # noqa
345 else_=literal("N/A"),
346 ).label("score")
347 )
348 else:
349 query = questions.add_columns(Score.score)
351 return query
354def subsection_scoressummary(
355 session: Session,
356 user: User,
357 project: Project,
358 section: Section,
359 scoreset_id: str = "",
360) -> "Subquery":
361 """
362 Subsections
363 First create a subquery totalling up the counts of questions
364 contained immediately in each section.
365 Exclude all sections the score_user does not have permissions on
366 """
367 sub = (
368 session.query(
369 Section.b36_number.label("b36_number"),
370 Issue.id.label("issue_id"),
371 func.count(QuestionInstance.id).label("q_count"),
372 func.count(Score.score).label("q_scored"),
373 func.coalesce(func.sum(Score.score), 0).label("score"),
374 )
375 .join(Project, Section.project)
376 .join(Issue)
377 .outerjoin(
378 QuestionInstance,
379 and_(
380 QuestionInstance.project_id == Project.id,
381 QuestionInstance.section_id == Section.id,
382 ),
383 )
384 .outerjoin(
385 Score,
386 and_(
387 Score.scoreset_id == scoreset_id,
388 QuestionInstance.id == Score.question_instance_id,
389 Issue.id == Score.issue_id,
390 ),
391 )
392 .filter(Issue.project_id == project.id, Issue.scoreable_filter(project))
393 .group_by(Section.id, Issue.id)
394 )
396 if user.is_restricted:
397 sub = sub.join(
398 SectionPermission,
399 and_(
400 SectionPermission.user == user,
401 Section.id == SectionPermission.section_id,
402 ),
403 )
405 return sub.subquery()
408def section_scoresummary(
409 session: Session,
410 user: User,
411 project: Project,
412 section: Section,
413 sub: Union["Alias", "Subquery"],
414) -> Query:
415 """
416 create main query of all immediate subsections in this section
417 and total up the totals from the subsection and child subsections
418 from the subquery. No rows from the subquery indicates the score_user
419 does not have access to any subsections, in this case a score of 'N/A'
420 is returned.
421 """
423 subsections = (
424 session.query(
425 Section.id.label("section_id"),
426 Issue.id.label("issue_id"),
427 cast(func.sum(sub.c.q_count), INTEGER).label("question_count"),
428 cast(func.sum(sub.c.q_scored), INTEGER).label("questions_scored"),
429 func.coalesce(cast(func.sum(sub.c.score), INTEGER), literal("N/A")).label(
430 "score"
431 ),
432 )
433 .join(Project, Section.project)
434 .join(Issue)
435 .outerjoin(
436 sub,
437 and_(
438 sub.c.b36_number.startswith(Section.b36_number),
439 Issue.id == sub.c.issue_id,
440 ),
441 )
442 .filter(
443 Issue.project == project,
444 Issue.scoreable_filter(project),
445 Section.parent_id == section.id,
446 )
447 .group_by(Section.id, Issue.id)
448 )
450 return subsections
453def get_permission_for_scoreset(user, scoreset_id, to_save=False):
454 if scoreset_id in (None, "") or scoreset_id != user.id:
455 return (
456 perms.ISSUE_SAVE_AGREED_SCORES
457 if to_save
458 else perms.ISSUE_VIEW_AGREED_SCORES
459 )
460 else:
461 return perms.ISSUE_SAVE_SCORES if to_save else perms.ISSUE_VIEW_SCORES