#kedrotips

I am tweeting out most of these snippets as I add them, you can find them all here #kedrotips.

🗣 Heads up

Below are some quick snippets/notes for when using kedro to build data pipelines. So far I am just compiling snippets. Eventually I will create several posts on kedro. These are mostly things that I use In my everyday with kedro. Some are a bit more essoteric. Some are helpful when writing production code, some are useful more usefule for exploration.

📚 Catalog

catalogs Photo by jesse orrico on Unsplash

CSVLocalDataSet

python


import pandas as pd
iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')

data_set = CSVLocalDataSet(filepath="test.csv",
                                 load_args=None,
                                 save_args={"index": False})

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

yaml


test_data:
   type: CSVLocalDataset
   filepath: test.csv
   load_args: None
   save_args:
      index: False

CSVHTTPDataSet


cities = CSVHTTPDataSet(
    fileurl="https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv",
    auth=None,
    load_args=None)

iris = iris_data_set.load()

cities:
   type: CSVHTTPDataSet
    fileurl: https://people.sc.fsu.edu/~jburkardt/data/csv/cities.csv
    auth: None
    load_args: None

HDFLocalDataSet


import pandas as pd
from kedro.io import HDFLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')
iris_data_set = HDFLocalDataSet(filepath="iris.hdf",
                           key="test_hdf_key",
                           load_args=None,
                           save_args=None)

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

cars:
   type: HDFLocalDataSet
   filepath: test.hdf
   key: test_hdf_key

HDFS3LocalDataSet


import pandas as pd
from kedro.io import HDFS3DataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')
iris_data_set = HDFS3DataSet(filepath="iris.hdf",
                        bucket_name="bucket-us-west-1",
                        key="test_hdf_key",
                        load_args=None,
                        save_args=None)

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

cars:
   type: HDFS3DataSet
   filepath: cars.hdf
   bucket_name: bucket-us-west-1
   key: test_hdf_key

JSONLocalDataSet


import pandas as pd
from kedro.io import JSONLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')
cars = JSONLocalDataSet(filepath="iris.json",
                        load_args=None,
                        save_args=None)

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

cars:
   type: JSONLocalDataSet
   filepath: iris.json

ParquetLocalDataSet


import pandas as pd
from kedro.io import ParquetLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')

iris_data_set = ParquetLocalDataSet('iris',
                           engine='auto',
                           load_args=None,
                           save_args=None,
                           version=None)

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

cars:
   type: JSONLocalDataSet
   filepath: cars

PickleS3DataSet

SQLTableDataSet

SQLQueryDataSet

TextLocalDataSet

ExcelLocalDataSet

⏳ Loading Data

loading data Photo by Battlecreek Coffee Roasters on Unsplash

Simple Loading


df = catalog.load('cars')

list all datasets


catalog.list()

Saving Data


catalog.save('cars', cars)

🔍 Finding data

simple keyword search


query = 'raw'
[data for data in catalog.list() if query in data]

see on #kedrotips

multi keyword serch


query = 'raw sales'
data_sets = catalog.list()
for word in query.split():
 data_sets = [
       data
       for data in data_sets
       if query in data
       ]

see on #kedrotips

🐒 monkey patch it


def query(*search_terms):
     data_sets = catalog.list()
     for search in search_terms:
         data_sets = [
         data
         for data in data_sets
         if search in data
         ]
     return data_sets

catalog.query = query

_see on #kedrotips

🤙 YOLO

You Only Load Once

simple


data = [catalog.load(d)
        for d in
        catalog.query('c_pri', 'cars')
        ]

more refined


data = {
   d: catalog.load(d)
   for d in catalog.query('c_pri', 'cars')
   }

🍷 refined like a fine wine


from types import SimpleNamespace
data = SimpleNamespace**{
   d: catalog.load(d)
   for d in catalog.query('c_pri', 'cars')
   })

🧀 Make it a function getting funcy


from types import SimpleNamespace

def yolo(*search_terms):
   """you only load once
   using query method from previous tip"""
   data = SimpleNamespace(**{
       d: catalog.load(d)
   for d in catalog.query(*search_terms)
    })
    return data

