Coverage for postrfp/model/humans.py: 100%
218 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1from enum import Enum
2from random import choice
3from string import ascii_uppercase, digits
4from typing import Optional, TYPE_CHECKING
5from datetime import datetime
7from sqlalchemy import (
8 Column,
9 DateTime,
10 Table,
11 types,
12 Integer,
13 ForeignKey,
14 UniqueConstraint,
15 DATETIME,
16 text,
17 func,
18)
20from sqlalchemy.orm import Mapped, mapped_column, relationship, validates, DynamicMapped
21from sqlalchemy.orm.exc import NoResultFound
22from sqlalchemy.ext.associationproxy import association_proxy, AssociationProxy
23from sqlalchemy.types import VARCHAR, BOOLEAN, INTEGER, TEXT, CHAR
24from sqlalchemy.types import Enum as SqlaEnum # Import Enum
27from postrfp.authorisation import perms
28from postrfp.shared.exceptions import LacksPermission
29from postrfp.shared.exceptions import AuthorizationFailure
30from postrfp.shared.types import PermString
31from postrfp.authorisation.roles import ROLES
32from postrfp.model.meta import Base
33from postrfp.model.audit import AuditEvent
35if TYPE_CHECKING:
36 from sqlalchemy.orm import Query
37 from postrfp.model.notify import (
38 WebhookSubscription,
39 IssueWatchList,
40 ProjectWatchList,
41 )
42 from postrfp.model.acl import SectionPermission, ProjectPermission
43 from postrfp.model.project import Project
44 from postrfp.model.graph import RelationshipType, Edge
45 from postrfp.model.misc import Category
46 from postrfp.model.tags import Tag
# Association table pairing organisations with organisation categories.
# A given (org_id, cat_id) pair may be stored only once. None of the
# constraints carry an explicit name.
org_cat_rel = Table(
    "org_cat_rel",
    Base.metadata,
    Column("id", Integer, primary_key=True),
    Column(
        "org_id",
        VARCHAR(length=50),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False,
    ),
    Column(
        "cat_id",
        INTEGER(),
        ForeignKey("org_categories.id"),
        nullable=False,
    ),
    UniqueConstraint("org_id", "cat_id"),
)
# Self-referential association table on organisations: ``org_id`` is the
# owning organisation and ``supplier_id`` the organisation it lists as a
# supplier. Both sides cascade on update and delete of the referenced row.
organisation_suppliers = Table(
    "organisation_suppliers",
    Base.metadata,
    Column("id", Integer, primary_key=True),
    Column(
        "org_id",
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
    ),
    Column(
        "supplier_id",
        VARCHAR(length=50),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
    ),
)
class OrganisationType(Enum):
    """The kinds of organisation known to the system.

    The integer values are part of the enum's definition and must not be
    renumbered.
    """

    RESPONDENT = 0
    BUYER = 1
    CONSULTANT = 2


# Organisation types that act on the buying side. This must agree with
# ``Organisation.is_buyside``, which treats BUYER and CONSULTANT as buy-side;
# RESPONDENT was previously listed here in place of CONSULTANT.
BUYSIDE_ORGS = (OrganisationType.BUYER, OrganisationType.CONSULTANT)
class ConsultantClientRelationship(Base):
    """Link row joining a consultant organisation to a client organisation.

    The primary key is the composite (consultant_id, client_id). Each
    relationship installs a backref on Organisation:
    ``consultant_org_clients`` (links where the organisation is the
    consultant) and ``consultant_org_consultants`` (links where it is the
    client).
    """

    __tablename__ = "consultant_orgs"

    # Presumably blanks out a surrogate ``id`` attribute supplied by Base,
    # since this table is keyed by the two foreign keys - confirm in meta.
    id = None  # type: ignore

    consultant_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
        primary_key=True,
    )
    client_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
        primary_key=True,
    )

    consultant: Mapped["Organisation"] = relationship(
        "Organisation",
        foreign_keys="ConsultantClientRelationship.consultant_id",
        backref="consultant_org_clients",
    )
    client: Mapped["Organisation"] = relationship(
        "Organisation",
        foreign_keys="ConsultantClientRelationship.client_id",
        backref="consultant_org_consultants",
    )

    def __init__(self, client=None, consultant=None):
        """Create a link between ``consultant`` and ``client``.

        The association proxies on the Organisation subclasses build
        instances through this constructor, passing only one side; the
        other side is left as None here.
        """
        self.consultant = consultant
        self.client = client

    def __repr__(self):
        return f"<Consultant: {self.consultant_id} , Client: {self.client_id} >"
