While exploring natural language processing (NLP) and various ways to classify text data, I wanted a way to test multiple classification algorithms and data processing chains, and to perform hyperparameter tuning on all of them at the same time. I ended up using Apache Spark with the CrossValidator and Pipeline models. This article details the approach.
The imports won't be detailed here; they are just provided to make running this yourself simple.
from timeit import default_timer as timer
import datetime

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.classification import NaiveBayes, LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IDF
from pyspark.ml.feature import NGram
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
The following are general settings that should be adjusted to match your setup. Each setting is explained with a comment in the code, but a few options deserve more detail:
- parallelExec is how many threads the CrossValidator will use; one pipeline model is executed per thread. Watch your CPU usage when setting this parameter, as the parallel fits can quickly consume a lot of CPU.
- trackSubModels corresponds to the CrossValidator collectSubModels parameter. By default, CrossValidator only returns the best model it finds. Setting this to True will output the parameter combos used for all the models being tested. Note that enabling this uses more memory and outputs more logging.
- modelSavePath is where the best model found will be saved for later use. Note that a limitation of Spark at the time of writing is that you cannot refit a model from the CrossValidator. That means if you want to train another set of data with just the parameters from the best model, you will need to configure those manually. The parameters are printed to stdout so you can save them; a sketch of reloading the saved model follows this list.
- dataToLoad in my example is data that has already been cleaned/normalized and is in parquet format. Adjust to whatever suits your needs.
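Since the estimator here is a Pipeline, the saved best model loads back as a PipelineModel. A minimal sketch of reusing it, assuming the modelSavePath setting described above and a hypothetical newData dataframe with the same input columns as the training data:

from pyspark.ml import PipelineModel

# load the best model saved by the tuning run and score new data
# (newData is a hypothetical dataframe with a matching "description" column)
loaded = PipelineModel.load(modelSavePath)
predictions = loaded.transform(newData)
predictions.select("description", "prediction").show(5)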
###################################################################
########## general settings to configure before executing #########
# how many parallel threads the CrossValidator should use
parallelExec = 4
# should the CrossValidator collect submodel data
trackSubModels = False
# how many folds the CrossValidator should use
numberFolds = 3
# path to save the best model to
modelSavePath = "/u01/models/bestmodel"
# data file to load
dataToLoad = "/u01/data/cleaneddata.parquet"

conf = SparkConf().setMaster("local[*]").setAppName("multigridsearch")
###################################################################

sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
spark = SparkSession(sc)

# load a custom list of stop words, one word per line. If you want to use
# stop words, create your own list or modify this code to use something like
# the nltk stop word list.
with open('english-stopwords.txt', 'r') as file:
    stop_words = file.read().split('\n')
Here is a subset of the parameters from the full code, to explain what is happening.
Define each possible pipeline stage you would like to use. Below each definition is a list of the parameters that can be tuned for it. This is not an all-inclusive list of params, but a subset I chose to use. Refer to the pyspark API docs for each item to see all possible parameters.
The list that is defined for each item will be used later in a ParamGridBuilder, and executed with the CrossValidator to perform the hyperparameter tuning.
# tokenize the description field into individual words. Any word shorter
# than minTokenLength is removed from the feature list. For example, if the
# stop word list doesn't catch "a", "at", "of" and the minimum length is 3,
# those words are gone.
regexTokenizer = RegexTokenizer(inputCol="description", outputCol="words",
                                pattern="\\W", minTokenLength=3)
minimumWordLength = [2, 3, 4]

countVectors = CountVectorizer(inputCol="ngrams", outputCol="features",
                               vocabSize=100000, minDF=5)
vocabularySize = [10000, 50000, 100000]
minDF = [3, 5]
The next section is defining the various grids the CrossValidator will execute. Each grid setup requires a list of pipeline stages to execute, and a ParamGridBuilder to define the hyperparameters to tune against.
The stages are executed in the order you enter them in the list.
For the ParamGridBuilder, here is a breakdown of some of the lines of code:

- baseOn({pipeline.stages: cv_stages}) fixes the list of pipeline stages that every parameter combination in this grid will use.
- Each addGrid call pairs a tunable parameter (such as regexTokenizer.minTokenLength) with the list of values to try for it. The total number of combinations in the grid is the product of the lengths of all these lists.
- build() turns the builder into the list of parameter maps the CrossValidator will iterate over.
We repeat the process for rf_stages. Notice that the only difference between cv_stages and rf_stages is the last item in the list of pipeline stages. In the full code for this section, you will see that lr means LogisticRegression and rf means RandomForestRegressor.
###################################################################
######### define our various grids we want to execute #############
cv_stages = [regexTokenizer, label_stringIdx, stopwordsRemover, ngramer, countVectors, lr]
cv_paramgrid = ParamGridBuilder().baseOn({pipeline.stages: cv_stages}) \
    .addGrid(regexTokenizer.minTokenLength, minimumWordLength) \
    .addGrid(ngramer.n, ngramSize) \
    .addGrid(countVectors.vocabSize, vocabularySize) \
    .addGrid(countVectors.minDF, minDF) \
    .addGrid(lr.regParam, regParam) \
    .addGrid(lr.elasticNetParam, elasticNetParam) \
    .build()

rf_stages = [regexTokenizer, label_stringIdx, stopwordsRemover, ngramer, countVectors, rf]
rf_paramgrid = ParamGridBuilder().baseOn({pipeline.stages: rf_stages}) \
    .addGrid(regexTokenizer.minTokenLength, minimumWordLength) \
    .addGrid(ngramer.n, ngramSize) \
    .addGrid(countVectors.vocabSize, vocabularySize) \
    .addGrid(countVectors.minDF, minDF) \
    .addGrid(rf.numTrees, numberTrees) \
    .addGrid(rf.maxDepth, maxDepth) \
    .build()
After you have defined all the possible grids you want to test, you need to combine them into one list variable for the CrossValidator.
You can define as few or as many grids as you want here. Keep in mind that the more grids you include, and the more parameters those grids contain, the larger the total number of combinations the CrossValidator has to test.
gridloop = [cv_paramgrid]
# gridloop = [cv_paramgrid, idf_paramgrid]
# gridloop = [cv_paramgrid, rf_paramgrid, idf_paramgrid, nb_paramgrid, lsvc_paramgrid]
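Before kicking anything off, it helps to know how big each grid is. A built param grid is a plain list of parameter maps, so len() gives its size, which is the product of the lengths of its addGrid lists. A quick check, using the grids defined above:

# each built grid is a plain list of param maps, so len() gives its size
for name, grid in [("cv", cv_paramgrid), ("rf", rf_paramgrid)]:
    print(name, len(grid))  # cv: 3*2*3*2*4*3 = 432, rf: 3*2*3*2*3*2 = 216
# with all five grids enabled: 432 + 216 + 72 + 216 + 432 = 1368 combinations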
It is possible to combine all your grids into a single grid parameter and execute everything at once, meaning no loop is required. However, I intentionally broke the grids apart to deal with memory consumption, especially when collecting subModel data. By executing one grid at a time, memory is freed up between loop iterations.
With this loop approach, we need to manually keep track of the best model across loop iterations by looking at its F1 score, which is stored in avgMetrics.
Each time a new model is found with the highest accuracy so far, we print out the parameters used for every stage of that model.
If you enable collecting subModel data, we also print the parameters for every model being tested. Warning - this can use more memory and output quite a bit of data.
# using a loop instead of one big grid search to accommodate getting
# submodel data. With a huge grid, out of memory errors are more likely
# when collecting subModel data
for grid in gridloop:
    print("running grid ", grid, "\n")
    starttime = timer()
    crossval = CrossValidator(estimatorParamMaps=grid,
                              estimator=pipeline,
                              evaluator=evaluator,
                              numFolds=numberFolds,
                              parallelism=parallelExec,
                              collectSubModels=trackSubModels)
    cvModel = crossval.fit(data)
    print("Time to crossval", timer() - starttime)

    # get the accuracy metrics for the models. This is a list.
    avgMetricsGrid = cvModel.avgMetrics
    print(avgMetricsGrid)
    # get the max accuracy metric in the list of accuracy metrics
    modelAcc = max(avgMetricsGrid)
    print("max score for this grid ", modelAcc)

    if modelAcc > bestAcc:
        print("this model has greater accuracy. Old acc ", bestAcc, " new acc ", modelAcc)
        bestModel = cvModel.bestModel
        bestAcc = modelAcc
        # print out the params for all the stages of this model
        for stage in bestModel.stages:
            print(stage.extractParamMap())

    # if you are collecting submodel data, this dumps all the param
    # combinations being tested. Use this with avgMetricsGrid above to see
    # the accuracy of all your param combos
    if trackSubModels:
        submods = cvModel.subModels
        for mods in submods:
            for mod in mods:
                for stage in mod.stages:
                    print(stage.extractParamMap())
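Since avgMetrics lines up index-for-index with the parameter maps in estimatorParamMaps, you can also pair each score with the exact parameter combination that produced it. A small sketch that could sit inside the loop above, right after crossval.fit runs:

# pair each cross-validated score with the param combo that produced it
for params, metric in zip(grid, cvModel.avgMetrics):
    combo = {p.name: v for p, v in params.items() if p.name != 'stages'}
    print(metric, combo)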
I suggest you start with small datasets when running this setup. Get an idea of how many possible parameter combinations there are for what you want to tune. If you used all of the grids and parameters defined in this example, you would be running the CrossValidator against 1368 possible models. This can take a considerable amount of time to execute depending on data size and your Spark server setup.
As data set size increases, more memory is required. Watch the number of partitions your data is being sliced into. Sometimes manually repartitioning your data into a larger number of partitions can make the execution require less memory.
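As a rough sketch of that, DataFrame.repartition does the job; the partition count of 64 here is an arbitrary example, not a recommendation:

# reslice the data into more, smaller partitions before fitting
# (64 is an arbitrary example value; tune it to your data and cluster)
data = data.repartition(64)
print('data partitions #:', data.rdd.getNumPartitions())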
This process was useful for me when playing with small sets of data. Once I found the best model, or possibly the top 2 models, I would manually adjust the parameters for a single model execution and train larger data sets.
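A minimal sketch of that manual step, assuming the LogisticRegression grid won and using placeholder winning values (substitute the ones printed from extractParamMap; largerData is a hypothetical dataframe, and the stage objects come from the full code below):

# set the winning parameter values directly on the stages
# (example values only; use the ones from your best model's printout)
regexTokenizer.setMinTokenLength(3)
ngramer.setN(1)
countVectors.setVocabSize(50000)
countVectors.setMinDF(5)
lr.setRegParam(0.1)
lr.setElasticNetParam(0.0)

# one pipeline, one fit - no CrossValidator involved
finalPipeline = Pipeline(stages=cv_stages)
finalModel = finalPipeline.fit(largerData)
finalModel.save(modelSavePath)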
In this example, my parquet data has fields called categoryid, categoryname, and description. You would need to adjust these values to match your own data, where:

- categoryid is the numeric id of the category a record belongs to (the StringIndexer turns it into the label column),
- categoryname is the human-readable name of that category, and
- description is the text being classified.
###################################################################
########## general settings to configure before executing #########
# how many parallel threads the CrossValidator should use
parallelExec = 4
# should the CrossValidator collect submodel data
trackSubModels = False
# how many folds the CrossValidator should use
numberFolds = 3
# path to save the best model to
modelSavePath = "/u01/models/bestmodel"
# data file to load
dataToLoad = "/u01/data/cleaneddata.parquet"

conf = SparkConf().setMaster("local[*]").setAppName("multigridsearch")
###################################################################

sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
spark = SparkSession(sc)

# load a custom list of stop words, one word per line. If you want to use
# stop words, create your own list or modify this code to use something like
# the nltk stop word list.
with open('english-stopwords.txt', 'r') as file:
    stop_words = file.read().split('\n')

data = spark.read.parquet(dataToLoad)
data.show(5)

# how many partitions is the data being sliced into by default
print('data partitions #:', data.rdd.getNumPartitions())
# what is the size of our dataset
print('data size : ', data.count())
# list the top 20 categories by count
data.groupBy('categoryid', 'categoryname').count().orderBy('count', ascending=False).show(20)

###################################################################
### What features, algorithms and hyperparameters will be used ####
# tokenize our data into individual words. If you only wanted to use
# unigrams, you could use just this step, skip the ngram step, and adjust
# column names. Any word shorter than minTokenLength is removed from the
# feature list. For example, if the stop word list doesn't catch "a", "at",
# "of" and the minimum length is 3, those words are gone.
regexTokenizer = RegexTokenizer(inputCol="description", outputCol="words",
                                pattern="\\W", minTokenLength=3)
minimumWordLength = [2, 3, 4]

# convert our text labels to numerical values
label_stringIdx = StringIndexer(inputCol="categoryid", outputCol="label",
                                handleInvalid="keep")

# remove stopwords using our custom stopword list
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered",
                                    stopWords=stop_words)

# split words into groups. ngramSize defines the group size. For example,
# 1 is unigrams, 2 is bigrams, etc.
ngramer = NGram(inputCol='filtered', outputCol='ngrams')
ngramSize = [1, 2]

countVectors = CountVectorizer(inputCol="ngrams", outputCol="features",
                               vocabSize=100000, minDF=5)
vocabularySize = [10000, 50000, 100000]
minDF = [3, 5]

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
regParam = [0.01, 0.1, 0.3, 0.5]
elasticNetParam = [0, .5, 1]

rf = RandomForestRegressor(subsamplingRate=0.15, featuresCol='features',
                           labelCol='label')
numberTrees = [10, 20, 30]
maxDepth = [5, 10]

nb = NaiveBayes(smoothing=1)
nbSmoothing = [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]

idf = IDF(inputCol="features", outputCol="idf", minDocFreq=5)
minDocFreq = [5, 10]

# although LinearSVC is for binary classification, we can use it for
# multiclass by wrapping it in the OneVsRest estimator
lsvc = LinearSVC()
lvcsMaxIter = [10, 50, 100]
lvcsRegParam = [0.001, 0.01, 1.0, 10.0]
ovr = OneVsRest(classifier=lsvc)
###################################################################

# set pipeline to an empty list of pipeline stages
pipeline = Pipeline(stages=[])

###################################################################
######### define our various grids we want to execute #############
cv_stages = [regexTokenizer, label_stringIdx, stopwordsRemover, ngramer, countVectors, lr]
cv_paramgrid = ParamGridBuilder().baseOn({pipeline.stages: cv_stages}) \
    .addGrid(regexTokenizer.minTokenLength, minimumWordLength) \
    .addGrid(ngramer.n, ngramSize) \
    .addGrid(countVectors.vocabSize, vocabularySize) \
    .addGrid(countVectors.minDF, minDF) \
    .addGrid(lr.regParam, regParam) \
    .addGrid(lr.elasticNetParam, elasticNetParam) \
    .build()

rf_stages = [regexTokenizer, label_stringIdx, stopwordsRemover, ngramer, countVectors, rf]
rf_paramgrid = ParamGridBuilder().baseOn({pipeline.stages: rf_stages}) \
    .addGrid(regexTokenizer.minTokenLength, minimumWordLength) \
    .addGrid(ngramer.n, ngramSize) \
    .addGrid(countVectors.vocabSize, vocabularySize) \
    .addGrid(countVectors.minDF, minDF) \
    .addGrid(rf.numTrees, numberTrees) \
    .addGrid(rf.maxDepth, maxDepth) \
    .build()

# note: point lr at the IDF output in this grid, otherwise lr would read the
# raw "features" column and the IDF stage's output would go unused
idf_stages = [regexTokenizer, label_stringIdx, stopwordsRemover, ngramer, countVectors, idf, lr]
idf_paramgrid = ParamGridBuilder().baseOn({pipeline.stages: idf_stages, lr.featuresCol: "idf"}) \
    .addGrid(regexTokenizer.minTokenLength, minimumWordLength) \
    .addGrid(ngramer.n, ngramSize) \
    .addGrid(countVectors.vocabSize, vocabularySize) \
    .addGrid(countVectors.minDF, minDF) \
    .addGrid(idf.minDocFreq, minDocFreq) \
    .build()

nb_stages = [regexTokenizer, label_stringIdx, stopwordsRemover, ngramer, countVectors, nb]
nb_paramgrid = ParamGridBuilder().baseOn({pipeline.stages: nb_stages}) \
    .addGrid(regexTokenizer.minTokenLength, minimumWordLength) \
    .addGrid(ngramer.n, ngramSize) \
    .addGrid(countVectors.vocabSize, vocabularySize) \
    .addGrid(countVectors.minDF, minDF) \
    .addGrid(nb.smoothing, nbSmoothing) \
    .build()

lsvc_stages = [regexTokenizer, label_stringIdx, stopwordsRemover, ngramer, countVectors, ovr]
lsvc_paramgrid = ParamGridBuilder().baseOn({pipeline.stages: lsvc_stages}) \
    .addGrid(regexTokenizer.minTokenLength, minimumWordLength) \
    .addGrid(ngramer.n, ngramSize) \
    .addGrid(countVectors.vocabSize, vocabularySize) \
    .addGrid(countVectors.minDF, minDF) \
    .addGrid(lsvc.maxIter, lvcsMaxIter) \
    .addGrid(lsvc.regParam, lvcsRegParam) \
    .build()

# gridloop = [cv_paramgrid, idf_paramgrid]
# gridloop = [cv_paramgrid]
gridloop = [cv_paramgrid, rf_paramgrid, idf_paramgrid, nb_paramgrid, lsvc_paramgrid]
###################################################################

# how many parameter combinations are about to be tested.
# Warning - this can quickly get out of hand
paramCombo = 0
for grid in gridloop:
    paramCombo = paramCombo + len(grid)
print("Number of parameter combinations being tested ", paramCombo)

print("starting crossvalidation at ", datetime.datetime.now(), "\n")

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

# since a loop is used, we manually track which model between loops has the
# highest accuracy
bestAcc = 0

# using a loop instead of one big grid search to accommodate getting
# submodel data. With a huge grid, out of memory errors are more likely
# when collecting subModel data
for grid in gridloop:
    print("running grid ", grid, "\n")
    starttime = timer()
    crossval = CrossValidator(estimatorParamMaps=grid,
                              estimator=pipeline,
                              evaluator=evaluator,
                              numFolds=numberFolds,
                              parallelism=parallelExec,
                              collectSubModels=trackSubModels)
    cvModel = crossval.fit(data)
    print("Time to crossval", timer() - starttime)

    # get the accuracy metrics for the models. This is a list.
    avgMetricsGrid = cvModel.avgMetrics
    print(avgMetricsGrid)
    # get the max accuracy metric in the list of accuracy metrics
    modelAcc = max(avgMetricsGrid)
    print("max score for this grid ", modelAcc)

    if modelAcc > bestAcc:
        print("this model has greater accuracy. Old acc ", bestAcc, " new acc ", modelAcc)
        bestModel = cvModel.bestModel
        bestAcc = modelAcc
        # print out the params for all the stages of this model
        for stage in bestModel.stages:
            print(stage.extractParamMap())

    # if you are collecting submodel data, this dumps all the param
    # combinations being tested. Use this with avgMetricsGrid above to see
    # the accuracy of all your param combos
    if trackSubModels:
        submods = cvModel.subModels
        for mods in submods:
            for mod in mods:
                for stage in mod.stages:
                    print(stage.extractParamMap())

# save the best model for reuse
bestModel.save(modelSavePath)

spark.stop()