Source code for ctparse.ctparse

import logging
from datetime import datetime
from typing import (
    cast,
    Callable,
    Dict,
    Iterator,
    List,
    Optional,
    Sequence,
    Tuple,
    Union,
)

import regex

from ctparse.partial_parse import PartialParse
from ctparse.rule import _regex as global_regex
from ctparse.scorer import Scorer
from ctparse.timers import CTParseTimeoutError, timeit, timeout as timeout_
from ctparse.time.postprocess_latent import apply_postprocessing_rules
from ctparse.types import Artifact, RegexMatch
from ctparse.loader import load_default_scorer

logger = logging.getLogger(__name__)

_DEFAULT_SCORER = load_default_scorer()


class CTParse:
    def __init__(
        self,
        resolution: Artifact,
        production: Tuple[Union[int, str], ...],
        score: float,
    ) -> None:
        """A possible parse returned by ctparse.

        :param resolution: the parsed `Time`, `Interval` or `Duration`
        :param production: the sequence of rules (productions) used to arrive
          at the parse
        :param score: a numerical score used to rank parses. A high score
          means a more likely parse
        """
        self.resolution = resolution
        self.production = production
        self.score = score

    def __repr__(self) -> str:
        return "CTParse({}, {}, {})".format(
            self.resolution, self.production, self.score
        )

    def __str__(self) -> str:
        return "{} s={:.3f} p={}".format(self.resolution, self.score, self.production)
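
# Illustrative sketch (not part of the original module): CTParse objects are
# plain result containers, so ranking a list of candidates by score mirrors
# what ctparse() below does internally. `candidate_parses` is a hypothetical
# list of CTParse objects; output follows __str__ above.
#
#   >>> best = max(candidate_parses, key=lambda p: p.score)
#   >>> print(best)  # "<resolution> s=<score> p=<production>"
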
def ctparse(
    txt: str,
    ts: Optional[datetime] = None,
    timeout: Union[int, float] = 1.0,
    debug: bool = False,
    relative_match_len: float = 1.0,
    max_stack_depth: int = 10,
    scorer: Optional[Scorer] = None,
    latent_time: bool = True,
) -> Optional[CTParse]:
    """Parse a string *txt* into a time expression

    :param txt: the string to parse
    :param ts: reference time
    :type ts: datetime.datetime
    :param timeout: timeout for parsing in seconds; timeout=0 indicates no
      timeout
    :type timeout: float
    :param debug: if True, return an iterator over all resolutions instead of
      only the highest scoring one (default=False)
    :param relative_match_len: relative minimum share of characters an initial
      regex match sequence must cover compared to the longest such sequence
      found, to be considered for productions (default=1.0)
    :type relative_match_len: float
    :param max_stack_depth: limit the maximal number of highest scored
      candidate productions considered for future productions (default=10);
      set to 0 to not limit
    :type max_stack_depth: int
    :param scorer: scorer used to rank candidate parses; defaults to the
      bundled pre-trained scorer
    :param latent_time: if True, resolve expressions that contain only a time
      (e.g. 8:00 pm) to the next matching time after the reference time *ts*
    :returns: Optional[CTParse]
    """
    parsed = ctparse_gen(
        txt,
        ts,
        timeout=timeout,
        relative_match_len=relative_match_len,
        max_stack_depth=max_stack_depth,
        scorer=scorer,
        latent_time=latent_time,
    )
    # TODO: keep debug for backwards compatibility, but remove it later
    if debug:
        return parsed  # type: ignore
    else:
        parsed_list = list(parsed)
        # TODO: this way of testing a failure to find a match is a bit clunky
        # with types
        if len(parsed_list) == 0 or (
            len(parsed_list) == 1 and parsed_list[0] is None
        ):
            logger.warning('Failed to produce result for "{}"'.format(txt))
            return None
        parsed_list.sort(key=lambda p: p.score)  # type: ignore
        return parsed_list[-1]
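
# Usage sketch (illustrative, not part of the original module); the exact
# resolution and score depend on the bundled rules and scorer model.
#
#   >>> from datetime import datetime
#   >>> parse = ctparse("May 5th 2017 at 1:30 in the afternoon",
#   ...                 ts=datetime(2017, 5, 1))
#   >>> if parse is not None:
#   ...     print(parse.resolution, parse.score, parse.production)
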
def ctparse_gen(
    txt: str,
    ts: Optional[datetime] = None,
    timeout: Union[int, float] = 1.0,
    relative_match_len: float = 1.0,
    max_stack_depth: int = 10,
    scorer: Optional[Scorer] = None,
    latent_time: bool = True,
) -> Iterator[Optional[CTParse]]:
    """Generate parses for the string *txt*.

    This function is equivalent to ctparse, except that it returns an
    iterator over the matches as soon as they are produced.
    """
    if scorer is None:
        scorer = _DEFAULT_SCORER
    if ts is None:
        ts = datetime.now()
    for parse in _ctparse(
        _preprocess_string(txt),
        ts,
        timeout=timeout,
        relative_match_len=relative_match_len,
        max_stack_depth=max_stack_depth,
        scorer=scorer,
    ):
        if parse and latent_time:
            # NOTE: we post-process after scoring because the model has been
            # trained without using the latent time. This also means that the
            # post-processing step won't be added to the rules.
            prod = apply_postprocessing_rules(ts, parse.resolution)
            parse.resolution = prod
        yield parse
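
# Usage sketch (illustrative): ctparse_gen yields candidate parses as soon as
# they are produced, rather than waiting to pick the single best one.
#
#   >>> for parse in ctparse_gen("tomorrow 8 pm", ts=datetime(2020, 1, 1)):
#   ...     if parse is not None:
#   ...         print(parse.score, parse.resolution)
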
def _ctparse(
    txt: str,
    ts: datetime,
    timeout: float,
    relative_match_len: float,
    max_stack_depth: int,
    scorer: Scorer,
) -> Iterator[Optional[CTParse]]:
    t_fun = timeout_(timeout)

    try:
        logger.debug("=" * 80)
        logger.debug("-> matching regular expressions")
        p, _tp = timeit(_match_regex)(txt, global_regex)
        logger.debug("time in _match_regex: {:.0f}ms".format(1000 * _tp))
        logger.debug("=" * 80)
        logger.debug("-> building initial stack")
        regex_stack, _ts = timeit(_regex_stack)(txt, p, t_fun)
        logger.debug("time in _regex_stack: {:.0f}ms".format(1000 * _ts))
        # add empty production path + counter of contained regex
        stack = [PartialParse.from_regex_matches(s) for s in regex_stack]

        # TODO: the score should be kept separate from the partial parse
        # because it depends also on the text and the ts. A good idea is
        # to create a namedtuple of kind StackElement(partial_parse, score)
        for pp in stack:
            pp.score = scorer.score(txt, ts, pp)

        logger.debug("initial stack length: {}".format(len(stack)))

        # sort stack by length of covered string and - if that is equal -
        # score --> last element is longest coverage and highest scored
        stack.sort()

        # only keep initial stack elements that cover at least
        # relative_match_len characters of what the highest
        # scored/covering stack element does cover
        stack = [
            s
            for s in stack
            if s.max_covered_chars
            >= stack[-1].max_covered_chars * relative_match_len
        ]
        logger.debug(
            "stack length after relative match length: {}".format(len(stack))
        )

        # limit depth of stack
        stack = stack[-max_stack_depth:]
        logger.debug(
            "stack length after max stack depth limit: {}".format(len(stack))
        )

        # track what has been added to the stack and do not add again
        # if the score is not better
        stack_prod = {}  # type: Dict[Tuple[Artifact, ...], float]
        # track what has been emitted and do not emit again
        parse_prod = {}  # type: Dict[Artifact, float]

        while stack:
            t_fun()
            s = stack.pop()
            logger.debug("-" * 80)
            logger.debug("producing on {}, score={:.2f}".format(s.prod, s.score))
            new_stack_elements = []
            for r_name, r in s.applicable_rules.items():
                for r_match in _match_rule(s.prod, r[1]):
                    # apply production part of rule
                    new_s = s.apply_rule(ts, r[0], r_name, r_match)

                    # TODO: We should store scores separately from the
                    # production itself because the score may depend on the
                    # text and the ts
                    if new_s is not None:
                        new_s.score = scorer.score(txt, ts, new_s)

                    if (
                        new_s
                        and stack_prod.get(new_s.prod, new_s.score - 1)
                        < new_s.score
                    ):
                        # either new_s.prod has never been produced
                        # before or the score of new_s is higher than
                        # a previous identical production
                        new_stack_elements.append(new_s)
                        logger.debug(
                            " {} -> {}, score={:.2f}".format(
                                r_name, new_s.prod, new_s.score
                            )
                        )
                        stack_prod[new_s.prod] = new_s.score
            if not new_stack_elements:
                logger.debug("~" * 80)
                logger.debug("no rules applicable: emitting")
                # no new productions were generated from this stack element.
                # emit all (probably partial) productions
                for x in s.prod:
                    if not isinstance(x, RegexMatch):
                        # TODO: why do we have a different method for scoring
                        # final productions? This is because you may have
                        # non-reducible parses of the kind
                        # [Time, RegexMatch, Interval] or [Time, Time] etc.
                        # In this case we want to emit those Time, Interval
                        # parses separately and score them appropriately (the
                        # default Scorer.score function only operates on the
                        # whole PartialParse).
                        score_x = scorer.score_final(txt, ts, s, x)
                        # only emit productions not emitted before or
                        # productions emitted before but scored higher
                        if parse_prod.get(x, score_x - 1) < score_x:
                            parse_prod[x] = score_x
                            logger.debug(
                                " => {}, score={:.2f}, ".format(
                                    x.__repr__(), score_x
                                )
                            )
                            yield CTParse(x, s.rules, score_x)
            else:
                # new productions generated, put on stack and sort
                # stack by highest score
                stack.extend(new_stack_elements)
                stack.sort()
                stack = stack[-max_stack_depth:]
                logger.debug(
                    "added {} new stack elements, depth after trunc: {}".format(
                        len(new_stack_elements), len(stack)
                    )
                )
    except CTParseTimeoutError:
        logger.debug('Timeout on "{}"'.format(txt))
        return


# replace all comma, semicolon, whitespace, invisible control, opening and
# closing brackets
_repl1 = regex.compile(r"[,;\pZ\pC\p{Ps}\p{Pe}]+", regex.VERSION1)
_repl2 = regex.compile(r"(\p{Pd}|[\u2010-\u2015]|\u2043)+", regex.VERSION1)


def _preprocess_string(txt: str) -> str:
    return cast(
        str, _repl2.sub("-", _repl1.sub(" ", txt, concurrent=True).strip()).strip()
    )
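
# Behaviour sketch for _preprocess_string (illustrative): runs of commas,
# semicolons, whitespace-like, control and bracket characters collapse to a
# single space, and unicode dash variants collapse to "-".
#
#   >>> _preprocess_string("Mon,  5/8  (evening)")
#   'Mon 5/8 evening'
#   >>> _preprocess_string("2\u20135 pm")  # en-dash between 2 and 5
#   '2-5 pm'
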
def _match_rule(
    seq: Sequence[Artifact], rule: Sequence[Callable[[Artifact], bool]]
) -> Iterator[Tuple[int, int]]:
    if not seq:
        return
    if not rule:
        return
    i_r = 0
    i_s = 0
    r_len = len(rule)
    s_len = len(seq)
    while i_s < s_len:
        if rule[0](seq[i_s]):
            i_start = i_s + 1
            i_r = 1
            while i_start < s_len and i_r < r_len and rule[i_r](seq[i_start]):
                i_r += 1
                i_start += 1
            if i_r == r_len:
                yield i_s, i_start
        i_s += 1
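
# Behaviour sketch for _match_rule (illustrative, using plain ints in place
# of Artifact objects; the lambdas stand in for a rule's match predicates).
# It yields every (start, end) window of *seq* on which all predicates match
# in order, with *end* exclusive:
#
#   >>> seq = [1, 2, 2, 1]
#   >>> rule = [lambda x: x == 1, lambda x: x == 2]
#   >>> list(_match_rule(seq, rule))  # type: ignore[arg-type]
#   [(0, 2)]
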
def _match_regex(txt: str, regexes: Dict[int, regex.Regex]) -> List[RegexMatch]:
    # Match a collection of regexes in *txt*
    #
    # The returned RegexMatch objects are sorted by the start of the match
    #
    # :param txt: the text to match against
    # :param regexes: a collection of regexes name->pattern
    # :return: a list of RegexMatch objects ordered by RegexMatch.mstart
    matches = {
        RegexMatch(name, m)
        for name, re in regexes.items()
        for m in re.finditer(txt, overlapped=True, concurrent=True)
    }
    for m in matches:
        logger.debug("regex: {}".format(m.__repr__()))
    return sorted(matches, key=lambda x: (x.mstart, x.mend))


def _regex_stack(
    txt: str,
    regex_matches: List[RegexMatch],
    on_do_iter: Callable[[], None] = lambda: None,
) -> List[Tuple[RegexMatch, ...]]:
    # Group contiguous RegexMatch objects together.
    #
    # Assumes that regex_matches are sorted by increasing start index.
    # on_do_iter is a callback that will be invoked every time the algorithm
    # performs a loop.
    #
    # Example:
    #   Say you have the following text, where the regex matches are the
    #   words between square brackets.
    #
    #   [Tomorrow] I want to go to the movies between [2] [pm] and [5] [pm].
    #
    #   This function will return the matches that are contiguous (excluding
    #   space characters):
    #
    #   [Tomorrow]
    #   [2], [pm]
    #   [5], [pm]
    #
    #   This also works with overlapping matches.
    #
    # Algo:
    #  * initialize an empty stack
    #
    #  * add all sequences of one expression to the stack, excluding
    #    expressions which can be reached from an "earlier" expression
    #    (i.e. there is no gap between them):
    #
    #    - say A and B have no gap in between and all sequences starting
    #      at A have already been produced. These by definition include as
    #      sub-sequences all sequences starting at B. Any other sequences
    #      starting at B directly will not add valid variations, as each of
    #      them could be prefixed with a sequence starting at A
    #
    #  * while the stack is not empty:
    #
    #    * get top sequence s from stack
    #
    #    * generate all possible continuations for this sequence,
    #      i.e. sequences where an expression can be appended to the last
    #      element s[-1] in s, and put these extended sequences on the stack
    #
    #    * if no new continuation could be generated for s, this sequence of
    #      RegexMatch is appended to the list of results.
    prods = []
    n_rm = len(regex_matches)

    # Calculate the upper triangle of an n_rm x n_rm matrix M where
    # M[i, j] == 1 (for i<j) iff the expressions i and j are
    # consecutive (i.e. there is no gap and they can be put together
    # in one sequence).

    # import numpy as np
    # M = np.zeros(shape=(n_rm, n_rm), dtype=int)
    # --> avoid use of numpy here; since we need column sums below,
    # --> the representation of M is column major, i.e. M[i] is the i-th
    # --> column; M[i, j] then basically becomes M[j][i]
    M = [[0 for _ in range(n_rm)] for _ in range(n_rm)]

    _separator_regex = regex.compile(r"\s*", regex.VERSION1)

    def get_m_dist(m1: RegexMatch, m2: RegexMatch) -> int:
        # 1 if there is no relevant gap between m1 and m2, 0 otherwise
        # assumes that m1 and m2 are sorted by their start index
        if m2.mstart < m1.mend:
            return 0  # Overlap
        gap_match = _separator_regex.fullmatch(txt[m1.mend : m2.mstart])
        if gap_match:
            return 1  # No gap
        else:
            return 0  # Gap

    for i in range(n_rm):
        for j in range(i + 1, n_rm):
            M[j][i] = get_m_dist(regex_matches[i], regex_matches[j])

    # NOTE(glanaro): I believe this means that this is a beginning node.
    # why reversed?
    stack = [
        (i,) for i in reversed(range(n_rm)) if sum(M[i]) == 0
    ]  # type: List[Tuple[int, ...]]
    while stack:
        on_do_iter()
        s = stack.pop()
        i = s[-1]
        new_prod = False
        for j in range(i + 1, n_rm):
            if M[j][i] == 1:
                stack.append(s + (j,))
                new_prod = True
        if not new_prod:
            prod = tuple(regex_matches[i] for i in s)
            logger.debug("regex stack {}".format(prod))
            prods.append(prod)
    return prods
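
# Self-contained sketch (illustrative, not part of the original module) of
# the grouping idea behind _regex_stack, using plain (start, end) index spans
# instead of RegexMatch objects: two spans chain when only whitespace
# separates them, and only spans that no earlier span can reach seed a group.
def _group_contiguous_spans(
    txt: str, spans: List[Tuple[int, int]]
) -> List[Tuple[Tuple[int, int], ...]]:
    # spans must be sorted by start index, as _regex_stack assumes
    def contiguous(a: Tuple[int, int], b: Tuple[int, int]) -> bool:
        return b[0] >= a[1] and txt[a[1] : b[0]].strip() == ""

    n = len(spans)
    # seed with spans that cannot be reached from an earlier span
    stack = [
        (i,)
        for i in reversed(range(n))
        if not any(contiguous(spans[j], spans[i]) for j in range(i))
    ]
    out = []
    while stack:
        s = stack.pop()
        ext = [
            s + (j,)
            for j in range(s[-1] + 1, n)
            if contiguous(spans[s[-1]], spans[j])
        ]
        if ext:
            stack.extend(ext)
        else:
            out.append(tuple(spans[i] for i in s))
    return out

# >>> txt = "Tomorrow between 2 pm and 5 pm"
# >>> _group_contiguous_spans(
# ...     txt, [(0, 8), (17, 18), (19, 21), (26, 27), (28, 30)]
# ... )
# [((0, 8),), ((17, 18), (19, 21)), ((26, 27), (28, 30))]
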