class Organisation(Base):
    """An organisation: the polymorphic base for the typed subclasses.

    Rows are discriminated on the ``type`` column. The string primary key
    doubles as the initial display name (see ``__init__``).
    """

    __tablename__ = "organisations"

    __mapper_args__ = {"polymorphic_identity": None, "polymorphic_on": "type"}

    public_attrs = ["id", "name", "public", "is_consultant"]

    # ---- columns ---------------------------------------------------------
    id: Mapped[str] = mapped_column(VARCHAR(length=50), primary_key=True)  # type: ignore

    name: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)
    address: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
    public: Mapped[bool] = mapped_column(BOOLEAN(), default=False)
    type: Mapped[OrganisationType] = mapped_column(
        # The database enum stores the member *names*, not the int values.
        SqlaEnum(
            OrganisationType,
            name="organisation_type_enum",
            values_callable=lambda obj: [e.name for e in obj],
        ),
        nullable=False,
        default=OrganisationType.RESPONDENT,
    )
    password_expiry: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    domain_name: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=256), nullable=True
    )

    # ---- relationships ---------------------------------------------------
    users: Mapped[list["User"]] = relationship(
        "User",
        back_populates="organisation",
        cascade="all,delete",
        passive_deletes=True,
    )

    organisation_categories: Mapped[list["OrganisationCategory"]] = relationship(
        "OrganisationCategory", secondary=org_cat_rel
    )

    custom_roles: Mapped[list["CustomRole"]] = relationship(
        "CustomRole", back_populates="organisation"
    )

    # The organisation's "Private Address Book" of suppliers; the reverse
    # side is exposed on each supplier as ``buyers``.
    suppliers: DynamicMapped["Organisation"] = relationship(
        "Organisation",
        secondary="organisation_suppliers",
        primaryjoin="Organisation.id==organisation_suppliers.c.org_id",
        secondaryjoin="Organisation.id==organisation_suppliers.c.supplier_id",
        backref="buyers",
        lazy="dynamic",
    )

    # Read-only view of audit events reachable through audit_event_orgs.
    visible_events: DynamicMapped["AuditEvent"] = relationship(
        "AuditEvent",
        primaryjoin="Organisation.id==EventOrgACL.org_id",
        secondaryjoin="EventOrgACL.event_id==AuditEvent.id",
        secondary="audit_event_orgs",
        lazy="dynamic",
        viewonly=True,
    )

    webhook_subscriptions: DynamicMapped["WebhookSubscription"] = relationship(
        "WebhookSubscription",
        back_populates="organisation",
        cascade="all, delete",
        passive_deletes=True,
        lazy="dynamic",
    )

    projects: DynamicMapped["Project"] = relationship(
        "Project",
        back_populates="owner_org",
        cascade="all, delete",
        passive_deletes=True,
        lazy="dynamic",
    )
    relationship_types: DynamicMapped["RelationshipType"] = relationship(
        "RelationshipType",
        back_populates="organisation",
        cascade="all,delete",
        passive_deletes=True,
        lazy="dynamic",
    )
    # Graph edges leaving this organisation (Edge.from_org_id) ...
    lower_edges: Mapped[list["Edge"]] = relationship(
        "Edge",
        primaryjoin="Organisation.id==Edge.from_org_id",
        back_populates="from_org",
        cascade="all, delete",
    )
    # ... and edges arriving at it (Edge.to_org_id).
    higher_edges: Mapped[list["Edge"]] = relationship(
        "Edge",
        primaryjoin="Organisation.id==Edge.to_org_id",
        back_populates="to_org",
        cascade="all, delete",
    )

    categories: DynamicMapped["Category"] = relationship(
        "Category", back_populates="organisation"
    )

    tags: DynamicMapped["Tag"] = relationship(
        "Tag", back_populates="organisation", lazy="dynamic"
    )

    events: DynamicMapped[AuditEvent] = relationship(
        "AuditEvent",
        lazy="dynamic",
        back_populates="organisation",
        cascade_backrefs=False,
        primaryjoin=("foreign(AuditEvent.org_id)==Organisation.id"),
    )

    # ---- behaviour -------------------------------------------------------
    def __init__(self, id_name) -> None:
        """Initialise both the primary key and the name from ``id_name``."""
        self.id = id_name
        self.name = id_name

    def __repr__(self) -> str:
        return f"Organisation: {self.name}"

    @property
    def is_consultant(self) -> bool:
        """True when this organisation's type is CONSULTANT."""
        return self.type == OrganisationType.CONSULTANT

    @property
    def is_buyside(self) -> bool:
        """True when this organisation's type is BUYER or CONSULTANT."""
        buy_side_types = (OrganisationType.BUYER, OrganisationType.CONSULTANT)
        return self.type in buy_side_types

    def has_supplier(self, supplier_org) -> bool:
        """Report whether ``supplier_org`` is among this organisation's suppliers.

        The lookup is a query against the dynamic ``suppliers`` relationship,
        filtered by the id of ``supplier_org``.
        """
        matching = self.suppliers.filter(Organisation.id == supplier_org.id)
        try:
            matching.one()
        except NoResultFound:
            return False
        return True
