Resolving dependency problems in Apache Spark
Apache Spark's classpath is built dynamically (to accommodate per-application user code) which makes it vulnerable to such issues. @user7337271's answer is correct, but there are some more concerns, depending on the cluster manager ("master") you're using.
First, a Spark application consists of these components (each one is a separate JVM, therefore potentially contains different classes in its classpath):
- Driver: that's your application creating a
SparkSession
(orSparkContext
) and connecting to a cluster manager to perform the actual work - Cluster Manager: serves as an "entry point" to the cluster, in charge of allocating executors for each application. There are several different types supported in Spark: standalone, YARN and Mesos, which we'll describe bellow.
- Executors: these are the processes on the cluster nodes, performing the actual work (running Spark tasks)
The relationsip between these is described in this diagram from Apache Spark's cluster mode overview:
Now - which classes should reside in each of these components?
This can be answered by the following diagram:
Let's parse that slowly:
Spark Code are Spark's libraries. They should exist in ALL three components as they include the glue that let's Spark perform the communication between them. By the way - Spark authors made a design decision to include code for ALL components in ALL components (e.g. to include code that should only run in Executor in driver too) to simplify this - so Spark's "fat jar" (in versions up to 1.6) or "archive" (in 2.0, details bellow) contain the necessary code for all components and should be available in all of them.
Driver-Only Code this is user code that does not include anything that should be used on Executors, i.e. code that isn't used in any transformations on the RDD / DataFrame / Dataset. This does not necessarily have to be separated from the distributed user code, but it can be.
Distributed Code this is user code that is compiled with driver code, but also has to be executed on the Executors - everything the actual transformations use must be included in this jar(s).
Now that we got that straight, how do we get the classes to load correctly in each component, and what rules should they follow?
Spark Code: as previous answers state, you must use the same Scala and Spark versions in all components.
1.1 In Standalone mode, there's a "pre-existing" Spark installation to which applications (drivers) can connect. That means that all drivers must use that same Spark version running on the master and executors.
1.2 In YARN / Mesos, each application can use a different Spark version, but all components of the same application must use the same one. That means that if you used version X to compile and package your driver application, you should provide the same version when starting the SparkSession (e.g. via
spark.yarn.archive
orspark.yarn.jars
parameters when using YARN). The jars / archive you provide should include all Spark dependencies (including transitive dependencies), and it will be shipped by the cluster manager to each executor when the application starts.Driver Code: that's entirely up to - driver code can be shipped as a bunch of jars or a "fat jar", as long as it includes all Spark dependencies + all user code
Distributed Code: in addition to being present on the Driver, this code must be shipped to executors (again, along with all of its transitive dependencies). This is done using the
spark.jars
parameter.
To summarize, here's a suggested approach to building and deploying a Spark Application (in this case - using YARN):
- Create a library with your distributed code, package it both as a "regular" jar (with a .pom file describing its dependencies) and as a "fat jar" (with all of its transitive dependencies included).
- Create a driver application, with compile-dependencies on your distributed code library and on Apache Spark (with a specific version)
- Package the driver application into a fat jar to be deployed to driver
- Pass the right version of your distributed code as the value of
spark.jars
parameter when starting theSparkSession
- Pass the location of an archive file (e.g. gzip) containing all the jars under
lib/
folder of the downloaded Spark binaries as the value ofspark.yarn.archive
In addition to the very extensive answer already given by user7337271, if the problem results from missing external dependencies you can build a jar with your dependencies with e.g. maven assembly plugin
In that case, make sure to mark all the core spark dependencies as "provided" in your build system and, as already noted, make sure they correlate with your runtime spark version.
When building and deploying Spark applications all dependencies require compatible versions.
Scala version. All packages have to use the same major (2.10, 2.11, 2.12) Scala version.
Consider following (incorrect)
build.sbt
:name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" )
We use
spark-streaming
for Scala 2.10 while remaining packages are for Scala 2.11. A valid file could bename := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.11" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" )
but it is better to specify version globally and use
%%
(which appends the scala version for you):name := "Simple Project" version := "1.0" scalaVersion := "2.11.7" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.0.1", "org.apache.spark" %% "spark-streaming" % "2.0.1", "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.1" )
Similarly in Maven:
<project>
<groupId>com.example</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<spark.version>2.0.1</spark.version>
</properties>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-twitter_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>
Spark version All packages have to use the same major Spark version (1.6, 2.0, 2.1, ...).
Consider following (incorrect) build.sbt:
name := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "1.6.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" )
We use
spark-core
1.6 while remaining components are in Spark 2.0. A valid file could bename := "Simple Project" version := "1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.0.1", "org.apache.spark" % "spark-streaming_2.10" % "2.0.1", "org.apache.bahir" % "spark-streaming-twitter_2.11" % "2.0.1" )
but it is better to use a variable (still incorrect):
name := "Simple Project" version := "1.0" val sparkVersion = "2.0.1" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % sparkVersion, "org.apache.spark" % "spark-streaming_2.10" % sparkVersion, "org.apache.bahir" % "spark-streaming-twitter_2.11" % sparkVersion )
Similarly in Maven:
<project>
<groupId>com.example</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<spark.version>2.0.1</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>spark-streaming-twitter_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>
Spark version used in Spark dependencies has to match Spark version of the Spark installation. For example if you use 1.6.1 on the cluster you have to use 1.6.1 to build jars. Minor versions mismatch are not always accepted.
Scala version used to build jar has to match Scala version used to build deployed Spark. By default (downloadable binaries and default builds):
- Spark 1.x -> Scala 2.10
- Spark 2.x -> Scala 2.11
Additional packages should be accessible on the worker nodes if included in the fat jar. There are number of options including:
--jars
argument forspark-submit
- to distribute localjar
files.--packages
argument forspark-submit
- to fetch dependencies from Maven repository.
When submitting in the cluster node you should include application
jar
in--jars
.