Coverage for postrfp / model / audit / event.py: 99%

118 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1import logging 

2from enum import Enum 

3from typing import Optional, TYPE_CHECKING 

4from datetime import datetime 

5from xml.etree import ElementTree as ET 

6 

7 

8from sqlalchemy import Boolean, DateTime, Integer, ForeignKey, event, func 

9from sqlalchemy.orm import ( 

10 Mapped, 

11 mapped_column, 

12 relationship, 

13 validates, 

14 Session, 

15 DynamicMapped, 

16) 

17from sqlalchemy.types import VARCHAR, TEXT 

18from sqlalchemy.types import Enum as SqlaEnum # Import Enum 

19 

20from postrfp.model.meta import Base 

21from postrfp.model.audit import visible 

22 

23if TYPE_CHECKING: 

24 from postrfp.model.project import Project 

25 from postrfp.model.issue import Issue 

26 from postrfp.model.humans import Organisation, User 

27 from postrfp.model.notify import EmailNotification 

28 

29log = logging.getLogger(__name__) 

30 

31 

class PropertyChanges:
    """Accumulate property changes as a small XML document.

    The document has a root ``<d>`` element containing one ``<c name="...">``
    element per recorded change; each ``<c>`` holds an ``<old>`` and a
    ``<new>`` child whose text is the ``str()`` of the respective value.
    """

    def __init__(self) -> None:
        # Root element that collects every <c> change element.
        self.root = ET.Element("d")

    def add(self, property_name, old_value, new_value) -> None:
        """Record a single change of ``property_name``.

        All three arguments are converted with ``str()``. ElementTree can
        only serialise string attribute values, so coercing the name here
        prevents a non-string name (e.g. an int) from making
        ``as_string()`` raise ``TypeError`` later on.
        """
        change = ET.SubElement(self.root, "c", name=str(property_name))
        ET.SubElement(change, "old").text = str(old_value)
        ET.SubElement(change, "new").text = str(new_value)

    def as_string(self) -> str:
        """Return the accumulated changes serialised as an XML ``str``."""
        return ET.tostring(self.root, encoding="unicode")

43 

44 

def change_to_json(change):
    """Convert one XML change element into a plain dict.

    The result maps ``"name"`` to the element's ``name`` attribute and then
    each child element's tag to that child's text, in document order.
    """
    as_dict = {"name": change.get("name")}
    as_dict.update((child.tag, child.text) for child in change)
    return as_dict

50 

51 

class Status(Enum):
    """Processing status recorded against an audit event."""

    # Default status given to a newly created AuditEvent.
    pending = 0
    # Status assigned by AuditEvent.set_done().
    done = 1
    # Presumably marks an event whose processing failed - not set anywhere
    # in this module; confirm against the code that consumes events.
    error = 2

56 

57 

