Coverage for postrfp / shared / expression.py: 93%

27 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1from typing import Any 

2import logging 

3 

4from cel import evaluate 

5 

6 

7log = logging.getLogger(__name__) 

8 

9 

10def evaluate_expression(expression: str, context: dict[str, Any]) -> bool: 

11 """ 

12 Evaluate a CEL policy against a given context. 

13 

14 @returns True if the policy evaluates to true, False otherwise (including on error). 

15 """ 

16 if not expression or not expression.strip(): 

17 log.debug("Empty CEL expression, denying by default.") 

18 return False 

19 try: 

20 result = evaluate(expression, context) 

21 return bool(result) 

22 except Exception as e: 

23 log.error(f"Error evaluating policy '{expression}': {e}") 

24 return False 

25 

26 

27def transform_payload( 

28 expression: str, context: dict[str, Any] 

29) -> dict[str, Any] | list[Any]: 

30 """ 

31 Transform an event payload using a CEL expression. 

32 

33 The expression should evaluate to a dict or list that will become the new payload. 

34 If the expression is empty/None, returns the original context unchanged. 

35 If evaluation fails or returns invalid type, returns the original context. 

36 

37 Args: 

38 expression: CEL expression to evaluate (should return dict or list) 

39 context: Original event data to transform 

40 

41 Returns: 

42 Transformed payload (dict or list) or original context on error 

43 """ 

44 if not expression or not expression.strip(): 

45 log.debug("Empty transform expression, returning original payload.") 

46 return context 

47 

48 try: 

49 result = evaluate(expression, context) 

50 

51 # Validate result type - must be dict or list for JSON serialization 

52 if not isinstance(result, (dict, list)): 

53 log.error( 

54 f"Transform expression returned invalid type {type(result).__name__}, " 

55 f"expected dict or list. Returning original payload." 

56 ) 

57 return context 

58 

59 return result 

60 except Exception as e: 

61 log.error(f"Error evaluating transform expression '{expression}': {e}") 

62 return context