Spark: Joining with array

You could use explode on your array column before the join. explode creates a new row for each element of the array:

import org.apache.spark.sql.functions.explode

val rightExploded = right.withColumn("exploded_col", explode(right("col1")))
rightExploded.show()

+------+----+------------+
|  col1|col2|exploded_col|
+------+----+------------+
|[1, 2]| Yes|           1|
|[1, 2]| Yes|           2|
|   [3]|  No|           3|
+------+----+------------+

Then you can easily join with your first dataset on the exploded column.
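
As a sketch, assuming the same single-column left dataset used in the answers below (a DataFrame left with an Int column col1), that join is a plain equi-join:

left.join(rightExploded, left("col1") === rightExploded("exploded_col")).show()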


One option is to create a UDF for building your join condition:

import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

val left = spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF("col1")
val right = spark.sparkContext.parallelize(Seq((Array(1, 2), "Yes"), (Array(3), "No"))).toDF("col1", "col2")

val checkValue = udf {
  // True when the array column contains the candidate value
  (array: WrappedArray[Int], value: Int) => array.contains(value)
}
val result = left.join(right, checkValue(right("col1"), left("col1")), "inner")

result.show

+----+------+----+
|col1|  col1|col2|
+----+------+----+
|   1|[1, 2]| Yes|
|   2|[1, 2]| Yes|
|   3|   [3]|  No|
+----+------+----+
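
Note that a UDF predicate like this is opaque to the Catalyst optimizer, so Spark can't plan it as an equi-join; expect it to fall back to a broadcast nested-loop join (or a Cartesian product), which can get expensive on large inputs.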

The most succinct way to do this is to use the array_contains Spark SQL expression, as shown below. That said, I've compared its performance with that of the explode-and-join approach from a previous answer, and the explode version seems more performant.

import org.apache.spark.sql.functions.expr
import spark.implicits._

val left = Seq(1, 2, 3).toDF("col1")

// Rename the array column to avoid clashing with left's "col1"
val right = Seq((Array(1, 2), "Yes"), (Array(3), "No"))
  .toDF("col1", "col2")
  .withColumnRenamed("col1", "col1_array")

val joined = left.join(right, expr("array_contains(col1_array, col1)"))
joined.show

+----+----------+----+
|col1|col1_array|col2|
+----+----------+----+
|   1|    [1, 2]| Yes|
|   2|    [1, 2]| Yes|
|   3|       [3]|  No|
+----+----------+----+

Note that you can't use the org.apache.spark.sql.functions.array_contains function directly here, as it requires its second argument to be a literal rather than a column expression.
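
If you'd rather express the whole join in SQL, the same array_contains condition works over temporary views. A minimal sketch (the view names left_t and right_t are illustrative):

left.createOrReplaceTempView("left_t")
right.createOrReplaceTempView("right_t")

spark.sql("""
  SELECT l.col1, r.col1_array, r.col2
  FROM left_t l
  JOIN right_t r ON array_contains(r.col1_array, l.col1)
""").show()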