Coverage for postrfp/buyer/fsm_endpoints.py: 100%
178 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1"""
2Finite State Machine (FSM) API endpoints for status management.
4This module provides endpoints for:
51. FSM workflow management
62. State and transition queries
8Endpoints follow this structure:
9- GET /api/fsm/workflows - list all FSM workflows (with optional filters)
10- GET /api/fsm/workflows/{id} - Get a specific FSM workflow
11- POST /api/fsm/workflows - Create a new FSM workflow
12- PUT /api/fsm/workflows/{id} - Update an FSM workflow
13- DELETE /api/fsm/workflows/{id} - Delete an FSM workflow
15- GET /api/fsm/workflows/{id}/states - Get all states for an FSM workflow
16- GET /api/fsm/workflows/{id}/transitions - Get all transitions for an FSM workflow
17- POST /api/fsm/transition - Execute a transition on an FSM entity
18"""
20from typing import Optional, TYPE_CHECKING
22from sqlalchemy import select, func
23from sqlalchemy.orm import Session
25from postrfp.shared.decorators import http
26from postrfp.shared.pager import Pager
27from postrfp.authorisation import perms
28from postrfp.buyer.api import authorise
29from postrfp.model.helpers import audited_patch, random_string
30from postrfp.model.audit import AuditEvent, evt_types
32from postrfp.model.fsm import Workflow, Status, Transition, StatusAction
33from postrfp.shared.fsm_entity import FSMEntity
34from postrfp.fsm.schemas import (
35 IdSchema,
36 WorkflowSchema,
37 StatusSchema,
38 TransitionSchema,
39 WorkflowList,
40 WorkflowSummary,
41 EntityTypeSchema,
42 EntityTypeList,
43 TransitionRequest,
44 TransitionResult,
45)
46from postrfp.fsm.service import (
47 get_all_states,
48 get_all_transitions,
49 execute_transition,
50 get_available_transitions,
51 evaluate_transition,
52)
54if TYPE_CHECKING:
55 from postrfp.model import User
58@http
59def get_workflows(
60 session: Session,
61 user: "User",
62 entity: Optional[str] = None,
63 pager: Optional[Pager] = None,
64) -> WorkflowList:
65 """
66 Get a list of FSM workflows, optionally filtered by entity type and organisation.
68 @permissions MANAGE_ORGANISATION
69 """
70 user.check_permission(perms.MANAGE_ORGANISATION)
72 if pager is None:
73 pager = Pager(page=1, page_size=50)
75 q = select(Workflow).where(Workflow.organisation_id == user.org_id)
77 if entity:
78 q = q.where(Workflow.entity_type == entity)
80 q = q.order_by(Workflow.entity_type, Workflow.title)
82 total_records = (
83 session.execute(select(func.count()).select_from(q.subquery())).scalar() or 0
84 )
85 records = (
86 session.execute(q.offset(pager.startfrom).limit(pager.page_size))
87 .scalars()
88 .all()
89 )
90 return WorkflowList(
91 data=[WorkflowSummary.model_validate(record) for record in records],
92 pagination=pager.as_pagination(total_records, len(records)),
93 )
96@http
97def get_workflow(session: Session, user: "User", workflow_id: int) -> WorkflowSchema:
98 """
99 Get a specific FSM workflow by ID.
101 @permissions MANAGE_ORGANISATION
102 """
103 user.check_permission(perms.MANAGE_ORGANISATION)
105 workflow_def = session.get_one(Workflow, workflow_id)
107 if workflow_def.organisation_id != user.org_id:
108 authorise.check(user, perms.MANAGE_USERS, target_org=workflow_def.organisation)
110 return WorkflowSchema.model_validate(workflow_def, from_attributes=True)
113def _create_update_statuses(
114 session: Session,
115 workflow: Workflow,
116 status_data_list: list[StatusSchema],
117 event: AuditEvent,
118 existing_statuses: Optional[dict[str, Status]] = None,
119) -> dict[str, Status]:
120 """
121 Helper function to create or update statuses for a workflow.
122 Returns a mapping of status codes to Status objects.
123 """
125 status_map = {}
126 for status_data in status_data_list:
127 status = None
128 if existing_statuses and status_data.code in existing_statuses:
129 status = existing_statuses[status_data.code]
131 if status is None:
132 status = Status(workflow_id=workflow.id)
134 audited_patch(
135 status,
136 status_data,
137 event,
138 ("name", "code", "description"),
139 prefix="workflow",
140 )
142 # Ensure new statuses have a non-null code; if omitted in input,
143 # generate one to avoid None keys and DB constraint issues.
144 if not getattr(status, "code", None):
145 status.code = random_string()
147 is_new_status = not status.id
148 if is_new_status:
149 session.add(status)
150 session.flush() # Generate ID and finalize defaults
152 # Key the map by the actual code on the model (may differ from input)
153 status_map[status.code] = status
155 # Synchronise StatusAction entries with provided status_actions (if present)
156 desired_actions = set(status_data.status_actions or [])
158 # Use no_autoflush to prevent autoflush when reading current actions
159 # and when modifying the actions collection
160 with session.no_autoflush:
161 current_actions = set(status.status_actions)
162 if desired_actions == current_actions:
163 continue
164 if is_new_status:
165 # For new statuses, use the original association proxy approach
166 # which is simpler and doesn't have the race condition issue
167 status.actions.clear()
168 status.status_actions.update(desired_actions)
169 else:
170 # For existing statuses, we need to carefully manage deletions and insertions
171 # to avoid unique constraint violations
172 for action in list(status.actions):
173 session.delete(action)
175 # Ensure deletions are committed before insertions to avoid unique constraint violations
176 session.flush()
178 # Add the desired actions as new StatusAction objects
179 for action_name in desired_actions:
180 new_action = StatusAction(status_id=status.id, action=action_name)
181 session.add(new_action)
183 return status_map
186def _create_update_transitions(
187 session: Session,
188 workflow: Workflow,
189 transition_data_list: list[TransitionSchema],
190 status_map: dict[str, Status],
191 event: AuditEvent,
192 existing_transitions: Optional[dict[str, Transition]] = None,
193) -> None:
194 """
195 Helper function to create or update transitions for a workflow.
197 It is assumed that status_map contains all statuses referenced by the
198 transitions, i.e. the caller has already validated the parameters.
199 """
201 for trans_data in transition_data_list:
202 transition = None
203 if existing_transitions and trans_data.name in existing_transitions:
204 transition = existing_transitions[trans_data.name]
206 if not transition:
207 transition = Transition(workflow_id=workflow.id)
209 source_status = status_map[trans_data.source]
210 target_status = status_map[trans_data.target]
212 transition.source_status = source_status
213 transition.target_status = target_status
215 audited_patch(
216 transition,
217 trans_data,
218 event,
219 ("name", "guard_policy"),
220 prefix="workflow",
221 )
223 event.add_change(
224 "workflow.transition", "", f"{source_status.code}->{target_status.code}"
225 )
227 if not transition.id:
228 session.add(transition)
231@http
232def post_workflow(
233 session: Session, user: "User", workflow_doc: WorkflowSchema
234) -> IdSchema:
235 """
236 Create a new FSM workflow.
238 The workflow ID should not be set - it will be ignored if provided.
240 @permissions MANAGE_ORGANISATION
241 """
242 user.check_permission(perms.MANAGE_ORGANISATION)
244 workflow = Workflow()
245 workflow.organisation_id = user.org_id
247 event = AuditEvent.create(session, evt_types.WORKFLOW_CREATED, user=user)
249 audited_patch(
250 workflow,
251 workflow_doc,
252 event,
253 ("title", "entity_type", "version", "is_active", "initial_status_code"),
254 )
256 session.add(workflow)
257 session.flush() # Generate the ID
259 status_map = _create_update_statuses(
260 session, workflow, workflow_doc.statuses, event
261 )
263 _create_update_transitions(
264 session, workflow, workflow_doc.transitions, status_map, event
265 )
266 return IdSchema(id=workflow.id)
269@http
270def put_workflow(
271 session: Session, user: "User", workflow_id: int, workflow_doc: WorkflowSchema
272) -> None:
273 """
274 Update an existing FSM workflow.
276 @permissions MANAGE_ORGANISATION
277 """
278 user.check_permission(perms.MANAGE_ORGANISATION)
280 workflow = session.get_one(Workflow, workflow_id)
282 if workflow.organisation_id != user.org_id:
283 authorise.check(user, perms.MANAGE_USERS, target_org=workflow.organisation)
285 event = AuditEvent.create(session, evt_types.WORKFLOW_UPDATED, user=user)
287 audited_patch(
288 workflow,
289 workflow_doc,
290 event,
291 ("title", "entity_type", "version", "is_active", "initial_status_code"),
292 )
294 existing_statuses = {status.code: status for status in workflow.statuses}
295 status_map = _create_update_statuses(
296 session, workflow, workflow_doc.statuses, event, existing_statuses
297 )
299 # Use the actual resulting status codes from the update step to decide deletions
300 status_codes_to_keep = set(status_map.keys())
301 for status in list(workflow.statuses):
302 if status.code not in status_codes_to_keep:
303 session.delete(status)
304 event.add_change("workflow.status", status.code, "deleted")
306 existing_transitions = {
307 transition.name: transition for transition in workflow.transitions
308 }
310 _create_update_transitions(
311 session,
312 workflow,
313 workflow_doc.transitions,
314 status_map,
315 event,
316 existing_transitions,
317 )
319 transition_names_to_keep = {
320 transition.name for transition in workflow_doc.transitions
321 }
322 for transition in list(workflow.transitions):
323 if transition.name not in transition_names_to_keep:
324 session.delete(transition)
325 event.add_change("workflow.transition", transition.name, "deleted")
328@http
329def delete_workflow(session: Session, user: "User", workflow_id: int):
330 """
331 Delete an FSM workflow.
333 @permissions MANAGE_ORGANISATION
334 """
335 user.check_permission(perms.MANAGE_ORGANISATION)
337 fsm_def = session.get_one(Workflow, workflow_id)
339 if fsm_def.organisation_id != user.org_id:
340 authorise.check(user, perms.MANAGE_USERS, target_org=fsm_def.organisation)
342 session.delete(fsm_def)
343 session.flush()
346@http
347def get_workflow_statuses(
348 session: Session, user: "User", workflow_id: int
349) -> list[StatusSchema]:
350 """
351 Get all states for an FSM workflow.
353 @permissions MANAGE_ORGANISATION
354 """
355 user.check_permission(perms.MANAGE_ORGANISATION)
357 fsm_def = session.get_one(Workflow, workflow_id)
359 if fsm_def.organisation_id != user.org_id:
360 authorise.check(user, perms.MANAGE_USERS, target_org=fsm_def.organisation)
362 states = get_all_states(session, fsm_def)
363 return [StatusSchema.model_validate(state) for state in states]
366@http
367def get_workflow_transitions(
368 session: Session, user: "User", workflow_id: int
369) -> list[TransitionSchema]:
370 """
371 Get all transitions for an FSM workflow.
373 @permissions MANAGE_ORGANISATION
374 """
375 user.check_permission(perms.MANAGE_ORGANISATION)
377 fsm_def = session.get_one(Workflow, workflow_id)
379 if fsm_def.organisation_id != user.org_id:
380 authorise.check(user, perms.MANAGE_USERS, target_org=fsm_def.organisation)
382 transitions = get_all_transitions(session, fsm_def)
383 return [TransitionSchema.model_validate(transition) for transition in transitions]
386@http
387def get_workflow_entity_transitions(
388 session: Session, user: "User", workflow_id: int, entity_id: int
389) -> list[TransitionSchema]:
390 """
391 List transitions that can be performed by the current user at the current status
392 for the given workflow
394 @permissions MANAGE_ORGANISATION
395 """
396 user.check_permission(perms.MANAGE_ORGANISATION)
398 # Lookup Workflow
399 workflow = session.get_one(Workflow, workflow_id)
401 if workflow.organisation_id != user.org_id:
402 authorise.check(user, perms.MANAGE_USERS, target_org=workflow.organisation)
404 entity_class = FSMEntity.get_entity_by_name(workflow.entity_type)
405 assert entity_class is not None
406 entity: FSMEntity = session.get_one(entity_class, entity_id)
408 # Validate that the entity is actually using this workflow
409 if entity.workflow_id != workflow_id:
410 raise ValueError(f"Entity {entity_id} is not using workflow {workflow_id}")
412 context_data = entity.get_context_data()
414 valid_transitions: list[Transition] = []
415 for transition in get_available_transitions(session, entity):
416 tr = evaluate_transition(transition, context_data)
417 if tr.transition_permitted:
418 valid_transitions.append(transition)
420 return [TransitionSchema.model_validate(t) for t in valid_transitions]
423@http
424def get_entities(session: Session, user: "User") -> EntityTypeList:
425 """
426 Get a list of all available FSM entity types.
428 @permissions MANAGE_ORGANISATION
429 """
430 user.check_permission(perms.MANAGE_ORGANISATION)
432 entity_types = []
434 for name, entity_class in FSMEntity.get_registered_entities().items():
435 # Get the table name from SQLAlchemy
436 table_name = getattr(entity_class, "__tablename__", name.lower())
438 # Get the context schema from the entity class
439 try:
440 context_schema = entity_class.get_context_schema()
441 except NotImplementedError:
442 # If not implemented, provide a basic schema
443 context_schema = {"type": "object", "properties": {}}
445 entity_type = EntityTypeSchema(
446 name=name, table_name=table_name, context_schema=context_schema
447 )
448 entity_types.append(entity_type)
450 # Sort by name for consistent ordering
451 entity_types.sort(key=lambda x: x.name)
453 return EntityTypeList(data=entity_types)
456@http
457def post_transition(
458 session: Session, user: "User", transition_doc: TransitionRequest
459) -> TransitionResult:
460 """
461 Execute a transition on an FSM entity.
463 This is a generic endpoint that can be used for any FSM entity type,
464 replacing entity-specific endpoints like put_project_publish.
466 Permission checking is delegated to the guard function associated with
467 the transition, which has access to entity context and user information
468 to perform appropriate authorization checks.
470 @permissions Delegated to guard function
471 """
472 # Get the entity class from the registry
473 entity_class = FSMEntity.get_entity_by_name(transition_doc.entity_type)
474 if entity_class is None:
475 raise ValueError(f"Unknown entity type: {transition_doc.entity_type}")
477 # Query for the specific entity instance
478 entity = session.get_one(entity_class, transition_doc.entity_id)
480 # Check if entity has a workflow assigned
481 if not entity.workflow_id or not entity.workflow:
482 raise ValueError(
483 f"Entity {transition_doc.entity_type}#{transition_doc.entity_id} has no workflow assigned"
484 )
486 # Permission checking is delegated to the guard function during transition execution
487 # The guard function will have access to entity context and can perform appropriate
488 # ownership and permission checks based on the specific entity type
490 # Execute the transition using the service layer
491 execute_transition(entity, transition_doc.transition_name)
493 # If we get here, the transition was successful
494 return TransitionResult(
495 transition_permitted=True,
496 message=f"Transition '{transition_doc.transition_name}' executed successfully",
497 job_ref=None,
498 )