Coverage for postrfp/model/humans.py: 100%

218 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1from enum import Enum 

2from random import choice 

3from string import ascii_uppercase, digits 

4from typing import Optional, TYPE_CHECKING 

5from datetime import datetime 

6 

7from sqlalchemy import ( 

8 Column, 

9 DateTime, 

10 Table, 

11 types, 

12 Integer, 

13 ForeignKey, 

14 UniqueConstraint, 

15 DATETIME, 

16 text, 

17 func, 

18) 

19 

20from sqlalchemy.orm import Mapped, mapped_column, relationship, validates, DynamicMapped 

21from sqlalchemy.orm.exc import NoResultFound 

22from sqlalchemy.ext.associationproxy import association_proxy, AssociationProxy 

23from sqlalchemy.types import VARCHAR, BOOLEAN, INTEGER, TEXT, CHAR 

24from sqlalchemy.types import Enum as SqlaEnum # Import Enum 

25 

26 

27from postrfp.authorisation import perms 

28from postrfp.shared.exceptions import LacksPermission 

29from postrfp.shared.exceptions import AuthorizationFailure 

30from postrfp.shared.types import PermString 

31from postrfp.authorisation.roles import ROLES 

32from postrfp.model.meta import Base 

33from postrfp.model.audit import AuditEvent 

34 

35if TYPE_CHECKING: 

36 from sqlalchemy.orm import Query 

37 from postrfp.model.notify import ( 

38 WebhookSubscription, 

39 IssueWatchList, 

40 ProjectWatchList, 

41 ) 

42 from postrfp.model.acl import SectionPermission, ProjectPermission 

43 from postrfp.model.project import Project 

44 from postrfp.model.graph import RelationshipType, Edge 

45 from postrfp.model.misc import Category 

46 from postrfp.model.tags import Tag 

47 

48 

49org_cat_rel = Table( 

50 "org_cat_rel", 

51 Base.metadata, 

52 Column("id", Integer, primary_key=True), 

53 Column( 

54 "org_id", 

55 VARCHAR(length=50), 

56 ForeignKey( 

57 "organisations.id", 

58 # name="constr_org", # Removed name 

59 ondelete="CASCADE", 

60 onupdate="CASCADE", 

61 ), 

62 nullable=False, 

63 ), 

64 Column( 

65 "cat_id", 

66 INTEGER(), 

67 ForeignKey("org_categories.id"), # name="constr_cat", # Removed name 

68 nullable=False, 

69 ), 

70 UniqueConstraint("org_id", "cat_id"), # name="title", # Removed name 

71) 

72 

73organisation_suppliers = Table( 

74 "organisation_suppliers", 

75 Base.metadata, 

76 Column("id", Integer, primary_key=True), 

77 Column( 

78 "org_id", 

79 VARCHAR(length=50), 

80 ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"), 

81 ), 

82 Column( 

83 "supplier_id", 

84 VARCHAR(length=50), 

85 ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"), 

86 ), 

87) 

88 

89 

90class OrganisationType(Enum): 

91 RESPONDENT = 0 

92 BUYER = 1 

93 CONSULTANT = 2 

94 

95 

96BUYSIDE_ORGS = (OrganisationType.BUYER, OrganisationType.RESPONDENT) 

97 

98 

99class ConsultantClientRelationship(Base): 

100 __tablename__ = "consultant_orgs" 

101 

102 id = None # type: ignore 

103 consultant_id: Mapped[str] = mapped_column( 

104 VARCHAR(length=50), 

105 ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"), 

106 primary_key=True, 

107 ) 

108 client_id: Mapped[str] = mapped_column( 

109 VARCHAR(length=50), 

110 ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"), 

111 primary_key=True, 

112 ) 

113 consultant: Mapped["Organisation"] = relationship( 

114 "Organisation", 

115 foreign_keys="ConsultantClientRelationship.consultant_id", 

116 backref="consultant_org_clients", 

117 ) 

