"""Filter expressions for data reading operations with predicate pushdown.
This module is responsible for translating filter expressions from a simplified SQL syntax
into different formats understood by the various backends. This way the same language can
be used to implement filtering regardless of the data source.
The grammar of the filter statements is the same as in a WHERE clause in SQL. Supported
features:
- Comparing column values to numbers, strings and another column's values using the operators
`> < = != >= <=`, e.g. `a.column < 5`
- Comparison against a set of values with `IN` and `NOT IN`, e.g. `a.column IN (1, 2, 3)`
- Boolean combination of conditions with `AND`, `OR` and `NOT`
- `NULL` comparison as in `a IS NULL` or `b IS NOT NULL`
Strings can be quoted with single-quotes and double-quotes.
Column names can but don't have to be quoted with SQL quotes (backticks). E.g.:
```sql
`a.column` = "abc" AND b IS NOT NULL OR index < 50
```
"""
from dataclasses import dataclass
from typing import Any, List, Tuple, Union
import lark
# See SQL 2003 working draft http://www.wiscorp.com/sql20nn.zip (Part 2, section 6.35)
#
# Lark grammar of the filter language. Reading guide:
# - `boolean_value_exp` chains `boolean_term`s with OR and `boolean_term` chains
#   `boolean_factor`s with AND, so AND binds tighter than OR; parentheses in
#   `boolean_factor` restart at `boolean_value_exp`.
# - Both chaining rules are left-recursive, i.e. `a OR b OR c` is parsed as
#   `(a OR b) OR c`.
# - The `-> name` aliases (`or_operation`, `and_operation`, `negation`) and the rule
#   names `comparison`, `null_comparison`, `notin_list`, `in_list`, `literal_list`
#   are the callback names implemented by the transformer classes in this module.
# - A numeric suffix on a terminal name (e.g. `NOTIN.10`) is its lexer priority.
# - `"OR"i`, `"AND"i` and `"IN"i` carry the `i` flag (case-insensitive match).
# NOTE(review): the `"NOT"` literal in `boolean_factor` has no `i` flag, and the
#   `NOT` terminal defined further down is not referenced by any rule - confirm
#   whether negation is meant to be case-sensitive.
# NOTE(review): in `UNQUOTED_COLUMNNAME` the `+` after `"."` looks like a repetition
#   operator (one or more dots), which would accept names such as `a..b` - confirm.
lark_grammar = r"""
?boolean_value_exp: boolean_term
| boolean_value_exp "OR"i boolean_term -> or_operation
?boolean_term: boolean_factor
| boolean_term "AND"i boolean_factor -> and_operation
?boolean_factor: "(" boolean_value_exp ")"
| single_condition
| "NOT" single_condition -> negation
?single_condition: comparison
| null_comparison
| notin_list
| in_list
comparison: columnname BINOP column_rval
null_comparison: columnname (ISNULL | NOTNULL)
notin_list: columnname NOTIN literal_list
in_list: columnname "IN"i literal_list
literal_list: ("(") literal ("," literal)* (")")
?literal: SIGNED_NUMBER | ESCAPED_STRING
?column_rval: columnname
| SIGNED_NUMBER
| ESCAPED_STRING
?columnname: "`" columnname "`"
| UNQUOTED_COLUMNNAME
ISNULL.9: /IS\s+NULL/i
NOTNULL.10: /IS\s+NOT\s+NULL/i
NOTIN.10: /NOT\s+IN/i
NOT.9: "NOT"i
UNQUOTED_COLUMNNAME.1: NAMECHAR+ ("." + NAMECHAR+)*
NAMECHAR: "_"|"$"|LETTER|DIGIT
BINOP.10: "!="
| ">="
| "<="
| "<"
| ">"
| "="
ESCAPED_STRING.2 : DOUBLE_QUOTE_ESCAPED_STRING | SINGLE_QUOTE_ESCAPED_STRING
DOUBLE_QUOTE_ESCAPED_STRING.2 : "\"" _STRING_ESC_INNER "\""
SINGLE_QUOTE_ESCAPED_STRING.2 : "'" _STRING_ESC_INNER "'"
%import common.LETTER
%import common.DIGIT
%import common.SIGNED_NUMBER
%import common._STRING_ESC_INNER
%import common.WS_INLINE
%ignore WS_INLINE
"""
def _make_parser(transformer: lark.Transformer):
    """Create a parser for the filter grammar with `transformer` attached.

    Args:
        transformer: Transformer instance handed to lark through its `transformer`
            option, so the callbacks shape the value returned by `parse()`.

    Returns:
        A `lark.Lark` instance for `lark_grammar`, configured for the LALR parser
        and starting at the `boolean_value_exp` rule.
    """
    options = dict(
        start="boolean_value_exp",
        lexer="standard",
        parser="lalr",
        transformer=transformer,
    )
    return lark.Lark(lark_grammar, **options)
