What is the best way to remove accents with Apache Spark dataframes in PySpark?

One possible improvement is to build a custom Transformer that handles Unicode normalization, together with a corresponding Python wrapper. This should reduce the overall overhead of passing data between the JVM and Python, and it doesn't require any modifications to Spark itself or access to private APIs.

On the JVM side you'll need a transformer similar to this one:

package net.zero323.spark.ml.feature

import java.text.Normalizer
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.sql.types.{DataType, StringType}

class UnicodeNormalizer (override val uid: String)
  extends UnaryTransformer[String, String, UnicodeNormalizer] {

  def this() = this(Identifiable.randomUID("unicode_normalizer"))

  private val forms = Map(
    "NFC" -> Normalizer.Form.NFC, "NFD" -> Normalizer.Form.NFD,
    "NFKC" -> Normalizer.Form.NFKC, "NFKD" -> Normalizer.Form.NFKD
  )

  val form: Param[String] = new Param(this, "form", "unicode form (one of NFC, NFD, NFKC, NFKD)",
    ParamValidators.inArray(forms.keys.toArray))

  def setForm(value: String): this.type = set(form, value)

  def getForm: String = $(form)

  setDefault(form -> "NFKD")

  override protected def createTransformFunc: String => String = {
    val normalizerForm = forms($(form))
    (s: String) => Normalizer.normalize(s, normalizerForm)
  }

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == StringType, s"Input type must be string type but got $inputType.")
  }

  override protected def outputDataType: DataType = StringType
}
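
As a rough illustration of what the `form` parameter selects, the sketch below applies the four normalization forms with Python's standard `unicodedata` module. It is not part of the transformer; the sample characters and the `code_points` helper are hypothetical and chosen only for illustration. For simple cases like these, `java.text.Normalizer` should produce the same code points.

```python
import unicodedata

# Hypothetical sample strings, chosen only for illustration.
accented = "\u00e9"  # "e with acute" as a single precomposed code point
ligature = "\ufb01"  # "fi" ligature, a compatibility character

def code_points(s, form):
    """Return the code points of `s` after normalizing it to `form`."""
    return [f"U+{ord(ch):04X}" for ch in unicodedata.normalize(form, s)]

for form in ("NFC", "NFD", "NFKC", "NFKD"):
    print(form, code_points(accented, form), code_points(ligature, form))
```

The decomposed forms (NFD, NFKD) split the accent into a separate combining mark, which is what makes it removable later. Only the compatibility forms (NFKC, NFKD) additionally expand characters such as ligatures.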

The corresponding build definition (adjust the Spark and Scala versions to match your Spark deployment):

name := "unicode-normalization"

version := "1.0"

crossScalaVersions := Seq("2.11.12", "2.12.8")

organization := "net.zero323"

val sparkVersion = "2.4.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)

On the Python side you'll need a wrapper similar to this one:

from pyspark.ml.param.shared import *
# from pyspark.ml.util import keyword_only  # in Spark < 2.0
from pyspark import keyword_only 
from pyspark.ml.wrapper import JavaTransformer

class UnicodeNormalizer(JavaTransformer, HasInputCol, HasOutputCol):

    @keyword_only
    def __init__(self, form="NFKD", inputCol=None, outputCol=None):
        super(UnicodeNormalizer, self).__init__()
        self._java_obj = self._new_java_obj(
            "net.zero323.spark.ml.feature.UnicodeNormalizer", self.uid)
        self.form = Param(self, "form",
            "unicode form (one of NFC, NFD, NFKC, NFKD)")
        # kwargs = self.__init__._input_kwargs  # in Spark < 2.0
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, form="NFKD", inputCol=None, outputCol=None):
        # kwargs = self.setParams._input_kwargs  # in Spark < 2.0
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setForm(self, value):
        return self._set(form=value)

    def getForm(self):
        return self.getOrDefault(self.form)

Build the Scala package:

sbt +package

and include it when you start the shell or submit an application. For example, for a Spark build with Scala 2.11:

bin/pyspark --jars path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar \
 --driver-class-path path-to/target/scala-2.11/unicode-normalization_2.11-1.0.jar

and you should be ready to go. All that is left is a little bit of regexp magic:

from pyspark.sql.functions import regexp_replace

normalizer = UnicodeNormalizer(form="NFKD",
    inputCol="text", outputCol="text_normalized")

df = sc.parallelize([
    (1, "Maracaibó"), (2, "New York"),
    (3, "   São Paulo   "), (4, "~Madrid")
]).toDF(["id", "text"])

(normalizer
    .transform(df)
    .select(regexp_replace("text_normalized", r"\p{M}", ""))
    .show())

## +--------------------------------------+
## |regexp_replace(text_normalized,\p{M},)|
## +--------------------------------------+
## |                             Maracaibo|
## |                              New York|
## |                          Sao Paulo   |
## |                               ~Madrid|
## +--------------------------------------+
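
For a quick local sanity check without Spark, the same two steps (decompose, then drop everything in a Unicode Mark category, which is what `\p{M}` matches) can be sketched in plain Python. The `strip_accents` helper is hypothetical and only serves as a reference for what the pipeline above is expected to produce.

```python
import unicodedata

def strip_accents(text, form="NFKD"):
    """Decompose `text`, then drop every character in a Mark category (Mn, Mc, Me)."""
    decomposed = unicodedata.normalize(form, text)
    return "".join(
        ch for ch in decomposed
        if not unicodedata.category(ch).startswith("M")
    )

# Same sample values as in the DataFrame above.
for city in ["Maracaib\u00f3", "New York", "   S\u00e3o Paulo   ", "~Madrid"]:
    print(repr(strip_accents(city)))
```

Whitespace and ASCII punctuation such as `~` are left untouched, because they do not belong to a Mark category.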

Please note that this follows the same conventions as the built-in text transformers and is not null safe. You can easily correct for that by checking for null in createTransformFunc.
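
The actual check would live in the Scala `createTransformFunc`. The Python function below is only a sketch of the intended behaviour (pass missing values through, normalize everything else), and its name is hypothetical.

```python
import unicodedata

def normalize_nullable(value, form="NFKD"):
    """Return None for missing input, otherwise the normalized string."""
    if value is None:
        return None
    return unicodedata.normalize(form, value)

print(normalize_nullable(None))            # missing value is passed through
print(len(normalize_nullable("\u00e9")))   # precomposed character is decomposed
```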


Another way to do this is to use Python's Unicode database (the unicodedata module):

import unicodedata
import sys

from pyspark.sql.functions import translate, regexp_replace

def make_trans():
    matching_string = ""
    replace_string = ""

    for i in range(ord(" "), sys.maxunicode):
        name = unicodedata.name(chr(i), "")
        if "WITH" in name:
            try:
                base = unicodedata.lookup(name.split(" WITH")[0])
                matching_string += chr(i)
                replace_string += base
            except KeyError:
                pass

    return matching_string, replace_string

def clean_text(c):
    matching_string, replace_string = make_trans()
    return translate(
        regexp_replace(c, r"\p{M}", ""),
        matching_string, replace_string
    ).alias(c)
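
To make the name-based lookup more concrete, here is a small sketch of what happens for a single character. The `base_by_name` helper is hypothetical; unlike `make_trans`, it tests for " WITH" (with a leading space) and returns the input unchanged when no base character is found.

```python
import unicodedata

def base_by_name(ch):
    """Return the character named before ' WITH', or `ch` unchanged."""
    name = unicodedata.name(ch, "")
    if " WITH" not in name:
        return ch
    try:
        return unicodedata.lookup(name.split(" WITH")[0])
    except KeyError:
        return ch

# o with acute, a with tilde, o with stroke, l with stroke
for ch in "\u00f3\u00e3\u00f8\u0142":
    print(unicodedata.name(ch), "->", base_by_name(ch))

# "o with stroke" has no Unicode decomposition, so NFKD leaves it intact.
print(unicodedata.normalize("NFKD", "\u00f8") == "\u00f8")
```

This suggests one reason to combine the translation table with the `\p{M}` replacement: characters such as "o with stroke" or "l with stroke" are not decomposed by normalization, but their names still point to a base letter.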

Now let's test it:

df = sc.parallelize([
    (1, "Maracaibó"), (2, "New York"),
    (3, "   São Paulo   "), (4, "~Madrid"),
    (5, "São Paulo"), (6, "Maracaibó")
]).toDF(["id", "text"])

df.select(clean_text("text")).show()
## +---------------+
## |           text|
## +---------------+
## |      Maracaibo|
## |       New York|
## |   Sao Paulo   |
## |        ~Madrid|
## |      Sao Paulo|
## |      Maracaibo|
## +---------------+
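
Since `make_trans` scans the whole Unicode range every time `clean_text` is called, it may be worth building the table once and checking it locally before handing it to Spark. The sketch below is one way to do that under some assumptions: the `make_trans_cached` name, the use of `functools.lru_cache`, and the restriction to code points below U+0250 are all illustrative choices, and Python's `str.translate` is used here only as a stand-in for Spark's `translate`.

```python
import unicodedata
from functools import lru_cache

@lru_cache(maxsize=None)
def make_trans_cached(start=0x20, stop=0x250):
    """Build (matching, replacement) strings once for code points in [start, stop)."""
    matching, replacement = [], []
    for i in range(start, stop):
        name = unicodedata.name(chr(i), "")
        if " WITH" in name:
            try:
                base = unicodedata.lookup(name.split(" WITH")[0])
            except KeyError:
                continue  # no character with the shortened name
            matching.append(chr(i))
            replacement.append(base)
    return "".join(matching), "".join(replacement)

matching, replacement = make_trans_cached()

# Local check with Python's own translate before using the strings in Spark.
table = str.maketrans(matching, replacement)
print("S\u00e3o Paulo".translate(table))
print("Maracaib\u00f3".translate(table))
```

Repeated calls to `make_trans_cached()` return the cached tuple, so the scan runs only once per argument combination.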

Acknowledgement: @zero323