Coverage for postrfp/model/audit/event.py: 99%
132 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1import logging
2from enum import Enum
3from typing import Optional, TYPE_CHECKING
4from datetime import datetime
5from xml.etree import ElementTree as ET
8from sqlalchemy import Boolean, DateTime, Integer, ForeignKey, event, func
9from sqlalchemy.orm import (
10 Mapped,
11 mapped_column,
12 relationship,
13 validates,
14 Session,
15 DynamicMapped,
16)
17from sqlalchemy.types import VARCHAR, TEXT
18from sqlalchemy.types import Enum as SqlaEnum # Import Enum
20from postrfp.model.meta import Base
21from postrfp.model.audit import visible
23if TYPE_CHECKING:
24 from postrfp.model.project import Project
25 from postrfp.model.issue import Issue
26 from postrfp.model.humans import Organisation, User
27 from postrfp.model.notify import EmailNotification
29log = logging.getLogger(__name__)
class EventKlass:
    """String constants identifying the classes of audit event.

    Every constant's value is identical to its attribute name.
    """

    ANSWER = "ANSWER"
    BASE = "BASE"
    ISSUE = "ISSUE"
    ORGANISATION = "ORGANISATION"
    PROJECT = "PROJECT"
    QUESTION = "QUESTION"
    QUESTIONNAIRE = "QUESTIONNAIRE"
    ROLE = "ROLE"
    SCORE = "SCORE"
    SCORE_COMMENT = "SCORE_COMMENT"
    USER = "USER"
    SECTION = "SECTION"
# Subset of EventKlass values grouped under the "project" heading.
# NOTE(review): presumably the event classes that are scoped to a project;
# confirm against the callers that consume this set.
PROJECT_KLASSES = {
    EventKlass.ISSUE,
    EventKlass.PROJECT,
    EventKlass.QUESTION,
    EventKlass.QUESTIONNAIRE,
    EventKlass.SCORE,
    EventKlass.SCORE_COMMENT,
    EventKlass.SECTION,
}
class PropertyChanges:
    """Accumulate property changes as a small XML document.

    The tree is a root ``<d>`` element holding one ``<c name="...">`` element
    per recorded change; each ``<c>`` has an ``<old>`` and a ``<new>`` child
    whose text is the ``str()`` of the respective value.
    """

    def __init__(self) -> None:
        self.root = ET.Element("d")

    def add(self, property_name, old_value, new_value) -> None:
        """Append one change record to the tree.

        ``property_name``, ``old_value`` and ``new_value`` are all coerced
        with ``str()``, so ``None`` is stored as the text ``"None"``.
        """
        # ElementTree accepts any object as an attribute value here but can
        # only serialise strings, so a non-str name would fail much later in
        # as_string(). Coerce it now, the same way the values are coerced.
        change = ET.SubElement(self.root, "c", name=str(property_name))
        ET.SubElement(change, "old").text = str(old_value)
        ET.SubElement(change, "new").text = str(new_value)

    def as_string(self) -> str:
        """Return the accumulated changes serialised as an XML string."""
        return ET.tostring(self.root, encoding="unicode")
def change_to_json(change):
    """Convert one XML change element into a plain dict.

    The dict starts with ``"name"`` taken from the element's ``name``
    attribute (``None`` when absent), followed by one entry per child
    element mapping the child's tag to its text.
    """
    as_dict = {"name": change.get("name")}
    as_dict.update((child.tag, child.text) for child in change)
    return as_dict
class Status(Enum):
    """Processing status values for an audit event.

    AuditEvent.status maps this enum with a ``values_callable`` that yields
    the member names, so the names are what the column type is built from.
    """

    pending = 0  # column default for AuditEvent.status
    done = 1  # assigned by AuditEvent.set_done()
    error = 2
