philschmid blog

Asynchronous Inference with Hugging Face Transformers and Amazon SageMaker

#HuggingFace #AWS #BERT #SageMaker
February 15, 2022 · 9 min read

Photo by Juan Domenech on Unsplash

Welcome to this getting started guide. We will use the Hugging Face Inference DLCs and the Amazon SageMaker Python SDK to run an Asynchronous Inference job. Amazon SageMaker Asynchronous Inference is a new capability in SageMaker that queues incoming requests and processes them asynchronously. Compared to Batch Transform, Asynchronous Inference gives you access to each result as soon as its request has been processed, rather than waiting for the whole job to complete.

How it works

Asynchronous inference endpoints have many similarities (and some key differences) compared to real-time endpoints. The process to create asynchronous endpoints is similar to real-time endpoints. You need to create a model, an endpoint configuration, and an endpoint. However, some configuration parameters are specific to asynchronous inference endpoints, which we will explore below.

Invoking an asynchronous endpoint differs from invoking a real-time endpoint. Rather than passing the request payload inline with the request, you upload the payload to Amazon S3 and pass an Amazon S3 URI as part of the request. Upon receiving the request, SageMaker returns a token with the output location where the result will be placed once processed. Internally, SageMaker maintains a queue with these requests and processes them. During endpoint creation, you can optionally specify an Amazon SNS topic to receive success or error notifications. Once you receive the notification that your inference request has been successfully processed, you can access the result in the output Amazon S3 location.
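To make the request flow above more concrete, here is a minimal sketch of what a single low-level invocation could look like with the `invoke_endpoint_async` call of the boto3 `sagemaker-runtime` client. The endpoint name, bucket, and the `FakeRuntimeClient` class are assumptions for illustration only: the fake client stands in for the real one so the snippet runs without AWS credentials, and the output path it returns is made up.

```python
import uuid


class FakeRuntimeClient:
    """Offline stand-in for boto3.client("sagemaker-runtime") (hypothetical, dry runs only)."""

    def invoke_endpoint_async(self, EndpointName, InputLocation, ContentType):
        inference_id = str(uuid.uuid4())
        # The real service chooses the output key; this path layout is invented.
        return {
            "InferenceId": inference_id,
            "OutputLocation": f"s3://example-bucket/async_inference/output/{inference_id}.out",
        }


def invoke_async(runtime_client, endpoint_name, input_s3_uri):
    """Submit one request whose payload already sits in S3; return (id, output location)."""
    response = runtime_client.invoke_endpoint_async(
        EndpointName=endpoint_name,
        InputLocation=input_s3_uri,  # S3 URI of the uploaded payload, not the payload itself
        ContentType="application/json",
    )
    return response["InferenceId"], response["OutputLocation"]


inference_id, output_location = invoke_async(
    FakeRuntimeClient(),
    "my-async-endpoint",  # hypothetical endpoint name
    "s3://example-bucket/async_inference/input/request.json",  # hypothetical input object
)
print(output_location)
```

With a real client you would pass `boto3.client("sagemaker-runtime")` instead of the stand-in. The rest of this guide uses the higher-level AsyncPredictor, which takes care of the upload and the invocation for you.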

[Figure: architecture]

Link to Notebook: sagemaker/16_async_inference_hf_hub

NOTE: You can run this demo in SageMaker Studio, on your local machine, or on SageMaker Notebook Instances.

Development Environment and Permissions

Installation

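    %pip install sagemaker --upgrade

    import sagemaker
    from packaging import version

    # compare parsed versions; a plain string comparison would rank "2.100.0" below "2.75.0"
    assert version.parse(sagemaker.__version__) >= version.parse("2.75.0")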

Permissions

If you are going to use SageMaker in a local environment (not SageMaker Studio or Notebook Instances), you need access to an IAM Role with the required permissions for SageMaker. You can find more about it here.

    import sagemaker
    import boto3

    sess = sagemaker.Session()
    # sagemaker session bucket -> used for uploading data, models and logs
    # sagemaker will automatically create this bucket if it does not exist
    sagemaker_session_bucket = None
    if sagemaker_session_bucket is None and sess is not None:
        # set to default bucket if a bucket name is not given
        sagemaker_session_bucket = sess.default_bucket()

    try:
        role = sagemaker.get_execution_role()
    except ValueError:
        # outside of SageMaker: look up the execution role by name
        iam = boto3.client('iam')
        role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']

    sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

    print(f"sagemaker role arn: {role}")
    print(f"sagemaker bucket: {sess.default_bucket()}")
    print(f"sagemaker session region: {sess.boto_region_name}")

Create Inference HuggingFaceModel for the Asynchronous Inference Endpoint

We use the twitter-roberta-base-sentiment model to run our async inference job. This is a RoBERTa-base model trained on ~58M tweets and fine-tuned for sentiment analysis with the TweetEval benchmark.

    from sagemaker.huggingface.model import HuggingFaceModel
    from sagemaker.async_inference.async_inference_config import AsyncInferenceConfig
    from sagemaker.s3 import s3_path_join

    # Hub Model configuration. <https://huggingface.co/models>
    hub = {
        'HF_MODEL_ID': 'cardiffnlp/twitter-roberta-base-sentiment',
        'HF_TASK': 'text-classification'
    }

    # create Hugging Face Model Class
    huggingface_model = HuggingFaceModel(
        env=hub,                      # configuration for loading model from Hub
        role=role,                    # iam role with permissions to create an Endpoint
        transformers_version="4.12",  # transformers version used
        pytorch_version="1.9",        # pytorch version used
        py_version='py38',            # python version used
    )

    # create async endpoint configuration
    async_config = AsyncInferenceConfig(
        output_path=s3_path_join("s3://", sagemaker_session_bucket, "async_inference/output"),  # Where our results will be stored
        # notification_config={
        #     "SuccessTopic": "arn:aws:sns:us-east-2:123456789012:MyTopic",
        #     "ErrorTopic": "arn:aws:sns:us-east-2:123456789012:MyTopic",
        # },  # Notification configuration
    )

    # deploy the async endpoint
    async_predictor = huggingface_model.deploy(
        initial_instance_count=1,
        instance_type="ml.g4dn.xlarge",
        async_inference_config=async_config
    )

We can find our Asynchronous Inference endpoint configuration in the Amazon SageMaker Console. Our endpoint now has the type async, in contrast to a 'real-time' endpoint.

[Figure: deployed-endpoint]

Request Asynchronous Inference Endpoint using the AsyncPredictor

The .deploy() call returns an AsyncPredictor object which can be used to request inference. This AsyncPredictor makes it easy to send asynchronous requests to your endpoint and get the results back. It has two methods: predict() and predict_async(). The predict() method is synchronous and will block until the inference is complete. The predict_async() method is asynchronous and returns immediately with an AsyncInferenceResponse, which can be used to poll for the result: if the result object exists at the output path, it is fetched and returned.

predict() request example

The predict() method will upload our data to Amazon S3 and run inference against it. Since we are using predict(), it will block until the inference is complete.

    data = {
        "inputs": [
            "it 's a charming and often affecting journey .",
            "it 's slow -- very , very slow",
            "the mesmerizing performances of the leads keep the film grounded and keep the audience riveted .",
            "the emotions are raw and will strike a nerve with anyone who 's ever had family trauma ."
        ]
    }

    res = async_predictor.predict(data=data)
    print(res)
    # [{'label': 'LABEL_2', 'score': 0.8808117508888245}, {'label': 'LABEL_0', 'score': 0.6126593947410583}, {'label': 'LABEL_2', 'score': 0.9425230622291565}, {'label': 'LABEL_0', 'score': 0.5511414408683777}]
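The labels in this output are numeric IDs. According to the model card of cardiffnlp/twitter-roberta-base-sentiment, they map to negative (0), neutral (1), and positive (2). Below is a small sketch of how the raw response could be turned into readable labels. The helper name `to_readable` is hypothetical, and the sample scores are copied from the output above and rounded.

```python
# Label mapping taken from the model card of cardiffnlp/twitter-roberta-base-sentiment.
ID2LABEL = {"LABEL_0": "negative", "LABEL_1": "neutral", "LABEL_2": "positive"}


def to_readable(predictions):
    """Replace the numeric label IDs in a text-classification response with names."""
    return [
        {"label": ID2LABEL.get(p["label"], p["label"]), "score": p["score"]}
        for p in predictions
    ]


# Sample response copied from the predict() example (scores rounded).
res = [
    {"label": "LABEL_2", "score": 0.8808},
    {"label": "LABEL_0", "score": 0.6127},
    {"label": "LABEL_2", "score": 0.9425},
    {"label": "LABEL_0", "score": 0.5511},
]
readable = to_readable(res)
print([p["label"] for p in readable])
```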

predict_async() request example

The predict_async() method will upload our data to Amazon S3 and run inference against it. Since we are using predict_async(), it will return immediately with an AsyncInferenceResponse object. In this example, we will loop over a CSV file and send each line to the endpoint. After that, we are going to poll the endpoint until the inference is complete. The provided tweet_data.csv contains ~1800 tweets about different airlines.

But first, let’s do a quick test to see if we can get a result from the endpoint using predict_async().

Single predict_async() request example

    from sagemaker.async_inference.waiter_config import WaiterConfig

    resp = async_predictor.predict_async(data={"inputs": "i like you. I love you"})

    print(f"Response object: {resp}")
    print(f"Response output path: {resp.output_path}")
    print("Start Polling to get response:")

    config = WaiterConfig(
        max_attempts=5,  # number of attempts
        delay=10         # time in seconds to wait between attempts
    )

    resp.get_result(config)
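The polling behaviour configured through max_attempts and delay can be pictured as a simple retry loop. The following is only an illustrative sketch of that idea, not the SDK's actual implementation. The function `poll_for_result` and the `fetch` callable are hypothetical, and the sample result is a made-up value.

```python
import time


def poll_for_result(fetch, max_attempts=5, delay=10, sleep=time.sleep):
    """Call fetch() until it returns something other than None or attempts run out."""
    for attempt in range(1, max_attempts + 1):
        result = fetch()
        if result is not None:
            return result
        if attempt < max_attempts:
            sleep(delay)  # wait before checking the output location again
    raise TimeoutError(f"no result after {max_attempts} attempts")


# Stand-in for "look up the output object in S3": not ready twice, then ready.
answers = iter([None, None, [{"label": "LABEL_2", "score": 0.98}]])
result = poll_for_result(lambda: next(answers), max_attempts=5, delay=0)
print(result)
```

With the values used above (5 attempts, 10 seconds apart), such a loop would give up after roughly 40 seconds of waiting, so longer-running requests need a larger max_attempts or delay.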

High load predict_async() request example using a csv file

    from csv import reader

    data_file = "tweet_data.csv"

    output_list = []

    # open file in read mode
    with open(data_file, 'r') as csv_reader:
        for row in reader(csv_reader):
            # send each row as an async request
            resp = async_predictor.predict_async(data={"inputs": row[0]})
            output_list.append(resp)

    print("All requests sent")
    print(f"Output path list length: {len(output_list)}")
    print(f"Output path list sample: {output_list[26].output_path}")

    # iterate over list of output paths and get results
    results = []
    for async_response in output_list:
        response = async_response.get_result(WaiterConfig())
        results.append(response)

    print(f"Results length: {len(results)}")
    print(f"Results sample: {results[26]}")
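Sending one request per row creates roughly 1,800 uploads and queue entries. Since the endpoint also accepts a list under "inputs" (as in the predict() example), one possible alternative is to group rows into batches before sending them. The sketch below only shows the grouping. The helper `chunked`, the batch size of 32, and the placeholder tweets are assumptions, and batches have to stay within the payload limits of your endpoint.

```python
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Stand-in for the first column of tweet_data.csv.
tweets = [f"tweet {i}" for i in range(70)]
payloads = [{"inputs": batch} for batch in chunked(tweets, 32)]
print([len(p["inputs"]) for p in payloads])  # [32, 32, 6]
```

Each payload could then be passed to async_predictor.predict_async(data=payload), and each response would contain one prediction per tweet in the batch.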

Autoscale (to Zero) the Asynchronous Inference Endpoint

Amazon SageMaker supports automatic scaling (autoscaling) of your asynchronous endpoint. Autoscaling dynamically adjusts the number of instances provisioned for a model in response to changes in your workload. Unlike other hosted models Amazon SageMaker supports, with Asynchronous Inference you can also scale your asynchronous endpoint's instances down to zero.

Prerequisite: You need to have an Asynchronous Inference Endpoint up and running. You can check Create Inference HuggingFaceModel for the Asynchronous Inference Endpoint to see how to create one.

If you want to learn more, check out Autoscale an asynchronous endpoint in the SageMaker documentation.

We are going to scale our asynchronous endpoint between 0 and 5 instances. This means that Amazon SageMaker will scale the endpoint in to 0 instances after 600 seconds (10 minutes) to save you cost, and scale out to up to 5 instances, with 300 seconds between scale-out steps, when the backlog exceeds the target of 5.0 queued requests per instance.

    # application-autoscaling client
    asg_client = boto3.client("application-autoscaling")

    # This is the format in which application autoscaling references the endpoint
    resource_id = f"endpoint/{async_predictor.endpoint_name}/variant/AllTraffic"

    # Configure Autoscaling on asynchronous endpoint down to zero instances
    response = asg_client.register_scalable_target(
        ServiceNamespace="sagemaker",
        ResourceId=resource_id,
        ScalableDimension="sagemaker:variant:DesiredInstanceCount",
        MinCapacity=0,
        MaxCapacity=5,
    )

    response = asg_client.put_scaling_policy(
        PolicyName=f'Request-ScalingPolicy-{async_predictor.endpoint_name}',
        ServiceNamespace="sagemaker",
        ResourceId=resource_id,
        ScalableDimension="sagemaker:variant:DesiredInstanceCount",
        PolicyType="TargetTrackingScaling",
        TargetTrackingScalingPolicyConfiguration={
            "TargetValue": 5.0,
            "CustomizedMetricSpecification": {
                "MetricName": "ApproximateBacklogSizePerInstance",
                "Namespace": "AWS/SageMaker",
                "Dimensions": [{"Name": "EndpointName", "Value": async_predictor.endpoint_name}],
                "Statistic": "Average",
            },
            "ScaleInCooldown": 600,  # duration until scale in begins (down to zero)
            "ScaleOutCooldown": 300  # duration between scale out attempts
        },
    )
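To get a feel for what a TargetValue of 5.0 on ApproximateBacklogSizePerInstance means, here is a rough back-of-the-envelope model: pick the smallest instance count that keeps the backlog per instance at or below the target, clamped to the registered minimum and maximum capacity. This is a deliberate simplification and not the actual Application Auto Scaling algorithm, which also takes cooldowns and metric history into account and treats scaling out from zero instances specially. The function name `estimate_instances` is hypothetical.

```python
import math


def estimate_instances(backlog, target=5.0, min_capacity=0, max_capacity=5):
    """Smallest instance count keeping backlog per instance <= target, clamped to the bounds."""
    wanted = math.ceil(backlog / target)
    return max(min_capacity, min(max_capacity, wanted))


for backlog in (0, 12, 25, 100):
    print(backlog, "->", estimate_instances(backlog))
```

Under this model, an empty queue maps to 0 instances, 12 queued requests to 3 instances, and anything above 25 queued requests is capped at the maximum of 5.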

[Figure: scaling]

The Endpoint will now scale to zero after 600s. Let’s wait until the endpoint is scaled to zero and then test sending requests and measure how long it takes to start an instance to process the requests. We are using the predict_async() method to send the request.

IMPORTANT: Since we set the TargetValue to 5.0, the Async Endpoint will only start to scale out from 0 to 1 if you send more than 5 requests within 300 seconds.

    import time
    from sagemaker.async_inference.waiter_config import WaiterConfig

    start = time.time()

    output_list = []

    # send 10 requests
    for i in range(10):
        resp = async_predictor.predict_async(data={"inputs": "it 's a charming and often affecting journey ."})
        output_list.append(resp)

    # iterate over list of output paths and get results
    results = []
    for async_response in output_list:
        response = async_response.get_result(WaiterConfig(max_attempts=600))
        results.append(response)

    print(f"Time taken: {time.time() - start}s")

It took about 7-9 minutes to start an instance and to process the requests. This is perfect when your application is not real-time critical and you want to save money.

[Figure: scale-out]

Delete the async inference endpoint & Autoscaling policy

    # remove the autoscaling configuration first, then delete the endpoint
    response = asg_client.deregister_scalable_target(
        ServiceNamespace='sagemaker',
        ResourceId=resource_id,
        ScalableDimension='sagemaker:variant:DesiredInstanceCount'
    )
    async_predictor.delete_endpoint()

Conclusion

We successfully deployed an Asynchronous Inference Endpoint to Amazon SageMaker using the SageMaker Python SDK. The SageMaker SDK provides great tooling for deploying and especially for running inference against the Asynchronous Inference Endpoint. It creates a handy AsyncPredictor object which can be used to send requests to the endpoint. It handles all of the boilerplate for asynchronous inference behind the scenes and gives us simple APIs.

In addition to this, we were able to add autoscaling to the Asynchronous Inference Endpoint with boto3 for scaling our endpoint in and out. Asynchronous Inference Endpoints can even scale down to zero, which is a great feature for non-real-time critical applications to save cost.

You should definitely try out Asynchronous Inference Endpoints for your own applications if neither batch transform nor real-time inference is the right option for you.


You can find the code here.

Thanks for reading! If you have any questions, feel free to contact me through GitHub or on the forum. You can also connect with me on Twitter or LinkedIn.