🔥 #kedrotips hooks can be created using modules
# kedro-hooks/src/kedro-hooks/preflight.py
from kedro.hooks import hook_impl
from kedro.io.core import DataSetNotFoundError
from colorama import Fore
import textwrap
@hook_impl
def before_pipeline_run(run_params, pipeline, catalog):
missing_input = [i for i in pipeline.inputs() if not getattr(catalog.datasets, i)._exists()]
if len(missing_input) != 0:
raise DataSetNotFoundError(textwrap.dedent(f'''
{Fore.LIGHTBLACK_EX}―――――――― {Fore.RED}PREFLIGHT ERROR {Fore.LIGHTBLACK_EX}―――――――――
{Fore.RESET} preflight of pipeline failed due to {Fore.YELLOW}missing datasets
{Fore.BLUE} {missing_input}{Fore.RESET}
'''))