Coverage for postrfp/model/project.py: 97%
258 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1from decimal import Decimal
2import logging
3from datetime import datetime
4from typing import Iterable, Union, Optional, TYPE_CHECKING, Iterator
6from sqlalchemy import (
7 Column,
8 Unicode,
9 Boolean,
10 text,
11 func,
12 ForeignKeyConstraint,
13 Integer,
14 ForeignKey,
15 DateTime,
16 Table,
17 UniqueConstraint,
18 Index,
19)
20from sqlalchemy.orm import (
21 Mapped,
22 mapped_column,
23 relationship,
24 joinedload,
25 deferred,
26 selectinload,
27 DynamicMapped,
28)
29from sqlalchemy.types import VARCHAR, TEXT, INTEGER, BOOLEAN
30from sqlalchemy.orm.session import object_session
31from sqlalchemy.orm.exc import NoResultFound
33from postrfp.shared.exceptions import AuthorizationFailure
34from postrfp.shared.fsm_entity import FSMEntity, require_fsm_methods
35from postrfp.model.exc import QuestionnaireStructureException
36from postrfp.model.meta import Base, AttachmentMixin, Visitor
37from postrfp.model.issue import Issue, IssueAttachment
38from postrfp.model.notify import ProjectWatchList
39from postrfp.model.questionnaire.cte_weights import save_total_weights_cte
41from postrfp.model.questionnaire.nodes import (
42 Section,
43 QuestionInstance,
44 QuestionDefinition,
45)
47from postrfp.model import (
48 ProjectPermission,
49 Participant,
50 User,
51 Organisation,
52 CustomRole,
53)
54from postrfp.model.questionnaire.weightings import TotalWeighting, WeightingSet
56if TYPE_CHECKING:
57 from postrfp.model.misc import Category
58 from postrfp.model.questionnaire.nodes import QElement
59 from postrfp.model.notes import ProjectNote
60 from postrfp.model.audit.event import AuditEvent
61 from postrfp.model.questionnaire.answering import AnswerReport
64log = logging.getLogger(__name__)
class LazyParticipants:
    """
    Set-like facade over a project's ``participants_query`` relationship.

    Every operation is translated into a query against the project's
    participants rather than loading the whole collection up front.
    """

    def __init__(self, project: "Project") -> None:
        self.project = project

    @property
    def roleq(self):
        """Participants query that joined-loads custom role and organisation.

        Only ``CustomRole.name`` and ``Organisation.name`` / ``public`` are
        requested via ``load_only``.
        """
        role_option = joinedload(Participant.custom_role).load_only(CustomRole.name)
        org_option = joinedload(Participant.organisation).load_only(
            Organisation.name, Organisation.public
        )
        return self.project.participants_query.options(role_option, org_option)

    def __contains__(self, organisation: Union[str, Organisation]) -> bool:
        """Return True if the organisation (or organisation id) participates.

        Raises ValueError when the project is not attached to a session.
        """
        if isinstance(organisation, str):
            org_id = organisation
        else:
            org_id = organisation.id
        matching = self.project.participants_query.filter_by(org_id=org_id)
        session = object_session(self.project)
        if session is None:
            raise ValueError("Project not associated with a session")
        return session.query(matching.exists()).scalar()

    def __iter__(self) -> Iterator[Participant]:
        """Iterate over participants using the eager-loading ``roleq`` query."""
        yield from self.roleq

    def __len__(self) -> int:
        """Number of participants, obtained with a COUNT query."""
        participants = self.project.participants_query
        return participants.count()

    def get(self, org_id: str) -> Participant:
        """Return the single Participant for ``org_id`` (uses ``.one()``)."""
        participants = self.project.participants_query
        return participants.filter_by(org_id=org_id).one()

    def add(self, participant: Participant) -> None:
        """Append ``participant`` to the project's participants."""
        participants = self.project.participants_query
        return participants.append(participant)

    def clear(self) -> None:
        """Bulk-delete all participants of the project."""
        participants = self.project.participants_query
        participants.delete(synchronize_session="fetch")
