Kedro ━━━━━ My Notes about using kedro Date: November 2, 2019 See all of my kedro related posts in [[ tag/kedro ]]. #kedrotips ───────────────────────────────────────────────────────────── I am tweeting out most of these snippets as I add them, you can find them all here #kedrotips . 🗣 Heads up ────────── Below are some quick snippets/notes for when using kedro to build data pipelines. So far I am just compiling snippets. Eventually I will create several posts on kedro. These are mostly things that I use In my everyday with kedro. Some are a bit more essoteric. Some are helpful when writing production code, some are useful more usefule for exploration. 📚 Catalog ───────── Image: catalogs Photo by jesse orrico on Unsplash ### CSVLocalDataSet python ``` import pandas as pd iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv') data_set = CSVLocalDataSet(filepath="test.csv", load_args=None, save_args={"index": False}) iris_data_set.save(iris) reloaded_iris = iris_data_set.load() ``` yaml ``` test_data: type: CSVLocalDataset filepath: test.csv load_args: None save_args: index: False ``` CSVHTTPDataSet ────────────── ``` cities = CSVHTTPDataSet( fileurl="https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv", auth=None, load_args=None) iris = iris_data_set.load() ``` ``` cities: type: CSVHTTPDataSet fileurl: https://people.sc.fsu.edu/~jburkardt/data/csv/cities.csv auth: None load_args: None ``` HDFLocalDataSet ─────────────── ``` import pandas as pd from kedro.io import HDFLocalDataSet iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv') iris_data_set = HDFLocalDataSet(filepath="iris.hdf", key="test_hdf_key", load_args=None, save_args=None) iris_data_set.save(iris) reloaded_iris = iris_data_set.load() ``` ``` cars: type: HDFLocalDataSet filepath: test.hdf key: test_hdf_key ``` HDFS3LocalDataSet ───────────────── ``` import pandas as pd from kedro.io import HDFS3DataSet iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv') iris_data_set = HDFS3DataSet(filepath="iris.hdf", bucket_name="bucket-us-west-1", key="test_hdf_key", load_args=None, save_args=None) iris_data_set.save(iris) reloaded_iris = iris_data_set.load() ``` ``` cars: type: HDFS3DataSet filepath: cars.hdf bucket_name: bucket-us-west-1 key: test_hdf_key ``` JSONLocalDataSet ──────────────── ``` import pandas as pd from kedro.io import JSONLocalDataSet iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv') cars = JSONLocalDataSet(filepath="iris.json", load_args=None, save_args=None) iris_data_set.save(iris) reloaded_iris = iris_data_set.load() ``` ``` cars: type: JSONLocalDataSet filepath: iris.json ``` ParquetLocalDataSet ─────────────────── ``` import pandas as pd from kedro.io import ParquetLocalDataSet iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv') iris_data_set = ParquetLocalDataSet('iris', engine='auto', load_args=None, save_args=None, version=None) iris_data_set.save(iris) reloaded_iris = iris_data_set.load() ``` ``` cars: type: JSONLocalDataSet filepath: cars ``` PickleS3DataSet SQLTableDataSet SQLQueryDataSet TextLocalDataSet ExcelLocalDataSet ⏳ Loading Data ────────────── Image: loading data Photo by Battlecreek Coffee Roasters on Unsplash ### Simple Loading ``` df = catalog.load('cars') ``` ### list all datasets ``` catalog.list() ``` ### Saving Data ``` catalog.save('cars', cars) ``` ### 🔍 Finding data simple keyword search ``` query = 'raw' [data for data in catalog.list() if query in data] ``` see on #kedrotips multi keyword serch ``` query = 'raw sales' data_sets = catalog.list() for word in query.split(): data_sets = [ data for data in data_sets if query in data ] ``` see on #kedrotips 🐒 monkey patch it ``` def query(*search_terms): data_sets = catalog.list() for search in search_terms: data_sets = [ data for data in data_sets if search in data ] return data_sets catalog.query = query ``` _see on #kedrotips ### 🤙 YOLO You Only Load Once simple ``` data = [catalog.load(d) for d in catalog.query('c_pri', 'cars') ] ``` more refined ``` data = { d: catalog.load(d) for d in catalog.query('c_pri', 'cars') } ``` 🍷 refined like a fine wine ``` from types import SimpleNamespace data = SimpleNamespace**{ d: catalog.load(d) for d in catalog.query('c_pri', 'cars') }) ``` 🧀 Make it a function getting funcy ``` from types import SimpleNamespace def yolo(*search_terms): """you only load once using query method from previous tip""" data = SimpleNamespace(**{ d: catalog.load(d) for d in catalog.query(*search_terms) }) return data all_pri = yolo('c_pri') ``` 🐒 monkey patch it ``` from functools import partial catalog.yolo = yolo catalog.yolo.__doc__ = "you only load once" all_pri = catalog.yolo('c_pri') ``` ### adding catalogs together ``` from kedro.io import DataCatalog DataCatalog({**cat1.__dict__['_data_sets'], **cat2.__dict__['_data_sets']}) ``` 🛢 Building pipelines ──────────────────── Image: building pipelines Photo by roman pentin on Unsplash ### 📍 Creating Nodes ``` from kedro.pipeline import node node = node(lambda x: x.dropna(), inputs='raw_cars', outputs='int_cars') ``` ``` from kedro.pipeline import node def drop_columns(df, *columns): for column in columns: df = df.drop(columns=column) return df node = node( lambda x: drop_columns(x, 'vs', 'am', 'gear', 'carb'), inputs='int_cars', outputs='pri_cars' ) ``` ### 🛢 Creating a pipeline ### Don’t be so verbose Create similar nodes dynamically ``` def halve_dataframe(data: pd.DataFrame) -> List[pd.DataFrame]: """ splits a dataframe in half """ return np.array_split(data, 2) nodes = [] datasets = [ 'cars', 'trucks', 'boats', 'motorcycles', 'planes', 'ships', 'busses', 'trains', 'subways' ] # creates a pipeline node for every dataset in the datasets list for dataset in datasets nodes.append( node(halve_dataframe, 'e_modin_{dataset}', ['train_{dataset}', 'test_{dataset}']), ) ``` 🏃‍♂️ Running Pipelines ────────────────────── Image: running pipelines Photo by Rodion Kutsaev on Unsplash 🔖 filter by tags ``` nodes = pipeline.only_nodes_with_tags('cars') ``` see on #kedrotips filter by node ``` nodes = pipeline.only_nodes('b_int_cars') ``` _see on #kedrotips filter nodes like ``` query_string = 'cars' nodes = [ node.name for node in pipeline.nodes if query_string in node.name ] pipeline.only_nodes(*nodes) ``` see on #kedrotips only nodes with tags or ``` nodes = pipeline.only_nodes_with_tags('cars', 'trains') ``` only nodes with tags and ``` raw_nodes = pipeline.only_nodes_with_tags('raw') car_nodes = pipeline.only_nodes_with_tags('cars') raw_car_nodes = raw_nodes & car_nodes ``` ``` raw_nodes = ( pipeline .only_nodes_with_tags('raw') .only_nodes_with_tags('cars') ) ``` add pipelines ``` car_nodes = pipeline.only_nodes_with_tags('cars') train_nodes = pipeline.only_nodes_with_tags('trains') transportation_nodes = car_nodes + train_nodes ``` ensure nodes are attached ``` cars_attached = len( pipeline .only_nodes_with_tags('cars') .grouped_nodes ) == 1 ``` ### 🎂 Pipeline Decorators example - log_time ``` from kedro.pipeline.decorators import log_time, mem_profile pipeline.decorate(log_running_time) ``` Pipeline IO ─────────── pipleine.all_inputs() and pipeline.all_outputs() return sets of pipeline inputs and outputs and you can do set operations on them. This is particularly useful to find the upper and lower edges of your pipeline or subset of pipeline. The pipeline object here is any kedro pipeline including a filtered subset. ### Find all raw data ``` pipeline.all_inputs() - pipeline.all_outputs() ``` ### Find all final data ``` pipeline.all_outputs() - pipeline.all_inputs() ``` ### Find all nodes that do not raw This one is probably one that is pushing the limits of what I would do in a list comprehension that I use in prod or even put into a text editor, but I commonly use ipython for my adhoc work and keeping it all in one line is very handy. Complex list comprehensions kinda start becoming like regex in a way that they are really easy to write and really hard to read. I don’t think this one quite hits that point but its getting close. I find this one super useful to help me either move data beween environments, or avoid unnecessary database calls. ``` raw_inputs = pipeline.all_inputs() - pipeline.all_outputs() raw_nodes = [node for node in pipeline.nodes if [i for i in raw_inputs if i in set(node.inputs)] != []] ```