Multiclass text classification crossvalidation with pyspark pipelines

October 7, 2020 | 9 minute read
Text Size 100%:

While exploring natural language processing (NLP) and various ways to classify text data, I wanted a way to test multiple classification algorithms and chains of data processing, and perform hyperparameter tuning on them, all at the same time. I ended up using Apache Spark with the CrossValidator and pipeline models. This article will detail the approach.

Import statements.

Nothing will be detailed here. These are just provided to make running this yourself simple.

 

from timeit import default_timer as timer
import datetime
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.classification import NaiveBayes, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IDF
from pyspark.ml.feature import NGram
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession

 

General configuration settings

The following are general settings that should be adjusted to match your setup. While each setting is explained with comments, more detail on some options:

- parallelExec is how many threads the CrossValidator will use. One pipeline model will be executed per thread. Watch your CPU usage when setting this parameter, as you can quickly use up a lot of CPU.

- trackSubModels corresponds to the CrossValidator collectSubModels parameter. By default, CrossValidator only returns the best model it finds. Settings this to true will output the parameter combos used for all the models being tested. Note that enabling this uses more memory and outputs more logging.

- modelSavePath is where the best found model will be saved to for later use. Note that a limitation with Spark at the time of writing this article is that you cannot refit a model from the CrossValidator. That means if you want to train another set of data with just the parameters from the best model, you will need to manually configure those. The parameters are printed to stdout so you can save them.

- dataToLoad in my example is data that has already been cleaned/normalized, and is in parquet format. Adjust to whatever suites your needs.

###################################################################
########## general setting to configure before executing ##########
# how many parallel threads should the crossvalidator use
parallelExec=4

# should the crossvalidator collect submodel data
trackSubModels=False

# how many folds should crossvalidator use
numberFolds=3

# Path to save the best model to
modelSavePath = "/u01/models/bestmodel"

# data file to load
dataToLoad = "/u01/data/cleaneddata.parquet"

conf = SparkConf().setMaster("local[*]").setAppName("multigridsearch")
###################################################################

sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
spark = SparkSession(sc)


# load custom list of stop words. If you want to use stopwords, you will need to create your own list. One word per line. Or modify this code to use something like the supplied nltk stopwords.
with open('english-stopwords.txt', 'r') as file:
    stop_words = file.read().split('\n')

 

Pipeline elements and hyperparameters

Here is a subset of all the parameters in the full code to explain what is happening.

Define each possible pipeline stage you would like to use. Below each definition is a list that corresponds to parameters that can be tuned for each. This is not an all inclusive list of params, but a subset I chose to use. Refer to the pyspark API docs for each item to see all possible parameters.

The list that is defined for each item will be used later in a ParamGridBuilder, and executed with the CrossValidator to perform the hyperparameter tuning.

# any word less than this lenth will be removed from the feature list. For example, is stop words doesn't catch "a, at, of" and min length is 3, those are gone.
regexTokenizer = RegexTokenizer(inputCol="description", outputCol="words", pattern="\\W", minTokenLength=3)
minimumWordLength = [2, 3, 4]

countVectors = CountVectorizer(inputCol="ngrams", outputCol="features", vocabSize=100000, minDF=5)
vocabSize = [10000, 50000, 100000]
minDF = [3, 5]

Defining parameter grids

The next section is defining the various grids the CrossValidator will execute. Each grid setup requires a list of pipeline stages to execute, and a ParamGridBuilder to define the hyperparameters to tune against.

The stages are executed in the order you enter them in the list.

For the ParamGridBuilder, here is a breakdown of some of the lines of code:

  1. ParamGridBuilder().baseOn({pipeline.stages:cv_stages})
    1. This is telling the builder that the grid is going to be pipeline stages. The cv_stages is the list of pipeline stages that will be executed.
  2. .addGrid(regexTokenizer.minTokenLength, minimumWordLength)
    1. regexTokenizer corresponds to the RegexTokenizer we defined above.
    2. minTokenLength is the parameter in the API docs that defines how long a word must be to be included.
    3. minimumWordLength is the list of possible word lengths defined above. CrossValidator will individually test each value in your list, and measure it's overall accuracy.
  3. .addGrid(countVectors.vocabSize, vocabSize)
    1. countVectors corresponds to the CountVectorizer defined above.
    2. vocabSize is the parameter in the API docs that defines the maximum vocabulary size that will be saved.
    3. vocabularySize is the list of possible vocab sizes defined above.

We repeat the process for rf_stages. Notice the difference between cv_stages and rf_stages is the last item in the list of pipeline stages. In the full code for this section, you will see lr means LogisticRegression and rf means RandomForestRegressor. 