class _PrefixNotationTransformer(lark.Transformer):
    """lark.Transformer rendering a filter statement in polish/prefix notation.

    Every callback returns a plain string; operations are rendered by
    `format_operation` as `(<operator> <operand> ...)`.
    """

    # pylint: disable=missing-function-docstring,no-self-use

    @staticmethod
    def format_operation(operator, *operands):
        # Operands are already rendered strings, so a space-join is sufficient.
        rendered_operands = " ".join(operands)
        return f"({operator} {rendered_operands})"

    # --- terminals ---------------------------------------------------------

    def UNQUOTED_COLUMNNAME(self, name: lark.Token):
        return "Column<{}>".format(name.value)

    def ESCAPED_STRING(self, tok: lark.Token):
        # The token must still carry its matching surrounding quote characters.
        assert len(tok) > 1 and tok[0] == tok[-1] and tok[0] in ("'", '"')
        inner = tok[1:-1]
        return f"'{inner}'"

    def SIGNED_NUMBER(self, tok: lark.Token):
        """Normalise the numeric text of `tok` via int (or float as fallback)"""
        try:
            number = int(tok)
        except ValueError:
            number = float(tok)
        return str(number)

    # --- rules -------------------------------------------------------------

    def literal_list(self, members: lark.Token):
        return f"[{','.join(members)}]"

    @lark.v_args(inline=True)
    def in_list(self, column: lark.Token, lst: lark.Token):
        return self.format_operation("IN", column, lst)

    @lark.v_args(inline=True)
    def notin_list(self, column: lark.Token, _: lark.Token, lst: lark.Token):
        # The middle argument is the NOTIN token itself; only its position matters.
        return self.format_operation("NOTIN", column, lst)

    @lark.v_args(inline=True)
    def null_comparison(self, operand: lark.Token, operator: lark.Token):
        kind = operator.type
        if kind not in ("ISNULL", "NOTNULL"):
            raise lark.ParseError("Invalid NULL comparison")
        # The terminal name doubles as the operator name in the output.
        return self.format_operation(kind, operand)

    @lark.v_args(inline=True)
    def comparison(self, left: lark.Token, operator: lark.Token, right: lark.Token):
        return self.format_operation(operator, left, right)

    @lark.v_args(inline=True)
    def negation(self, operand: lark.Token):
        return self.format_operation("NOT", operand)

    def and_operation(self, operands: lark.Token):
        return self.format_operation("AND", *operands)

    def or_operation(self, operands: lark.Token):
        return self.format_operation("OR", *operands)
def to_prefix_notation(statement: str) -> str:
    """Parse a filter statement and return it in prefix notation.

    Args:
        statement: A filter predicate as string

    Returns:
        The filter statement in prefix notation (polish notation) as string

    Examples:
        >>> to_prefix_notation("a.column != 0")
        '(!= Column<a.column> 0)'
        >>> to_prefix_notation("a > 1 and b <= 3")
        '(AND (> Column<a> 1) (<= Column<b> 3))'
    """
    # The transformer is embedded in the parser, so `parse` directly returns the
    # rendered string instead of a parse tree.
    parser = _make_parser(_PrefixNotationTransformer())
    return parser.parse(statement)
