Coverage for postrfp / model / notify.py: 100%

70 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1import enum # Don't import Enum here - shadows the SQLAlchemy type 

2from datetime import datetime 

3from typing import Optional, TYPE_CHECKING 

4 

5from sqlalchemy import ( 

6 JSON, 

7 DateTime, 

8 Index, 

9 text, 

10 Integer, 

11 ForeignKey, 

12 func, 

13 UniqueConstraint, 

14) 

15from sqlalchemy.types import INTEGER, VARCHAR, Enum, TEXT 

16from sqlalchemy.orm import Mapped, mapped_column, relationship, validates 

17 

18from postrfp.model.meta import Base, HTTPHeadersMixin 

19from postrfp.shared import utils 

20 

21 

22if TYPE_CHECKING: 

23 from postrfp.model.project import Project 

24 from postrfp.model.issue import Issue 

25 from postrfp.model.humans import User, Organisation 

26 from postrfp.model.audit import AuditEvent 

27 from postrfp.model.jobs import JobExecution 

28 

29 

class ProjectWatchList(Base):
    """Row recording that a user is watching a project.

    A given (user_id, project_id) pair may be stored at most once. Both
    foreign keys are declared with ON DELETE CASCADE.
    """

    __tablename__ = "project_watch_list"
    # The unique constraint is intentionally unnamed; it previously carried
    # the name "_user_project_wl_uc".
    __table_args__ = (UniqueConstraint("user_id", "project_id"),)

    project_id: Mapped[Optional[int]] = mapped_column(
        INTEGER,
        ForeignKey("projects.id", ondelete="CASCADE"),
        nullable=True,
    )
    user_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("users.id", ondelete="CASCADE"),
        nullable=True,
    )
    # Default is the SQL function utc_timestamp(), rendered into the INSERT.
    date_created: Mapped[Optional[datetime]] = mapped_column(
        DateTime,
        default=func.utc_timestamp(),
        nullable=True,
    )

    project: Mapped["Project"] = relationship(
        "Project",
        back_populates="watch_list",
    )
    # The watching user is fetched with a JOIN alongside this row.
    user: Mapped["User"] = relationship(
        "User",
        lazy="joined",
        back_populates="project_watches",
    )

    def __repr__(self):
        """Return a debug string naming the watcher id and the project."""
        return f"<ProjectWatchList: [{self.user_id}] watching {self.project}>"

55 

56 

class IssueWatchList(Base):
    """Row recording that a user is watching an issue.

    A given (user_id, issue_id) pair may be stored at most once. Both
    foreign keys are declared with ON DELETE CASCADE.
    """

    __tablename__ = "issue_watch_list"
    # The unique constraint is intentionally unnamed; it previously carried
    # the name "_user_issue_wl_uc".
    __table_args__ = (UniqueConstraint("user_id", "issue_id"),)

    issue_id: Mapped[int] = mapped_column(
        INTEGER,
        ForeignKey("issues.id", ondelete="CASCADE"),
    )
    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("users.id", ondelete="CASCADE"),
    )
    # Default is the SQL function utc_timestamp(), rendered into the INSERT.
    date_created: Mapped[datetime] = mapped_column(
        DateTime,
        default=func.utc_timestamp(),
    )

    issue: Mapped["Issue"] = relationship("Issue", back_populates="watch_list")
    # The watching user is fetched with a JOIN alongside this row.
    user: Mapped["User"] = relationship(
        "User", lazy="joined", back_populates="issue_watches"
    )

    def __repr__(self) -> str:
        """Return a debug string naming the watcher id and the issue."""
        watcher_and_issue = (self.user_id, self.issue)
        return "Issue WatchList: [%s] watching %s" % watcher_and_issue

87 

88 

