How do I read a large CSV file with Scala Stream class?
UPDATE 2020/08/30: Please use the Scala library kantan.csv for the most accurate and correct implementation of RFC 4180, which defines the .csv MIME type.
While I enjoyed the learning process of creating the solution below, please refrain from using it, as I have since found a number of issues with it, especially at scale. To avoid the obvious technical debt arising from my solution, choose a well-maintained, RFC-driven, Scala-native solution to take care of your current and future clients.
If you are looking to process the large file line by line, while avoiding loading the entire file's contents into memory at once, then you can use the Iterator returned by scala.io.Source.
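As a minimal sketch (separate from the function described below, and with a placeholder path), line-by-line processing with scala.io.Source looks like this:

import scala.io.Source

val source = Source.fromFile("path/to/file.csv") //placeholder path
try
  source.getLines().foreach(println) //the Iterator yields one line at a time, lazily
finally
  source.close()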
I have a small function, tryProcessSource (containing two sub-functions), which I use for exactly these types of use cases. The function takes up to four parameters, of which only the first is required; the others have sane default values. Here's the function profile (the full implementation is at the bottom):
def tryProcessSource(
    file: File,
    parseLine: (Int, String) => Option[List[String]] =
      (index, unparsedLine) => Some(List(unparsedLine)),
    filterLine: (Int, List[String]) => Option[Boolean] =
      (index, parsedValues) => Some(true),
    retainValues: (Int, List[String]) => Option[List[String]] =
      (index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
  ???
}
The first parameter, file: File, is required. It is any valid instance of java.io.File that points to a line-oriented text file, such as a CSV.
The second parameter, parseLine: (Int, String) => Option[List[String]], is optional. If provided, it must be a function that receives two input parameters, index: Int and unparsedLine: String, and returns an Option[List[String]]. The function may return a Some-wrapped List[String] consisting of the valid column values, or it may return None, which aborts the entire streaming process early. If this parameter is not provided, the default value (index, line) => Some(List(line)) is used. This default results in the entire line being returned as a single String value.
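As an illustrative sketch (not one of the original examples), a custom parseLine might use the None early abort the moment it encounters a row with an unexpected number of columns; the column count of 5 is an assumption matching the sample file further below:

val parseLineStrict: (Int, String) => Option[List[String]] =
  (index, unparsedLine) => {
    val values = unparsedLine.split(",", -1).toList //limit of -1 keeps trailing empty columns
    if (values.size == 5) Some(values) //assumed column count
    else None                          //abort the entire streaming process early
  }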
The third parameter, filterLine: (Int, List[String]) => Option[Boolean], is optional. If provided, it must be a function that receives two input parameters, index: Int and parsedValues: List[String], and returns an Option[Boolean]. The function may return a Some-wrapped Boolean indicating whether this particular line should be included in the output, or it may return None, which aborts the entire streaming process early. If this parameter is not provided, the default value (index, values) => Some(true) is used. This default results in all lines being included.
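As another illustrative sketch, a custom filterLine can use the None early abort to stop reading once enough rows have been seen, for example scanning only the first 1000 data lines of a very large file (the limit is arbitrary):

val filterLineFirst1000: (Int, List[String]) => Option[Boolean] =
  (index, parsedValues) =>
    if (index == 0) Some(false)        //skip header line
    else if (index <= 1000) Some(true) //keep the first 1000 data lines
    else None                          //abort the entire streaming process early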
The fourth and final parameter, retainValues: (Int, List[String]) => Option[List[String]], is optional. If provided, it must be a function that receives two input parameters, index: Int and parsedValues: List[String], and returns an Option[List[String]]. The function may return a Some-wrapped List[String] consisting of some subset and/or alteration of the existing column values, or it may return None, which aborts the entire streaming process early. If this parameter is not provided, the default value (index, values) => Some(values) is used. This default results in the values parsed by the second parameter, parseLine, being retained unaltered (a calling profile using a custom retainValues is sketched after the examples below).
Consider a file with the following contents (4 lines):
street,street2,city,state,zip
100 Main Str,,Irving,TX,75039
231 Park Ave,,Irving,TX,75039
1400 Beltline Rd,Apt 312,Dallas,Tx,75240
The following calling profile...
val tryLinesDefaults =
  tryProcessSource(new File("path/to/file.csv"))
...results in this output for tryLinesDefaults (the unaltered contents of the file):
Success(
  List(
    List("street,street2,city,state,zip"),
    List("100 Main Str,,Irving,TX,75039"),
    List("231 Park Ave,,Irving,TX,75039"),
    List("1400 Beltline Rd,Apt 312,Dallas,Tx,75240")
  )
)
The following calling profile...
val tryLinesParseOnly =
  tryProcessSource(
      new File("path/to/file.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
  )
...results in this output for tryLinesParseOnly (each line parsed into the individual column values):
Success(
  List(
    List("street","street2","city","state","zip"),
    List("100 Main Str","","Irving","TX","75039"),
    List("231 Park Ave","","Irving","TX","75039"),
    List("1400 Beltline Rd","Apt 312","Dallas","Tx","75240")
  )
)
The following calling profile...
val tryLinesIrvingTxNoHeader =
  tryProcessSource(
      new File("path/to/file.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
    , filterLine =
        (index, parsedValues) =>
          Some(
            (index != 0) && //skip header line
            (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
            (parsedValues(3).toLowerCase == "Tx".toLowerCase)
          )
  )
...results in this output for tryLinesIrvingTxNoHeader (each line parsed into the individual column values, with no header and only the two rows in Irving, TX):
Success(
  List(
    List("100 Main Str","","Irving","TX","75039"),
    List("231 Park Ave","","Irving","TX","75039")
  )
)
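For completeness, here is a sketch of a fourth calling profile (not in the original set of examples) that also provides retainValues, keeping only the street and zip columns of the Irving rows:

val tryLinesIrvingTxStreetZipOnly =
  tryProcessSource(
      new File("path/to/file.csv")
    , parseLine =
        (index, unparsedLine) => Some(unparsedLine.split(",").toList)
    , filterLine =
        (index, parsedValues) =>
          Some(
            (index != 0) && //skip header line
            (parsedValues(2).toLowerCase == "Irving".toLowerCase) && //only Irving
            (parsedValues(3).toLowerCase == "Tx".toLowerCase)
          )
    , retainValues =
        (index, parsedValues) =>
          Some(List(parsedValues(0), parsedValues(4))) //keep only street and zip
  )

...which should produce output like this for tryLinesIrvingTxStreetZipOnly:

Success(
  List(
    List("100 Main Str","75039"),
    List("231 Park Ave","75039")
  )
)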
Here's the entire tryProcessSource function implementation:
import scala.io.Source
import scala.util.Try
import java.io.File
def tryProcessSource(
    file: File,
    parseLine: (Int, String) => Option[List[String]] =
      (index, unparsedLine) => Some(List(unparsedLine)),
    filterLine: (Int, List[String]) => Option[Boolean] =
      (index, parsedValues) => Some(true),
    retainValues: (Int, List[String]) => Option[List[String]] =
      (index, parsedValues) => Some(parsedValues)
): Try[List[List[String]]] = {
  def usingSource[S <: Source, R](source: S)(transfer: S => R): Try[R] =
    try {Try(transfer(source))} finally {source.close()}

  def recursive(
      remaining: Iterator[(String, Int)],
      accumulator: List[List[String]],
      isEarlyAbort: Boolean = false
  ): List[List[String]] = {
    if (isEarlyAbort || !remaining.hasNext)
      accumulator
    else {
      val (line, index) = remaining.next()
      parseLine(index, line) match {
        case Some(values) =>
          filterLine(index, values) match {
            case Some(keep) =>
              if (keep)
                retainValues(index, values) match {
                  case Some(valuesNew) =>
                    recursive(remaining, valuesNew :: accumulator) //capture values
                  case None =>
                    recursive(remaining, accumulator, isEarlyAbort = true) //early abort
                }
              else
                recursive(remaining, accumulator) //discard row
            case None =>
              recursive(remaining, accumulator, isEarlyAbort = true) //early abort
          }
        case None =>
          recursive(remaining, accumulator, isEarlyAbort = true) //early abort
      }
    }
  }

  Try(Source.fromFile(file)).flatMap(
    bufferedSource =>
      usingSource(bufferedSource) {
        source =>
          recursive(source.getLines().buffered.zipWithIndex, Nil).reverse
      }
  )
}
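Since the result is a standard scala.util.Try, consuming it might look like this small sketch (the error handling is purely illustrative):

import scala.util.{Failure, Success}

tryProcessSource(new File("path/to/file.csv")) match {
  case Success(rows) =>
    rows.foreach(row => println(row.mkString("|"))) //do something with each retained row
  case Failure(exception) =>
    println(s"failed to process file: ${exception.getMessage}")
}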
While this solution is relatively succinct, it took me considerable time and many refactoring passes before I finally arrived at it. Please let me know if you see any ways it might be improved.
UPDATE: I have since asked about the issue below as its own StackOverflow question, and it now has an answer fixing the error mentioned below.
I had the idea to try to make this even more generic by changing the retainValues parameter to transformLine, using the new generic function definition below. However, I keep getting the highlighted error in IntelliJ, "Expression of type Some[List[String]] doesn't conform to expected type Option[A]", and I wasn't able to figure out how to change the default value so the error goes away.
def tryProcessSource2[A <: AnyRef](
    file: File,
    parseLine: (Int, String) => Option[List[String]] =
      (index, unparsedLine) => Some(List(unparsedLine)),
    filterLine: (Int, List[String]) => Option[Boolean] =
      (index, parsedValues) => Some(true),
    transformLine: (Int, List[String]) => Option[A] =
      (index, parsedValues) => Some(parsedValues)
): Try[List[A]] = {
  ???
}
Any assistance on how to make this work would be greatly appreciated.
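For reference, one possible workaround (a sketch of my own, not necessarily the fix from the linked answer) is to drop the default value for transformLine, so that A is inferred from the caller's argument instead of from a default that only typechecks when A is List[String]:

def tryProcessSource2[A](
    file: File,
    parseLine: (Int, String) => Option[List[String]] =
      (index, unparsedLine) => Some(List(unparsedLine)),
    filterLine: (Int, List[String]) => Option[Boolean] =
      (index, parsedValues) => Some(true),
    transformLine: (Int, List[String]) => Option[A] //no default; A is inferred at the call site
): Try[List[A]] = {
  ???
}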
I hope you don't mean Scala's collection.immutable.Stream when you say Stream. That is not what you want: Stream is lazy, but it memoizes the elements it has already produced. I don't know what you plan to do, but just reading the file line by line should work very well without using large amounts of memory.
getLines should evaluate lazily and should not crash (as long as your file does not have more than 2³² lines, afaik). If it does, ask on #scala or file a bug ticket (or do both).
Just use Source.fromFile(...).getLines as you already stated. That returns an Iterator, which is already lazy. (You would use a Stream as a lazy collection where you wanted previously retrieved values to be memoized, so you could read them again.)
If you're getting memory problems, then the problem will lie in what you're doing after getLines. Any operation like toList, which forces a strict collection, will cause the problem.
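For instance, this sketch (with a placeholder path and predicate) streams through the Iterator and aggregates a count without ever materializing the file's contents as a List:

import scala.io.Source

val source = Source.fromFile("path/to/file.csv")
val irvingRowCount =
  try source.getLines().count(_.contains("Irving")) //consumes the Iterator lazily, line by line
  finally source.close()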