Coverage for postrfp/buyer/fsm_endpoints.py: 100%

178 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1""" 

2Finite State Machine (FSM) API endpoints for status management. 

3 

4This module provides endpoints for: 

51. FSM workflow management 

62. State and transition queries 

7 

8Endpoints follow this structure: 

9- GET /api/fsm/workflows - list all FSM workflows (with optional filters) 

10- GET /api/fsm/workflows/{id} - Get a specific FSM workflow 

11- POST /api/fsm/workflows - Create a new FSM workflow 

12- PUT /api/fsm/workflows/{id} - Update an FSM workflow 

13- DELETE /api/fsm/workflows/{id} - Delete an FSM workflow 

14 

15- GET /api/fsm/workflows/{id}/states - Get all states for an FSM workflow 

16- GET /api/fsm/workflows/{id}/transitions - Get all transitions for an FSM workflow 

17- POST /api/fsm/transition - Execute a transition on an FSM entity 

18""" 

19 

20from typing import Optional, TYPE_CHECKING 

21 

22from sqlalchemy import select, func 

23from sqlalchemy.orm import Session 

24 

25from postrfp.shared.decorators import http 

26from postrfp.shared.pager import Pager 

27from postrfp.authorisation import perms 

28from postrfp.buyer.api import authorise 

29from postrfp.model.helpers import audited_patch, random_string 

30from postrfp.model.audit import AuditEvent, evt_types 

31 

32from postrfp.model.fsm import Workflow, Status, Transition, StatusAction 

33from postrfp.shared.fsm_entity import FSMEntity 

34from postrfp.fsm.schemas import ( 

35 IdSchema, 

36 WorkflowSchema, 

37 StatusSchema, 

38 TransitionSchema, 

39 WorkflowList, 

40 WorkflowSummary, 

41 EntityTypeSchema, 

42 EntityTypeList, 

43 TransitionRequest, 

44 TransitionResult, 

45) 

46from postrfp.fsm.service import ( 

47 get_all_states, 

48 get_all_transitions, 

49 execute_transition, 

50 get_available_transitions, 

51 evaluate_transition, 

52) 

53 

54if TYPE_CHECKING: 

55 from postrfp.model import User 

56 

57 

58@http 

59def get_workflows( 

60 session: Session, 

61 user: "User", 

62 entity: Optional[str] = None, 

63 pager: Optional[Pager] = None, 

64) -> WorkflowList: 

65 """ 

66 Get a list of FSM workflows, optionally filtered by entity type and organisation. 

67 

68 @permissions MANAGE_ORGANISATION 

69 """ 

70 user.check_permission(perms.MANAGE_ORGANISATION) 

71 

72 if pager is None: 

73 pager = Pager(page=1, page_size=50) 

74 

75 q = select(Workflow).where(Workflow.organisation_id == user.org_id) 

76 

77 if entity: 

78 q = q.where(Workflow.entity_type == entity) 

79 

80 q = q.order_by(Workflow.entity_type, Workflow.title) 

81 

82 total_records = ( 

83 session.execute(select(func.count()).select_from(q.subquery())).scalar() or 0 

84 ) 

85 records = ( 

86 session.execute(q.offset(pager.startfrom).limit(pager.page_size)) 

87 .scalars() 

88 .all() 

89 ) 

90 return WorkflowList( 

91 data=[WorkflowSummary.model_validate(record) for record in records], 

92 pagination=pager.as_pagination(total_records, len(records)), 

93 ) 

94 

95 

96@http 

97def get_workflow(session: Session, user: "User", workflow_id: int) -> WorkflowSchema: 

98 """ 

99 Get a specific FSM workflow by ID. 

100 

101 @permissions MANAGE_ORGANISATION 

102 """ 

103 user.check_permission(perms.MANAGE_ORGANISATION) 

104 

105 workflow_def = session.get_one(Workflow, workflow_id) 

106 

107 if workflow_def.organisation_id != user.org_id: 

108 authorise.check(user, perms.MANAGE_USERS, target_org=workflow_def.organisation) 

109 

110 return WorkflowSchema.model_validate(workflow_def, from_attributes=True) 

111 

112 

113def _create_update_statuses( 

114 session: Session, 

115 workflow: Workflow, 

116 status_data_list: list[StatusSchema], 

117 event: AuditEvent, 

118 existing_statuses: Optional[dict[str, Status]] = None, 

119) -> dict[str, Status]: 

