Tags
This past week I had a really weird bug in my kedro pipeline. For some reason data running through my pipeline was coming out completely made no sense, but if I manually request raw data outside of the pipeline it matched expectations.
NOTE While this story is about a kedro pipeline, it can be applied anywhere closures are put into an iterable.
After a few days of looking at it off and on, I pinpointed that it was all the way down in the raw layer. Right as data is coming off of the database. For this I already had existing sql
files stored and a read_sql
function to get the data so I opted to just set up the pipeline to utilize the existing code as much as possible, leaning on the kedro framework a bit less.
I have dynamically created lists of pipeline nodes many times in the past, but typically I take data from kedro input and use it in the lambda. I prefer the simplicity of using lambdas over functools.partial
. It typically looks something like this.
# π I do this all the time from kedro.pipeline import node from my_generic_project_lib import clean datasets_to_clean = ['sales', 'production', 'inventory'] nodes = [] for dataset in datasets_to_clean: nodes.append( node( func=lambda x: clean(x) inputs = f'raw_{dataset}' outputs=f'int_{dataset}' tags=['int', dataset] name=f'create_int_{dataset}' ) )
What was different this time is that I needed to pass in the name of the dataset to my read_sql function, not the data loaded in the framework.
# β This does not work from kedro.pipeline import node from my_generic_project_lib import read_sql datasets_to_read = ['sales', 'production', 'inventory'] nodes = [] for dataset in datasets_to_clean: nodes.append( node( func=lambda: read_sql(dataset) # π₯ The major issue inputs = f'dummy' outputs=f'int_{dataset}' tags=['int', dataset] name=f'create_int_{dataset}' ) )
As I am still oblivious to what has happened I pop in a breakpoint()
and quickly see that during the first run the dataset passed into read_sql
was 'inventory'
, in fact, every single one was 'inventory'
. The lambda is just using the latest value of dataset from outside and has no local
dataset
attached to it.
# π Much Better from kedro.pipeline import node from my_generic_project_lib import read_sql datasets_to_read = ['sales', 'production', 'inventory'] nodes = [] for dataset in datasets_to_clean: nodes.append( node( func=lambda dataset=dataset: read_sql(dataset) # dataset is now bound to the lambda β¨ inputs = f'dummy' outputs=f'int_{dataset}' tags=['int', dataset] name=f'create_int_{dataset}' ) )
I made a slightly more simple example so that you can try it and play with it yourself, edit it, share it with your friends, laugh at my mistake, whatever you like.