118 client: Mapped["Organisation"] = relationship( 

119 "Organisation", 

120 foreign_keys="ConsultantClientRelationship.client_id", 

121 backref="consultant_org_consultants", 

122 ) 

123 

124 def __repr__(self): 

125 return f"<Consultant: {self.consultant_id} , Client: {self.client_id} >" 

126 

127 def __init__(self, client=None, consultant=None): 

128 """Constructor is called when values are appended to the 'clients' or 'consultants' 

129 attributes of Organisation 

130 """ 

131 self.consultant = consultant 

132 self.client = client 

133 

134 

135class Organisation(Base): 

136 __tablename__ = "organisations" 

137 

138 __mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": None} 

139 

140 public_attrs = "id,name,public,is_consultant".split(",") 

141 

142 id: Mapped[str] = mapped_column(VARCHAR(length=50), primary_key=True) # type: ignore 

143 

144 name: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False) 

145 address: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True) 

146 public: Mapped[bool] = mapped_column(BOOLEAN(), default=False) 

147 type: Mapped[OrganisationType] = mapped_column( 

148 SqlaEnum( 

149 OrganisationType, 

150 name="organisation_type_enum", 

151 values_callable=lambda obj: [e.name for e in obj], 

152 ), # Use SQLAlchemy Enum 

153 nullable=False, 

154 default=OrganisationType.RESPONDENT, 

155 ) 

156 password_expiry: Mapped[int] = mapped_column(Integer, nullable=False, default=0) 

157 domain_name: Mapped[Optional[str]] = mapped_column( 

158 VARCHAR(length=256), nullable=True 

159 ) 

160 

161 users: Mapped[list["User"]] = relationship( 

162 "User", 

163 back_populates="organisation", 

164 cascade="all,delete", 

165 passive_deletes=True, 

166 ) 

167 

168 organisation_categories: Mapped[list["OrganisationCategory"]] = relationship( 

169 "OrganisationCategory", secondary=org_cat_rel 

170 ) 

171 

172 custom_roles: Mapped[list["CustomRole"]] = relationship( 

173 "CustomRole", back_populates="organisation" 

174 ) 

175 

176 # These form the "Private Address Book" 

177 suppliers: DynamicMapped["Organisation"] = relationship( 

178 "Organisation", 

179 secondary="organisation_suppliers", 

180 primaryjoin="Organisation.id==organisation_suppliers.c.org_id", 

181 secondaryjoin="Organisation.id==organisation_suppliers.c.supplier_id", 

182 backref="buyers", 

183 lazy="dynamic", 

184 ) 

185 

186 visible_events: DynamicMapped["AuditEvent"] = relationship( 

187 "AuditEvent", 

188 primaryjoin="Organisation.id==EventOrgACL.org_id", 

189 secondaryjoin="EventOrgACL.event_id==AuditEvent.id", 

190 secondary="audit_event_orgs", 

191 lazy="dynamic", 

192 viewonly=True, 

193 ) 

194 

195 webhook_subscriptions: DynamicMapped["WebhookSubscription"] = relationship( 

196 "WebhookSubscription", 

197 back_populates="organisation", 

198 cascade="all, delete", 

199 passive_deletes=True, 

200 lazy="dynamic", 

201 ) 

202 

203 projects: DynamicMapped["Project"] = relationship( 

204 "Project", 

205 back_populates="owner_org", 

206 cascade="all, delete", 

207 passive_deletes=True, 

208 lazy="dynamic", 

209 ) 

210 relationship_types: DynamicMapped["RelationshipType"] = relationship( 

211 "RelationshipType", 

212 back_populates="organisation", 

213 cascade="all,delete", 

214 passive_deletes=True, 

215 lazy="dynamic", 

216 ) 

217 lower_edges: Mapped[list["Edge"]] = relationship( 

218 "Edge", 

219 primaryjoin="Organisation.id==Edge.from_org_id", 

220 back_populates="from_org", 

221 cascade="all, delete", 

222 ) 

