Coverage for postrfp/model/audit/event.py: 99%

132 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1import logging 

2from enum import Enum 

3from typing import Optional, TYPE_CHECKING 

4from datetime import datetime 

5from xml.etree import ElementTree as ET 

6 

7 

8from sqlalchemy import Boolean, DateTime, Integer, ForeignKey, event, func 

9from sqlalchemy.orm import ( 

10 Mapped, 

11 mapped_column, 

12 relationship, 

13 validates, 

14 Session, 

15 DynamicMapped, 

16) 

17from sqlalchemy.types import VARCHAR, TEXT 

18from sqlalchemy.types import Enum as SqlaEnum # Import Enum 

19 

20from postrfp.model.meta import Base 

21from postrfp.model.audit import visible 

22 

23if TYPE_CHECKING: 

24 from postrfp.model.project import Project 

25 from postrfp.model.issue import Issue 

26 from postrfp.model.humans import Organisation, User 

27 from postrfp.model.notify import EmailNotification 

28 

29log = logging.getLogger(__name__) 

30 

31 

class EventKlass:
    """Namespace of string constants naming the class of an audit event.

    Every attribute's value is the same text as the attribute name.
    """

    ANSWER = "ANSWER"
    BASE = "BASE"
    ISSUE = "ISSUE"
    ORGANISATION = "ORGANISATION"
    PROJECT = "PROJECT"
    QUESTION = "QUESTION"
    QUESTIONNAIRE = "QUESTIONNAIRE"
    ROLE = "ROLE"
    SCORE = "SCORE"
    SCORE_COMMENT = "SCORE_COMMENT"
    USER = "USER"
    SECTION = "SECTION"

45 

46 

# Subset of EventKlass values grouped under the "project" heading.
# NOTE(review): presumably these are the event classes scoped to a project;
# confirm against the code that consumes this set.
PROJECT_KLASSES = {
    EventKlass.ISSUE,
    EventKlass.PROJECT,
    EventKlass.QUESTION,
    EventKlass.QUESTIONNAIRE,
    EventKlass.SCORE,
    EventKlass.SCORE_COMMENT,
    EventKlass.SECTION,
}

56 

57 

class PropertyChanges:
    """Accumulates property changes as a small XML document.

    The tree is rooted at a ``<d>`` element and holds one ``<c name="...">``
    child per recorded change; each ``<c>`` contains an ``<old>`` and a
    ``<new>`` element whose text is the stringified value.
    """

    def __init__(self) -> None:
        self.root = ET.Element("d")

    def add(self, property_name, old_value, new_value) -> None:
        """Append a change record for ``property_name``.

        All three arguments are coerced with ``str()``. The name is coerced
        as well because ElementTree only fails on a non-string attribute
        value at serialisation time, which would surface as a ``TypeError``
        from ``as_string()`` far away from the offending ``add()`` call.
        """
        change = ET.SubElement(self.root, "c", name=str(property_name))
        ET.SubElement(change, "old").text = str(old_value)
        ET.SubElement(change, "new").text = str(new_value)

    def as_string(self) -> str:
        """Return the accumulated changes serialised as an XML ``str``."""
        return ET.tostring(self.root, encoding="unicode")

69 

70 

def change_to_json(change):
    """Convert one change element into a plain dict.

    The result starts with a ``"name"`` key taken from the element's
    ``name`` attribute (``None`` when absent), followed by one entry per
    child element mapping the child's tag to its text.
    """
    result = {"name": change.get("name")}
    result.update((child.tag, child.text) for child in change)
    return result

76 

77 

class Status(Enum):
    """Processing state recorded on an audit event."""

    pending = 0
    done = 1
    error = 2

82 

83 

