"""
This subpackage contains code to parse search queries received from the
frontend into trees that can be used by the database backend.
"""

from __future__ import unicode_literals
from re import IGNORECASE, search
from sys import maxsize

from dateutil.parser import parse as parse_date

from .nodes import (String, Regex, Text, Language, Author, Date, Symbol,
                    BinaryOp, UnaryOp)
from .tree import Tree
from ..languages import LANGS

__all__ = ["QueryParseException", "parse_query"]


class QueryParseException(Exception):
    """Raised by parse_query() when a query is invalid."""
    pass


class _QueryParser(object):
    """Wrapper class with methods to parse queries. Used as a singleton."""

    def __init__(self):
        # Maps each term-parsing method to the (lowercase) prefixes that
        # select it, e.g. "lang:python" is routed to _parse_language().
        self._prefixes = {
            self._parse_language: ["l", "lang", "language"],
            self._parse_author: ["a", "author"],
            self._parse_modified: ["m", "mod", "modified", "modify"],
            self._parse_created: ["cr", "create", "created"],
            self._parse_symbol: ["s", "sym", "symb", "symbol"],
            self._parse_function: ["f", "fn", "fun", "func", "function",
                                   "meth", "method"],
            self._parse_class: ["cl", "class", "clss"],
            self._parse_variable: ["v", "var", "variable"],
            self._parse_namespace: ["n", "ns", "namespace", "module"],
            self._parse_interface: ["in", "inter", "interface", "implements"],
            self._parse_import: ["im", "imp", "import", "include", "require",
                                 "imports", "requires"]
        }

    def _scan_query(self, query, markers):
        """Scan a query (sub)string for the first occurrence of some markers.

        *markers* is an iterable of single-character strings. Markers that
        are backslash-escaped are skipped.

        Returns a 2-tuple of (first_marker_found, marker_index). If no marker
        is found, the result is ``(None, sys.maxsize)``.
        """
        def is_escaped(query, index):
            """Return whether a query marker is backslash-escaped."""
            # Escaped means preceded by exactly one backslash; a doubled
            # backslash is itself an escaped backslash, not an escape.
            return (index > 0 and query[index - 1] == "\\" and
                    (index < 2 or query[index - 2] != "\\"))

        best_marker, best_index = None, maxsize
        for marker in markers:
            index = query.find(marker)
            if is_escaped(query, index):
                # Look for the next occurrence after the escaped one. When
                # there is none, new_index is maxsize and the sum below can
                # never beat best_index, so the marker counts as not found.
                _, new_index = self._scan_query(query[index + 1:], marker)
                index += new_index + 1
            if index >= 0 and index < best_index:
                best_marker, best_index = marker, index
        return best_marker, best_index

    def _split_query(self, query, markers, parens=False):
        """Split a query string into a nested list of query terms.

        *markers* is the set of characters to split on; space, quotes and
        parentheses each get special treatment. *parens* is true while
        splitting the inside of a parenthesized group.

        Returns a list of terms and/or nested sublists of terms. Each term and
        sublist is guaranteed to be non-empty. While *parens* is true, the
        last element may instead be a 1-tuple holding the unparsed text that
        follows the closing parenthesis; the caller that opened the group
        removes it.
        """
        query = query.lstrip()
        if not query:
            return []
        marker, index = self._scan_query(query, markers)
        if not marker:
            return [query]

        # Text before the marker is a complete term of its own.
        nest = [query[:index]] if index > 0 else []
        after = query[index + 1:]

        if marker == " ":
            nest += self._split_query(after, markers, parens)
        elif marker in ('"', "'"):
            close_marker, close_index = self._scan_query(after, marker)
            if close_marker:
                if close_index > 0:
                    nest.append(after[:close_index])
                after = after[close_index + 1:]
                nest += self._split_query(after, markers, parens)
            elif after:
                # Unterminated quote: the rest of the query is one term.
                nest.append(after)
        elif marker == "(":
            inner, after = self._split_query(after, markers, True), []
            if inner and isinstance(inner[-1], tuple):
                # The group was closed; keep splitting what followed ")".
                after = self._split_query(inner.pop()[0], markers, parens)
            if inner:
                nest.append(inner)
            if after:
                nest += after
        elif marker == ")":
            if parens:
                # Hand the remainder back to the "(" branch as a 1-tuple.
                nest.append((after,))
            else:
                # Unbalanced ")" outside of any group is ignored.
                nest += self._split_query(after, markers)
        return nest

    def _parse_literal(self, literal):
        """Parse part of a search query into a string or regular expression.

        A literal starting with "r:", "re:", "regex:" or "regexp:" becomes a
        Regex node built from the text after the colon; anything else becomes
        a String node.

        Raises QueryParseException if a regex prefix has no pattern after it.
        """
        if literal.startswith(("r:", "re:", "regex:", "regexp:")):
            arg = literal.split(":", 1)[1]
            if not arg:
                err = 'Incomplete query term: "%s"' % literal
                raise QueryParseException(err)
            return Regex(arg)
        return String(literal)

    def _parse_language(self, term):
        """Parse part of a query into a language node and return it.

        For a regex term, every entry of LANGS matched by the regex
        (case-insensitively) is OR-ed together. For a string term, an exact
        case-insensitive match is preferred, then the first entry of LANGS
        that starts with the string.

        Raises QueryParseException if no language matches.
        """
        term = self._parse_literal(term)
        if isinstance(term, Regex):
            langs = [i for i, lang in enumerate(LANGS)
                     if search(term.regex, lang, IGNORECASE)]
            if not langs:
                err = 'No languages found for regex: "%s"' % term.regex
                raise QueryParseException(err)
            node = Language(langs.pop())
            while langs:
                node = BinaryOp(Language(langs.pop()), BinaryOp.OR, node)
            return node

        needle = term.string.lower()
        for i, lang in enumerate(LANGS):
            if lang.lower() == needle:
                return Language(i)
        for i, lang in enumerate(LANGS):
            if lang.lower().startswith(needle):
                return Language(i)
        err = 'No languages found for string: "%s"' % term.string
        raise QueryParseException(err)

    def _parse_author(self, term):
        """Parse part of a query into an author node and return it."""
        return Author(self._parse_literal(term))

    def _parse_date(self, term, type_):
        """Parse part of a query into a date node and return it.

        *term* must look like "before:<date>" or "after:<date>" ("b" and "a"
        are accepted as abbreviations); *type_* is passed through to the Date
        node unchanged.

        Raises QueryParseException if the relationship is missing or unknown,
        or if the date string cannot be parsed.
        """
        if ":" not in term:
            err = "A date relationship is required " \
                  '("before:" or "after:"): "%s"'
            raise QueryParseException(err % term)
        relstr, dtstr = term.split(":", 1)
        if relstr.lower() in ("before", "b"):
            relation = Date.BEFORE
        elif relstr.lower() in ("after", "a"):
            relation = Date.AFTER
        else:
            err = 'Bad date relationship (should be "before" or "after"): "%s"'
            raise QueryParseException(err % relstr)
        try:
            dt = parse_date(dtstr)
        except (TypeError, ValueError, OverflowError):
            # dateutil raises OverflowError for values that are out of range
            # on this system; report it like any other unparseable date.
            raise QueryParseException('Bad date/time string: "%s"' % dtstr)
        return Date(type_, relation, dt)

    def _parse_modified(self, term):
        """Parse part of a query into a date modified node and return it."""
        return self._parse_date(term, Date.MODIFY)

    def _parse_created(self, term):
        """Parse part of a query into a date created node and return it."""
        return self._parse_date(term, Date.CREATE)

    def _parse_symbol(self, term, stype=Symbol.ALL):
        """Parse part of a query into a symbol node and return it.

        *term* may start with a context prefix selecting definitions
        (e.g. "def:") or uses (e.g. "use:", "call:"); without one, the
        context is Symbol.ALL. A string literal holding several
        space-separated names yields one symbol node per name, OR-ed
        together. *stype* is the symbol type passed to each Symbol node.

        Raises QueryParseException if the term is incomplete.
        """
        defines = ("a:", "assign:", "assignment:", "d:", "def:", "definition:",
                   "decl:", "declare:", "declaration:")
        uses = ("u:", "use:", "c:", "call:")

        if term.startswith(defines) or term.startswith(uses):
            context = Symbol.DEFINE if term.startswith(defines) else Symbol.USE
            term_part = term.split(":", 1)[1]
            if not term_part:
                raise QueryParseException('Incomplete query term: "%s"' % term)
            term = term_part
        else:
            context = Symbol.ALL

        literal = self._parse_literal(term)
        if not isinstance(literal, String):
            return Symbol(context, stype, literal)

        def make_symbol(name):
            """Build a symbol node for a single name."""
            return Symbol(context, stype, String(name))

        symbols = self._split_query(literal.string, " \"'")
        if not symbols:
            # E.g. a term that is only a quote character splits to nothing.
            raise QueryParseException('Incomplete query term: "%s"' % term)
        node = make_symbol(symbols.pop())
        while symbols:
            node = BinaryOp(make_symbol(symbols.pop()), BinaryOp.OR, node)
        return node

    def _parse_function(self, term):
        """Parse part of a query into a function node and return it."""
        return self._parse_symbol(term, Symbol.FUNCTION)

    def _parse_class(self, term):
        """Parse part of a query into a class node and return it."""
        return self._parse_symbol(term, Symbol.CLASS)

    def _parse_variable(self, term):
        """Parse part of a query into a variable node and return it."""
        return self._parse_symbol(term, Symbol.VARIABLE)

    def _parse_namespace(self, term):
        """Parse part of a query into a namespace node and return it."""
        return self._parse_symbol(term, Symbol.NAMESPACE)

    def _parse_interface(self, term):
        """Parse part of a query into an interface node and return it."""
        return self._parse_symbol(term, Symbol.INTERFACE)

    def _parse_import(self, term):
        """Parse part of a query into an import node and return it."""
        return self._parse_symbol(term, Symbol.IMPORT)

    def _parse_term(self, term):
        """Parse a query term into a tree node and return it.

        A term of the form "prefix:arg" with a known prefix is handled by the
        matching method from self._prefixes; "not:prefix:arg" wraps that
        result in a NOT node. Any other term, including one with an unknown
        prefix or a "not:" without a second prefix, becomes a text node.

        Raises QueryParseException if a prefixed term has an empty argument.
        """
        # Undo the escaping of quotes and backslashes.
        term = term.replace('\\"', '"').replace("\\\\", "\\")
        if ":" in term and not term[0] == ":":
            prefix, arg = term.split(":", 1)
            invert = prefix.lower() == "not"
            if invert:
                if ":" not in arg:
                    # "not:foo" names no prefix to negate. Treat it as plain
                    # text, like any other unrecognized prefix.
                    return Text(self._parse_literal(term))
                prefix, arg = arg.split(":", 1)
            if not arg:
                raise QueryParseException('Incomplete query term: "%s"' % term)
            prefix = prefix.lower()
            for meth, prefixes in self._prefixes.items():
                if prefix in prefixes:
                    if invert:
                        return UnaryOp(UnaryOp.NOT, meth(arg))
                    return meth(arg)
        return Text(self._parse_literal(term))

    def _parse_boolean_operators(self, nest):
        """Parse boolean operators in a nested query list.

        Replaces, in place and recursively, every string term equal to "and",
        "or" or "not" (case-insensitively) with the matching operator
        constant. Other terms are left untouched.
        """
        op_lookup = {
            "and": BinaryOp.AND,
            "or": BinaryOp.OR,
            "not": UnaryOp.NOT
        }
        for i, term in enumerate(nest):
            if isinstance(term, list):
                self._parse_boolean_operators(term)
            else:
                nest[i] = op_lookup.get(term.lower(), term)

    def _parse_nest(self, nest):
        """Recursively parse a nested list of search query terms.

        The list is split at the first OR, else the first AND, else the first
        NOT, so OR binds loosest and NOT tightest. Adjacent terms with no
        operator between them are joined with AND.

        Raises QueryParseException if the list is empty or an operator is
        missing an argument.
        """
        def parse_binary_op(op):
            """Parse a binary operator in a nested query list."""
            index = nest.index(op)
            if index == 0 or index == len(nest) - 1:
                err = "Invalid query: '%s' given without argument."
                raise QueryParseException(err % BinaryOp.OPS[op])
            left = self._parse_nest(nest[:index])
            right = self._parse_nest(nest[index + 1:])
            return BinaryOp(left, op, right)

        if not nest:
            err = "Error while parsing query: empty nest detected."
            raise QueryParseException(err)
        elif BinaryOp.OR in nest:
            return parse_binary_op(BinaryOp.OR)
        elif BinaryOp.AND in nest:
            return parse_binary_op(BinaryOp.AND)
        elif UnaryOp.NOT in nest:
            index = nest.index(UnaryOp.NOT)
            if index == len(nest) - 1:
                err = "Invalid query: '%s' given without argument."
                raise QueryParseException(err % UnaryOp.OPS[UnaryOp.NOT])
            right = UnaryOp(UnaryOp.NOT, self._parse_nest(nest[index + 1:]))
            if index > 0:
                # Terms before the NOT are implicitly AND-ed with it.
                left = self._parse_nest(nest[:index])
                return BinaryOp(left, BinaryOp.AND, right)
            return right
        elif len(nest) > 1:
            # The first element may be a parenthesized sub-list rather than a
            # string term, so it must go through _parse_nest() as well.
            left = self._parse_nest(nest[:1])
            right = self._parse_nest(nest[1:])
            return BinaryOp(left, BinaryOp.AND, right)
        elif isinstance(nest[0], list):
            return self._parse_nest(nest[0])
        else:
            return self._parse_term(nest[0])

    def _balance_tree(self, node):
        """Auto-balance a tree using a string sorting function.

        Works in place, bottom-up: the children of each binary operator are
        swapped whenever the right child's sortkey() is less than the left
        child's.
        """
        if isinstance(node, BinaryOp):
            self._balance_tree(node.left)
            self._balance_tree(node.right)
            if node.right.sortkey() < node.left.sortkey():
                node.left, node.right = node.right, node.left
        elif isinstance(node, UnaryOp):
            self._balance_tree(node.node)

    def parse(self, query):
        """
        Parse a search query.

        The result is normalized with a sorting function so that
        ``"foo OR bar"`` and ``"bar OR foo"`` result in the same tree. This is
        important for caching purposes.

        :param query: The query to be converted.
        :type query: str

        :return: A tree storing the data in the query.
        :rtype: :py:class:`~.query.tree.Tree`

        :raises: :py:class:`.QueryParseException`
        """
        nest = self._split_query(query.rstrip(), " \"'()")
        if not nest:
            raise QueryParseException('Empty query: "%s"' % query)
        self._parse_boolean_operators(nest)
        root = self._parse_nest(nest)
        self._balance_tree(root)
        return Tree(root)


parse_query = _QueryParser().parse