class LazyRestrictedUsers:
    """
    Collection-style access to the ProjectPermission rows of a project.

    Supports ``user in project.restricted_users`` and iteration; both are
    executed as queries against the session the project belongs to.
    """

    def __init__(self, project: "Project") -> None:
        self.project = project

    def _session(self):
        """Return the project's session.

        Raises ValueError when the project is not attached to a session.
        Centralised here so every query path fails the same way instead of
        hitting an AttributeError on ``None``.
        """
        session = object_session(self.project)
        if session is None:
            raise ValueError("Project not associated with a session")
        return session

    def project_perms_query(self):
        """Query of ProjectPermission joined to Participant and Project.

        Filtered to participants of this project.
        Raises ValueError when the project is not attached to a session.
        """
        return (
            self._session()
            .query(ProjectPermission)
            .join(Participant)
            .join(Project)
            .filter(Participant.project == self.project)
        )

    def __contains__(self, user) -> bool:
        """Return True if ``user`` has a ProjectPermission in this project.

        The permission must belong to a participant whose organisation is
        the user's own organisation.
        Raises ValueError when the project is not attached to a session.
        """
        q = self.project_perms_query().filter(
            ProjectPermission.user == user,
            Participant.organisation == user.organisation,
        )
        return self._session().query(q.exists()).scalar()

    def __iter__(self):
        """Iterate over the ProjectPermission rows for this project."""
        yield from self.project_perms_query()