class AuditEvent(Base):
    """@DynamicAttrs

    ORM model for the ``audit_events`` table.

    An event carries an ``event_type`` and ``event_class``, optional
    references to a project, issue, question, user and organisation, and an
    optional XML description of property changes stored in ``text``.
    """

    __tablename__ = "audit_events"

    public_attrs = (
        "id",
        "event_class",
        "event_type",
        "timestamp",
        "question_id",
        "project_id",
        "issue_id",
        "user_id",
        "org_id",
        "object_id",
        "changes",
        "user",
    )

    # The reference columns below declare no ForeignKey; the relationships
    # further down join on them explicitly via ``foreign(...)``.
    project_id: Mapped[Optional[int]] = mapped_column(
        Integer, nullable=True, index=True
    )
    issue_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True, index=True)
    question_id: Mapped[Optional[int]] = mapped_column(
        Integer, nullable=True, index=True
    )
    # Stored in the database column named "class".
    event_class: Mapped[Optional[str]] = mapped_column(
        "class", VARCHAR(length=20), nullable=True
    )
    # Stored in the database column named "type".
    event_type: Mapped[Optional[str]] = mapped_column(
        "type", VARCHAR(length=50), nullable=True, index=True
    )
    object_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True, index=True
    )
    user_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True, index=True
    )
    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True, index=True
    )
    private: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    timestamp: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=func.utc_timestamp(),
        index=True,
    )
    # XML produced by PropertyChanges.as_string(); parsed by ``changes``.
    text: Mapped[Optional[str]] = mapped_column(TEXT())
    # Stored in the database column named "enqueued"; values_callable makes
    # the enum persist member names ("pending", "done", "error").
    status: Mapped[Status] = mapped_column(
        "enqueued",
        SqlaEnum(
            Status,
            name="audit_event_status_enum",
            values_callable=lambda obj: [e.name for e in obj],
        ),  # Use SQLAlchemy Enum
        default=Status.pending,
        nullable=False,
        index=True,
    )
    session_id: Mapped[Optional[str]] = mapped_column(VARCHAR(length=50), nullable=True)

    project: Mapped["Project"] = relationship(
        "Project",
        back_populates="events",
        primaryjoin=("foreign(AuditEvent.project_id)==Project.id"),
    )

    issue: Mapped["Issue"] = relationship(
        "Issue",
        back_populates="events",
        primaryjoin="foreign(AuditEvent.issue_id)==Issue.id",
    )

    organisation: Mapped["Organisation"] = relationship(
        "Organisation",
        primaryjoin=("foreign(AuditEvent.org_id)==Organisation.id"),
    )
    user: Mapped["User"] = relationship(
        "User",
        primaryjoin="foreign(AuditEvent.user_id)==User.id",
        back_populates="events",
    )

    acl: Mapped[list["EventOrgACL"]] = relationship(
        "EventOrgACL",
        back_populates="event",
        passive_deletes=True,
    )

    notifications: DynamicMapped["EmailNotification"] = relationship(
        "EmailNotification",
        back_populates="event",
        lazy="dynamic",
        passive_deletes=True,
    )

    def __init__(self, *args, **kwargs) -> None:
        """Construct the event, deriving ``event_class`` when it is not given.

        If no ``event_class`` was supplied but ``event_type`` is set, the
        class becomes the part of ``event_type`` before the first underscore.
        """
        super().__init__(*args, **kwargs)
        if self.event_class is None and self.event_type:
            self.event_class = self.event_type.split("_")[0]

    def __repr__(self) -> str:
        return f"<AuditEvent #{self.id}, Status: {self.status}>"

    def set_text(self, text) -> None:  # pragma: no cover
        # subclasses to specialise
        self.text = str(text)

    @validates("object_id")
    def check_object_id_unicode(self, key, object_id) -> str:
        """Coerce any value assigned to ``object_id`` to ``str``."""
        # NOTE(review): an explicit None becomes the string "None" even
        # though the column is nullable; left unchanged because stored rows
        # and queries may already depend on it - confirm before altering.
        return str(object_id)

    @classmethod
    def create(cls, session: Session, event_type_name: str, **kwargs) -> "AuditEvent":
        """Create an event of the given type, add it to ``session`` and build its ACL.

        Recognised keyword arguments (everything else is passed to the
        constructor unchanged):

        * ``user`` - removed from ``kwargs``; its ``id`` and ``org_id`` are
          used for ``user_id`` / ``org_id`` unless those were given
          explicitly.
        * ``change_list`` - iterable of ``(prop_name, old_value, new_value)``
          tuples, each recorded through ``add_change``.
        """

        if "user" in kwargs:
            user = kwargs.pop("user")
            # Explicitly supplied ids take precedence over the user's own.
            kwargs.setdefault("user_id", user.id)
            kwargs.setdefault("org_id", user.org_id)

        change_list = kwargs.pop("change_list", None)

        evt = cls(event_type=event_type_name, **kwargs)
        if change_list:
            for prop_name, old_value, new_value in change_list:
                evt.add_change(prop_name, old_value, new_value)

        session.add(evt)
        evt.build_acl()
        return evt

    @property
    def changes(self) -> list[dict]:
        """
        Parses the xml saved in the text column and returns a json
        representation of the property changes associated with this event.

        Returns an empty list when there is no text or when parsing fails
        (the failure is logged).
        """

        if not self.text:
            return []
        try:
            # The xml is generated by this application so we trust it to be well formed
            changes_xml = ET.fromstring(self.text)  # nosec
            return [change_to_json(c) for c in changes_xml]
        except Exception:
            # Deliberate best-effort: a bad payload must not break the reader.
            log.exception("Error attemping to parse AuditEvent xml text")
            return []

    def set_changes(self, property_changes: PropertyChanges) -> None:
        """Store the serialised form of ``property_changes`` in ``text``."""
        self.text = property_changes.as_string()

    def add_change(self, prop_name, old_value, new_value) -> None:
        """Record one property change and refresh ``text`` to include it."""
        # The PropertyChanges accumulator is created lazily on first use.
        if getattr(self, "_pc", None) is None:
            self._pc = PropertyChanges()
        self._pc.add(prop_name, old_value, new_value)
        self.set_changes(self._pc)

    def build_acl(self) -> set[str]:
        """Create EventOrgACL records for this event.

        Collects organisation ids according to the ``visible`` rules for
        this event's type, adds one ACL entry per id, and returns the set of
        ids that were added.
        """

        event_type = self.event_type

        acl_orgid_set: set[str] = set()

        # project_id is nullable, so the project may be absent; guard it the
        # same way the issue is guarded below instead of raising
        # AttributeError on None.
        if event_type in visible.to_participants and self.project is not None:
            for participant in self.project.participants:
                acl_orgid_set.add(participant.org_id)

        if event_type in visible.to_respondent and self.issue is not None:
            if self.issue.respondent_id is not None:
                acl_orgid_set.add(self.issue.respondent_id)

        if event_type in visible.to_initiator and self.org_id is not None:
            acl_orgid_set.add(self.org_id)

        for org_id in acl_orgid_set:
            self.add_to_acl(org_id)

        return acl_orgid_set

    def add_to_acl(self, org_id: str) -> None:
        """Append an EventOrgACL entry for ``org_id`` to this event's ACL."""
        self.acl.append(EventOrgACL(org_id=org_id))

    def set_done(self) -> None:
        """Set ``status`` to ``Status.done``."""
        self.status = Status.done