class RespondentOrganisation(Organisation):
    """Organisation subtype mapped to the RESPONDENT polymorphic identity."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.RESPONDENT}
class BuyerOrganisation(Organisation):
    """Organisation subtype mapped to the BUYER polymorphic identity."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.BUYER}

    # Proxies through the ``consultant_org_consultants`` backref (link rows
    # in which this organisation is the client) to each link's
    # ``consultant``. The collection therefore yields organisations, not
    # ConsultantClientRelationship rows, and is annotated accordingly.
    # Appending an organisation creates the link row via ``creator``.
    consultants: AssociationProxy[list["Organisation"]] = association_proxy(
        "consultant_org_consultants",
        "consultant",
        creator=lambda org: ConsultantClientRelationship(consultant=org),
    )
class ConsultantOrganisation(Organisation):
    """Organisation subtype mapped to the CONSULTANT polymorphic identity."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.CONSULTANT}

    # Proxies through the ``consultant_org_clients`` backref to each link's
    # ``client``. Appending an organisation invokes ``creator``, which
    # builds the ConsultantClientRelationship row with that organisation as
    # the client.
    clients: AssociationProxy[list["BuyerOrganisation"]] = association_proxy(
        "consultant_org_clients",
        "client",
        creator=lambda org: ConsultantClientRelationship(client=org),
    )
def custom_perms(session, user_id) -> "Query":
    """Build a query for the custom-role permission ids held by a user.

    Joins role permissions to their custom role, then to the user-role rows
    whose ``role_id`` equals the custom role's id, keeping only the rows for
    ``user_id``. The query is returned unexecuted; each result row holds a
    single ``permission_id``.
    """
    permission_ids = session.query(CustomRolePermission.permission_id)
    via_roles = permission_ids.join(CustomRole).join(
        UserRole, CustomRole.id == UserRole.role_id
    )
    return via_roles.filter(UserRole.user_id == user_id)
class RefreshToken(Base):
    """A refresh token issued to a user, with expiry and revocation flag.

    Token values are unique. Rows are removed by the database when the
    owning user row is deleted (``ondelete="CASCADE"``).
    """

    __tablename__ = "refresh_tokens"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("users.id", ondelete="CASCADE"),
        index=True,
        nullable=False,
    )
    token: Mapped[str] = mapped_column(VARCHAR(length=255), unique=True, nullable=False)
    issued_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    revoked: Mapped[bool] = mapped_column(types.Boolean, nullable=False, default=False)

    user: Mapped["User"] = relationship("User", back_populates="refresh_tokens")

    def __repr__(self):
        return f"<RefreshToken user_id={self.user_id} expires_at={self.expires_at} revoked={self.revoked}>"
class User(Base):
    """A user account belonging to (at most) one organisation.

    Permissions come from two sources: built-in roles, resolved in memory
    from ``ROLES``, and organisation-defined custom roles, resolved with a
    database query (see ``custom_permissions``).
    """

    __tablename__ = "users"

    # ---- columns ---------------------------------------------------------
    id: Mapped[str] = mapped_column(VARCHAR(length=50), primary_key=True)  # type: ignore
    # Nullable in the schema, hence Optional in the annotation.
    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
        index=True,
        nullable=True,
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=func.utc_timestamp(),
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=func.utc_timestamp(),
        server_onupdate=func.utc_timestamp(),
    )
    fullname: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)
    email: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
    password: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
    locale: Mapped[Optional[str]] = mapped_column(VARCHAR(length=10), nullable=True)
    previous_login_date: Mapped[Optional[datetime]] = mapped_column(
        DateTime, nullable=True
    )
    # Nullable in the schema, hence Optional in the annotation.
    password_set_date: Mapped[Optional[datetime]] = mapped_column(
        DATETIME, nullable=True
    )
    is_active: Mapped[bool] = mapped_column(types.Boolean, default=True, nullable=False)
    failed_login_attempts: Mapped[int] = mapped_column(
        Integer, default=0, nullable=False
    )
    total_failed_logins: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
    locked_until: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)

    # "standard" or "restricted"; see the ``is_restricted`` property.
    type: Mapped[str] = mapped_column(
        VARCHAR(length=10),
        nullable=False,
        default="standard",
        server_default=text("'standard'"),
    )

    # ---- relationships ---------------------------------------------------
    organisation: Mapped["Organisation"] = relationship(
        "Organisation", uselist=False, back_populates="users"
    )

    # NOTE(review): this joins UserRole.role_id to CustomRole.name, whereas
    # custom_perms() and add_role() both match role_id against CustomRole.id.
    # Confirm which column is intended before relying on this collection.
    custom_roles: Mapped[list["CustomRole"]] = relationship(
        "CustomRole",
        secondary="user_roles",
        secondaryjoin="UserRole.role_id==CustomRole.name",
        viewonly=True,
    )

    role_list: Mapped[list["UserRole"]] = relationship(
        "UserRole", back_populates="user", cascade="all, delete", passive_deletes=True
    )

    # The role ids of ``role_list``; appending an id creates the UserRole row.
    roles: AssociationProxy[list[str]] = association_proxy(
        "role_list", "role_id", creator=lambda r: UserRole(role_id=r)
    )

    # Query-style, read-only access to the same rows as ``role_list``.
    roles_q: DynamicMapped["UserRole"] = relationship(
        "UserRole", lazy="dynamic", viewonly=True
    )

    refresh_tokens: Mapped[list["RefreshToken"]] = relationship(
        "RefreshToken",
        back_populates="user",
        cascade="all, delete-orphan",
        passive_deletes=True,
    )

    section_permissions: DynamicMapped["SectionPermission"] = relationship(
        "SectionPermission",
        back_populates="user",
        cascade="all,delete",
        passive_deletes=True,
        lazy="dynamic",
    )

    project_permissions: Mapped[list["ProjectPermission"]] = relationship(
        "ProjectPermission",
        back_populates="user",
        cascade="all, delete",
        passive_deletes=True,
    )

    project_watches: Mapped[list["ProjectWatchList"]] = relationship(
        "ProjectWatchList",
        back_populates="user",
        cascade="all,delete",
        passive_deletes=True,
    )

    issue_watches: Mapped[list["IssueWatchList"]] = relationship(
        "IssueWatchList",
        back_populates="user",
        cascade="all,delete",
        passive_deletes=True,
    )

    events: DynamicMapped[AuditEvent] = relationship(
        "AuditEvent",
        lazy="dynamic",
        back_populates="user",
        cascade_backrefs=False,
        primaryjoin="foreign(AuditEvent.user_id)==User.id",
    )

    # ---- behaviour -------------------------------------------------------
    def __init__(self, user_id, *args, **kwargs):
        """Create a user whose id and initial fullname are both ``user_id``.

        Remaining arguments are forwarded to the base constructor.
        """
        super().__init__(*args, **kwargs)
        self.id = user_id
        self.fullname = user_id

    def __repr__(self):
        return f"<User: {self.id} ({self.fullname}) of {self.org_id}>"

    def custom_permissions(self):
        """Return the set of permission ids granted through custom roles.

        Runs a database query on every call, using the session exposed as
        ``self._instance_session``.
        """
        cq = custom_perms(self._instance_session, self.id)
        return {cp[0] for cp in cq}

    @property
    def is_restricted(self):
        """True when the user's ``type`` is "restricted"."""
        return self.type == "restricted"

    @is_restricted.setter
    def is_restricted(self, restricted_bool):
        self.type = "restricted" if restricted_bool else "standard"

    def has_permission(self, permission: PermString):
        """Report whether the user holds ``permission``.

        May be called many times during one HTTP request, so the checks are
        ordered cheapest first: the Administrator role, then the cached
        built-in permissions, and only then the database-backed custom roles.
        """
        if "Administrator" in self.roles:
            return True
        if permission in self.builtin_permissions:
            return True
        return permission in self.custom_permissions()

    def check_permission(self, permission: PermString):
        """Raise LacksPermission unless the user holds ``permission``."""
        if not self.has_permission(permission):
            raise LacksPermission(permission, self.id)

    def check_is_standard(self):
        """Raise AuthorizationFailure if this is a restricted user."""
        if self.is_restricted:
            raise AuthorizationFailure("Action not permitted for Domain Expert users")

    @property
    def builtin_permissions(self):
        """Permissions granted by the user's built-in roles.

        The set is computed once per instance and cached on
        ``_builtin_perms``; ``add_role`` discards the cache.
        """
        if hasattr(self, "_builtin_perms"):
            return self._builtin_perms

        # Cache not populated, so build it from the roles that are also
        # keys of ROLES, i.e. the built-in ones.
        self._builtin_perms = pm = set()
        for role_id in self.roles & ROLES.keys():
            pm.update(ROLES[role_id])
        return pm

    @property
    def all_permissions(self) -> set[PermString]:
        """Union of custom-role and built-in permissions."""
        return self.custom_permissions() | self.builtin_permissions

    @property
    def sorted_permissions(self) -> list[PermString]:
        """``all_permissions`` as a sorted list."""
        return sorted(self.all_permissions)

    def add_role(self, role_id: str) -> None:
        """Grant ``role_id`` to the user.

        The role must be either a built-in role (a key of ``ROLES``) or the
        id of a custom role defined by the user's organisation.

        Raises:
            KeyError: if ``role_id`` is neither of those.
        """
        if role_id not in ROLES:
            if role_id not in {cr.id for cr in self.organisation.custom_roles}:
                raise KeyError('Role "%s" not found' % role_id)
        self.roles.append(role_id)
        # The cached built-in permission set was computed from the previous
        # roles; drop it so the next lookup sees the new role.
        vars(self).pop("_builtin_perms", None)

    def is_in_role(self, role_id: str):
        """Report whether ``role_id`` is among the user's roles."""
        return role_id in self.roles

    def is_administrator(self):
        """Report whether the user has the "Administrator" role."""
        return self.is_in_role("Administrator")

    def can_view_section_id(self, section_id: int) -> bool:
        """Report whether the user may view the section with ``section_id``.

        Unrestricted users may view every section. A restricted user needs
        exactly one matching row in ``section_permissions``.
        """
        if not self.is_restricted:
            return True
        # Bind the id as a parameter rather than formatting it into the SQL
        # text, so a non-integer value cannot alter the statement.
        clause = text("section_id = :section_id").bindparams(section_id=section_id)
        return self.section_permissions.filter(clause).count() == 1
