Asynchronous Inference with Hugging Face Transformers and Amazon SageMaker
Welcome to this getting started guide. We will use the Hugging Face Inference DLCs and the Amazon SageMaker Python SDK to run an Asynchronous Inference job. Amazon SageMaker Asynchronous Inference is a new capability in SageMaker that queues incoming requests and processes them asynchronously. Compared to Batch Transform, Asynchronous Inference gives you access to each result as soon as it is processed, rather than making you wait for an entire batch job to complete.
How it works
Asynchronous inference endpoints have many similarities (and some key differences) compared to real-time endpoints. The process to create them is similar: you need to create a model, an endpoint configuration, and an endpoint. However, there are configuration parameters specific to asynchronous inference endpoints, which we will explore below.
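For reference, those async-specific parameters live in the endpoint configuration. The following sketch uses the low-level boto3 SageMaker client purely to show where they go; the model name, bucket, and instance type are placeholders, and in the rest of this guide the SageMaker Python SDK creates these resources for us.

import boto3

sm_client = boto3.client("sagemaker")

# hypothetical names, only to illustrate the async-specific parameters
response = sm_client.create_endpoint_config(
    EndpointConfigName="my-async-endpoint-config",
    ProductionVariants=[
        {
            "VariantName": "AllTraffic",
            "ModelName": "my-model",          # an existing SageMaker model
            "InstanceType": "ml.g4dn.xlarge",
            "InitialInstanceCount": 1,
        }
    ],
    # this block is what makes the endpoint asynchronous
    AsyncInferenceConfig={
        "OutputConfig": {
            "S3OutputPath": "s3://my-bucket/async_inference/output",
            # "NotificationConfig": {"SuccessTopic": "...", "ErrorTopic": "..."},  # optional SNS topics
        },
        "ClientConfig": {"MaxConcurrentInvocationsPerInstance": 4},
    },
)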
Invoking asynchronous endpoints also differs from invoking real-time endpoints. Rather than passing the request payload inline with the request, you upload the payload to Amazon S3 and pass the Amazon S3 URI as part of the request. Upon receiving the request, SageMaker returns a token with the output location where the result will be placed once processed. Internally, SageMaker maintains a queue of these requests and processes them. During endpoint creation, you can optionally specify an Amazon SNS topic to receive success or error notifications. Once you receive the notification that your inference request has been successfully processed, you can access the result in the output Amazon S3 location.
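To make this concrete, here is a minimal sketch of that flow using the low-level boto3 clients (bucket and endpoint names are placeholders); later in this guide the AsyncPredictor from the SageMaker Python SDK does all of this for us.

import json
import boto3

s3 = boto3.client("s3")
smr = boto3.client("sagemaker-runtime")

bucket = "my-bucket"                 # placeholder
endpoint_name = "my-async-endpoint"  # placeholder

# 1. upload the request payload to S3
payload = {"inputs": "I love this!"}
s3.put_object(Bucket=bucket, Key="async_inference/input/request.json", Body=json.dumps(payload))

# 2. invoke the endpoint with the S3 URI instead of the payload itself
response = smr.invoke_endpoint_async(
    EndpointName=endpoint_name,
    InputLocation=f"s3://{bucket}/async_inference/input/request.json",
    ContentType="application/json",
)

# 3. SageMaker immediately returns where the result will be written once processed
print(response["OutputLocation"])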
Link to Notebook: sagemaker/16_async_inference_hf_hub
NOTE: You can run this demo in SageMaker Studio, on your local machine, or in SageMaker Notebook Instances.
Development Environment and Permissions
Installation
%pip install sagemaker --upgrade
import sagemaker

assert sagemaker.__version__ >= "2.75.0"
Permissions
If you are going to use SageMaker in a local environment (not SageMaker Studio or Notebook Instances), you need access to an IAM role with the required permissions for SageMaker. You can find out more about it here.
import sagemaker
import boto3

sess = sagemaker.Session()
# sagemaker session bucket -> used for uploading data, models and logs
# sagemaker will automatically create this bucket if it does not exist
sagemaker_session_bucket = None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client('iam')
    role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']

sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")
Create Inference HuggingFaceModel for the Asynchronous Inference Endpoint
We use the twitter-roberta-base-sentiment model to run our async inference job. This is a RoBERTa-base model trained on ~58M tweets and finetuned for sentiment analysis with the TweetEval benchmark.
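If you want to sanity-check the model and task locally before deploying, you can run it with the transformers pipeline. This step is optional and assumes transformers and a PyTorch backend are installed in your environment:

from transformers import pipeline

# quick local sanity check of the model we are about to deploy (optional)
classifier = pipeline("text-classification", model="cardiffnlp/twitter-roberta-base-sentiment")
print(classifier("it 's a charming and often affecting journey ."))
# LABEL_0 = negative, LABEL_1 = neutral, LABEL_2 = positive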
from sagemaker.huggingface.model import HuggingFaceModel
from sagemaker.async_inference.async_inference_config import AsyncInferenceConfig
from sagemaker.s3 import s3_path_join

# Hub model configuration: https://huggingface.co/models
hub = {
    'HF_MODEL_ID': 'cardiffnlp/twitter-roberta-base-sentiment',
    'HF_TASK': 'text-classification'
}

# create Hugging Face Model Class
huggingface_model = HuggingFaceModel(
    env=hub,                      # configuration for loading the model from the Hub
    role=role,                    # IAM role with permissions to create an endpoint
    transformers_version="4.12",  # transformers version used
    pytorch_version="1.9",        # pytorch version used
    py_version='py38',            # python version used
)

# create async endpoint configuration
async_config = AsyncInferenceConfig(
    output_path=s3_path_join("s3://", sagemaker_session_bucket, "async_inference/output"),  # where our results will be stored
    # notification_config={
    #     "SuccessTopic": "arn:aws:sns:us-east-2:123456789012:MyTopic",
    #     "ErrorTopic": "arn:aws:sns:us-east-2:123456789012:MyTopic",
    # },  # notification configuration
)

# deploy the endpoint
async_predictor = huggingface_model.deploy(
    initial_instance_count=1,
    instance_type="ml.g4dn.xlarge",
    async_inference_config=async_config
)
We can find our Asynchronous Inference endpoint configuration in the Amazon SageMaker Console. Our endpoint now has the type 'async', compared to a 'real-time' endpoint.
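If you prefer to verify this programmatically instead of in the console, a small sketch with the boto3 SageMaker client can inspect the endpoint configuration (only asynchronous endpoints carry the AsyncInferenceConfig key):

import boto3

sm_client = boto3.client("sagemaker")

# look up the endpoint and inspect its async configuration
endpoint = sm_client.describe_endpoint(EndpointName=async_predictor.endpoint_name)
endpoint_config = sm_client.describe_endpoint_config(
    EndpointConfigName=endpoint["EndpointConfigName"]
)

print(endpoint_config.get("AsyncInferenceConfig"))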
Request Asynchronous Inference Endpoint using the AsyncPredictor
The .deploy() call returns an AsyncPredictor object which can be used to request inference. This AsyncPredictor makes it easy to send asynchronous requests to your endpoint and get the results back. It has two methods: predict() and predict_async(). The predict() method is synchronous and will block until the inference is complete. The predict_async() method is asynchronous and will return immediately with an AsyncInferenceResponse, which can be used to poll for the result: once the result object exists at the output path, it is fetched and returned.
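For illustration only, the polling that get_result() performs boils down to checking whether the result object has appeared at the output path. A rough, hand-rolled sketch with boto3 could look like the following; in practice you should simply call resp.get_result() with a WaiterConfig as shown later in this guide.

import time
import boto3
from botocore.exceptions import ClientError

def wait_for_result(output_path, delay=10, max_attempts=30):
    # rough sketch: poll the S3 output location until the result object appears
    s3 = boto3.client("s3")
    bucket, key = output_path.replace("s3://", "").split("/", 1)
    for _ in range(max_attempts):
        try:
            obj = s3.get_object(Bucket=bucket, Key=key)
            return obj["Body"].read().decode("utf-8")
        except ClientError:
            # result not written yet, wait and retry
            time.sleep(delay)
    raise TimeoutError(f"No result found at {output_path}")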
predict() request example
The predict() method will upload our data to Amazon S3 and run inference against it. Since we are using predict(), it will block until the inference is complete.
data = {
    "inputs": [
        "it 's a charming and often affecting journey .",
        "it 's slow -- very , very slow",
        "the mesmerizing performances of the leads keep the film grounded and keep the audience riveted .",
        "the emotions are raw and will strike a nerve with anyone who 's ever had family trauma ."
    ]
}

res = async_predictor.predict(data=data)
print(res)
# [{'label': 'LABEL_2', 'score': 0.8808117508888245}, {'label': 'LABEL_0', 'score': 0.6126593947410583}, {'label': 'LABEL_2', 'score': 0.9425230622291565}, {'label': 'LABEL_0', 'score': 0.5511414408683777}]
predict_async() request example
The predict_async() method will upload our data to Amazon S3 and run inference against it. Since we are using predict_async(), it will return immediately with an AsyncInferenceResponse object.
In this example, we will loop over a csv file and send each line to the endpoint. After that, we will poll the endpoint until every inference is complete. The provided tweet_data.csv contains ~1800 tweets about different airlines.
But first, let's do a quick test to see if we can get a result from the endpoint using predict_async().
Single predict_async() request example
from sagemaker.async_inference.waiter_config import WaiterConfig

resp = async_predictor.predict_async(data={"inputs": "i like you. I love you"})

print(f"Response object: {resp}")
print(f"Response output path: {resp.output_path}")
print("Start Polling to get response:")

config = WaiterConfig(
    max_attempts=5,  # number of attempts
    delay=10         # time in seconds to wait between attempts
)

resp.get_result(config)
High-load predict_async() request example using a csv file
from csv import reader

data_file = "tweet_data.csv"

output_list = []

# open file in read mode
with open(data_file, 'r') as csv_reader:
    for row in reader(csv_reader):
        # send each row as an async request
        resp = async_predictor.predict_async(data={"inputs": row[0]})
        output_list.append(resp)

print("All requests sent")
print(f"Output path list length: {len(output_list)}")
print(f"Output path list sample: {output_list[26].output_path}")

# iterate over list of output paths and get results
results = []
for async_response in output_list:
    response = async_response.get_result(WaiterConfig())
    results.append(response)

print(f"Results length: {len(results)}")
print(f"Results sample: {results[26]}")
Autoscale (to Zero) the Asynchronous Inference Endpoint
Amazon SageMaker supports automatic scaling (autoscaling) for your asynchronous endpoint. Autoscaling dynamically adjusts the number of instances provisioned for a model in response to changes in your workload. Unlike other hosted model options in Amazon SageMaker, Asynchronous Inference also lets you scale your endpoint instances down to zero.
Prerequisite: You need a running Asynchronous Inference Endpoint. You can check the Create Inference HuggingFaceModel for the Asynchronous Inference Endpoint section above to see how to create one.
If you want to learn more, check out Autoscale an asynchronous endpoint in the SageMaker documentation.
We are going to configure our asynchronous endpoint to scale between 0 and 5 instances. Amazon SageMaker will scale the endpoint down to 0 instances after 600 seconds (10 minutes) without traffic to save you cost, and scale back out (up to 5 instances) in 300-second steps once the backlog exceeds 5.0 invocations per instance.
# application-autoscaling client
asg_client = boto3.client("application-autoscaling")

# This is the format in which application autoscaling references the endpoint
resource_id = f"endpoint/{async_predictor.endpoint_name}/variant/AllTraffic"

# Configure autoscaling on the asynchronous endpoint down to zero instances
response = asg_client.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=0,
    MaxCapacity=5,
)

response = asg_client.put_scaling_policy(
    PolicyName=f'Request-ScalingPolicy-{async_predictor.endpoint_name}',
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 5.0,
        "CustomizedMetricSpecification": {
            "MetricName": "ApproximateBacklogSizePerInstance",
            "Namespace": "AWS/SageMaker",
            "Dimensions": [{"Name": "EndpointName", "Value": async_predictor.endpoint_name}],
            "Statistic": "Average",
        },
        "ScaleInCooldown": 600,  # duration until scale in begins (down to zero)
        "ScaleOutCooldown": 300  # duration between scale out attempts
    },
)
The endpoint will now scale in to zero after 600 seconds. Let's wait until the endpoint is scaled down to zero, then send some requests and measure how long it takes to start an instance and process them. We are using the predict_async() method to send the requests.
IMPORTANT: Since we set the TargetValue to 5.0, the async endpoint will only start to scale out from 0 to 1 instance if you send more than 5 requests within 300 seconds.
import time

start = time.time()

output_list = []

# send 10 requests
for i in range(10):
    resp = async_predictor.predict_async(data={"inputs": "it 's a charming and often affecting journey ."})
    output_list.append(resp)

# iterate over list of output paths and get results
results = []
for async_response in output_list:
    response = async_response.get_result(WaiterConfig(max_attempts=600))
    results.append(response)

print(f"Time taken: {time.time() - start}s")
It took about 7-9 minutes to start an instance and process the requests. This is perfect when you have applications that are not real-time critical but you want to save money.
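If you want to inspect the backlog metric that drives this scaling behaviour, you can query it from CloudWatch. A small sketch follows (the 30-minute window is just an example):

from datetime import datetime, timedelta
import boto3

cw_client = boto3.client("cloudwatch")

# backlog metric that the target-tracking policy above scales on
metrics = cw_client.get_metric_statistics(
    Namespace="AWS/SageMaker",
    MetricName="ApproximateBacklogSizePerInstance",
    Dimensions=[{"Name": "EndpointName", "Value": async_predictor.endpoint_name}],
    StartTime=datetime.utcnow() - timedelta(minutes=30),
    EndTime=datetime.utcnow(),
    Period=60,
    Statistics=["Average"],
)
print(metrics["Datapoints"])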
Delete the async inference endpoint & Autoscaling policy
response = asg_client.deregister_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount'
)
async_predictor.delete_endpoint()
Conclusion
We successfully deployed an Asynchronous Inference Endpoint to Amazon SageMaker using the SageMaker Python SDK. The SageMaker SDK provides convenient tooling for deploying, and especially for running inference against, Asynchronous Inference Endpoints. It creates a handy AsyncPredictor object which can be used to send requests to the endpoint, handles all of the boilerplate behind the scenes for asynchronous inference, and gives us simple APIs.
In addition, we were able to add autoscaling to the Asynchronous Inference Endpoint with boto3 to scale our endpoint in and out. Asynchronous Inference Endpoints can even scale down to zero, which is a great cost-saving feature for applications that are not real-time critical.
You should definitely try out Asynchronous Inference Endpoints for your own applications if neither batch transform nor real-time endpoints are the right option for you.
You can find the code here.
Thanks for reading! If you have any questions, feel free to contact me, through Github, or on the forum. You can also connect with me on Twitter or LinkedIn.