Benchmarking Python Distributed AI Backends with Wordbatch
A comparison of the three major backend schedulers: Spark, Dask and Ray
Antti Puurula, Jun 23

Towards Distributed Artificial Intelligence
Over the last few years Python has become the lingua franca of data science and artificial intelligence, with all of the prominent deep learning frameworks (Keras, PyTorch, MXNet) using Python as their main interface language.
Compared to competing languages, Python matches or exceeds them in almost every aspect of DS&AI: the latest machine learning algorithms and their efficient implementations (Scikit-Learn, LightGBM, XGBoost), data manipulation and analysis (Pandas, cuDF), efficient numerical computation libraries (NumPy, PyPy, Numba), GPU computing (CuPy), and web API programming (Flask, Celery).
The Achilles heel of Python is its weakness in parallel multi-threaded and multi-process workloads, due to the Global Interpreter Lock (GIL), which is part of its core design.
This has produced workaround solutions within the Python camp, as well as alternative languages with more emphasis on parallelism, such as Go.
The need for parallelism has been accelerated by an ongoing arms-race period in hardware: consumer CPUs have gone from 4 cores to 32 cores (AMD 2990WX) in a matter of a few years, and affordable cloud computing nodes now provide 224 cores each (Amazon u-6tb1.
For AI the need for parallelism is not just with single workstations or computing nodes, but for orchestrating AI processing pipelines distributed across potentially thousands of computing nodes.
Similarly to the changes in CPU cores, network transfer speeds have gone from 1 Gb/s to commodity 10–100 Gb/s connections for both local and cloud use.
Until recently much of this type of Big Data technology was based on Java frameworks such as Hadoop, but the changes in both software and hardware have brought about new types of solutions, including three major Python distributed processing frameworks for AI: PySpark, Dask, and Ray.

Distributed Batch Processing Frameworks
Apache Spark, with its Python interface PySpark, is the oldest of the frameworks, with its initial GitHub version going back to 4 Oct 2010.
Spark established itself as one of the main Big Data technologies, gaining wide adoption in the enterprise world.
It provides an extension of the Map-Reduce programming paradigm, which solves batch-processing tasks by mapping a larger task into a set of mini-batches distributed to workers (Map), and combining the results after each mini-batch has completed (Reduce).
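The Map and Reduce steps can be sketched in a few lines of plain Python. This is only an illustration of the paradigm, not how Spark implements it: the function names and the toy review texts are made up for the example, and a thread pool stands in for the workers of a real cluster.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def split_into_minibatches(rows, minibatch_size):
    """Cut a list of rows into consecutive mini-batches."""
    return [rows[i:i + minibatch_size] for i in range(0, len(rows), minibatch_size)]


def count_words(minibatch):
    """Map step: count the words in one mini-batch of texts."""
    counts = Counter()
    for text in minibatch:
        counts.update(text.lower().split())
    return counts


def merge_counts(left, right):
    """Reduce step: merge two partial word counts."""
    return left + right


# Toy data, invented for this example.
texts = ["great hotel", "great location", "poor service", "great service"]
minibatches = split_into_minibatches(texts, minibatch_size=2)

# Threads stand in for the workers of a real cluster.
with ThreadPoolExecutor(max_workers=2) as pool:
    partial_counts = list(pool.map(count_words, minibatches))

word_counts = reduce(merge_counts, partial_counts, Counter())
print(word_counts.most_common(2))
```

Each mini-batch is processed independently, so the Map step parallelizes freely; only the Reduce step needs to see results from more than one worker.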
Spark processes Directed Acyclic Graphs (DAG) of Map-Reduce computing pipelines, keeping data distributed across workers throughout processing a DAG.
The task graph is functionally defined, and tasks are executed lazily after optimizing the DAG computation order.
Both Dask and Ray build on Spark’s central idea of concurrent functional evaluation of a DAG, with data kept distributed throughout the process.
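To make the idea of a lazily evaluated task graph concrete, here is a minimal sketch. The LazyTask class is hypothetical and far simpler than any of the three schedulers: it does no optimization of the computation order and runs everything in one process. It only shows that building the graph executes nothing, and that a node shared by two branches is computed once.

```python
class LazyTask:
    """A node in a task graph: a function plus the tasks it depends on."""

    def __init__(self, func, *deps):
        self.func = func
        self.deps = deps

    def compute(self, cache=None):
        """Evaluate dependencies first, then this task; each node runs once."""
        cache = {} if cache is None else cache
        if id(self) not in cache:
            args = [dep.compute(cache) for dep in self.deps]
            cache[id(self)] = self.func(*args)
        return cache[id(self)]


executed = []  # records the order in which tasks actually run


def load():
    executed.append("load")
    return [3, 1, 2]


def sort_rows(rows):
    executed.append("sort")
    return sorted(rows)


def total(rows):
    executed.append("total")
    return sum(rows)


def combine(sorted_rows, row_sum):
    executed.append("combine")
    return sorted_rows, row_sum


# Building the graph runs nothing.
source = LazyTask(load)
graph = LazyTask(combine, LazyTask(sort_rows, source), LazyTask(total, source))
executed_before_compute = list(executed)

# Only compute() walks the graph and executes the tasks.
result = graph.compute()
print(result, executed)
```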
Dask, with its scheduler backend Distributed, is a more recent framework, with its original GitHub version dated 29 Jan 2015.
Whereas Spark was written for Java and Scala, Dask is written for Python and provides a rich set of distributed classes.
Dask also provides a richer lower-level API, with support for actor classes that are crucial for distributed training of AI models.
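For readers unfamiliar with actors: an actor is an object that keeps its own state and processes incoming messages one at a time, which is what makes it convenient for holding a model that many workers update. The sketch below is a generic single-machine illustration using only the standard library. CounterActor is a hypothetical class and does not use the Dask or Ray actor APIs.

```python
import queue
import threading


class CounterActor:
    """Hypothetical actor: private state, mutated only by its own thread."""

    def __init__(self):
        self._inbox = queue.Queue()
        self._total = 0
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Messages are handled strictly one at a time, in arrival order.
        while True:
            message, reply = self._inbox.get()
            if message is None:  # shutdown signal
                reply.put(self._total)
                return
            self._total += message  # e.g. accumulate a model update

    def add(self, value):
        """Fire-and-forget message; returns immediately."""
        self._inbox.put((value, None))

    def stop(self):
        """Ask the actor to finish and return its final state."""
        reply = queue.Queue()
        self._inbox.put((None, reply))
        self._thread.join()
        return reply.get()


actor = CounterActor()
for value in range(5):
    actor.add(value)
final_total = actor.stop()
print(final_total)
```

Because only the actor's own thread touches its state, callers never need a lock; in a distributed framework the same pattern lets remote workers send updates to a model that lives on one node.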
Ray is the latest framework, with initial GitHub version dated 21 May 2017.
Like Dask, Ray has a Python-first API and support for actors.
It has several high-performance optimizations that make it more efficient.
Unlike Spark and Dask, tasks are executed eagerly within each node, so that each worker process starts as soon as it receives the data it needs.
Data within a worker node is stored using Apache Arrow objects, which provide zero-copy sharing of objects between all processes that work on the node.
The worker nodes have their own local schedulers, further reducing the overhead to the global scheduler.

Wordbatch
The three frameworks differ widely in the design and implementation of their scheduler engines: serialization, transfer, scheduling, need for configuration, memory requirements, etc.
It is difficult, if not impossible, to say for a given complex task which engine would work the best.
For some tasks a specific framework wouldn’t work at all.
Spark lacks actors, complicating large-scale training of models.
Dask doesn’t serialize complex dependencies.
Ray's result store can’t store some very basic Python objects, such as collections.
Therefore, both for performance and feasibility it’s useful to test each framework for a given task, and choose one that works.
Wordbatch library v.
4 does batch-processing of pipelines, using swappable scheduler backends.
Its orchestrator class Batcher keeps a reference to a backend handle, and handles mapping of tasks into mini-batches and reducing the results.
The scheduler backend is swappable, so that if one backend can’t handle a processing task, any other backend can be swapped simply by replacing the Batcher object’s backend and backend_handle attributes.
It supports both local (serial, threading, multiprocessing, Loky) and distributed backends (Spark, Dask, Ray).
The distributed backends are called in the same way, with data kept distributed throughout the pipeline when possible.
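The swappable-backend design can be illustrated with a small stand-in. Everything in this sketch is hypothetical: MiniBatcher, the BACKENDS registry and the two runner functions are invented for the example and are not Wordbatch's Batcher implementation. Only the general shape (split into mini-batches, hand them to the chosen backend, reduce the results) follows the description above.

```python
from concurrent.futures import ThreadPoolExecutor


def run_serial(func, minibatches, procs):
    """Process mini-batches one after another in the calling thread."""
    return [func(batch) for batch in minibatches]


def run_threading(func, minibatches, procs):
    """Process mini-batches concurrently on a thread pool."""
    with ThreadPoolExecutor(max_workers=procs) as pool:
        return list(pool.map(func, minibatches))


# Hypothetical registry; a real one would also cover Spark, Dask and Ray.
BACKENDS = {"serial": run_serial, "threading": run_threading}


class MiniBatcher:
    """Hypothetical stand-in for an orchestrator with a swappable backend."""

    def __init__(self, procs=4, minibatch_size=2, backend="serial"):
        self.procs = procs
        self.minibatch_size = minibatch_size
        self.backend = backend

    def process(self, func, rows):
        size = self.minibatch_size
        minibatches = [rows[i:i + size] for i in range(0, len(rows), size)]
        results = BACKENDS[self.backend](func, minibatches, self.procs)
        # Reduce: concatenate the per-batch results in their original order.
        return [item for batch in results for item in batch]


def uppercase_batch(batch):
    return [text.upper() for text in batch]


rows = ["a", "b", "c", "d", "e"]
batcher = MiniBatcher(backend="serial")
serial_output = batcher.process(uppercase_batch, rows)

batcher.backend = "threading"  # swap the backend on the same object
threaded_output = batcher.process(uppercase_batch, rows)
print(serial_output == threaded_output)
```

The pipeline code (here uppercase_batch) never refers to the backend, which is what makes it cheap to try a different scheduler when one fails on a task.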
Wordbatch also comes with a set of pipelines and classes that provide a full set of tools for text-based machine learning, and can serve as templates for processing in other domains.
The Wordbatch classes can independently call Map-Reduce operations on Batcher if needed, and support both distributed storage throughout a pipeline and stream processing with fit_partial() methods.

Benchmark Setup
We can use Wordbatch as a neutral benchmark to test the three distributed frameworks, as well as the non-distributed backends as baselines.
To keep the comparison simple, we’ll use two basic pipelines under two hardware setups.
Both tasks use up to 1.28M reviews from the TripAdvisor review dataset http://times.
Full version of the benchmark script is available at https://github.
For both tasks the test script initializes Batcher as follows:
from wordbatch.batcher import Batcher
batcher = Batcher(procs=16, minibatch_size=5000, backend=backend, backend_handle=backend)
Here “procs” is the number of processes used, “minibatch_size” is the number of data rows to be processed in each mini-batch, “backend” is the name of the backend, and “backend_handle” gives an API handle for Batcher to communicate with the backend.
The first pipeline ApplyBatch runs a Scikit-learn HashingVectorizer on each mini-batch of reviews and returns the reduced sparse matrix of hashed features.
from sklearn.feature_extraction.text import HashingVectorizer
from wordbatch.pipelines import ApplyBatch
hv = HashingVectorizer(decode_error='ignore', n_features=2 ** 25, preprocessor=normalize_text, ngram_range=(1, 2), norm='l2')
output = ApplyBatch(hv.transform, batcher=batcher).transform(texts_chunk)
The second pipeline WordBatch is a full text processing pipeline, performing consecutive steps of 1) text normalization, 2) spelling correction and stemming, 3) dictionary counting, 4) bag-of-words feature extraction and TF-IDF weighting.
Both spelling correction and dictionary counting steps perform their own Map-Reduce operations to compute word frequency tables, and spelling correction and feature extraction steps require sending a dictionary to each worker.
from wordbatch.pipelines import WordBatch
from wordbatch.extractors import WordBag
from wordbatch.transformers import Tokenizer, Dictionary
wb = WordBatch(normalize_text=normalize_text,
               dictionary=Dictionary(min_df=10, max_words=1000000),
               tokenizer=Tokenizer(spellcor_count=2, spellcor_dist=2, raw_min_df=2, stemmer=stemmer),
               extractor=WordBag(hash_ngrams=0, norm='l2', tf='binary', idf=50.0),
               batcher=batcher)
output = wb.fit_transform(texts_chunk)
The first hardware setup uses a single i9-9900K CPU with 8 cores and 64GB of DDR4 RAM, which can handle all of the covered tests.
The second setup connects an additional worker node with an 18-core i9-7980XE CPU, using a direct 10 Gb/s ethernet connection.
OS used was Ubuntu 18.
2 LTS, and library versions were pyspark 2.
1, ray 0.
0 and distributed 1.

Distributing Scikit-Learn HashingVectorizer on a single node
For the simple task of parallelizing HashingVectorizer on a single node, all parallel frameworks get roughly linear speedup compared to running a single serial process.
Serial takes 256s for the largest case of 1.28M documents, while multiprocessing takes 36s.
Interestingly, Ray is actually faster than multiprocessing, taking 33s, while Spark takes 50s.
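The quoted times translate into speedup factors as follows. The timings are the ones reported above; the efficiency figure is an added illustration that assumes the 8 physical cores of the i9-9900K as the baseline, even though Batcher was run with procs=16.

```python
CORES = 8  # physical cores of the i9-9900K in the single-node setup

# Wall-clock seconds reported for 1.28M documents on a single node.
seconds = {"serial": 256, "multiprocessing": 36, "ray": 33, "spark": 50}

# Speedup is serial time divided by parallel time.
speedups = {name: seconds["serial"] / elapsed for name, elapsed in seconds.items()}

for name, speedup in speedups.items():
    efficiency = speedup / CORES  # fraction of ideal 8x scaling
    print(f"{name:16s} speedup {speedup:4.2f}x, efficiency {efficiency:.0%}")
```

On these numbers multiprocessing and Ray both come within about 10% of the ideal 8x, while Spark reaches a little over 5x.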

Distributing the WordBatch feature extraction pipeline on a single node
The more complex task with the WordBatch pipeline shows surprising results.
Spark, Ray and multiprocessing again show linear speedups that stay constant with increasing data, but both Loky and Dask have trouble parallelizing the task.
Compared to serial taking 460s for 1.28M documents, Ray is again the fastest at 91s.
The running times of Loky and Dask grow with data size until they roughly match the serial time, and with still larger data sizes they would likely exceed it.
The likely reason for this odd behaviour is the lack of sharing between processes and the need in this task to twice send dictionaries to each worker.
The dictionaries become larger with more data, and the overhead of not efficiently sharing auxiliary data exceeds the benefit of parallelization.
This is a surprising result, especially since sending shared data to workers is a basic operation in AI pipelines.
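The effect of shipping a growing dictionary to every worker can be illustrated without any of the frameworks. The sketch below only measures how large a pickled word-count table becomes as the vocabulary grows, and multiplies that by the number of workers. The build_dictionary helper and the vocabulary sizes are made up for the example, and the numbers say nothing about the actual serialization used by Loky or Dask.

```python
import pickle

WORKERS = 16  # same as procs=16 in the benchmark's Batcher


def build_dictionary(n_words):
    """Hypothetical word -> count table standing in for the pipeline's dictionary."""
    return {f"word{i}": i for i in range(n_words)}


payload_bytes = {}
for n_words in (1_000, 10_000, 100_000):
    # Size of one serialized copy, as it would be sent to a single worker.
    payload_bytes[n_words] = len(pickle.dumps(build_dictionary(n_words)))
    total_mb = payload_bytes[n_words] * WORKERS / 1e6
    print(f"{n_words:>7} words: {total_mb:.2f} MB serialized for {WORKERS} workers")
```

Without shared memory, this cost is paid once per worker each time the dictionary is sent, and in this pipeline it is sent twice.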

Distributing HashingVectorizer using an additional node
Moving on to the second hardware setup with an additional 18 cores over 10 Gb/s, all three distributed frameworks benefit from the additional node.
However, Spark has trouble with the largest 1.28M document task, due to larger memory requirements and operating close to the configured memory limits.
In practice Spark requires considerable configuration of its components, which has been a source of frustration for its users.
At best, the additional node gives 22% speedup for Spark.
Dask and Ray perform much better, with a 32% speedup for Dask and 41% for Ray at 1.28M documents.
The speedups compared to single node also keep increasing with data sizes, and seem nowhere near saturation at the maximum tested size.

Distributing WordBatch pipeline with an additional node
Testing the WordBatch pipeline with the additional node, we see that Dask doesn’t gain much.
Its problem of efficiently dealing with auxiliary data seems to compound when using additional nodes, so that at the largest 1.28M document condition we only get a speedup from 457s to 420s, and the speedup steadily decreases with larger tasks.
Both Spark and Ray can use the additional node better in this task, with the maximum speedups of 38% for Spark and 28% for Ray, at 0.
Due to the better use of the additional node, Spark with the additional node performs virtually on par with Ray, and could very well exceed it with larger data sizes and a more complex processing pipeline.

Concluding Thoughts
These basic benchmarks demonstrate some of the main properties of the distributed schedulers.
All of the schedulers are useful for distributing Python workloads, but some are not suitable for every task.
Actual applications would involve more complex pipelines on large clusters, but this would complicate straightforward comparisons due to: choices in configuring the schedulers, design decisions on how to implement shared data and remote classes such as actors, and how to use GPUs and other non-CPU processors.
As a preliminary conclusion Ray seems to be the most promising framework.
It works around 10% faster than Python standard multiprocessing on a single node, and uses the additional node well in all conditions.
Unlike Spark, it needs minimal cluster configuration, and it has support for actors.
Unlike Dask, it serializes nested Python object dependencies well, and shares data between processes efficiently, scaling complex pipelines linearly.
Compared to a single serial process, Ray with an additional node provided a 12.9x speedup distributing HashingVectorizer, and a 6.7x speedup on the more complex task.
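As a rough sanity check, the speedup factors can be converted back into implied running times. This assumes the factors are measured against the serial times quoted earlier for 1.28M documents (256s and 460s), which the text suggests but does not state outright, so the results are estimates rather than reported measurements.

```python
# Serial wall-clock seconds quoted earlier for 1.28M documents.
serial_seconds = {"HashingVectorizer": 256, "WordBatch": 460}

# Speedup factors reported for Ray with the additional node.
ray_two_node_speedup = {"HashingVectorizer": 12.9, "WordBatch": 6.7}

# Implied time = serial time / speedup (an estimate, not a measured value).
implied_seconds = {
    task: serial_seconds[task] / ray_two_node_speedup[task]
    for task in serial_seconds
}

for task, seconds in implied_seconds.items():
    print(f"{task}: about {seconds:.0f}s with Ray on two nodes")
```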
The available hardware also has a large impact on the performance of the schedulers.
If a 1 Gb/s connection were used here, there would be close to no advantage from the additional node.
Moving from 10 Gb/s to 100 Gb/s would increase the benefit from additional nodes, as well as change the results between the tested backends.
Dask in particular would benefit more from 100 Gb/s, compared to Ray.
If data were ingested from distributed storage, as Spark does with Hadoop, this would somewhat reduce the dependence on a high-bandwidth network.
However, most actual pipelines will do data transfers requiring high bandwidth.
With more nodes these frameworks should use 100 Gb/s connections, and the AI pipeline should be planned so as to minimize network traffic and maximize use of the distributed cores.
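A back-of-the-envelope calculation shows why the link speed matters so much. The sketch below computes the ideal time to move a payload over each of the three link speeds mentioned. The 5 GB payload is a hypothetical figure chosen for illustration, not a measurement from the benchmark, and the formula ignores latency and protocol overhead.

```python
def transfer_seconds(payload_gigabytes, link_gigabits_per_s):
    """Ideal transfer time: 8 bits per byte, no protocol overhead or latency."""
    return payload_gigabytes * 8 / link_gigabits_per_s


PAYLOAD_GB = 5  # hypothetical data volume shipped to the second node

transfer_times = {
    rate: transfer_seconds(PAYLOAD_GB, rate) for rate in (1, 10, 100)
}

for rate, seconds in transfer_times.items():
    print(f"{rate:>3} Gb/s link: {seconds:.1f}s to move {PAYLOAD_GB} GB")
```

At 1 Gb/s the transfer alone would take longer than Ray needs to run the whole HashingVectorizer task on a single node (33s), which is in line with the expectation that a slow link would erase the benefit of the extra node.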