class UserRole(Base):
    """Assignment of a role id to a user.

    A (user_id, role_id) pair may appear only once. Rows are removed by the
    database when the referenced user row is deleted.
    """

    __tablename__ = "user_roles"
    __table_args__ = (UniqueConstraint("user_id", "role_id"),)

    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("users.id", ondelete="CASCADE"),
        nullable=False,
    )
    role_id: Mapped[PermString] = mapped_column(
        VARCHAR(length=255),
        nullable=True,
    )

    user: Mapped["User"] = relationship(
        "User", viewonly=True, back_populates="role_list"
    )

    def __repr__(self):
        return f"<UserRole {self.user_id} - {self.role_id}>"
def munged_id(sqla_context):
    """Compose a role id from the row being inserted.

    Reads ``name`` and ``org_id`` from ``sqla_context.current_parameters``
    and appends five random characters drawn from A-Z and 0-9, giving
    ``<name>/<org_id>#<suffix>``, e.g. ``Self Scorer/Thomas Murray#TK94S``.
    """
    params = sqla_context.current_parameters
    role_name = params["name"]
    owner_org = params["org_id"]
    alphabet = ascii_uppercase + digits
    suffix = "".join(choice(alphabet) for _ in range(5))
    return f"{role_name}/{owner_org}#{suffix}"
