How to add new field to struct column?

TL;DR You have to map the rows in a Dataset somehow.

map Operator (the most flexible)

Use the map operator, which gives you the most flexibility since you are in total control of the final structure of the rows.

map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U] (Scala-specific) Returns a new Dataset that contains the result of applying func to each element.

Your case would then look as follows:

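// Create a sample dataset to work with
scala> val df = Seq("timestamp").
  toDF("ts").
  withColumn("geoip", struct(lit("Warsaw") as "city", lit("Europe") as "continent"))
df: org.apache.spark.sql.DataFrame = [ts: string, geoip: struct<city: string, continent: string>]

scala> df.show
+---------+---------------+
|       ts|          geoip|
+---------+---------------+
|timestamp|[Warsaw,Europe]|
+---------+---------------+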

scala> df.printSchema
 |-- ts: string (nullable = true)
 |-- geoip: struct (nullable = false)
 |    |-- city: string (nullable = false)
 |    |-- continent: string (nullable = false)

val newDF = df.
  as[(String, (String, String))].  // <-- convert to typed Dataset as it makes map easier
  map { case (ts, (city, continent)) =>
    (ts, (city, continent, "New field with some value")) }. // <-- add new field to the struct
  toDF("timestamp", "geoip") // <-- name the top-level fields

scala> newDF.printSchema
 |-- timestamp: string (nullable = true)
 |-- geoip: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: string (nullable = true)
 |    |-- _3: string (nullable = true)

That's not pretty, as you lost the names of the struct fields.

Let's define the schema with the proper names. That's where you can use StructType with StructFields (you could also use a set of case classes, but I leave it to you as a home exercise).

import org.apache.spark.sql.types._
val geoIP = StructType(
  $"city".string ::
  $"continent".string ::
  $"new_field".string ::
  Nil)
val mySchema = StructType(
  $"timestamp".string ::
  $"geoip".struct(geoIP) ::
  Nil)

scala> mySchema.printTreeString
 |-- timestamp: string (nullable = true)
 |-- geoip: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- continent: string (nullable = true)
 |    |-- new_field: string (nullable = true)

Apply the new schema for proper names.

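val properNamesDF = spark.createDataFrame(newDF.rdd, mySchema)

scala> properNamesDF.show(truncate = false)
+---------+-----------------------------------------+
|timestamp|geoip                                    |
+---------+-----------------------------------------+
|timestamp|[Warsaw,Europe,New field with some value]|
+---------+-----------------------------------------+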
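The case-class alternative mentioned earlier could look roughly like the sketch below. The names GeoIP, Event and CaseClassSketch are made up for illustration, and the code assumes a local SparkSession outside spark-shell. Because the map returns case class instances, the field names come from the case classes and no separate StructType is needed.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{lit, struct}

// Hypothetical case classes that mirror the target schema.
// They are defined at the top level so Spark can derive encoders for them.
case class GeoIP(city: String, continent: String, new_field: String)
case class Event(timestamp: String, geoip: GeoIP)

object CaseClassSketch {
  // Builds the sample dataset from the answer and adds the new field via map.
  def build(spark: SparkSession): Dataset[Event] = {
    import spark.implicits._
    val df = Seq("timestamp").
      toDF("ts").
      withColumn("geoip", struct(lit("Warsaw") as "city", lit("Europe") as "continent"))
    df.
      as[(String, (String, String))].
      map { case (ts, (city, continent)) =>
        Event(ts, GeoIP(city, continent, "New field with some value")) }
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is good enough for a demo run.
    val spark = SparkSession.builder().master("local[*]").appName("case-class-sketch").getOrCreate()
    build(spark).printSchema()
    spark.stop()
  }
}
```

The trade-off is that the schema is now fixed at compile time, which is convenient for a known structure but less flexible than building a StructType at runtime.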

How to add field to "struct of a struct"

If you feel fairly adventurous, you may want to play with StructType as a collection type and re-shape it using Scala's Collection API and copy constructor.

It does not really matter how deep you want to go and what level of "struct of a struct" you want to modify. Just consider a StructType as a collection of StructFields that may in turn be StructTypes.

val oldSchema = newDF.schema
val names = Seq("city", "continent", "new_field")
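val geoipFields = oldSchema("geoip").
  dataType.
  asInstanceOf[StructType]. // <-- the type of geoip is a StructType, i.e. a Seq[StructField]
  zip(names).               // <-- pair every field with its new name
  map { case (field, name) => field.copy(name = name) }
val myNewSchema = StructType(
  $"timestamp".string ::
  $"geoip".struct(StructType(geoipFields)) :: Nil)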
val properNamesDF = spark.createDataFrame(newDF.rdd, myNewSchema)
scala> properNamesDF.printSchema
 |-- timestamp: string (nullable = true)
 |-- geoip: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- continent: string (nullable = true)
 |    |-- new_field: string (nullable = true)
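To illustrate the "collection of StructFields" idea at an arbitrary depth, here is a minimal sketch of a recursive helper that appends a field to a struct nested anywhere in a schema. The helper addField, the object SchemaSketch and the location/lat/lon fields are hypothetical names used only for this example. Note that it reshapes the schema only; as with the createDataFrame approach above, the rows themselves must already carry a value for the new field.

```scala
import org.apache.spark.sql.types._

object SchemaSketch {
  // Hypothetical helper: returns a copy of `schema` with `newField` appended
  // to the struct reached by following `path` (a list of nested field names).
  // An empty path appends the field at the top level.
  def addField(schema: StructType, path: List[String], newField: StructField): StructType =
    path match {
      case Nil =>
        StructType(schema.fields :+ newField)
      case head :: tail =>
        StructType(schema.fields.map {
          // Descend only into the struct field whose name matches the path
          case f @ StructField(`head`, nested: StructType, _, _) =>
            f.copy(dataType = addField(nested, tail, newField))
          case other => other
        })
    }

  def main(args: Array[String]): Unit = {
    // A made-up schema with a struct inside a struct
    val schema = new StructType().
      add("timestamp", StringType).
      add("geoip", new StructType().
        add("city", StringType).
        add("location", new StructType().add("lat", DoubleType)))

    val updated = addField(schema, List("geoip", "location"), StructField("lon", DoubleType))
    updated.printTreeString()
  }
}
```

If no field on the path matches, the schema is returned unchanged, which may or may not be what you want; a stricter version could fail instead.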

withColumn Operator with struct Function

You could use the withColumn operator with the struct function.

withColumn(colName: String, col: Column): DataFrame Returns a new Dataset by adding a column or replacing the existing column that has the same name.

struct(cols: Column*): Column Creates a new struct column.

The code could look as follows:

val anotherNewDF = df.
  withColumn("geoip", // <-- use the same column name so you hide the existing one
    struct(
      $"geoip.city", // <-- reference existing column to copy the values
      $"geoip.continent",
      lit("new value") as "new_field")) // <-- new field with fixed value

scala> anotherNewDF.printSchema
 |-- ts: string (nullable = true)
 |-- geoip: struct (nullable = false)
 |    |-- city: string (nullable = false)
 |    |-- continent: string (nullable = false)
 |    |-- new_field: string (nullable = false)

As per a comment from @shj, you can use a wildcard to avoid re-listing the columns, which makes it pretty flexible, e.g.

val anotherNewDF = df.
  withColumn("geoip",
    struct(
      $"geoip.*", // <-- the wildcard here
      lit("new value") as "new_field"))

You could also simply do:

val dfWithError = df.withColumn("geoip", struct($"geoip.*", lit("This is fine.").alias("error")))

That adds an "error" field to the "geoip" struct.
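If you are on Spark 3.1 or later (an assumption, the snippets above do not depend on it), Column.withField offers a more direct way to do the same thing without rebuilding the struct. A minimal sketch, where WithFieldSketch and addNewField are names made up for this example:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, struct}

object WithFieldSketch {
  // Adds (or replaces) new_field inside the existing geoip struct.
  // Assumption: Spark 3.1+, where Column.withField is available.
  def addNewField(df: DataFrame): DataFrame =
    df.withColumn("geoip", col("geoip").withField("new_field", lit("new value")))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("with-field-sketch").getOrCreate()
    import spark.implicits._

    // Same sample dataset as in the answer
    val df = Seq("timestamp").
      toDF("ts").
      withColumn("geoip", struct(lit("Warsaw") as "city", lit("Europe") as "continent"))

    addNewField(df).printSchema()
    spark.stop()
  }
}
```

On older Spark versions the struct($"geoip.*", ...) form shown above remains the way to go.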