Coverage for postrfp / model / audit / event.py: 99%
118 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1import logging
2from enum import Enum
3from typing import Optional, TYPE_CHECKING
4from datetime import datetime
5from xml.etree import ElementTree as ET
8from sqlalchemy import Boolean, DateTime, Integer, ForeignKey, event, func
9from sqlalchemy.orm import (
10 Mapped,
11 mapped_column,
12 relationship,
13 validates,
14 Session,
15 DynamicMapped,
16)
17from sqlalchemy.types import VARCHAR, TEXT
18from sqlalchemy.types import Enum as SqlaEnum # Import Enum
20from postrfp.model.meta import Base
21from postrfp.model.audit import visible
23if TYPE_CHECKING:
24 from postrfp.model.project import Project
25 from postrfp.model.issue import Issue
26 from postrfp.model.humans import Organisation, User
27 from postrfp.model.notify import EmailNotification
29log = logging.getLogger(__name__)
class PropertyChanges:
    """
    Accumulates property changes as a small XML document.

    The document root is a ``<d>`` element; every recorded change becomes a
    ``<c name="...">`` child holding an ``<old>`` and a ``<new>`` element.
    """

    def __init__(self):
        self.root = ET.Element("d")

    def add(self, property_name, old_value, new_value):
        """Append one change record; both values are stored via ``str()``."""
        entry = ET.SubElement(self.root, "c", {"name": property_name})
        for tag, value in (("old", old_value), ("new", new_value)):
            ET.SubElement(entry, tag).text = str(value)

    def as_string(self):
        """Return the accumulated document serialised as a unicode string."""
        return ET.tostring(self.root, encoding="unicode")
def change_to_json(change):
    """
    Convert one ``<c>`` change element into a plain dict.

    The result carries the element's ``name`` attribute under ``"name"``
    plus one entry per child element, keyed by the child's tag and holding
    the child's text.
    """
    as_dict = {"name": change.get("name")}
    as_dict.update((child.tag, child.text) for child in change)
    return as_dict
class Status(Enum):
    """
    Processing state stored on ``AuditEvent.status``.

    The ``AuditEvent.status`` column persists the member *names* (via its
    ``values_callable``), not these integer values.
    """

    # Default state for a newly created AuditEvent.
    pending = 0
    # Set by AuditEvent.set_done().
    done = 1
    # NOTE(review): presumably marks events whose processing failed — confirm
    # against the code that assigns it (not in this module).
    error = 2
