Coverage for postrfp / model / humans.py: 100%

217 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1from enum import Enum 

2from random import choice 

3from string import ascii_uppercase, digits 

4from typing import Optional, TYPE_CHECKING 

5from datetime import datetime 

6 

7from sqlalchemy import ( 

8 Column, 

9 DateTime, 

10 Table, 

11 types, 

12 Integer, 

13 ForeignKey, 

14 UniqueConstraint, 

15 DATETIME, 

16 text, 

17 func, 

18) 

19 

20from sqlalchemy.orm import Mapped, mapped_column, relationship, validates, DynamicMapped 

21from sqlalchemy.orm.exc import NoResultFound 

22from sqlalchemy.ext.associationproxy import association_proxy, AssociationProxy 

23from sqlalchemy.types import VARCHAR, BOOLEAN, INTEGER, TEXT, CHAR 

24from sqlalchemy.types import Enum as SqlaEnum # Import Enum 

25 

26 

27from postrfp.authorisation import perms 

28from postrfp.shared.exceptions import LacksPermission 

29from postrfp.shared.exceptions import AuthorizationFailure 

30from postrfp.shared.types import PermString 

31from postrfp.authorisation.roles import ROLES 

32from postrfp.model.meta import Base 

33from postrfp.model.audit import AuditEvent 

34 

35if TYPE_CHECKING: 

36 from sqlalchemy.orm import Query 

37 from postrfp.model.notify import ( 

38 WebhookSubscription, 

39 IssueWatchList, 

40 ProjectWatchList, 

41 ) 

42 from postrfp.model.acl import SectionPermission, ProjectPermission 

43 from postrfp.model.project import Project 

44 from postrfp.model.graph import RelationshipType, Edge 

45 from postrfp.model.misc import Category 

46 from postrfp.model.tags import Tag 

47 

48 

# Association table joining organisations to organisation categories.
# It is the `secondary` table of Organisation.organisation_categories.
org_cat_rel = Table(
    "org_cat_rel",
    Base.metadata,
    Column("id", Integer, primary_key=True),
    Column(
        "org_id",
        VARCHAR(length=50),
        # Unnamed constraint: rows follow the organisation when its key is
        # updated and are removed when it is deleted.
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
        nullable=False,
    ),
    Column(
        "cat_id",
        INTEGER(),
        # Unnamed constraint, no cascade rules declared.
        ForeignKey("org_categories.id"),
        nullable=False,
    ),
    # An organisation may be linked to a given category only once.
    UniqueConstraint("org_id", "cat_id"),
)

72 

# Self-referential association table: each row pairs an organisation
# (org_id) with one of its suppliers (supplier_id). Both columns point at
# organisations.id and cascade on update and on delete.
organisation_suppliers = Table(
    "organisation_suppliers",
    Base.metadata,
    Column("id", Integer, primary_key=True),
    Column(
        "org_id",
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
    ),
    Column(
        "supplier_id",
        VARCHAR(length=50),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
    ),
)

88 

89 

class OrganisationType(Enum):
    """Closed set of organisation kinds, each with a fixed integer value."""

    # Values are part of the enum's contract; do not renumber.
    RESPONDENT = 0
    BUYER = 1
    CONSULTANT = 2

94 

95 

class ConsultantClientRelationship(Base):
    """
    Association object pairing a consultant organisation with a client
    organisation. The pair (consultant_id, client_id) is the primary key.
    """

    __tablename__ = "consultant_orgs"

    # The composite key below is the primary key, so the inherited `id`
    # attribute is blanked out.
    id = None  # type: ignore

    consultant_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
        primary_key=True,
    )
    client_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"),
        primary_key=True,
    )

    # Both relationships target Organisation, so each names its own foreign
    # key explicitly. The backrefs create Organisation.consultant_org_clients
    # and Organisation.consultant_org_consultants respectively.
    consultant: Mapped["Organisation"] = relationship(
        "Organisation",
        backref="consultant_org_clients",
        foreign_keys="ConsultantClientRelationship.consultant_id",
    )
    client: Mapped["Organisation"] = relationship(
        "Organisation",
        backref="consultant_org_consultants",
        foreign_keys="ConsultantClientRelationship.client_id",
    )

    def __init__(self, client=None, consultant=None):
        """
        Build a link row from one or both of its ends.

        Invoked by the association proxy creators when an organisation is
        appended to the 'clients' or 'consultants' attribute of an
        Organisation subclass; each creator supplies only one side.
        """
        self.consultant = consultant
        self.client = client

    def __repr__(self):
        return f"<Consultant: {self.consultant_id} , Client: {self.client_id} >"

