How to perform "Lookup" operation on Spark dataframes given multiple conditions

As B is small I think the best way to do this would be a broadcast variable and user defined function.

// However you get the data...
case class BType( A2: Int, B2: Int, C2 : Int, D2 : String)
val B = Seq(BType(1,1,1,"B111"), BType(1,1,2, "B112"), BType(2,0,0, "B200"))

val A = sc.parallelize(Seq((1,1,1, "DATA"), (1,1,2, "DATA"), (2, 0, 0, "DATA"), (2, 0, 1, "NONE"), (3, 0, 0, "NONE"))).toDF("A1", "B1", "C1", "OTHER")


// Broadcast B so all nodes have a copy of it.
val Bbradcast = sc.broadcast(B)

// A user defined function to find the value for D2. This I'm sure could be improved by whacking it into maps. But this is a small example. 
val findD = udf {( a: Int, b : Int, c: Int) => Bbradcast.value.find(x => x.A2 == a && x.B2 == b && x.C2 == c).getOrElse(Bbradcast.value.find(x => x.A2 == a && x.B2 == b).getOrElse(BType(0,0,0,"NA"))).D2 }

// Use the UDF in a select
A.select($"A1", $"B1", $"C1", $"OTHER", findD($"A1", $"B1", $"C1").as("D")).show

Just for reference a solution without UDFs:

val b1 = broadcast(b.toDF("A2_1", "B2_1", "C2_1", "D_1"))
val b2 = broadcast(b.toDF("A2_2", "B2_2", "C2_2", "D_2"))

// Match A, B and C
val expr1 = ($"A1" === $"A2_1") && ($"B1" === $"B2_1") && ($"C1" === $"C2_1")
// Match A and B mismatch C
val expr2 = ($"A1" === $"A2_2") && ($"B1" === $"B2_2") && ($"C1" !== $"C2_2")

val toDrop = b1.columns ++ b2.columns

toDrop.foldLeft(a
  .join(b1, expr1, "leftouter")
  .join(b2, expr2, "leftouter")
  // If there is match on A, B, C then D_1 should be not NULL
  // otherwise we fall-back to D_2 
  .withColumn("D", coalesce($"D_1", $"D_2")) 
)((df, c) => df.drop(c))

This assumes there is at most one match in each category (all three columns, or the first two) or duplicate rows in the output are desired.

UDF vs JOIN:

There are multiple factors to consider and there is no simple answer here:

Cons:

  • broadcast joins require passing data twice to the worker nodes. As for now broadcasted tables are not cached (SPARK-3863) and it is unlikely to change in the nearest future (Resolution: Later).
  • join operation is applied twice even if there is a full match.

Pros:

  • join and coalesce are transparent to the optimizer while UDFs are not.
  • operating directly with SQL expressions can benefit from all the Tungsten optimizations including code generation while UDF cannot.