Best Practices from Oracle Development's A‑Team

Comparison of data prep and cleansing for NLP with pandas, dask and spark

Introduction

While preparing data for a machine learning task that uses natural language processing (NLP), I tried several methods in order to compare their performance and the volume of data each could process efficiently.

This post shows the performance of cleaning a small dataset and a larger one. All examples are in Python, and compare the use of pandas dataframes, Dask dataframes, and Apache Spark (PySpark).

 

Test environments

The small dataset was processed in a virtual machine with 4 CPUs and 32 GB of RAM, running Oracle Linux 7 and Python 3.8.

The large dataset was processed in a virtual machine with 48 CPUs and 320 GB of RAM, running Oracle Linux 7 and Python 3.8.

The small dataset consists of a JSON file with 62499 rows of data. Each row is an individual, complete JSON element. After removing rows that contain no data in the text field we care about, there are 43845 rows of data.

The large dataset consists of a JSON file with 14199726 rows of data. Each row is an individual, complete JSON element. After removing rows that contain no data in the text field we care about, there are 6876405 rows of data.
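
To illustrate the file layout described above, here is a minimal sketch that reads a line-delimited JSON sample and counts the rows that survive the null filter. The three sample records are made up for illustration and are not taken from the real datasets; only the description field name comes from the test code.

```python
import io
import pandas as pd

# Hypothetical sample: one complete JSON element per line, as in the test files
sample = "\n".join([
    '{"productid": 1, "description": "Red mug"}',
    '{"productid": 2, "description": null}',
    '{"productid": 3, "description": "Blue towel"}',
])

# lines=True tells pandas to parse each line as its own JSON record
df = pd.read_json(io.StringIO(sample), lines=True)

total_rows = len(df)
# count the rows that actually carry text in the field used for NLP
kept_rows = int(df["description"].notnull().sum())

print("rows read:", total_rows)
print("rows with a description:", kept_rows)
```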

 

Test code and summary

Pandas dataframe

With the code below using pandas dataframes, everything is held and manipulated in memory. The code also uses only a single CPU, as this is the default behavior of the Python interpreter.

With smaller sets of data, this performs well. As the results below show, it is not the most scalable method for large sets of data. Besides the need for massive amounts of RAM, the performance is much slower than the other methods used.

 


import pandas as pd
from timeit import default_timer as timer

toptimer = timer()

starttime = timer()
# these are stopwords - words we want excluded from our data
with open('english-stopwords.txt', 'r') as file:
    stop_words=file.read().split('\n')

# load json data
df=pd.read_json("/u01/loader/smalltest.json", lines=True)
print("Time to read json and stopwords", timer() - starttime)

# remove any rows where the description field is null. This is the field that has the text data we want to use with NLP
starttime = timer()
df = df[pd.notnull(df['description'])]
print("Time to remove nulls", timer() - starttime)

# The json data has some extra columns that are not important for this exercise, so remove them to shrink the amount of data being manipulated
starttime = timer()
df = df.drop(columns=['brand', 'title', 'productid', 'categoryname'])
print("Time to remove columns", timer() - starttime)

# convert all text to lowercase
starttime = timer()
df['description'] = df['description'].str.lower()
print("Time to lowercase", timer() - starttime)

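# use regex to replace runs of digits and special characters with a single space.
# regex=True is stated explicitly because pandas 2.0+ treats the pattern as a literal string by default
starttime = timer()
df['description'] = df['description'].str.replace("(\\d|\\W)+", " ", regex=True).str.strip()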
print("Time to remove special chars", timer() - starttime)

# remove any word that is in the stopword list
starttime = timer()
df['description'] = df['description'].apply(lambda x: ' '.join([word for word in x.split() if word not in stop_words]))
print (df.head(5))
print("Time to remove words", timer() - starttime)

# This line really does nothing, but is for comparison because of the way dask works. This shows how long it takes to access the dataframe after it has been manipulated
starttime = timer()
print (df.head(0))
print("Time print frame", timer() - starttime)

print("Time for entire execution", timer() - toptimer)
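
To make the effect of the cleaning steps above concrete, the following sketch applies the same sequence (drop nulls, lowercase, regex replace, stop word removal) to a tiny in-memory dataframe. The sample rows and the short stop word list are hypothetical and stand in for the real JSON file and english-stopwords.txt.

```python
import pandas as pd

# Hypothetical stop words and sample rows, for illustration only
stop_words = ["the", "is", "a", "with"]
df = pd.DataFrame({
    "description": ["The QUICK-dry towel is 100% cotton!", None, "A mug with 2 handles."],
    "categoryid": [1, 2, 3],
})

# drop rows with no text, copying so later assignments act on an independent frame
df = df[df["description"].notnull()].copy()

# lowercase, then collapse runs of digits and non-word characters into one space
df["description"] = df["description"].str.lower()
df["description"] = df["description"].str.replace(r"(\d|\W)+", " ", regex=True).str.strip()

# keep only the words that are not in the stop word list
df["description"] = df["description"].apply(
    lambda text: " ".join(word for word in text.split() if word not in stop_words)
)

cleaned = df["description"].tolist()
print(cleaned)
```

Each description ends up as a lowercase, space-separated string of words, which is the form the later NLP steps expect.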

Dask dataframe

Dask dataframes can be chunked, meaning they do not have to be held in memory as one giant object. Dask also allows multiple threads and/or processes to execute at the same time.

This method performed well with both the small and large datasets.

A few items to point out, which are also documented in the code. For each test, we set the number of workers to the number of CPUs on the VM - so 4 for the small dataset, and 48 for the large dataset. The same logic is applied to the number of partitions we slice our dataframe into.
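
Rather than hardcoding 4 or 48, the worker and partition count could be derived from the machine. This is only a sketch: the worker_count helper is a hypothetical name, and os.cpu_count() reports the CPUs the operating system exposes, which may be more than a container or scheduler actually grants.

```python
import os

def worker_count(default=1):
    """Return the number of CPUs reported by the OS, or a fallback if unknown."""
    cpus = os.cpu_count()  # may return None on some platforms
    return cpus if cpus else default

n_workers = worker_count()
print("workers and partitions to use:", n_workers)

# The value would then feed the same two calls used in the test code:
# dask.config.set(scheduler='processes', num_workers=n_workers)
# df = df.repartition(npartitions=n_workers)
```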

At the end of the code, we print head(0). This is important for Dask because it evaluates lazily: the preceding dataframe manipulations are only recorded, and the work runs when a result is requested. The point is to show that there is extra time incurred with Dask before you can access your altered dataframes.


import dask.dataframe as dd
import dask
from timeit import default_timer as timer

# because the 'processes' scheduler starts multiple worker processes, the code needs to run inside a main block
if __name__ == '__main__':

    toptimer = timer()

    starttime = timer()
    with open('english-stopwords.txt', 'r') as file:
        stop_words=file.read().split('\n')

    # for the small dataset, we use 4 workers - the number of CPU in our VM
    dask.config.set(scheduler='processes', num_workers=4)
    # for the large dataset, we use 48 workers - the number of CPU in our VM
    # dask.config.set(scheduler='processes', num_workers=48)

    df = dd.read_json("/u01/loader/smalltest.json", lines=True)
    print("Time to read json and stopwords", timer() - starttime)

    # set the number of partitions to the number of CPUs
    df = df.repartition(npartitions=4)
    # df = df.repartition(npartitions=48)

    # remove any rows where the description field is null. This is the field that has the text data we want to use with NLP
    starttime = timer()
    df = df.dropna(subset=['description'])
    print("Time to remove nulls", timer() - starttime)

    # The json data has some extra columns that are not important for this exercise, so remove them to shrink the amount of data being manipulated
    starttime = timer()
    df = df.drop(columns=['brand', 'title', 'productid', 'categoryname'])
    print("Time to remove columns", timer() - starttime)

    # convert all text to lowercase
    starttime = timer()
    df['description'] = df['description'].str.lower()
    print("Time to lowercase", timer() - starttime)

    # use regex to remove any special characters.
    starttime = timer()
    df = df.replace(to_replace="(\\d|\\W)+", value=' ', regex=True)
    print("Time to remove special chars", timer() - starttime)

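    # remove any word that is in the stopword list
    # note: without regex=True, replace() only swaps cells whose entire value equals a stop word;
    # it does not strip stop words from inside longer text the way the pandas version does
    starttime = timer()
    df = df.replace(to_replace=stop_words, value=' ')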
    print("Time to remove words", timer() - starttime)

    # This step prints an empty frame, but it forces dask to run the work queued up by the steps above.
    # It shows how long it takes to access the dataframe after it has been manipulated, extra time the other methods do not incur.
    starttime = timer()
    print (df.head(0))
    print("Time print frame", timer() - starttime)

    print("Time for entire execution", timer() - toptimer)

 

Apache Spark (PySpark) dataframe

Apache Spark has the ability to distribute operations across multiple processes. This allows data to be manipulated by many threads at once, similar to Dask.

A few settings to note - Spark runs on Java, and therefore requires a JVM heap.

For the large test - the driver memory (spark.driver.memory) is set to 300 GB max. The maximum total size of serialized results returned to the driver (spark.driver.maxResultSize) is set to 20 GB. The network timeout (spark.network.timeout), the default timeout for network interactions, is 7200 seconds. This is far more than is needed and was set to this level for testing purposes so that nothing was killed by a timeout.

For the small test - everything is left at its defaults.

There is some overhead in starting up the Spark process, before code is actually executed. This is timed in the code as well.

 


from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkConf
import pyspark.sql.functions as f
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from timeit import default_timer as timer


toptimer = timer()

starttime = timer()
# conf = SparkConf().setMaster("local[*]").setAppName("SparkVect").set('spark.driver.memory', '300G').set('spark.driver.maxResultSize', '20G').set('spark.network.timeout', '7200s').set('spark.local.dir', '/u01/tmp')
conf = SparkConf().setMaster("local[*]").setAppName("SparkVect")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
print("Time to startup spark", timer() - starttime)

# these are stopwords - words we want excluded from our data
starttime = timer()
with open('english-stopwords.txt', 'r') as file:
    stop_words=file.read().split('\n')

# load json data
json_file_path = '/u01/loader/smalltest.json'
df = spark.read.json(json_file_path)
print("Time to read json and stopwords", timer() - starttime)

# remove any rows where the description field is null. This is the field that has the text data we want to use with NLP
starttime = timer()
df = df.filter(df.description.isNotNull())
print("Time to remove nulls", timer() - starttime)

# get rid of columns we don't need
starttime = timer()
drop_list = ['brand', 'title', 'productid', 'categoryname']
df = df.select([column for column in df.columns if column not in drop_list])
print("Time to remove columns", timer() - starttime)

# make all lower case
starttime = timer()
df = df.withColumn("description",f.lower(f.col("description")))
print("Time to lowercase", timer() - starttime)

# remove special characters
starttime = timer()
df = df.withColumn("description",f.regexp_replace(f.col("description"), "(\\d|\\W)+", ' '))
print("Time to remove special chars", timer() - starttime)

# remove stop words
starttime = timer()
regexTokenizer = RegexTokenizer(inputCol="description", outputCol="words", pattern="\\W")
tokenized = regexTokenizer.transform(df)
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=stop_words)
cleaned = stopwordsRemover.transform(tokenized)
finaldf = cleaned.withColumn("description",f.concat_ws(' ', f.col("filtered")))
print("Time to remove words", timer() - starttime)


# get rid of extra columns. These were generated from the stopword removal process.
starttime = timer()
drop_list = ['words', 'filtered']
finaldf = finaldf.select([column for column in finaldf.columns if column not in drop_list])
print("Time to remove final column", timer() - starttime)

# This step prints an empty result, but is kept for comparison with dask. It shows how long it takes to access the cleaned dataframe after it has been manipulated
starttime = timer()
print(finaldf.head(0))
print("Time to print frame", timer() - starttime)

spark.stop()

print("Time for entire execution", timer() - toptimer)

 

Test performance results

All timing numbers are in seconds.
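
The numbers below were produced by the repeated timer() and print pattern in the test code. As a sketch of how that pattern could be factored out, the hypothetical timed helper below wraps a step in a context manager and records the elapsed seconds under a label. It uses only the standard library, and the lowercase step is a stand-in for a real dataframe operation.

```python
from contextlib import contextmanager
from timeit import default_timer as timer

# Collected timings, keyed by the label printed for each step
timings = {}

@contextmanager
def timed(label):
    """Time the enclosed block and report the result in seconds."""
    start = timer()
    try:
        yield
    finally:
        timings[label] = timer() - start
        print(label, timings[label])

# Stand-in for a cleaning step such as the lowercase conversion
with timed("Time to lowercase"):
    words = [word.lower() for word in ["Red", "MUG"]]
```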

Small data set

For the small dataset, Dask was the fastest at about 5.7 seconds, followed by Spark at about 11.9 seconds, with pandas the slowest at about 49.9 seconds.

An item mentioned in the test setup was the timing for the Spark startup process. It is worth noting that startup took about 10.8 seconds, while the overall execution was about 11.9 seconds. The actual processing of the data was fast with Spark, but the startup overhead makes the whole execution take longer.

Dask


Time to read json and stopwords 1.9466754389868584
Time to remove nulls 0.006330966003588401
Time to remove columns 0.003736091995961033
Time to lowercase 0.004399112003738992
Time to remove special chars 0.0029064449918223545
Time to remove words 0.011461061003501527
Empty DataFrame
Columns: [description, categoryid]
Index: []
Time print frame 3.72764095000457
Time for entire execution 5.704298381999251

 

Spark


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Time to startup spark 10.783671757002594
Time to remove nulls 0.06610469499719329
Time to remove columns 0.03633765899576247
Time to lowercase 0.01610688200162258
Time to remove special chars 0.04128759000741411
Time to remove words 0.3764381930086529
Time to remove final column 0.011100043993792497
[]
Time to print frame 0.07504766699275933
Time for entire execution 11.88015255999926

 

Pandas


Time to remove nulls 0.018146192000131123
Time to remove columns 0.008800052004517056
Time to lowercase 0.06798714700562414
Time to remove special chars 9.064440374000696
Time to remove words 39.55249689699849
Empty DataFrame
Columns: [description, categoryid]
Index: []
Time print frame 0.0008227450016420335
Time for entire execution 49.8852506980038

 

Larger data set

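For the larger dataset, Spark was by far the fastest at about 90 seconds, with Dask second at about 454 seconds and pandas last at about 3741 seconds.

The individual processing steps of Dask report extremely small times because Dask evaluates lazily and only records each step. Reading the file and accessing the dataframe after manipulation, when the recorded work actually runs, account for almost all of the overall execution time. Spark's transformations are lazy as well, so its per-step timings should be read the same way.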

Spark


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Time to startup spark 3.647840406978503
Time to read json and stopwords 85.27908827015199                               
Time to remove nulls 0.05651667108759284
Time to remove columns 0.0326636319514364
Time to lowercase 0.02284233714453876
Time to remove special chars 0.05413018795661628
Time to remove words 0.31939824391156435
Time to remove final column 0.011086379177868366
[]
Time to print frame 0.08654274907894433
Time for entire execution 89.59366988181137

 

Dask


Time to read json and stopwords 194.5575270880945
Time to remove nulls 0.011882720049470663
Time to remove columns 0.004540998954325914
Time to lowercase 0.0058126861695200205
Time to remove special chars 0.0037077791057527065
Time to remove words 0.010335467988625169
Empty DataFrame
Columns: [description, categoryid]
Index: []
Time print frame 259.03842150000855
Time for entire execution 453.6337277069688

 

Pandas


Time to read json and stopwords 145.46433576499112
Time to remove nulls 14.382951432140544
Time to remove columns 44.43937778403051
Time to lowercase 26.785269804066047
Time to remove special chars 549.3819869868457
Time to remove words 2960.42678084597
Empty DataFrame
Columns: [description, categoryid]
Index: []
Time print frame 0.001058325171470642
Time for entire execution 3740.8823686749674
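
The totals reported above can be turned into relative speedups with a little arithmetic. The sketch below copies the "Time for entire execution" values from the results and divides the pandas total by each of the others. The speedup_over_pandas helper is a hypothetical name used only for this illustration.

```python
# Total execution times in seconds, copied from the results above
totals = {
    "small": {"dask": 5.704298381999251, "spark": 11.88015255999926, "pandas": 49.8852506980038},
    "large": {"spark": 89.59366988181137, "dask": 453.6337277069688, "pandas": 3740.8823686749674},
}

def speedup_over_pandas(results):
    """Return how many times faster each tool ran than pandas."""
    baseline = results["pandas"]
    return {tool: baseline / seconds for tool, seconds in results.items() if tool != "pandas"}

speedups = {size: speedup_over_pandas(results) for size, results in totals.items()}

for size, ratios in speedups.items():
    for tool, ratio in ratios.items():
        print(f"{size}: {tool} was {ratio:.1f}x faster than pandas")
```

On the large dataset this works out to roughly 42x for Spark and roughly 8x for Dask, based on the single runs reported here.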

 

Conclusion

For me personally, pandas was the easiest to work with for small datasets. Even though it was slower, the difference wasn't enough to offset what I perceive as ease of use.

For working with large datasets, Spark would be my first choice because of its significant performance advantage.

However, one area where Dask shines is its memory footprint. If I am working on a box without enough RAM to handle the volume of data in Spark, I would go with Dask. It is able to process data in chunks with a smaller memory footprint. And like Spark, Dask can be distributed across multiple servers.

 

 
