Coverage for postrfp/buyer/api/endpoints/tags.py: 100%
83 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1"""
2Manage tags - keywords that can be used to categorize questions
3"""
5from postrfp.shared import fetch
6from typing import Callable, List
8from sqlalchemy import func, null
9from sqlalchemy.orm import Session
10from sqlalchemy.orm.exc import NoResultFound
12from postrfp.buyer.api import authorise
13from postrfp.authorisation import perms
14from postrfp.shared.decorators import http
15from postrfp.model import (
16 User,
17 Organisation,
18 Project,
19 Participant,
20 Section,
21 QuestionInstance,
22)
23from postrfp.model.tags import tags_qinstances_table, Tag
24from postrfp.model.exc import DuplicateDataProvided
25from postrfp.shared import serial
@http
def get_tags(session: Session, user: User) -> List[serial.Tag]:
    """
    List every Tag that belongs to the calling user's organization
    """
    # Restrict the tags to the organisation referenced by user.org_id
    org_tags = (
        session.query(Tag)
        .join(Organisation)
        .filter(Organisation.id == user.org_id)
    )
    return list(map(serial.Tag.model_validate, org_tags))
@http
def get_tags_section(
    session: Session, user: User, section_id: int
) -> List[serial.TagGroup]:
    """
    Get an array of objects, one per tag, listing the question ids assigned to that
    tag for the given section_id, together with the section ids aggregated from the
    subsection query.

    N.B. - this operation is not recursive: only questions that are direct children
    of the given section ID are considered

    Raises NoResultFound when no section has the given ID.
    """

    def parse_ids(joined):
        # The aggregated column is split on commas; a falsy value means no ids
        if not joined:
            return []
        return [int(token) for token in joined.split(",")]

    owning_project_id = (
        session.query(Section.project_id).filter(Section.id == section_id).scalar()
    )
    if owning_project_id is None:
        raise NoResultFound(f"Section ID {section_id} not found")

    # The permission check needs the project that owns the section
    authorise.check(
        user,
        perms.PROJECT_VIEW_QUESTIONNAIRE,
        project=fetch.project(session, owning_project_id),
        section_id=section_id,
    )

    subsections = fetch.get_subsections_recursive(session, section_id).subquery()
    parent_col = subsections.c.parent_lvl_1
    # Only rows without a parent_lvl_1 value contribute a question id
    unparented_qid = func.IF(parent_col == None, QuestionInstance.id, null())  # noqa: E711
    grouped = (
        session.query(
            Tag.id.label("tag_id"),
            func.GROUP_CONCAT(unparented_qid).label("qids"),
            func.GROUP_CONCAT(parent_col).label("section_ids"),
        )
        .select_from(Tag)
        .join(tags_qinstances_table)
        .join(QuestionInstance)
        .join(subsections)
        .group_by(Tag.id)
    )

    return [
        serial.TagGroup(
            tag_id=row.tag_id,
            question_instance_ids=parse_ids(row.qids),
            section_ids=parse_ids(row.section_ids),
        )
        for row in grouped
    ]
@http
def post_tag(session: Session, user: User, tag_doc: serial.NewTag) -> serial.Tag:
    """
    Add a new Tag to your organization

    Raises DuplicateDataProvided when the organization already has a tag with the
    same name.

    @permission MANAGE_ORGANISATION
    """
    organisation = user.organisation
    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=organisation)

    new_tag = Tag(
        name=tag_doc.name,
        description=tag_doc.description,
        colour=tag_doc.colour,
    )

    # Tag names are checked for uniqueness within the organisation
    same_name = (
        session.query(Tag)
        .filter(Tag.name == new_tag.name)
        .filter(Tag.organisation == organisation)
    )
    if same_name.all():
        raise DuplicateDataProvided(f"tag '{new_tag.name}' already exists")

    organisation.tags.append(new_tag)
    # Flush before serialising - presumably so generated values such as the id
    # are populated on the new tag
    session.flush()
    return serial.Tag.model_validate(new_tag)
@http
def put_tag(
    session: Session, user: User, tag_id: int, tag_doc: serial.NewTag
) -> serial.Tag:
    """
    Overwrite the name, description and colour of an existing tag

    The tag is looked up by ID within the calling user's organization.

    @permission MANAGE_ORGANISATION
    """
    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)

    existing = (
        session.query(Tag)
        .filter(Tag.id == tag_id)
        .filter(Tag.org_id == user.org_id)
        .one()
    )
    # Copy each editable field from the submitted document onto the tag
    for field in ("name", "description", "colour"):
        setattr(existing, field, getattr(tag_doc, field))

    return serial.Tag.model_validate(existing)
