Source code for czech_plus.logic.processor.implementations.verb

"""Module for implementing processing verbs."""
import typing as t

from czech_plus._vendor.loguru import logger

from czech_plus import models
from czech_plus.logic.lexer import VerbLexer, tokens
from czech_plus.logic.processor.implementations import BaseProcessor
from czech_plus.utils import assert_that

[docs]_T = t.TypeVar("_T", bound=t.Iterator[t.Union[str, tokens.BaseToken]])
[docs]class VerbProcessor(BaseProcessor): """Verb processor.""" def __init__(self) -> None: super().__init__() self.__czech_field_name = self._config.cards.verbs.fields.czech self.__pac_field_name = self._config.cards.verbs.fields.prepositions_and_cases
[docs] def process(self, content: dict[str, str], /) -> str: """Process the content. Args: content: Card fields inside dict. Returns: The processed ``czech`` field, ready to be inserted into the card. """ logger.debug( "Processing verb card\n" f"{self.__czech_field_name} (Czech field): {content[self.__czech_field_name]}\n" f"{self.__pac_field_name} (Prepositions and Cases field): {content[self.__pac_field_name]}" ) pre_processed = self._pre_process(content) if not content[self.__pac_field_name]: logger.warning(f"PaC field is empty, skipping. Czech field: {content[self.__czech_field_name]}") return content[self.__czech_field_name] return self._process(pre_processed)
[docs] def _pre_process( self, content: dict[str, str] ) -> t.Iterator[ tuple[ t.Union[str, tokens.FutureFormTokenStart, tokens.FutureFormTokenEnd], list[t.Union[tokens.BaseToken, str]] ] ]: logger.debug( "Pre-processing verb card\n" f"{self.__czech_field_name} (Czech field): {content[self.__czech_field_name]}\n" f"{self.__pac_field_name} (Prepositions and Cases field): {content[self.__pac_field_name]}" ) lexer = VerbLexer() lexed_czech = self._navigate_over(lexer.lex(content[self.__czech_field_name])) lexed_prepositions_and_cases = self._navigate_over( lexer.lex(content[self.__pac_field_name]), dont_skip_escaped=True ) future_form_was = False for czech in lexed_czech: logger.debug(f"Pre-processing czech {czech!r}.") if isinstance(czech, tokens.AdditionalSeparatorToken): logger.debug("Skipping additional separator token.") continue elif isinstance(czech, str): prepositions_and_cases = list(self._pre_process_czech_token(czech, lexed_prepositions_and_cases)) logger.debug(f"Pre-processed prepositions and cases here: {prepositions_and_cases!r}.") if len(prepositions_and_cases) > 0 and isinstance( prepositions_and_cases[-1], (tokens.FutureFormTokenStart, tokens.FutureFormTokenEnd) ): logger.debug("Future form was found.") future_form_was = True prepositions_and_cases.pop() yield czech, prepositions_and_cases elif isinstance(czech, (tokens.FutureFormTokenStart, tokens.FutureFormTokenEnd)): logger.debug("Processing future form.") if not future_form_was and isinstance(czech, tokens.FutureFormTokenStart): assert_that(list(self._pre_process_czech_token(czech, lexed_prepositions_and_cases)) == [czech]) future_form_was = False yield czech, [czech] else: # pragma: no cover raise NotImplementedError(f"Unexpected czech token type: {czech} ({type(czech)})")
[docs] def _pre_process_czech_token( self, czech: t.Union[tokens.BaseToken, str], lexed_prepositions_and_cases: t.Iterator[t.Union[tokens.BaseToken, str]], ) -> t.Iterator[t.Union[tokens.BaseToken, str]]: logger.debug(f"Pre-processing czech token {czech!r}.") if isinstance(czech, tokens.FutureFormTokenStart): assert_that(next(lexed_prepositions_and_cases) == tokens.FutureFormTokenStart()) yield tokens.FutureFormTokenStart() elif isinstance(czech, str): for i, preposition_and_case in enumerate(lexed_prepositions_and_cases): logger.debug(f"Pre-processing preposition and case {preposition_and_case!r} in czech token {czech!r}.") if isinstance(preposition_and_case, tokens.SeparatorToken): logger.debug(f"Skipping separator token (index={i}).") assert i != 0 break yield preposition_and_case if isinstance(preposition_and_case, tokens.FutureFormTokenStart): logger.trace("'tokens.FutureFormTokenStart' was found.") break else: # pragma: no cover raise NotImplementedError(f"Unexpected czech token type: {czech} ({type(czech)})")
[docs] def _process( self, pre_processed: t.Iterator[ tuple[ t.Union[str, tokens.FutureFormTokenStart, tokens.FutureFormTokenEnd], list[t.Union[tokens.BaseToken, str]], ] ], ) -> str: result: str = "" for czech, prepositions_and_cases in pre_processed: logger.debug(f"Processing {czech=} {prepositions_and_cases=} in pre-processed.") skip = False processed_prepositions_and_cases = "" for i, preposition_and_case in enumerate(prepositions_and_cases): logger.debug( f"Processing {preposition_and_case=} (index={i}) in prepositions and cases. " f"Already processed: {processed_prepositions_and_cases!r}" ) if skip: logger.debug(f"Skipping {preposition_and_case=} (index={i}), skip was True.") skip = False continue if isinstance(preposition_and_case, tokens.SkipToken): logger.debug("Found skip token.") if i == len(prepositions_and_cases) - 1 and processed_prepositions_and_cases.endswith(", "): logger.trace("Removing trailing comma on the end.") processed_prepositions_and_cases = processed_prepositions_and_cases[:-2] break skip = True continue processed_prepositions_and_cases += ( added_part := self._process_preposition_and_case(preposition_and_case) ) logger.trace(f"Added {added_part!r} to processed prepositions and cases.") if isinstance(czech, (tokens.FutureFormTokenStart, tokens.FutureFormTokenEnd)): logger.debug(f"'czech' is future form. ({czech=} {processed_prepositions_and_cases=})") to_add = ((" " if result else "") + "[") if isinstance(czech, tokens.FutureFormTokenStart) else "]" logger.trace(f"Adding {to_add!r} to the result.") result += to_add else: logger.trace(f"'czech' is not a future form. ({czech=} {processed_prepositions_and_cases=})") result += ", " if result and not result.endswith("[") else "" result += czech if processed_prepositions_and_cases != "": result += " (" + processed_prepositions_and_cases + ")" logger.debug(f"Processed result: {result!r}.") return result
[docs] def _process_preposition_and_case(self, preposition_and_case: t.Union[tokens.BaseToken, str], /) -> str: logger.trace(f"Processing preposition and case: {preposition_and_case!r}.") if isinstance(preposition_and_case, tokens.AdditionalSeparatorToken): return ", " elif isinstance(preposition_and_case, tokens.EscapedToken): return preposition_and_case.content elif isinstance(preposition_and_case, str): return self._process_raw_preposition_and_case(preposition_and_case) elif isinstance(preposition_and_case, tokens.FutureFormTokenStart): return "[" elif isinstance(preposition_and_case, tokens.FutureFormTokenEnd): return "]" else: # pragma: no cover raise NotImplementedError( f"Unexpected preposition and case token type: {preposition_and_case} ({type(preposition_and_case)})" )
[docs] def _process_raw_preposition_and_case(self, preposition_and_case: str, /) -> str: """Process raw preposition and case, when they're numbers for example. Args: preposition_and_case: Preposition and case to process. Returns: Processed preposition and case. """ logger.trace(f"Processing preposition and case: {preposition_and_case!r}") split = preposition_and_case.split(" ") if len(split) == 1: return models.Case(int(split[0])).questions # type: ignore[no-untyped-call] elif split[1] == "": # Preposition before escaped case # # prep !case # ^^^^^ # notice that last symbol is a space, but it was deleted by split logger.trace("Found preposition before escaped case.") return preposition_and_case preposition, case = split return f"{preposition} {models.Case(int(case)).questions}" # type: ignore[no-untyped-call]