Tags
#kedrotips
I tweet out most of these snippets as I add them; you can find them all under #kedrotips.
🗣 Heads up
Below are some quick snippets and notes for building data pipelines with kedro. So far I am just compiling snippets; eventually I will turn them into several posts on kedro. These are mostly things that I use every day with kedro. Some are a bit more esoteric. Some are helpful when writing production code, and some are more useful for exploration.
📚 Catalog
Photo by jesse orrico on Unsplash
CSVLocalDataSet
```python
import pandas as pd
from kedro.io import CSVLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')
iris_data_set = CSVLocalDataSet(filepath="test.csv",
                                load_args=None,
                                save_args={"index": False})
iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()
```
```yaml
test_data:
  type: CSVLocalDataSet
  filepath: test.csv
  save_args:
    index: False
```
CSVHTTPDataSet
```python
from kedro.io import CSVHTTPDataSet

cities = CSVHTTPDataSet(
    fileurl="https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv",
    auth=None,
    load_args=None)
iris = cities.load()
```

```yaml
cities:
  type: CSVHTTPDataSet
  fileurl: https://people.sc.fsu.edu/~jburkardt/data/csv/cities.csv
```
HDFLocalDataSet
```python
import pandas as pd
from kedro.io import HDFLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')
iris_data_set = HDFLocalDataSet(filepath="iris.hdf",
                                key="test_hdf_key",
                                load_args=None,
                                save_args=None)
iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()
```

```yaml
cars:
  type: HDFLocalDataSet
  filepath: test.hdf
  key: test_hdf_key
```
HDFS3DataSet
```python
import pandas as pd
from kedro.io import HDFS3DataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')
iris_data_set = HDFS3DataSet(filepath="iris.hdf",
                             bucket_name="bucket-us-west-1",
                             key="test_hdf_key",
                             load_args=None,
                             save_args=None)
iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()
```

```yaml
cars:
  type: HDFS3DataSet
  filepath: cars.hdf
  bucket_name: bucket-us-west-1
  key: test_hdf_key
```
JSONLocalDataSet
```python
import pandas as pd
from kedro.io import JSONLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')
iris_data_set = JSONLocalDataSet(filepath="iris.json",
                                 load_args=None,
                                 save_args=None)
iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()
```

```yaml
cars:
  type: JSONLocalDataSet
  filepath: iris.json
```
ParquetLocalDataSet
```python
import pandas as pd
from kedro.io import ParquetLocalDataSet

iris = pd.read_csv('https://raw.githubusercontent.com/kedro-org/kedro/d3218bd89ce8d1148b1f79dfe589065f47037be6/kedro/template/%7B%7B%20cookiecutter.repo_name%20%7D%7D/data/01_raw/iris.csv')
iris_data_set = ParquetLocalDataSet('iris',
                                    engine='auto',
                                    load_args=None,
                                    save_args=None,
                                    version=None)
iris_data_set.save(iris)
reloaded_iris = iris_data_set.load()
```

```yaml
cars:
  type: ParquetLocalDataSet
  filepath: cars
```
PickleS3DataSet
SQLTableDataSet
SQLQueryDataSet
TextLocalDataSet
ExcelLocalDataSet
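These last five are still stubs. Going by the pattern of the entries above, their catalog configuration likely looks something like the following; the keys (`table_name`, `sql`, `credentials`, `engine`) and example values are a sketch based on the same-era kedro `io` classes, not verified against the docs:

```yaml
pickled_model:
  type: PickleS3DataSet
  filepath: model.pkl
  bucket_name: bucket-us-west-1

sales_table:
  type: SQLTableDataSet
  table_name: sales
  credentials: db_credentials

sales_query:
  type: SQLQueryDataSet
  sql: SELECT * FROM sales
  credentials: db_credentials

notes:
  type: TextLocalDataSet
  filepath: notes.txt

report:
  type: ExcelLocalDataSet
  filepath: report.xlsx
```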
⏳ Loading Data
Photo by Battlecreek Coffee Roasters on Unsplash
Simple Loading
```python
df = catalog.load('cars')
```
list all datasets
```python
catalog.list()
```
Saving Data
```python
catalog.save('cars', cars)
```
🔍 Finding data
simple keyword search
```python
query = 'raw'
[data for data in catalog.list() if query in data]
```
multi keyword search

```python
query = 'raw sales'
data_sets = catalog.list()
for word in query.split():
    data_sets = [data for data in data_sets if word in data]
```
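The narrowing loop can be sanity-checked without kedro; here the catalog listing is a made-up list of names standing in for `catalog.list()`:

```python
# stand-in for catalog.list(); the names are made up
data_sets = ['raw_sales', 'raw_cars', 'int_sales', 'pri_sales']

query = 'raw sales'
for word in query.split():
    # keep only names containing every word in the query
    data_sets = [data for data in data_sets if word in data]

print(data_sets)  # only 'raw_sales' contains both 'raw' and 'sales'
```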
🐒 monkey patch it
```python
def query(*search_terms):
    data_sets = catalog.list()
    for search in search_terms:
        data_sets = [data for data in data_sets if search in data]
    return data_sets

catalog.query = query
```
🤙 YOLO
You Only Load Once
simple
```python
data = [catalog.load(d) for d in catalog.query('c_pri', 'cars')]
```
more refined
```python
data = {d: catalog.load(d) for d in catalog.query('c_pri', 'cars')}
```
🍷 refined like a fine wine
```python
from types import SimpleNamespace

data = SimpleNamespace(**{
    d: catalog.load(d)
    for d in catalog.query('c_pri', 'cars')
})
```
🧀 Make it a function, getting funcy
```python
from types import SimpleNamespace

def yolo(*search_terms):
    """you only load once, using the query method from the previous tip"""
    data = SimpleNamespace(**{
        d: catalog.load(d)
        for d in catalog.query(*search_terms)
    })
    return data

all_pri = yolo('c_pri')
```
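If `SimpleNamespace` is new to you, all it does is turn dict keys into attributes; a quick stdlib check with made-up dataset names and values:

```python
from types import SimpleNamespace

# stand-in for the dict of loaded datasets; names and values are made up
data = SimpleNamespace(**{'c_pri_cars': [1, 2, 3], 'c_pri_trains': [4, 5]})

print(data.c_pri_cars)  # attribute access instead of dict lookups
```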
🐒 monkey patch it
```python
catalog.yolo = yolo
catalog.yolo.__doc__ = "you only load once"

all_pri = catalog.yolo('c_pri')
```
adding catalogs together
```python
from kedro.io import DataCatalog

DataCatalog({**cat1.__dict__['_data_sets'], **cat2.__dict__['_data_sets']})
```
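Note that the double-splat merge gives `cat2` precedence on name clashes; a quick stdlib check with plain dicts (made up) standing in for the catalogs' `_data_sets`:

```python
# made-up stand-ins for cat1.__dict__['_data_sets'] and cat2.__dict__['_data_sets']
cat1_data_sets = {'cars': 'csv', 'trains': 'csv'}
cat2_data_sets = {'cars': 'parquet', 'boats': 'csv'}

merged = {**cat1_data_sets, **cat2_data_sets}
print(merged['cars'])  # cat2's entry wins on the 'cars' clash
```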
🛢 Building pipelines
Photo by roman pentin on Unsplash
📍 Creating Nodes
```python
from kedro.pipeline import node

dropna_node = node(lambda x: x.dropna(), inputs='raw_cars', outputs='int_cars')
```
```python
from kedro.pipeline import node

def drop_columns(df, *columns):
    for column in columns:
        df = df.drop(columns=column)
    return df

drop_node = node(
    lambda x: drop_columns(x, 'vs', 'am', 'gear', 'carb'),
    inputs='int_cars',
    outputs='pri_cars',
)
```
🛢 Creating a pipeline
Don't be so verbose
Create similar nodes dynamically
```python
from typing import List

import numpy as np
import pandas as pd
from kedro.pipeline import node

def halve_dataframe(data: pd.DataFrame) -> List[pd.DataFrame]:
    """splits a dataframe in half"""
    return np.array_split(data, 2)

datasets = [
    'cars', 'trucks', 'boats', 'motorcycles', 'planes',
    'ships', 'busses', 'trains', 'subways',
]

# creates a pipeline node for every dataset in the datasets list
nodes = []
for dataset in datasets:
    nodes.append(
        node(halve_dataframe,
             f'e_modin_{dataset}',
             [f'train_{dataset}', f'test_{dataset}'])
    )
```
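The f-string naming scheme itself can be checked without kedro; a sketch that builds the same (input, outputs) pairs as plain tuples for a couple of the datasets:

```python
datasets = ['cars', 'trucks']

specs = []
for dataset in datasets:
    # same f-string naming scheme as the node loop above
    specs.append((f'e_modin_{dataset}', [f'train_{dataset}', f'test_{dataset}']))

print(specs[0])  # ('e_modin_cars', ['train_cars', 'test_cars'])
```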
🏃‍♂️ Running Pipelines
Photo by Rodion Kutsaev on Unsplash
🔖 filter by tags
```python
nodes = pipeline.only_nodes_with_tags('cars')
```
filter by node
```python
nodes = pipeline.only_nodes('b_int_cars')
```
filter nodes like
```python
query_string = 'cars'
nodes = [node.name for node in pipeline.nodes if query_string in node.name]
pipeline.only_nodes(*nodes)
```
only nodes with tags or
```python
nodes = pipeline.only_nodes_with_tags('cars', 'trains')
```
only nodes with tags and
```python
raw_nodes = pipeline.only_nodes_with_tags('raw')
car_nodes = pipeline.only_nodes_with_tags('cars')
raw_car_nodes = raw_nodes & car_nodes
```
```python
raw_nodes = (
    pipeline
    .only_nodes_with_tags('raw')
    .only_nodes_with_tags('cars')
)
```
add pipelines
```python
car_nodes = pipeline.only_nodes_with_tags('cars')
train_nodes = pipeline.only_nodes_with_tags('trains')
transportation_nodes = car_nodes + train_nodes
```
ensure nodes are attached
```python
cars_attached = len(
    pipeline
    .only_nodes_with_tags('cars')
    .grouped_nodes
) == 1
```
🎂 Pipeline Decorators
```python
from kedro.pipeline.decorators import log_time, mem_profile

pipeline.decorate(log_time, mem_profile)
```
Pipeline IO
`pipeline.all_inputs()` and `pipeline.all_outputs()` return sets of pipeline inputs and outputs, and you can do set operations on them. This is particularly useful for finding the upper and lower edges of your pipeline or a subset of it. The pipeline object here is any kedro pipeline, including a filtered subset.
Find all raw data
```python
pipeline.all_inputs() - pipeline.all_outputs()
```
Find all final data
```python
pipeline.all_outputs() - pipeline.all_inputs()
```
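The set algebra here can be sanity-checked with plain sets; the dataset names below are made up and stand in for `pipeline.all_inputs()` and `pipeline.all_outputs()`:

```python
# made-up stand-ins for pipeline.all_inputs() and pipeline.all_outputs()
all_inputs = {'raw_cars', 'int_cars', 'params'}
all_outputs = {'int_cars', 'pri_cars'}

raw_data = all_inputs - all_outputs    # consumed but never produced
final_data = all_outputs - all_inputs  # produced but never consumed

print(raw_data, final_data)
```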
Find all nodes that consume raw data
This one is probably pushing the limits of what I would do in a list comprehension in prod, or even put into a text editor, but I commonly use ipython for my ad hoc work, and keeping it all on one line is very handy. Complex list comprehensions start becoming like regex, in that they are really easy to write and really hard to read. I don't think this one quite hits that point, but it's getting close.
I find this one super useful for either moving data between environments or avoiding unnecessary database calls.
```python
raw_inputs = pipeline.all_inputs() - pipeline.all_outputs()
raw_nodes = [
    node for node in pipeline.nodes
    if [i for i in raw_inputs if i in set(node.inputs)] != []
]
```
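The inner comprehension is really just a set intersection; a sketch with stand-in node objects (the `SimpleNamespace` nodes and their names are made up for illustration):

```python
from types import SimpleNamespace

# stand-ins for pipeline.nodes; each node just needs an .inputs attribute
nodes = [
    SimpleNamespace(name='clean_cars', inputs=['raw_cars']),
    SimpleNamespace(name='merge', inputs=['int_cars', 'int_trains']),
]
raw_inputs = {'raw_cars'}  # stand-in for all_inputs() - all_outputs()

# a set intersection reads better than the nested comprehension
raw_nodes = [node for node in nodes if raw_inputs & set(node.inputs)]

print([node.name for node in raw_nodes])  # only 'clean_cars' touches raw data
```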