Coverage for postrfp/vendor/api/attachments.py: 99%

75 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1import logging 

2from cgi import FieldStorage 

3 

4from sqlalchemy.orm import Session 

5 

6from postrfp.authorisation import perms 

7from postrfp.model.questionnaire.qelements import ( 

8 QuestionAttachment, 

9) 

10from ..validation import validate 

11from postrfp.shared.decorators import http 

12from postrfp.model import IssueAttachment, User 

13from postrfp.shared import attachments, fetch, serial, update 

14from postrfp.shared.response import XAccelAttachmentResponse 

15from postrfp.model.questionnaire.answering import AAttachment 

16 

17log = logging.getLogger(__name__) 

18 

19 

@http
def post_issue_attachment(
    session: Session,
    effective_user: User,
    issue_id: int,
    attachment_upload: str,
    attachment_description: str,
) -> serial.Id:
    """
    Save an uploaded file, together with its description, against an Issue.

    The Issue is looked up by ``issue_id`` and the user must pass validation
    for ``perms.ISSUE_MANAGE_RESPONDENT_ATTACHMENTS`` before anything is
    saved. The ID of the saved attachment is returned wrapped in ``serial.Id``.

    @param Attachment
    """
    # NOTE(review): attachment_upload is annotated ``str`` here while the
    # answer-attachment endpoint annotates its upload as FieldStorage -
    # confirm which type save_issue_attachment() really expects.
    target_issue = fetch.issue(session, issue_id)
    required_permission = perms.ISSUE_MANAGE_RESPONDENT_ATTACHMENTS
    validate(effective_user, issue=target_issue, action=required_permission)

    saved = attachments.save_issue_attachment(
        session,
        issue_id,
        effective_user,
        attachment_upload,
        attachment_description,
    )
    return serial.Id(id=saved.id)

42 

43 

@http
def post_issue_answer_attachment(
    session: Session,
    effective_user: User,
    issue_id: int,
    attachment_upload: FieldStorage,
    el_id: int,
) -> serial.AnswerAttachmentIds:
    """
    Answer a specific question element by uploading a file for it.

    The user must pass validation for ``perms.ISSUE_SAVE_QUESTION_RESPONSE``
    on the Issue and the question instance that owns element ``el_id``.

    Returns a ``serial.AnswerAttachmentIds`` holding the IDs of the new
    ``AAttachment`` and of the answer it is linked to.
    """
    issue = fetch.issue(session, issue_id)
    element = fetch.qelement(session, el_id)
    question = element.get_question_instance(issue.project_id)

    validate(
        effective_user,
        issue=issue,
        question=question,
        action=perms.ISSUE_SAVE_QUESTION_RESPONSE,
    )

    uploaded_name = attachment_upload.filename

    # Step 1: persist a provisional answer (just the bare filename) and read
    # it back. The answer is needed to find the disc file path for the
    # AAttachment. update.save_answers() is deliberately left until the end
    # because no event should be logged until the final answer text, i.e.
    # the filename with the filesize, is known.
    question.validate_and_save_answers({el_id: uploaded_name}, issue)
    session.flush()
    saved_answer = question._answers.filter_by(
        issue_id=issue.id, element_id=el_id
    ).one()

    # Step 2: create the attachment record linked to that answer.
    new_attachment = AAttachment()
    new_attachment.answer = saved_answer
    new_attachment.filename = uploaded_name
    new_attachment.guess_set_mimetype(uploaded_name)
    session.add(new_attachment)
    # Flush so that the ID values are set before the file is written.
    session.flush()

    # Step 3: write the file, then save the final "<filename> (<size>)"
    # answer text through update.save_answers().
    attachments.save_to_disc(new_attachment, attachment_upload.file)
    final_answers = {
        el_id: f"{new_attachment.filename} ({new_attachment.size})",
    }
    update.save_answers(session, effective_user, question, final_answers, issue)

    return serial.AnswerAttachmentIds(
        attachment_id=new_attachment.id,
        answer_id=saved_answer.id,
    )

96 

97 

