Spark Scala: split a dataframe into parts with an equal number of rows
Another solution is to use limit and except. The following program returns an array of DataFrames, each with an equal number of rows, except for the first one, which may contain fewer rows.
// assuming a SparkSession named `spark` is in scope (as in spark-shell)
import spark.implicits._

var numberOfNew = 4
var input = List(1,2,3,4,5,6,7,8,9).toDF
// inclusive range, so this allocates numberOfNew + 1 empty DataFrames
var newFrames = (0 to numberOfNew).map(_ => Seq.empty[Int].toDF).toArray
var size = input.count()
val limit = (size / numberOfNew).toInt

while (size > 0) {
  // take the next `limit` rows and remove them from the remaining input
  newFrames(numberOfNew) = input.limit(limit)
  input = input.except(newFrames(numberOfNew))
  size = size - limit
  numberOfNew = numberOfNew - 1
}
newFrames.foreach(_.show)
+-----+
|value|
+-----+
| 7|
+-----+
+-----+
|value|
+-----+
| 4|
| 8|
+-----+
+-----+
|value|
+-----+
| 5|
| 9|
+-----+
...
According to my understanding of your input and required output, you can create row numbers by grouping the dataframe under a single groupId. Then you can filter the dataframe by comparing the row number and store the slices somewhere else according to your needs (there is a sketch of that after the code below).
The following is a quick solution; you can adapt it to your needs.
// assuming a SparkSession with implicits imported and an existing DataFrame
// named `dataFrame` that has an `original_dt` column
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

val k = 4
// put every row into one group so that row_number() yields a global, ordered index
val windowSpec = Window.partitionBy("grouped").orderBy("original_dt")
val newDF = dataFrame.withColumn("grouped", lit("grouping"))
var latestDF = newDF.withColumn("row", row_number() over windowSpec)
val totalCount = latestDF.count()
var lowLimit = 0
var highLimit = lowLimit + k

// slide a window of k row numbers over the dataframe and handle each slice
while (lowLimit < totalCount) {
  latestDF.where(s"row <= ${highLimit} and row > ${lowLimit}").show(false)
  lowLimit = lowLimit + k
  highLimit = highLimit + k
}
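If you want to keep the slices instead of only showing them, here is a minimal sketch under the same assumptions (it reuses latestDF, k and totalCount from above) that collects each slice into a buffer:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.DataFrame

// collect each k-row slice instead of calling show
val slices = ArrayBuffer.empty[DataFrame]
var low = 0
while (low < totalCount) {
  slices += latestDF.where(s"row <= ${low + k} and row > ${low}").drop("grouped", "row")
  low = low + k
}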
I hope this will give you a good start.
This is an improved version of Steffen Schmitz's answer, which is in fact incorrect. I have improved it for posterity and generalized it. I do wonder about the performance at scale, however.
// assuming a SparkSession named `spark` is in scope (as in spark-shell)
import spark.implicits._

var numberOfNew = 4
var input = Seq((1,2),(3,4),(5,6),(7,8),(9,10),(11,12)).toDF
// exclusive upper bound this time, so exactly numberOfNew empty DataFrames are allocated
var newFrames = (0 until numberOfNew).map(_ => Seq.empty[(Int, Int)].toDF).toArray
var size = input.count()
val limit = (size / numberOfNew).toInt
// rows left over for the last frame after the other numberOfNew - 1 frames take `limit` rows each
val residual = (size - limit.toLong * (numberOfNew - 1)).toInt
var limit_to_use = limit

while (numberOfNew > 0) {
  // the last frame filled (index 0) absorbs the remainder when the split is not even
  if (numberOfNew == 1 && residual != 0) limit_to_use = residual
  newFrames(numberOfNew-1) = input.limit(limit_to_use)
  input = input.except(newFrames(numberOfNew-1))
  size = size - limit
  numberOfNew = numberOfNew - 1
}
newFrames.foreach(_.show)
val singleDF = newFrames.reduce(_ union _)
singleDF.show(false)
This returns the individual dataframes:
+---+---+
| _1| _2|
+---+---+
| 7| 8|
| 3| 4|
| 11| 12|
+---+---+
+---+---+
| _1| _2|
+---+---+
| 5| 6|
+---+---+
+---+---+
| _1| _2|
+---+---+
| 9| 10|
+---+---+
+---+---+
| _1| _2|
+---+---+
| 1| 2|
+---+---+
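If you need this more than once, the same limit/except split can be wrapped in a helper. The sketch below assumes a SparkSession is in scope; the name splitIntoParts and its signature are only illustrative, not part of any Spark API:
import org.apache.spark.sql.DataFrame

// hypothetical helper wrapping the limit/except split shown above;
// it returns `parts` DataFrames, the last one absorbing any remainder
def splitIntoParts(df: DataFrame, parts: Int): Array[DataFrame] = {
  val size = df.count()
  val limit = (size / parts).toInt
  val frames = new Array[DataFrame](parts)
  var remaining = df
  for (i <- 0 until parts) {
    // the last slice takes whatever rows are left over
    val take = if (i == parts - 1) (size - limit.toLong * (parts - 1)).toInt else limit
    frames(i) = remaining.limit(take)
    remaining = remaining.except(frames(i))
  }
  frames
}

// example usage (hypothetical): splitIntoParts(input, 4).foreach(_.show)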