Processing multiple files as independent RDD's in parallel
It is more an idea than a full solution and I haven't tested it yet.
You can start with extracting your data processing pipeline into a function.
def pipeline(f: String, n: Int) = {
sqlContext
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load(f)
.repartition(n)
.groupBy(...)
.agg(...)
.cache // Cache so we can force computation later
}
If your files are small you can adjust n
parameter to use as small number of partitions as possible to fit data from a single file and avoid shuffling. It means you are limiting concurrency but we'll get back to this issue later.
val n: Int = ???
Next you have to obtain a list of input files. This step depends on a data source but most of the time it is more or less straightforward:
val files: Array[String] = ???
Next you can map above list using pipeline
function:
val rdds = files.map(f => pipeline(f, n))
Since we limit concurrency at the level of the single file we want to compensate by submitting multiple jobs. Lets add a simple helper which forces evaluation and wraps it with Future
import scala.concurrent._
import ExecutionContext.Implicits.global
def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
df.rdd.foreach(_ => ()) // Force computation
df
}
Finally we can use above helper on the rdds
:
val result = Future.sequence(
rdds.map(rdd => pipelineToFuture(rdd)).toList
)
Depending on your requirements you can add onComplete
callbacks or use reactive streams to collect the results.