1.1 Billion Taxi Rides: Spark 2.4.0 versus Presto 0.214

The cluster reports 1.35 TB of configured HDFS capacity across its 20 data nodes.

$ hdfs dfsadmin -report | grep 'Configured Capacity'

Configured Capacity: 1480673034240 (1.35 TB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)

The following will create folders on HDFS for each dataset format as well as copy the taxi trip files into those folders.

$ hdfs dfs -mkdir /trips_orc/
$ hdfs dfs -copyFromLocal /mnt2/orc/* /trips_orc/
$ hdfs dfs -mkdir /trips_parquet/
$ hdfs dfs -copyFromLocal /mnt2/parquet/* /trips_parquet/

After the data was loaded onto HDFS I could see it had been distributed somewhat evenly across the cluster.

$ hdfs dfsadmin -report | grep 'DFS Used%' | tail -n +2 | sort

DFS Used%: 38.97%
DFS Used%: 39.61%
DFS Used%: 39.75%
DFS Used%: 39.84%
DFS Used%: 40.02%
DFS Used%: 40.17%
DFS Used%: 40.45%
DFS Used%: 40.78%
DFS Used%: 40.92%
DFS Used%: 41.08%
DFS Used%: 41.18%
DFS Used%: 41.62%
DFS Used%: 41.82%
DFS Used%: 42.21%
DFS Used%: 42.38%
DFS Used%: 42.51%
DFS Used%: 42.67%
DFS Used%: 42.79%
DFS Used%: 44.35%
DFS Used%: 44.98%

You might wonder why I haven't used erasure coding, as it only needs half the space of 3x replication. Erasure coding is a feature only found in Hadoop 3 and AWS EMR doesn't support Hadoop 3 yet.
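For context, the space savings come from Reed-Solomon striping: Hadoop 3's default RS-6-3-1024k policy writes six data blocks plus three parity blocks, a 1.5x storage overhead versus 3x for replication. It is enabled per directory, and the commands below are only a sketch of what that would look like on a Hadoop 3 cluster; they won't work on this EMR cluster's Hadoop 2.

# Hadoop 3 only: list the built-in policies, enable one and apply it
# to a directory. Files written afterwards are striped, not replicated.
$ hdfs ec -listPolicies
$ hdfs ec -enablePolicy -policy RS-6-3-1024k
$ hdfs ec -setPolicy -path /trips_parquet -policy RS-6-3-1024k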
Below is a spot check to make sure one of the files has been replicated 3 times without issue.

$ hadoop fsck /trips_parquet/000000_0 -files -blocks -racks

Status: HEALTHY
 Total size:            2079571240 B
 Total dirs:            0
 Total files:           1
 Total symlinks:        0
 Total blocks (validated):      16 (avg. block size 129973202 B)
 Minimally replicated blocks:   16 (100.0 %)
 Over-replicated blocks:        0 (0.0 %)
 Under-replicated blocks:       0 (0.0 %)
 Mis-replicated blocks:         0 (0.0 %)
 Default replication factor:    3
 Average block replication:     3.0
 Corrupt blocks:                0
 Missing replicas:              0 (0.0 %)
 Number of data-nodes:          20
 Number of racks:               1

Setting Up Hive Tables

I'll launch Hive and create schemas for both datasets. This will allow both Spark and Presto to see the datasets as tables they can run SQL against.

$ hive

CREATE EXTERNAL TABLE trips_parquet (
    trip_id INT, vendor_id STRING,
    pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP,
    store_and_fwd_flag STRING, rate_code_id SMALLINT,
    pickup_longitude DOUBLE, pickup_latitude DOUBLE,
    dropoff_longitude DOUBLE, dropoff_latitude DOUBLE,
    passenger_count SMALLINT, trip_distance DOUBLE,
    fare_amount DOUBLE, extra DOUBLE, mta_tax DOUBLE,
    tip_amount DOUBLE, tolls_amount DOUBLE, ehail_fee DOUBLE,
    improvement_surcharge DOUBLE, total_amount DOUBLE,
    payment_type STRING, trip_type SMALLINT,
    pickup STRING, dropoff STRING, cab_type STRING,
    precipitation SMALLINT, snow_depth SMALLINT, snowfall SMALLINT,
    max_temperature SMALLINT, min_temperature SMALLINT,
    average_wind_speed SMALLINT,
    pickup_nyct2010_gid SMALLINT, pickup_ctlabel STRING,
    pickup_borocode SMALLINT, pickup_boroname STRING,
    pickup_ct2010 STRING, pickup_boroct2010 STRING,
    pickup_cdeligibil STRING, pickup_ntacode STRING,
    pickup_ntaname STRING, pickup_puma STRING,
    dropoff_nyct2010_gid SMALLINT, dropoff_ctlabel STRING,
    dropoff_borocode SMALLINT, dropoff_boroname STRING,
    dropoff_ct2010 STRING, dropoff_boroct2010 STRING,
    dropoff_cdeligibil STRING, dropoff_ntacode STRING,
    dropoff_ntaname STRING, dropoff_puma STRING
) STORED AS parquet
  LOCATION '/trips_parquet/';

CREATE EXTERNAL TABLE trips_orc (
    trip_id INT, vendor_id STRING,
    pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP,
    store_and_fwd_flag STRING, rate_code_id SMALLINT,
    pickup_longitude DOUBLE, pickup_latitude DOUBLE,
    dropoff_longitude DOUBLE, dropoff_latitude DOUBLE,
    passenger_count SMALLINT, trip_distance DOUBLE,
    fare_amount DOUBLE, extra DOUBLE, mta_tax DOUBLE,
    tip_amount DOUBLE, tolls_amount DOUBLE, ehail_fee DOUBLE,
    improvement_surcharge DOUBLE, total_amount DOUBLE,
    payment_type STRING, trip_type SMALLINT,
    pickup STRING, dropoff STRING, cab_type STRING,
    precipitation SMALLINT, snow_depth SMALLINT, snowfall SMALLINT,
    max_temperature SMALLINT, min_temperature SMALLINT,
    average_wind_speed SMALLINT,
    pickup_nyct2010_gid SMALLINT, pickup_ctlabel STRING,
    pickup_borocode SMALLINT, pickup_boroname STRING,
    pickup_ct2010 STRING, pickup_boroct2010 STRING,
    pickup_cdeligibil STRING, pickup_ntacode STRING,
    pickup_ntaname STRING, pickup_puma STRING,
    dropoff_nyct2010_gid SMALLINT, dropoff_ctlabel STRING,
    dropoff_borocode SMALLINT, dropoff_boroname STRING,
    dropoff_ct2010 STRING, dropoff_boroct2010 STRING,
    dropoff_cdeligibil STRING, dropoff_ntacode STRING,
    dropoff_ntaname STRING, dropoff_puma STRING
) STORED AS orc
  LOCATION '/trips_orc/';

Spark SQL on EMR Benchmark Results

The following were the fastest times I saw after running each query multiple times.

$ spark-sql

These four queries were run on the ORC-formatted dataset.

The following completed in 2.362 seconds.

SELECT cab_type,
       count(*)
FROM trips_orc
GROUP BY cab_type;

The following completed in 3.559 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_orc
GROUP BY passenger_count;

The third query completed in 4.019 seconds.
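As an aside, single queries can also be timed without an interactive shell. The sketch below is illustrative rather than how the figures above were collected: spark-sql's -e flag runs one quoted statement, and time measures the whole process, so JVM and Spark session start-up are included and the wall-clock result will be slower than a query issued inside an already-warm spark-sql session.

# Illustrative only: start-up overhead is included in the timing.
$ time spark-sql -e "SELECT cab_type, count(*) FROM trips_orc GROUP BY cab_type;"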

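For the Presto side of the comparison, the same Hive tables are visible through Presto's Hive connector. A typical one-off invocation on an EMR cluster with Presto installed would look like the sketch below; the hive catalog and default schema are the usual EMR/Presto conventions, not values taken from this post.

# Sketch: run one of the benchmark queries through Presto's CLI.
$ presto-cli \
      --catalog hive \
      --schema default \
      --execute "SELECT cab_type, count(*) FROM trips_orc GROUP BY cab_type;"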