class AuditEvent(Base):
    """A recorded event, optionally tied to a project, issue, question,
    user and organisation, together with the property changes it describes.

    Property changes are stored as XML in the ``text`` column and exposed
    as a list of dicts through the ``changes`` property. Which
    organisations may see an event is recorded in ``acl`` (EventOrgACL
    rows), populated by ``build_acl()``.

    @DynamicAttrs
    """

    __tablename__ = "audit_events"

    # Attribute names exposed publicly. NOTE(review): presumably consumed by
    # a serialisation helper on Base; "id" is not declared in this class and
    # is assumed to come from Base - confirm.
    public_attrs = (
        "id",
        "event_class",
        "event_type",
        "timestamp",
        "question_id",
        "project_id",
        "issue_id",
        "user_id",
        "org_id",
        "object_id",
        "changes",
        "user",
    )

    # The reference columns below are plain indexed values without database
    # foreign keys; the relationships further down join on them explicitly
    # via foreign() annotations.
    project_id: Mapped[Optional[int]] = mapped_column(
        Integer, nullable=True, index=True
    )
    issue_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True, index=True)
    question_id: Mapped[Optional[int]] = mapped_column(
        Integer, nullable=True, index=True
    )
    # Stored in the database column named "class".
    event_class: Mapped[Optional[str]] = mapped_column(
        "class", VARCHAR(length=20), nullable=True
    )
    # Stored in the database column named "type".
    event_type: Mapped[Optional[str]] = mapped_column(
        "type", VARCHAR(length=50), nullable=True, index=True
    )
    object_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True, index=True
    )
    user_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True, index=True
    )
    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True, index=True
    )
    # A real boolean default rather than the integer 0.
    private: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    # The timestamp is filled in by the database when not supplied.
    timestamp: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=func.utc_timestamp(),
        index=True,
    )
    # XML serialisation of the property changes (see PropertyChanges).
    text: Mapped[Optional[str]] = mapped_column(TEXT())
    # Stored in the database column named "enqueued"; values_callable makes
    # SQLAlchemy persist the enum member names.
    status: Mapped[Status] = mapped_column(
        "enqueued",
        SqlaEnum(
            Status,
            name="audit_event_status_enum",
            values_callable=lambda obj: [e.name for e in obj],
        ),
        default=Status.pending,
        nullable=False,
        index=True,
    )
    session_id: Mapped[Optional[str]] = mapped_column(VARCHAR(length=50), nullable=True)

    project: Mapped["Project"] = relationship(
        "Project",
        back_populates="events",
        primaryjoin=("foreign(AuditEvent.project_id)==Project.id"),
    )

    issue: Mapped["Issue"] = relationship(
        "Issue",
        back_populates="events",
        primaryjoin="foreign(AuditEvent.issue_id)==Issue.id",
    )

    organisation: Mapped["Organisation"] = relationship(
        "Organisation",
        primaryjoin=("foreign(AuditEvent.org_id)==Organisation.id"),
    )
    user: Mapped["User"] = relationship(
        "User",
        primaryjoin="foreign(AuditEvent.user_id)==User.id",
        back_populates="events",
    )

    acl: Mapped[list["EventOrgACL"]] = relationship(
        "EventOrgACL",
        back_populates="event",
        passive_deletes=True,
    )

    notifications: DynamicMapped["EmailNotification"] = relationship(
        "EmailNotification",
        back_populates="event",
        lazy="dynamic",
        passive_deletes=True,
    )

    def __init__(self, *args, **kwargs) -> None:
        """Build the event; derive ``event_class`` when it was not given.

        When ``event_class`` is unset and ``event_type`` is non-empty, the
        class is the part of ``event_type`` before its first underscore.
        """
        super().__init__(*args, **kwargs)
        if self.event_class is None and self.event_type:
            self.event_class = self.event_type.split("_")[0]

    def __repr__(self) -> str:
        return f"<AuditEvent #{self.id}, Status: {self.status}>"

    @validates("object_id")
    def check_object_id_unicode(self, key, object_id) -> Optional[str]:
        """Coerce any assigned ``object_id`` to ``str``.

        ``None`` is passed through unchanged so the nullable column stores
        NULL instead of the literal string ``"None"``.
        """
        if object_id is None:
            return None
        return str(object_id)

    @classmethod
    def create(cls, session: Session, event_type_name: str, **kwargs) -> "AuditEvent":
        """Create an event of the given type, add it to ``session`` and
        build its ACL.

        Recognised extra keyword arguments:

        * ``user`` - removed from ``kwargs``; its ``id`` and ``org_id``
          become ``user_id`` / ``org_id`` unless those were passed
          explicitly.
        * ``change_list`` - iterable of ``(prop_name, old_value,
          new_value)`` tuples recorded via ``add_change``.

        All remaining keyword arguments go to the constructor. The session
        is neither flushed nor committed by this method itself.
        """

        if "user" in kwargs:
            user = kwargs.pop("user")
            # Explicitly supplied ids win over the ones taken from the user.
            kwargs.setdefault("user_id", user.id)
            kwargs.setdefault("org_id", user.org_id)

        change_list = kwargs.pop("change_list", None)

        evt = cls(event_type=event_type_name, **kwargs)
        if change_list:
            for prop_name, old_value, new_value in change_list:
                evt.add_change(prop_name, old_value, new_value)

        session.add(evt)
        evt.build_acl()
        return evt

    @property
    def changes(self) -> list[dict]:
        """
        Parses the xml saved in the text column and returns a json
        representation of the property changes associated with this event.

        Returns an empty list when there is no text, or when the text
        cannot be parsed (the failure is logged rather than raised).
        """

        if not self.text:
            return []
        try:
            # The xml is generated by this application so we trust it to be well formed
            changes_xml = ET.fromstring(self.text)  # nosec
            return [change_to_json(c) for c in changes_xml]
        except Exception:
            # Deliberately best-effort: a corrupt record must not break the
            # caller that is reading the event.
            log.exception("Error attemping to parse AuditEvent xml text")
            return []

    def set_changes(self, property_changes: PropertyChanges) -> None:
        """Replace ``text`` with the serialised ``property_changes``."""
        self.text = property_changes.as_string()

    def add_change(self, prop_name, old_value, new_value) -> None:
        """Record one property change and refresh ``text`` to match.

        The PropertyChanges accumulator is created lazily on first use and
        kept on the instance as ``_pc``.
        """
        if getattr(self, "_pc", None) is None:
            self._pc = PropertyChanges()
        self._pc.add(prop_name, old_value, new_value)
        self.set_changes(self._pc)

    def build_acl(self) -> set[str]:
        """Create EventOrgACL records for this event.

        Organisation ids are collected according to which ``visible``
        collections contain this event's type:

        * ``to_participants`` - the org of every participant of the project
        * ``to_respondent`` - the issue's respondent, when there is one
        * ``to_initiator`` - this event's own ``org_id``

        Returns the set of organisation ids that were added to the ACL.
        """

        event_type = self.event_type

        acl_orgid_set: set[str] = set()

        # project_id is nullable, so guard against a missing project the
        # same way the issue branch below guards against a missing issue.
        if event_type in visible.to_participants and self.project is not None:
            for participant in self.project.participants:
                acl_orgid_set.add(participant.org_id)

        if event_type in visible.to_respondent and self.issue is not None:
            if self.issue.respondent_id is not None:
                acl_orgid_set.add(self.issue.respondent_id)

        if event_type in visible.to_initiator and self.org_id is not None:
            acl_orgid_set.add(self.org_id)

        for org_id in acl_orgid_set:
            self.add_to_acl(org_id)

        return acl_orgid_set

    def add_to_acl(self, org_id: str) -> None:
        """Append an EventOrgACL entry for ``org_id`` to this event."""
        self.acl.append(EventOrgACL(org_id=org_id))

    def set_done(self) -> None:
        """Mark this event's status as ``Status.done``."""
        self.status = Status.done