class _PyarrowDNFTransformer(lark.Transformer):
    """lark.Transformer to translate to pyarrow's special DNF format

    The callbacks build small intermediate objects (`Column`, `Condition`, `And`,
    `Or`). Calling `format()` on such an object produces the nested list/tuple
    structure and raises `ValueError` for constructs that cannot be expressed.
    """

    # pylint: disable=missing-function-docstring,missing-class-docstring,no-self-use

    @dataclass
    class Column:
        # Column name as produced by the UNQUOTED_COLUMNNAME terminal
        name: str

    @dataclass
    class Or:
        # Operands of the disjunction: Condition, And or (nested) Or objects
        operands: list

        def format(self) -> List[List[Tuple[str, str, Any]]]:
            disjunction = []
            for o in self.operands:
                if isinstance(o, _PyarrowDNFTransformer.Or):
                    # The grammar is left-recursive, so `a OR b OR c` arrives as
                    # Or(Or(a, b), c). OR is associative: merge the nested
                    # disjunction instead of rejecting it.
                    disjunction.extend(o.format())
                elif isinstance(o, _PyarrowDNFTransformer.And):
                    disjunction.append(o.format())
                elif isinstance(o, _PyarrowDNFTransformer.Condition):
                    # A lone condition is a conjunction with a single member.
                    disjunction.append([o.format()])
                else:
                    raise ValueError(
                        "For PyArrow only disjunctions of conjunctions "
                        "or simple conditions are allowed. "
                        f"`{o}` is not a single condition nor a conjunction."
                    )
            return disjunction

    @dataclass
    class And:
        # Operands of the conjunction: Condition or (nested) And objects
        operands: list

        def format(self) -> List[Tuple[str, str, Any]]:
            conjunction = []
            for o in self.operands:
                if isinstance(o, _PyarrowDNFTransformer.And):
                    # `a AND b AND c` arrives as And(And(a, b), c); AND is
                    # associative, so nested conjunctions are flattened.
                    conjunction.extend(o.format())
                elif isinstance(o, _PyarrowDNFTransformer.Condition):
                    conjunction.append(o.format())
                else:
                    # E.g. an Or nested inside an And would require distributing
                    # the terms, which is not attempted here.
                    raise ValueError(
                        "For PyArrow only conjunctions of simple conditions are allowed."
                    )
            return conjunction

    @dataclass
    class Condition:
        # Left-hand side; must be a Column, which is verified in format()
        key: Any
        # Operator text placed in the middle of the resulting tuple
        operator: str
        # Right-hand side; int, float, str or set pass the check in format()
        value: Any

        def format(self) -> Tuple[str, str, Any]:
            if not isinstance(self.key, _PyarrowDNFTransformer.Column):
                raise ValueError(
                    "For pyarrow only comparisons of the form `key <operator> value` "
                    "are allowed, where `key` must be a column name."
                )
            # A Column on the right-hand side (column-to-column comparison) is
            # rejected here because it is none of the accepted value types.
            if not isinstance(self.value, (int, float, str, set)):
                raise ValueError(
                    "For pyarrow only comparisons of the form `key <operator> value` are allowed."
                )
            return self.key.name, self.operator, self.value

    def and_operation(self, operands: lark.Token):
        return _PyarrowDNFTransformer.And(list(operands))

    def or_operation(self, operands: lark.Token):
        return _PyarrowDNFTransformer.Or(list(operands))

    @lark.v_args(inline=True)
    def comparison(self, left: lark.Token, operator: lark.Token, right: lark.Token):
        return _PyarrowDNFTransformer.Condition(left, operator.value, right)

    @lark.v_args(inline=True)
    def null_comparison(self, operand: lark.Token, operator: lark.Token):
        # NULL checks are encoded as (in)equality against the string "null".
        if operator.type == "ISNULL":
            return _PyarrowDNFTransformer.Condition(operand, "=", "null")
        if operator.type == "NOTNULL":
            return _PyarrowDNFTransformer.Condition(operand, "!=", "null")
        raise lark.ParseError("Invalid NULL comparison")

    def negation(self, _):
        raise ValueError("Pyarrow doesn't support the `NOT` operator")

    @lark.v_args(inline=True)
    def notin_list(self, column: lark.Token, _: lark.Token, lst: lark.Token):
        # The middle argument is the NOTIN token itself and carries no information.
        return _PyarrowDNFTransformer.Condition(column, "not in", lst)

    @lark.v_args(inline=True)
    def in_list(self, column: lark.Token, lst: lark.Token):
        return _PyarrowDNFTransformer.Condition(column, "in", lst)

    def literal_list(self, members: lark.Token):
        return set(members)

    def UNQUOTED_COLUMNNAME(self, name: lark.Token):
        return _PyarrowDNFTransformer.Column(name.value)

    def ESCAPED_STRING(self, tok: lark.Token):
        # The token still carries its surrounding quotes; strip them.
        assert len(tok) > 1 and (tok[0] == tok[-1] == "'" or tok[0] == tok[-1] == '"')
        return tok[1:-1]

    def SIGNED_NUMBER(self, tok: lark.Token):
        """Convert the value of `tok` from string to number"""
        try:
            return int(tok)
        except ValueError:
            return float(tok)
