Kedro

2019-11-02T05:00:00.000Z

You are reading my notes. This is a large collection of smaller unfinished ideas. If you find something here that needs to be turned into a real post, tweet it at @_waylonwalker

#kedrotips

I am tweeting out most of these snippets as I add them. You can find them all under #kedrotips.

🗣 Heads up

Below are some quick snippets/notes for using kedro to build data pipelines. So far I am just compiling snippets. Eventually I will create several posts on kedro. These are mostly things that I use every day with kedro. Some are a bit more esoteric. Some are helpful when writing production code, some are more useful for exploration.

📚 Catalog

catalogs Photo by jesse orrico on Unsplash

CSVLocalDataSet

python

import pandas as pd
from kedro.io import CSVLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/quantumblacklabs/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')

iris_data_set = CSVLocalDataSet(filepath="test.csv",
                                load_args=None,
                                save_args={"index": False})

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

yaml

test_data:
   type: CSVLocalDataSet
   filepath: test.csv
   save_args:
      index: False

CSVHTTPDataSet

from kedro.io import CSVHTTPDataSet

iris_data_set = CSVHTTPDataSet(
    fileurl="https://raw.githubusercontent.com/quantumblacklabs/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv",
    auth=None,
    load_args=None)

iris = iris_data_set.load()

cities:
   type: CSVHTTPDataSet
   fileurl: https://people.sc.fsu.edu/~jburkardt/data/csv/cities.csv

HDFLocalDataSet

import pandas as pd
from kedro.io import HDFLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/quantumblacklabs/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')
iris_data_set = HDFLocalDataSet(filepath="iris.hdf",
                           key="test_hdf_key",
                           load_args=None,
                           save_args=None)

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

cars:
   type: HDFLocalDataSet
   filepath: test.hdf
   key: test_hdf_key

HDFS3DataSet

import pandas as pd
from kedro.io import HDFS3DataSet

iris = pd.read_csv('https://raw.githubusercontent.com/quantumblacklabs/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')
iris_data_set = HDFS3DataSet(filepath="iris.hdf",
                        bucket_name="bucket-us-west-1",
                        key="test_hdf_key",
                        load_args=None,
                        save_args=None)

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

cars:
   type: HDFS3DataSet
   filepath: cars.hdf
   bucket_name: bucket-us-west-1
   key: test_hdf_key

JSONLocalDataSet

import pandas as pd
from kedro.io import JSONLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/quantumblacklabs/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')
iris_data_set = JSONLocalDataSet(filepath="iris.json",
                                 load_args=None,
                                 save_args=None)

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

cars:
   type: JSONLocalDataSet
   filepath: iris.json

ParquetLocalDataSet

import pandas as pd
from kedro.io import ParquetLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/quantumblacklabs/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')

iris_data_set = ParquetLocalDataSet('iris', 
                           engine='auto', 
                           load_args=None, 
                           save_args=None, 
                           version=None)

iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()

cars:
   type: ParquetLocalDataSet
   filepath: cars

PickleS3DataSet

SQLTableDataSet

SQLQueryDataSet

TextLocalDataSet

ExcelLocalDataSet

⏳ Loading Data

loading data Photo by Battlecreek Coffee Roasters on Unsplash

Simple Loading

df = catalog.load('cars')

list all datasets

catalog.list()

Saving Data

catalog.save('cars', cars)

🔍 Finding data

simple keyword search

query = 'raw'
[data for data in catalog.list() if query in data]

see on #kedrotips

multi keyword search

query = 'raw sales'
data_sets = catalog.list()
# keep only the datasets whose name contains every word in the query
for word in query.split():
    data_sets = [
        data
        for data in data_sets
        if word in data
        ]

see on #kedrotips
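
To check the filtering logic without a kedro project, here is a minimal sketch that runs the same loop over a plain list. The dataset names and the `search` helper are made up for illustration; in a real session the list would come from `catalog.list()`.

```python
# Hypothetical dataset names standing in for catalog.list()
names = ['raw_sales', 'raw_cars', 'int_sales', 'pri_sales_raw_backup']


def search(names, query):
    """Keep only the names that contain every word in the query."""
    matches = list(names)
    for word in query.split():
        matches = [name for name in matches if word in name]
    return matches


raw_sales = search(names, 'raw sales')
print(raw_sales)
```

Each word narrows the previous result, so the words act as an AND filter. The match is a plain substring test, which means word order in the dataset name does not matter.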

🐒 monkey patch it

def query(*search_terms):
     data_sets = catalog.list()
     for search in search_terms:
         data_sets = [
         data 
         for data in data_sets 
         if search in data
         ]
     return data_sets
     
catalog.query = query

see on [#kedrotips](https://twitter.com/WaylonWalker/status/1197855759507300352?s=20)

🤙 YOLO

You Only Load Once

simple

data = [catalog.load(d) 
        for d in 
        catalog.query('c_pri', 'cars')
        ]

more refined

data = {
   d: catalog.load(d)
   for d in catalog.query('c_pri', 'cars')
   }

🍷 refined like a fine wine

from types import SimpleNamespace
data = SimpleNamespace(**{
   d: catalog.load(d) 
   for d in catalog.query('c_pri', 'cars')
   })

🧀 Make it a function getting funcy

from types import SimpleNamespace

def yolo(*search_terms):
    """you only load once
    using query method from previous tip"""
    data = SimpleNamespace(**{
        d: catalog.load(d)
        for d in catalog.query(*search_terms)
    })
    return data

all_pri = yolo('c_pri')
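
Here is a rough, self-contained sketch of the same idea that runs without kedro. `FakeCatalog` is a hypothetical stand-in that only mimics `list`, `load`, and the `query` helper from the earlier tip, and this `yolo` takes the catalog as an explicit argument so the example does not rely on a global.

```python
from types import SimpleNamespace


class FakeCatalog:
    """Hypothetical stand-in for a kedro DataCatalog, for illustration only."""

    def __init__(self, data):
        self._data = data
        self.load_count = 0  # counts how many loads actually happen

    def list(self):
        return list(self._data)

    def load(self, name):
        self.load_count += 1
        return self._data[name]

    def query(self, *search_terms):
        names = self.list()
        for term in search_terms:
            names = [name for name in names if term in name]
        return names


def yolo(catalog, *search_terms):
    """Load every matching dataset once and expose each as an attribute."""
    return SimpleNamespace(
        **{name: catalog.load(name) for name in catalog.query(*search_terms)}
    )


catalog = FakeCatalog({'c_pri_cars': [1, 2], 'c_pri_trucks': [3], 'a_raw_cars': [4]})
pri = yolo(catalog, 'c_pri')
print(pri.c_pri_cars)
```

Attribute access only works when the dataset names are valid Python identifiers; names containing dots or dashes would still need `getattr` or the dictionary version.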

🐒 monkey patch it

catalog.yolo = yolo
catalog.yolo.__doc__ = "you only load once"

all_pri = catalog.yolo('c_pri')

adding catalogs together

from kedro.io import DataCatalog
DataCatalog({**cat1.__dict__['_data_sets'], **cat2.__dict__['_data_sets']})
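
The merge itself is ordinary dictionary unpacking, so it helps to know what happens when both catalogs define the same name. A small sketch with plain dictionaries (the names and values are hypothetical placeholders for the entries in each catalog's `_data_sets`):

```python
# Hypothetical dataset dictionaries standing in for each catalog's _data_sets
cat1_data_sets = {'cars': 'cars from cat1', 'trucks': 'trucks from cat1'}
cat2_data_sets = {'cars': 'cars from cat2', 'boats': 'boats from cat2'}

# Later dictionaries win when a key appears in both
merged = {**cat1_data_sets, **cat2_data_sets}
print(merged)
```

On a name collision the entry from the second catalog silently replaces the first, so put the catalog you trust most last.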

🛢 Building pipelines

building pipelines Photo by roman pentin on Unsplash

📍 Creating Nodes

from kedro.pipeline import node
dropna_node = node(lambda x: x.dropna(), inputs='raw_cars', outputs='int_cars')

from kedro.pipeline import node

def drop_columns(df, *columns):
   for column in columns:
      df = df.drop(columns=column)
   return df

drop_columns_node = node(
   lambda x: drop_columns(x, 'vs', 'am', 'gear', 'carb'),
   inputs='int_cars',
   outputs='pri_cars'
   )

🛢 Creating a pipeline

Don't be so verbose

Create similar nodes dynamically

from typing import List

import numpy as np
import pandas as pd
from kedro.pipeline import node

def halve_dataframe(data: pd.DataFrame) -> List[pd.DataFrame]:
    """ splits a dataframe in half """
    return np.array_split(data, 2)

nodes = []
datasets = [
   'cars', 'trucks', 'boats', 'motorcycles', 'planes', 
   'ships', 'busses', 'trains', 'subways'
   ]

# creates a pipeline node for every dataset in the datasets list
for dataset in datasets:
   nodes.append(
       node(halve_dataframe,
            f'e_modin_{dataset}',
            [f'train_{dataset}', f'test_{dataset}']),
   )
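
Before building real nodes it can be handy to preview the input and output names a loop like this would generate. This is only a sketch of the naming step, using f-strings and a shortened, hypothetical dataset list; no kedro objects are created.

```python
datasets = ['cars', 'trucks', 'boats']

# (input name, [output names]) for each dataset, built with f-strings
node_io = [
    (f'e_modin_{dataset}', [f'train_{dataset}', f'test_{dataset}'])
    for dataset in datasets
]

for input_name, output_names in node_io:
    print(input_name, '->', output_names)
```

Without the `f` prefix every node would share the literal name `'e_modin_{dataset}'`, so a quick printout like this catches the mistake early.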

🏃‍♂️ Running Pipelines

running pipelines Photo by Rodion Kutsaev on Unsplash

🔖 filter by tags

nodes = pipeline.only_nodes_with_tags('cars')

see on #kedrotips

filter by node

nodes = pipeline.only_nodes('b_int_cars')

see on [#kedrotips](https://twitter.com/WaylonWalker/status/1196406204479737856?s=20)

filter nodes like

query_string = 'cars'
nodes = [
   node.name 
   for node in pipeline.nodes 
   if query_string in node.name
   ]
pipeline.only_nodes(*nodes)

see on #kedrotips

only nodes with tags or

nodes = pipeline.only_nodes_with_tags('cars', 'trains')

only nodes with tags and

raw_nodes = pipeline.only_nodes_with_tags('raw')
car_nodes = pipeline.only_nodes_with_tags('cars')
raw_car_nodes = raw_nodes & car_nodes

raw_car_nodes = (
   pipeline
   .only_nodes_with_tags('raw')
   .only_nodes_with_tags('cars')
   )
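
The difference between the "or" and "and" filters comes down to set logic on each node's tags. Here is a minimal sketch with a plain dictionary; the node names, tags, and both helper functions are hypothetical and are not part of kedro.

```python
# Hypothetical node names mapped to their tags
node_tags = {
    'load_cars': {'raw', 'cars'},
    'load_trains': {'raw', 'trains'},
    'clean_cars': {'int', 'cars'},
}


def with_any_tag(node_tags, *tags):
    """Names of nodes carrying at least one of the tags (OR)."""
    return [name for name, have in node_tags.items() if have & set(tags)]


def with_all_tags(node_tags, *tags):
    """Names of nodes carrying every one of the tags (AND)."""
    return [name for name, have in node_tags.items() if set(tags) <= have]


print(with_any_tag(node_tags, 'cars', 'trains'))
print(with_all_tags(node_tags, 'raw', 'cars'))
```

Passing several tags in one call widens the selection, while chaining filters or intersecting the results narrows it.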

add pipelines

car_nodes = pipeline.only_nodes_with_tags('cars')
train_nodes = pipeline.only_nodes_with_tags('trains')
transportation_nodes = car_nodes + train_nodes

ensure nodes are attached

cars_attached = len(
   pipeline
   .only_nodes_with_tags('cars')
   .grouped_nodes
   ) == 1

🎂 Pipeline Decorators

example - log_time

from kedro.pipeline.decorators import log_time, mem_profile
pipeline = pipeline.decorate(log_time)

Pipeline IO

pipeline.all_inputs() and pipeline.all_outputs() return sets of pipeline inputs and outputs, and you can do set operations on them. This is particularly useful for finding the upper and lower edges of your pipeline or of a subset of it. The pipeline object here is any kedro pipeline, including a filtered subset.

Find all raw data

pipeline.all_inputs() - pipeline.all_outputs()

Find all final data

pipeline.all_outputs() - pipeline.all_inputs()
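
To see why the set differences pick out the edges, here is a tiny sketch with plain Python sets. The three-step pipeline and its dataset names are hypothetical; `all_inputs` and `all_outputs` are built by hand to play the role of the sets the pipeline methods return.

```python
# Hypothetical pipeline: raw_cars -> int_cars -> pri_cars, plus a raw lookup table
# Each tuple is (inputs, outputs) for one node.
node_io = [
    ({'raw_cars'}, {'int_cars'}),
    ({'int_cars', 'raw_lookup'}, {'pri_cars'}),
]

all_inputs = set().union(*(inputs for inputs, _ in node_io))
all_outputs = set().union(*(outputs for _, outputs in node_io))

raw_data = all_inputs - all_outputs      # consumed but never produced
final_data = all_outputs - all_inputs    # produced but never consumed
intermediate = all_inputs & all_outputs  # produced by one node, used by another

print(raw_data, final_data, intermediate)
```

Anything that is both an input and an output sits in the middle of the pipeline, which is why it drops out of both differences.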

Find all nodes that use raw data

This one is probably pushing the limits of what I would do in a list comprehension that I use in prod or even put into a text editor, but I commonly use ipython for my ad hoc work, and keeping it all in one line is very handy. Complex list comprehensions start becoming like regex, in that they are really easy to write and really hard to read. I don't think this one quite hits that point, but it's getting close.

I find this one super useful to help me either move data between environments or avoid unnecessary database calls.

raw_inputs = pipeline.all_inputs() - pipeline.all_outputs()
raw_nodes = [node for node in pipeline.nodes if [i for i in raw_inputs if i in set(node.inputs)] != []]
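
If the nested comprehension gets hard to read, a set intersection expresses the same test. This is a sketch only: `FakeNode` is a hypothetical stand-in carrying just a name and a list of inputs, and the raw input names are made up.

```python
from collections import namedtuple

# Hypothetical stand-in for a kedro node: only a name and its input names
FakeNode = namedtuple('FakeNode', ['name', 'inputs'])

nodes = [
    FakeNode('clean_cars', ['raw_cars']),
    FakeNode('join_cars', ['int_cars', 'raw_lookup']),
    FakeNode('report_cars', ['pri_cars']),
]
raw_inputs = {'raw_cars', 'raw_lookup'}

# A node touches raw data when its inputs overlap the raw inputs
raw_nodes = [node for node in nodes if raw_inputs & set(node.inputs)]
raw_node_names = [node.name for node in raw_nodes]
print(raw_node_names)
```

An empty intersection is falsy, so the condition reads as "keep the node if it shares at least one input with the raw set".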

Check out my other Notes.