###################################################################
######### define our various grids we want to execute #############
cv_stages = [regexTokenizer, label_stringIdx, stopwordsRemover, ngramer, countVectors, lr]
cv_paramgrid = ParamGridBuilder().baseOn({pipeline.stages:cv_stages}) \
    .addGrid(regexTokenizer.minTokenLength, minimumWordLength) \
    .addGrid(ngramer.n, ngramSize)\
    .addGrid(countVectors.vocabSize, vocabularySize)\
    .addGrid(countVectors.minDF, minDF) \
    .addGrid(lr.regParam, regParam) \
    .addGrid(lr.elasticNetParam, elasticNetParam) \
    .build()

rf_stages = [regexTokenizer, label_stringIdx, stopwordsRemover, ngramer, countVectors, rf]
rf_paramgrid = ParamGridBuilder().baseOn({pipeline.stages:rf_stages}) \
    .addGrid(regexTokenizer.minTokenLength, minimumWordLength) \
    .addGrid(ngramer.n, ngramSize) \
    .addGrid(countVectors.vocabSize, vocabularySize) \
    .addGrid(countVectors.minDF, minDF) \
    .addGrid(rf.numTrees, numberTrees) \
    .addGrid(rf.maxDepth, maxDepth)\
    .build()

 

Combine grids

After you have defined all the possible grids you want to test, you need to combine them into one list variable for the CrossValidator.

You can define as few or as many grids as you want here. Keep in mind the more grids you include, and the more parameters in those grids, the total number of combinations the CrossValidator is going to test against can get quite large.

 

gridloop = [cv_paramgrid]
# gridloop = [cv_paramgrid, idf_paramgrid]
# gridloop = [cv_paramgrid, rf_paramgrid, idf_paramgrid, nb_paramgrid, lsvc_paramgrid]

 

Looping through and testing each grid

It is possible to combine all your grids as a single grid parameter, and execute them all at once. Meaning, no loop required. However, I intentionally broke the grid apart to deal with memory consumption, especially when you are collecting subModel data. By executing one grid at a time, memory is freed up between loop iterations.

By using this loop approach, we need to manually keep track of the best model between loop iterations by looking at its F1 score, which is stored in avgMetrics.

Each time a new model is found with the highest accuracy so far, we print out the parameters for all the stages that were used in that model, and the best parameters found.

If you enable collecting subModel data, we also print the parameters for every model being tested. Warning - this can use more memory and output quite a bit of data.

# using a loop instead of one big grid search to accommodate getting submodel data. With a huge grid, out of memory errors are more likely when collecting subModel data
for grid in gridloop:
    print ("running grid ", grid, "\n")
    starttime = timer()
    crossval = CrossValidator(estimatorParamMaps=grid,
                                     estimator=pipeline,
                                     evaluator=evaluator,
                                     numFolds=numberFolds,
                                     parallelism=parallelExec,
                                     collectSubModels=trackSubModels)

    cvModel = crossval.fit(data)
    print("Time to crossval", timer() - starttime)
    # get the accuracy metrics for the models. This is a list.
    avgMetricsGrid = cvModel.avgMetrics
    print (avgMetricsGrid)
    # get the max accuracy metric in the list of accuracy metrics.
    modelAcc = max(avgMetricsGrid)
    print("max score for this grid ", modelAcc)
    if (modelAcc > bestAcc):
        print ("this model has greater accuracy. Old acc ", bestAcc, " new acc ", modelAcc)
        bestModel = cvModel.bestModel
        bestAcc = modelAcc
        # print out the params for all the stages of this model
        for stage in bestModel.stages:
            print (stage.extractParamMap())

    # if you are collecting submodel data, this will dump all the param combinations being tested. You can use this with the avgMetricsGrid above to see the accruacy of all your param combos
    if (trackSubModels) :
        submods = cvModel.subModels
        for mods in submods:
            for mod in mods:
                for stage in mod.stages:
                    print(stage.extractParamMap())

 

General considerations

I suggest you start with small datasets when running this setup. Get an idea of how many possible parameter combinations there are for what you want to tune. If you used all of the grids and parameters defined in this example, you would be running the CrossValidator against 1368 possible models. This can take a considerable amount of time to execute depending on data size and your Spark server setup.

As data set size increases, more memory is required. Watch the number of partitions your data is being sliced into. Sometimes manually repartitioning your rdd into a larger number of partitions can make the execution require less memory.

This process was useful for me when playing with small sets of data. Once I found the best model, or possibly the top 2 models, I would manually adjust the parameters for a single model execution and train larger data sets.

 

