Coverage for postrfp / model / humans.py: 100%
217 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1from enum import Enum
2from random import choice
3from string import ascii_uppercase, digits
4from typing import Optional, TYPE_CHECKING
5from datetime import datetime
7from sqlalchemy import (
8 Column,
9 DateTime,
10 Table,
11 types,
12 Integer,
13 ForeignKey,
14 UniqueConstraint,
15 DATETIME,
16 text,
17 func,
18)
20from sqlalchemy.orm import Mapped, mapped_column, relationship, validates, DynamicMapped
21from sqlalchemy.orm.exc import NoResultFound
22from sqlalchemy.ext.associationproxy import association_proxy, AssociationProxy
23from sqlalchemy.types import VARCHAR, BOOLEAN, INTEGER, TEXT, CHAR
24from sqlalchemy.types import Enum as SqlaEnum # Import Enum
27from postrfp.authorisation import perms
28from postrfp.shared.exceptions import LacksPermission
29from postrfp.shared.exceptions import AuthorizationFailure
30from postrfp.shared.types import PermString
31from postrfp.authorisation.roles import ROLES
32from postrfp.model.meta import Base
33from postrfp.model.audit import AuditEvent
35if TYPE_CHECKING:
36 from sqlalchemy.orm import Query
37 from postrfp.model.notify import (
38 WebhookSubscription,
39 IssueWatchList,
40 ProjectWatchList,
41 )
42 from postrfp.model.acl import SectionPermission, ProjectPermission
43 from postrfp.model.project import Project
44 from postrfp.model.graph import RelationshipType, Edge
45 from postrfp.model.misc import Category
46 from postrfp.model.tags import Tag
49org_cat_rel = Table(
50 "org_cat_rel",
51 Base.metadata,
52 Column("id", Integer, primary_key=True),
53 Column(
54 "org_id",
55 VARCHAR(length=50),
56 ForeignKey(
57 "organisations.id",
58 # name="constr_org", # Removed name
59 ondelete="CASCADE",
60 onupdate="CASCADE",
61 ),
62 nullable=False,
63 ),
64 Column(
65 "cat_id",
66 INTEGER(),
67 ForeignKey("org_categories.id"), # name="constr_cat", # Removed name
68 nullable=False,
69 ),
70 UniqueConstraint("org_id", "cat_id"), # name="title", # Removed name
71)
73organisation_suppliers = Table(
74 "organisation_suppliers",
75 Base.metadata,
76 Column("id", Integer, primary_key=True),
77 Column(
78 "org_id",
79 VARCHAR(length=50),
80 ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
81 ),
82 Column(
83 "supplier_id",
84 VARCHAR(length=50),
85 ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
86 ),
87)
90class OrganisationType(Enum):
91 RESPONDENT = 0
92 BUYER = 1
93 CONSULTANT = 2
96class ConsultantClientRelationship(Base):
97 __tablename__ = "consultant_orgs"
99 id = None # type: ignore
100 consultant_id: Mapped[str] = mapped_column(
101 VARCHAR(length=50),
102 ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
103 primary_key=True,
104 )
105 client_id: Mapped[str] = mapped_column(
106 VARCHAR(length=50),
107 ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
108 primary_key=True,
109 )
110 consultant: Mapped["Organisation"] = relationship(
111 "Organisation",
112 foreign_keys="ConsultantClientRelationship.consultant_id",
113 backref="consultant_org_clients",
114 )
115 client: Mapped["Organisation"] = relationship(
116 "Organisation",
117 foreign_keys="ConsultantClientRelationship.client_id",
118 backref="consultant_org_consultants",
119 )
121 def __repr__(self):
122 return f"<Consultant: {self.consultant_id} , Client: {self.client_id} >"
124 def __init__(self, client=None, consultant=None):
125 """Constructor is called when values are appended to the 'clients' or 'consultants'
126 attributes of Organisation
127 """
128 self.consultant = consultant
129 self.client = client
132class Organisation(Base):
133 __tablename__ = "organisations"
135 __mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": None}
137 public_attrs = "id,name,public,is_consultant".split(",")
139 id: Mapped[str] = mapped_column(VARCHAR(length=50), primary_key=True) # type: ignore
141 name: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)
142 address: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
143 public: Mapped[bool] = mapped_column(BOOLEAN(), default=False)
144 type: Mapped[OrganisationType] = mapped_column(
145 SqlaEnum(
146 OrganisationType,
147 name="organisation_type_enum",
148 values_callable=lambda obj: [e.name for e in obj],
149 ), # Use SQLAlchemy Enum
150 nullable=False,
151 default=OrganisationType.RESPONDENT,
152 )
153 password_expiry: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
154 domain_name: Mapped[Optional[str]] = mapped_column(
155 VARCHAR(length=256), nullable=True
156 )
158 users: Mapped[list["User"]] = relationship(
159 "User",
160 back_populates="organisation",
161 cascade="all,delete",
162 passive_deletes=True,
163 )
165 organisation_categories: Mapped[list["OrganisationCategory"]] = relationship(
166 "OrganisationCategory", secondary=org_cat_rel
167 )
169 custom_roles: Mapped[list["CustomRole"]] = relationship(
170 "CustomRole", back_populates="organisation"
171 )
173 # These form the "Private Address Book"
174 suppliers: DynamicMapped["Organisation"] = relationship(
175 "Organisation",
176 secondary="organisation_suppliers",
177 primaryjoin="Organisation.id==organisation_suppliers.c.org_id",
178 secondaryjoin="Organisation.id==organisation_suppliers.c.supplier_id",
179 backref="buyers",
180 lazy="dynamic",
181 )
183 visible_events: DynamicMapped["AuditEvent"] = relationship(
184 "AuditEvent",
185 primaryjoin="Organisation.id==EventOrgACL.org_id",
186 secondaryjoin="EventOrgACL.event_id==AuditEvent.id",
187 secondary="audit_event_orgs",
188 lazy="dynamic",
189 viewonly=True,
190 )
192 webhook_subscriptions: DynamicMapped["WebhookSubscription"] = relationship(
193 "WebhookSubscription",
194 back_populates="organisation",
195 cascade="all, delete",
196 passive_deletes=True,
197 lazy="dynamic",
198 )
200 projects: DynamicMapped["Project"] = relationship(
201 "Project",
202 back_populates="owner_org",
203 cascade="all, delete",
204 passive_deletes=True,
205 lazy="dynamic",
206 )
207 relationship_types: DynamicMapped["RelationshipType"] = relationship(
208 "RelationshipType",
209 back_populates="organisation",
210 cascade="all,delete",
211 passive_deletes=True,
212 lazy="dynamic",
213 )
214 lower_edges: Mapped[list["Edge"]] = relationship(
215 "Edge",
216 primaryjoin="Organisation.id==Edge.from_org_id",
217 back_populates="from_org",
218 cascade="all, delete",
219 )
220 higher_edges: Mapped[list["Edge"]] = relationship(
221 "Edge",
222 primaryjoin="Organisation.id==Edge.to_org_id",
223 back_populates="to_org",
224 cascade="all, delete",
225 )
227 categories: DynamicMapped["Category"] = relationship(
228 "Category", back_populates="organisation"
229 )
231 tags: DynamicMapped["Tag"] = relationship(
232 "Tag", back_populates="organisation", lazy="dynamic"
233 )
235 events: DynamicMapped[AuditEvent] = relationship(
236 "AuditEvent",
237 lazy="dynamic",
238 back_populates="organisation",
239 cascade_backrefs=False,
240 primaryjoin=("foreign(AuditEvent.org_id)==Organisation.id"),
241 )
243 def __init__(self, id_name) -> None:
244 self.id = id_name
245 self.name = id_name
247 def __repr__(self) -> str:
248 return f"Organisation: {self.name}"
250 @property
251 def is_consultant(self) -> bool:
252 return self.type == OrganisationType.CONSULTANT
254 @property
255 def is_buyside(self) -> bool:
256 return self.type in (OrganisationType.BUYER, OrganisationType.CONSULTANT)
258 def has_supplier(self, supplier_org) -> bool:
259 try:
260 self.suppliers.filter(Organisation.id == supplier_org.id).one()
261 return True
262 except NoResultFound:
263 return False
266class RespondentOrganisation(Organisation):
267 __mapper_args__ = {"polymorphic_identity": OrganisationType.RESPONDENT}
270class BuyerOrganisation(Organisation):
271 __mapper_args__ = {"polymorphic_identity": OrganisationType.BUYER}
272 consultants: AssociationProxy[list["ConsultantClientRelationship"]] = (
273 association_proxy(
274 "consultant_org_consultants",
275 "consultant",
276 creator=lambda org: ConsultantClientRelationship(consultant=org),
277 )
278 )
281class ConsultantOrganisation(Organisation):
282 __mapper_args__ = {"polymorphic_identity": OrganisationType.CONSULTANT}
283 # Weird SQLAlchemy magic here - these two constructors seem to aggregate
284 # to call __init__ on ConsultantClientRelationship
285 clients: AssociationProxy[list["BuyerOrganisation"]] = association_proxy(
286 "consultant_org_clients",
287 "client",
288 creator=lambda org: ConsultantClientRelationship(client=org),
289 )
292def custom_perms(session, user_id) -> "Query":
293 return (
294 session.query(CustomRolePermission.permission_id)
295 .join(CustomRole)
296 .join(UserRole, CustomRole.id == UserRole.role_id)
297 .filter(UserRole.user_id == user_id)
298 )
301class RefreshToken(Base):
302 __tablename__ = "refresh_tokens"
304 id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
305 user_id: Mapped[str] = mapped_column(
306 VARCHAR(length=50),
307 ForeignKey("users.id", ondelete="CASCADE"),
308 nullable=False,
309 index=True,
310 )
311 token: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=False, unique=True)
312 issued_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
313 expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
314 revoked: Mapped[bool] = mapped_column(types.Boolean, default=False, nullable=False)
316 user: Mapped["User"] = relationship("User", back_populates="refresh_tokens")
318 def __repr__(self):
319 return f"<RefreshToken user_id={self.user_id} expires_at={self.expires_at} revoked={self.revoked}>"
322class User(Base):
323 __tablename__ = "users"
325 id: Mapped[str] = mapped_column(VARCHAR(length=50), primary_key=True) # type: ignore
326 org_id: Mapped[str] = mapped_column(
327 VARCHAR(length=50),
328 ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
329 index=True,
330 nullable=True,
331 )
332 created_at: Mapped[datetime] = mapped_column(
333 DateTime,
334 nullable=False,
335 server_default=func.utc_timestamp(),
336 )
337 updated_at: Mapped[datetime] = mapped_column(
338 DateTime,
339 nullable=False,
340 server_default=func.utc_timestamp(),
341 server_onupdate=func.utc_timestamp(),
342 )
343 fullname: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)
344 email: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
345 password: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
346 locale: Mapped[Optional[str]] = mapped_column(VARCHAR(length=10), nullable=True)
347 previous_login_date: Mapped[Optional[datetime]] = mapped_column(
348 DateTime, nullable=True
349 )
350 password_set_date: Mapped[datetime] = mapped_column(DATETIME, nullable=True)
351 is_active: Mapped[bool] = mapped_column(types.Boolean, default=True, nullable=False)
352 failed_login_attempts: Mapped[int] = mapped_column(
353 Integer, default=0, nullable=False
354 )
355 total_failed_logins: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
356 locked_until: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
358 type: Mapped[str] = mapped_column(
359 VARCHAR(length=10),
360 nullable=False,
361 default="standard",
362 server_default=text("'standard'"),
363 )
365 organisation: Mapped["Organisation"] = relationship(
366 "Organisation", uselist=False, back_populates="users"
367 )
369 custom_roles: Mapped[list["CustomRole"]] = relationship(
370 "CustomRole",
371 secondary="user_roles",
372 secondaryjoin="UserRole.role_id==CustomRole.name",
373 viewonly=True,
374 )
376 role_list: Mapped[list["UserRole"]] = relationship(
377 "UserRole", back_populates="user", cascade="all, delete", passive_deletes=True
378 )
380 roles: AssociationProxy[list[str]] = association_proxy(
381 "role_list", "role_id", creator=lambda r: UserRole(role_id=r)
382 )
384 roles_q: Mapped[list["UserRole"]] = relationship(
385 "UserRole", lazy="dynamic", viewonly=True
386 )
388 refresh_tokens: Mapped[list["RefreshToken"]] = relationship(
389 "RefreshToken",
390 back_populates="user",
391 cascade="all, delete-orphan",
392 passive_deletes=True,
393 )
395 section_permissions: DynamicMapped["SectionPermission"] = relationship(
396 "SectionPermission",
397 back_populates="user",
398 cascade="all,delete",
399 passive_deletes=True,
400 lazy="dynamic",
401 )
403 project_permissions: Mapped[list["ProjectPermission"]] = relationship(
404 "ProjectPermission",
405 back_populates="user",
406 cascade="all, delete",
407 passive_deletes=True,
408 )
410 project_watches: Mapped[list["ProjectWatchList"]] = relationship(
411 "ProjectWatchList",
412 back_populates="user",
413 cascade="all,delete",
414 passive_deletes=True,
415 )
417 issue_watches: Mapped[list["IssueWatchList"]] = relationship(
418 "IssueWatchList",
419 back_populates="user",
420 cascade="all,delete",
421 passive_deletes=True,
422 )
424 events: DynamicMapped[AuditEvent] = relationship(
425 "AuditEvent",
426 lazy="dynamic",
427 back_populates="user",
428 cascade_backrefs=False,
429 primaryjoin="foreign(AuditEvent.user_id)==User.id",
430 )
432 def __init__(self, user_id, *args, **kwargs):
433 super(User, self).__init__(*args, **kwargs)
434 self.id = user_id
435 self.fullname = user_id
437 def __repr__(self):
438 return f"<User: {self.id} ({self.fullname}) of {self.org_id}>"
440 def custom_permissions(self):
441 cq = custom_perms(self._instance_session, self.id)
442 return {cp[0] for cp in cq}
444 @property
445 def is_restricted(self):
446 return self.type == "restricted"
448 @is_restricted.setter
449 def is_restricted(self, restricted_bool):
450 self.type = "restricted" if restricted_bool else "standard"
452 def has_permission(self, permission: PermString):
453 """
454 May be called multiple times in the life of one http request
455 so attempting to optimise. Database stored custom roles are
456 sometimes in use, but all efforts are made to avoid querying the Database
457 """
458 if "Administrator" in self.roles:
459 return True
460 if permission in self.builtin_permissions:
461 return True
462 return permission in self.custom_permissions()
464 def check_permission(self, permission: PermString):
465 """
466 @raise LacksPermission if the user doesn't have the given permission
467 """
468 if not self.has_permission(permission):
469 raise LacksPermission(permission, self.id)
471 def check_is_standard(self):
472 if self.is_restricted:
473 raise AuthorizationFailure("Action not permitted for Domain Expert users")
475 @property
476 def builtin_permissions(self):
477 if hasattr(self, "_builtin_perms"):
478 return self._builtin_perms
480 # cache not populated, so build it
481 self._builtin_perms = pm = set()
482 # Add permissions for 'built-in roles'
483 for role_id in self.roles & ROLES.keys():
484 pm.update(ROLES[role_id])
485 return pm
487 @property
488 def all_permissions(self) -> set[PermString]:
489 # set union syntax
490 return self.custom_permissions() | self.builtin_permissions
492 @property
493 def sorted_permissions(self) -> list[PermString]:
494 return sorted(self.all_permissions)
496 def add_role(self, role_id: str) -> None:
497 if role_id not in ROLES:
498 if role_id not in {cr.id for cr in self.organisation.custom_roles}:
499 raise KeyError('Role "%s" not found' % role_id)
500 self.roles.append(role_id)
502 def is_in_role(self, role_id: str):
503 return role_id in self.roles
505 def is_administrator(self):
506 return self.is_in_role("Administrator")
508 def can_view_section_id(self, section_id: int) -> bool:
509 if self.is_restricted:
510 clause = text(f"section_id={section_id}")
511 if self.section_permissions.filter(clause).count() != 1:
512 return False
513 return True
516class UserRole(Base):
517 __tablename__ = "user_roles"
518 __table_args__ = (
519 UniqueConstraint(
520 "user_id", "role_id"
521 ), # name="unique_user_role", # Removed name
522 )
524 user_id: Mapped[str] = mapped_column(
525 VARCHAR(length=50),
526 ForeignKey(
527 "users.id", ondelete="CASCADE"
528 ), # name="constr_user", # Removed name
529 nullable=False,
530 )
531 role_id: Mapped[PermString] = mapped_column(
532 VARCHAR(length=255),
533 nullable=True,
534 )
536 user: Mapped["User"] = relationship(
537 "User", back_populates="role_list", viewonly=True
538 )
540 def __repr__(self):
541 return f"<UserRole {self.user_id} - {self.role_id}>"
544def munged_id(sqla_context):
545 """
546 id is a string formed by joining name, org_id and a random string,
547 e.g. Self Scorer/Thomas Murray#TK94SL
548 """
549 name = sqla_context.current_parameters["name"]
550 org_id = sqla_context.current_parameters["org_id"]
551 chars = ascii_uppercase + digits
552 ran_string = [choice(chars) for c in range(5)]
553 return "%s/%s#%s" % (name, org_id, "".join(ran_string))
556class CustomRole(Base):
557 __tablename__ = "roles"
559 __table_args__ = (
560 UniqueConstraint("name", "org_id"), # name="role_names", # Removed name
561 )
563 id: Mapped[str] = mapped_column( # type: ignore
564 VARCHAR(length=255), nullable=False, primary_key=True, default=munged_id
565 )
566 name: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=False)
567 description: Mapped[str] = mapped_column(TEXT(), nullable=False)
568 type: Mapped[str] = mapped_column(
569 CHAR(length=1), nullable=False, server_default=text("'U'")
570 )
571 org_id: Mapped[str] = mapped_column(
572 VARCHAR(length=50),
573 ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
574 nullable=False,
575 )
577 organisation: Mapped["Organisation"] = relationship(
578 Organisation, back_populates="custom_roles"
579 )
580 permissions: Mapped[list["CustomRolePermission"]] = relationship(
581 "CustomRolePermission",
582 back_populates="role",
583 cascade="all, delete",
584 passive_deletes=True,
585 )
587 def __repr__(self):
588 return f"<CustomRole {self.id} - {self.name}>"
591class CustomRolePermission(Base):
592 __tablename__ = "role_permissions"
594 id = None # type: ignore
596 role_id: Mapped[str] = mapped_column(
597 VARCHAR(length=255),
598 ForeignKey(
599 "roles.id", ondelete="CASCADE"
600 ), # name="role_reference", # Removed name
601 nullable=False,
602 primary_key=True,
603 )
605 permission_id: Mapped[PermString] = mapped_column(
606 VARCHAR(length=255), nullable=False, primary_key=True
607 )
608 role: Mapped["CustomRole"] = relationship(CustomRole, back_populates="permissions")
610 def __repr__(self):
611 return f"{self.permission_id} (Role: {self.role_id})"
613 @validates("permission_id")
614 def known_permission(self, key, perm_id: PermString) -> PermString:
615 if perm_id not in perms.ALL_PERMISSIONS:
616 raise ValueError('"%s" is not a recognised permission' % perm_id)
617 return perm_id
620class OrganisationCategory(Base):
621 __tablename__ = "org_categories"
623 title: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=False, unique=True)
624 description: Mapped[Optional[str]] = mapped_column(TEXT(), nullable=True)
626 def __repr__(self):
627 return f"<Organisation Category: {self.title}>"
630class FailedLoginAttempt(Base):
631 __tablename__ = "failed_login_attempts"
632 user_id: Mapped[str] = mapped_column(
633 VARCHAR(length=50), ForeignKey("users.id"), nullable=False
634 )
635 timestamp: Mapped[datetime] = mapped_column(DateTime, index=True, nullable=False)
636 ip_address: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)
638 def __repr__(self):
639 return f"<FailedLoginAttempt {self.user_id} at {self.timestamp}>"
642__all__ = [
643 "CustomRolePermission",
644 "CustomRole",
645 "UserRole",
646 "User",
647 "Organisation",
648 "ConsultantOrganisation",
649 "OrganisationType",
650 "OrganisationCategory",
651 "FailedLoginAttempt",
652]