all_pri = yolo('c_pri')

🐒 monkey patch it


from functools import partial

catalog.yolo = yolo
catalog.yolo.__doc__ = "you only load once"

all_pri = catalog.yolo('c_pri')

adding catalogs together


from kedro.io import DataCatalog
DataCatalog({**cat1.__dict__['_data_sets'], **cat2.__dict__['_data_sets']})

🛢 Building pipelines

building pipelines Photo by roman pentin on Unsplash

📍 Creating Nodes


from kedro.pipeline import node
node = node(lambda x: x.dropna(), inputs='raw_cars', outputs='int_cars')

from kedro.pipeline import node

def drop_columns(df, *columns):
   for column in columns:
      df = df.drop(columns=column)
   return df

node = node(
   lambda x: drop_columns(x, 'vs', 'am', 'gear', 'carb'),
   inputs='int_cars',
   outputs='pri_cars'
   )

🛢 Creating a pipeline

Don't be so verbose

Create similar nodes dynamically


def halve_dataframe(data: pd.DataFrame) -> List[pd.DataFrame]:
   """ splits a dataframe in half """
    return np.array_split(data, 2)

nodes = []
datasets = [
   'cars', 'trucks', 'boats', 'motorcycles', 'planes',
   'ships', 'busses', 'trains', 'subways'
   ]

# creates a pipeline node for every dataset in the datasets list
for dataset in datasets
   nodes.append(
       node(halve_dataframe,
            'e_modin_{dataset}',
            ['train_{dataset}', 'test_{dataset}']),
   )

🏃‍♂️ Running Pipelines

running pipelines Photo by Rodion Kutsaev on Unsplash

🔖 filter by tags


nodes = pipeline.only_nodes_with_tags('cars')

see on #kedrotips

filter by node


nodes = pipeline.only_nodes('b_int_cars')

_see on #kedrotips

filter nodes like


query_string = 'cars'
nodes = [
   node.name
   for node in pipeline.nodes
   if query_string in node.name
   ]
pipeline.only_nodes(*nodes)

see on #kedrotips

only nodes with tags or


nodes = pipeline.only_nodes_with_tags('cars', 'trains')

only nodes with tags and


raw_nodes = pipeline.only_nodes_with_tags('raw')
car_nodes = pipeline.only_nodes_with_tags('cars')
raw_car_nodes = raw_nodes & car_nodes

raw_nodes = (
   pipeline
   .only_nodes_with_tags('raw')
   .only_nodes_with_tags('cars')
   )

add pipelines


car_nodes = pipeline.only_nodes_with_tags('cars')
train_nodes = pipeline.only_nodes_with_tags('trains')
transportation_nodes = car_nodes + train_nodes

ensure nodes are attached


cars_attached = len(
   pipeline
   .only_nodes_with_tags('cars')
   .grouped_nodes
   ) == 1

🎂 Pipeline Decorators

example - log_time


from kedro.pipeline.decorators import log_time, mem_profile
pipeline.decorate(log_running_time)

Pipeline IO

pipleine.all_inputs() and pipeline.all_outputs() return sets of pipeline inputs and outputs and you can do set operations on them. This is particularly useful to find the upper and lower edges of your pipeline or subset of pipeline. The pipeline object here is any kedro pipeline including a filtered subset.

Find all raw data


pipeline.all_inputs() - pipeline.all_outputs()

Find all final data


pipeline.all_outputs() - pipeline.all_inputs()

Find all nodes that do not raw

This one is probably one that is pushing the limits of what I would do in a list comprehension that I use in prod or even put into a text editor, but I commonly use ipython for my adhoc work and keeping it all in one line is very handy. Complex list comprehensions kinda start becoming like regex in a way that they are really easy to write and really hard to read. I don't think this one quite hits that point but its getting close.

I find this one super useful to help me either move data beween environments, or avoid unnecessary database calls.


raw_inputs = pipeline.all_inputs() - pipeline.all_outputs()
raw_nodes = [node for node in pipeline.nodes if [i for i in raw_inputs if i in set(node.inputs)] != []]