003

edit ✏️

🔥 #kedrotips hooks can be created using modules

# kedro-hooks/src/kedro-hooks/preflight.py
from kedro.hooks import hook_impl
from kedro.io.core import DataSetNotFoundError
from colorama import Fore
import textwrap


@hook_impl
def before_pipeline_run(run_params, pipeline, catalog):
    missing_input = [i for i in pipeline.inputs() if not getattr(catalog.datasets, i)._exists()]
    if len(missing_input) != 0:
        raise DataSetNotFoundError(textwrap.dedent(f'''

    {Fore.LIGHTBLACK_EX}――――――――  {Fore.RED}PREFLIGHT ERROR {Fore.LIGHTBLACK_EX}―――――――――
    {Fore.RESET} preflight of pipeline failed due to {Fore.YELLOW}missing datasets
    {Fore.BLUE} {missing_input}{Fore.RESET}
    '''))