Tags
With the latest releases of kedro 0.17.x
, it is now possible to run kedro
pipelines from within scripts. While I would not start a project with this
technique, it will be a good tool to keep in my back pocket when I want to
sprinkle in a bit of kedro goodness in existing projects.
New to Kedro
If your just learning about kedro check out this post walking through it
No More Rabbit Hole of Errors
as of 0.17.2
I've tried to do this in kedro 0.16.x,
and it turned into a rabbit hole of
errors. First kedro needed a conf
directory, if you tried to fake one in it
would then ask for logging setup. These errors just kept coming to the point
it wasnt worth doing and I might as well use a proper template for real
projects and stick to simple function calls for things that are not a kedro
project.
Kedro in a script
To get kedro running, you will need a pipeline, catalog, and runner at a minimum. Those who have used kedro before the pipeline will look very similar to what you are familiar with, but the catalog will not be loaded from yaml and you will need to bring your own runner.
from kedro.pipeline import Pipeline, node from kedro.io import DataCatalog from kedro.runner.sequential_runner import SequentialRunner # additional datasets you want to use from kedro.extras.datasets.pandas.csv_dataset import CSVDataSet from kedro.extras.datasets.pandas.parquet_dataset import ParquetDataSet # the sequential runner is the simplest. It runs one node at a time. runner = SequentialRunner() # this is a super simple example pipeline pipeline = Pipeline( [ node(lambda: range(100), None, "range"), node(lambda x: [i ** 2 for i in x], "range", "range**2"), node(lambda x: [i for i in x if i > 5000], "range**2", "range>5k"), node(lambda x: x[:5], "range>5k", "range>5k-head"), node(lambda x: sum(x) / len(x), "range>5k", "range>5k-mean"), ] ) # to get up and running, you can use an empty catalog catalog = DataCatalog() runner.run(pipeline, catalog)
π Above is the minimal setup to get a kedro pipeline running
more practically
More often, your kedro pipelines are going to use a function rather than a lambda, and pandas DataFrames.
def clean_columns(df: pd.DataFrame): df.columns = [col.lower().strip() for col in df.columns] pipeline = Pipeline( [ node(clean_columns, "raw_data", "clean_columns", name="create_clean_columns"), ] ) catalog = DataCatalog( { "raw_data": ParquetDataSet(filepath=f"data/raw_data.parquet") "clean_columns": ParquetDataSet(filepath=f"data/clean_columns.parquet") } )
One single node pipeline to get you started
Semi-automatic catalog
For some reason, when I tried to use the DataCatalogWithDefault it did not pick up my datasets right. I suspect this has something to do with not setting up a proper session, so this is what I did in a pinch to get that catalog goodness for my DataFrames without setting up each one manually.
catalog = DataCatalog( { name: ParquetDataSet(filepath=f"data/{name}.parquet") for name in pipeline.all_outputs() } )
β If all of your datasets are pandas dataframes
For the example above that does not use DataFrames, I would pickle all of my outputs to enable re-loading them later.
catalog = DataCatalog( { name: PickleDataSet(filepath=f"data/{name}.pkl") for name in pipeline.all_outputs() } )
π₯ for use with non-pandas datasets
Logging
Once you explicitly add datasets, kedro will start logging when it's loading, running, or saving each node. Things will begin to look a bit more familiar to anyone who has used kedro before.
ww3 βͺmain Β©kedro-in-scripts v3.8.8 ipython β― runner.run(pipeline, catalog) 2021-04-18 09:30:58,099 - kedro.pipeline.node - INFO - Running node: <lambda>(None) -> [range] 2021-04-18 09:30:58,100 - kedro.io.data_catalog - INFO - Saving data to `range` (PickleDataSet)... 2021-04-18 09:30:58,104 - kedro.runner.sequential_runner - INFO - Completed 1 out of 5 tasks 2021-04-18 09:30:58,105 - kedro.io.data_catalog - INFO - Loading data from `range` (PickleDataSet)... 2021-04-18 09:30:58,105 - kedro.pipeline.node - INFO - Running node: <lambda>([range]) -> [range**2] 2021-04-18 09:30:58,105 - kedro.io.data_catalog - INFO - Saving data to `range**2` (PickleDataSet)... 2021-04-18 09:30:58,111 - kedro.runner.sequential_runner - INFO - Completed 2 out of 5 tasks 2021-04-18 09:30:58,111 - kedro.io.data_catalog - INFO - Loading data from `range**2` (PickleDataSet)... 2021-04-18 09:30:58,112 - kedro.pipeline.node - INFO - Running node: <lambda>([range**2]) -> [range>5k] 2021-04-18 09:30:58,112 - kedro.io.data_catalog - INFO - Saving data to `range>5k` (PickleDataSet)... 2021-04-18 09:30:58,115 - kedro.runner.sequential_runner - INFO - Completed 3 out of 5 tasks 2021-04-18 09:30:58,115 - kedro.io.data_catalog - INFO - Loading data from `range>5k` (PickleDataSet)... 2021-04-18 09:30:58,115 - kedro.pipeline.node - INFO - Running node: <lambda>([range>5k]) -> [range>5k-mean] 2021-04-18 09:30:58,115 - kedro.io.data_catalog - INFO - Saving data to `range>5k-mean` (PickleDataSet)... 2021-04-18 09:30:58,118 - kedro.runner.sequential_runner - INFO - Completed 4 out of 5 tasks 2021-04-18 09:30:58,119 - kedro.io.data_catalog - INFO - Loading data from `range>5k` (PickleDataSet)... 2021-04-18 09:30:58,119 - kedro.pipeline.node - INFO - Running node: <lambda>([range>5k]) -> [range>5k-head] 2021-04-18 09:30:58,119 - kedro.io.data_catalog - INFO - Saving data to `range>5k-head` (PickleDataSet)... 2021-04-18 09:30:58,122 - kedro.runner.sequential_runner - INFO - Completed 5 out of 5 tasks 2021-04-18 09:30:58,122 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.
Kedro Viz
I was not able to quickly get kedro viz up and running for my use case. If you really wanted to you could start modifying their format_pipelines_data function in server.py. Or you could render a new template and put your pipeline there for viz purposes.
It's possible, but might as well stick to the template
cli
For something that I would be using this on, I am probably not going to put much effort into the cli as it is not likely something that we will have a team of developers interacting with constantly. I would just put together the minimum necessary to run my application how I need.
if __name__ == "__main__": import sys if '--skip-raw' in sys.argv: runner.run(pipeline.from_inputs('range**2'), catalog) else: runner.run(pipeline, catalog)
Keeping it simple
If I want to go down the route of having a full cli built out I am probably going to use the full kedro template, or something very similar.
It's a bit Rough
While I might use this in production somewhere, it will be inside of some other not kedro application. I will still be using something quite similar to their template for my pipelining projects. It misses some excellent things that bring me to kedro like hooks, plugins, credentials, catalog, logging config, cli, and viz.