Calculating the cosine similarity between all the rows of a dataframe in pyspark
Since I'm working on a project in PySpark where I need cosine similarity, I want to confirm that @MaFF's code is correct. I hesitated when I first saw his code, because he takes the dot product of the L2-normalized vectors, but that matches the theory: mathematically, cosine similarity is the ratio of the dot product of the two vectors to the product of their magnitudes.
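In other words, for two TF-IDF vectors v1 and v2:

cos(v1, v2) = (v1 · v2) / (||v1||_2 * ||v2||_2)

which is exactly what the sim_cos UDF below computes (with p = 2, i.e. the L2 norm).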
Here is my adapted code, which gives the same results. I also came to the conclusion that scikit-learn calculates TF-IDF in a different way, so if you repeat this exercise using sklearn you will get a different result.
import pyspark.sql.functions as F
from pyspark.ml.feature import HashingTF, IDF

d = [{'id': '1', 'office': 'Delhi, Mumbai, Gandhinagar'}, {'id': '2', 'office': 'Delhi, Mandi'}, {'id': '3', 'office': 'Hyderbad, Jaipur'}]
df_fussion = spark.createDataFrame(d)
df_fussion = df_fussion.withColumn('office', F.split('office', ', '))

hashingTF = HashingTF(inputCol="office", outputCol="tf")
tf = hashingTF.transform(df_fussion)
idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
data = idf.transform(tf)
from pyspark.sql.types import DoubleType

@F.udf(returnType=DoubleType())
def sim_cos(v1, v2):
    try:
        p = 2
        return float(v1.dot(v2)) / float(v1.norm(p) * v2.norm(p))
    except Exception:
        return 0.0
result = data.alias("i").join(data.alias("j"), F.col("i.id") < F.col("j.id"))\
    .select(
        F.col("i.id").alias("i"),
        F.col("j.id").alias("j"),
        sim_cos("i.feature", "j.feature").alias("sim_cosine"))\
    .sort("i", "j")
result.show()
I also want to share some simple tests that I did with plain vectors, where the results are correct:
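A minimal sanity check along those lines could look like this (the vectors below are only illustrative, not the original test data):

from pyspark.ml.linalg import Vectors

v1 = Vectors.dense([1.0, 0.0, 1.0])
v2 = Vectors.dense([1.0, 0.0, 1.0])
v3 = Vectors.dense([0.0, 1.0, 0.0])

# identical vectors -> cosine similarity 1.0
print(float(v1.dot(v2)) / float(v1.norm(2) * v2.norm(2)))
# orthogonal vectors -> cosine similarity 0.0
print(float(v1.dot(v3)) / float(v1.norm(2) * v3.norm(2)))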
Kind regards,
You can use the mllib package to compute the L2 norm of the TF-IDF of every row. Then multiply the table with itself to get the cosine similarity as the dot product of each pair of L2-normalized rows:
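The reason this works: once every row x is rescaled to x / ||x||, the plain dot product of two rescaled rows

(x / ||x||) · (y / ||y||) = (x · y) / (||x|| * ||y||)

is exactly the cosine similarity between x and y.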
1. RDD
rdd = sc.parallelize([[1, "Delhi, Mumbai, Gandhinagar"],[2, " Delhi, Mandi"], [3, "Hyderbad, Jaipur"]])
Compute TF-IDF:

documents = rdd.map(lambda l: l[1].replace(" ", "").split(","))

from pyspark.mllib.feature import HashingTF, IDF
hashingTF = HashingTF()
tf = hashingTF.transform(documents)
You can specify the number of features in HashingTF to make the feature matrix smaller (fewer columns).
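For example, this would hash into 1024 columns instead of the default 2^20 (the value is only illustrative; choose it based on your vocabulary size):

hashingTF = HashingTF(numFeatures=1024)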
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
Compute L2 norm:

from pyspark.mllib.feature import Normalizer
labels = rdd.map(lambda l: l[0])
features = tfidf

normalizer = Normalizer()
data = labels.zip(normalizer.transform(features))
Compute cosine similarity by multiplying the matrix with itself:
from pyspark.mllib.linalg.distributed import IndexedRowMatrix
mat = IndexedRowMatrix(data).toBlockMatrix()
dot = mat.multiply(mat.transpose())
dot.toLocalMatrix().toArray()

array([[ 0.        ,  0.        ,  0.        ,  0.        ],
       [ 0.        ,  1.        ,  0.10794634,  0.        ],
       [ 0.        ,  0.10794634,  1.        ,  0.        ],
       [ 0.        ,  0.        ,  0.        ,  1.        ]])
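Note that the row IDs are used as matrix indices, which is why row and column 0 are all zeros (the IDs start at 1). To pull a single pairwise similarity out of the local result:

sims = dot.toLocalMatrix().toArray()
print(sims[1, 2])   # 0.10794633570596117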
OR: Using a Cartesian product and the function dot on numpy arrays:

data.cartesian(data)\
    .map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1])))\
    .sortByKey()\
    .collect()

[((1, 1), 1.0),
 ((1, 2), 0.10794633570596117),
 ((1, 3), 0.0),
 ((2, 1), 0.10794633570596117),
 ((2, 2), 1.0),
 ((2, 3), 0.0),
 ((3, 1), 0.0),
 ((3, 2), 0.0),
 ((3, 3), 1.0)]
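The Cartesian product returns every ordered pair, including (i, i) and both (i, j) and (j, i). If you only want each unordered pair once, a small variant along these lines should work:

data.cartesian(data)\
    .filter(lambda l: l[0][0] < l[1][0])\
    .map(lambda l: ((l[0][0], l[1][0]), float(l[0][1].dot(l[1][1]))))\
    .sortByKey()\
    .collect()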
2. DataFrame
Since you seem to be using dataframes, you can use the spark ml package instead:
import pyspark.sql.functions as psf
df = rdd.toDF(["ID", "Office_Loc"])\
.withColumn("Office_Loc", psf.split(psf.regexp_replace("Office_Loc", " ", ""), ','))
Compute TF-IDF:
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="Office_Loc", outputCol="tf")
tf = hashingTF.transform(df)

idf = IDF(inputCol="tf", outputCol="feature").fit(tf)
tfidf = idf.transform(tf)
Compute L2 norm:

from pyspark.ml.feature import Normalizer
normalizer = Normalizer(inputCol="feature", outputCol="norm")
data = normalizer.transform(tfidf)
Compute matrix product:
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

mat = IndexedRowMatrix(
    data.select("ID", "norm")\
        .rdd.map(lambda row: IndexedRow(row.ID, row.norm.toArray()))).toBlockMatrix()
dot = mat.multiply(mat.transpose())
dot.toLocalMatrix().toArray()
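If you would rather have the pairwise similarities back in a dataframe than in a local array, a minimal sketch (assuming the collected matrix fits in driver memory; index 0 is skipped because the IDs start at 1) is:

sims = dot.toLocalMatrix().toArray()
sim_df = spark.createDataFrame(
    [(i, j, float(sims[i, j]))
     for i in range(1, sims.shape[0])
     for j in range(i + 1, sims.shape[1])],
    ["i", "j", "sim_cosine"])
sim_df.show()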
OR: using a join and a UDF for function dot:

from pyspark.sql.types import DoubleType

dot_udf = psf.udf(lambda x, y: float(x.dot(y)), DoubleType())

data.alias("i").join(data.alias("j"), psf.col("i.ID") < psf.col("j.ID"))\
    .select(
        psf.col("i.ID").alias("i"),
        psf.col("j.ID").alias("j"),
        dot_udf("i.norm", "j.norm").alias("dot"))\
    .sort("i", "j")\
    .show()

+---+---+-------------------+
|  i|  j|                dot|
+---+---+-------------------+
|  1|  2|0.10794633570596117|
|  1|  3|                0.0|
|  2|  3|                0.0|
+---+---+-------------------+
This tutorial lists different methods for multiplying large-scale matrices: https://labs.yodas.com/large-scale-matrix-multiplication-with-pyspark-or-how-to-match-two-large-datasets-of-company-1be4b1b2871e