130 

131 

class Organisation(Base):
    """
    An organisation record, stored in the "organisations" table.

    This is the base of a single-table polymorphic hierarchy discriminated
    by the `type` column; the subclasses register one OrganisationType each.
    """

    __tablename__ = "organisations"

    # The base class itself carries no polymorphic identity.
    __mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": None}

    # Attribute names designated as public. How this list is consumed is not
    # visible in this module.
    public_attrs = ["id", "name", "public", "is_consultant"]

    id: Mapped[str] = mapped_column(VARCHAR(length=50), primary_key=True)  # type: ignore

    name: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)
    address: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
    public: Mapped[bool] = mapped_column(BOOLEAN(), default=False)
    type: Mapped[OrganisationType] = mapped_column(
        # Persist the enum member names rather than their integer values.
        SqlaEnum(
            OrganisationType,
            name="organisation_type_enum",
            values_callable=lambda obj: [e.name for e in obj],
        ),
        default=OrganisationType.RESPONDENT,
        nullable=False,
    )
    password_expiry: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
    domain_name: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=256), nullable=True
    )

    users: Mapped[list["User"]] = relationship(
        "User",
        back_populates="organisation",
        passive_deletes=True,
        cascade="all,delete",
    )

    organisation_categories: Mapped[list["OrganisationCategory"]] = relationship(
        "OrganisationCategory", secondary=org_cat_rel
    )

    custom_roles: Mapped[list["CustomRole"]] = relationship(
        "CustomRole", back_populates="organisation"
    )

    # The "Private Address Book": organisations this one lists as suppliers.
    # The backref exposes the reverse direction as `buyers`.
    suppliers: DynamicMapped["Organisation"] = relationship(
        "Organisation",
        secondary="organisation_suppliers",
        primaryjoin="Organisation.id==organisation_suppliers.c.org_id",
        secondaryjoin="Organisation.id==organisation_suppliers.c.supplier_id",
        lazy="dynamic",
        backref="buyers",
    )

    # Read-only view of audit events linked to this organisation through
    # the audit_event_orgs table.
    visible_events: DynamicMapped["AuditEvent"] = relationship(
        "AuditEvent",
        secondary="audit_event_orgs",
        primaryjoin="Organisation.id==EventOrgACL.org_id",
        secondaryjoin="EventOrgACL.event_id==AuditEvent.id",
        viewonly=True,
        lazy="dynamic",
    )

    webhook_subscriptions: DynamicMapped["WebhookSubscription"] = relationship(
        "WebhookSubscription",
        back_populates="organisation",
        lazy="dynamic",
        passive_deletes=True,
        cascade="all, delete",
    )

    projects: DynamicMapped["Project"] = relationship(
        "Project",
        back_populates="owner_org",
        lazy="dynamic",
        passive_deletes=True,
        cascade="all, delete",
    )
    relationship_types: DynamicMapped["RelationshipType"] = relationship(
        "RelationshipType",
        back_populates="organisation",
        lazy="dynamic",
        passive_deletes=True,
        cascade="all,delete",
    )

    # Edges that start at this organisation (Edge.from_org_id) ...
    lower_edges: Mapped[list["Edge"]] = relationship(
        "Edge",
        back_populates="from_org",
        primaryjoin="Organisation.id==Edge.from_org_id",
        cascade="all, delete",
    )
    # ... and edges that end at it (Edge.to_org_id).
    higher_edges: Mapped[list["Edge"]] = relationship(
        "Edge",
        back_populates="to_org",
        primaryjoin="Organisation.id==Edge.to_org_id",
        cascade="all, delete",
    )

    categories: DynamicMapped["Category"] = relationship(
        "Category", back_populates="organisation"
    )

    tags: DynamicMapped["Tag"] = relationship(
        "Tag", lazy="dynamic", back_populates="organisation"
    )

    # AuditEvent.org_id is marked foreign() in the join expression itself.
    events: DynamicMapped[AuditEvent] = relationship(
        "AuditEvent",
        primaryjoin=("foreign(AuditEvent.org_id)==Organisation.id"),
        back_populates="organisation",
        cascade_backrefs=False,
        lazy="dynamic",
    )

    def __init__(self, id_name) -> None:
        """Create an organisation whose id and name are both `id_name`."""
        self.id = id_name
        self.name = id_name

    def __repr__(self) -> str:
        return f"Organisation: {self.name}"

    @property
    def is_consultant(self) -> bool:
        """True when this organisation's type is CONSULTANT."""
        return self.type == OrganisationType.CONSULTANT

    @property
    def is_buyside(self) -> bool:
        """True when this organisation's type is BUYER or CONSULTANT."""
        buyside_types = (OrganisationType.BUYER, OrganisationType.CONSULTANT)
        return self.type in buyside_types

    def has_supplier(self, supplier_org) -> bool:
        """
        Report whether `supplier_org` is among this organisation's suppliers.

        The check is a query on the dynamic `suppliers` relationship,
        matching on the id of `supplier_org`.
        """
        matching = self.suppliers.filter(Organisation.id == supplier_org.id)
        try:
            matching.one()
        except NoResultFound:
            return False
        return True