242 

243 

class EventOrgACL(Base):
    """
    Access Control List entry: one row pairs an audit event with an
    organisation, restricting which organisations can view which events.
    """

    __tablename__ = "audit_event_orgs"

    # The foreign key to the owning event carries no explicit constraint
    # name; the database cascades deletes of the event.
    event_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("audit_events.id", ondelete="CASCADE"),
        nullable=True,
    )

    # Organisation the entry refers to; updates and deletes of the
    # organisation row cascade at the database level.
    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150),
        ForeignKey(
            "organisations.id",
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
        index=True,
        nullable=True,
    )

    # The event is joined-loaded along with the ACL row.
    event: Mapped[AuditEvent] = relationship(
        AuditEvent,
        back_populates="acl",
        lazy="joined",
    )
    organisation: Mapped["Organisation"] = relationship("Organisation")

    def __repr__(self) -> str:
        return f"<EventACL Event {self.event_id} Org {self.org_id}>"

275 

276 

@event.listens_for(AuditEvent, "before_insert", propagate=True)
def serialize_property_changes(mapper, connection, target):
    """Before-insert listener for AuditEvent and its subclasses.

    When the instance carries a ``_pc`` attribute, its serialised form is
    written to ``text`` so the row is inserted with the latest changes.
    """
    if not hasattr(target, "_pc"):
        # No changes were accumulated on this instance; leave text alone.
        return
    target.text = target._pc.as_string()