223 higher_edges: Mapped[list["Edge"]] = relationship( 

224 "Edge", 

225 primaryjoin="Organisation.id==Edge.to_org_id", 

226 back_populates="to_org", 

227 cascade="all, delete", 

228 ) 

229 

230 categories: DynamicMapped["Category"] = relationship( 

231 "Category", back_populates="organisation" 

232 ) 

233 

234 tags: DynamicMapped["Tag"] = relationship( 

235 "Tag", back_populates="organisation", lazy="dynamic" 

236 ) 

237 

238 events: DynamicMapped[AuditEvent] = relationship( 

239 "AuditEvent", 

240 lazy="dynamic", 

241 back_populates="organisation", 

242 cascade_backrefs=False, 

243 primaryjoin=("foreign(AuditEvent.org_id)==Organisation.id"), 

244 ) 

245 

246 def __init__(self, id_name) -> None: 

247 self.id = id_name 

248 self.name = id_name 

249 

250 def __repr__(self) -> str: 

251 return f"Organisation: {self.name}" 

252 

253 @property 

254 def is_consultant(self) -> bool: 

255 return self.type == OrganisationType.CONSULTANT 

256 

257 @property 

258 def is_buyside(self) -> bool: 

259 return self.type in (OrganisationType.BUYER, OrganisationType.CONSULTANT) 

260 

261 def has_supplier(self, supplier_org) -> bool: 

262 try: 

263 self.suppliers.filter(Organisation.id == supplier_org.id).one() 

264 return True 

265 except NoResultFound: 

266 return False 

267 

268 

269class RespondentOrganisation(Organisation): 

270 __mapper_args__ = {"polymorphic_identity": OrganisationType.RESPONDENT} 

271 

272 

273class BuyerOrganisation(Organisation): 

274 __mapper_args__ = {"polymorphic_identity": OrganisationType.BUYER} 

275 consultants: AssociationProxy[list["ConsultantClientRelationship"]] = ( 

276 association_proxy( 

277 "consultant_org_consultants", 

278 "consultant", 

279 creator=lambda org: ConsultantClientRelationship(consultant=org), 

280 ) 

281 ) 

282 

283 

284class ConsultantOrganisation(Organisation): 

285 __mapper_args__ = {"polymorphic_identity": OrganisationType.CONSULTANT} 

286 # Weird SQLAlchemy magic here - these two constructors seem to aggregate 

287 # to call __init__ on ConsultantClientRelationship 

288 clients: AssociationProxy[list["BuyerOrganisation"]] = association_proxy( 

289 "consultant_org_clients", 

290 "client", 

291 creator=lambda org: ConsultantClientRelationship(client=org), 

292 ) 

293 

294 

295def custom_perms(session, user_id) -> "Query": 

296 return ( 

297 session.query(CustomRolePermission.permission_id) 

298 .join(CustomRole) 

299 .join(UserRole, CustomRole.id == UserRole.role_id) 

300 .filter(UserRole.user_id == user_id) 

301 ) 

302 

303 

304class RefreshToken(Base): 

305 __tablename__ = "refresh_tokens" 

306 

307 id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) 

308 user_id: Mapped[str] = mapped_column( 

309 VARCHAR(length=50), 

310 ForeignKey("users.id", ondelete="CASCADE"), 

311 nullable=False, 

312 index=True, 

313 ) 

314 token: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=False, unique=True) 

315 issued_at: Mapped[datetime] = mapped_column(DateTime, nullable=False) 

316 expires_at: Mapped[datetime] = mapped_column(DateTime, nullable=False) 

317 revoked: Mapped[bool] = mapped_column(types.Boolean, default=False, nullable=False) 

318 

319 user: Mapped["User"] = relationship("User", back_populates="refresh_tokens") 

320 

321 def __repr__(self): 

322 return f"<RefreshToken user_id={self.user_id} expires_at={self.expires_at} revoked={self.revoked}>" 

323 

324 

325class User(Base): 

326 __tablename__ = "users" 

327 

328 id: Mapped[str] = mapped_column(VARCHAR(length=50), primary_key=True) # type: ignore 

