S3 is NOT a file system
Apache Spark comes with built-in functionality to pull data from S3 just as it would from HDFS, using SparkContext's
textFile method. textFile allows for
glob syntax, which lets you pull hierarchical data.
Though this seems great at first, there is an underlying issue with treating S3 as an HDFS: S3 is not a file system.
Though it is common to organize your S3 keys with slashes (
/), and the AWS S3 Console will present said keys in a nice hierarchical interface if you do, this is actually
misleading. S3 isn't a file system; it is a key-value store. Keys that begin with
2015/05/02 do not live in the "same place". They just happen to have
a similar prefix.
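For instance, a bucket might contain keys like the following (hypothetical names), all stored as flat entries in one key space:

2015/05/02/00/00/data.txt
2015/05/02/00/01/data.txt
2015/05/01/23/59/data.txt

The first two share the prefix 2015/05/02/00, but nothing groups them together beyond that string match.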
This causes issues when using Apache Spark's
textFile, since it assumes that anything passed to it behaves like an HDFS.
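To see why, note that listing the contents of a "folder" is really just a prefix query against the key space. A quick sketch with the AWS SDK's AmazonS3Client (the bucket name is hypothetical):

```scala
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ListObjectsRequest
import scala.collection.JavaConverters._

val s3 = new AmazonS3Client()
// There is no "folder" object to open; asking for the contents of
// events/2015/05/02/ really means "give me all keys starting with this string".
val request = new ListObjectsRequest()
  .withBucketName("bucket")          // hypothetical bucket
  .withPrefix("events/2015/05/02/")
  .withDelimiter("/")                // emulate one directory level
val listing = s3.listObjects(request)
listing.getCommonPrefixes.asScala.foreach(println)  // the "subfolders"
```

Every such query is a full network round trip to S3, not a local directory lookup.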
The Kinja Analytics team runs an Apache Spark cluster on AWS EMR continuously. As soon as an EMR step finishes, a job adds the next step. The jobs on the cluster
pull data from S3 (placed there by our event stream), run multiple computations on that data set, and persist the results into a MySQL table. The data in S3 is stored
in chronological order, and each data.txt file is about 10 MB.
Originally we were pulling the data using SparkContext's
textFile method, as in
sc.textFile("s3n://bucket/events/*/*/*/*/*/*"). This worked fine at first, but
as the dataset grew we noticed that there would always be a large period of inactivity between jobs.
When the periods were as long as 3 hours, we figured something was wrong. At first we thought that simply adding more machines would solve this (since that’s how you are supposed to speed up Spark/Hadoop), but when that failed, we dug deeper.
Using Ganglia graphs, we noticed that during that time only one of the boxes was actually doing any work (which explains why adding more boxes did nothing). This box was the driver for that given application. We went ahead and looked at the logs for the driver and noticed something peculiar (NOTE: The logs that EMR places in S3 are behind, so you would need to wait for your application to finish before seeing the complete logs. If you want live logs you need to log into the machine).
...
15/05/05 21:43:26 INFO s3n.S3NativeFileSystem: listStatus s3n://kinesis-click-stream-us-east-1-bucket-4myg4if5x7au/events/2015/03/21/20 with recursive false
15/05/05 21:43:26 INFO s3n.S3NativeFileSystem: listStatus s3n://kinesis-click-stream-us-east-1-bucket-4myg4if5x7au/events/2015/03/21/21 with recursive false
15/05/05 21:43:26 INFO s3n.S3NativeFileSystem: listStatus s3n://kinesis-click-stream-us-east-1-bucket-4myg4if5x7au/events/2015/03/21/22 with recursive false
15/05/05 21:43:26 INFO s3n.S3NativeFileSystem: listStatus s3n://kinesis-click-stream-us-east-1-bucket-4myg4if5x7au/events/2015/03/21/23 with recursive false
15/05/05 21:43:26 INFO s3n.S3NativeFileSystem: listStatus s3n://kinesis-click-stream-us-east-1-bucket-4myg4if5x7au/events/2015/03/21/00/01 with recursive false
15/05/05 21:43:26 INFO s3n.S3NativeFileSystem: listStatus s3n://kinesis-click-stream-us-east-1-bucket-4myg4if5x7au/events/2015/03/21/00/02 with recursive false
15/05/05 21:43:26 INFO s3n.S3NativeFileSystem: listStatus s3n://kinesis-click-stream-us-east-1-bucket-4myg4if5x7au/events/2015/03/21/00/04 with recursive false
15/05/05 21:43:26 INFO s3n.S3NativeFileSystem: listStatus s3n://kinesis-click-stream-us-east-1-bucket-4myg4if5x7au/events/2015/03/21/00/05 with recursive false
...
This command, listStatus, is recursively listing each "folder" (each * in the glob syntax we passed into
textFile). On a normal file system that glob expansion is fast enough not to matter,
but S3 is accessed over the network, and each * triggers a "get all keys with this prefix" call, which is not negligible (remember, S3 is a key-value store; keys under
2015/01/02 do not live in the same place).
To make matters worse, all of those calls are made sequentially on the driver before any work is handed to the workers. This means that the more data is inserted into S3, the longer
each job takes to actually start working.
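To put a rough number on it: with six * levels over, say, one year of minute-granularity data, the driver's sequential prefix listings add up quickly. The counts and latency below are illustrative assumptions, not measurements:

```scala
// Illustrative count of sequential "list keys with prefix" requests for
// s3n://bucket/events/*/*/*/*/*/*, assuming one year of minute-level data.
// One listing is issued per expanded "folder" at each glob level:
//   events/ -> years -> months -> days -> hours -> minutes -> files
val listingsPerLevel = List(
  1,      // list events/      -> the year "folders"
  1,      // list the one year -> the month "folders"
  12,     // list each month   -> the day "folders"
  365,    // list each day     -> the hour "folders"
  8760,   // list each hour    -> the minute "folders"
  525600  // list each minute  -> the data.txt files
)
val totalCalls = listingsPerLevel.sum             // 534,739 requests
// At an assumed ~50 ms round trip per request, that is roughly
// 7.4 hours of listing before a single byte of data is read.
val hoursSpentListing = totalCalls * 0.05 / 3600  // ≈ 7.4
```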
The solution is quite simple: do not use
textFile. Instead, use the AmazonS3Client to list every key yourself (optionally with a prefix), then distribute the data pulling across the workers using SparkContext's
parallelize method and that same AmazonS3Client.
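A minimal sketch of that approach (the bucket name is hypothetical, and the AWS SDK v1 AmazonS3Client is assumed; the exact parsing of each object depends on your data format):

```scala
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ListObjectsRequest
import scala.collection.JavaConverters._
import scala.io.Source

// 1) On the driver: collect every key under the "events" prefix.
//    listObjects returns at most 1000 keys per response, so page through.
val s3 = new AmazonS3Client()
var listing = s3.listObjects(
  new ListObjectsRequest().withBucketName("bucket").withPrefix("events"))
val keys = collection.mutable.ListBuffer.empty[String]
keys ++= listing.getObjectSummaries.asScala.map(_.getKey)
while (listing.isTruncated) {
  listing = s3.listNextBatchOfObjects(listing)
  keys ++= listing.getObjectSummaries.asScala.map(_.getKey)
}

// 2) Hand the keys to the workers. AmazonS3Client is not serializable,
//    so each partition builds its own client and pulls its own keys.
val events = sc.parallelize(keys).mapPartitions { partition =>
  val client = new AmazonS3Client()
  partition.flatMap { key =>
    Source.fromInputStream(
      client.getObject("bucket", key).getObjectContent).getLines
  }
}
```

Using mapPartitions keeps it to one client per partition rather than one per key; a plain map over the keys would also work, just with more client churn.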
Above, we get all of the keys for a bucket and a prefix (events), parallelize the keys (distribute them across the workers/partitions), and have each worker pull the data for its own keys.
After this change, our “time of inactivity” went down to a couple of minutes!
And total elapsed times went from 4 hours to 1 hour.