264 

265 

class RespondentOrganisation(Organisation):
    """Organisation rows whose `type` is OrganisationType.RESPONDENT."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.RESPONDENT}

268 

269 

class BuyerOrganisation(Organisation):
    """Organisation rows whose `type` is OrganisationType.BUYER."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.BUYER}

    # Organisations acting as consultants for this buyer. The proxy reads the
    # `consultant` end of each ConsultantClientRelationship reachable through
    # the `consultant_org_consultants` backref, so its elements are
    # organisations (that relationship is Mapped["Organisation"]), not the
    # link rows themselves.
    # Appending an organisation wraps it in a new link row via `creator`.
    consultants: AssociationProxy[list["Organisation"]] = association_proxy(
        "consultant_org_consultants",
        "consultant",
        creator=lambda org: ConsultantClientRelationship(consultant=org),
    )

279 

280 

class ConsultantOrganisation(Organisation):
    """Organisation rows whose `type` is OrganisationType.CONSULTANT."""

    __mapper_args__ = {"polymorphic_identity": OrganisationType.CONSULTANT}

    # Client organisations of this consultant, read through the
    # `consultant_org_clients` backref. Appending an organisation calls
    # `creator`, which builds a ConsultantClientRelationship with only the
    # client side supplied. BuyerOrganisation.consultants has the mirror-image
    # creator, so both proxies funnel into
    # ConsultantClientRelationship.__init__.
    clients: AssociationProxy[list["BuyerOrganisation"]] = association_proxy(
        "consultant_org_clients",
        "client",
        creator=lambda org: ConsultantClientRelationship(client=org),
    )

290 

291 

def custom_perms(session, user_id) -> "Query":
    """
    Build a query of the permission ids a user holds through custom roles.

    The query selects CustomRolePermission.permission_id, joined to
    CustomRole and then to UserRole on the role id, restricted to rows
    whose UserRole.user_id equals `user_id`. The query is returned
    unexecuted; each result row is a one-element tuple.
    """
    permission_ids = session.query(CustomRolePermission.permission_id)
    via_roles = permission_ids.join(CustomRole)
    via_assignments = via_roles.join(UserRole, CustomRole.id == UserRole.role_id)
    return via_assignments.filter(UserRole.user_id == user_id)

299 

300 

class RefreshToken(Base):
    """A refresh token row belonging to one user ("refresh_tokens" table)."""

    __tablename__ = "refresh_tokens"

    id: Mapped[int] = mapped_column(Integer, autoincrement=True, primary_key=True)
    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        # Token rows are deleted by the database along with their user.
        ForeignKey("users.id", ondelete="CASCADE"),
        index=True,
        nullable=False,
    )
    # Token values are unique across the table.
    token: Mapped[str] = mapped_column(VARCHAR(length=255), unique=True, nullable=False)
    issued_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False)
    revoked: Mapped[bool] = mapped_column(types.Boolean, nullable=False, default=False)

    user: Mapped["User"] = relationship("User", back_populates="refresh_tokens")

    def __repr__(self):
        return f"<RefreshToken user_id={self.user_id} expires_at={self.expires_at} revoked={self.revoked}>"

320 

321 