Putting it all together

In this example, my parquet data has fields called categoryId, categoryName, and description. You would need to adjust these values to match your own data, where:

  • categoryId is the label to use for a classification. StringIndexer will make this numeric.
  • categoryName is really just informational for ease of seeing what maps to what. It is not actually needed
  • description is your text data you want to classify.
  • You would adjust the RegexTokenizer to match your description equivalent, and StringIndexer to match your categoryId equivalent column name.
###################################################################
########## general setting to configure before executing ##########
# how many parallel threads should the crossvalidator use
parallelExec=4

# should the corssvalidator collect submodel data
trackSubModels=False

# how many folds should crossvalidator use
numberFolds=3

# Path to save the best model to
modelSavePath = "/u01/models/bestmodel"

# data file to load
dataToLoad = "/u01/data/cleaneddata.parquet"

conf = SparkConf().setMaster("local[*]").setAppName("multigridsearch")
###################################################################

sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
spark = SparkSession(sc)


# load custom list of stop words. If you want to use stopwords, you will need to create your own list. One word per line. Or modify this code to use something like the supplied nltk stopwords.
with open('english-stopwords.txt', 'r') as file:
    stop_words = file.read().split('\n')


data = spark.read.parquet(dataToLoad)
data.show(5)
# how many partitions if the data being sliced into by default
print ('data partitions #:', data.rdd.getNumPartitions())
# What is the size of our dataset
print ('data size : ', data.count())
# List the top 20 categories by count
data.groupBy('categoryid', 'categoryname').count().orderBy('count', ascending=False).show(20)

###################################################################
### What features, algorithms and hyperparameters will be used ####

# tokenize our data into individual words. If you only wanted to use unigrams, you could use just this step and not need the ngram step as well. You would need to adjust column names.
# any word less than this lenth will be removed from the feature list. For example, is stop words doesn't catch "a, at, of" and min length is 3, those are gone.
regexTokenizer = RegexTokenizer(inputCol="description", outputCol="words", pattern="\\W", minTokenLength=3)
minimumWordLength = [2, 3, 4]

# convert our text labels to numerical values
label_stringIdx = StringIndexer(inputCol="categoryid", outputCol="label", handleInvalid="keep")

# remove stopwords using our custom stopword list
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=stop_words)

# split words into groups. ngramSize defines the group size. For example, 1 is unigrams, 2 is bigrams etc...
ngramer = NGram(inputCol='filtered', outputCol='ngrams')
ngramSize = [1, 2]

countVectors = CountVectorizer(inputCol="ngrams", outputCol="features", vocabSize=100000, minDF=5)
vocabularySize = [10000, 50000, 100000]
minDF = [3, 5]

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
regParam = [0.01, 0.1, 0.3, 0.5]
elasticNetParam = [0, .5, 1]

rf = RandomForestRegressor(subsamplingRate=0.15, featuresCol='features', labelCol='label')
numberTrees = [10, 20, 30]
maxDepth =  [5, 10]

nb = NaiveBayes(smoothing=1)
nbSmoothing =  [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]

idf = IDF(inputCol="features", outputCol="idf", minDocFreq=5)
minDocFreq=[5, 10]

# Although linearSVC is for binary classification, we can use it for multiclass by using the OneVsRest estimator
lsvc = LinearSVC()
lvcsMaxIter = [10, 50, 100]
lvcsRegParam = [0.001, 0.01, 1.0,10.0]
ovr = OneVsRest(classifier=lsvc)
###################################################################

# set pipeline to en empty list of pipeline stages
pipeline = Pipeline(stages=[])

###################################################################
######### define our various grids we want to execute #############
cv_stages = [regexTokenizer, label_stringIdx, stopwordsRemover, ngramer, countVectors, lr]
cv_paramgrid = ParamGridBuilder().baseOn({pipeline.stages:cv_stages}) \
    .addGrid(regexTokenizer.minTokenLength, minimumWordLength) \
    .addGrid(ngramer.n, ngramSize)\
    .addGrid(countVectors.vocabSize, vocabularySize)\
    .addGrid(countVectors.minDF, minDF) \
    .addGrid(lr.regParam, regParam) \
    .addGrid(lr.elasticNetParam, elasticNetParam) \
    .build()

rf_stages = [regexTokenizer, label_stringIdx, stopwordsRemover, ngramer, countVectors, rf]
rf_paramgrid = ParamGridBuilder().baseOn({pipeline.stages:rf_stages}) \
    .addGrid(regexTokenizer.minTokenLength, minimumWordLength) \
    .addGrid(ngramer.n, ngramSize) \
    .addGrid(countVectors.vocabSize, vocabularySize) \
    .addGrid(countVectors.minDF, minDF) \
    .addGrid(rf.numTrees, numberTrees) \
    .addGrid(rf.maxDepth, maxDepth)\
    .build()