# Association table linking projects to categories; used as the ``secondary``
# of ``Project.categories``.
proj_cat_rel = Table(
    "project_categories",
    Base.metadata,
    Column("id", Integer, primary_key=True),
    Column(
        "project_id",
        Integer,
        index=True,
        nullable=False,
        server_default=text("'0'"),
    ),
    Column(
        "category_id",
        Integer,
        nullable=False,
        server_default=text("'0'"),
    ),
    # Constraint names were removed deliberately (formerly
    # project_categories_ibfk_1 / project_categories_ibfk_2).
    ForeignKeyConstraint(
        ["category_id"],
        ["categories.id"],
        ondelete="CASCADE",
    ),
    ForeignKeyConstraint(
        ["project_id"],
        ["projects.id"],
    ),
    # A category can be attached to a given project only once.
    UniqueConstraint("project_id", "category_id"),
    Index("idx_cat_proj", "category_id", "project_id"),
)
@require_fsm_methods
class Project(Base, FSMEntity):
    """
    ORM model for the ``projects`` table.

    Holds the project's own columns, its relationships (issues, participants,
    questionnaire sections/questions, weightings, approvals, watchers, ...)
    and convenience methods that query those relationships.
    """

    __tablename__ = "projects"

    # NOTE(review): "deadline" appears twice in this list; left unchanged
    # because consumers of base_attrs are not visible here - confirm whether
    # the duplicate is intentional.
    base_attrs = (
        "title,id,status,author_id,description,allow_private_communication,"
        + "deadline,date_published,deadline,multiscored,url,maximum_score"
    ).split(",")

    # Issue statuses at which an Issue is visible to respondents. Shared by
    # ``published_issues`` and ``respondent_watchers`` so they cannot drift.
    _RESPONDENT_VISIBLE_STATUSES = (
        "Accepted",
        "Submitted",
        "Opportunity",
        "Updateable",
    )

    section_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey(
            "sections.id",
            use_alter=True,
            ondelete="SET NULL",  # name="fk_prj_sec_id",  # Removed name
        ),
        nullable=True,
    )

    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=50),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="SET NULL"),
        nullable=True,
    )

    author_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=50), ForeignKey("users.id", ondelete="SET NULL"), nullable=True
    )

    date_created: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.utc_timestamp()
    )
    date_published: Mapped[Optional[datetime]] = mapped_column(DateTime)
    deadline: Mapped[Optional[datetime]] = mapped_column(DateTime)
    description: Mapped[Optional[str]] = deferred(mapped_column(TEXT(), nullable=True))
    email: Mapped[Optional[str]] = mapped_column(Unicode(255), nullable=True)

    library: Mapped[bool] = mapped_column(BOOLEAN(), default=False)
    maximum_score: Mapped[int] = mapped_column(
        Integer, default=10, server_default=text("'10'"), nullable=False
    )
    multiscored: Mapped[bool] = mapped_column(BOOLEAN(), default=False)
    normalised_weights: Mapped[bool] = mapped_column(BOOLEAN(), default=False)

    questionnaire_locked: Mapped[bool] = mapped_column(BOOLEAN(), default=False)

    template: Mapped[bool] = mapped_column(BOOLEAN(), default=False)
    title: Mapped[str] = mapped_column(
        Unicode(200), nullable=False, server_default=text("''")
    )

    lock_issues: Mapped[bool] = mapped_column(BOOLEAN(), default=False)

    default_weighting_set_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("weighting_sets.id", ondelete="SET NULL", use_alter=True),
        nullable=True,
    )

    owner_org: Mapped["Organisation"] = relationship(
        "Organisation", lazy="joined", uselist=False, back_populates="projects"
    )

    author: Mapped["User"] = relationship("User")

    _issues: DynamicMapped["Issue"] = relationship("Issue", lazy="dynamic")

    participant_watchers: DynamicMapped["User"] = relationship(
        "User", secondary="project_watch_list", lazy="dynamic", viewonly=True
    )

    questions: DynamicMapped["QuestionInstance"] = relationship(
        "QuestionInstance",
        lazy="dynamic",
        cascade="all, delete-orphan",
        passive_deletes=True,
        back_populates="project",
        overlaps="questions,section",
    )
    participants_query: DynamicMapped["Participant"] = relationship(
        "Participant",
        lazy="dynamic",
        back_populates="project",
        cascade="all, delete",
        passive_deletes=True,
    )

    root_section: Mapped[Optional["Section"]] = relationship(
        "Section",
        uselist=False,
        post_update=True,
        primaryjoin="foreign(Project.section_id)==remote(Section.id)",
    )

    sections: DynamicMapped["Section"] = relationship(
        "Section",
        lazy="dynamic",
        primaryjoin="Project.id==Section.project_id",
        cascade="all, delete-orphan",
    )

    _attachments: DynamicMapped["ProjectAttachment"] = relationship(
        "ProjectAttachment", lazy="dynamic", order_by="ProjectAttachment.position"
    )

    categories: Mapped[list["Category"]] = relationship(
        "Category", secondary=proj_cat_rel
    )

    answer_reports: Mapped[list["AnswerReport"]] = relationship(
        "AnswerReport",
        back_populates="project",
        cascade="all,delete",
        passive_deletes=True,
    )

    weighting_sets: DynamicMapped["WeightingSet"] = relationship(
        "WeightingSet",
        lazy="dynamic",
        back_populates="project",
        cascade="all,delete",
        passive_deletes=True,
        foreign_keys="WeightingSet.project_id",  # Specify which foreign key to use
    )

    project_fields: Mapped[list["ProjectField"]] = relationship(
        "ProjectField",
        back_populates="project",
        cascade="all,delete,delete-orphan",
        passive_deletes=True,
        order_by="ProjectField.position",
    )

    approvals: Mapped[list["ProjectApproval"]] = relationship(
        "ProjectApproval",
        back_populates="project",
        cascade="all,delete,delete-orphan",
        passive_deletes=True,
        order_by="ProjectApproval.date_approved",
    )

    qelements: DynamicMapped["QElement"] = relationship(
        "QElement",
        primaryjoin="Project.id==QuestionInstance.project_id",
        secondaryjoin="QuestionDefinition.id==QElement.question_id",
        secondary="join(QuestionInstance, QuestionDefinition)",
        lazy="dynamic",
        viewonly=True,
    )

    all_respondent_watchers: DynamicMapped["User"] = relationship(
        User,
        primaryjoin="Project.id==Issue.project_id",
        secondaryjoin="IssueWatchList.user_id==User.id",
        secondary="join(Issue, IssueWatchList)",
        lazy="dynamic",
        viewonly=True,
    )

    watch_list: DynamicMapped["ProjectWatchList"] = relationship(
        "ProjectWatchList",
        lazy="dynamic",
        cascade="all, delete",
        passive_deletes=True,
        back_populates="project",
    )
    notes_query: DynamicMapped["ProjectNote"] = relationship(
        "ProjectNote",
        back_populates="project",
        cascade="all,delete",
        passive_deletes=True,
        lazy="dynamic",
    )
    total_weightings: DynamicMapped["TotalWeighting"] = relationship(
        "TotalWeighting",
        back_populates="project",
        cascade="all,delete",
        passive_deletes=True,
        lazy="dynamic",
    )

    default_weighting_set: Mapped[Optional["WeightingSet"]] = relationship(
        "WeightingSet",
        foreign_keys=[default_weighting_set_id],
        back_populates="default_for_projects",
    )
    events: DynamicMapped["AuditEvent"] = relationship(
        "AuditEvent",
        back_populates="project",
        cascade_backrefs=False,
        lazy="dynamic",
        primaryjoin=("foreign(AuditEvent.project_id)==Project.id"),
    )

    permissions: DynamicMapped["ProjectPermission"] = relationship(
        "ProjectPermission",
        secondary="project_org_roles",
        lazy="dynamic",
        viewonly=True,
    )

    def __init__(self, title: str, *args, **kwargs) -> None:
        """Create a project; ``title`` is required, the rest goes to the base."""
        super(Project, self).__init__(*args, **kwargs)
        self.title = title

    @property
    def issues(self) -> list[Issue]:
        """All issues of this project, loaded into a list."""
        return self._issues.all()

    def issue_by_id(self, issue_id: int) -> Issue:
        """Return the project's Issue with the given id (uses ``.one()``)."""
        return self._issues.filter_by(id=issue_id).one()

    def question_by_number(self, question_number: str) -> "QuestionInstance":
        """Return the question whose ``b36_number`` equals ``question_number``."""
        return self.questions.filter_by(b36_number=question_number).one()

    @property
    def published_issues(self) -> list[Issue]:
        """Issues visible to respondents"""
        return self._issues.filter(
            Issue.status.in_(self._RESPONDENT_VISIBLE_STATUSES)
        ).all()

    @property
    def opportunity_issues(self) -> list[Issue]:
        """Issues at status Opportunity"""
        return self._issues.filter(Issue.status.in_(("Opportunity",))).all()

    @property
    def respondent_watchers(self) -> Iterable[User]:
        """
        A SQLAlchemy query of issue watchers whose issues are visible to Respondents
        """
        return self.all_respondent_watchers.filter(
            Issue.status.in_(self._RESPONDENT_VISIBLE_STATUSES)
        )

    def iter_all_watchers(self) -> Iterable[User]:
        """
        Iterator returns all valid respondent or participant Users watching this project
        """
        yield from self.respondent_watchers
        yield from self.participant_watchers

    @property
    def deadline_passed(self) -> bool:
        """True if a deadline is set and the current time is later than it."""
        if not self.deadline:
            return False
        # NOTE(review): datetime.now() is naive local time, whereas
        # date_created defaults to utc_timestamp() - confirm which timezone
        # ``deadline`` is stored in.
        return datetime.now() > self.deadline

    @property
    def scoreable_issues(self) -> list[Issue]:
        """Issues matching ``Issue.scoreable_filter``, cached on the instance.

        The list is stored in ``_scoreable_issues_cache`` on first access;
        nothing in this class invalidates it afterwards.
        """
        if not hasattr(self, "_scoreable_issues_cache"):
            issues = self._issues.filter(Issue.scoreable_filter(self)).all()
            self._scoreable_issues_cache = issues

        return self._scoreable_issues_cache

    def get_issue(self, issue_id) -> Issue:
        """
        Fetch the Issue by id.

        Raises NoResultFound if an Issue with the given ID doesn't belong to
        this project (the lookup uses ``Query.one()``).
        """
        return self._issues.filter(Issue.id == issue_id).one()

    def __repr__(self) -> str:
        return "<Project %s : %s>" % (self.id, self.title)

    @property
    def participants(self) -> LazyParticipants:
        """A collection which lazily implements Set operations"""
        return LazyParticipants(self)

    @property
    def restricted_users(self) -> LazyRestrictedUsers:
        """Lazy collection of ProjectPermission rows for this project."""
        return LazyRestrictedUsers(self)

    def participant_role_permissions(self, user: User) -> set:
        """Role permissions of the participant for the user's organisation.

        Returns an empty set when that organisation is not a participant.
        """
        try:
            participant = self.participants.get(user.organisation.id)
            return participant.role_permissions
        except NoResultFound:
            return set()

    def list_attachments(self, user: User) -> list["ProjectAttachment"]:
        """All project attachments; raises AuthorizationFailure for restricted users."""
        self.check_attachments_visible(user)
        return self._attachments.all()

    def get_attachment(self, user: User, attachment_id: int) -> "ProjectAttachment":
        """One attachment by id; raises AuthorizationFailure for restricted users."""
        self.check_attachments_visible(user)
        return self._attachments.filter(ProjectAttachment.id == attachment_id).one()

    def list_issue_attachments(self, user: User) -> list[IssueAttachment]:
        """Attachments of the project's scoreable issues, ordered by issue id.

        Raises AuthorizationFailure for restricted users.
        """
        self.check_attachments_visible(user)
        rows = (
            self._issues.filter(Issue.scoreable_filter(self))
            .add_entity(IssueAttachment)
            .join(IssueAttachment)
            .order_by(Issue.id)
            .all()
        )

        # Each row is an (Issue, IssueAttachment) pair; only the attachment
        # is returned.
        return [att for _iss, att in rows]

    def check_attachments_visible(self, user: User) -> None:
        """
        Check if user can view attachments for this project
        @raises AuthorizationFailure
        """
        if user.is_restricted:
            raise AuthorizationFailure("Attachments not visible to restricted users")

    def get_participant(self, organisation: Organisation) -> Participant:
        """
        Fetch the Participant associated with the given organisation
        for this project
        @raise NoResultFound if organisation is not a participant
        """
        return self.participants.get(organisation.id)

    def query_visible_questions(self, user: User):
        """
        Returns a Question query object, filtered by section permission
        if the user is restricted
        """
        if not user.is_restricted:
            return self.questions
        else:
            return (
                self.questions.join(QuestionInstance.section)
                .join(Section.perms)
                .filter_by(user=user)
            )

    def visit_questionnaire(self, *visitors: Visitor) -> "Section":
        """
        Run the provided visitor(s) on the root section of this project.

        Returns the root section. Raises QuestionnaireStructureException when
        no parentless section exists or when it is not the section referenced
        by ``section_id``.
        """
        from postrfp.shared.utils import benchmark

        q = self.sections.options(
            selectinload(Section.subsections, recursion_depth=10)
            .defer(Section.description)
            .defer(Section.title)
            .subqueryload(Section.questions)
            .lazyload(QuestionInstance.question_def)
            .lazyload(QuestionDefinition.elements)
        )

        # Pick the first section without a parent. A default of None is
        # supplied so that a project with no such section raises the
        # structure exception below rather than a bare StopIteration.
        ordered_sections = q.order_by(Section.parent_id)
        root = next((s for s in ordered_sections if s.parent_id is None), None)

        if root is None or root.id != self.section_id:
            m = f"Failed to find root section for project {self.id}"
            raise QuestionnaireStructureException(m)

        for visitor in visitors:
            with benchmark("visitor %s" % visitor):
                root.accept(visitor)
                visitor.finalise()

        return root

    def print_questionnaire(self) -> None:  # pragma: no cover
        """Print the questionnaire structure using PrintVisitor."""
        self.visit_questionnaire(PrintVisitor())

    def get_or_create_default_weighting_set_id(self) -> int:
        """
        Get the default weighting set ID, creating one if it doesn't exist.
        This method explicitly handles the creation side effect.

        Raises RuntimeError if a set must be created but this object is not
        attached to a session.
        """
        if self.default_weighting_set_id is None:
            # Create a default weighting set
            session = object_session(self)
            if session is None:
                raise RuntimeError(
                    "Cannot create default weighting set: object not attached to session"
                )

            default_ws = WeightingSet(project=self, name="default")
            session.add(default_ws)
            session.flush()  # Get the ID without committing

            self.default_weighting_set_id = default_ws.id

        return self.default_weighting_set_id

    def calculate_normalised_weights(self) -> None:
        """Calculate normalised weights using the default weighting set"""
        # Use CTE approach for improved performance
        self.save_total_weights(
            weighting_set_id=self.get_or_create_default_weighting_set_id()
        )

    def save_total_weights(self, weighting_set_id: Optional[int] = None) -> None:
        """
        Calculate and save total weights for the given weighting set.

        ``weighting_set_id`` is forwarded unchanged to
        ``save_total_weights_cte``. NOTE(review): the fallback to the default
        weighting set when it is None presumably happens inside that helper -
        confirm.

        Raises RuntimeError if this object is not attached to a session.
        """

        sess = object_session(self)
        if sess is None:
            raise RuntimeError("Object not associated with a session")
        save_total_weights_cte(sess, self, weighting_set_id)

    def delete_total_weights(self, weighting_set_id: Optional[int] = None) -> None:
        """Delete TotalWeight records for this project for the given weighting set"""
        self.total_weightings.filter_by(weighting_set_id=weighting_set_id).delete()

    def total_weights_exist_for(self, weighting_set_id: Optional[int] = None) -> bool:
        """True if any TotalWeighting row exists for the given weighting set."""
        q = self.total_weightings.filter_by(weighting_set_id=weighting_set_id)
        return q.count() > 0

    def add_issue(self, issue: Issue) -> None:
        """Append ``issue`` to this project's issues."""
        self._issues.append(issue)

    def add_watcher(self, user: User) -> bool:
        """
        Adds the given user to this watch list. Returns True if successful,
        False if the user is already watching
        """
        if self.watch_list.filter_by(user=user).count() == 0:
            self.watch_list.append(ProjectWatchList(user=user))
            return True
        else:
            return False

    def log_approval(self, user: User) -> "ProjectApproval":
        """
        Log a ProjectApproval by the given user, assigning status_name
        as the project_status at the time of approval.
        """
        approval = ProjectApproval(user=user, project_status=self.status_name)
        self.approvals.append(approval)
        return approval

    def contains_section_id(self, section_id: int) -> bool:
        """True if exactly one section of this project has the given id."""
        return self.sections.filter_by(id=section_id).count() == 1

    def get_context_data(self) -> dict:
        """Return the context data built by ``get_project_context_data``."""
        from postrfp.model.fsm_context.project_context import get_project_context_data

        return get_project_context_data(self)

    @classmethod
    def get_context_schema(cls) -> dict:
        """Return the JSON schema of ``ProjectContext``."""
        from postrfp.model.fsm_context.project_context import ProjectContext

        return ProjectContext.model_json_schema()
