PySpark Drop Rows
A straightforward way to achieve this in PySpark (Python API), assuming you are using Python 3:
noHeaderRDD = rawRDD.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys()
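For context, here is a minimal, self-contained sketch of that approach; the SparkContext setup and the sample lines are made up for illustration:
from pyspark import SparkContext

sc = SparkContext(appName="drop-header")  # assumes no SparkContext is running yet

# Hypothetical input: the first line is the header, the rest are data.
rawRDD = sc.parallelize(["id,name,score", "1,alice,10", "2,bob,20"])

# zipWithIndex() pairs each element with its global position; dropping
# index 0 removes the header, and keys() recovers the original lines.
noHeaderRDD = rawRDD.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys()

print(noHeaderRDD.collect())  # ['1,alice,10', '2,bob,20']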
I did some profiling of the various solutions and got the following results.
Cluster Configuration
Clusters
- Cluster 1 : 4 Cores 16 GB
- Cluster 2 : 4 Cores 16 GB
- Cluster 3 : 4 Cores 16 GB
- Cluster 4 : 2 Cores 8 GB
Data
7 million rows, 4 columns
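If you want to reproduce roughly this setup, a plain-Python sketch like the one below (the file name, column names, and values are invented) generates a file of that shape: one header line plus 7 million data rows of 4 columns.
import csv
import random

# Hypothetical test-data generator: 1 header line + 7,000,000 data rows, 4 columns.
with open('file1.txt', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['col1', 'col2', 'col3', 'col4'])  # header row to be dropped later
    for i in range(7_000_000):
        writer.writerow([i, random.randint(0, 100), 'x', 'y'])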
# Solution 1
# Time Taken : 40 ms
data = sc.textFile('file1.txt')
firstRow = data.first()
data = data.filter(lambda row: row != firstRow)
# Solution 2
# Time Taken : 3 seconds
data = sc.textFile('file1.txt')

def dropFirstRow(index, iterator):
    return iter(list(iterator)[1:]) if index == 0 else iterator

data = data.mapPartitionsWithIndex(dropFirstRow)
# Solution 3
# Time Taken : 0.3 seconds
data = sc.textFile('file1.txt')

def dropFirstRow(index, iterator):
    if index == 0:
        # Stream the first partition, skipping its first element (the header).
        for subIndex, item in enumerate(iterator):
            if subIndex > 0:
                yield item
    else:
        # Pass every other partition through unchanged.
        yield from iterator

data = data.mapPartitionsWithIndex(dropFirstRow)
I think Solution 3 is the most scalable, since it streams the first partition lazily instead of materializing it in memory with list() as Solution 2 does.
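As a quick sanity check (assuming the same file1.txt and dropFirstRow as above), you can verify that exactly one line is removed:
# Sanity check: Solution 3 should drop exactly one line (the header).
data = sc.textFile('file1.txt')
before = data.count()

noHeader = data.mapPartitionsWithIndex(dropFirstRow)
after = noHeader.count()

assert before - after == 1
print(before, after)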
Specific to PySpark:
As per @maasg, you could do this:
header = rdd.first()
rdd.filter(lambda line: line != header)
but it's not technically correct, since it may also drop data lines that happen to be identical to the header, not just the header itself (a small example of that pitfall is shown after the snippets below). However, this seems to work for me:
def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr

rdd.mapPartitionsWithIndex(remove_header)
Similarly:
rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])
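To make the earlier caveat concrete, here is a small invented example where one data line happens to equal the header; the value-based filter drops both occurrences, while the index-based variant keeps the duplicate data line:
# Invented sample where a data line is identical to the header line.
rdd = sc.parallelize(["a,b,c", "1,2,3", "a,b,c"])

header = rdd.first()
print(rdd.filter(lambda line: line != header).collect())
# ['1,2,3']  -- the duplicate data line is lost

print(rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0]).collect())
# ['1,2,3', 'a,b,c']  -- only the true header is dropped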
I'm new to Spark, so can't intelligently comment about which will be fastest.
AFAIK there's no 'easy' way to do this.
This should do the trick, though:
val header = data.first
val rows = data.filter(line => line != header)