class AuditEvent(Base):
    """@DynamicAttrs

    Audit log record stored in the ``audit_events`` table.

    Property changes attached to an event are kept as XML in the ``text``
    column (written through ``add_change`` / ``set_changes`` and read back
    through ``changes``). The organisations associated with an event are
    recorded as ``EventOrgACL`` rows created by ``build_acl``.
    """

    __tablename__ = "audit_events"

    public_attrs = (
        "id",
        "event_class",
        "event_type",
        "timestamp",
        "question_id",
        "project_id",
        "issue_id",
        "user_id",
        "org_id",
        "object_id",
        "changes",
        "user",
    )

    # The *_id columns below are plain indexed columns without ForeignKey
    # constraints; the relationships further down join on them explicitly
    # using foreign() annotations.
    project_id: Mapped[Optional[int]] = mapped_column(
        Integer, nullable=True, index=True
    )
    issue_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True, index=True)
    question_id: Mapped[Optional[int]] = mapped_column(
        Integer, nullable=True, index=True
    )
    # Stored in the database column named "class".
    event_class: Mapped[Optional[str]] = mapped_column(
        "class", VARCHAR(length=20), nullable=True
    )
    # Stored in the database column named "type".
    event_type: Mapped[Optional[str]] = mapped_column(
        "type", VARCHAR(length=50), nullable=True, index=True
    )
    object_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True, index=True
    )
    user_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True, index=True
    )
    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True, index=True
    )
    private: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    # Defaulted by the database through the utc_timestamp SQL function.
    timestamp: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=func.utc_timestamp(),
        index=True,
    )
    # XML serialisation of the property changes, see ``changes``.
    text: Mapped[Optional[str]] = mapped_column(TEXT())
    # Stored in the database column named "enqueued".
    status: Mapped[Status] = mapped_column(
        "enqueued",
        SqlaEnum(
            Status,
            name="audit_event_status_enum",
            values_callable=lambda obj: [e.name for e in obj],
        ),  # Use SQLAlchemy Enum
        default=Status.pending,
        nullable=False,
        index=True,
    )
    session_id: Mapped[Optional[str]] = mapped_column(VARCHAR(length=50), nullable=True)

    project: Mapped["Project"] = relationship(
        "Project",
        back_populates="events",
        primaryjoin=("foreign(AuditEvent.project_id)==Project.id"),
    )

    issue: Mapped["Issue"] = relationship(
        "Issue",
        back_populates="events",
        primaryjoin="foreign(AuditEvent.issue_id)==Issue.id",
    )

    organisation: Mapped["Organisation"] = relationship(
        "Organisation",
        primaryjoin=("foreign(AuditEvent.org_id)==Organisation.id"),
    )
    user: Mapped["User"] = relationship(
        "User",
        primaryjoin="foreign(AuditEvent.user_id)==User.id",
        back_populates="events",
    )

    acl: Mapped[list["EventOrgACL"]] = relationship(
        "EventOrgACL",
        back_populates="event",
        passive_deletes=True,
    )

    notifications: DynamicMapped["EmailNotification"] = relationship(
        "EmailNotification",
        back_populates="event",
        lazy="dynamic",
        passive_deletes=True,
    )

    def __init__(self, *args, **kwargs) -> None:
        """Build the event, deriving ``event_class`` from ``event_type``.

        When no ``event_class`` is supplied, it is set to the part of
        ``event_type`` that precedes the first underscore.
        """
        super().__init__(*args, **kwargs)
        if self.event_class is None and self.event_type:
            self.event_class = self.event_type.split("_")[0]

    def __repr__(self) -> str:
        return f"<AuditEvent #{self.id}, Status: {self.status}>"

    def set_text(self, text) -> None:  # pragma: no cover
        # subclasses to specialise
        self.text = str(text)

    @validates("object_id")
    def check_object_id_unicode(self, key, object_id) -> Optional[str]:
        """Coerce an assigned ``object_id`` to ``str``.

        ``None`` is passed through unchanged so that an explicitly assigned
        ``None`` is stored as NULL in the nullable column rather than as the
        literal string ``"None"``.
        """
        if object_id is None:
            return None
        return str(object_id)

    @classmethod
    def create(cls, session: Session, event_type_name: str, **kwargs) -> "AuditEvent":
        """Create an event of the given type, add it to ``session`` and build its ACL.

        Recognised keyword arguments, removed before the rest is passed to the
        constructor:

        * ``user`` - supplies ``user_id`` and ``org_id`` (from ``user.id`` and
          ``user.org_id``) unless those are given explicitly.
        * ``change_list`` - iterable of ``(prop_name, old_value, new_value)``
          tuples, each recorded through ``add_change``.

        Returns the new event.
        """

        if "user" in kwargs:
            user = kwargs.pop("user")
            # Explicit user_id / org_id keyword arguments take precedence.
            kwargs["user_id"] = kwargs.get("user_id", user.id)
            kwargs["org_id"] = kwargs.get("org_id", user.org_id)

        change_list = kwargs.pop("change_list", None)

        evt = cls(event_type=event_type_name, **kwargs)
        if change_list:
            for prop_name, old_value, new_value in change_list:
                evt.add_change(prop_name, old_value, new_value)

        # The event is added to the session before build_acl() reads its
        # relationships.
        session.add(evt)
        evt.build_acl()
        return evt

    @property
    def changes(self) -> list[dict]:
        """
        Parses the xml saved in the text column and returns a json
        representation of the property changes associated with this event.

        Returns an empty list when there is no text or when parsing fails
        (the failure is logged).
        """

        if not self.text:
            return []
        try:
            # The xml is generated by this application so we trust it to be well formed
            changes_xml = ET.fromstring(self.text)  # nosec
            return [change_to_json(c) for c in changes_xml]
        except Exception:
            log.exception("Error attemping to parse AuditEvent xml text")
            return []

    def set_changes(self, property_changes: PropertyChanges) -> None:
        """Store the serialised ``property_changes`` in the ``text`` column."""
        self.text = property_changes.as_string()

    def add_change(self, prop_name, old_value, new_value) -> None:
        """Record one property change and refresh ``text`` accordingly.

        The PropertyChanges accumulator is created on first use and kept on
        the instance as ``_pc``.
        """
        if getattr(self, "_pc", None) is None:
            self._pc = PropertyChanges()
        self._pc.add(prop_name, old_value, new_value)
        self.set_changes(self._pc)

    def build_acl(self) -> set[str]:
        """Create EventOrgACL records for this event.

        Collects organisation ids according to which ``visible`` collections
        contain this event's type, appends one ACL entry per id, and returns
        the set of ids.
        """

        event_type = self.event_type

        acl_orgid_set: set[str] = set()

        # project_id is nullable, so the project may be absent; skip the
        # participants in that case, mirroring the issue check below.
        if event_type in visible.to_participants and self.project is not None:
            acl_orgid_set.update(
                participant.org_id for participant in self.project.participants
            )

        if event_type in visible.to_respondent and self.issue is not None:
            if self.issue.respondent_id is not None:
                acl_orgid_set.add(self.issue.respondent_id)

        if event_type in visible.to_initiator and self.org_id is not None:
            acl_orgid_set.add(self.org_id)

        for org_id in acl_orgid_set:
            self.add_to_acl(org_id)

        return acl_orgid_set

    def add_to_acl(self, org_id: str) -> None:
        """Append an ACL entry for ``org_id`` to this event."""
        self.acl.append(EventOrgACL(org_id=org_id))

    def set_done(self) -> None:
        """Mark the event's status as done."""
        self.status = Status.done
class EventOrgACL(Base):
    """
    Access Control List entry used to restrict which organisations can
    view which events: each row in ``audit_event_orgs`` pairs one audit
    event with one organisation.
    """

    __tablename__ = "audit_event_orgs"

    # The foreign key constraint is left unnamed (an explicit
    # name="audit_event_orgs_ibfk_1" was removed).
    event_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("audit_events.id", ondelete="CASCADE"),
        nullable=True,
    )

    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
        index=True,
        nullable=True,
    )

    # The owning event is loaded with a joined eager load.
    event: Mapped[AuditEvent] = relationship(
        AuditEvent, back_populates="acl", lazy="joined"
    )
    organisation: Mapped["Organisation"] = relationship("Organisation")

    def __repr__(self) -> str:
        return "<EventACL Event {} Org {}>".format(self.event_id, self.org_id)
@event.listens_for(AuditEvent, "before_insert", propagate=True)
def serialize_property_changes(mapper, connection, target):
    """Before an AuditEvent is inserted, write its pending changes to ``text``.

    Does nothing when the target has no ``_pc`` attribute, i.e. no
    PropertyChanges accumulator was ever attached to it.
    """
    if not hasattr(target, "_pc"):
        return
    target.text = target._pc.as_string()