Tags
๐ฅ #kedrotips hooks can be created using modules
# kedro-hooks/src/kedro-hooks/preflight.py from kedro.hooks import hook_impl from kedro.io.core import DataSetNotFoundError from colorama import Fore import textwrap @hook_impl def before_pipeline_run(run_params, pipeline, catalog): missing_input = [i for i in pipeline.inputs() if not getattr(catalog.datasets, i)._exists()] if len(missing_input) != 0: raise DataSetNotFoundError(textwrap.dedent(f''' {Fore.LIGHTBLACK_EX}โโโโโโโโ {Fore.RED}PREFLIGHT ERROR {Fore.LIGHTBLACK_EX}โโโโโโโโโ {Fore.RESET} preflight of pipeline failed due to {Fore.YELLOW}missing datasets {Fore.BLUE} {missing_input}{Fore.RESET} '''))