@http
def get_issue_answer_attachment(
    session: Session, effective_user: User, issue_id: int, attachment_id: int
):
    """
    Download the file attached to an Issue's answer for a question element.

    Note that ``attachment_id`` is passed to ``fetch.qelement``, i.e. it is
    used as the ID of a question element; the answer given to that element
    for this Issue supplies the attachment that is returned.

    The user must pass validation for ``perms.ISSUE_VIEW_ANSWERS``.
    """
    issue = fetch.issue(session, issue_id)
    element = fetch.qelement(session, attachment_id)
    issue_answer = element.get_answer(issue)

    validate(
        effective_user,
        issue=issue,
        action=perms.ISSUE_VIEW_ANSWERS,
        answer=issue_answer,
    )

    # NOTE(review): if get_answer() can return None when the element is
    # unanswered, the attribute access below would fail - confirm.
    return XAccelAttachmentResponse(issue_answer.attachment)

112 

113 

@http
def get_issue_attachment(
    session: Session, effective_user: User, issue_id: int, attachment_id: int
):
    """
    Download one attachment file belonging to the given Issue.

    The user must pass validation for ``perms.ISSUE_VIEW_ANSWERS``. The
    attachment is looked up through ``issue.attachments``, so only an
    attachment of this Issue can match ``attachment_id``.
    """
    issue = fetch.issue(session, issue_id)
    validate(effective_user, issue=issue, action=perms.ISSUE_VIEW_ANSWERS)

    matching = issue.attachments.filter_by(id=attachment_id)
    return XAccelAttachmentResponse(matching.one())

123 

124 

@http
def get_issue_attachments(
    session: Session, effective_user: User, issue_id: int
) -> list[serial.Attachment]:
    """
    List the attachments of the Issue with the given ID.

    The user must pass validation for ``perms.ISSUE_VIEW_ANSWERS``. Each
    attachment is returned as the result of its ``as_dict()`` method.
    """
    issue = fetch.issue(session, issue_id)
    validate(effective_user, issue=issue, action=perms.ISSUE_VIEW_ANSWERS)

    issue_attachments = issue.attachments
    return [attachment.as_dict() for attachment in issue_attachments]

133 

134 

@http
def delete_issue_attachments(
    session: Session, effective_user: User, issue_id: int, ids_doc: serial.IdList
):
    """
    Delete the Issue attachments whose IDs are listed in the JSON body
    document (``ids_doc.ids``).

    The user must pass validation for
    ``perms.ISSUE_MANAGE_RESPONDENT_ATTACHMENTS``. Only attachments that
    belong to ``issue_id`` are selected, so IDs of attachments owned by
    other Issues are ignored.
    """
    issue = fetch.issue(session, issue_id)
    required_permission = perms.ISSUE_MANAGE_RESPONDENT_ATTACHMENTS
    validate(effective_user, issue=issue, action=required_permission)

    requested_ids = ids_doc.ids
    doomed = session.query(IssueAttachment).filter(
        IssueAttachment.id.in_(requested_ids)
    )
    # Restrict to this Issue's own attachments.
    doomed = doomed.filter(IssueAttachment.issue_id == issue_id)

    # NOTE(review): the disc file is removed straight after the row is
    # marked for deletion; no commit is visible in this function, so confirm
    # what happens to the file if the surrounding transaction rolls back.
    for issue_attachment in doomed:
        session.delete(issue_attachment)
        attachments.delete_from_disc(issue_attachment)

159 

160 

@http
def get_issue_question_attachment(
    session: Session,
    effective_user: User,
    issue_id: int,
    question_id: int,
    attachment_id: int,
):
    """
    Download a Question Attachment.

    @attachment_id is the ID of the QuestionAttachment *Element*,
    not QAttachment

    Raises ValueError when that element's ``question_id`` differs from the
    ``question_def_id`` of the question fetched for ``question_id``.
    """
    issue = fetch.issue(session, issue_id)
    question = fetch.question(session, question_id)
    # NOTE(review): unlike the other endpoints in this module, no ``action``
    # is passed to validate() here - confirm that is intentional.
    validate(effective_user, issue=issue, question=question)

    element = session.get_one(QuestionAttachment, attachment_id)
    if element.question_id == question.question_def_id:
        return XAccelAttachmentResponse(element.attachment)

    raise ValueError("Attachment does not belong to the given Question")