329 org_id: Mapped[str] = mapped_column( 

330 VARCHAR(length=50), 

331 ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"), 

332 index=True, 

333 nullable=True, 

334 ) 

335 created_at: Mapped[datetime] = mapped_column( 

336 DateTime, 

337 nullable=False, 

338 server_default=func.utc_timestamp(), 

339 ) 

340 updated_at: Mapped[datetime] = mapped_column( 

341 DateTime, 

342 nullable=False, 

343 server_default=func.utc_timestamp(), 

344 server_onupdate=func.utc_timestamp(), 

345 ) 

346 fullname: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False) 

347 email: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True) 

348 password: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), nullable=True) 

349 locale: Mapped[Optional[str]] = mapped_column(VARCHAR(length=10), nullable=True) 

350 previous_login_date: Mapped[Optional[datetime]] = mapped_column( 

351 DateTime, nullable=True 

352 ) 

353 password_set_date: Mapped[datetime] = mapped_column(DATETIME, nullable=True) 

354 is_active: Mapped[bool] = mapped_column(types.Boolean, default=True, nullable=False) 

355 failed_login_attempts: Mapped[int] = mapped_column( 

356 Integer, default=0, nullable=False 

357 ) 

358 total_failed_logins: Mapped[int] = mapped_column(Integer, default=0, nullable=False) 

359 locked_until: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) 

360 

361 type: Mapped[str] = mapped_column( 

362 VARCHAR(length=10), 

363 nullable=False, 

364 default="standard", 

365 server_default=text("'standard'"), 

366 ) 

367 

368 organisation: Mapped["Organisation"] = relationship( 

369 "Organisation", uselist=False, back_populates="users" 

370 ) 

371 

372 custom_roles: Mapped[list["CustomRole"]] = relationship( 

373 "CustomRole", 

374 secondary="user_roles", 

375 secondaryjoin="UserRole.role_id==CustomRole.name", 

376 viewonly=True, 

377 ) 

378 

379 role_list: Mapped[list["UserRole"]] = relationship( 

380 "UserRole", back_populates="user", cascade="all, delete", passive_deletes=True 

381 ) 

382 

383 roles: AssociationProxy[list[str]] = association_proxy( 

384 "role_list", "role_id", creator=lambda r: UserRole(role_id=r) 

385 ) 

386 

387 roles_q: Mapped[list["UserRole"]] = relationship( 

388 "UserRole", lazy="dynamic", viewonly=True 

389 ) 

390 

391 refresh_tokens: Mapped[list["RefreshToken"]] = relationship( 

392 "RefreshToken", 

393 back_populates="user", 

394 cascade="all, delete-orphan", 

395 passive_deletes=True, 

396 ) 

397 

398 section_permissions: DynamicMapped["SectionPermission"] = relationship( 

399 "SectionPermission", 

400 back_populates="user", 

401 cascade="all,delete", 

402 passive_deletes=True, 

403 lazy="dynamic", 

404 ) 

405 

406 project_permissions: Mapped[list["ProjectPermission"]] = relationship( 

407 "ProjectPermission", 

408 back_populates="user", 

409 cascade="all, delete", 

410 passive_deletes=True, 

411 ) 

412 

413 project_watches: Mapped[list["ProjectWatchList"]] = relationship( 

414 "ProjectWatchList", 

415 back_populates="user", 

416 cascade="all,delete", 

417 passive_deletes=True, 

418 ) 

419 

420 issue_watches: Mapped[list["IssueWatchList"]] = relationship( 

421 "IssueWatchList", 

422 back_populates="user", 

423 cascade="all,delete", 

424 passive_deletes=True, 

425 ) 

426 

427 events: DynamicMapped[AuditEvent] = relationship( 

428 "AuditEvent", 

429 lazy="dynamic", 

430 back_populates="user", 

431 cascade_backrefs=False, 

432 primaryjoin="foreign(AuditEvent.user_id)==User.id", 

433 ) 

434 

