Coverage for postrfp/buyer/api/endpoints/tags.py: 100%

83 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1""" 

2Manage tags - keywords that can be used to categorize questions 

3""" 

4 

5from postrfp.shared import fetch 

6from typing import Callable, List 

7 

8from sqlalchemy import func, null 

9from sqlalchemy.orm import Session 

10from sqlalchemy.orm.exc import NoResultFound 

11 

12from postrfp.buyer.api import authorise 

13from postrfp.authorisation import perms 

14from postrfp.shared.decorators import http 

15from postrfp.model import ( 

16 User, 

17 Organisation, 

18 Project, 

19 Participant, 

20 Section, 

21 QuestionInstance, 

22) 

23from postrfp.model.tags import tags_qinstances_table, Tag 

24from postrfp.model.exc import DuplicateDataProvided 

25from postrfp.shared import serial 

26 

27 

28@http 

29def get_tags(session: Session, user: User) -> List[serial.Tag]: 

30 """ 

31 Get an array of all the Tags defined by your organization 

32 """ 

33 q = session.query(Tag).join(Organisation).filter(Organisation.id == user.org_id) 

34 

35 return [serial.Tag.model_validate(e) for e in q] 

36 

37 

38@http 

39def get_tags_section( 

40 session: Session, user: User, section_id: int 

41) -> List[serial.TagGroup]: 

42 """ 

43 Get an array of objects, one for each tag, listing the question ids assigned to each 

44 tag for the given section_id. 

45 

46 N.B. - this operation is not recursive only questions that are direct children of the given 

47 section ID are considered 

48 """ 

49 project_id = ( 

50 session.query(Section.project_id).filter(Section.id == section_id).scalar() 

51 ) 

52 if project_id is None: 

53 raise NoResultFound(f"Section ID {section_id} not found") 

54 project = fetch.project(session, project_id) 

55 

56 authorise.check( 

57 user, perms.PROJECT_VIEW_QUESTIONNAIRE, project=project, section_id=section_id 

58 ) 

59 

60 subsections = fetch.get_subsections_recursive(session, section_id).subquery() 

61 q = ( 

62 session.query( 

63 Tag.id.label("tag_id"), 

64 func.GROUP_CONCAT( 

65 func.IF(subsections.c.parent_lvl_1 == None, QuestionInstance.id, null()) # noqa: E711 

66 ).label("qids"), # noqa: E711 

67 func.GROUP_CONCAT(subsections.c.parent_lvl_1).label("section_ids"), 

68 ) 

69 .select_from(Tag) 

70 .join(tags_qinstances_table) 

71 .join(QuestionInstance) 

72 .join(subsections) 

73 .group_by(Tag.id) 

74 ) 

75 return [ 

76 serial.TagGroup( 

77 tag_id=r.tag_id, 

78 question_instance_ids=[int(qid) for qid in r.qids.split(",")] 

79 if r.qids 

80 else [], 

81 section_ids=[int(sid) for sid in r.section_ids.split(",")] 

82 if r.section_ids 

83 else [], 

84 ) 

85 for r in q 

86 ] 

87 

88 

89@http 

90def post_tag(session: Session, user: User, tag_doc: serial.NewTag) -> serial.Tag: 

91 """ 

92 Create a new Tag for your organization 

93 

94 @permission MANAGE_ORGANISATION 

95 """ 

96 authorise.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation) 

97 tag = Tag(name=tag_doc.name, description=tag_doc.description, colour=tag_doc.colour) 

98 if ( 

99 session.query(Tag) 

100 .filter(Tag.name == tag.name) 

101 .filter(Tag.organisation == user.organisation) 

102 .all() 

103 ): 

104 raise DuplicateDataProvided(f"tag '{tag.name}' already exists") 

105 

106 user.organisation.tags.append(tag) 

107 session.flush() 

108 return serial.Tag.model_validate(tag) 

109 

110 

111@http 

112def put_tag( 

113 session: Session, user: User, tag_id: int, tag_doc: serial.NewTag 

114) -> serial.Tag: 

115 """ 

116 Update the name and description of an existing tag 

117 

118 @permission MANAGE_ORGANISATION 

119 """ 

120 authorise.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation) 

121 tag = session.query(Tag).filter(Tag.id == tag_id, Tag.org_id == user.org_id).one() 

122 tag.name = tag_doc.name 

123 tag.description = tag_doc.description 

124 tag.colour = tag_doc.colour 

125 return serial.Tag.model_validate(tag) 

126 

127 

128@http 

129def delete_tag(session: Session, user: User, tag_id: int): 

