philschmid blog

MLOps: End-to-End Hugging Face Transformers with the Hub & SageMaker Pipelines

#AWS #BERT #HuggingFace #Sagemaker
November 10, 2021 · 9 min read


Welcome to this getting started guide. We will use the new Hugging Face Inference DLCs and the Amazon SageMaker Python SDK to create an end-to-end MLOps pipeline for Hugging Face Transformers, from training to production, using Amazon SageMaker. This blog post demonstrates how to use SageMaker Pipelines to train a Hugging Face Transformer model and deploy it. The SageMaker integration with Hugging Face makes it easy to train and deploy advanced NLP models. A Lambda step in SageMaker Pipelines enables you to easily do lightweight model deployments and other serverless operations.

In this example we are going to fine-tune and deploy a DistilBERT model on the imdb dataset.

Development Environment and Permissions

Installation & Imports

We’ll start by updating the SageMaker SDK, and importing some necessary packages.

!pip install "sagemaker>=2.48.0" --upgrade

Import all relevant packages for SageMaker Pipelines.

import boto3
import os
import numpy as np
import pandas as pd
import sagemaker
import sys
import time

from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString

from sagemaker.lambda_helper import Lambda

from sagemaker.sklearn.processing import SKLearnProcessor

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import CacheConfig, ProcessingStep

from sagemaker.huggingface import HuggingFace, HuggingFaceModel
import sagemaker.huggingface

from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import CreateModelStep, RegisterModel

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo, ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

from sagemaker.workflow.pipeline import Pipeline, PipelineExperimentConfig
from sagemaker.workflow.execution_variables import ExecutionVariables

Permissions

If you are going to use SageMaker in a local environment, you need access to an IAM role with the required permissions for SageMaker. You can find more about it here.

import sagemaker

sess = sagemaker.Session()
region = sess.boto_region_name

# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it does not exist
sagemaker_session_bucket = None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sagemaker_session.default_bucket()}")
print(f"sagemaker session region: {sagemaker_session.boto_region_name}")

Pipeline Overview

(Figure: pipeline overview)

Defining the Pipeline

0. Pipeline parameters

Before defining the pipeline, it is important to parameterize it. A SageMaker Pipeline can be parameterized directly, including instance types and counts.

Read more about Parameters in the documentation

# S3 prefix where all assets will be stored
s3_prefix = "hugging-face-pipeline-demo"

# s3 bucket used for storing assets and artifacts
bucket = sagemaker_session.default_bucket()

# aws region used
region = sagemaker_session.boto_region_name

# base name prefix for sagemaker jobs (training, processing, inference)
base_job_prefix = s3_prefix

# Cache configuration for workflow
cache_config = CacheConfig(enable_caching=False, expire_after="30d")

# package versions
transformers_version = "4.11.0"
pytorch_version = "1.9.0"
py_version = "py38"

# plain-string copies, used wherever a regular Python string is needed
model_id_ = "distilbert-base-uncased"
dataset_name_ = "imdb"

model_id = ParameterString(name="ModelId", default_value="distilbert-base-uncased")
dataset_name = ParameterString(name="DatasetName", default_value="imdb")

1. Processing Step

An SKLearn Processing step is used to invoke a SageMaker Processing job with a custom Python script, preprocessing.py.

Processing Parameter

processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.c5.2xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_script = ParameterString(name="ProcessingScript", default_value="./scripts/preprocessing.py")

Processor

processing_output_destination = f"s3://{bucket}/{s3_prefix}/data"

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=base_job_prefix + "/preprocessing",
    sagemaker_session=sagemaker_session,
    role=role,
)

step_process = ProcessingStep(
    name="ProcessDataForTraining",
    cache_config=cache_config,
    processor=sklearn_processor,
    # plain strings are passed to the script as command-line arguments
    job_arguments=[
        "--transformers_version", transformers_version,
        "--pytorch_version", pytorch_version,
        "--model_id", model_id_,
        "--dataset_name", dataset_name_,
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            destination=f"{processing_output_destination}/train",
            source="/opt/ml/processing/train",
        ),
        ProcessingOutput(
            output_name="test",
            destination=f"{processing_output_destination}/test",
            source="/opt/ml/processing/test",
        ),
        ProcessingOutput(
            output_name="validation",
            # own prefix, so the validation split does not land in the test location
            destination=f"{processing_output_destination}/validation",
            source="/opt/ml/processing/validation",
        ),
    ],
    code=processing_script,
)
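The step passes four flags to preprocessing.py through job_arguments. The script itself is not shown in this post, so the following is only a minimal sketch of how such a script might read those flags. The parse_args helper, its defaults, and the OUTPUT_DIRS mapping are assumptions for illustration, not the actual script.

```python
import argparse


def parse_args(argv=None):
    """Parse the flags that the ProcessingStep passes via job_arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--transformers_version", type=str, default="4.11.0")
    parser.add_argument("--pytorch_version", type=str, default="1.9.0")
    parser.add_argument("--model_id", type=str, default="distilbert-base-uncased")
    parser.add_argument("--dataset_name", type=str, default="imdb")
    # parse_known_args ignores unexpected extra flags instead of failing on them
    args, _ = parser.parse_known_args(argv)
    return args


# container paths matching the `source` of each ProcessingOutput in the step
OUTPUT_DIRS = {
    "train": "/opt/ml/processing/train",
    "test": "/opt/ml/processing/test",
    "validation": "/opt/ml/processing/validation",
}

args = parse_args(["--model_id", "distilbert-base-uncased", "--dataset_name", "imdb"])
print(f"preprocessing {args.dataset_name} for {args.model_id}")
```

Whatever the script writes into those three directories is uploaded by SageMaker to the matching destination when the job finishes.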

2. Model Training Step

We use SageMaker’s Hugging Face Estimator class to create a model training step for the Hugging Face DistilBERT model. Transformer-based models such as the original BERT can be very large and slow to train. DistilBERT, however, is a small, fast, cheap and light Transformer model trained by distilling BERT base. It reduces the size of a BERT model by 40%, while retaining 97% of its language understanding capabilities and being 60% faster.

The Hugging Face estimator also takes hyperparameters as a dictionary. The training instance type and count are pipeline parameters that can be easily varied in future pipeline runs without changing any code.

Training Parameter

# training step parameters
training_entry_point = ParameterString(name="TrainingEntryPoint", default_value="train.py")
training_source_dir = ParameterString(name="TrainingSourceDir", default_value="./scripts")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.p3.2xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

# hyperparameters, which are passed into the training job
epochs = ParameterString(name="Epochs", default_value="1")
eval_batch_size = ParameterString(name="EvalBatchSize", default_value="32")
train_batch_size = ParameterString(name="TrainBatchSize", default_value="16")
learning_rate = ParameterString(name="LearningRate", default_value="3e-5")
fp16 = ParameterString(name="Fp16", default_value="True")

Hugging Face Estimator

huggingface_estimator = HuggingFace(
    entry_point=training_entry_point,
    source_dir=training_source_dir,
    base_job_name=base_job_prefix + "/training",
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    role=role,
    transformers_version=transformers_version,
    pytorch_version=pytorch_version,
    py_version=py_version,
    hyperparameters={
        "epochs": epochs,
        "eval_batch_size": eval_batch_size,
        "train_batch_size": train_batch_size,
        "learning_rate": learning_rate,
        "model_id": model_id,
        "fp16": fp16,
    },
    sagemaker_session=sagemaker_session,
)

step_train = TrainingStep(
    name="TrainHuggingFaceModel",
    estimator=huggingface_estimator,
    inputs={
        # the training channels read the S3 locations written by the processing step
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri
        ),
    },
    cache_config=cache_config,
)
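Inside the training container, SageMaker hands the hyperparameters to the entry point as command-line strings and exposes the input channels and model directory through environment variables such as SM_CHANNEL_TRAIN, SM_CHANNEL_TEST and SM_MODEL_DIR. The train.py used here is not shown in this post, so the following is a minimal sketch of how it might parse them. The parse_args and str_to_bool helpers and the fallback paths are assumptions.

```python
import argparse
import os


def str_to_bool(value):
    """Convert hyperparameter strings such as "True" or "false" to a bool."""
    return str(value).strip().lower() in ("true", "1", "yes")


def parse_args(argv=None):
    parser = argparse.ArgumentParser()
    # hyperparameters from the estimator arrive as command-line strings
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--train_batch_size", type=int, default=16)
    parser.add_argument("--eval_batch_size", type=int, default=32)
    parser.add_argument("--learning_rate", type=float, default=3e-5)
    parser.add_argument("--model_id", type=str, default="distilbert-base-uncased")
    parser.add_argument("--fp16", type=str_to_bool, default=True)
    # channels and model directory come from SageMaker environment variables
    parser.add_argument("--model_dir", type=str,
                        default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
    parser.add_argument("--training_dir", type=str,
                        default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
    parser.add_argument("--test_dir", type=str,
                        default=os.environ.get("SM_CHANNEL_TEST", "/opt/ml/input/data/test"))
    args, _ = parser.parse_known_args(argv)
    return args


args = parse_args(["--epochs", "3", "--learning_rate", "3e-5", "--fp16", "False"])
print(args.epochs, args.learning_rate, args.fp16)
```

Converting "True" and "False" explicitly matters because every hyperparameter in this pipeline is declared as a ParameterString, and any non-empty string would otherwise be truthy.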

3. Model evaluation Step

A ProcessingStep is used to evaluate the performance of the trained model. Based on the results of the evaluation, either the model is created, registered, and deployed, or the pipeline stops.

In the training job, the model was evaluated against the test dataset, and the result of the evaluation was stored in the model.tar.gz file saved by the training job. The results of that evaluation are copied into a PropertyFile in this ProcessingStep so that it can be used in the ConditionStep.

Evaluation Parameter

evaluation_script = ParameterString(name="EvaluationScript", default_value="./scripts/evaluate.py")

Evaluator

script_eval = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name=base_job_prefix + "/evaluation",
    sagemaker_session=sagemaker_session,
    role=role,
)

# exposes evaluation.json from the "evaluation" output to later steps
evaluation_report = PropertyFile(
    name="HuggingFaceEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name="HuggingfaceEvalLoss",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation",
            destination=f"s3://{bucket}/{s3_prefix}/evaluation_report",
        ),
    ],
    code=evaluation_script,
    property_files=[evaluation_report],
    cache_config=cache_config,
)
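The evaluate.py script is not shown in this post. As described above, its job is to take the metrics stored in model.tar.gz and write them to evaluation.json in the output directory. A minimal sketch of that copy step could look like the following. The copy_evaluation_report helper and the name of the metrics file inside the archive (results_name) are assumptions.

```python
import json
import tarfile
from pathlib import Path


def copy_evaluation_report(model_dir, output_dir, results_name="evaluation.json"):
    """Read the metrics file stored in model.tar.gz and write evaluation.json.

    `results_name` is a hypothetical name for the metrics file that the
    training job is assumed to have saved inside the archive.
    """
    model_dir, output_dir = Path(model_dir), Path(output_dir)
    with tarfile.open(model_dir / "model.tar.gz", "r:gz") as tar:
        # read only the metrics member instead of extracting the whole archive
        metrics = json.load(tar.extractfile(tar.getmember(results_name)))
    output_dir.mkdir(parents=True, exist_ok=True)
    report_path = output_dir / "evaluation.json"
    report_path.write_text(json.dumps(metrics))
    return report_path
```

In the processing container this would be called as copy_evaluation_report("/opt/ml/processing/model", "/opt/ml/processing/evaluation"), which matches the input destination and output source configured in the step.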

4. Register the model

The trained model is registered in the Model Registry under a Model Package Group. Each time a new model is registered, it is given a new version number by default. The model is registered in the “Approved” state so that it can be deployed. Registration only happens if the condition defined in 6. Condition for deployment is true, i.e., the metric being checked meets the defined threshold.

model = HuggingFaceModel(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    transformers_version=transformers_version,
    pytorch_version=pytorch_version,
    py_version=py_version,
    sagemaker_session=sagemaker_session,
)
model_package_group_name = "HuggingFaceModelPackageGroup"
step_register = RegisterModel(
    name="HuggingFaceRegisterModel",
    model=model,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.g4dn.xlarge", "ml.m5.xlarge"],
    transform_instances=["ml.g4dn.xlarge", "ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status="Approved",
)

5. Model Deployment

We create a custom step, ModelDeployment, derived from the provided LambdaStep. This step creates a Lambda function and invokes it to deploy our model as a SageMaker endpoint.

# custom helper step for ModelDeployment
from utils.deploy_step import ModelDeployment

# we will use the IAM role from the notebook session for the created endpoint
# this role will be attached to our endpoint and needs permissions, e.g. to download assets from S3
sagemaker_endpoint_role = sagemaker.get_execution_role()

step_deployment = ModelDeployment(
    model_name=f"{model_id_}-{dataset_name_}",
    registered_model=step_register.steps[0],
    endpoint_instance_type="ml.g4dn.xlarge",
    sagemaker_endpoint_role=sagemaker_endpoint_role,
    autoscaling_policy=None,
)
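The code of the Lambda function behind ModelDeployment lives in utils.deploy_step and is not shown in this post. As a rough sketch of what such a function might do, the following creates a model from the registered model package, then an endpoint configuration, then the endpoint, using the SageMaker API calls create_model, create_endpoint_config and create_endpoint. The deploy_model_package helper, the event keys in lambda_handler, and the choice to reuse one name for all three resources are assumptions.

```python
def deploy_model_package(sm_client, model_name, model_package_arn, role_arn,
                         instance_type="ml.g4dn.xlarge"):
    """Create a model, an endpoint config and an endpoint from a model package.

    `sm_client` is expected to behave like boto3.client("sagemaker").
    """
    sm_client.create_model(
        ModelName=model_name,
        ExecutionRoleArn=role_arn,
        Containers=[{"ModelPackageName": model_package_arn}],
    )
    sm_client.create_endpoint_config(
        EndpointConfigName=model_name,
        ProductionVariants=[
            {
                "VariantName": "AllTraffic",
                "ModelName": model_name,
                "InstanceType": instance_type,
                "InitialInstanceCount": 1,
            }
        ],
    )
    response = sm_client.create_endpoint(
        EndpointName=model_name,
        EndpointConfigName=model_name,
    )
    return response["EndpointArn"]


def lambda_handler(event, context):
    """Lambda entry point; the event keys used here are hypothetical."""
    import boto3  # imported lazily so the helper above can be tested without AWS

    endpoint_arn = deploy_model_package(
        boto3.client("sagemaker"),
        model_name=event["model_name"],
        model_package_arn=event["model_package_arn"],
        role_arn=event["role"],
        instance_type=event.get("instance_type", "ml.g4dn.xlarge"),
    )
    return {"statusCode": 200, "body": endpoint_arn}
```

Passing the client in as an argument keeps the deployment logic separate from the Lambda plumbing, so it can be exercised locally with a stub client before it is wired into the pipeline.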

6. Condition for deployment

For the condition to be True and the steps after evaluation to run, the evaluated accuracy of the Hugging Face model must be greater than or equal to our ThresholdAccuracy parameter.

Condition Parameter

threshold_accuracy = ParameterFloat(name="ThresholdAccuracy", default_value=0.8)

Condition

cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="eval_accuracy",
    ),
    right=threshold_accuracy,
)

step_cond = ConditionStep(
    name="CheckHuggingfaceEvalAccuracy",
    conditions=[cond_gte],
    if_steps=[step_register, step_deployment],
    else_steps=[],
)
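To make the check concrete, here is a small local imitation of what the condition evaluates: read eval_accuracy from the evaluation report and compare it with the threshold. This is plain Python for illustration only, not how SageMaker implements the step. The should_deploy helper is hypothetical and only handles a top-level key.

```python
import json


def should_deploy(report_json, threshold_accuracy=0.8, json_path="eval_accuracy"):
    """Mimic the condition locally: True when accuracy >= threshold."""
    report = json.loads(report_json)
    return float(report[json_path]) >= threshold_accuracy


print(should_deploy('{"eval_accuracy": 0.93}'))  # True, 0.93 >= 0.8
print(should_deploy('{"eval_accuracy": 0.75}'))  # False, 0.75 < 0.8
print(should_deploy('{"eval_accuracy": 0.8}'))   # True, the boundary value passes
```

With else_steps left empty, a False result simply ends the pipeline after the evaluation step.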

Pipeline definition and execution

SageMaker Pipelines constructs the pipeline graph from the implicit definition created by the way the pipeline steps’ inputs and outputs are specified. There’s no need to specify that a step is a “parallel” or “serial” step. Steps such as model registration, which come after the condition step, are not listed in the pipeline’s steps argument because they are attached to the condition step and do not run unless the condition is true. If it is, they run in order based on their specified inputs and outputs.
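The idea of deriving the execution order from data dependencies can be illustrated with a topological sort. The sketch below uses the step names from this post and Python's graphlib (Python 3.9+). It only illustrates the principle and makes no claim about how SageMaker resolves the graph internally.

```python
from graphlib import TopologicalSorter

# each step maps to the steps whose outputs it consumes
dependencies = {
    "ProcessDataForTraining": set(),
    "TrainHuggingFaceModel": {"ProcessDataForTraining"},
    "HuggingfaceEvalLoss": {"TrainHuggingFaceModel"},
    "CheckHuggingfaceEvalAccuracy": {"HuggingfaceEvalLoss"},
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
# ['ProcessDataForTraining', 'TrainHuggingFaceModel',
#  'HuggingfaceEvalLoss', 'CheckHuggingfaceEvalAccuracy']
```

Because every step here consumes the output of exactly one predecessor, the order is a single chain; steps without a dependency between them could run in parallel.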

Each parameter we defined holds a default value, which can be overwritten before starting the pipeline. See the parameter documentation for details.

Overwriting Parameters

# define the parameters that should be overwritten
pipeline_parameters = dict(
    ModelId="distilbert-base-uncased",
    ThresholdAccuracy=0.7,
    Epochs="3",
    TrainBatchSize="32",
    EvalBatchSize="64",
)
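Any parameter that is not overwritten keeps its default. The following plain-Python sketch shows the resulting values for this run, using the defaults declared earlier in this post. The resolve_parameters helper is hypothetical, and its rejection of unknown names is just a local sanity check against typos.

```python
# default values as declared on the pipeline parameters in this post
defaults = {
    "ModelId": "distilbert-base-uncased",
    "DatasetName": "imdb",
    "ThresholdAccuracy": 0.8,
    "Epochs": "1",
    "TrainBatchSize": "16",
    "EvalBatchSize": "32",
    "LearningRate": "3e-5",
    "Fp16": "True",
}

overrides = {
    "ModelId": "distilbert-base-uncased",
    "ThresholdAccuracy": 0.7,
    "Epochs": "3",
    "TrainBatchSize": "32",
    "EvalBatchSize": "64",
}


def resolve_parameters(defaults, overrides):
    """Merge overrides into defaults, rejecting names that were never declared."""
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"unknown pipeline parameters: {sorted(unknown)}")
    return {**defaults, **overrides}


effective = resolve_parameters(defaults, overrides)
print(effective["Epochs"], effective["LearningRate"])  # 3 3e-5
```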

Create Pipeline

pipeline = Pipeline(
    name="HuggingFaceDemoPipeline",
    parameters=[
        model_id,
        dataset_name,
        processing_instance_type,
        processing_instance_count,
        processing_script,
        training_entry_point,
        training_source_dir,
        training_instance_type,
        training_instance_count,
        evaluation_script,
        threshold_accuracy,
        epochs,
        eval_batch_size,
        train_batch_size,
        learning_rate,
        fp16,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=sagemaker_session,
)

We can examine the pipeline definition in JSON format. You can also inspect the pipeline graph in SageMaker Studio by going to the page for your pipeline.

import json

json.loads(pipeline.definition())

(Figure: pipeline graph)

upsert creates or updates the pipeline.

pipeline.upsert(role_arn=role)

Run the pipeline

execution = pipeline.start(parameters=pipeline_parameters)
execution.wait()

Getting predictions from the endpoint

After the previous cell completes, you can check whether the endpoint has finished deploying.

We can use the endpoint_name to create a HuggingFacePredictor object that will be used to get predictions.

from sagemaker.huggingface import HuggingFacePredictor

# use the plain strings (not the pipeline parameters), matching model_name in the ModelDeployment step
endpoint_name = f"{model_id_}-{dataset_name_}"

# check if endpoint is up and running
print(f"https://console.aws.amazon.com/sagemaker/home?region={region}#/endpoints/{endpoint_name}")

hf_predictor = HuggingFacePredictor(endpoint_name, sagemaker_session=sagemaker_session)
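Instead of checking the console by hand, the endpoint status can also be polled through the SageMaker API, where describe_endpoint returns an EndpointStatus such as "Creating", "InService" or "Failed". The following is a minimal sketch. The wait_for_endpoint helper, its polling interval and its timeout are assumptions, and sm_client is expected to behave like boto3.client("sagemaker").

```python
import time


def wait_for_endpoint(sm_client, endpoint_name, poll_seconds=30, timeout_seconds=1800):
    """Poll describe_endpoint until the endpoint is InService, fails, or times out."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = sm_client.describe_endpoint(EndpointName=endpoint_name)["EndpointStatus"]
        if status == "InService":
            return status
        if status == "Failed":
            raise RuntimeError(f"endpoint {endpoint_name} failed to deploy")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"endpoint {endpoint_name} still {status} after timeout")
        time.sleep(poll_seconds)
```

In the notebook this could be called as wait_for_endpoint(boto3.client("sagemaker"), endpoint_name) before sending the first prediction request.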

Test data

Here are a couple of sample reviews we would like to classify as positive (pos) or negative (neg). Demonstrating the power of advanced Transformer-based models such as this Hugging Face model, the model should do quite well even though the reviews are mixed.

sentiment_input1 = {"inputs": "Although the movie had some plot weaknesses, it was engaging. Special effects were mind boggling. Can't wait to see what this creative team does next."}

hf_predictor.predict(sentiment_input1)

# [{'label': 'pos', 'score': 0.9690886735916138}]

sentiment_input2 = {"inputs": "There was some good acting, but the story was ridiculous. The other sequels in this franchise were better. It's time to take a break from this IP, but if they switch it up for the next one, I'll check it out."}

hf_predictor.predict(sentiment_input2)

# [{'label': 'neg', 'score': 0.9938264489173889}]

Cleanup Resources

The following cell deletes the resources created by the Lambda function and the Lambda function itself. Deleting other resources, such as the S3 bucket and the IAM role for the Lambda function, is the responsibility of the notebook user.

sm_client = boto3.client("sagemaker")

# Delete the Lambda function
step_deployment.func.delete()

# Delete the endpoint
hf_predictor.delete_endpoint()

Conclusion

With the help of Amazon SageMaker Pipelines, we were able to create a 100% managed end-to-end machine learning pipeline without having to think about any administration tasks. Thanks to the simplicity of SageMaker, you no longer need huge ops teams to manage and scale your machine learning pipelines. You can do it yourself.


You can find the code here, and feel free to open a thread on the forum.

Thanks for reading. If you have any questions, feel free to contact me through GitHub or on the forum. You can also connect with me on Twitter or LinkedIn.