@http
def delete_tag(session: Session, user: User, tag_id: int):
    """
    Delete the tag with the given ID and remove all references to that tag
    from related items.

    The tag is looked up by ID within the calling user's organization. This
    function only deletes the tag through the session; it does not itself touch
    the related items.

    @permission MANAGE_ORGANISATION
    """
    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)
    doomed = (
        session.query(Tag)
        .filter(Tag.id == tag_id)
        .filter(Tag.org_id == user.org_id)
        .one()
    )
    session.delete(doomed)
def _tag_assign(
    session: Session,
    user: User,
    tag_id: int,
    tag_assigns_doc: serial.TagAssigns,
    action_func: Callable,
):
    """
    Processes a TagAssigns document listing question ids to be assigned/unassigned
    (linked/unlinked) to the tag with the given ID.

    Question ids for any section ids in the document are added first by
    _add_questions_from_sections. Only question instances whose project has a
    Participant row for the current user's organisation are handed to the action;
    every other requested id is reported as not found.

    Action to assign or unassign the tag is delegated to the 'action_func' callable.
    It is called as action_func(tag, question_instance_query, pending_ids) and must
    remove from pending_ids each question instance id that it handles.

    Returns a list of the distinct question instance ids that were requested.
    Raises ValueError if any requested id is left unhandled by action_func.
    """

    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)

    # Check tag exists for org
    tag = session.query(Tag).filter(Tag.id == tag_id, Tag.org_id == user.org_id).one()

    _add_questions_from_sections(session, user, tag_assigns_doc)

    instance_ids = set(tag_assigns_doc.question_instance_ids)
    # Snapshot taken now because action_func empties instance_ids as it goes
    provided_ids = list(instance_ids)

    # Participant is joined from Project, so the organisation filter applies to
    # the project that owns each question instance. Joining Participant on the
    # org_id alone would match questions from any project.
    qi_q = (
        session.query(QuestionInstance)
        .select_from(QuestionInstance)
        .join(Project, Project.id == QuestionInstance.project_id)
        .join(Participant)
        .filter(Participant.org_id == user.org_id)
        .filter(QuestionInstance.id.in_(instance_ids))
    )

    action_func(tag, qi_q, instance_ids)

    # Whatever action_func did not remove was not matched by the query
    if instance_ids:
        unmatched = ", ".join(str(i) for i in instance_ids)
        raise ValueError(
            f"The following Question Instance ids were not found: {unmatched}"
        )

    return provided_ids
def _add_questions_from_sections(
    session: Session, user: User, tag_assigns_doc: serial.TagAssigns
):
    """
    Resolve the section IDs in tag_assigns_doc to question instance ids and append
    them, in place, to tag_assigns_doc.question_instance_ids.

    Only sections whose project has a Participant row for the user's organisation
    are considered; section IDs that do not match contribute nothing. When
    tag_assigns_doc.recursive is set, questions are selected by project and by
    b36_number prefix, otherwise only questions attached directly to the section.
    """
    collected = tag_assigns_doc.question_instance_ids
    visible_sections = (
        session.query(Section)
        .join(Project, Project.id == Section.project_id)
        .join(Participant)
        .filter(Participant.org_id == user.org_id)
        .filter(Section.id.in_(tag_assigns_doc.section_ids))
    )

    for section in visible_sections:
        id_query = session.query(QuestionInstance.id)
        if not tag_assigns_doc.recursive:
            id_query = id_query.filter(QuestionInstance.section == section)
        else:
            # Match on the section's number prefix within the same project
            id_query = id_query.filter(
                QuestionInstance.project_id == section.project_id
            ).filter(QuestionInstance.b36_number.startswith(section.b36_number))
        collected.extend(row.id for row in id_query)
@http
def post_tag_assign(
    session: Session, user: User, tag_id: int, tag_assigns_doc: serial.TagAssigns
) -> List[int]:
    """
    Link the Tag with the given ID to each question instance whose ID is listed
    in the TagAssigns document.

    Duplicate assignments are silently ignored.

    An array of assigned question IDs is returned.

    @permission MANAGE_ORGANISATION
    """

    def assign(tag, question_instances, pending_ids):
        for instance in question_instances:
            # Mark the id as handled whether or not the tag is already present
            pending_ids.remove(instance.id)
            if tag in instance.tags:
                continue
            instance.tags.append(tag)

    return _tag_assign(session, user, tag_id, tag_assigns_doc, assign)
@http
def delete_tag_assign(
    session: Session, user: User, tag_id: int, tag_assigns_doc: serial.TagAssigns
) -> List[int]:
    """
    Unlink the Tag with the given ID from each question instance whose ID is
    listed in the TagAssigns document.

    An array of un-assigned question IDs is returned.

    @permission MANAGE_ORGANISATION
    """

    def un_assign(tag, question_instances, pending_ids):
        for instance in question_instances:
            # Mark the id as handled whether or not the tag was actually linked
            pending_ids.remove(instance.id)
            if tag not in instance.tags:
                continue
            instance.tags.remove(tag)

    return _tag_assign(session, user, tag_id, tag_assigns_doc, un_assign)