130 """ 

131 Delete the tag with the given ID and remove all references to that tag 

132 from related items. 

133 

134 @permission MANAGE_ORGANISATION 

135 """ 

136 authorise.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation) 

137 tag = session.query(Tag).filter(Tag.id == tag_id, Tag.org_id == user.org_id).one() 

138 session.delete(tag) 

139 

140 

141def _tag_assign( 

142 session: Session, 

143 user: User, 

144 tag_id: int, 

145 tag_assigns_doc: serial.TagAssigns, 

146 action_func: Callable, 

147): 

148 """ 

149 Processes a TagAssigns document listing question ids to be assigned/unassigned (linked/unlinked) 

150 to the tag with the given ID. Takes care of checking that that question IDs belong to projects 

151 visible to the current user. 

152 

153 Action to assign or unassign the tag is delegated to the 'action_func' callable. 

154 """ 

155 

156 authorise.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation) 

157 

158 # Check tag exists for org 

159 tag = session.query(Tag).filter(Tag.id == tag_id, Tag.org_id == user.org_id).one() 

160 

161 _add_questions_from_sections(session, user, tag_assigns_doc) 

162 

163 instance_ids = set(tag_assigns_doc.question_instance_ids) 

164 provided_ids = [i for i in instance_ids] 

165 

166 qi_q = ( 

167 session.query(QuestionInstance) 

168 .select_from(QuestionInstance) 

169 .join(Participant, Participant.org_id == user.org_id) 

170 .join(Project, Project.id == QuestionInstance.project_id) 

171 .filter(QuestionInstance.id.in_(instance_ids)) 

172 ) 

173 

174 action_func(tag, qi_q, instance_ids) 

175 

176 if instance_ids: 

177 unmatched = ", ".join(str(i) for i in instance_ids) 

178 raise ValueError( 

179 f"The following Question Instance ids were not found: {unmatched}" 

180 ) 

181 

182 return provided_ids 

183 

184 

185def _add_questions_from_sections( 

186 session: Session, user: User, tag_assigns_doc: serial.TagAssigns 

187): 

188 """ 

189 Look up question instance ids for the sections IDs provided. Extend the list of provided 

190 question_instance_id values in tag_assigns_docs. 

191 """ 

192 qids = tag_assigns_doc.question_instance_ids 

193 for sec in ( 

194 session.query(Section) 

195 .join(Project, Project.id == Section.project_id) 

196 .join(Participant) 

197 .filter(Participant.org_id == user.org_id) 

198 .filter(Section.id.in_(tag_assigns_doc.section_ids)) 

199 ): 

200 if tag_assigns_doc.recursive: 

201 q = ( 

202 session.query(QuestionInstance.id) 

203 .filter(QuestionInstance.project_id == sec.project_id) 

204 .filter(QuestionInstance.b36_number.startswith(sec.b36_number)) 

205 ) 

206 else: 

207 q = session.query(QuestionInstance.id).filter( 

208 QuestionInstance.section == sec 

209 ) 

210 qids.extend(r.id for r in q) 

211 

212 

213@http 

214def post_tag_assign( 

215 session: Session, user: User, tag_id: int, tag_assigns_doc: serial.TagAssigns 

216) -> List[int]: 

217 """ 

218 Assign the Tag with the given ID to the question instances with ID values 

219 provided in the TagAssigns document. 

220 

221 Duplicate assignments are silently ignored. 

222 

223 An array of assigned question IDs is returned. 

224 

225 @permission MANAGE_ORGANISATION 

226 """ 

227 

228 def assign(tag, question_instances, qids_to_assign): 

229 for qi in question_instances: 

230 qids_to_assign.remove(qi.id) 

231 if tag not in qi.tags: 

232 qi.tags.append(tag) 

233 

234 return _tag_assign(session, user, tag_id, tag_assigns_doc, assign) 

235 

236 

237@http 

238def delete_tag_assign( 

239 session: Session, user: User, tag_id: int, tag_assigns_doc: serial.TagAssigns 

240) -> List[int]: 

241 """ 

242 Un-assign the Tag with the given ID from the question instances with ID values 

243 provided in the TagAssigns document. 

244 

245 An array of un-assigned question IDs is returned. 

246 

247 @permission MANAGE_ORGANISATION 

248 """ 

249 

250 def un_assign(tag, question_instances, qids_to_assign): 

251 for qi in question_instances: 

252 qids_to_assign.remove(qi.id) 

253 if tag in qi.tags: 

254 qi.tags.remove(tag) 

255 

256 return _tag_assign(session, user, tag_id, tag_assigns_doc, un_assign)