Processing Data with Dask
A painless introduction to an evolutionary approach to distributed data processing
Eric Ness
Feb 22

Motivation
At When I Work we record key actions that users take on the site in order to improve our products.
In a typical day, this amounts to 65 million records and 1 TB of data.
This volume of data is challenging to analyze over a range of many days; the size of the data forces our analyses to cover a shorter period than we would like.
This challenge inspired us to find a way to process the data down into a summary form that can be analyzed across long time spans.
We use the typical Python data toolkit for our ETL jobs.
The sheer volume of data is too large for our standard tools, numpy and pandas, to handle.
There are distributed computing frameworks, like Spark, that handle the heavy lifting.
While Spark could handle the job, moving to Spark from the Python data toolkit is a radical change.
It requires new infrastructure, tools and programming patterns.
If possible, we wanted to make an incremental change rather than a revolutionary change to get the job done.
Dask is designed to extend the numpy and pandas packages to work on data processing problems that are too large to be kept in memory.
It breaks the larger processing job into many smaller tasks that are handled by numpy or pandas and then it reassembles the results into a coherent whole.
This happens behind a seamless interface that is designed to mimic the numpy / pandas interfaces.
Example

Introduction
In order to give you an idea of what a data processing job looks like in Dask, I’ve created a small script that will process a large set of public data.
The script will process the New York City taxi driver data set.
This data set has 10 years of data and the data file for each month is close to 1 GB in size.
It takes a powerful computer to be able to fit this data into memory like pandas requires.
However, using Dask, the entire dataset is processed on a typical laptop.
We’re going to step through the code that processes the NYC taxi data one piece at a time.
The code sections in this story are snippets of the complete Python script and won’t run independently.
The complete script is available on Github.
You can run it on any computer that has Dask installed.
Loading Data
The first step is to tell Dask where to find the data.
In this case the data is available in a public S3 bucket.
The format of the data is CSV.
Here’s the code to access the data file for April 2018. You can see that dask.dataframe.read_csv supports reading files directly from S3.
The code here reads a single file since they are each 1 GB in size.
It is easy to change Dask to read all of the yellow taxi files by simply changing yellow_tripdata_2018-04.csv to yellow_tripdata_*.csv.
One thing to be aware of when making this change is that while Dask will improve the speed of processing the data, it won’t improve the speed of downloading the data.
So depending on your connection speed this may take a long time.
A better option to get an idea of the speed of Dask would be to download all of the data to your local system using the AWS CLI:

aws s3 cp "s3://nyc-tlc/trip data/" . --no-sign-request --exclude "*" --include "yellow_tripdata*" --recursive --dryrun

Simply remove the --dryrun flag to download the files.
Then the read_csv function can be pointed to the data files location on local disk rather than in S3.
One thing to note about the read_csv function is it doesn’t actually load your data into memory.
Instead it creates a task graph of the work that needs to be done to load the data into memory.
The task graph is executed in a later part of the program.
We can take a quick look at what the dask.dataframe looks like by printing it out:

Dask DataFrame Structure:
                VendorID tpep_pickup_datetime ...
npartitions=13
                   int64               object ...
Dask Name: from-delayed, 39 tasks

At this point we see that the dataframe knows the structure of the data it will load and has divided the work into tasks.
The data includes the columns VendorID, tpep_pickup_datetime and others that were removed from the output for clarity.
Dask divides the work of loading the data into 39 tasks.
However, it hasn’t completed any of the tasks yet.
Transforming Data
The next step is to transform the data.
In order to create a reasonably complex transformation process to show off what Dask can do, we’ll assume that we’re interested in the mean fare to go a specified distance.
Since the time to travel a set distance may vary by traffic, we’ll also break the mean fare down by travel time.
Here’s the code to calculate those numbers.
Feel free to skim over it since the specifics of the transformation process aren’t important.
This code will look hauntingly familiar if you’re experienced with pandas.
We see many functions which are old friends: groupby, drop and assign for example.
Almost exactly the same code could be used to process a pandas.DataFrame if the data fit into memory.
At this point you might think we’re really getting close to our final result! Sorry to disappoint, but we’re still figuring out what tasks need to be done.
Dask adds more steps to the task graph, but none of them has executed yet.
Here’s what the dask.dataframe looks like now:

Dask DataFrame Structure:
              avg_amount
npartitions=1
                 float64
Dask Name: rename, 294 tasks

You can see that the only column which exists outside the index is avg_amount, which stores the average fare for a given trip time and distance.
The number of tasks has also grown.
Over two hundred new tasks are added to the dataframe’s task graph to handle all of the data processing.
Computing Data
We’re finally at the part of the code that does the data processing! Any time that you call compute on a dask.dataframe, all of the tasks in the graph get executed.
This also includes calls to head or getting the len of a dataframe.
Basically any time that you want to inspect the data itself, Dask will kick off the execution of the task graph.
Development patterns will be different than pandas.
With pandas it’s easy to display the head of a DataFrame to make sure the processing is going as expected.
Calling head on a dask.dataframe will kick off processing of potentially hundreds or thousands of tasks.
So while it may be necessary during the development of a script, it should be avoided during any production processing job.
We’re finally looking at some real live data.
The table shows the mean fares by distance and travel time.
If you run this code on your laptop, Dask runs tasks on multiple cores in the background.
Therefore, if you have four cores on your machine the processing will happen roughly four times faster than using pandas.
While this is a nice performance boost on a single machine, the great thing about Dask is that the exact same code runs on a distributed cluster of up to hundreds of machines.
The task scheduler takes care of everything in the background, and the only difference you notice is that the program runs much faster! The speed will scale approximately linearly with the size of the cluster.
Conclusion
Dask is an excellent choice for extending data processing workloads from a single machine up to a distributed cluster.
It will seem familiar to users of the standard Python data science toolkit and allows for an evolution to distributed processing.
It builds on the existing tools that many teams use today instead of throwing them aside and starting from scratch.