class User(Base):
    """
    A user account ("users" table) together with its role and permission
    helpers.

    Permissions come from two sources: built-in roles, looked up in the
    in-memory ROLES mapping, and custom roles stored in the database.
    """

    __tablename__ = "users"

    id: Mapped[str] = mapped_column(VARCHAR(length=50), primary_key=True)  # type: ignore
    # Nullable column, hence Optional in the annotation.
    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
        index=True,
        nullable=True,
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=func.utc_timestamp(),
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        server_default=func.utc_timestamp(),
        server_onupdate=func.utc_timestamp(),
    )
    fullname: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)
    email: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
    password: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True)
    locale: Mapped[Optional[str]] = mapped_column(VARCHAR(length=10), nullable=True)
    previous_login_date: Mapped[Optional[datetime]] = mapped_column(
        DateTime, nullable=True
    )
    # Nullable column, hence Optional in the annotation.
    password_set_date: Mapped[Optional[datetime]] = mapped_column(
        DATETIME, nullable=True
    )
    is_active: Mapped[bool] = mapped_column(types.Boolean, default=True, nullable=False)
    failed_login_attempts: Mapped[int] = mapped_column(
        Integer, default=0, nullable=False
    )
    total_failed_logins: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
    locked_until: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)

    # "standard" or "restricted"; see the is_restricted property.
    type: Mapped[str] = mapped_column(
        VARCHAR(length=10),
        nullable=False,
        default="standard",
        server_default=text("'standard'"),
    )

    organisation: Mapped["Organisation"] = relationship(
        "Organisation", uselist=False, back_populates="users"
    )

    # NOTE(review): this join matches UserRole.role_id against
    # CustomRole.name, whereas custom_perms() and add_role() match role ids
    # against CustomRole.id. Confirm which is intended before relying on
    # this relationship.
    custom_roles: Mapped[list["CustomRole"]] = relationship(
        "CustomRole",
        secondary="user_roles",
        secondaryjoin="UserRole.role_id==CustomRole.name",
        viewonly=True,
    )

    role_list: Mapped[list["UserRole"]] = relationship(
        "UserRole", back_populates="user", cascade="all, delete", passive_deletes=True
    )

    # List-like view of the role ids in role_list; appending a role id
    # creates the corresponding UserRole row.
    roles: AssociationProxy[list[str]] = association_proxy(
        "role_list", "role_id", creator=lambda r: UserRole(role_id=r)
    )

    # Read-only, query-style access to the same UserRole rows.
    roles_q: DynamicMapped["UserRole"] = relationship(
        "UserRole", lazy="dynamic", viewonly=True
    )

    refresh_tokens: Mapped[list["RefreshToken"]] = relationship(
        "RefreshToken",
        back_populates="user",
        cascade="all, delete-orphan",
        passive_deletes=True,
    )

    section_permissions: DynamicMapped["SectionPermission"] = relationship(
        "SectionPermission",
        back_populates="user",
        cascade="all,delete",
        passive_deletes=True,
        lazy="dynamic",
    )

    project_permissions: Mapped[list["ProjectPermission"]] = relationship(
        "ProjectPermission",
        back_populates="user",
        cascade="all, delete",
        passive_deletes=True,
    )

    project_watches: Mapped[list["ProjectWatchList"]] = relationship(
        "ProjectWatchList",
        back_populates="user",
        cascade="all,delete",
        passive_deletes=True,
    )

    issue_watches: Mapped[list["IssueWatchList"]] = relationship(
        "IssueWatchList",
        back_populates="user",
        cascade="all,delete",
        passive_deletes=True,
    )

    events: DynamicMapped[AuditEvent] = relationship(
        "AuditEvent",
        lazy="dynamic",
        back_populates="user",
        cascade_backrefs=False,
        primaryjoin="foreign(AuditEvent.user_id)==User.id",
    )

    def __init__(self, user_id, *args, **kwargs):
        """
        Create a user whose id and fullname both start out as `user_id`.

        Remaining arguments are passed to the base class constructor.
        """
        super().__init__(*args, **kwargs)
        self.id = user_id
        self.fullname = user_id

    def __repr__(self):
        return f"<User: {self.id} ({self.fullname}) of {self.org_id}>"

    def custom_permissions(self):
        """
        Return the set of permission ids granted through custom roles.

        Runs the custom_perms() query on every call, using the
        `_instance_session` attribute (not defined in this class;
        presumably provided by Base).
        """
        cq = custom_perms(self._instance_session, self.id)
        return {cp[0] for cp in cq}

    @property
    def is_restricted(self):
        """True when the user's type is "restricted"."""
        return self.type == "restricted"

    @is_restricted.setter
    def is_restricted(self, restricted_bool):
        self.type = "restricted" if restricted_bool else "standard"

    def has_permission(self, permission: PermString):
        """
        Report whether the user holds `permission`.

        May be called multiple times in the life of one http request, so
        the cheap checks come first: the Administrator role, then the
        cached built-in permissions. The database-backed custom roles are
        queried only if neither of those grants the permission.
        """
        if "Administrator" in self.roles:
            return True
        if permission in self.builtin_permissions:
            return True
        return permission in self.custom_permissions()

    def check_permission(self, permission: PermString):
        """
        @raise LacksPermission if the user doesn't have the given permission
        """
        if not self.has_permission(permission):
            raise LacksPermission(permission, self.id)

    def check_is_standard(self):
        """
        @raise AuthorizationFailure if the user is a restricted user
        """
        if self.is_restricted:
            raise AuthorizationFailure("Action not permitted for Domain Expert users")

    @property
    def builtin_permissions(self):
        """
        Set of permissions granted by the user's built-in roles.

        The set is computed once and cached on the instance as
        `_builtin_perms`; add_role() discards the cache.
        """
        if hasattr(self, "_builtin_perms"):
            return self._builtin_perms

        # cache not populated, so build it
        self._builtin_perms = pm = set()
        # Only role ids that are keys of ROLES contribute here; custom role
        # ids are ignored.
        for role_id in self.roles & ROLES.keys():
            pm.update(ROLES[role_id])
        return pm

    @property
    def all_permissions(self) -> set[PermString]:
        """Union of the custom and built-in permission sets."""
        return self.custom_permissions() | self.builtin_permissions

    @property
    def sorted_permissions(self) -> list[PermString]:
        """All permissions as a sorted list."""
        return sorted(self.all_permissions)

    def add_role(self, role_id: str) -> None:
        """
        Append `role_id` to the user's roles.

        @raise KeyError if `role_id` is neither a key of ROLES nor the id of
        one of the organisation's custom roles
        """
        if role_id not in ROLES:
            if role_id not in {cr.id for cr in self.organisation.custom_roles}:
                raise KeyError('Role "%s" not found' % role_id)
        self.roles.append(role_id)
        # The memoised built-in permission set may now be out of date; drop
        # it so the next access rebuilds it. Roles appended directly to
        # `roles` bypass this invalidation.
        self.__dict__.pop("_builtin_perms", None)

    def is_in_role(self, role_id: str):
        return role_id in self.roles

    def is_administrator(self):
        return self.is_in_role("Administrator")

    def can_view_section_id(self, section_id: int) -> bool:
        """
        Report whether the user may view the section with id `section_id`.

        Unrestricted users may view any section. A restricted user needs
        exactly one matching row in section_permissions.
        """
        if not self.is_restricted:
            return True
        # Bind the value as a parameter instead of formatting it into the
        # SQL text, so a non-integer argument cannot alter the statement.
        clause = text("section_id = :section_id").bindparams(section_id=section_id)
        return self.section_permissions.filter(clause).count() == 1

