Spark HDFS data locality on Kubernetes

Author: Linus Meierhöfer (Trainee Machine Learning)

Data locality describes the practice of moving computation close to the data it operates on, in order to avoid network traffic and improve overall throughput.

The Hadoop ecosystem as well as Apache Spark emphasize the importance of data locality for performant computation and therefore implement it wherever possible. The Hadoop Distributed File System (HDFS) provides a powerful interface that enables applications to gather locality information about the data they request. Projects like Apache Spark, which are designed to operate natively with HDFS, can then use this information to improve their computational performance.
Unfortunately, when running containerized instances of HDFS and Spark on the cluster orchestrator Kubernetes, their default data locality implementations are broken.

This blog post describes the data locality implementations of Apache Spark and HDFS, explains why they break when running on Kubernetes, and shows how we resolved this.

Spark HDFS data locality

When running Spark alongside HDFS, data locality generally consists of three layers, or stages.
Each layer is responsible for a different aspect of data locality.

The aim of the first layer is to ensure that Spark executors are created on hosts that also run HDFS datanodes.
This is an obvious but essential prerequisite.

The second stage ensures that Spark tasks requiring specific data from HDFS are dispatched to executors located on the same hosts as the datanodes holding this data.

Once all tasks have been scheduled correctly, the third stage must ensure that the executors actually read the data from their "most local" datanode.

The problem when running on Kubernetes

At TIKI, our infrastructure relies on the Kubernetes container orchestrator.
When speaking about data locality on Kubernetes, we mean node locality with respect to Kubernetes worker nodes.

How important the first stage is depends heavily on the structure of the cluster at hand.
If you can ensure that the majority of Spark executors is allocated alongside datanodes anyway, this stage may be skipped.
Otherwise, the issue can be solved by setting Kubernetes node labels on the nodes hosting datanodes. Spark can then be restricted to scheduling executors only on nodes carrying this label by configuring the node selector in spark-submit.
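As a sketch of this first stage: assuming a self-chosen label `hdfs-datanode=true` and hypothetical node names (neither is mandated by Spark or HDFS), the labeling and the corresponding spark-submit configuration could look like this:

```shell
# Label every Kubernetes node that hosts an HDFS datanode.
# The label key/value "hdfs-datanode=true" and the node names
# are examples of our choosing.
kubectl label nodes worker-1 worker-2 hdfs-datanode=true

# Restrict Spark executor pods to the labeled nodes via the
# standard spark.kubernetes.node.selector.[labelKey] setting:
spark-submit \
  --master k8s://https://<api-server>:6443 \
  --conf spark.kubernetes.node.selector.hdfs-datanode=true \
  ...   # remaining application options and arguments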

The first big issue arises in the second layer described above. The Spark scheduler is unable to correctly match datanodes and executors based on their addresses, since the two report them in different formats. These addresses may be either virtual Kubernetes pod addresses or Kubernetes-internal fully qualified domain names.
Although the actual format depends on your Kubernetes configuration, our fixes were developed for both formats and therefore work independently of your Kubernetes network settings.
Since we wish to achieve Kubernetes node locality, we need both systems to deliver locality information referring to their allocated Kubernetes node in the same format.
We decided to use the symbolic names of Kubernetes nodes, as they are distinctive and transparent.

The same issue applies in layer three. To ensure that HDFS clients (or Spark executors) are able to choose the "most local" datanode, HDFS also needs similar locality information based on Kubernetes nodes.

Fixing layer two

When computing an RDD, the Spark driver requests from the namenode a list of the datanodes holding the required data.
For each of these datanodes, the driver then retrieves the name of the Kubernetes node it is hosted on.
Using this information, the driver computes several mappings that assign datanode locations (namely node names) to scheduled tasks.
When it comes to task dispatching, the driver iterates over every mapping and compares the datanode locations with the locations
of its executors. Since the driver has gathered the assigned Kubernetes node for each of its executors, it is able to correctly match locality levels.

Since Apache Spark provides no plugin system for its scheduler, layer two can only be fixed directly in the Spark core source code.
The actual fix can be found in our forked Spark repository.

Fixing layer three

After a datanode pod is created, it registers itself with its assigned namenode. Along with other registration details,
the datanode transmits its address. The namenode then uses this information to construct a network path for each datanode that uniquely describes its locality. This network path has the form "/rack/kubernetes-node/pod-ip".
The same procedure applies to client applications requesting data from HDFS: based on the client's address, the namenode
constructs a network path in the same format as for datanodes. The namenode is thus able to match client applications and datanodes running on the same Kubernetes node.
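Our actual fix is implemented as an HDFS plugin, but the mapping it performs can be illustrated with HDFS's standard topology-script mechanism (configured via `net.topology.script.file.name`): the namenode invokes a script with a batch of addresses and reads back one network path per address. The rack name `/default-rack` and the stubbed IP ranges below are illustrative assumptions, not part of our implementation:

```shell
#!/usr/bin/env bash
# Illustrative topology script: for every address passed as an argument,
# print one network path of the form /rack/kubernetes-node/pod-ip.

# Stubbed IP -> Kubernetes node lookup. A real resolver would query the
# Kubernetes API instead (e.g. via kubectl get pods -A -o wide).
resolve_node() {
  case "$1" in
    10.42.0.*) echo "k8s-node-a" ;;
    10.42.1.*) echo "k8s-node-b" ;;
    *)         echo "unknown-node" ;;
  esac
}

# Build the full network path for one pod IP.
to_path() {
  echo "/default-rack/$(resolve_node "$1")/$1"
}

for ip in "$@"; do
  to_path "$ip"
done
```

With such a script configured, two addresses resolving to the same Kubernetes node share a network-path prefix, which is exactly the matching criterion the namenode needs.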

Thanks to HDFS's pluggable interface, we were able to implement this fix as an easy-to-integrate plugin. The source as well as integration instructions can be found in its GitHub repository:

Retrieving the k8s node name

The core idea behind both fixes is to retrieve the Kubernetes node name and use it to match Spark executors against HDFS datanodes.
Layer two as well as layer three receives either Kubernetes-internal domain names or pod addresses. Domain names can simply be split at the first dot.
For pod addresses, we use the fabric8 Kubernetes client to request the node name for a given address from the Kubernetes API.
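The domain-name case is a one-liner (the FQDN below is a made-up example). For the pod-address case our code uses the fabric8 Java client; the equivalent lookup expressed with kubectl is shown as a comment:

```shell
# Case 1: a Kubernetes-internal FQDN. The node name is everything
# before the first dot.
fqdn="worker-3.cluster.local"   # example value
nodename="${fqdn%%.*}"
echo "$nodename"                # -> worker-3

# Case 2: a pod IP. Our plugin performs this lookup with the fabric8
# client; the same query via kubectl uses a field selector on status.podIP:
# kubectl get pods -A --field-selector status.podIP=<pod-ip> \
#   -o jsonpath='{.items[0].spec.nodeName}'
```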

Expanded HDFS topology

Throughout this post, we have treated Kubernetes node locality as equivalent to conventional host locality.
Obviously, this may not hold for all cluster configurations. When several Kubernetes nodes run on the same physical machine,
we effectively add a new layer of locality by expanding the default HDFS topology format "/rack/host" to "/rack/host/k8s-node".


With these changes, we were able to fix Spark-HDFS data locality on Kubernetes. For implications, experiments, and potential drawbacks, see this blog post by my colleague Wolfgang Buchner.

Feel free to contact me if any questions about this post come up. I look forward to your message and to exchanging ideas on this topic.