Photo Credit: tian kuanDistributed Deep Learning Pipelines with PySpark and KerasAn easy approach to data pipelining using PySpark and doing distributed deep learning with KerasAndre ViolanteBlockedUnblockFollowFollowingJun 20IntroductionIn this notebook I use PySpark, Keras, and Elephas python libraries to build an end-to-end deep learning pipeline that runs on Spark.
Spark is an open-source distributed analytics engine that can process large amounts of data with tremendous speed.
PySpark is simply the python API for Spark that allows you to use an easy programming language, like python, and leverage the power of Apache Spark.
ObjectiveMy interest in putting together this example was to learn and prototype.
More specifically, learn more about PySpark pipelines as well as how I could integrate deep learning into the PySpark pipeline.
I ran this entire project using Jupyter on my local machine to build a prototype for an upcoming project where the data will be massive.
Since I work for IBM, I’ll take this entire analytics project (Jupyter Notebook) and move it to IBM.
This allows me to do my data ingestion, pipelining, training and deployment on a unified platform and on a much larger Spark cluster.
Obviously, if you had a real and sizable project or using image data you would NOT do this on your local machine.
Overall, I found it not too difficult to put together this prototype or working example so I hope others will find it useful.
I’ll break down this project into 9 steps.
A lot of steps will be self explanatory, but for others I’ll try and make it painless.
If you want to see just the notebook with explanations and code you can go directly to GitHub.
If I can do this, so can you!Step 1Import LibrariesFigure 1 — 3 Libraries Here: PySpark (Install Spark), Keras, and ElephasStep 2Start Spark SessionYou can set a name for your project using setAppName() and also set how many workers you want.
I’m just running this locally and I set it to a possible 6 workers.
Just a warning, if your data isnt very large then distributing the work, specifically when doing machine learning, may actually be less helpful and provide worse results.
When I do my training below I set it to 1 worker, but when I use this prototype for a later project I’ll change those settings.
Figure 2 — Spark SessionStep 3Load and Preview Data with PysparkHere we’ll load the data.
The data we’ll use comes from a Kaggle competition.
It’s a typical banking dataset.
I use the inferSchema parameter here which helps to identify the feature types when loading in the data.
Per the PySpark documentation this ”requires one extra pass over the data”.
Since the bank data I’m loading only has ~11k observations it doesn’t take long at all, but it may be worth noting if you have a very large dataset.
Figure 3 — Load DataAfter we load the data we can see the schema and the various feature types.
All our features are either string type or integer.
We then preview the first 5 observations.
I’m pretty familiar with with Pandas python library so through this example you’ll see me use toPandas() to convert the spark dataframe to a pandas dataframe and do some manipulations.
Not right or wrong, just easier for me.
Figure 4 — View Schema and Preview DataframeFinally, we’ll drop the 2 date columns since we won’t be using those in our deep learning model.
They could be possibly significant and featurized by I decided to just drop them all together.
Figure 5 — Drop ColumnsStep 4Create the Spark Data PipelineNow we create the pipeline using PySpark.
This essentially takes your data and, per the feature lists you pass, will do the transformations and vectorizing so it is ready for modeling.
I referenced the “Extracting, transforming and selecting features” Apache Spark documentation a lot for this pipeline and project.
Below is a helper function to select from the numeric features which ones to standardize based on the kurtosis or skew of that feature.
The current defaults for upper_skew and lower_skew are just general guidelines (depending where you read), but you can modify the upper and lower skew as desired.
Figure 6 — Select Features to Standardize FunctionCreate PipelineNow we’ll get into the actual data pipeline.
The feature list selection part can be further enhanced to be more dynamic vs listing out each feature, but for this small dataset I just left it as is with cat_features, num_features, and label.
Selecting features by type can be done similar to how I did it in the select_features_to_scale helper function using something like this list(spark_df.
columns) which would return a list of all the columns in your spark dataframe that are object or string type.
The first thing we want to do is create an empty list called stages.
This will contain each step that the data pipeline needs to to complete all transformations within our pipeline.
I print out each step of the stages after the pipeline so you can see the sequential steps from my code to a list.
The second part is going to be a basic loop to go through each categorical feature from our list cat_features and then index and encode those features using one-hot encoding.
StringIndexer encodes your categorical feature to a feature index with the highest frequency label (count) as feature index 0 and so on.
I will preview the transformed data frame after the pipeline, Step 5, where you can see each feature index created from the categorical features.
For more information and a basic example of StringIndexer check the here.
Within the loop we also do some one-hot encoding (OHE) using the OneHotEncoderEstimator.
This function only takes a label index so if you have categorical data (objects or strings) you have to use StringIndexer so you can pass a label index to the OHE estimator.
One nice thing I found from looking at dozens of examples was that you can chain StringIndexer output right into the OHE estimator using string_indexer.
If you have a lot of features to transform you’ll want to do some thinking about the names, OutputCol, because you can’t just overwrite feature names so get creative.
We append all those pipeline steps within our loop into our pipeline list stages.
Next we use StringIndexer again on our label feature or dependent variable.
And then we’ll move on to scaling the numeric variables using the select_features_to_scale helper function from above.
Once that list is selected, we’ll vectorize those features using VectorAssembler and then standardize the features within that vector using StandardScaler.
Then we append those steps to our ongoing pipeline list stages.
The last step is just assembling all our features into a single vector.
We’ll find the numeric features from the list num_features that were not scaled by just using the difference between our unscaled_features (the name of the selected numeric feature TO scale) list and the original list of numeric features num_features.
Then we assemble or vectorize all the categorical OHE features and numeric features and add that step to our pipeline stages.
And finally, we add in the scaled_features to our assembled_inputs to get a final and single vector of features for our modeling.
Figure 7 — Spark Data PipelineWe can see all the steps within our pipeline by looking at our stages list that we’ve been sequentially adding.
Figure 8 — Data Pipeline list ‘stages’Step 5Run Data Through the Spark PipelineNow that the ”hard” part is over we can simply pipeline the stages and fit our data to the pipeline by using fit().
Then we actually transform the data by using transform.
Figure 9 — Pipeline, Fit, and Transform the DataWe can now preview our newly transformed PySpark dataframe with all the original and transformed features.
This can be better viewed in the notebook on GitHub but Figure 10 shows some of the indexes, assembled vectors, and our label index.
Figure 10 — Preview of Newly Transformed DataframeStep 6Final Data Prep before Deep Learning ModelThere are couple last and quick things we need to do before modeling.
First is to create a PySpark dataframe that only contains 2 vectors from the recently transformed dataframe.
We only need the: features (X) and label_index (y) features for modeling.
It’s easy enough to do with PySpark with the simple select statement.
Then, just cause, we preview the dataframe.
Figure 11 — Select Final Features and LabelFinally, we want to shuffle our dataframe and then split the data into train and test sets.
You always want to shuffle the data prior to modeling to avoid any bias from how the data may be sorted or otherwise organized and specifically shuffling prior to splitting the data.
Figure 12 — Order by Random (Shuffle) and Split DataStep 7Build a Deep Learning Model with KerasWe’ll now build a basic deep learning model using Keras.
Keras is described as: ”a high-level neural networks API, written in Python and capable of running on top of TensorFlow, CNTK, or Theano.
” in the Keras documentation.
I find Keras to be one of the easiest deep learning APIs for python.
Also, I found an extension of Keras that allowed me to do easy distributed deep learning on Spark that could integrate with my PySpark pipeline so it seemed like a great choice.
First we need to determine the number of classes as well as the number of inputs from our data so we can plug those values into our Keras deep learning model.
Figure 13 — Number of Classes and Inputs for ModelNext we create a basic deep learning model.
Using the model = Sequential() feature from Keras, it’s easy to add layers and build a deep learning model with all the desired settings (# of units, dropout %, regularization — l2, activation functions, etc.
) I selected the common Adam optimizer and the sigmoid activation with binary cross-entropy for our loss since out outcome label is binary .
Figure 14 — Keras DL ModelOnce the model is built we can view the architecture.
Notice that we went from 30 inputs/parameters to 74,242.
The beauty, and sometimes laziness :), of deep learning is the automatic feature engineering.
Figure 15 — Model Summary / ArchitectureStep 8Distributed Deep Learning with ElephasNow that we have a model built, using Keras as our deep learning framework, we want to run that model on Spark to leverage its distributed analytic engine.
We do that by using a python library and an extension to Keras called Elephas.
Elephas makes it pretty easy to run your Keras models on Apache spark with few lines of configuration.
I found Elephas to be easier and more stable to use than the several other libraries I read about and tried.
The first thing we do with Elephas is create an estimator similar to some of the PySpark pipeline items above.
We can set the optimizer settings right from Keras optimizer function and then pass that to our Elephas estimator.
I only explicitly use Adam optimizer with a set learning rate, but you can use any Keras optimizer with their respective parameters (clipnorm, beta_1, beta_2, etc.
Then within the Elephas estimator you specify a variety of items: features column, label column, # of epochs, batch size for training, validation split of your training data, loss function, metric, etc.
I just used the settings from an Elephas example and modified the code slightly.
I’m only using 1 worker for this since my data is small.
Also, when I tried running across the 6 workers (Step 2) the results were poor.
The lesson is: just because you can distribute doesn’t mean you should :)Figure 16 — Elephas Estimator for Distributed Deep LearningNotice that after we run the estimator the output, ElephasEstimator_31afcd77fffd, looks similar to one of our pipeline stages list items.
This can be passed directly into our PySpark pipeline to fit and transform our data, which we’ll do in the next step!Step 9Distributed Deep Learning Pipeline and ResultsNow that are deep learning model is to be run on Spark, using Elephas, we can pipeline line it exactly how we did above using Pipeline().
You could append this to our stages list and do all of this with one pass with a new dataset now that it’s all built out which would be super cool!Figure 17 — Easy DL Pipeline with PySparkI created another helper function below called dl_pipeline_fit_score_results that takes the deep learning pipeline dl_pipeline and then does all the fitting, transforming, and prediction on both the train and test data sets.
It also outputs the accuracy for both data sets and their confusion matrices.
Figure 18 — Deep Learning Pipeline Helper FunctionLet’s use our new deep learning pipeline and helper function on both data sets and test our results!.From below you can see we can about ~80% accuracy on both train and test data so the model seems to be generalizing well enough.
I admit I’m using a sledge hammer model where a much smaller ‘tool’ like a basic decision tree may do better.
However, we now have a working flow and prototype to bring in tabular data, pass that data through pipeline of transforms, and apply deep learning.
Figure 19 — Run DL Pipeline Helper Function, View ResultsConclusionI hope that this example has been helpful.
I know it was for me in learning more about PySpark pipelines and doing deep learning on spark using an easy deep learning framework like Keras.
Like I mentioned, I ran all this locally with little to no issue, but my main objective was to prototype something for an upcoming project that will contain a massive dataset.
My hope is that you (and myself) can use this as a template while making few changes to things like the spark session, data, feature selection, and maybe adding or removing some pipeline stages.
As always, thanks for reading and good luck on your next project!.