Source code for odata_query.grammar

"""
Implementation of a subset of the
`OData formal grammar <https://docs.oasis-open.org/odata/odata/v4.01/csprd06/abnf/odata-abnf-construction-rules.txt>`_ .
Implemented with `SLY <https://sly.readthedocs.io/en/latest/>`_.
"""

import re
from typing import Any, Callable, List, Optional, TypeVar, Union

from sly import Lexer, Parser
from sly.lex import Token

from . import ast, exceptions

RuleDecorator = TypeVar("RuleDecorator", bound=Callable[..., Any])

_RWS = r"\s+"
_INTEGER = r"[+-]?\d+"
_DATE = r"[1-9]\d{3}-(?:0\d|1[0-2])-(?:[0-2]\d|3[01])"
_TIME = r"(?:[01]\d|2[0-3]):[0-5]\d(:?:[0-5]\d(?:\.\d{1,12})?)"

# Defines known functions and min/max nr of args:
ODATA_FUNCTIONS = {
    # String functions
    "concat": 2,
    "contains": 2,
    "endswith": 2,
    "indexof": 2,
    "length": 1,
    "startswith": 2,
    "substring": (2, 3),
    "matchesPattern": 2,
    "tolower": 1,
    "toupper": 1,
    "trim": 1,
    # Datetime functions
    "year": 1,
    "month": 1,
    "day": 1,
    "hour": 1,
    "minute": 1,
    "second": 1,
    "fractionalseconds": 1,
    "totalseconds": 1,
    "date": 1,
    "time": 1,
    "totaloffsetminutes": 1,
    "mindatetime": 0,
    "maxdatetime": 0,
    "now": 0,
    # Math functions
    "round": 1,
    "floor": 1,
    "ceiling": 1,
    # Geo functions
    "geo.distance": 2,
    "geo.length": 1,
    "geo.intersects": 2,
    # Set functions
    "hassubset": 2,
    "hassubsequence": 2,
}