120 """ 

121 Helper function to create or update statuses for a workflow. 

122 Returns a mapping of status codes to Status objects. 

123 """ 

124 

125 status_map = {} 

126 for status_data in status_data_list: 

127 status = None 

128 if existing_statuses and status_data.code in existing_statuses: 

129 status = existing_statuses[status_data.code] 

130 

131 if status is None: 

132 status = Status(workflow_id=workflow.id) 

133 

134 audited_patch( 

135 status, 

136 status_data, 

137 event, 

138 ("name", "code", "description"), 

139 prefix="workflow", 

140 ) 

141 

142 # Ensure new statuses have a non-null code; if omitted in input, 

143 # generate one to avoid None keys and DB constraint issues. 

144 if not getattr(status, "code", None): 

145 status.code = random_string() 

146 

147 is_new_status = not status.id 

148 if is_new_status: 

149 session.add(status) 

150 session.flush() # Generate ID and finalize defaults 

151 

152 # Key the map by the actual code on the model (may differ from input) 

153 status_map[status.code] = status 

154 

155 # Synchronise StatusAction entries with provided status_actions (if present) 

156 desired_actions = set(status_data.status_actions or []) 

157 

158 # Use no_autoflush to prevent autoflush when reading current actions 

159 # and when modifying the actions collection 

160 with session.no_autoflush: 

161 current_actions = set(status.status_actions) 

162 if desired_actions == current_actions: 

163 continue 

164 if is_new_status: 

165 # For new statuses, use the original association proxy approach 

166 # which is simpler and doesn't have the race condition issue 

167 status.actions.clear() 

168 status.status_actions.update(desired_actions) 

169 else: 

170 # For existing statuses, we need to carefully manage deletions and insertions 

171 # to avoid unique constraint violations 

172 for action in list(status.actions): 

173 session.delete(action) 

174 

175 # Ensure deletions are committed before insertions to avoid unique constraint violations 

176 session.flush() 

177 

178 # Add the desired actions as new StatusAction objects 

179 for action_name in desired_actions: 

180 new_action = StatusAction(status_id=status.id, action=action_name) 

181 session.add(new_action) 

182 

183 return status_map 

184 

185 

186def _create_update_transitions( 

187 session: Session, 

188 workflow: Workflow, 

189 transition_data_list: list[TransitionSchema], 

190 status_map: dict[str, Status], 

191 event: AuditEvent, 

192 existing_transitions: Optional[dict[str, Transition]] = None, 

193) -> None: 

194 """ 

195 Helper function to create or update transitions for a workflow. 

196 

197 It is assumed that status_map contains all statuses referenced by the 

198 transitions, i.e. the caller has already validated the parameters. 

199 """ 

200 

201 for trans_data in transition_data_list: 

202 transition = None 

203 if existing_transitions and trans_data.name in existing_transitions: 

204 transition = existing_transitions[trans_data.name] 

205 

206 if not transition: 

207 transition = Transition(workflow_id=workflow.id) 

208 

209 source_status = status_map[trans_data.source] 

210 target_status = status_map[trans_data.target] 

211 

212 transition.source_status = source_status 

213 transition.target_status = target_status 

214 

215 audited_patch( 

216 transition, 

217 trans_data, 

218 event, 

219 ("name", "guard_policy"), 

220 prefix="workflow", 

221 ) 

222 

223 event.add_change( 

224 "workflow.transition", "", f"{source_status.code}->{target_status.code}" 

225 ) 

226 

227 if not transition.id: 

228 session.add(transition) 

229 

230 

231@http 

232def post_workflow( 

233 session: Session, user: "User", workflow_doc: WorkflowSchema 

234) -> IdSchema: 

235 """ 

236 Create a new FSM workflow. 

237 

238 The workflow ID should not be set - it will be ignored if provided. 

239 

240 @permissions MANAGE_ORGANISATION 

241 """ 

242 user.check_permission(perms.MANAGE_ORGANISATION) 

243 

244 workflow = Workflow() 

245 workflow.organisation_id = user.org_id 

246 

247 event = AuditEvent.create(session, evt_types.WORKFLOW_CREATED, user=user) 

248 

249 audited_patch( 

250 workflow, 

251 workflow_doc, 

252 event, 

253 ("title", "entity_type", "version", "is_active", "initial_status_code"), 

254 ) 

255 