514 

515 

class UserRole(Base):
    """Assignment of one role id to one user ("user_roles" table)."""

    __tablename__ = "user_roles"

    # A user can hold a given role only once (constraint is unnamed).
    __table_args__ = (UniqueConstraint("user_id", "role_id"),)

    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        # Unnamed constraint; assignments are deleted with their user.
        ForeignKey("users.id", ondelete="CASCADE"),
        nullable=False,
    )
    # No foreign key is declared on this column.
    role_id: Mapped[PermString] = mapped_column(
        VARCHAR(length=255),
        nullable=True,
    )

    # Read-only counterpart of User.role_list.
    user: Mapped["User"] = relationship(
        "User", viewonly=True, back_populates="role_list"
    )

    def __repr__(self):
        return f"<UserRole {self.user_id} - {self.role_id}>"

542 

543 

def munged_id(sqla_context):
    """
    Build an id string from the row's name, its org_id and a random suffix.

    The "name" and "org_id" values are read from
    `sqla_context.current_parameters`. The suffix is five characters drawn
    from uppercase ASCII letters and digits,
    e.g. Self Scorer/Thomas Murray#TK94S

    @raise KeyError if "name" or "org_id" is missing from the parameters
    """
    params = sqla_context.current_parameters
    name = params["name"]
    org_id = params["org_id"]
    alphabet = ascii_uppercase + digits
    # The suffix only makes ids harder to collide; it is not a secret, so
    # the non-cryptographic `random.choice` is sufficient here.
    suffix = "".join(choice(alphabet) for _ in range(5))
    return f"{name}/{org_id}#{suffix}"

