Kedro
━━━━━

My Notes about using kedro

Date: November 2, 2019

See all of my kedro related posts in [[ tag/kedro ]].

#kedrotips <https://twitter.com/search?q=%23kedrotips&f=live>

─────────────────────────────────────────────────────────────

I am tweeting out most of these snippets as I add them; you can find them all here #kedrotips <https://twitter.com/search?q=%23kedrotips>.

🗣 Heads up
──────────

Below are some quick snippets/notes for using kedro to build data pipelines. So far I am just compiling snippets; eventually I will turn them into several posts on kedro. These are mostly things that I use every day with kedro, though some are a bit more esoteric. Some are helpful when writing production code, others are more useful for exploration.

📚 Catalog
─────────

Image: catalogs (Photo by jesse orrico on Unsplash)

### CSVLocalDataSet

python [code]
import pandas as pd
from kedro.io import CSVLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')

iris_data_set = CSVLocalDataSet(filepath="test.csv",
                                load_args=None,
                                save_args={"index": False})

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

yaml [code]
test_data:
  type: CSVLocalDataSet
  filepath: test.csv
  load_args: null
  save_args:
    index: False

CSVHTTPDataSet
──────────────

[code]
from kedro.io import CSVHTTPDataSet

iris_data_set = CSVHTTPDataSet(
    fileurl="https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv",
    auth=None,
    load_args=None)

iris = iris_data_set.load()

[code]
cities:
  type: CSVHTTPDataSet
  fileurl: https://people.sc.fsu.edu/~jburkardt/data/csv/cities.csv
  auth: null
  load_args: null

HDFLocalDataSet
───────────────

[code]
import pandas as pd
from kedro.io import HDFLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')

iris_data_set = HDFLocalDataSet(filepath="iris.hdf",
                                key="test_hdf_key",
                                load_args=None,
                                save_args=None)

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

[code]
cars:
  type: HDFLocalDataSet
  filepath: test.hdf
  key: test_hdf_key

HDFS3DataSet
────────────

[code]
import pandas as pd
from kedro.io import HDFS3DataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')

iris_data_set = HDFS3DataSet(filepath="iris.hdf",
                             bucket_name="bucket-us-west-1",
                             key="test_hdf_key",
                             load_args=None,
                             save_args=None)

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

[code]
cars:
  type: HDFS3DataSet
  filepath: cars.hdf
  bucket_name: bucket-us-west-1
  key: test_hdf_key

JSONLocalDataSet
────────────────

[code]
import pandas as pd
from kedro.io import JSONLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')

iris_data_set = JSONLocalDataSet(filepath="iris.json",
                                 load_args=None,
                                 save_args=None)

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

[code]
cars:
  type: JSONLocalDataSet
  filepath: iris.json

ParquetLocalDataSet
───────────────────

[code]
import pandas as pd
from kedro.io import ParquetLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')

iris_data_set = ParquetLocalDataSet('iris',
                                    engine='auto',
                                    load_args=None,
                                    save_args=None,
                                    version=None)

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

[code]
cars:
  type: ParquetLocalDataSet
  filepath: cars

There are several more data set types that follow the same pattern (a quick sketch of one is below):

PickleS3DataSet
SQLTableDataSet
SQLQueryDataSet
TextLocalDataSet
ExcelLocalDataSet
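I have not written these last few up yet. As a rough sketch of the shared pattern, TextLocalDataSet works the same way; the filepath argument, the `notes` entry name, and the string save/load behaviour here are my assumptions based on the data sets above, not checked against the docs:

[code]
from kedro.io import TextLocalDataSet

# assumption: filepath is the only required argument, and the data set
# round-trips a plain python string, mirroring the local data sets above
notes_data_set = TextLocalDataSet(filepath="notes.txt")

notes_data_set.save("kedro tips go here")
reloaded_notes = notes_data_set.load()

[code]
notes:
  type: TextLocalDataSet
  filepath: notes.txt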
⏳ Loading Data
──────────────

Image: loading data (Photo by Battlecreek Coffee Roasters on Unsplash)

### Simple Loading

[code]
df = catalog.load('cars')

### List all datasets

[code]
catalog.list()

### Saving Data

[code]
catalog.save('cars', cars)

### 🔍 Finding data

simple keyword search

[code]
query = 'raw'
[data for data in catalog.list() if query in data]

see on #kedrotips <https://twitter.com/_WaylonWalker/status/1197130980659732480?s=20>

multi keyword search

[code]
query = 'raw sales'
data_sets = catalog.list()
for word in query.split():
    data_sets = [data for data in data_sets if word in data]

see on #kedrotips <https://twitter.com/_WaylonWalker/status/1197528461587419139?s=20>

🐒 monkey patch it

[code]
def query(*search_terms):
    data_sets = catalog.list()
    for search in search_terms:
        data_sets = [data for data in data_sets if search in data]
    return data_sets

catalog.query = query

see on #kedrotips <https://twitter.com/_WaylonWalker/status/1197855759507300352?s=20>

### 🤙 YOLO (You Only Load Once)

simple

[code]
data = [catalog.load(d)
        for d in catalog.query('c_pri', 'cars')]

more refined

[code]
data = {
    d: catalog.load(d)
    for d in catalog.query('c_pri', 'cars')
}

🍷 refined like a fine wine

[code]
from types import SimpleNamespace

data = SimpleNamespace(**{
    d: catalog.load(d)
    for d in catalog.query('c_pri', 'cars')
})

🧀 Make it a function

getting funcy

[code]
from types import SimpleNamespace

def yolo(*search_terms):
    """you only load once

    uses the query method from the previous tip"""
    data = SimpleNamespace(**{
        d: catalog.load(d)
        for d in catalog.query(*search_terms)
    })
    return data

all_pri = yolo('c_pri')

🐒 monkey patch it

[code]
catalog.yolo = yolo
catalog.yolo.__doc__ = "you only load once"

all_pri = catalog.yolo('c_pri')

### adding catalogs together

[code]
from kedro.io import DataCatalog

DataCatalog({**cat1.__dict__['_data_sets'], **cat2.__dict__['_data_sets']})

🛢 Building pipelines
────────────────────

Image: building pipelines (Photo by roman pentin on Unsplash)

### 📍 Creating Nodes

[code]
from kedro.pipeline import node

dropna_node = node(lambda x: x.dropna(), inputs='raw_cars', outputs='int_cars')

[code]
from kedro.pipeline import node

def drop_columns(df, *columns):
    for column in columns:
        df = df.drop(columns=column)
    return df

drop_columns_node = node(
    lambda x: drop_columns(x, 'vs', 'am', 'gear', 'carb'),
    inputs='int_cars',
    outputs='pri_cars'
)

### 🛢 Creating a pipeline

Nodes on their own do not do much; they get collected into a Pipeline object (see the sketch after the next example).

### Don’t be so verbose

Create similar nodes dynamically

[code]
from typing import List

import numpy as np
import pandas as pd
from kedro.pipeline import node

def halve_dataframe(data: pd.DataFrame) -> List[pd.DataFrame]:
    """splits a dataframe in half"""
    return np.array_split(data, 2)

nodes = []
datasets = [
    'cars', 'trucks', 'boats', 'motorcycles', 'planes',
    'ships', 'busses', 'trains', 'subways'
]

# creates a pipeline node for every dataset in the datasets list
for dataset in datasets:
    nodes.append(
        node(halve_dataframe,
             f'e_modin_{dataset}',
             [f'train_{dataset}', f'test_{dataset}'])
    )
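To round out the "Creating a pipeline" heading above, here is a minimal sketch of wrapping the dynamically created nodes into a pipeline and running it. It assumes the kedro 0.15-style Pipeline and SequentialRunner APIs, the `nodes` list from the loop above, and the project `catalog`; `split_pipeline` is just a name I made up:

[code]
from kedro.pipeline import Pipeline
from kedro.runner import SequentialRunner

# wrap the dynamically created nodes into a single pipeline object
split_pipeline = Pipeline(nodes)

# sanity check the edges before running (see Pipeline IO below)
print(split_pipeline.all_inputs())
print(split_pipeline.all_outputs())

# run it against the project catalog
SequentialRunner().run(split_pipeline, catalog)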
🏃‍♂️ Running Pipelines
──────────────────────

Image: running pipelines (Photo by Rodion Kutsaev on Unsplash)

🔖 filter by tags

[code]
nodes = pipeline.only_nodes_with_tags('cars')

see on #kedrotips <https://twitter.com/_WaylonWalker/status/1195319044808888321?s=20>

filter by node

[code]
nodes = pipeline.only_nodes('b_int_cars')

see on #kedrotips <https://twitter.com/_WaylonWalker/status/1196406204479737856?s=20>

filter nodes like

[code]
query_string = 'cars'
nodes = [node.name for node in pipeline.nodes if query_string in node.name]
pipeline.only_nodes(*nodes)

see on #kedrotips <https://twitter.com/_WaylonWalker/status/1196813895228428288?s=20>

only nodes with tags (or)

[code]
nodes = pipeline.only_nodes_with_tags('cars', 'trains')

only nodes with tags (and)

[code]
raw_nodes = pipeline.only_nodes_with_tags('raw')
car_nodes = pipeline.only_nodes_with_tags('cars')
raw_car_nodes = raw_nodes & car_nodes

[code]
raw_car_nodes = (
    pipeline
    .only_nodes_with_tags('raw')
    .only_nodes_with_tags('cars')
)

add pipelines

[code]
car_nodes = pipeline.only_nodes_with_tags('cars')
train_nodes = pipeline.only_nodes_with_tags('trains')
transportation_nodes = car_nodes + train_nodes

ensure nodes are attached

[code]
cars_attached = len(
    pipeline
    .only_nodes_with_tags('cars')
    .grouped_nodes
) == 1

### 🎂 Pipeline Decorators

example - log_time <https://kedro.readthedocs.io/en/latest/_modules/kedro/pipeline/decorators.html#log_time>

[code]
from kedro.pipeline.decorators import log_time, mem_profile

pipeline = pipeline.decorate(log_time)

Pipeline IO
───────────

pipeline.all_inputs() and pipeline.all_outputs() return sets of pipeline inputs and outputs, and you can do set operations on them. This is particularly useful for finding the upper and lower edges of your pipeline, or of a subset of it. The pipeline object here is any kedro pipeline, including a filtered subset.

### Find all raw data

[code]
pipeline.all_inputs() - pipeline.all_outputs()

### Find all final data

[code]
pipeline.all_outputs() - pipeline.all_inputs()

### Find all nodes that consume raw data

This one is probably pushing the limits of what I would put in a list comprehension in prod, or even in a text editor, but I commonly use ipython for my ad hoc work and keeping it all in one line is very handy. Complex list comprehensions start to become like regex: really easy to write and really hard to read. I don’t think this one quite hits that point, but it’s getting close. I find it super useful for moving data between environments or avoiding unnecessary database calls.

[code]
raw_inputs = pipeline.all_inputs() - pipeline.all_outputs()
raw_nodes = [node for node in pipeline.nodes
             if [i for i in raw_inputs if i in set(node.inputs)] != []]
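If that comprehension does cross the readability line for you, the same selection can be written as a plain loop; this is just a sketch using the same `pipeline` object and `raw_inputs` set as above, with a set intersection on `node.inputs` replacing the nested comprehension:

[code]
raw_inputs = pipeline.all_inputs() - pipeline.all_outputs()

# keep any node that takes at least one raw (never-produced) data set as input
raw_nodes = []
for node in pipeline.nodes:
    if set(node.inputs) & raw_inputs:
        raw_nodes.append(node)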