256 session.add(workflow) 

257 session.flush() # Generate the ID 

258 

259 status_map = _create_update_statuses( 

260 session, workflow, workflow_doc.statuses, event 

261 ) 

262 

263 _create_update_transitions( 

264 session, workflow, workflow_doc.transitions, status_map, event 

265 ) 

266 return IdSchema(id=workflow.id) 

267 

268 

269@http 

270def put_workflow( 

271 session: Session, user: "User", workflow_id: int, workflow_doc: WorkflowSchema 

272) -> None: 

273 """ 

274 Update an existing FSM workflow. 

275 

276 @permissions MANAGE_ORGANISATION 

277 """ 

278 user.check_permission(perms.MANAGE_ORGANISATION) 

279 

280 workflow = session.get_one(Workflow, workflow_id) 

281 

282 if workflow.organisation_id != user.org_id: 

283 authorise.check(user, perms.MANAGE_USERS, target_org=workflow.organisation) 

284 

285 event = AuditEvent.create(session, evt_types.WORKFLOW_UPDATED, user=user) 

286 

287 audited_patch( 

288 workflow, 

289 workflow_doc, 

290 event, 

291 ("title", "entity_type", "version", "is_active", "initial_status_code"), 

292 ) 

293 

294 existing_statuses = {status.code: status for status in workflow.statuses} 

295 status_map = _create_update_statuses( 

296 session, workflow, workflow_doc.statuses, event, existing_statuses 

297 ) 

298 

299 # Use the actual resulting status codes from the update step to decide deletions 

300 status_codes_to_keep = set(status_map.keys()) 

301 for status in list(workflow.statuses): 

302 if status.code not in status_codes_to_keep: 

303 session.delete(status) 

304 event.add_change("workflow.status", status.code, "deleted") 

305 

306 existing_transitions = { 

307 transition.name: transition for transition in workflow.transitions 

308 } 

309 

310 _create_update_transitions( 

311 session, 

312 workflow, 

313 workflow_doc.transitions, 

314 status_map, 

315 event, 

316 existing_transitions, 

317 ) 

318 

319 transition_names_to_keep = { 

320 transition.name for transition in workflow_doc.transitions 

321 } 

322 for transition in list(workflow.transitions): 

323 if transition.name not in transition_names_to_keep: 

324 session.delete(transition) 

325 event.add_change("workflow.transition", transition.name, "deleted") 

326 

327 

328@http 

329def delete_workflow(session: Session, user: "User", workflow_id: int): 

330 """ 

331 Delete an FSM workflow. 

332 

333 @permissions MANAGE_ORGANISATION 

334 """ 

335 user.check_permission(perms.MANAGE_ORGANISATION) 

336 

337 fsm_def = session.get_one(Workflow, workflow_id) 

338 

339 if fsm_def.organisation_id != user.org_id: 

340 authorise.check(user, perms.MANAGE_USERS, target_org=fsm_def.organisation) 

341 

342 session.delete(fsm_def) 

343 session.flush() 

344 

345 

346@http 

347def get_workflow_statuses( 

348 session: Session, user: "User", workflow_id: int 

349) -> list[StatusSchema]: 

350 """ 

351 Get all states for an FSM workflow. 

352 

353 @permissions MANAGE_ORGANISATION 

354 """ 

355 user.check_permission(perms.MANAGE_ORGANISATION) 

356 

357 fsm_def = session.get_one(Workflow, workflow_id) 

358 

359 if fsm_def.organisation_id != user.org_id: 

360 authorise.check(user, perms.MANAGE_USERS, target_org=fsm_def.organisation) 

361 

362 states = get_all_states(session, fsm_def) 

363 return [StatusSchema.model_validate(state) for state in states] 

364 

365 

366@http 

367def get_workflow_transitions( 

368 session: Session, user: "User", workflow_id: int 

369) -> list[TransitionSchema]: 

370 """ 

371 Get all transitions for an FSM workflow. 

372 

373 @permissions MANAGE_ORGANISATION 

374 """ 

375 user.check_permission(perms.MANAGE_ORGANISATION) 

376 

377 fsm_def = session.get_one(Workflow, workflow_id) 

378 

379 if fsm_def.organisation_id != user.org_id: 

380 authorise.check(user, perms.MANAGE_USERS, target_org=fsm_def.organisation) 

381 

382 transitions = get_all_transitions(session, fsm_def) 

