Coverage for postrfp / shared / fetch / scoreq.py: 95%
123 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1"""
2Functions which fetch Score objects from the database
3"""
5import logging
6from operator import itemgetter
7from typing import Any, TYPE_CHECKING, Union, NamedTuple
8from decimal import Decimal
10from sqlalchemy.orm.exc import NoResultFound
12from sqlalchemy import literal, func, case, and_, cast, INTEGER, VARCHAR, text, tuple_
13from sqlalchemy.orm import Session, Query
14from sqlalchemy.orm.session import object_session
16if TYPE_CHECKING:
17 from sqlalchemy import Alias, Subquery
18 from decimal import Decimal
20from postrfp.model.questionnaire.b36 import from_b36
21from postrfp.model import (
22 User,
23 Project,
24 Section,
25 Issue,
26 Score,
27 QuestionInstance,
28 SectionPermission,
29 Participant,
30 TotalWeighting,
31)
32from postrfp.templates import get_template
33from postrfp.authorisation import perms
36log = logging.getLogger(__name__)
39class ScoreTuple(NamedTuple):
40 issue_id: int
41 score_value: Decimal | None = None
42 scoreset_id: str = ""
43 comment: str | None = None
46def scores(
47 session: Session,
48 project: Project,
49 section: Section,
50 scoreset_id: str,
51 user: User,
52 filter_by,
53) -> list[dict[str, Union[str, int, "Decimal"]]]:
54 Score.validate_scoreset(scoreset_id, project)
55 score_perm = get_permission_for_scoreset(user, scoreset_id)
56 user.check_permission(score_perm)
58 sq = (
59 session.query(
60 QuestionInstance.id.label("question_id"),
61 Issue.id.label("issue_id"),
62 literal(scoreset_id).label("scoreset_id"),
63 Score.score.label("score"),
64 )
65 .join(Section)
66 .join(Project, Section.project)
67 .join(Issue)
68 .outerjoin(
69 Score,
70 and_(
71 Score.scoreset_id == scoreset_id,
72 QuestionInstance.id == Score.question_instance_id,
73 Issue.id == Score.issue_id,
74 ),
75 )
76 .filter(Project.id == Issue.project_id, Section.id == section.id, filter_by)
77 .order_by(QuestionInstance.id.asc(), Issue.id.asc())
78 )
79 return [s._asdict() for s in sq]
82def or_create_score(
83 session: Session,
84 project: Project,
85 question: QuestionInstance,
86 score_doc,
87) -> tuple[Score, bool]:
88 """
89 Get issue and question required to access score
90 Do required validation and permission checks
91 """
92 issue_id, scoreset_id = score_doc.issue_id, score_doc.scoreset_id
93 if scoreset_id is None:
94 scoreset_id = ""
96 Score.validate_scoreset(scoreset_id, project)
98 issue = project.get_issue(issue_id)
100 # Fetch or create score object
101 created = False
102 try:
103 score = (
104 session.query(Score)
105 .filter(
106 Score.question_instance_id == question.id,
107 Score.issue_id == issue.id,
108 Score.scoreset_id == scoreset_id,
109 )
110 .one()
111 )
112 except NoResultFound:
113 score = Score(issue=issue, question=question, scoreset_id=scoreset_id)
114 session.add(score)
115 created = True
117 return score, created
120def or_create_scores_batch(
121 session: Session,
122 project: Project,
123 score_requests: list[tuple[QuestionInstance, ScoreTuple]],
124) -> list[tuple[Score, bool]]:
125 """
126 Batch version of or_create_score for better performance.
128 Args:
129 session: Database session
130 project: Project object
131 score_requests: List of (question, score_doc) tuples
133 Returns:
134 List of (score, created) tuples in the same order as input
135 """
136 if not score_requests:
137 return []
139 # Normalize scoreset_ids and validate
140 normalized_requests = []
141 for question, score_doc in score_requests:
142 scoreset_id = score_doc.scoreset_id or ""
143 Score.validate_scoreset(scoreset_id, project)
144 issue = project.get_issue(score_doc.issue_id)
145 normalized_requests.append((question, issue, scoreset_id, score_doc))
147 # Build keys for a single fetch of existing scores
148 keys = [
149 (question.id, issue.id, scoreset_id)
150 for (question, issue, scoreset_id, _sd) in normalized_requests
151 ]
153 # Fetch all existing scores
154 existing_scores = {}
155 if keys:
156 existing_q = session.query(Score).filter(
157 tuple_(Score.question_instance_id, Score.issue_id, Score.scoreset_id).in_(
158 keys
159 )
160 )
161 for score in existing_q:
162 key = (score.question_instance_id, score.issue_id, score.scoreset_id)
163 existing_scores[key] = score
165 # Build results, creating new scores where needed
166 results = []
167 new_scores = []
168 for question, issue, scoreset_id, score_doc in normalized_requests:
169 key = (question.id, issue.id, scoreset_id)
170 if key in existing_scores:
171 results.append((existing_scores[key], False))
172 else:
173 new_score = Score(issue=issue, question=question, scoreset_id=scoreset_id)
174 new_scores.append(new_score)
175 results.append((new_score, True))
177 if new_scores:
178 session.add_all(new_scores)
180 return results
183def scoring_data(project: Project, scoreset_id: str = "") -> Query:
184 """
185 Returns a list of a score records for the given project and scoring set.
187 Each dict has keys: score, number, issue_id
189 """
190 session = object_session(project)
191 assert session is not None
192 q = (
193 session.query(
194 Score.score,
195 QuestionInstance.id.label("question_id"),
196 QuestionInstance.b36_number,
197 Score.issue_id,
198 )
199 .join(QuestionInstance)
200 .filter(QuestionInstance.project == project, Score.scoreset_id == scoreset_id)
201 )
203 return q
206def score_totals_by_project(
207 session: Session, org_id: str, project_ids: list[int]
208) -> Query:
209 project_ids_q = (
210 session.query(Participant.project_id)
211 .filter(Participant.org_id == org_id)
212 .filter(Participant.project_id.in_(project_ids))
213 .distinct()
214 )
216 return (
217 session.query(
218 func.sum(Score.score * TotalWeighting.absolute_weight).label(
219 "total_weighted_score"
220 ),
221 Project.title.label("project_title"),
222 Project.org_id.label("project_owner"),
223 Project.date_published,
224 Project.id.label("project_id"),
225 Issue.respondent_id,
226 )
227 .join(
228 TotalWeighting,
229 Score.question_instance_id == TotalWeighting.question_instance_id,
230 )
231 .join(Issue)
232 .join(Project)
233 .filter(
234 Score.scoreset_id == "",
235 TotalWeighting.weighting_set_id == Project.default_weighting_set_id,
236 TotalWeighting.section_id == 0,
237 Project.id.in_(project_ids_q),
238 )
239 .group_by(
240 Project.id,
241 Score.issue_id,
242 Issue.respondent_id,
243 Project.title,
244 Project.org_id,
245 Project.date_published,
246 )
247 .order_by(Project.id.desc(), text("total_weighted_score desc"))
248 )
251def score_gaps(
252 issue: Issue,
253 weighting_set_id: int | None = None,
254 expose_weights: bool = True,
255 show_gap_value: bool = True,
256 debug: bool = False,
257) -> list[dict[str, Any]]:
258 # Resolve None weightset_id to the actual default weighting set ID
259 if weighting_set_id is None:
260 session = object_session(issue)
261 assert session is not None
262 project = session.query(Project).filter(Project.id == issue.project_id).one()
263 weighting_set_id = project.get_or_create_default_weighting_set_id()
265 tmpl = get_template("sql/score_gap.sql")
266 sql_script = text(
267 tmpl.render(
268 expose_weights=expose_weights, show_gap_value=show_gap_value, debug=debug
269 )
270 )
272 params = {
273 "project_id": issue.project_id,
274 "issue_id": issue.id,
275 "weighting_set_id": weighting_set_id,
276 }
278 res: list[dict[str, Any]] = []
279 session = object_session(issue)
281 assert session is not None
283 if show_gap_value:
284 for q_row in session.execute(sql_script, params):
285 q_dict = dict(q_row._mapping)
286 q_dict["number"] = from_b36(q_row.b36_number)
287 res.append(q_dict)
288 else:
289 # Don't show a numerical value for the gap, just =, - or +
290 for q_row in session.execute(sql_script, params):
291 q_dict = dict(q_row._mapping)
292 q_dict["number"] = from_b36(q_row.b36_number)
293 score_gap = q_dict["score_gap"]
294 if score_gap is None:
295 q_dict["score_gap"] = "=="
296 elif score_gap > 0:
297 q_dict["score_gap"] = "+"
298 elif score_gap == 0:
299 q_dict["score_gap"] = "=="
300 else:
301 # score_gap < 1
302 q_dict["score_gap"] = "-"
303 res.append(q_dict)
305 res.sort(key=itemgetter("number"))
307 return res
310def question_scoresummary(
311 session: Session,
312 user: User,
313 project: Project,
314 section: Section,
315 scoreset_id: str = "",
316) -> Query:
317 # Questions
318 questions = (
319 session.query(
320 QuestionInstance.id.label("question_id"),
321 Issue.id.label("issue_id"),
322 literal(scoreset_id).label("scoreset_id"),
323 )
324 .join(Section)
325 .join(Project, Section.project)
326 .join(Issue)
327 .outerjoin(
328 Score,
329 and_(
330 Score.scoreset_id == scoreset_id,
331 QuestionInstance.id == Score.question_instance_id,
332 Issue.id == Score.issue_id,
333 ),
334 )
335 .filter(
336 Issue.project == project,
337 Issue.scoreable_filter(project),
338 Section.id == section.id,
339 )
340 )
342 if user.is_restricted:
343 query = questions.outerjoin(
344 SectionPermission,
345 and_(
346 SectionPermission.user == user,
347 Section.id == SectionPermission.section_id,
348 ),
349 ).add_columns(
350 case(
351 (SectionPermission.section_id != None, cast(Score.score, VARCHAR)), # noqa
352 else_=literal("N/A"),
353 ).label("score")
354 )
355 else:
356 query = questions.add_columns(Score.score)
358 return query
361def subsection_scoressummary(
362 session: Session,
363 user: User,
364 project: Project,
365 section: Section,
366 scoreset_id: str = "",
367) -> "Subquery":
368 """
369 Subsections
370 First create a subquery totalling up the counts of questions
371 contained immediately in each section.
372 Exclude all sections the score_user does not have permissions on
373 """
374 sub = (
375 session.query(
376 Section.b36_number.label("b36_number"),
377 Issue.id.label("issue_id"),
378 func.count(QuestionInstance.id).label("q_count"),
379 func.count(Score.score).label("q_scored"),
380 func.coalesce(func.sum(Score.score), 0).label("score"),
381 )
382 .join(Project, Section.project)
383 .join(Issue)
384 .outerjoin(
385 QuestionInstance,
386 and_(
387 QuestionInstance.project_id == Project.id,
388 QuestionInstance.section_id == Section.id,
389 ),
390 )
391 .outerjoin(
392 Score,
393 and_(
394 Score.scoreset_id == scoreset_id,
395 QuestionInstance.id == Score.question_instance_id,
396 Issue.id == Score.issue_id,
397 ),
398 )
399 .filter(Issue.project_id == project.id, Issue.scoreable_filter(project))
400 .group_by(Section.id, Issue.id, Section.b36_number)
401 )
403 if user.is_restricted:
404 sub = sub.join(
405 SectionPermission,
406 and_(
407 SectionPermission.user == user,
408 Section.id == SectionPermission.section_id,
409 ),
410 )
412 return sub.subquery()
415def section_scoresummary(
416 session: Session,
417 user: User,
418 project: Project,
419 section: Section,
420 sub: Union["Alias", "Subquery"],
421) -> Query:
422 """
423 create main query of all immediate subsections in this section
424 and total up the totals from the subsection and child subsections
425 from the subquery. No rows from the subquery indicates the score_user
426 does not have access to any subsections, in this case a score of 'N/A'
427 is returned.
428 """
430 subsections = (
431 session.query(
432 Section.id.label("section_id"),
433 Issue.id.label("issue_id"),
434 cast(func.sum(sub.c.q_count), INTEGER).label("question_count"),
435 cast(func.sum(sub.c.q_scored), INTEGER).label("questions_scored"),
436 func.coalesce(
437 cast(cast(func.sum(sub.c.score), INTEGER), VARCHAR), literal("N/A")
438 ).label("score"),
439 )
440 .join(Project, Section.project)
441 .join(Issue)
442 .outerjoin(
443 sub,
444 and_(
445 sub.c.b36_number.startswith(Section.b36_number),
446 Issue.id == sub.c.issue_id,
447 ),
448 )
449 .filter(
450 Issue.project == project,
451 Issue.scoreable_filter(project),
452 Section.parent_id == section.id,
453 )
454 .group_by(Section.id, Issue.id)
455 )
457 return subsections
460def get_permission_for_scoreset(user, scoreset_id, to_save=False):
461 if scoreset_id in (None, "") or scoreset_id != user.id:
462 return (
463 perms.ISSUE_SAVE_AGREED_SCORES
464 if to_save
465 else perms.ISSUE_VIEW_AGREED_SCORES
466 )
467 else:
468 return perms.ISSUE_SAVE_SCORES if to_save else perms.ISSUE_VIEW_SCORES