idf_stages = [regexTokenizer, label_stringIdx, stopwordsRemover, ngramer, countVectors, idf, lr]
idf_paramgrid = ParamGridBuilder().baseOn({pipeline.stages:idf_stages}) \
    .addGrid(regexTokenizer.minTokenLength, minimumWordLength) \
    .addGrid(ngramer.n, ngramSize) \
    .addGrid(countVectors.vocabSize, vocabularySize) \
    .addGrid(countVectors.minDF, minDF) \
    .addGrid(idf.minDocFreq, minDocFreq)\
    .build()

nb_stages = [regexTokenizer, label_stringIdx, stopwordsRemover, ngramer, countVectors, nb]
nb_paramgrid = ParamGridBuilder().baseOn({pipeline.stages:nb_stages}) \
    .addGrid(regexTokenizer.minTokenLength, minimumWordLength) \
    .addGrid(ngramer.n, ngramSize) \
    .addGrid(countVectors.vocabSize, vocabularySize) \
    .addGrid(countVectors.minDF, minDF) \
    .addGrid(nb.smoothing, nbSmoothing) \
    .build()

lsvc_stages = [regexTokenizer, label_stringIdx, stopwordsRemover, ngramer, countVectors, ovr]
lsvc_paramgrid = ParamGridBuilder().baseOn({pipeline.stages:lsvc_stages}) \
    .addGrid(regexTokenizer.minTokenLength, minimumWordLength) \
    .addGrid(ngramer.n, ngramSize) \
    .addGrid(countVectors.vocabSize, vocabularySize) \
    .addGrid(countVectors.minDF, minDF) \
    .addGrid(lsvc.maxIter, lvcsMaxIter) \
    .addGrid(lsvc.regParam, lvcsRegParam) \
    .build()

# gridloop = [cv_paramgrid, idf_paramgrid]
# gridloop = [cv_paramgrid]
gridloop = [cv_paramgrid, rf_paramgrid, idf_paramgrid, nb_paramgrid, lsvc_paramgrid]

###################################################################


# how many parameter combinations are about to be tested. Warning - this can quickly get out of hand
paramCombo = 0
for grid in gridloop:
    paramCombo = paramCombo + len(grid)
print ("Number of parameter combinations being tested ", paramCombo)

print("starting crossvalidation at ", datetime.datetime.now(), "\n")

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

# Since a loop is used, we manually track which model between loops has the highest accuracy
bestAcc = 0

# using a loop instead of one big grid search to accomodate getting submodel data. With a huge grid, out of memory errors are more likely when collecting subModel data
for grid in gridloop:
    print ("running grid ", grid, "\n")
    starttime = timer()
    crossval = CrossValidator(estimatorParamMaps=grid,
                                     estimator=pipeline,
                                     evaluator=evaluator,
                                     numFolds=numberFolds,
                                     parallelism=parallelExec,
                                     collectSubModels=trackSubModels)

    cvModel = crossval.fit(data)
    print("Time to crossval", timer() - starttime)
    # get the accuracy metrics for the models. This is a list.
    avgMetricsGrid = cvModel.avgMetrics
    print (avgMetricsGrid)
    # get the max accuracy metric in the list of accuracy metrics.
    modelAcc = max(avgMetricsGrid)
    print("max score for this grid ", modelAcc)
    if (modelAcc > bestAcc):
        print ("this model has greater accuracy. Old acc ", bestAcc, " new acc ", modelAcc)
        bestModel = cvModel.bestModel
        bestAcc = modelAcc
        # print out the params for all the stages of this model
        for stage in bestModel.stages:
            print (stage.extractParamMap())

    # if you are collecting submodel data, this will dump all the param combinations being tested. You can use this with the avgMetricsGrid above to see the accruacy of all your param combos
    if (trackSubModels) :
        submods = cvModel.subModels
        for mods in submods:
            for mod in mods:
                for stage in mod.stages:
                    print(stage.extractParamMap())

# save the best model for reuse.
bestModel.save(modelSavePath)

spark.stop()

 

 

Michael Shanley


Previous Post

Push Cloud Guard Problems to Splunk HEC with OCI SDK

Uday Sambhara | 4 min read

Next Post


Observabitiliy on Oracle OCI - Using Custom Logs in OCI Logging to Monitor and Analyze Cloud-Native Applications

Stefan Koser | 5 min read