[docs]class ODataLexer(Lexer): tokens = { "ODATA_IDENTIFIER", "NULL", "STRING", "GEOGRAPHY", "GUID", "DATETIME", "DATE", "TIME", "DURATION", "DECIMAL", "INTEGER", "BOOLEAN", "ADD", "SUB", "MUL", "DIV", "MOD", "UMINUS", "AND", "OR", "NOT", "EQ", "NE", "LT", "LE", "GT", "GE", "IN", "ANY", "ALL", "WS", } literals = {"(", ")", ",", "/", ":", "="} reflags = re.I # Ensure MyPy doesn't lose its mind: _: Callable[..., Callable[[RuleDecorator], RuleDecorator]]
[docs] def error(self, token: Token): """ Error handler during tokenization Args: token: The token that failed to tokenize. Raises: TokenizingException """ raise exceptions.TokenizingException(token)
# NOTE: Ordering of tokens is important! Longer tokens preferably first #################################################################################### # Primitive literals #################################################################################### @_( r"duration'[+-]?P(?:\d+Y)?(?:\d+M)?(?:\d+D)?(?:T(?:\d+H)?(?:\d+M)?(?:\d+(?:\.\d+)?S)?)?'" ) def DURATION(self, t): ":meta private:" val = t.value.upper() # Strip the prefix and single quotes: val = val[len("DURATION") + 1 : -1] t.value = ast.Duration(val) return t @_(r"'(?:[^']|'')*'") def STRING(self, t): ":meta private:" # Strings are single-quoted. To represent a single quote within a string, # double it! # Strip only the first and last single quote: val = t.value[1:-1] # Replace double single-quotes with a single quote: val = val.replace("''", "'") t.value = ast.String(val) return t @_(r"geography'(?:[^']|'')*'") def GEOGRAPHY(self, t): ":meta private:" t.value = ast.Geography(t.value[10:-1]) return t @_(r"[\da-f]{8}-[\da-f]{4}-[\da-f]{4}-[\da-f]{4}-[\da-f]{12}") def GUID(self, t): ":meta private:" t.value = ast.GUID(t.value) return t @_(_DATE + r"T" + _TIME + r"?(Z|[+-](?:[01]\d|2[0-3]):[0-5]\d)?") def DATETIME(self, t): ":meta private:" t.value = ast.DateTime(t.value) return t @_(_DATE) def DATE(self, t): ":meta private:" t.value = ast.Date(t.value) return t @_(_TIME) def TIME(self, t): ":meta private:" t.value = ast.Time(t.value) return t @_(_INTEGER + r"((?:(?:\.\d+)(?:e[-+]?\d+))|(?:\.\d+)|(?:e[-+]?\d+))") def DECIMAL(self, t): ":meta private:" t.value = ast.Float(t.value) return t @_(_INTEGER) def INTEGER(self, t): ":meta private:" t.value = ast.Integer(t.value) return t @_(r"true|false") def BOOLEAN(self, t): ":meta private:" t.value = ast.Boolean(t.value) return t @_(r"null") def NULL(self, t): ":meta private:" t.value = ast.Null() return t #################################################################################### # Arithmetic #################################################################################### @_(rf"{_RWS}add{_RWS}") def ADD(self, t): ":meta private:" t.value = ast.Add() return t @_(rf"{_RWS}sub{_RWS}") def SUB(self, t): ":meta private:" t.value = ast.Sub() return t @_(rf"{_RWS}mul{_RWS}") def MUL(self, t): ":meta private:" t.value = ast.Mult() return t @_(rf"{_RWS}div{_RWS}") def DIV(self, t): ":meta private:" t.value = ast.Div() return t @_(rf"{_RWS}mod{_RWS}") def MOD(self, t): ":meta private:" t.value = ast.Mod() return t @_(r"-") def UMINUS(self, t): ":meta private:" t.value = ast.USub() return t #################################################################################### # Boolean logic #################################################################################### @_(rf"{_RWS}and{_RWS}") def AND(self, t): ":meta private:" t.value = ast.And() return t @_(rf"{_RWS}or{_RWS}") def OR(self, t): ":meta private:" t.value = ast.Or() return t @_(rf"not{_RWS}") def NOT(self, t): ":meta private:" t.value = ast.Not() return t #################################################################################### # Comparators #################################################################################### @_(rf"{_RWS}eq{_RWS}") def EQ(self, t): ":meta private:" t.value = ast.Eq() return t @_(rf"{_RWS}ne{_RWS}") def NE(self, t): ":meta private:" t.value = ast.NotEq() return t @_(rf"{_RWS}lt{_RWS}") def LT(self, t): ":meta private:" t.value = ast.Lt() return t @_(rf"{_RWS}le{_RWS}") def LE(self, t): ":meta private:" t.value = ast.LtE() return t @_(rf"{_RWS}gt{_RWS}") def GT(self, t): ":meta private:" t.value = ast.Gt() return t @_(rf"{_RWS}ge{_RWS}") def GE(self, t): ":meta private:" t.value = ast.GtE() return t @_(rf"{_RWS}in{_RWS}") def IN(self, t): ":meta private:" t.value = ast.In() return t #################################################################################### # Collection operators #################################################################################### @_(r"any") def ANY(self, t): ":meta private:" t.value = ast.Any() return t @_(r"all") def ALL(self, t): ":meta private:" t.value = ast.All() return t #################################################################################### # Misc #################################################################################### @_(r"[_a-z](?:\.?\w){0,127}") def ODATA_IDENTIFIER(self, t): ":meta private:" *ns, identifier = t.value.split(".") t.value = ast.Identifier(identifier, tuple(ns)) return t WS = _RWS
[docs]class ODataParser(Parser): debugfile = None tokens = ODataLexer.tokens # Predecence from low to high. # See: https://docs.oasis-open.org/odata/odata/v4.0/errata03/os/complete/part2-url-conventions/odata-v4.0-errata03-os-part2-url-conventions-complete.html#_Toc453752358 # OData 5.1.1.14 Operator Precedence precedence = ( ("left", "OR"), ("left", "AND"), ("left", "EQ", "NE"), ("left", "GT", "GE", "LT", "LE"), ("left", "ADD", "SUB"), ("left", "MUL", "DIV", "MOD"), ("right", "NOT", "UMINUS"), ("left", "IN"), ) # Ensure MyPy doesn't lose its mind: _: Callable[..., Callable[[RuleDecorator], RuleDecorator]]
[docs] def error(self, token: Optional[Token]): """ Error handler during parsing. Args: token: The token at which point parsing failed. Raises: ParsingFailedException """ eof = token is None raise exceptions.ParsingException(token, eof)
@_('"(" BWS common_expr BWS ")"') def common_expr(self, p): ":meta private:" return p.common_expr @_("primitive_literal", "first_member_expr", "list_expr") # type:ignore[no-redef] def common_expr(self, p): ":meta private:" return p[0] #################################################################################### # Primitives #################################################################################### @_( "NULL", "INTEGER", "DECIMAL", "STRING", "GEOGRAPHY", "BOOLEAN", "GUID", "DATE", "TIME", "DATETIME", "DURATION", ) def primitive_literal(self, p): ":meta private:" return p[0] @_('common_expr BWS "," BWS common_expr') def list_items(self, p): ":meta private:" return [p[0], p[4]] @_('list_items BWS "," BWS common_expr') # type:ignore[no-redef] def list_items(self, p): ":meta private:" p.list_items.append(p.common_expr) return p.list_items @_('"(" BWS common_expr BWS "," BWS ")"') def list_expr(self, p): ":meta private:" # NOTE: This is NOT according to the OData standard ABNF. # The standard says a single item list is (x), but that collides with the parenExpr # rule, e.g.: concat(('a'), ('b')) could mean concat of lists or concat of str expressions. # Therefore we follow Python syntax: a single item list has a comma at the end. return ast.List([p.common_expr]) @_('"(" BWS list_items BWS ")"') # type:ignore[no-redef] def list_expr(self, p): ":meta private:" return ast.List(p.list_items) #################################################################################### # Names, properties, variables #################################################################################### # NOTE: !WARNING! OData ABNF has a lot of rules with very similar names # in this section. Like 'propertyPath' and 'propertyPathExpr', and their # definitions look almost exactly the same! Stay vigilant! @_("member_expr") def first_member_expr(self, p): ":meta private:" return p.member_expr @_("property_path_expr") def member_expr(self, p): ":meta private:" return p[0] @_("entity_navigation_property") def property_path_expr(self, p): ":meta private:" return p.entity_navigation_property @_("entity_navigation_property single_navigation_expr") # type:ignore[no-redef] def property_path_expr(self, p): ":meta private:" if isinstance(p[1], ast.Attribute): # We want to nest attributes in the opposite direction of parsing, e.g.: # we prefer Attribute(Attribute(Id(A), 'created_by'), 'name') # over Attribute(Id(A), Attribute(Id(created_by), 'name')) naive_attr = ast.Attribute(p[0], p[1]) return self._reverse_attributes(naive_attr) elif isinstance(p[1], ast.CollectionLambda): # Very similar for CollectionLambdas: # We prefer the CollectionLambda to define its complete owner # instead of being a deeply nested attribute: owner: ast.Identifier = p[1].owner new_owner = ast.Attribute(p[0], owner.name) return ast.CollectionLambda(new_owner, p[1].operator, p[1].lambda_) else: return ast.Attribute(p[0], p[1].name) @_("ODATA_IDENTIFIER") def entity_navigation_property(self, p): ":meta private:" return p[0] @_("'/' member_expr") def single_navigation_expr(self, p): ":meta private:" return p.member_expr @_("'/' any_expr", "'/' all_expr") def collection_path_expr(self, p): ":meta private:" return p[1] @_("entity_navigation_property collection_path_expr") # type:ignore[no-redef] def property_path_expr(self, p): ":meta private:" return ast.CollectionLambda(p[0], *p[1]) #################################################################################### # Collections #################################################################################### @_('ODATA_IDENTIFIER BWS ":" BWS common_expr') def lambda_(self, p): ":meta private:" return ast.Lambda(p[0], p.common_expr) @_('ANY "(" BWS lambda_ BWS ")"') def any_expr(self, p): ":meta private:" return (p[0], p.lambda_) @_('ANY "(" BWS ")"') # type:ignore[no-redef] def any_expr(self, p): ":meta private:" return (p[0], None) @_('ALL "(" BWS lambda_ BWS ")"') def all_expr(self, p): ":meta private:" return (p[0], p.lambda_) #################################################################################### # Arithmetic #################################################################################### @_("UMINUS BWS common_expr") # type:ignore[no-redef] def common_expr(self, p): ":meta private:" return ast.UnaryOp(p[0], p[2]) @_( # type:ignore[no-redef] "common_expr ADD common_expr", "common_expr SUB common_expr", "common_expr MUL common_expr", "common_expr DIV common_expr", "common_expr MOD common_expr", ) def common_expr(self, p): ":meta private:" return ast.BinOp(p[1], p[0], p[2]) #################################################################################### # Comparisons #################################################################################### @_( # type:ignore[no-redef] "common_expr EQ common_expr", "common_expr NE common_expr", "common_expr LT common_expr", "common_expr LE common_expr", "common_expr GT common_expr", "common_expr GE common_expr", "common_expr IN list_expr", ) def common_expr(self, p): ":meta private:" return ast.Compare(p[1], p[0], p[2]) #################################################################################### # Boolean logic #################################################################################### @_( # type:ignore[no-redef] "common_expr AND common_expr", "common_expr OR common_expr" ) def common_expr(self, p): ":meta private:" return ast.BoolOp(p[1], p[0], p[2]) @_("NOT common_expr") # type:ignore[no-redef] def common_expr(self, p): ":meta private:" return ast.UnaryOp(p[0], p.common_expr) #################################################################################### # Function calls #################################################################################### def _function_call(self, func: ast.Identifier, args: List[ast._Node]): ":meta private:" func_name = func.full_name() if func.namespace in ((), ("geo",)): try: n_args_exp = ODATA_FUNCTIONS[func_name] except KeyError: raise exceptions.UnknownFunctionException(func_name) n_args_given = len(args) if isinstance(n_args_exp, int) and n_args_given != n_args_exp: raise exceptions.ArgumentCountException( func_name, n_args_exp, n_args_exp, n_args_given ) if isinstance(n_args_exp, tuple) and ( n_args_given < n_args_exp[0] or n_args_given > n_args_exp[1] ): raise exceptions.ArgumentCountException( func_name, n_args_exp[0], n_args_exp[1], n_args_given ) return ast.Call(func, args) @_('ODATA_IDENTIFIER "(" ")"') # type:ignore[no-redef] def common_expr(self, p): ":meta private:" args = [] return self._function_call(p[0], args) @_('ODATA_IDENTIFIER "(" BWS common_expr BWS ")"') # type:ignore[no-redef] def common_expr(self, p): ":meta private:" args = [p.common_expr] return self._function_call(p[0], args) @_("ODATA_IDENTIFIER list_expr") # type:ignore[no-redef] def common_expr(self, p): ":meta private:" args = p[1].val return self._function_call(p[0], args) @_('ODATA_IDENTIFIER "=" common_expr') # type:ignore[no-redef] def named_param(self, p): ":meta private:" return ast.NamedParam(p[0], p.common_expr) @_('named_param BWS "," BWS named_param') def list_named_param(self, p): ":meta private:" return [p[0], p[4]] @_('list_named_param BWS "," BWS named_param') # type:ignore[no-redef] def list_named_param(self, p): ":meta private:" return p.list_items + [p.named_param] @_('ODATA_IDENTIFIER "(" BWS named_param BWS ")"') # type:ignore[no-redef] def common_expr(self, p): ":meta private:" args = [p.named_param] return self._function_call(p[0], args) @_('ODATA_IDENTIFIER "(" BWS list_named_param BWS ")"') # type:ignore[no-redef] def common_expr(self, p): ":meta private:" args = p.list_named_param return self._function_call(p[0], args) #################################################################################### # Misc #################################################################################### @_("") def empty(self, p): ":meta private:" pass @_("WS") def BWS(self, p): """ 'Bad Whitespace' :meta private: """ pass @_("empty") # type:ignore[no-redef] def BWS(self, p): """ 'Bad Whitespace' :meta private: """ pass #################################################################################### # Utils #################################################################################### def _reverse_attributes(self, attr: ast.Attribute) -> ast.Attribute: """ Transforms an attribute like: ``Attribute(Id(A), Attribute(..., 'name'))`` into ``Attribute(Attribute(Id(A), ...), 'name')`` Args: attr: The :class:`Attribute` to reverse. Returns: The transformed attribute. """ exploded = self._explode_attr(attr) leaf_attr = exploded.pop() owner: Union[ast.Identifier, ast.Attribute] = ast.Identifier(exploded.pop(0)) for inter in exploded: owner = ast.Attribute(owner, inter) return ast.Attribute(owner, leaf_attr) def _explode_attr(self, attr: ast.Attribute) -> List[str]: """ Splits a (possibly nested) attribute into a list of all elements, e.g.: ``Attribute(Id(A), Attribute(Id(B), 'name'))`` into ``[A, B, name]`` Args: attr: The :class:`Attribute` to split. Returns: A list of all identifiers in the ``attr`` """ if isinstance(attr.owner, ast.Identifier): exploded = [attr.owner.name] elif isinstance(attr.owner, ast.Attribute): exploded = self._explode_attr(attr.owner) else: raise NotImplementedError() if isinstance(attr.attr, str): exploded.append(attr.attr) elif isinstance(attr.attr, ast.Attribute): exploded.extend(self._explode_attr(attr.attr)) else: raise NotImplementedError return exploded