Spark: Join dataframe column with an array

You can cast both key1 and key2 to string and then use the contains function, as follows.

import org.apache.spark.sql.functions.col
import spark.implicits._

val df1 = sc.parallelize(Seq((1L,"one.df1"),
                             (2L,"two.df1"),
                             (3L,"three.df1"))).toDF("key1","Value")

DF1:
+----+---------+
|key1|Value    |
+----+---------+
|1   |one.df1  |
|2   |two.df1  |
|3   |three.df1|
+----+---------+

val df2 = sc.parallelize(Seq((Array(1L,1L),"one.df2"),
                             (Array(2L,2L),"two.df2"),
                             (Array(3L,3L),"three.df2"))).toDF("key2","Value")

DF2:
+------+---------+
|key2  |Value    |
+------+---------+
|[1, 1]|one.df2  |
|[2, 2]|two.df2  |
|[3, 3]|three.df2|
+------+---------+

val joinedDF = df1.join(df2, col("key2").cast("string").contains(col("key1").cast("string")))
joinedDF.show(false)

JOIN:
+----+---------+------+---------+
|key1|Value    |key2  |Value    |
+----+---------+------+---------+
|1   |one.df1  |[1, 1]|one.df2  |
|2   |two.df1  |[2, 2]|two.df2  |
|3   |three.df1|[3, 3]|three.df2|
+----+---------+------+---------+
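
One caveat with this approach: because the comparison happens on the string form of the array, a key can pair up with an array it is not actually an element of. A minimal sketch with hypothetical data (dfA/dfB are names of my own) illustrates this:

// Hypothetical data: 1 is NOT an element of [11, 12], yet the string
// form of the array contains the character "1", so the rows still join.
val dfA = Seq((1L, "one")).toDF("key1", "Value")
val dfB = Seq((Array(11L, 12L), "eleven.twelve")).toDF("key2", "Value")

dfA.join(dfB, col("key2").cast("string").contains(col("key1").cast("string"))).show(false)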

The best way to do this (and the one that doesn't require any casting or exploding of DataFrames) is to use the array_contains Spark SQL expression, as shown below.

import org.apache.spark.sql.functions.expr
import spark.implicits._

val df1 = Seq((1L,"one.df1"), (2L,"two.df1"),(3L,"three.df1")).toDF("key1","Value")

val df2 = Seq((Array(1L,1L),"one.df2"), (Array(2L,2L),"two.df2"), (Array(3L,3L),"three.df2")).toDF("key2","Value")

val joinedDF = df1.join(df2, expr("array_contains(key2, key1)"))
joinedDF.show(false)

+----+---------+------+---------+
|key1|    Value|  key2|    Value|
+----+---------+------+---------+
|   1|  one.df1|[1, 1]|  one.df2|
|   2|  two.df1|[2, 2]|  two.df2|
|   3|three.df1|[3, 3]|three.df2|
+----+---------+------+---------+
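
Because both inputs carry a column named Value, the joined result has two columns with the same name. If that is a problem downstream, you can disambiguate them through the parent DataFrames and rename; a small sketch (the names value_df1/value_df2 are my own choice):

import org.apache.spark.sql.functions.{col, expr}

// Reference each "Value" column through its original DataFrame to avoid ambiguity
val joinedRenamed = df1.join(df2, expr("array_contains(key2, key1)"))
  .select(col("key1"), df1("Value").as("value_df1"),
          col("key2"), df2("Value").as("value_df2"))

joinedRenamed.show(false)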

Please note that you cannot use the org.apache.spark.sql.functions.array_contains function directly as it requires the second argument to be a literal as opposed to a column expression.
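
For what it's worth, the function form does work when the second argument is an actual literal; it is only the column-to-column comparison above that needs the SQL expression. A quick sketch (the lookup value 2L is arbitrary):

import org.apache.spark.sql.functions.{array_contains, col}

// Literal lookup works fine: keep the rows of df2 whose key2 array contains 2
df2.filter(array_contains(col("key2"), 2L)).show(false)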