The foray into PySpark continues. This week I’ve been tuning a PySpark + Kafka streaming app and read through a few excellent pointers on reasonable settings, e.g. Cloudera’s entries. The second of those was particularly interesting while I tinkered with optimizing executors, and led me to playing with this setup with dynamic allocation enabled:
- Use 5 cores per executor, e.g. `--executor-cores 5`. Let’s call it `coresPerExecutor` for this writeup.
- Figure out the maximum number of executors you plan on using. We’ll call this `numExecutors`.
- Figure out how much memory is available per node in your system, in GB. This is `memPerNode`.
- Set the memory per executor, measured in GB, according to the formula `int(round(coresPerExecutor * (memPerNode - 1) / numExecutors))`.
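The steps above translate directly into a small helper. This is just a sketch; the function name and the sample cluster figures (63GB usable per node, 17 executors) are hypothetical:

```python
def executor_memory_gb(cores_per_executor, mem_per_node_gb, num_executors):
    """Estimate --executor-memory in GB, reserving ~1GB per node for overhead."""
    return int(round(cores_per_executor * (mem_per_node_gb - 1) / num_executors))

# Hypothetical cluster: 5 cores per executor, 63GB per node, up to 17 executors.
print(executor_memory_gb(5, 63, 17))  # → 18
```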
So where do these numbers come from? The first big one is the number of cores per executor, which is essentially the number of cores per JVM if you like. Intuitively you might just set this to the number of cores available on a given node, but Cloudera found that going above 5 cores per executor tends to hurt HDFS I/O throughput and limit performance. So 5 is a good starting point, but obviously you’d want to try playing with this number for your configuration.
Next up is the number of executors. If you’re lucky enough to be the sole occupant on your cluster, let me say I’m jealous and are you hiring? In this case, you can basically set this to whatever you want. If your use case is more like mine where you expect other jobs to be running at the same time, you might want to dial this back a bit to be a good neighbor. Again, you’ll probably need to experiment a bit to get a nice answer.
Item #3 should be the easiest number to come up with – if all your nodes are homogeneous it’s a single number. If they’re heterogeneous you’ll probably want to find the node with the least memory and use that.
Finally we get to figuring out how much memory to use per executor. Intuitively it might make sense that the memory per executor should be related to the ratio of cores per executor to the total (maximum) number of executors in the Spark application, but why lop off 1GB in the formula? Cloudera found that setting aside around 1GB for overhead (JVM/Spark) was appropriate so that’s what we’re doing in this formula as well. We then round and convert the float to an integer to pass to Spark in configuration.
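Putting the pieces together, a submit invocation might look something like this. This is a sketch only; the 18G figure, the executor cap of 17, and the app name are all hypothetical values plugged into the formula above:

```shell
# Hypothetical values: 5 cores/executor, 18GB/executor from the formula,
# dynamic allocation capped at 17 executors.
spark-submit \
  --executor-cores 5 \
  --executor-memory 18G \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.maxExecutors=17 \
  my_streaming_app.py
```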
I should point out that this formula tends to be a little on the conservative side, e.g. Cloudera’s example comes up with 19GB per executor while this formula comes up with 18GB. You might want to tinker with replacing `round` with `ceil` and see how that goes.
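The swap is a one-liner. Here is a sketch with hypothetical inputs chosen so the two variants land on the 18GB vs. 19GB split mentioned above:

```python
import math

# Hypothetical inputs: 5 cores/executor, 63GB per node, 17 executors.
cores_per_executor, mem_per_node_gb, num_executors = 5, 63, 17

raw = cores_per_executor * (mem_per_node_gb - 1) / num_executors  # ≈ 18.24

conservative = int(round(raw))  # rounds down here → 18
generous = math.ceil(raw)       # always rounds up  → 19
print(conservative, generous)
```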
Even if the calculation is a little conservative, the results so far have been promising. Anecdotally, I started with 300 executors in the streaming app. After coming up with this formula I was able to drop that down to 150 with no reduction in throughput; dropping to 110 executors came with around a 10% drop. You could of course make the reasonable case that I started with a very poorly-optimized app and this was all low-hanging fruit, and I’d probably agree. But I think that having a back-of-the-envelope calculation handy that maybe starts me off with a slightly-optimized application is still worth a look. 🙂