class CustomRole(Base):
    """A role defined by an organisation, carrying a set of permissions.

    Role names are unique within an organisation. When no id is supplied,
    one is generated by ``munged_id`` at insert time.
    """

    __tablename__ = "roles"

    __table_args__ = (UniqueConstraint("name", "org_id"),)

    id: Mapped[str] = mapped_column(  # type: ignore
        VARCHAR(length=255), primary_key=True, nullable=False, default=munged_id
    )
    name: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=False)
    description: Mapped[str] = mapped_column(TEXT(), nullable=False)
    type: Mapped[str] = mapped_column(
        CHAR(length=1), server_default=text("'U'"), nullable=False
    )
    org_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
        nullable=False,
    )

    organisation: Mapped["Organisation"] = relationship(
        Organisation, back_populates="custom_roles"
    )
    permissions: Mapped[list["CustomRolePermission"]] = relationship(
        "CustomRolePermission",
        back_populates="role",
        cascade="all, delete",
        passive_deletes=True,
    )

    def __repr__(self):
        return f"<CustomRole {self.id} - {self.name}>"
class CustomRolePermission(Base):
    """A single permission granted by a custom role.

    Keyed by the composite (role_id, permission_id). Only permissions listed
    in ``perms.ALL_PERMISSIONS`` can be assigned.
    """

    __tablename__ = "role_permissions"

    # Presumably blanks out a surrogate ``id`` attribute supplied by Base,
    # since this table is keyed by the two columns below - confirm in meta.
    id = None  # type: ignore

    role_id: Mapped[str] = mapped_column(
        VARCHAR(length=255),
        ForeignKey("roles.id", ondelete="CASCADE"),
        primary_key=True,
        nullable=False,
    )

    permission_id: Mapped[PermString] = mapped_column(
        VARCHAR(length=255), primary_key=True, nullable=False
    )
    role: Mapped["CustomRole"] = relationship(CustomRole, back_populates="permissions")

    def __repr__(self):
        return f"{self.permission_id} (Role: {self.role_id})"

    @validates("permission_id")
    def known_permission(self, key, perm_id: PermString) -> PermString:
        """Reject any permission id that is not in ``perms.ALL_PERMISSIONS``.

        Raises:
            ValueError: if ``perm_id`` is not a recognised permission.
        """
        if perm_id in perms.ALL_PERMISSIONS:
            return perm_id
        raise ValueError('"%s" is not a recognised permission' % perm_id)
class OrganisationCategory(Base):
    """A uniquely titled category that organisations can be assigned to."""

    __tablename__ = "org_categories"

    title: Mapped[str] = mapped_column(VARCHAR(length=255), unique=True, nullable=False)
    description: Mapped[Optional[str]] = mapped_column(TEXT(), nullable=True)

    def __repr__(self):
        return f"<Organisation Category: {self.title}>"
class FailedLoginAttempt(Base):
    """Record of one failed login: which user, when, and from which address."""

    __tablename__ = "failed_login_attempts"

    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50), ForeignKey("users.id"), nullable=False
    )
    # Indexed, so attempts can be looked up by time.
    timestamp: Mapped[datetime] = mapped_column(DateTime, nullable=False, index=True)
    ip_address: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)

    def __repr__(self):
        return f"<FailedLoginAttempt {self.user_id} at {self.timestamp}>"
# Names exported by ``from ... import *``.
__all__ = [
    # organisations
    "Organisation",
    "ConsultantOrganisation",
    "OrganisationType",
    "OrganisationCategory",
    # users and roles
    "User",
    "UserRole",
    "CustomRole",
    "CustomRolePermission",
    # login auditing
    "FailedLoginAttempt",
]