def _raise_error(e: Exception):
raise e
# Type alias for pyarrow filter predicates. It admits three shapes:
# a list of lists of `(str, str, Any)` tuples, a flat list of such tuples,
# or a single such tuple.
PyArrowDNFType = Union[
List[List[Tuple[str, str, Any]]], List[Tuple[str, str, Any]], Tuple[str, str, Any]
]
def to_pyarrow_dnf(statement: str) -> PyArrowDNFType:
    """Convert a filter statement to the disjunctive normal form understood by pyarrow

    Predicates are expressed in disjunctive normal form (DNF), like `[[('x', '=', 0), ...], ...]`.
    The outer list is understood as chain of disjunctions ("or"), every inner list as a chain
    of conjunctions ("and"). The inner lists contain tuples with a single operation
    in infix notation each.

    More information about the format and its limitations can be found in the
    [pyarrow documentation](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html#pyarrow-parquet-read-table).

    Args:
        statement: A filter predicate as string

    Returns:
        The filter statement converted to a list of lists of tuples.

    Raises:
        ValueError: If the statement cannot be represented in pyarrow's DNF format.
            Exceptions handed to the parser's `on_error` hook are re-raised
            unchanged by `_raise_error`.

    Examples:
        >>> to_pyarrow_dnf("a.column != 0")
        [[('a.column', '!=', 0)]]
        >>> to_pyarrow_dnf("a > 1 and b <= 3")
        [[('a', '>', 1), ('b', '<=', 3)]]
        >>> to_pyarrow_dnf("a > 1 and b <= 3 or c = 'abc'")
        [[('a', '>', 1), ('b', '<=', 3)], [('c', '=', 'abc')]]
    """  # noqa: E501 - Flake8 line-to-long: The link above cannot be shortened.
    parser = _make_parser(_PyarrowDNFTransformer())
    predicate = parser.parse(statement, on_error=_raise_error)
    # Wrap the result so that every supported root type yields a list of lists.
    if isinstance(predicate, _PyarrowDNFTransformer.Condition):
        return [[predicate.format()]]
    if isinstance(predicate, _PyarrowDNFTransformer.And):
        return [predicate.format()]
    if isinstance(predicate, _PyarrowDNFTransformer.Or):
        return predicate.format()
    raise ValueError(f"Invalid statement {statement}")