class ProjectAttachment(AttachmentMixin, Base):
    """ORM model for a file attached to a project (``project_attachments``)."""

    __tablename__ = "project_attachments"

    public_attrs = ["id", "description", "size", "filename", "mimetype", "private"]

    description: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=255), nullable=True
    )
    project_id: Mapped[Optional[int]] = mapped_column(
        Integer, ForeignKey("projects.id", ondelete="SET NULL"), nullable=True
    )
    # NOTE(review): date_uploaded, private and position are declared
    # nullable=True while their annotations are not Optional - confirm which
    # is intended.
    date_uploaded: Mapped[datetime] = mapped_column(
        DateTime, server_default=func.utc_timestamp(), nullable=True
    )
    author_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150), ForeignKey("users.id", ondelete="SET NULL"), nullable=True
    )
    org_id: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=150),
        ForeignKey("organisations.id", onupdate="CASCADE", ondelete="SET NULL"),
        nullable=True,
    )
    private: Mapped[bool] = mapped_column(
        Boolean, default=False, server_default=text("0"), nullable=True
    )
    # Project._attachments orders attachments by this column.
    position: Mapped[int] = mapped_column(
        Integer, server_default=text("0"), nullable=True
    )
    note_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)

    project = relationship(Project, back_populates="_attachments")
    organisation = relationship("Organisation")