class EmailNotification(Base):
    """An email notification queued for a user about an audit event.

    Rows live in the ``notification_queue`` table and a (user_id, event_id)
    pair may appear at most once. ``delivery_status`` together with the
    ``*_date`` columns records how far the message has progressed; the
    ``set_*`` helpers update those fields in memory and leave committing to
    the caller.
    """

    __tablename__ = "notification_queue"
    # The unique constraint is intentionally unnamed; it previously carried
    # the name "_user_event_em_uc".
    __table_args__ = (UniqueConstraint("user_id", "event_id"),)

    class Status(enum.Enum):
        """Delivery states; persisted through the ``Enum(Status)`` column."""

        enqueued = 1
        sent = 2
        delivered = 3
        opened = 4
        bounced = 5
        failed = 6

    # NOTE(review): the default is a SQL text expression (an empty string
    # literal) rather than a plain Python "" - kept as-is.
    template_name: Mapped[str] = mapped_column(
        VARCHAR(length=100), nullable=False, default=text("''")
    )
    # Nullable: the user FK is set to NULL when the user row is deleted.
    user_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("users.id", ondelete="SET NULL", onupdate="CASCADE"),
        nullable=True,
    )
    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
        nullable=True,
    )
    event_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("audit_events.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
        server_default=text("'0'"),
    )
    email: Mapped[Optional[str]] = mapped_column(VARCHAR(length=50))
    # Default is the SQL function utc_timestamp(), rendered into the INSERT.
    # No onupdate is configured, so later changes do not touch this column.
    updated_date: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, default=func.utc_timestamp()
    )
    sent_date: Mapped[Optional[datetime]] = mapped_column(DateTime)
    delivered_date: Mapped[Optional[datetime]] = mapped_column(DateTime)
    opened_date: Mapped[Optional[datetime]] = mapped_column(DateTime)
    error: Mapped[Optional[str]] = mapped_column(TEXT())
    message_id: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), index=True)
    # The column is nullable, so the annotation is Optional. The default is
    # the enum member itself (not its name) so that a freshly flushed object
    # holds a Status value, matching what the set_* helpers assign.
    delivery_status: Mapped[Optional[Status]] = mapped_column(
        Enum(Status), default=Status.enqueued, nullable=True
    )

    # Both foreign keys are nullable, so either related object may be None.
    organisation: Mapped[Optional["Organisation"]] = relationship("Organisation")
    user: Mapped[Optional["User"]] = relationship("User")
    event: Mapped["AuditEvent"] = relationship(
        "AuditEvent", back_populates="notifications"
    )

    def __repr__(self) -> str:
        """Return a debug string with id, status, recipient and event id."""
        return (
            f"<EmailNotification #{self.id} ({self.delivery_status}) "
            f"for {self.email} Evt {self.event_id}>"
        )

    def set_sent(self) -> None:
        """Mark the notification as sent and stamp ``sent_date``."""
        self.delivery_status = self.Status.sent
        self.sent_date = utils.utcnow()

    def set_delivered(self) -> None:
        """Mark the notification as delivered and stamp ``delivered_date``."""
        self.delivery_status = self.Status.delivered
        self.delivered_date = utils.utcnow()

    def set_bounced(self) -> None:
        """Mark the notification as bounced (no timestamp is recorded)."""
        self.delivery_status = self.Status.bounced

    def set_failed(self) -> None:
        """Mark the notification as failed (no timestamp is recorded)."""
        self.delivery_status = self.Status.failed

163 

164 

class WebhookSubscription(Base, HTTPHeadersMixin):
    """
    A webhook registration belonging to an organisation.

    Each row binds one event type to one remote URL, and an organisation
    may hold at most one subscription per event type. Related JobExecution
    rows are reachable through ``executions``.

    NOTE(review): earlier documentation said a delivery status tracks the
    success of the last delivery attempt; no such column is declared in
    this class body - confirm where that state now lives.
    """

    __tablename__ = "webhook_subscriptions"
    __table_args__ = (
        UniqueConstraint("org_id", "event_type"),
    )
    # Not referenced inside this class; presumably the cap on delivery
    # attempts - verify against the code that consumes it.
    MAX_ATTEMPTS = 4

    org_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey(
            "organisations.id",
            ondelete="CASCADE",
            onupdate="CASCADE",
        ),
        nullable=False,
    )
    event_type: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)
    remote_url: Mapped[str] = mapped_column(
        VARCHAR(length=150),
        nullable=False,
    )

    # Both CEL fields are optional free-text expressions; the column comments
    # below are emitted into the schema verbatim.
    guard_policy: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=4000),
        nullable=True,
        comment="Optional CEL guard policy to determine if webhook should be sent",
    )
    transform_expression: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=4000),
        nullable=True,
        comment="Optional CEL expression to transform the payload before sending",
    )

    organisation: Mapped["Organisation"] = relationship(
        "Organisation",
        back_populates="webhook_subscriptions",
    )

    # Executions are owned by the subscription: removing one from the
    # collection, or deleting the subscription, deletes the execution rows.
    executions: Mapped[list["JobExecution"]] = relationship(
        "JobExecution",
        back_populates="webhook",
        cascade="all, delete-orphan",
    )

    def __repr__(self):
        """Return a debug string with the owning org id and target URL."""
        return f"<WebhookSubscription for {self.org_id}: {self.remote_url}>"