class _PSQLTransformer(lark.Transformer):
    """lark.Transformer to translate the grammar into PostgreSQL

    Every callback returns a fragment of SQL text. Column names are emitted as
    double-quoted identifiers and strings as single-quoted literals.
    """

    # pylint: disable=missing-function-docstring,missing-class-docstring,no-self-use

    def and_operation(self, operands: lark.Token):
        left, right = tuple(operands)
        # AND binds tighter than OR, so an operand that contains an OR needs
        # parentheses. This is a substring test on the rendered text, so it may
        # also add redundant (harmless) parentheses, e.g. for a literal 'FOR'.
        if "OR" in left:
            left = f"({left})"
        if "OR" in right:
            right = f"({right})"
        return f"{left} AND {right}"

    def or_operation(self, operands: lark.Token):
        left, right = tuple(operands)
        return f"{left} OR {right}"

    @lark.v_args(inline=True)
    def comparison(self, left: lark.Token, operator: lark.Token, right: lark.Token):
        # `!=` is rendered as the `<>` operator.
        op = "<>" if operator.value == "!=" else operator.value
        return f"{left} {op} {right}"

    @lark.v_args(inline=True)
    def null_comparison(self, operand: lark.Token, operator: lark.Token):
        if operator.type == "ISNULL":
            return f"{operand} IS NULL"
        if operator.type == "NOTNULL":
            return f"{operand} IS NOT NULL"
        raise lark.ParseError("Invalid NULL comparison")

    @lark.v_args(inline=True)
    def negation(self, expression: lark.Token):
        return f"NOT {expression}"

    @lark.v_args(inline=True)
    def notin_list(self, column: lark.Token, _: lark.Token, lst: lark.Token):
        # The middle argument is the NOTIN token itself and carries no information.
        return f"{column} NOT IN {lst}"

    @lark.v_args(inline=True)
    def in_list(self, column: lark.Token, lst: lark.Token):
        return f"{column} IN {lst}"

    def literal_list(self, members: lark.Token):
        return f"({','.join(members)})"

    def UNQUOTED_COLUMNNAME(self, name: lark.Token):
        return f'"{name.value}"'

    def ESCAPED_STRING(self, tok: lark.Token):
        assert len(tok) > 1 and (tok[0] == tok[-1] == "'" or tok[0] == tok[-1] == '"')
        # The literal is always emitted in single quotes, whichever quote style the
        # input used. Embedded single quotes (e.g. from "O'Brien") are doubled so
        # that they cannot terminate the SQL literal early.
        # NOTE(review): backslashes are passed through unchanged, which assumes the
        # server treats them literally (standard_conforming_strings = on) - confirm.
        escaped = tok[1:-1].replace("'", "''")
        return f"'{escaped}'"

    def SIGNED_NUMBER(self, tok: lark.Token):
        """Convert the value of `tok` from string to number"""
        try:
            return str(int(tok))
        except ValueError:
            return str(float(tok))
def to_psql(statement: str) -> str:
    """Convert a filter statement to Postgres SQL syntax

    Args:
        statement: A filter predicate as string

    Returns:
        The filter statement converted to psql.

    Raises:
        Exception: Exceptions handed to the parser's `on_error` hook are re-raised
            unchanged by `_raise_error`.

    Examples:
        >>> to_psql("a.column != 0")
        '"a.column" <> 0'
        >>> to_psql("a > 1 and b <= 3")
        '"a" > 1 AND "b" <= 3'
        >>> to_psql("a > 1 and b <= 3 or c = 'abc'")
        '"a" > 1 AND "b" <= 3 OR "c" = \\\'abc\\\''
    """
    # The transformer is embedded in the parser, so `parse` directly returns the
    # SQL text instead of a parse tree.
    parser = _make_parser(_PSQLTransformer())
    return parser.parse(statement, on_error=_raise_error)