class ProjectField(Base):
    """A key/value field attached to a project (``project_fields``).

    Within one project both ``key`` and ``position`` are unique.
    """

    __tablename__ = "project_fields"
    # Constraint names (project_field_id_key / project_field_id_position)
    # were removed deliberately.
    __table_args__ = (
        UniqueConstraint("project_id", "key"),
        UniqueConstraint("project_id", "position"),
    )
    public_attrs = ["id", "key", "value", "private"]

    project_id: Mapped[int] = mapped_column(
        INTEGER, ForeignKey("projects.id", ondelete="CASCADE"), nullable=False
    )
    private: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    key: Mapped[str] = mapped_column(VARCHAR(length=64), nullable=False)
    value: Mapped[Optional[str]] = mapped_column(TEXT(), nullable=True)
    # Project.project_fields orders fields by this column.
    position: Mapped[int] = mapped_column(Integer, nullable=False, default=0)

    project = relationship("Project", back_populates="project_fields")

    def __repr__(self) -> str:
        details = (
            f"key={self.key}, position={self.position}, "
            f"project_id={self.project_id}"
        )
        return f"<ProjectField#{self.id}: {details}>"
class ProjectApproval(Base):
    """Record of a user approving a project at a given project status.

    The (project_id, user_id, project_status) combination is unique.
    """

    __tablename__ = "project_approvals"
    __table_args__ = (UniqueConstraint("project_id", "user_id", "project_status"),)

    project_id: Mapped[int] = mapped_column(
        INTEGER, ForeignKey("projects.id", ondelete="CASCADE"), nullable=False
    )
    user_id: Mapped[str] = mapped_column(
        VARCHAR(length=50), ForeignKey("users.id", ondelete="CASCADE"), nullable=False
    )
    # Project.approvals orders approvals by this column.
    date_approved: Mapped[datetime] = mapped_column(
        DateTime, server_default=func.utc_timestamp(), nullable=False
    )
    # NOTE(review): annotated Optional but the column is nullable=False -
    # confirm which is intended.
    project_status: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=128), nullable=False
    )

    project = relationship("Project", back_populates="approvals")
    user = relationship("User", lazy="selectin")

    def __repr__(self) -> str:
        ids = f"user_id={self.user_id}, project_id={self.project_id}"
        return f"<ProjectApproval#{self.id}: {ids}>"
