Simplest method for text lemmatization in Scala and Spark

There is a function in the book Advanced Analytics with Spark, in the chapter about lemmatization:

  val plainText = sc.parallelize(List("Sentence to be processed."))

  val stopWords = Set("stopWord")

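  import java.util.Properties
  import scala.collection.mutable.ArrayBuffer
  import edu.stanford.nlp.pipeline._
  import edu.stanford.nlp.ling.CoreAnnotations._
  // Implicit Java-to-Scala collection conversions, needed by the for comprehension below
  import scala.collection.JavaConversions._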

  def plainTextToLemmas(text: String, stopWords: Set[String]): Seq[String] = {
    val props = new Properties()
    props.put("annotators", "tokenize, ssplit, pos, lemma")
    val pipeline = new StanfordCoreNLP(props)
    val doc = new Annotation(text)
    pipeline.annotate(doc)
    val lemmas = new ArrayBuffer[String]()
    val sentences = doc.get(classOf[SentencesAnnotation])
    for (sentence <- sentences; token <- sentence.get(classOf[TokensAnnotation])) {
      val lemma = token.get(classOf[LemmaAnnotation])
      if (lemma.length > 2 && !stopWords.contains(lemma)) {
        lemmas += lemma.toLowerCase
      }
    }
    lemmas
  }

  val lemmatized = plainText.map(plainTextToLemmas(_, stopWords))
  lemmatized.foreach(println)

Now just apply this function to every line inside a map:

val lemmatized = plainText.map(plainTextToLemmas(_, stopWords))

EDIT:

I added this line to the code:

import scala.collection.JavaConversions._

This is needed because otherwise sentences is a Java List, not a Scala one, and the for comprehension over it does not compile. The code should now compile without problems.
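
As a side note, the same conversion can be written explicitly with scala.collection.JavaConverters and .asScala, which may be worth considering on newer Scala versions, since JavaConversions was deprecated in Scala 2.12 and removed in 2.13. Below is a minimal sketch of the idea on a plain java.util.List rather than on CoreNLP output; AsScalaSketch and upperCased are made-up names used only for illustration.

```scala
import java.util.{Arrays => JArrays, List => JList}
import scala.collection.JavaConverters._

// Hypothetical helper object, not part of the original answer.
object AsScalaSketch {
  // Wrap the Java list explicitly with .asScala, then use ordinary Scala collection methods.
  def upperCased(javaList: JList[String]): Seq[String] =
    javaList.asScala.map(_.toUpperCase).toList

  def main(args: Array[String]): Unit = {
    val javaList: JList[String] = JArrays.asList("sentence", "token")
    println(upperCased(javaList).mkString(", "))
  }
}
```

In plainTextToLemmas this would presumably mean writing sentences.asScala and sentence.get(classOf[TokensAnnotation]).asScala in the for comprehension instead of relying on the implicit conversion.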

I used Scala 2.10.4 and the following stanford.nlp dependencies:

<dependency>
  <groupId>edu.stanford.nlp</groupId>
  <artifactId>stanford-corenlp</artifactId>
  <version>3.5.2</version>
</dependency>
<dependency>
  <groupId>edu.stanford.nlp</groupId>
  <artifactId>stanford-corenlp</artifactId>
  <version>3.5.2</version>
  <classifier>models</classifier>
</dependency>
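
If the project is built with sbt rather than Maven, the two dependencies above would probably translate to something like the fragment below. This is only a sketch assuming a standard build.sbt; the coordinates and version are copied from the Maven snippet, and the second entry uses the models classifier.

```scala
// build.sbt fragment (assumed layout, same coordinates as the Maven snippet)
libraryDependencies ++= Seq(
  "edu.stanford.nlp" % "stanford-corenlp" % "3.5.2",
  "edu.stanford.nlp" % "stanford-corenlp" % "3.5.2" classifier "models"
)
```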

You can also look at the stanford.nlp page, where there are a lot of examples (in Java): http://nlp.stanford.edu/software/corenlp.shtml

EDIT:

mapPartitions version:

Although I don't know whether it will speed up the job significantly.

  def plainTextToLemmas(text: String, stopWords: Set[String], pipeline: StanfordCoreNLP): Seq[String] = {
    val doc = new Annotation(text)
    pipeline.annotate(doc)
    val lemmas = new ArrayBuffer[String]()
    val sentences = doc.get(classOf[SentencesAnnotation])
    for (sentence <- sentences; token <- sentence.get(classOf[TokensAnnotation])) {
      val lemma = token.get(classOf[LemmaAnnotation])
      if (lemma.length > 2 && !stopWords.contains(lemma)) {
        lemmas += lemma.toLowerCase
      }
    }
    lemmas
  }

  val lemmatized = plainText.mapPartitions(p => {
    val props = new Properties()
    props.put("annotators", "tokenize, ssplit, pos, lemma")
    val pipeline = new StanfordCoreNLP(props)
    p.map(q => plainTextToLemmas(q, stopWords, pipeline))
  })
  lemmatized.foreach(println)

I think @user52045 has the right idea. The only modification I would make would be to use mapPartitions instead of map -- this allows you to only do the potentially expensive pipeline creation once per partition. This may not be a huge hit on a lemmatization pipeline, but it will be extremely important if you want to do something that requires a model, like the NER portion of the pipeline.

def plainTextToLemmas(text: String, stopWords: Set[String], pipeline: StanfordCoreNLP): Seq[String] = {
  val doc = new Annotation(text)
  pipeline.annotate(doc)
  val lemmas = new ArrayBuffer[String]()
  val sentences = doc.get(classOf[SentencesAnnotation])
  for (sentence <- sentences; token <- sentence.get(classOf[TokensAnnotation])) {
    val lemma = token.get(classOf[LemmaAnnotation])
    if (lemma.length > 2 && !stopWords.contains(lemma)) {
      lemmas += lemma.toLowerCase
    }
  }
  lemmas
}

val lemmatized = plainText.mapPartitions(strings => {
  val props = new Properties()
  props.put("annotators", "tokenize, ssplit, pos, lemma")
  val pipeline = new StanfordCoreNLP(props)
  strings.map(string => plainTextToLemmas(string, stopWords, pipeline))
})
lemmatized.foreach(println)
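
The once-per-partition point can be illustrated without Spark or CoreNLP. The sketch below is only a simulation: partitions are plain nested Seqs, ExpensiveResource is a hypothetical stand-in for the StanfordCoreNLP pipeline, and a counter records how many times it gets constructed. perElement mimics what map does when the resource is created inside the function, and perPartition mimics the mapPartitions version.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical simulation, not Spark code: partitions are modelled as nested Seqs.
object PerPartitionSketch {
  // Stand-in for an expensive object such as a StanfordCoreNLP pipeline.
  // Every construction increments the shared counter.
  final class ExpensiveResource(counter: AtomicInteger) {
    counter.incrementAndGet()
    def process(s: String): String = s.toLowerCase
  }

  // Like rdd.map with the resource created inside the function: one resource per element.
  def perElement(partitions: Seq[Seq[String]], counter: AtomicInteger): Seq[String] =
    partitions.flatMap(_.map(s => new ExpensiveResource(counter).process(s)))

  // Like rdd.mapPartitions: one resource per partition, reused for every element in it.
  def perPartition(partitions: Seq[Seq[String]], counter: AtomicInteger): Seq[String] =
    partitions.flatMap { p =>
      val resource = new ExpensiveResource(counter)
      p.iterator.map(s => resource.process(s)).toList
    }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq("A", "B", "C"), Seq("D", "E"))
    val c1 = new AtomicInteger(0)
    perElement(partitions, c1)
    val c2 = new AtomicInteger(0)
    perPartition(partitions, c2)
    // With 5 elements in 2 partitions: 5 constructions versus 2.
    println(s"per element: ${c1.get}, per partition: ${c2.get}")
  }
}
```

Both variants return the same results; only the number of constructions differs, which is why the gain depends on how costly the pipeline is to build.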