383 return [TransitionSchema.model_validate(transition) for transition in transitions] 

384 

385 

386@http 

387def get_workflow_entity_transitions( 

388 session: Session, user: "User", workflow_id: int, entity_id: int 

389) -> list[TransitionSchema]: 

390 """ 

391 List transitions that can be performed by the current user at the current status 

392 for the given workflow 

393 

394 @permissions MANAGE_ORGANISATION 

395 """ 

396 user.check_permission(perms.MANAGE_ORGANISATION) 

397 

398 # Lookup Workflow 

399 workflow = session.get_one(Workflow, workflow_id) 

400 

401 if workflow.organisation_id != user.org_id: 

402 authorise.check(user, perms.MANAGE_USERS, target_org=workflow.organisation) 

403 

404 entity_class = FSMEntity.get_entity_by_name(workflow.entity_type) 

405 assert entity_class is not None 

406 entity: FSMEntity = session.get_one(entity_class, entity_id) 

407 

408 # Validate that the entity is actually using this workflow 

409 if entity.workflow_id != workflow_id: 

410 raise ValueError(f"Entity {entity_id} is not using workflow {workflow_id}") 

411 

412 context_data = entity.get_context_data() 

413 

414 valid_transitions: list[Transition] = [] 

415 for transition in get_available_transitions(session, entity): 

416 tr = evaluate_transition(transition, context_data) 

417 if tr.transition_permitted: 

418 valid_transitions.append(transition) 

419 

420 return [TransitionSchema.model_validate(t) for t in valid_transitions] 

421 

422 

423@http 

424def get_entities(session: Session, user: "User") -> EntityTypeList: 

425 """ 

426 Get a list of all available FSM entity types. 

427 

428 @permissions MANAGE_ORGANISATION 

429 """ 

430 user.check_permission(perms.MANAGE_ORGANISATION) 

431 

432 entity_types = [] 

433 

434 for name, entity_class in FSMEntity.get_registered_entities().items(): 

435 # Get the table name from SQLAlchemy 

436 table_name = getattr(entity_class, "__tablename__", name.lower()) 

437 

438 # Get the context schema from the entity class 

439 try: 

440 context_schema = entity_class.get_context_schema() 

441 except NotImplementedError: 

442 # If not implemented, provide a basic schema 

443 context_schema = {"type": "object", "properties": {}} 

444 

445 entity_type = EntityTypeSchema( 

446 name=name, table_name=table_name, context_schema=context_schema 

447 ) 

448 entity_types.append(entity_type) 

449 

450 # Sort by name for consistent ordering 

451 entity_types.sort(key=lambda x: x.name) 

452 

453 return EntityTypeList(data=entity_types) 

454 

455 

456@http 

457def post_transition( 

458 session: Session, user: "User", transition_doc: TransitionRequest 

459) -> TransitionResult: 

460 """ 

461 Execute a transition on an FSM entity. 

462 

463 This is a generic endpoint that can be used for any FSM entity type, 

464 replacing entity-specific endpoints like put_project_publish. 

465 

466 Permission checking is delegated to the guard function associated with 

467 the transition, which has access to entity context and user information 

468 to perform appropriate authorization checks. 

469 

470 @permissions Delegated to guard function 

471 """ 

472 # Get the entity class from the registry 

473 entity_class = FSMEntity.get_entity_by_name(transition_doc.entity_type) 

474 if entity_class is None: 

475 raise ValueError(f"Unknown entity type: {transition_doc.entity_type}") 

476 

477 # Query for the specific entity instance 

478 entity = session.get_one(entity_class, transition_doc.entity_id) 

479 

480 # Check if entity has a workflow assigned 

481 if not entity.workflow_id or not entity.workflow: 

482 raise ValueError( 

483 f"Entity {transition_doc.entity_type}#{transition_doc.entity_id} has no workflow assigned" 

484 ) 

485 

486 # Permission checking is delegated to the guard function during transition execution 

487 # The guard function will have access to entity context and can perform appropriate 

488 # ownership and permission checks based on the specific entity type 

489 

490 # Execute the transition using the service layer 

491 execute_transition(entity, transition_doc.transition_name) 

492 

493 # If we get here, the transition was successful 

494 return TransitionResult( 

495 transition_permitted=True, 

496 message=f"Transition '{transition_doc.transition_name}' executed successfully", 

497 job_ref=None, 

498 )