554 

555 

class CustomRole(Base):
    """A role defined by an organisation, stored in the "roles" table."""

    __tablename__ = "roles"

    # Role names are unique within an organisation (constraint is unnamed).
    __table_args__ = (UniqueConstraint("name", "org_id"),)

    # When no id is supplied, munged_id builds one from the row's name and
    # org_id plus a random suffix.
    id: Mapped[str] = mapped_column(  # type: ignore
        VARCHAR(length=255), primary_key=True, nullable=False, default=munged_id
    )
    name: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=False)
    description: Mapped[str] = mapped_column(TEXT(), nullable=False)
    # Single-character code; the database supplies 'U' when none is given.
    type: Mapped[str] = mapped_column(
        CHAR(length=1), server_default=text("'U'"), nullable=False
    )
    org_id: Mapped[str] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
        nullable=False,
    )

    organisation: Mapped["Organisation"] = relationship(
        Organisation, back_populates="custom_roles"
    )
    permissions: Mapped[list["CustomRolePermission"]] = relationship(
        "CustomRolePermission",
        back_populates="role",
        passive_deletes=True,
        cascade="all, delete",
    )

    def __repr__(self):
        return f"<CustomRole {self.id} - {self.name}>"

589 

590 

class CustomRolePermission(Base):
    """
    One permission granted by one custom role ("role_permissions" table).

    The pair (role_id, permission_id) is the primary key.
    """

    __tablename__ = "role_permissions"

    # The composite key below is the primary key, so the inherited `id`
    # attribute is blanked out.
    id = None  # type: ignore

    role_id: Mapped[str] = mapped_column(
        VARCHAR(length=255),
        # Unnamed constraint; permission rows are deleted with their role.
        ForeignKey("roles.id", ondelete="CASCADE"),
        primary_key=True,
        nullable=False,
    )

    permission_id: Mapped[PermString] = mapped_column(
        VARCHAR(length=255), primary_key=True, nullable=False
    )
    role: Mapped["CustomRole"] = relationship(CustomRole, back_populates="permissions")

    def __repr__(self):
        return f"{self.permission_id} (Role: {self.role_id})"

    @validates("permission_id")
    def known_permission(self, key, perm_id: PermString) -> PermString:
        """
        Validator for permission_id: accept only ids in perms.ALL_PERMISSIONS.

        @raise ValueError if `perm_id` is not a recognised permission
        """
        if perm_id in perms.ALL_PERMISSIONS:
            return perm_id
        raise ValueError('"%s" is not a recognised permission' % perm_id)

618 

619 

class OrganisationCategory(Base):
    """A category organisations can be assigned to ("org_categories" table)."""

    __tablename__ = "org_categories"

    # Titles are unique across all categories.
    title: Mapped[str] = mapped_column(VARCHAR(length=255), unique=True, nullable=False)
    description: Mapped[Optional[str]] = mapped_column(TEXT(), nullable=True)

    def __repr__(self):
        return f"<Organisation Category: {self.title}>"

628 

629 

class FailedLoginAttempt(Base):
    """Record of one failed login by a user ("failed_login_attempts" table)."""

    __tablename__ = "failed_login_attempts"

    # NOTE(review): unlike the other foreign keys to users.id in this
    # module, this one declares no ondelete rule - confirm that is intended.
    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50), ForeignKey("users.id"), nullable=False
    )
    timestamp: Mapped[datetime] = mapped_column(DateTime, nullable=False, index=True)
    ip_address: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False)

    def __repr__(self):
        return f"<FailedLoginAttempt {self.user_id} at {self.timestamp}>"

640 

641 

# Public API of this module. The original entries are kept in their
# original order; the model classes that were missing from the list
# (siblings of the already-exported ConsultantOrganisation, plus the token
# and consultant/client link models) are appended.
__all__ = [
    "CustomRolePermission",
    "CustomRole",
    "UserRole",
    "User",
    "Organisation",
    "ConsultantOrganisation",
    "OrganisationType",
    "OrganisationCategory",
    "FailedLoginAttempt",
    "BuyerOrganisation",
    "RespondentOrganisation",
    "ConsultantClientRelationship",
    "RefreshToken",
]