435 def __init__(self, user_id, *args, **kwargs): 

436 super(User, self).__init__(*args, **kwargs) 

437 self.id = user_id 

438 self.fullname = user_id 

439 

440 def __repr__(self): 

441 return f"<User: {self.id} ({self.fullname}) of {self.org_id}>" 

442 

443 def custom_permissions(self): 

444 cq = custom_perms(self._instance_session, self.id) 

445 return {cp[0] for cp in cq} 

446 

447 @property 

448 def is_restricted(self): 

449 return self.type == "restricted" 

450 

451 @is_restricted.setter 

452 def is_restricted(self, restricted_bool): 

453 self.type = "restricted" if restricted_bool else "standard" 

454 

455 def has_permission(self, permission: PermString): 

456 """ 

457 May be called multiple times in the life of one http request 

458 so attempting to optimise. Database stored custom roles are 

459 sometimes in use, but all efforts are made to avoid querying the Database 

460 """ 

461 if "Administrator" in self.roles: 

462 return True 

463 if permission in self.builtin_permissions: 

464 return True 

465 return permission in self.custom_permissions() 

466 

467 def check_permission(self, permission: PermString): 

468 """ 

469 @raise LacksPermission if the user doesn't have the given permission 

470 """ 

471 if not self.has_permission(permission): 

472 raise LacksPermission(permission, self.id) 

473 

474 def check_is_standard(self): 

475 if self.is_restricted: 

476 raise AuthorizationFailure("Action not permitted for Domain Expert users") 

477 

478 @property 

479 def builtin_permissions(self): 

480 if hasattr(self, "_builtin_perms"): 

481 return self._builtin_perms 

482 

483 # cache not populated, so build it 

484 self._builtin_perms = pm = set() 

485 # Add permissions for 'built-in roles' 

486 for role_id in self.roles & ROLES.keys(): 

487 pm.update(ROLES[role_id]) 

488 return pm 

489 

490 @property 

491 def all_permissions(self) -> set[PermString]: 

492 # set union syntax 

493 return self.custom_permissions() | self.builtin_permissions 

494 

495 @property 

496 def sorted_permissions(self) -> list[PermString]: 

497 return sorted(self.all_permissions) 

498 

499 def add_role(self, role_id: str) -> None: 

500 if role_id not in ROLES: 

501 if role_id not in {cr.id for cr in self.organisation.custom_roles}: 

502 raise KeyError('Role "%s" not found' % role_id) 

503 self.roles.append(role_id) 

504 

505 def is_in_role(self, role_id: str): 

506 return role_id in self.roles 

507 

508 def is_administrator(self): 

509 return self.is_in_role("Administrator") 

510 

511 def can_view_section_id(self, section_id: int) -> bool: 

512 if self.is_restricted: 

513 clause = text(f"section_id={section_id}") 

514 if self.section_permissions.filter(clause).count() != 1: 

515 return False 

516 return True 

517 

518 

519class UserRole(Base): 

520 __tablename__ = "user_roles" 

521 __table_args__ = ( 

522 UniqueConstraint( 

523 "user_id", "role_id" 

524 ), # name="unique_user_role", # Removed name 

525 ) 

526 

527 user_id: Mapped[str] = mapped_column( 

528 VARCHAR(length=50), 

529 ForeignKey( 

530 "users.id", ondelete="CASCADE" 

531 ), # name="constr_user", # Removed name 

532 nullable=False, 

533 ) 

534 role_id: Mapped[PermString] = mapped_column( 

535 VARCHAR(length=255), 

536 nullable=True, 

537 ) 

538 

539 user: Mapped["User"] = relationship( 

540 "User", back_populates="role_list", viewonly=True 

541 ) 

542 

543 def __repr__(self): 

544 return f"<UserRole {self.user_id} - {self.role_id}>" 

545 

546 

547def munged_id(sqla_context): 

548 """ 

549 id is a string formed by joining name, org_id and a random string, 

550 e.g. Self Scorer/Thomas Murray#TK94SL 

551 """ 