272 

273 

class EventOrgACL(Base):
    """
    Access Control List entry: restricts which organisations can view
    which events. Each row pairs one audit event with one organisation.
    """

    __tablename__ = "audit_event_orgs"

    # Rows are removed with their event (ON DELETE CASCADE). The constraint
    # carries no explicit name (name="audit_event_orgs_ibfk_1" was removed).
    event_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("audit_events.id", ondelete="CASCADE"),
        nullable=True,
    )

    # Follows the organisation row on both delete and update.
    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
        nullable=True,
        index=True,
    )

    # The event is joined-loaded together with the ACL row.
    event: Mapped[AuditEvent] = relationship(
        AuditEvent,
        lazy="joined",
        back_populates="acl",
    )
    organisation: Mapped["Organisation"] = relationship("Organisation")

    def __repr__(self) -> str:
        return f"<EventACL Event {self.event_id} Org {self.org_id}>"

305 

306 

@event.listens_for(AuditEvent, "before_insert", propagate=True)
def serialize_property_changes(mapper, connection, target):
    """Before-insert hook: write accumulated property changes into ``text``.

    Does nothing for targets that have no ``_pc`` attribute, i.e. those on
    which ``add_change`` was never called.
    """
    if not hasattr(target, "_pc"):
        return
    target.text = target._pc.as_string()