Coverage for postrfp/shared/attachments.py: 100%

106 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1import pathlib 

2from typing import BinaryIO 

3 

4import logging 

5from datetime import datetime 

6from postrfp.model.questionnaire.answering import AAttachment 

7from postrfp.model.meta import AttachmentMixin 

8 

9from sqlalchemy.orm.session import Session 

10 

11from postrfp.model import ProjectAttachment, IssueAttachment 

12 

13from postrfp import conf 

14from postrfp.model.questionnaire.qelements import QAttachment 

15 

16 

17log = logging.getLogger(__name__) 

18 

19 

class OrphanedAttachmentError(ValueError):
    """Raised when an answer attachment has no parent answer to build its path from."""

22 

23 

def get_sub_path(att) -> str:
    """
    Build the storage path of an attachment, relative to the attachments root.

    The layout depends on the attachment type:

    - ProjectAttachment: ``<project id>/PROJECT/<attachment id>``
    - IssueAttachment:   ``<project id>/ISSUE/<attachment id>``
    - QAttachment:       ``QUESTION_ATTACHMENTS/<attachment id>``
    - AAttachment:       ``<project id>/ANSWER/<attachment id>``

    Raises ValueError if ``att.id`` is not an int or ``att`` is not one of the
    types above, and OrphanedAttachmentError if an AAttachment has no answer.
    """
    # The ID check comes first so that unsaved objects are rejected regardless
    # of their type.
    if not isinstance(att.id, int):
        raise ValueError("Can only determine path for attachments with an ID")

    if isinstance(att, ProjectAttachment):
        owner, category = att.project.id, "PROJECT"
    elif isinstance(att, IssueAttachment):
        owner, category = att.issue.project.id, "ISSUE"
    elif isinstance(att, QAttachment):
        # Question attachments are not filed under a project directory.
        return f"QUESTION_ATTACHMENTS/{att.id}"
    elif isinstance(att, AAttachment):
        if att.answer is None:
            raise OrphanedAttachmentError(
                f"answer attachment # {att.id} is an orphan- no Answer found"
            )
        owner, category = att.answer.issue.project_id, "ANSWER"
    else:
        raise ValueError(f"Attachment {att} is not a recognised type")

    return f"{owner}/{category}/{att.id}"

44 

45 

def local_to_full(local_path) -> pathlib.Path:
    """Join ``local_path`` onto the attachments root directory and return the result."""
    return root_attachments_dir() / local_path

49 

50 

def root_attachments_dir() -> pathlib.Path:
    """Return the directory named by ``conf.CONF.attachments_dir`` as a Path."""
    configured_dir = conf.CONF.attachments_dir
    return pathlib.Path(configured_dir)

53 

54 

def get_full_path(attachment) -> pathlib.Path:
    """
    Return the full pathlib.Path for the attachment's file.

    Combines the type-specific sub path from get_sub_path() with the
    attachments root; any exception raised by get_sub_path() propagates.
    """
    return local_to_full(get_sub_path(attachment))

59 

60 

def save_to_disc(attachment, readable_object: BinaryIO, overwrite=False):
    """
    Handles saving a physical attachment file associated with an attachment
    object/ record (e.g. a Project Attachment) to disc.

    The destination is derived from the attachment via get_full_path(), and
    missing parent directories are created. After writing, the number of bytes
    written is stored on ``attachment.size_bytes``.

    :param attachment: attachment record the file belongs to
    :param readable_object: object whose ``read()`` result is written to disc
    :param overwrite: if False (the default) an existing file is not replaced
    :raises ValueError: if ``readable_object`` has no ``read`` attribute
    :raises IOError: if the file already exists and ``overwrite`` is False
    """
    if not hasattr(readable_object, "read"):
        raise ValueError("readable_object must have a read() method")

    fpath = get_full_path(attachment)

    if fpath.exists() and not overwrite:
        raise IOError("{} already exists, will not overwrite".format(fpath))

    # exist_ok=True replaces a separate exists() check: two saves creating the
    # same new directory at the same moment no longer fail with FileExistsError.
    fpath.parent.mkdir(parents=True, exist_ok=True)

    attachment.size_bytes = fpath.write_bytes(readable_object.read())

81 

82 

def delete_from_disc(attachment):
    """
    Remove the attachment's file from disc and return the path that was removed.

    The removal is logged at INFO level. Errors from ``Path.unlink()`` (for
    example when the file does not exist) are not caught here.
    """
    target = get_full_path(attachment)
    target.unlink()
    log.info("Deleted attachment %s from filepath %s", attachment.filename, target)
    return target

88 

89 