552 name = sqla_context.current_parameters["name"] 

553 org_id = sqla_context.current_parameters["org_id"] 

554 chars = ascii_uppercase + digits 

555 ran_string = [choice(chars) for c in range(5)] 

556 return "%s/%s#%s" % (name, org_id, "".join(ran_string)) 

557 

558 

559class CustomRole(Base): 

560 __tablename__ = "roles" 

561 

562 __table_args__ = ( 

563 UniqueConstraint("name", "org_id"), # name="role_names", # Removed name 

564 ) 

565 

566 id: Mapped[str] = mapped_column( # type: ignore 

567 VARCHAR(length=255), nullable=False, primary_key=True, default=munged_id 

568 ) 

569 name: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=False) 

570 description: Mapped[str] = mapped_column(TEXT(), nullable=False) 

571 type: Mapped[str] = mapped_column( 

572 CHAR(length=1), nullable=False, server_default=text("'U'") 

573 ) 

574 org_id: Mapped[str] = mapped_column( 

575 VARCHAR(length=50), 

576 ForeignKey("organisations.id", onupdate="CASCADE", ondelete="CASCADE"), 

577 nullable=False, 

578 ) 

579 

580 organisation: Mapped["Organisation"] = relationship( 

581 Organisation, back_populates="custom_roles" 

582 ) 

583 permissions: Mapped[list["CustomRolePermission"]] = relationship( 

584 "CustomRolePermission", 

585 back_populates="role", 

586 cascade="all, delete", 

587 passive_deletes=True, 

588 ) 

589 

590 def __repr__(self): 

591 return f"<CustomRole {self.id} - {self.name}>" 

592 

593 

594class CustomRolePermission(Base): 

595 __tablename__ = "role_permissions" 

596 

597 id = None # type: ignore 

598 

599 role_id: Mapped[str] = mapped_column( 

600 VARCHAR(length=255), 

601 ForeignKey( 

602 "roles.id", ondelete="CASCADE" 

603 ), # name="role_reference", # Removed name 

604 nullable=False, 

605 primary_key=True, 

606 ) 

607 

608 permission_id: Mapped[PermString] = mapped_column( 

609 VARCHAR(length=255), nullable=False, primary_key=True 

610 ) 

611 role: Mapped["CustomRole"] = relationship(CustomRole, back_populates="permissions") 

612 

613 def __repr__(self): 

614 return f"{self.permission_id} (Role: {self.role_id})" 

615 

616 @validates("permission_id") 

617 def known_permission(self, key, perm_id: PermString) -> PermString: 

618 if perm_id not in perms.ALL_PERMISSIONS: 

619 raise ValueError('"%s" is not a recognised permission' % perm_id) 

620 return perm_id 

621 

622 

623class OrganisationCategory(Base): 

624 __tablename__ = "org_categories" 

625 

626 title: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=False, unique=True) 

627 description: Mapped[Optional[str]] = mapped_column(TEXT(), nullable=True) 

628 

629 def __repr__(self): 

630 return f"<Organisation Category: {self.title}>" 

631 

632 

633class FailedLoginAttempt(Base): 

634 __tablename__ = "failed_login_attempts" 

635 user_id: Mapped[str] = mapped_column( 

636 VARCHAR(length=50), ForeignKey("users.id"), nullable=False 

637 ) 

638 timestamp: Mapped[datetime] = mapped_column(DateTime, index=True, nullable=False) 

639 ip_address: Mapped[str] = mapped_column(VARCHAR(length=50), nullable=False) 

640 

641 def __repr__(self): 

642 return f"<FailedLoginAttempt {self.user_id} at {self.timestamp}>" 

643 

644 

645__all__ = [ 

646 "CustomRolePermission", 

647 "CustomRole", 

648 "UserRole", 

649 "User", 

650 "Organisation", 

651 "ConsultantOrganisation", 

652 "OrganisationType", 

653 "OrganisationCategory", 

654 "FailedLoginAttempt", 

655]