class AuditEvent(Base):
    """
    A single audit-trail record.

    An event records its type, the user and organisation responsible, the
    project / issue / question / object it relates to and, optionally, a
    list of property changes serialised as XML in the ``text`` column.
    Which organisations may see the event is recorded through associated
    ``EventOrgACL`` rows (see ``build_acl``).

    @DynamicAttrs
    """

    __tablename__ = "audit_events"

    # Attribute names exposed for this model.
    # NOTE(review): presumably consumed by serialisation code elsewhere in
    # the project — confirm against the consumers of ``public_attrs``.
    public_attrs = (
        "id",
        "event_class",
        "event_type",
        "timestamp",
        "question_id",
        "project_id",
        "issue_id",
        "user_id",
        "org_id",
        "object_id",
        "changes",
        "user",
    )

    # The *_id columns below are plain indexed columns without database
    # foreign keys; the relationships further down join on them explicitly.
    project_id: Mapped[Optional[int]] = mapped_column(
        Integer, nullable=True, index=True
    )
    issue_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True, index=True)
    question_id: Mapped[Optional[int]] = mapped_column(
        Integer, nullable=True, index=True
    )
    # Stored in the database column named "class".
    event_class: Mapped[Optional[str]] = mapped_column(
        "class", VARCHAR(length=20), nullable=True
    )
    # Stored in the database column named "type".
    event_type: Mapped[Optional[str]] = mapped_column(
        "type", VARCHAR(length=50), nullable=True, index=True
    )
    object_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True, index=True
    )
    user_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True, index=True
    )
    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), nullable=True, index=True
    )
    # A real bool default, matching the Mapped[bool] annotation.
    private: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    # Populated by the database server via utc_timestamp() when not supplied.
    timestamp: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=func.utc_timestamp(),
        index=True,
    )
    # XML serialisation of the property changes (see ``changes``).
    text: Mapped[Optional[str]] = mapped_column(TEXT())
    # Stored in the database column named "enqueued"; member names are persisted.
    status: Mapped[Status] = mapped_column(
        "enqueued",
        SqlaEnum(
            Status,
            name="audit_event_status_enum",
            values_callable=lambda obj: [e.name for e in obj],
        ),  # Use SQLAlchemy Enum
        default=Status.pending,
        nullable=False,
        index=True,
    )
    session_id: Mapped[Optional[str]] = mapped_column(VARCHAR(length=50), nullable=True)

    project: Mapped["Project"] = relationship(
        "Project",
        back_populates="events",
        primaryjoin=("foreign(AuditEvent.project_id)==Project.id"),
    )

    issue: Mapped["Issue"] = relationship(
        "Issue",
        back_populates="events",
        primaryjoin="foreign(AuditEvent.issue_id)==Issue.id",
    )

    organisation: Mapped["Organisation"] = relationship(
        "Organisation",
        primaryjoin=("foreign(AuditEvent.org_id)==Organisation.id"),
    )
    user: Mapped["User"] = relationship(
        "User",
        primaryjoin="foreign(AuditEvent.user_id)==User.id",
        back_populates="events",
    )

    acl: Mapped[list["EventOrgACL"]] = relationship(
        "EventOrgACL",
        back_populates="event",
        passive_deletes=True,
    )

    notifications: DynamicMapped["EmailNotification"] = relationship(
        "EmailNotification",
        back_populates="event",
        lazy="dynamic",
        passive_deletes=True,
    )

    def __init__(self, *args, **kwargs) -> None:
        """
        Build the event and, when no ``event_class`` was given, derive it
        from the part of ``event_type`` preceding the first underscore.
        """
        super().__init__(*args, **kwargs)
        if self.event_class is None and self.event_type:
            self.event_class = self.event_type.split("_")[0]

    def __repr__(self) -> str:
        return f"<AuditEvent #{self.id}, Status: {self.status}>"

    @validates("object_id")
    def check_object_id_unicode(self, key, object_id) -> Optional[str]:
        """
        Coerce ``object_id`` to ``str`` on assignment.

        ``None`` is passed through unchanged so that the nullable column
        holds NULL rather than the literal string ``"None"``.
        """
        if object_id is None:
            return None
        return str(object_id)

    @classmethod
    def create(cls, session: Session, event_type_name: str, **kwargs) -> "AuditEvent":
        """
        Create an event of the given type, add it to ``session`` and build
        its ACL.

        Two keyword arguments are consumed here rather than passed to the
        constructor:

        * ``user`` - supplies ``user_id`` and ``org_id`` (from ``user.id``
          and ``user.org_id``) unless those keys were given explicitly.
        * ``change_list`` - an iterable of ``(prop_name, old_value,
          new_value)`` triples recorded via ``add_change``.

        All remaining keyword arguments go to the constructor unchanged.
        Returns the new event.
        """
        if "user" in kwargs:
            user = kwargs.pop("user")
            kwargs.setdefault("user_id", user.id)
            kwargs.setdefault("org_id", user.org_id)

        change_list = kwargs.pop("change_list", None)

        evt = cls(event_type=event_type_name, **kwargs)
        if change_list:
            for prop_name, old_value, new_value in change_list:
                evt.add_change(prop_name, old_value, new_value)

        session.add(evt)
        evt.build_acl()
        return evt

    @property
    def changes(self) -> list[dict]:
        """
        Parse the XML saved in the ``text`` column and return the property
        changes associated with this event as a list of dicts (one per
        change, as produced by ``change_to_json``).

        Returns an empty list when there is no text or when the text cannot
        be parsed; the parse failure is logged.
        """
        if not self.text:
            return []
        try:
            # The xml is generated by this application so we trust it to be well formed
            changes_xml = ET.fromstring(self.text)  # nosec
            return [change_to_json(c) for c in changes_xml]
        except Exception:
            # Best effort: a corrupt record must not break reading the event.
            log.exception("Error attemping to parse AuditEvent xml text")
            return []

    def set_changes(self, property_changes: PropertyChanges) -> None:
        """Store the serialised form of ``property_changes`` in ``text``."""
        self.text = property_changes.as_string()

    def add_change(self, prop_name, old_value, new_value) -> None:
        """
        Record one property change and refresh ``text`` to include it.

        The ``PropertyChanges`` accumulator is created lazily on first use
        and kept on the instance as ``_pc``.
        """
        if getattr(self, "_pc", None) is None:
            self._pc = PropertyChanges()
        self._pc.add(prop_name, old_value, new_value)
        self.set_changes(self._pc)

    def build_acl(self) -> set[str]:
        """
        Create EventOrgACL records for this event.

        The organisations granted access depend on which ``visible`` sets
        contain the event type:

        * ``visible.to_participants`` - the org of every project participant
        * ``visible.to_respondent`` - the issue's respondent
        * ``visible.to_initiator`` - the event's own ``org_id``

        Returns the set of organisation ids that were added.
        """
        event_type = self.event_type
        acl_orgid_set: set[str] = set()

        # An event without a project has no participants to grant access to;
        # guard against None just like the issue branch below.
        if event_type in visible.to_participants and self.project is not None:
            acl_orgid_set.update(
                participant.org_id for participant in self.project.participants
            )

        if event_type in visible.to_respondent and self.issue is not None:
            if self.issue.respondent_id is not None:
                acl_orgid_set.add(self.issue.respondent_id)

        if event_type in visible.to_initiator and self.org_id is not None:
            acl_orgid_set.add(self.org_id)

        for org_id in acl_orgid_set:
            self.add_to_acl(org_id)

        return acl_orgid_set

    def add_to_acl(self, org_id: str) -> None:
        """Grant the organisation ``org_id`` access to this event."""
        self.acl.append(EventOrgACL(org_id=org_id))

    def set_done(self) -> None:
        """Mark the event as processed."""
        self.status = Status.done
class EventOrgACL(Base):
    """
    Access Control List entry linking one audit event to one organisation
    that is allowed to view it.
    """

    __tablename__ = "audit_event_orgs"

    # Constraint is left unnamed (formerly name="audit_event_orgs_ibfk_1").
    event_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("audit_events.id", ondelete="CASCADE"),
        nullable=True,
    )

    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
        nullable=True,
        index=True,
    )

    # The owning event is joined-loaded together with the ACL row.
    event: Mapped[AuditEvent] = relationship(
        AuditEvent,
        lazy="joined",
        back_populates="acl",
    )
    organisation: Mapped["Organisation"] = relationship("Organisation")

    def __repr__(self) -> str:
        return "<EventACL Event {} Org {}>".format(self.event_id, self.org_id)
@event.listens_for(AuditEvent, "before_insert", propagate=True)
def serialize_property_changes(mapper, connection, target):
    """
    ``before_insert`` listener: write any property changes accumulated on
    the event (held in ``target._pc``) into its ``text`` column.
    """
    if not hasattr(target, "_pc"):
        # No changes were recorded through add_change(); leave text untouched.
        return
    target.text = target._pc.as_string()