def find_orphan_answer_attachment(root_dir: pathlib.Path, attachment_id):
    """
    Locate an answer attachment file when its project directory is unknown.

    Searches ``root_dir`` for ``*/ANSWER/<attachment_id>`` and returns the first
    match yielded by the glob.

    :param root_dir: directory to search under
    :param attachment_id: ID forming the final path component
    :raises FileNotFoundError: if no file matches
    """
    # Take only the first match instead of building a list of every match.
    first_match = next(root_dir.glob(f"*/ANSWER/{attachment_id}"), None)
    if first_match is None:
        # Raised outside any except block so the traceback carries no
        # unrelated chained exception.
        raise FileNotFoundError(f"Couldn't find attachment id {attachment_id}")
    return first_match

95 

96 

def delete_orphan_answer_attachments(session, dry_run=True, prune_db=False):
    """
    Find and delete answer attachment files whose parent answer record has
    been deleted, leaving them orphaned (``answer_id`` is NULL).

    If dry_run is True, files are only identified, not removed, and the
    database is left untouched.
    If prune_db is True (and dry_run is False), the orphaned attachment
    records are also deleted from the database.

    Returns a tuple:
    (deleted, missing, deleted_row_count)

    ``deleted`` is a set of pathlib.Path objects for the files found (and
    removed unless dry_run). ``missing`` is a set of strings of the form
    ``/ANSWER/<id>`` for records whose file could not be found.
    ``deleted_row_count`` is the number of rows pruned, or 0 if none were.
    """
    attachments_root = root_attachments_dir()
    removed = set()
    missing = set()

    orphan_rows = session.query(AAttachment.id).filter(AAttachment.answer_id.is_(None))
    for row in orphan_rows:
        try:
            orphan_file = find_orphan_answer_attachment(attachments_root, row.id)
            if not dry_run:
                orphan_file.unlink()
            removed.add(orphan_file)
        except FileNotFoundError:
            # Either no file matched the ID or it vanished before unlink().
            missing.add(f"/ANSWER/{row.id}")

    pruned_rows = 0
    if prune_db and not dry_run:
        session.commit()
        pruned_rows = (
            session.query(AAttachment).filter(AAttachment.answer_id.is_(None)).delete()
        )

    return (removed, missing, pruned_rows)

128 

129 

def save_issue_attachment(session, issue_id, user, cgi_file, description):
    """
    Create an IssueAttachment record for an uploaded file and store the file on disc.

    The record is added to the session and flushed before the file is written
    with save_to_disc(). Returns the new IssueAttachment.

    ``cgi_file`` must provide ``filename`` and ``file`` attributes; ``user``
    must provide ``id`` and ``org_id``.
    """
    attachment = IssueAttachment()

    upload_name = cgi_file.filename
    attachment.filename = upload_name
    attachment.guess_set_mimetype(upload_name)

    attachment.date_uploaded = datetime.now()
    attachment.author_id = user.id
    attachment.org_id = user.org_id
    attachment.description = description
    attachment.issue_id = issue_id

    # Necessary to set size to non-null value for Mariadb to save the record
    attachment.size_bytes = 0

    session.add(attachment)
    # Flush before touching the disc: save_to_disc() builds the path from the ID.
    session.flush()

    save_to_disc(attachment, cgi_file.file)
    return attachment

150 

151 

def save_project_attachment(session, project_id, user, cgi_file, description):
    """
    Create a ProjectAttachment record for an uploaded file and store the file on disc.

    The record is added to the session and flushed before the file is written
    with save_to_disc(). Returns the new ProjectAttachment.

    ``cgi_file`` must provide ``filename`` and ``file`` attributes; ``user``
    must provide ``id`` and ``org_id``.
    """
    attachment = ProjectAttachment()

    upload_name = cgi_file.filename
    attachment.filename = upload_name
    attachment.guess_set_mimetype(upload_name)

    attachment.date_uploaded = datetime.now()
    attachment.author_id = user.id
    attachment.org_id = user.org_id
    attachment.description = description
    attachment.project_id = project_id

    # Necessary to set size to non-null value for Mariadb to save the record
    attachment.size_bytes = 0

    session.add(attachment)
    # Flush before touching the disc: save_to_disc() builds the path from the ID.
    session.flush()

    save_to_disc(attachment, cgi_file.file)
    return attachment

172 

173 

def delete_project_attachment(session: Session, attachment: AttachmentMixin):
    """
    Delete an attachment's file from disc and then its database record.

    The file is removed first; the record is then deleted and the session
    flushed. Returns the path of the file that was removed.
    """
    removed_path = delete_from_disc(attachment)
    session.delete(attachment)
    session.flush()
    return removed_path