Machine Learning at Scale Using PySpark & Deployment Using AzureML/Flask

Machine Learning in PySpark


Yogesh Agrawal

6 days ago | 12 min read

Hello all! For the last few months I have been working on scaling and productionizing machine learning algorithms. I searched the internet extensively and found very little support. Companies are still struggling to achieve a high success rate in this area. According to one survey, only 4% of ML models make it to deployment in a production environment, in large part, I believe, because the community offers little support for this stage. Let's begin today's topic and contribute a bit to the community.

A few things to know before we proceed:

  1. Why PySpark and a Databricks notebook?
  2. What is Flask, and what are the alternatives?

I have used PySpark and a Databricks notebook because the notebook displays Spark DataFrames and graphs well. Databricks also provides cluster setup, so we can choose any machine configuration for the cluster to get more computational power. In our case, I used a cluster of 3 machines with 42 GB of RAM. Check the Databricks documentation for more information and usage details.

Flask is used here for deployment. Flask is a lightweight web framework for Python that helps you create a server exposing endpoints for serving requests. Flask is not mandatory, and there are many alternatives on the market. We will discuss this in more detail later, including how to use it. For more information, check the Flask documentation.
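Flask applications are WSGI applications under the hood. The sketch below shows the request/response shape of a prediction endpoint without installing Flask. It swaps Flask for the standard library's WSGI conventions, so it is not Flask code. The `/predict` route and the `predict_survival` rule are hypothetical stand-ins, not the author's service or a real model. The request body uses the same `columns`/`data` layout as the scoring request shown later in this article.

```python
import json
from io import BytesIO
from wsgiref.util import setup_testing_defaults


def predict_survival(row):
    # Hypothetical stand-in for a trained model: a toy rule, NOT a real classifier
    return 1 if row.get("Sex") == "female" else 0


def app(environ, start_response):
    """Minimal WSGI app exposing POST /predict with a {"columns", "data"} JSON body."""
    if environ.get("PATH_INFO") != "/predict" or environ.get("REQUEST_METHOD") != "POST":
        start_response("404 Not Found", [("Content-Type", "application/json")])
        return [json.dumps({"error": "not found"}).encode("utf-8")]

    # Read exactly CONTENT_LENGTH bytes of the request body
    length = int(environ.get("CONTENT_LENGTH") or 0)
    payload = json.loads(environ["wsgi.input"].read(length) or b"{}")
    columns = payload.get("columns", [])
    rows = [dict(zip(columns, values)) for values in payload.get("data", [])]

    body = json.dumps({"predictions": [predict_survival(r) for r in rows]}).encode("utf-8")
    start_response("200 OK", [("Content-Type", "application/json"),
                              ("Content-Length", str(len(body)))])
    return [body]


def call(path, method="GET", payload=None):
    """Invoke the WSGI app in-process, the way a test client would."""
    body = json.dumps(payload).encode("utf-8") if payload is not None else b""
    environ = {}
    setup_testing_defaults(environ)
    environ.update({"PATH_INFO": path, "REQUEST_METHOD": method,
                    "CONTENT_LENGTH": str(len(body)), "wsgi.input": BytesIO(body)})
    status = []
    chunks = app(environ, lambda s, headers: status.append(s))
    return status[0], json.loads(b"".join(chunks))
```

In a real deployment, a WSGI server (or Flask's own development server) would call `app`. The `call` helper only exercises the endpoint in-process.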

I chose a very basic dataset. The Titanic dataset is used here, but anyone can experiment with other datasets as well. Some basic preprocessing operations are performed, such as feature extraction, imputation and dropping columns. I will show each step in a functional way.

Step 1: Import the required libraries


# Load the important Spark packages
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql.functions import avg, col, lit, regexp_extract, when
from pyspark.ml import Transformer, Estimator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

Here we are using a Pipeline, which runs its stages as a sequence of operations. Later in the code you will see how I use a pipeline for StringIndexer, VectorAssembler and the algorithm.

For the other imported libraries, please check the official Spark documentation.
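The plain-Python toy below makes the stage idea concrete. It is not the `pyspark.ml` API. It mirrors how a Spark ML Pipeline treats its stages. An estimator stage (in Spark, for example, StringIndexer) is fit on the output of the earlier stages and replaced by the fitted model. A transformer stage (for example, VectorAssembler) only transforms. The classes `AddFamilySize`, `MeanAgeImputer` and the `Toy*` base classes are hypothetical illustrations.

```python
class ToyTransformer:
    def transform(self, rows):
        raise NotImplementedError


class ToyEstimator:
    def fit(self, rows):
        raise NotImplementedError


class AddFamilySize(ToyTransformer):
    """Stateless stage: derives Family_Size = SibSp + Parch."""
    def transform(self, rows):
        return [{**r, "Family_Size": r["SibSp"] + r["Parch"]} for r in rows]


class MeanAgeModel(ToyTransformer):
    """Fitted stage: fills null Age values with the mean learned during fit."""
    def __init__(self, mean_age):
        self.mean_age = mean_age

    def transform(self, rows):
        return [{**r, "Age": self.mean_age if r["Age"] is None else r["Age"]} for r in rows]


class MeanAgeImputer(ToyEstimator):
    """Stateful stage: fit() learns the mean Age and returns a MeanAgeModel."""
    def fit(self, rows):
        ages = [r["Age"] for r in rows if r["Age"] is not None]
        return MeanAgeModel(sum(ages) / len(ages))


class ToyPipelineModel(ToyTransformer):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        # Apply every fitted stage in order
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline(ToyEstimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # An estimator is fit on the output of the earlier stages and replaced by its model
            model = stage.fit(rows) if isinstance(stage, ToyEstimator) else stage
            rows = model.transform(rows)
            fitted.append(model)
        return ToyPipelineModel(fitted)
```

Fitting `ToyPipeline(stages=[AddFamilySize(), MeanAgeImputer()])` returns a model whose stages are all transformers. This is the same shape as the `PipelineModel` that Spark returns from `pipeline.fit(df)`.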

Step 2: Create a Spark session


# Spark session creator
st = SparkSession \
    .builder \
    .appName('Titanic') \
    .getOrCreate()

You can also set many custom memory options for this session. To keep things simple, I am using the default configuration.

Step 3: Load the dataset into a Spark DataFrame


def load_data(path, header_value):
    '''
    Load data function for loading a CSV file.
    @param -
        path - path of the file
        header_value - header flag; if True, the first row is used as the header
    @return - DataFrame of the loaded data
    '''
    df = st.read.csv(path, inferSchema=True, header=header_value)
    return df

df = load_data('/FileStore/tables/titanic_train.csv', True)
df_test = load_data('/FileStore/tables/titanic_test.csv', True)

Load the data files. In this case, the train and test data are in separate files. For convenience, I created a function, load_data, which is called each time we want to load data.

Step 4: Create a custom Transformer for preprocessing the data


class preprocess_transform(Transformer):
    '''
    Custom Transformer class for the preprocessing implementation.
    @param -
        Transformer - the pyspark.ml Transformer base class
        df - DataFrame on which the operations are carried out (passed through the transform function)
    @return -
        df - a DataFrame containing the generated, imputed and indexed feature columns
    '''

    def feature_generation(self, df):
        '''
        Generate feature columns in the DataFrame based on specific logic.
        @param - df - DataFrame for the operation.
        @return - df - DataFrame with the generated features.
        '''
        # Extract the title, e.g. "Mr" from "Braund, Mr. Owen Harris"
        df = df.withColumn("Initial", regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))
        # Map rare titles onto a few common ones (only in the Initial column)
        df = df.replace(['Mlle', 'Mme', 'Ms', 'Dr', 'Major', 'Lady', 'Countess', 'Jonkheer', 'Col', 'Rev', 'Capt', 'Sir', 'Don'],
                        ['Miss', 'Miss', 'Miss', 'Mr', 'Mr', 'Mrs', 'Mrs', 'Other', 'Other', 'Other', 'Mr', 'Mr', 'Mr'],
                        subset=["Initial"])
        df = df.withColumn("Family_Size", col('SibSp') + col('Parch'))
        # Alone = 1 when the passenger travels without family, else 0
        df = df.withColumn("Alone", when(col("Family_Size") == 0, 1).otherwise(0))
        return df

    def Age_impute(self, df):
        '''
        Impute Age with the mean age of each title (Initial) group, e.g. if the mean age
        for "Mr" is m, every "Mr" row with a null Age is filled with m.
        '''
        Age_mean = df.groupBy("Initial").avg('Age').withColumnRenamed('avg(Age)', 'mean_age')
        # Collect (Initial, mean_age) pairs in one action so each title stays paired with its own mean
        for row in Age_mean.collect():
            i, j = row["Initial"], row["mean_age"]
            df = df.withColumn("Age", when((col("Initial") == i) & col("Age").isNull(), j).otherwise(col("Age")))
        return df

    def Embark_impute(self, df):
        ''' Impute Embarked with the mode (most frequent non-null value) of the column. '''
        mode_value = (df.filter(col('Embarked').isNotNull())
                        .groupBy('Embarked').count()
                        .sort(col('count').desc())
                        .collect()[0][0])
        return df.fillna({'Embarked': mode_value})

    def Fare_impute(self, df):
        '''
        Impute Fare with the mean fare of the passenger's class, e.g. a null Fare
        in 3rd class is filled with the mean 3rd-class fare.
        '''
        null_classes = [r['Pclass'] for r in df.filter(col('Fare').isNull()).select('Pclass').distinct().collect()]
        for i in null_classes:
            mean_pclass_fare = df.filter(col('Pclass') == i).agg(avg('Fare')).collect()[0][0]
            df = df.withColumn("Fare", when(col('Fare').isNull() & (col('Pclass') == i), mean_pclass_fare).otherwise(col('Fare')))
        return df

    def all_impute_together(self, df):
        ''' Combine all column imputations. '''
        df = self.Age_impute(df)
        df = self.Embark_impute(df)
        df = self.Fare_impute(df)
        return df

    def stringToNumeric_conv(self, df, col_list):
        '''
        Convert string columns to numeric indices.
        @param - df - DataFrame containing all columns; col_list - columns to be converted.
        @return - df - DataFrame with a <column>_index column for each input column.
        '''
        indexer = [StringIndexer(inputCol=column, outputCol=column + "_index") for column in col_list]
        string_change_pipeline = Pipeline(stages=indexer)
        df = string_change_pipeline.fit(df).transform(df)
        return df

    def drop_column(self, df, col_list):
        ''' Drop the given columns from the DataFrame. '''
        return df.drop(*col_list)

    def _transform(self, df):
        print("******** in Transform method ... ********")
        col_list = ["Sex", "Embarked", "Initial"]
        dataset = self.feature_generation(df)
        df_impute = self.all_impute_together(dataset)
        df_numeric = self.stringToNumeric_conv(df_impute, col_list)
        df_final = self.drop_column(df_numeric, ['Cabin', 'Name', 'Ticket', 'Family_Size', 'SibSp', 'Parch', 'Sex', 'Embarked', 'Initial'])
        return df_final

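The Transformer class contains several methods, each performing a different operation:

feature_generation(): extracts the title (Initial) from the Name column and derives the Family_Size and Alone columns.

Age_impute(): imputes missing ages with the mean age of each title group.

Embark_impute() and Fare_impute(): impute Embarked with its most frequent value and Fare with the mean fare of the passenger's class; all_impute_together() runs the three imputations in sequence.

stringToNumeric_conv(): converts string columns to numeric indices.

drop_column(): drops unwanted columns from the DataFrame.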
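You can check the same preprocessing logic on a handful of rows without a cluster. Below is a plain-Python sketch, not Spark code, of the title extraction with the same regex and title mapping, the Family_Size/Alone features, the group-mean Age imputation and the mode used for Embarked. Rows are plain dicts here; that is an assumption for illustration only.

```python
import re
from collections import Counter, defaultdict

# Same regex and title mapping as feature_generation() in the transformer above
TITLE_RE = re.compile(r"([A-Za-z]+)\.")
TITLE_MAP = dict(zip(
    ['Mlle', 'Mme', 'Ms', 'Dr', 'Major', 'Lady', 'Countess', 'Jonkheer', 'Col', 'Rev', 'Capt', 'Sir', 'Don'],
    ['Miss', 'Miss', 'Miss', 'Mr', 'Mr', 'Mrs', 'Mrs', 'Other', 'Other', 'Other', 'Mr', 'Mr', 'Mr']))


def generate_features(row):
    """Add Initial, Family_Size and Alone to one passenger record (a dict)."""
    match = TITLE_RE.search(row["Name"])
    # Spark's regexp_extract returns "" when nothing matches
    initial = match.group(1) if match else ""
    family_size = row["SibSp"] + row["Parch"]
    return {**row,
            "Initial": TITLE_MAP.get(initial, initial),
            "Family_Size": family_size,
            "Alone": 1 if family_size == 0 else 0}


def impute_age_by_initial(rows):
    """Fill null Age values with the mean non-null Age of the same Initial."""
    sums, counts = defaultdict(float), defaultdict(int)
    for r in rows:
        if r["Age"] is not None:
            sums[r["Initial"]] += r["Age"]
            counts[r["Initial"]] += 1
    means = {k: sums[k] / counts[k] for k in counts}
    return [{**r, "Age": means.get(r["Initial"]) if r["Age"] is None else r["Age"]}
            for r in rows]


def mode(values):
    """Most frequent non-null value, as used to fill Embarked."""
    return Counter(v for v in values if v is not None).most_common(1)[0][0]
```

For example, "Braund, Mr. Owen Harris" yields the Initial "Mr", and "Dr" is mapped to "Mr" by the table. This matches what the Spark version produces for the same names.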

Step 5: Create the pipeline and extract the model

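import mlflow
import mlflow.spark
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialization for the pipeline setup: apply the custom preprocessing transformer
my_model = preprocess_transform()
df = my_model.transform(df)
# The test set needs the same preprocessing so it has the columns the pipeline expects.
# Note: the StringIndexers inside the transformer are refit on each DataFrame passed in,
# so the test-set indices may not match the ones learned on the training data.
df_test = my_model.transform(df_test)

feature = VectorAssembler(inputCols=['Pclass', 'Age', 'Fare', 'Alone', 'Sex_index', 'Embarked_index', 'Initial_index'], outputCol="features")
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", numTrees=10)

''' Pipeline stages initialization, fit and transform. '''
pipeline = Pipeline(stages=[feature, rf])
model = pipeline.fit(df)

paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [100, 300]).build()
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
# Use 3+ folds in practice
crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)

# Run cross-validation and choose the best set of parameters
cvModel = crossval.fit(df)
prediction = cvModel.transform(df_test)

# Log the best model found by cross-validation to the current MLflow run, and save a copy to a path
best_model = cvModel.bestModel
mlflow.spark.log_model(best_model, "spark-model16")
mlflow.spark.save_model(best_model, "spark-model_test")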

In this step, we first instantiate the custom Transformer class and transform our DataFrame with the preprocessing operations created above.

After the transformation, we use VectorAssembler, which combines all the input feature columns into a single vector column. We then create a Pipeline object with two stages: the first is the VectorAssembler, and the second is the Estimator (a classifier or regressor).
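Conceptually, VectorAssembler concatenates the listed input columns, in order, into one numeric vector per row. Here is a plain-Python sketch of that behavior. It is not Spark's implementation. Spark may store the result as a sparse vector, and by default (`handleInvalid="error"`) it raises an error on null values. The `assemble` helper is hypothetical. The column list is the one passed to VectorAssembler in Step 5.

```python
FEATURE_COLS = ['Pclass', 'Age', 'Fare', 'Alone', 'Sex_index', 'Embarked_index', 'Initial_index']


def assemble(row, input_cols=FEATURE_COLS):
    """Concatenate the input columns, in order, into one list of floats."""
    vector = []
    for name in input_cols:
        value = row[name]
        if value is None:
            # Mirrors VectorAssembler's default handleInvalid="error" for nulls
            raise ValueError(f"null value in column {name!r}")
        vector.append(float(value))
    return vector
```

Columns not listed (such as Survived or PassengerId) are ignored. This is why the label column can stay in the same DataFrame.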

Optionally, you can set up cross-validation over a list of hyperparameters. ParamGridBuilder assigns a list of candidate values to each hyperparameter.
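ParamGridBuilder expands the lists passed to `addGrid` into every combination of values (a Cartesian product), and each combination is one candidate model. Below is a plain-Python sketch of that expansion. The `build_param_grid` helper is hypothetical, and the extra `maxDepth` grid is added only to show how the count multiplies.

```python
from itertools import product


def build_param_grid(grid):
    """Expand {param_name: [values]} into a list of {param_name: value} maps."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]


# Grid from the article: numTrees in [100, 300] -> 2 candidate models
print(build_param_grid({"numTrees": [100, 300]}))  # [{'numTrees': 100}, {'numTrees': 300}]

# Hypothetical extra grid on maxDepth: 2 x 3 = 6 candidate models
print(len(build_param_grid({"numTrees": [100, 300], "maxDepth": [3, 5, 8]})))  # 6
```

With `numFolds=3`, the article's two candidates mean 2 × 3 = 6 model fits during cross-validation. The best parameters are then refit on the full data. Every value added to a grid multiplies this cost.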

Set an evaluator for the measuring criterion, such as MulticlassClassificationEvaluator (here with the accuracy metric).

Create the CrossValidator and pass it the pipeline, the parameter grid, the evaluator and the number of folds.

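Once all of this is done, fit the data with the CrossValidator. Remember that it contains the pipeline, which includes the model. It therefore trains the model with each parameter combination, keeps the combination with the best average metric across folds, and refits it on the full data. Finally, the transform method is used to get the predictions.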
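The fit step of CrossValidator can be sketched in plain Python. For each parameter map, train on k-1 folds and score on the held-out fold. Average the metric across folds, pick the best parameters and refit on all the data. This is an illustrative sketch, not Spark's implementation. Spark assigns rows to folds randomly, whereas this sketch uses contiguous folds. The threshold "model" is a hypothetical stand-in for the random forest.

```python
def accuracy(y_true, y_pred):
    """Fraction of correct predictions, the metric behind metricName="accuracy"."""
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def k_fold_indices(n, k):
    """Split range(n) into k contiguous folds (Spark assigns folds randomly)."""
    sizes = [n // k + (1 if i < n % k else 0) for i in range(k)]
    folds, start = [], 0
    for size in sizes:
        folds.append(list(range(start, start + size)))
        start += size
    return folds


def cross_validate(X, y, param_maps, fit, k=3):
    """Return (best_params, avg_metrics, final_model) for k-fold cross-validation."""
    folds = k_fold_indices(len(X), k)
    avg_metrics = []
    for params in param_maps:
        scores = []
        for held_out in folds:
            held = set(held_out)
            train_idx = [i for i in range(len(X)) if i not in held]
            model = fit([X[i] for i in train_idx], [y[i] for i in train_idx], params)
            preds = [model(X[i]) for i in held_out]
            scores.append(accuracy([y[i] for i in held_out], preds))
        avg_metrics.append(sum(scores) / len(scores))
    best = max(range(len(param_maps)), key=lambda i: avg_metrics[i])
    # Like Spark, refit the best parameter map on the full dataset
    return param_maps[best], avg_metrics, fit(X, y, param_maps[best])


def fit_threshold(X, y, params):
    """Hypothetical toy model: predicts 1 when the single feature exceeds the threshold."""
    t = params["threshold"]
    return lambda x: 1 if x > t else 0
```

In Spark, the equivalents of `avg_metrics` and the refit model are exposed on the fitted model as `cvModel.avgMetrics` and `cvModel.bestModel`.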

MLflow is a platform that manages the machine learning lifecycle. After prediction, we can use two MLflow functions: log_model and save_model. log_model logs the trained model as an artifact of the current MLflow run, which you can then see in the MLflow UI. save_model saves the model (here, the best model) to a path. MLflow has many other useful functions, so check out its official documentation.

Step 6: Deployment in Azure using Azure ML

import mlflow.azureml
from azureml.core import Workspace
from azureml.core.webservice import AciWebservice, Webservice

# Create a workspace in Azure ML
workspace_name = "MLServiceDMWS11"
subscription_id = "xxxxxxxx-23ad-4272-xxxx-0d504b07d497"
resource_group = "mlservice_ws"
location = "xxx"

azure_workspace = Workspace.create(name=workspace_name,
                                   subscription_id=subscription_id,
                                   resource_group=resource_group,
                                   location=location)

# Build an image of the model in the workspace; this also registers the model object in the workspace
azure_image, azure_model = mlflow.azureml.build_image(model_uri="/dbfs/databricks/mlflow/my_test_ml_flow",
                                                      workspace=azure_workspace)

# The Azure web service API exposes the model as a REST endpoint.
# We pass the model image, the workspace and the configuration settings.
webservice_deployment_config = AciWebservice.deploy_configuration()
webservice = Webservice.deploy_from_image(deployment_config=webservice_deployment_config,
                                          image=azure_image,
                                          workspace=azure_workspace,
                                          name="xxx")  # service name: placeholder, not given in the original
webservice.wait_for_deployment()
print("Scoring URI is: %s" % webservice.scoring_uri)

Once the model is deployed, let's check whether the API we created works. Put the column names and the corresponding values in lists, then send a POST request. If the request succeeds, the service responds with the result. The standard format for input and output is JSON.

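import requests
import json

# Placeholders: replace with the model's input column names and one list of values per record
sample_input = {
    "columns": [
        "col1",
        "col2",
        ...,
        "coln"
    ],
    "data": [
        [val1, val2, val3, ..., valn]
    ]
}

response = requests.post(
    url=webservice.scoring_uri, data=json.dumps(sample_input),
    headers={"Content-type": "application/json"})
response_json = json.loads(response.text)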
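Suppose the served model is a fitted pipeline like the one in Step 5 (VectorAssembler plus classifier). The custom preprocessing transformer is not one of its stages, so the request would have to carry the already-preprocessed feature columns. The sketch below builds such a payload in the `columns`/`data` layout used above. It assumes the VectorAssembler input columns from Step 5. The `build_payload` helper is hypothetical, and the feature values are placeholders rather than real passenger data.

```python
import json

# Assumption: the deployed model expects the VectorAssembler input columns from Step 5
FEATURE_COLS = ['Pclass', 'Age', 'Fare', 'Alone', 'Sex_index', 'Embarked_index', 'Initial_index']


def build_payload(records, columns=FEATURE_COLS):
    """Build a {"columns": [...], "data": [[...], ...]} body from a list of dicts."""
    missing = sorted({c for r in records for c in columns if c not in r})
    if missing:
        raise KeyError(f"missing columns: {missing}")
    return {"columns": list(columns),
            "data": [[r[c] for c in columns] for r in records]}


# Placeholder values for illustration only
record = {'Pclass': 3, 'Age': 22.0, 'Fare': 7.25, 'Alone': 0,
          'Sex_index': 0.0, 'Embarked_index': 0.0, 'Initial_index': 0.0}
body = json.dumps(build_payload([record]))
```

The resulting `body` string can be passed as `data=` to `requests.post`, in the same way as `json.dumps(sample_input)` above.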


This is just an initial draft of this article, and I will update it in the near future. You can find the whole code at my Git link below. I always appreciate any feedback or suggestions.

Thanks for your support! See you next time :)


Created by

Yogesh Agrawal

Senior Data Scientist






