Giving Your Algorithm a Spark
by Jörg Schneider and Jens Ortmann, May 16
Cluster computing is quickly gaining traction across all industries.
More and more companies have access to distributed computing power in the cloud.
Some are even setting up their own clusters.
While this offers huge new opportunities for complex analyses, it raises the question of how to proceed with existing algorithms: How can existing algorithms be brought up to speed within a cluster?
Getting an algorithm ready for distributed computing can come with massive runtime improvements (10–100x faster).
A move to the cluster might also be mandated in order to decommission local server solutions and to consolidate the execution of data processing and algorithms in the cluster.
The catch is that an algorithm developed to run on a single computer does not run efficiently in the cluster.
To do so, it must be ported to Spark or a similar framework.
In this article, we focus on the case where the algorithm is implemented in Python, using common libraries like pandas, numpy, sklearn.
The techniques might apply similarly to algorithms written in R.
We assume further that computations in the cluster run in Spark.
A complete re-implementation of the algorithm is usually out of the question.
It requires additional work and new skills, is error prone, and is difficult to validate because a 1:1 replication of the original implementation is not possible.
Instead, the algorithm is often deployed as it is and runs on a single node in the cluster.
Typically, this execution happens on the edge node, which is the most easily accessed node in the cluster.
However, this creates an unnecessary strain on the edge node and leaves available resources in the cluster unused (in fact, this goes against the very paradigm of distributed computing).
As a remedy, we discuss here two simple but effective methods to port an existing algorithm to Spark:
1. Programming a custom Spark function in Python (a PySpark UDF): Spark UDFs are suited for tasks with limited complexity and few dependencies but high repetition.
2. Partitioning the dataset for cluster-enabled multiprocessing: This method is suited when the dataset can be split and the subsets can be scheduled independently for computation.
The two methods allow the algorithm to make use of the available resources in the cluster, in the process creating massive runtime improvements.
In practice, these methods can even be combined.
The basics of cluster computing with Hadoop, Hive and Spark
In this first section, we present a high-level introduction to Hadoop, Hive and Spark as they pertain to this article.
Readers familiar with these topics are welcome to skip ahead.
Apache Hadoop is the de-facto standard framework for cluster computing and large-scale analytical processing.
Its core components are (since Hadoop 2.0):
· Yet Another Resource Negotiator (YARN): YARN is a cluster-management system to handle resource allocation, job scheduling and job tracking.
· Hadoop Distributed File System (HDFS): HDFS stores files across the nodes in the cluster.
The physical location is hidden from the user and data is stored redundantly.
There is, however, an easy folder/path-like syntax to access the files on HDFS.
Physically, a Hadoop cluster is made up of servers, called nodes.
These can be separated into the following types:
· Edge node (also called workflow node): The edge node acts as an entry point, to which users can connect and from which they can submit jobs.
Edge nodes do not carry out YARN jobs.
· Name node (also called head node): The name node runs the YARN resource manager service and the HDFS service.
For high availability, a standard practice since Hadoop 2.0 is to run a second name node as a failover.
· Data node (also called worker node): Data nodes provide storage for HDFS and computing resources (CPU cores & memory) to YARN jobs.
Computing on Hadoop clusters is done via processing engines, which leverage the core components YARN and HDFS.
Hive and Spark are the most relevant frameworks today:
· Apache Hive stores schema, table and column semantics.
Hive enables the data in HDFS to be queried in an SQL dialect.
As a processing engine, it is thus more comparable to a database engine.
· Apache Spark is an execution engine that can run on Hadoop.
Internally Spark jobs are executed in Java and Scala (running on a JVM), but there are APIs for Python and R (PySpark and SparkR).
Spark provides its own implementation of data frames with functionality similar to pandas in Python, but the frames are distributed across the cluster instead of held locally.
With Spark data frames, the computation is executed where the data resides (query shipping), while in pandas the data goes where the (local) execution resides (data shipping).
Since the data is shipped and serialized during the conversion, converting a Spark dataframe to a Pandas dataframe can take a significant amount of time.
Programming a custom Spark function
The least invasive way to leverage cluster computing for algorithms is by writing your own PySpark user-defined function (UDF).
Coming from Python, think of it as vectorization or a lambda apply that uses Spark: A small part of the algorithm is wrapped into a self-contained function, which is registered in Spark and evaluated on your data.
The UDF itself runs with as high a parallelization as your cluster resources and data allow.
Using this approach in real-world projects, we have achieved improvements in end-to-end execution time of 10–100x.
To use this approach, one can rely on the following steps:
1. Choose UDF candidate(s): Which part of your algorithm is computationally heavy, but can be isolated from the rest? This should be a portion that is executed many times (over varying data) or that can be structured as such.
2. Define parameters and return datatype: Spark data frames support different datatypes from Python. Default numerical primitives, booleans and strings work across both platforms. Complex datatypes (e.g. objects) can be used by pickling them to a base64-encoded string. The string can be passed as a primitive and the original data can be restored inside the UDF (likewise if a complex type is returned from the UDF).
3. Define data scope of the UDF: The Spark Python UDF will be a scalar function that is evaluated row-wise on a Spark data frame. Each row should contain a unique ID. In most scenarios, a purely scalar UDF is very limited. Thus, passing complex data encoded as a string is a neat way to pass numpy arrays, vectors or custom objects. Spark in version >=2.3 comes with Pandas UDFs (also called Vectorized UDFs), which rely on Apache Arrow. These bring faster data exchange and offer both scalar and grouped evaluation. In our experience, few Hadoop deployments are already using Spark 2.3 and also have the Arrow package on them.
4. Manage dependencies: The UDF will run distributed on data nodes outside your control. If your UDF needs Python libraries, these must be either available on each data node or submitted alongside the Spark job as a zip file. The UDF should not interact with the local filesystem (only exception: temp files) or the network. Try to extract these parts out of it.
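Step 2 above, passing complex objects as base64-encoded strings, could be sketched as follows. This is a minimal illustration using only the standard library; the function names encode() and decode() are our own choice for this sketch and not necessarily those of the original helpers.

```python
import base64
import pickle


def encode(obj):
    """Pickle any Python object and return it as a base64-encoded string."""
    return base64.b64encode(pickle.dumps(obj)).decode("ascii")


def decode(text):
    """Restore the original Python object from a string produced by encode()."""
    return pickle.loads(base64.b64decode(text.encode("ascii")))


# Usage: a nested structure survives the round trip through a plain string.
payload = {"route": (161, 237), "values": [0.5, 1.25, 3.0]}
as_string = encode(payload)
restored = decode(as_string)
```

Because the encoded value is an ordinary string, it fits into a string column of a Spark data frame. Note that unpickling executes code embedded in the data, so decode() should only ever be applied to strings your own job produced.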
To illustrate when and how a PySpark UDF can be applied, we built an example around the New York City taxi cab dataset (all trips in December 2018 where customers gave a tip).
We cleaned the data; derived some features like weekday and hour of the trip, trip distance, and trip duration; and transformed the tip amount into three categories: Low, Normal and High.
Finally, we grouped trips by their pickup and drop-off “taxi zones” (New York has around 260 such zones) and proceeded only with those (pickup, drop-off) routes, in which at least 500 trips occurred.
This gave us roughly 4.5 million trips to work with, in about 2,000 groups.
The resulting data looks as follows:
Now, imagine we want to compute something individually for each group, a process that is computationally expensive.
For example, every taxi driver likes a good tip, so let’s compute where and when the driver can expect one.
For this we could train and test an expensive classifier (like SVM) that aims to predict the tip category based on the other attributes.
(SparkML currently does not support non-linear SVM as shown here, so a complete migration to Spark is not trivial.)
Let’s assume we have defined this computation as a Python function built for local execution.
It covers all steps from data scaling up until validation and, when done, returns the measured accuracy as well as the built classifier object:
Usually, we would now use this function over all data, like this:
This computation takes around 60 minutes on a single computer (standard 2018 laptop), since the complexity of SVM training is quadratic in the size of the input data.
Fortunately, this is an ideal candidate to leverage a PySpark UDF!
We start by defining the PySpark UDF, which encapsulates everything train_and_test() is currently doing.
For that, we need to serialize complex Python objects and encode them as strings so they can be passed to PySpark.
For this purpose, we define the following two helper functions (using the base64 and pickle packages):
The UDF function that PySpark will use is declared as follows:
Note that we simply reuse train_and_test(): All we have to take care of is the conversion of our feature matrix into a numpy array (which we do inside of the UDF to avoid the need to also encode it before loading it to PySpark) and the encoding of the results (accuracy score and trained classifier object).
The PySpark UDF can then be registered by pointing to the previously defined Python function and declaring its return type:
Starting again with the Pandas data frame “model_data_df”, we can train and test the classifiers using the PySpark UDF like this:
Running the modified code on our computer (locally on Spark, not even in the cluster!) decreases the runtime for the code block by 80% (12 minutes instead of 60), and this includes the time to convert the data from a Pandas to a Spark data frame and back.
Since there are about 2,000 models to build, the potential to further increase performance by more parallelization on a cluster is much larger.
On the other hand, the logic inside the UDF can then be tweaked to get better model results, i.e. by doing a more expensive parameter search.
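The overall pattern of this example (wrap the local function, encode its result, register it with a string return type, apply it per row) could look roughly like the sketch below. It is not the original code. It assumes a Spark data frame with one row per route and two hypothetical array columns named "features" and "labels", and it takes train_and_test as a parameter, since the original function is not reproduced here. The names make_udf_body() and apply_in_spark() are ours.

```python
import base64
import pickle


def encode(obj):
    """Pickle an object and return it as a base64-encoded string."""
    return base64.b64encode(pickle.dumps(obj)).decode("ascii")


def decode(text):
    """Inverse of encode()."""
    return pickle.loads(base64.b64decode(text.encode("ascii")))


def make_udf_body(train_and_test):
    """Wrap a local train_and_test(features, labels) for row-wise use in Spark."""
    def udf_body(features, labels):
        # Spark hands array columns to Python as plain lists; a conversion to
        # a numpy array, as described in the article, would happen here.
        accuracy, classifier = train_and_test(list(features), list(labels))
        # Both results travel back to Spark packed into a single string.
        return encode((accuracy, classifier))
    return udf_body


def apply_in_spark(grouped_sdf, train_and_test):
    """Add a 'result' column holding the encoded (accuracy, classifier) per row."""
    # Imported lazily so the helpers above can be used without PySpark installed.
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    train_udf = F.udf(make_udf_body(train_and_test), StringType())
    return grouped_sdf.withColumn(
        "result", train_udf(F.col("features"), F.col("labels"))
    )
```

After collecting the result column back to Python, each string can be turned into the accuracy score and classifier object again with decode().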
There are some limitations to using PySpark UDFs to move an algorithm to Spark.
If a lot of data needs to be exchanged between the Python and Spark processes, this creates an overhead and the combined runtime for data transfers can outweigh the runtime improvements from parallelization.
This applies generally to problems that are more I/O bound than CPU bound.
The approach works particularly well when there is a set of self-contained steps to execute, and when the computational cost is high compared to the need for data exchange.
For example, Spark UDFs can be a great choice for:
- Time series: Such as for forecasting costs of parts or individual accounts. Each row needs to contain the time series data for one part or account.
- Applying a trained ML model: Such as for determining churn probabilities. Each row contains the complete feature set of a customer.
- Fitting a function: Such as for determining the reliability of a part or machine (predictive quality). Each row contains information about the failure history.
- Market basket analysis: Such as for analyzing the items purchased in each transaction.
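The trade-off between data exchange and parallel computation described in this section can be made concrete with a back-of-the-envelope model. The sketch below is our own simplification and not a measurement: it assumes that compute time divides evenly across workers and that the transfer between Python and Spark is not parallelized at all. All numbers are illustrative.

```python
def estimated_speedup(compute_minutes, transfer_minutes, workers):
    """Speedup of a parallel run over a purely local run under a simple model.

    Assumes compute time divides evenly across workers and that the data
    transfer between Python and Spark adds a fixed, non-parallel cost.
    """
    parallel_minutes = compute_minutes / workers + transfer_minutes
    return compute_minutes / parallel_minutes


# CPU-bound case: 60 minutes of compute, 1 minute of transfer, 10 workers.
cpu_bound = estimated_speedup(60, 1, 10)

# I/O-bound case: 5 minutes of compute, 10 minutes of transfer, 10 workers.
io_bound = estimated_speedup(5, 10, 10)
```

Under these assumptions the CPU-bound case runs more than eight times faster, while the I/O-bound case ends up slower than the local run (a speedup below 1), which is the situation the limitation above warns about.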
Cluster-enabled distributed computing on partitioned datasets
In this section, we show how YARN and PySpark can be suitable as a framework to package an existing Python algorithm for distributed computing in a Spark cluster, without having to do a full rewrite to Spark.
Multiprocessing is a standard approach to speed up Python code.
The idea is to fork a sequential process into parallel subprocesses.
A main process manages the division and consolidation of the subprocesses.
Locally, this approach is limited by the number of processes a CPU (or GPU) can handle.
Distributed computing takes this approach into a cluster, with access to multiple servers and their CPUs.
This promises massive gains in runtime, though there is added complexity in using multiple servers: Resources in the cluster need to be managed, data must be passed along, processes need to be tracked and failed processes must be restarted.
With the following steps, you can package an existing Python algorithm and submit it for parallel execution to a cluster:
1. Identify a part of your algorithm suited for parallel processing
Whenever your algorithm has subtasks that do not need full access to the input data, that part is a good candidate for parallel processing. Imagine you have sales and stock data for eight markets and you train a predictive model on each market. The model training of one market is independent of the others. Instead of training the models one after the other, you can train them simultaneously. By parallelizing the whole model training, the overall run takes only about as long as the slowest individual training. In many cases, preprocessing steps are well-suited for parallel processing.
2. Wrap it in a process
Next, you need to specify the task at hand as a process with a defined signature and parameters that describe the data the task needs, as well as a main function that manages the division into subtasks. You can pass filter information to the process so the process can retrieve the subset of data needed. Input and output of the process should rely on HDFS (or on Hive, for that matter) so access across the cluster is ensured.
3. Execute it distributed and in parallel
To leverage the subprocesses defined above, you need an entry point as part of your Python algorithm, followed by initial data checks. For example, the algorithm retrieves row counts per market to characterize the data partitions. Then a (local) coordinator process spawns the subprocesses. The latter is done using the spark-submit operation in cluster mode. spark-submit makes it possible to send a code package for execution into the cluster. This is the key aspect that makes it a distributed computation. In this case, resources are managed by YARN, enabling you to scale the number of processes as high as your Hadoop cluster and data allow. The coordinator process waits for the completion of the subprocesses and consolidates the results.
Now, we can introduce the steps outlined above in a brief, pseudo-code manner.
After all, your configuration/implementation might vary and the actual code might get too verbose.
To prepare the existing algorithm for distributed computing, we begin by defining the unit of work that should be carried out by each subprocess (1):
As part of our data flow, we need only one other step, the combination of intermediate results created by the now-parallel executions of run_algo() (2):
These two adjustments cover the core part of your algo.
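As a rough illustration of (1) and (2), the sketch below shows a unit of work that handles only the data selected by its filter and writes one intermediate result, plus a step that merges all intermediate results. It is a simplified stand-in, not the original code: a local directory takes the place of HDFS, the "algorithm" is a plain sum, and the field names "market" and "sales" as well as the signature of run_algo() are assumptions made for this example.

```python
import json
import os
import tempfile


def run_algo(data_filter, rows, result_dir):
    """Unit of work (1): process only the rows selected by data_filter."""
    subset = [r for r in rows if r["market"] in data_filter]
    # Placeholder for the existing algorithm: here, total sales per market.
    result = {}
    for r in subset:
        result[r["market"]] = result.get(r["market"], 0) + r["sales"]
    # One result file per unit of work, so parallel runs never collide.
    name = "result_" + "_".join(sorted(data_filter)) + ".json"
    with open(os.path.join(result_dir, name), "w") as fh:
        json.dump(result, fh)


def combine_results(result_dir):
    """Combination step (2): merge all intermediate result files."""
    combined = {}
    for name in sorted(os.listdir(result_dir)):
        if name.startswith("result_") and name.endswith(".json"):
            with open(os.path.join(result_dir, name)) as fh:
                combined.update(json.load(fh))
    return combined


# Usage with a temporary local directory standing in for an HDFS location.
rows = [
    {"market": "north", "sales": 10},
    {"market": "north", "sales": 5},
    {"market": "south", "sales": 7},
]
with tempfile.TemporaryDirectory() as result_dir:
    run_algo(["north"], rows, result_dir)
    run_algo(["south"], rows, result_dir)
    combined = combine_results(result_dir)
```

On a cluster, reading the subset and writing the result would go through HDFS or Hive instead, so that every data node can reach them.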
The following additional functions are solely to orchestrate them for the distributed execution.
To get optimal speedup, we should not allocate units of work arbitrarily, but rather have a helper function look at the existing groups within our source data (i.e. by row counting) and return a list of data filters.
Each data filter will later map onto a process and should target roughly the same amount of data so the workload is even (3):
To start and handle our subprocesses, we need to define two additional helper functions (we recommend implementing them with Python’s built-in “subprocess” module) (4), (5):
Finally, we will wrap the process together in the following way:
· Define a shell script to wrap around the “spark-submit” command: We need this to submit our Python script in either client (coordinator) or cluster (subprocess) mode, and to handle the zipping and attachment of dependencies. It is also helpful if you need to set some other PySpark settings (memory, cores, etc.) or environment variables before it starts.
· Define an entry point within our main Python script to differentiate between coordinator and subprocess logic.
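Before turning to the shell script, the helper functions (3) to (5) described above could be sketched as follows. This is one possible shape, under our own assumptions: the balancing uses a simple greedy strategy, the function names are ours, and the argument order passed to start_algo.sh in build_command() is hypothetical. The demo at the end launches harmless stand-in commands rather than real cluster jobs.

```python
import subprocess
import sys


def get_data_filters(row_counts, n_processes):
    """(3) Split groups into n_processes filters of roughly equal row count."""
    filters = [[] for _ in range(n_processes)]
    loads = [0] * n_processes
    # Greedy balancing: place the largest remaining group on the lightest filter.
    for group, count in sorted(row_counts.items(), key=lambda kv: -kv[1]):
        lightest = loads.index(min(loads))
        filters[lightest].append(group)
        loads[lightest] += count
    return [f for f in filters if f]


def build_command(data_filter):
    """Command line for one subprocess (hypothetical start_algo.sh arguments)."""
    return ["sh", "start_algo.sh", "cluster", ",".join(data_filter)]


def start_subprocess(command):
    """(4) Launch one subprocess without blocking the coordinator."""
    return subprocess.Popen(command)


def wait_for_subprocesses(processes):
    """(5) Block until all subprocesses finish and return their exit codes."""
    return [p.wait() for p in processes]


# Usage: eight markets with uneven row counts, spread over three processes.
row_counts = {"m1": 900, "m2": 500, "m3": 400, "m4": 300,
              "m5": 300, "m6": 200, "m7": 100, "m8": 100}
data_filters = get_data_filters(row_counts, 3)
commands = [build_command(f) for f in data_filters]

# Demo with stand-in commands; on a cluster, `commands` would be launched.
demo = [start_subprocess([sys.executable, "-c", "pass"]) for _ in data_filters]
exit_codes = wait_for_subprocesses(demo)
```

With these row counts, the three filters end up with 1,000, 900 and 900 rows, so no process is left waiting long for a much larger neighbor. A non-zero exit code in the returned list is the signal for the coordinator to restart or report a failed subprocess.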
In its simplest form, the shell script start_algo.sh looks like this:
The entry point within algo.py finally ties it all together (E):
Cluster-enabled multiprocessing allows you to speed up your algorithm by a factor as high as the number of partitions you can create.
Since this approach can wrap around a large part of the Python algorithm, a large amount of existing code can be reused.
You only need to ensure that all the required libraries are available (at every data node) in the cluster, and that all the data is read and written from/to HDFS.
Since all the heavy lifting is done in data nodes and only the coordination takes place on the edge node, using distributed instead of local multiprocessing in the cluster avoids potentially overloading the edge node.
By packaging the code and using spark-submit to send it to the cluster for computing, YARN manages the parallel execution of your tasks.
The added complexity of distributed computing is already taken care of by the cluster, with minimal effort on the user side.
The approach works particularly well when your data has a natural grouping and the algorithm applies to each group individually, such as when you have different markets for a product or offering, or have multiple product lines, assembly lines or lines of business.
It also works when you apply different algorithms or algorithms with different parameters to the same data.
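The entry point that separates coordinator logic from subprocess logic, as described earlier in this section, might be structured like the sketch below. The argument names ("coordinator", "subprocess", "--data-filter") and the way the two roles are passed in as callables are assumptions for illustration. The original algo.py may be organized differently.

```python
import argparse


def parse_args(argv):
    """Parse the role of this invocation and, for subprocesses, its data filter."""
    parser = argparse.ArgumentParser(description="Entry point of algo.py")
    parser.add_argument("mode", choices=["coordinator", "subprocess"])
    parser.add_argument("--data-filter", default="",
                        help="comma-separated groups for one subprocess")
    return parser.parse_args(argv)


def main(argv, run_coordinator, run_subprocess):
    """Dispatch to coordinator or subprocess logic based on the first argument."""
    args = parse_args(argv)
    if args.mode == "coordinator":
        # Client mode: plan the data filters, spawn and await the subprocesses.
        return run_coordinator()
    # Cluster mode: handle exactly one data filter.
    return run_subprocess(args.data_filter.split(","))


# Usage with stand-in callables; a real script would pass sys.argv[1:].
calls = []
main(["subprocess", "--data-filter", "m1,m8"],
     run_coordinator=lambda: calls.append("coordinator"),
     run_subprocess=lambda f: calls.append(("subprocess", f)))
```

Keeping both roles in one script means the same code package can be submitted in client mode for coordination and in cluster mode for each unit of work.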
Benefits in real applications
In a recent BCG GAMMA project, we applied both techniques.
We modified critical and time-consuming pieces of an existing algorithm to reap the benefits of the cluster it was deployed on.
Through the large reuse of existing algorithm code, the algorithm remained essentially the same, making results from before and after the modification comparable and making the continued maintenance of the algorithm easy.
The modifications drove down the feature engineering step from six hours to less than one, and reduced the algorithm execution time from about 30 hours to less than three.
Overall, it reduced the runtime from 1.5 days to something that can be easily computed in a morning, thereby creating a significant operational improvement to internal audit processes.
Notably, the runtime improvement was still bounded by internal restrictions of how many processes a user can start.
If deemed necessary, the administrator of the cluster can grant more processes, which would then drive down execution times even further.
In sum: An effective middle-ground solution
In this article, we have presented two techniques that can be used to leverage cluster computing while keeping the majority of an existing codebase unchanged.
Typically, a full re-implementation in a framework such as PySpark is cleaner and, hence, preferable.
The challenge is that this process takes more time and effort to achieve, and increases the risk that different or faulty algorithmic behavior will be introduced by the migration — behavior that would then need to be controlled by extensive validation.
When considering their prerequisites and limits, the techniques we have presented can serve as a powerful middle ground to unlock the computing resources of a cluster.