class PrintVisitor(Visitor):  # pragma: no cover
    """Debugging visitor that prints the questionnaire tree to stdout."""

    def __init__(self) -> None:
        # Current nesting level; controls the width of the line prefix.
        self.depth = 1

    @property
    def inset(self) -> str:
        """Line prefix whose width grows with the current depth."""
        return " " * self.depth

    @staticmethod
    def _weight_or_zero(node):
        """Return ``node.absolute_weight``, or Decimal(0) if reading it fails.

        Weights are no longer set by the visitor pattern; they are now
        calculated in the database via the CTE approach, so the attribute
        read is treated as best-effort.
        """
        try:
            return node.absolute_weight
        except Exception:
            return Decimal(0)

    def hello_section(self, sec: "Section") -> None:
        """Print one line describing ``sec`` and descend one level."""
        print("")
        template = "{inset} {num}: {title} -/{id}/{pid} Abs Weight: {abs_weight:.3f})"
        weight = self._weight_or_zero(sec)
        section_id = getattr(sec, "id", "<no id set>")
        line = template.format(
            inset=self.inset,
            num=sec.number,
            title=sec.title,
            abs_weight=weight,
            id=section_id,
            pid=sec.parent_id,
        )
        print(line)
        self.depth += 1

    def goodbye_section(self, _sec: "Section") -> None:
        """Ascend one level when leaving a section."""
        self.depth -= 1

    def visit_question(self, q: "QuestionInstance") -> None:
        """Print one line with the question's number and absolute weight."""
        weight = self._weight_or_zero(q)
        print(f"{self.inset} Q: {q.number}, Absolute Weight: {weight:.4f})")