Running code after returning a response from an AWS Lambda function

8 months ago 110
News Banner

Looking for an Interim or Fractional CTO to support your business?

Read more

This post is written by Uri Segev, Principal Serverless Specialist SA.

When you invoke an AWS Lambda function synchronously, you expect the function to return a response. For example, this is the case when a client invokes a Lambda function through Amazon API Gateway or from AWS Step Functions. As the client is waiting for the response, you should return the response as soon as possible.

However, there may be instances where you must perform additional work that does not affect the response and you can do it asynchronously, after you send the response. For example, you may store data in a database or send information to a logging system.

Once you send the response from the function, the Lambda service freezes the runtime environment, and the function cannot run additional code. Even if you create a thread for running a task in the background, the Lambda service freezes the runtime environment once the handler returns, causing the thread to freeze until the next invocation. While you can delay returning the response to the client until all work is complete, this approach can negatively impact the user experience.

This blog explores ways to run a task that may start before the function returns but continues running after the function returns the response to the client.

Invoking an asynchronous Lambda function

The first option is to break the code into two functions. The first function runs the synchronous code; the second function runs the asynchronous code. Before the synchronous function returns, it invokes the second function asynchronously, either directly, using the Invoke API, or indirectly, for example, by sending a message to Amazon SQS to trigger the second function.

This Python code demonstrates how to implement this:

import json import time import os import boto3 from aws_lambda_powertools import Logger logger = Logger() client = boto3.client('lambda') def calc_response(event): logger.info(f"[Function] Calculating response") time.sleep(1) # Simulate sync work return { "message": "hello from async" } def submit_async_task(response): # Invoke async function to continue logger.info(f"[Function] Invoking async task in async function") client.invoke_async(FunctionName=os.getenv('ASYNC_FUNCTION'), InvokeArgs=json.dumps(response)) def handler(event, context): logger.info(f"[Function] Received event: {json.dumps(event)}") response = calc_response(event) # Done calculating response, submit async task submit_async_task(response) # Return response to client logger.info(f"[Function] Returning response to client") return { "statusCode": 200, "body": json.dumps(response) }

The following is the Lambda function that performs the asynchronous work:

import json import time from aws_lambda_powertools import Logger logger = Logger() def handler(event, context): logger.info(f"[Async task] Starting async task: {json.dumps(event)}") time.sleep(3) # Simulate async work logger.info(f"[Async task] Done")

Use Lambda response streaming

Response streaming enables developers to start streaming the response as soon as they have the first byte of the response, without waiting for the entire response. You usually use response streaming when you must minimize the Time to First Byte (TTFB) or when you must send a response that is larger than 6 MB (the Lambda response payload size limit).

Using this method, the function can send the response using the response streaming mechanism and can continue running code even after sending the last byte of the response. This way, the client receives the response, and the Lambda function can continue running.

This Node.js code demonstrates how to implement this:

import { Logger } from '@aws-lambda-powertools/logger'; const logger = new Logger(); export const handler = awslambda.streamifyResponse(async (event, responseStream, _context) => { logger.info("[Function] Received event: ", event); // Do some stuff with event let response = await calc_response(event); // Return response to client logger.info("[Function] Returning response to client"); responseStream.setContentType('application/json'); responseStream.write(response); responseStream.end(); await async_task(response); }); const calc_response = async (event) => { logger.info("[Function] Calculating response"); await sleep(1); // Simulate sync work return { message: "hello from streaming" }; }; const async_task = async (response) => { logger.info("[Async task] Starting async task"); await sleep(3); // Simulate async work logger.info("[Async task] Done"); }; const sleep = async (sec) => { return new Promise((resolve) => { setTimeout(resolve, sec * 1000); }); };

Use Lambda extensions

Lambda extensions can augment Lambda functions to integrate with your preferred monitoring, observability, security, and governance tools. You can also use an extension to run your own code in the background so that it continues running after your function returns the response to the client.

There are two types of Lambda extensions: external extensions and internal extensions. External extensions run as separate processes in the same execution environment. The Lambda function can communicate with the extension using files in the /tmp folder or using a local network, for example, via HTTP requests. You must package external extensions as a Lambda layer.

Internal extensions run as separate threads within the same process that runs the handler. The handler can communicate with the extension using any in-process mechanism, such as internal queues. This example shows an internal extension, which is a dedicated thread within the handler process.

When the Lambda service invokes a function, it also notifies all the extensions of the invocation. The Lambda service only freezes the execution environment when the Lambda function returns a response and all the extensions signal to the runtime that they are finished. With this approach, the function has the extension run the task independently from the function itself and the extension notifies the Lambda runtime when it is done processing the task. This way, the execution environment stays active until the task is done.

The following Python code example isolates the extension code into its own file and the handler imports and uses it to run the background task:

import json import time import async_processor as ap from aws_lambda_powertools import Logger logger = Logger() def calc_response(event): logger.info(f"[Function] Calculating response") time.sleep(1) # Simulate sync work return { "message": "hello from extension" } # This function is performed after the handler code calls submit_async_task # and it can continue running after the function returns def async_task(response): logger.info(f"[Async task] Starting async task: {json.dumps(response)}") time.sleep(3) # Simulate async work logger.info(f"[Async task] Done") def handler(event, context): logger.info(f"[Function] Received event: {json.dumps(event)}") # Calculate response response = calc_response(event) # Done calculating response # call async processor to continue logger.info(f"[Function] Invoking async task in extension") ap.start_async_task(async_task, response) # Return response to client logger.info(f"[Function] Returning response to client") return { "statusCode": 200, "body": json.dumps(response) }

The following Python code demonstrates how to implement the extension that runs the background task:

import os import requests import threading import queue from aws_lambda_powertools import Logger logger = Logger() LAMBDA_EXTENSION_NAME = "AsyncProcessor" # An internal queue used by the handler to notify the extension that it can # start processing the async task. async_tasks_queue = queue.Queue() def start_async_processor(): # Register internal extension logger.debug(f"[{LAMBDA_EXTENSION_NAME}] Registering with Lambda service...") response = requests.post( url=f"http://{os.environ['AWS_LAMBDA_RUNTIME_API']}/2020-01-01/extension/register", json={'events': ['INVOKE']}, headers={'Lambda-Extension-Name': LAMBDA_EXTENSION_NAME} ) ext_id = response.headers['Lambda-Extension-Identifier'] logger.debug(f"[{LAMBDA_EXTENSION_NAME}] Registered with ID: {ext_id}") def process_tasks(): while True: # Call /next to get notified when there is a new invocation and let # Lambda know that we are done processing the previous task. logger.debug(f"[{LAMBDA_EXTENSION_NAME}] Waiting for invocation...") response = requests.get( url=f"http://{os.environ['AWS_LAMBDA_RUNTIME_API']}/2020-01-01/extension/event/next", headers={'Lambda-Extension-Identifier': ext_id}, timeout=None ) # Get next task from internal queue logger.debug(f"[{LAMBDA_EXTENSION_NAME}] Wok up, waiting for async task from handler") async_task, args = async_tasks_queue.get() if async_task is None: # No task to run this invocation logger.debug(f"[{LAMBDA_EXTENSION_NAME}] Received null task. Ignoring.") else: # Invoke task logger.debug(f"[{LAMBDA_EXTENSION_NAME}] Received async task from handler. Starting task.") async_task(args) logger.debug(f"[{LAMBDA_EXTENSION_NAME}] Finished processing task") # Start processing extension events in a separate thread threading.Thread(target=process_tasks, daemon=True, name='AsyncProcessor').start() # Used by the function to indicate that there is work that needs to be # performed by the async task processor def start_async_task(async_task=None, args=None): async_tasks_queue.put((async_task, args)) # Starts the async task processor start_async_processor()

Use a custom runtime

Lambda supports several runtimes out of the box: Python, Node.js, Java, Dotnet, and Ruby. Lambda also supports custom runtimes, which lets you develop Lambda functions in any other programming language that you need to.

When you invoke a Lambda function that uses a custom runtime, the Lambda service invokes a process called ‘bootstrap’ that contains your custom code. The custom code needs to interact with the Lambda Runtime API. It calls the /next endpoint to obtain information about the next invocation. This API call is blocking and it waits until a request arrives. When the function is done processing the request, it must call the /response endpoint to send the response back to the client and then it must call the /next endpoint again to wait for the next invocation. Lambda freezes the execution environment after you call /next, until a request arrives.

Using this approach, you can run the asynchronous task after calling /response, and sending the response back to the client, and before calling /next, indicating that the processing is done.

The following Python code example isolates the custom runtime code into its own file and the function imports and uses it to interact with the runtime API:

import time import json import runtime_interface as rt from aws_lambda_powertools import Logger logger = Logger() def calc_response(event): logger.info(f"[Function] Calculating response") time.sleep(1) # Simulate sync work return { "message": "hello from custom" } def async_task(response): logger.info(f"[Async task] Starting async task: {json.dumps(response)}") time.sleep(3) # Simulate async work logger.info(f"[Async task] Done") def main(): # You can add initialization code here # The following loop runs forever waiting for the next invocation # and sending the response back to the client while True: # Call /next to wait for next request (and indicate # that we are done processing the previous request) requestId, event = rt.get_next() # The code from here to send_response() is the code # that usually goes inside the Lambda handler() logger.info(f"[Function] Received event: {json.dumps(event)}") # Calculate response response = calc_response(event) # Done calculating response, send response to client logger.info(f"[Function] Returning response to client") rt.send_response(requestId, { "statusCode": 200, "body": json.dumps(response) }) logger.info(f"[Function] Invoking async task") async_task(response) main()

This Python code demonstrates how to interact with the runtime API:

import requests import os from aws_lambda_powertools import Logger logger = Logger() run_time_endpoint = os.environ['AWS_LAMBDA_RUNTIME_API'] def get_next(): logger.debug("[Custom runtime] Waiting for invocation...") request = requests.get( url=f"http://{run_time_endpoint}/2018-06-01/runtime/invocation/next", timeout=None ) event = request.json() requestId = request.headers["Lambda-Runtime-Aws-Request-Id"] return requestId, event def send_response(requestId, response): logger.debug("[Custom runtime] Sending response") requests.post( url=f"http://{run_time_endpoint}/2018-06-01/runtime/invocation/{requestId}/response", json = response, timeout=None )

Conclusion

This blog shows four ways of combining synchronous and asynchronous tasks in a Lambda function, allowing you to run tasks that continue running after the function returns a response to the client. The following table summarizes the pros and cons of each solution:

Function URLs, cannot be used with API Gateway, always public

Asynchronous invocation Response streaming Lambda extensions Custom runtime
Complexity Easier to implement Easiest to implement The most complex solution to implement as it requires interacting with the extensions API and a dedicated thread Medium as it interacts with the runtime API
Deployment Need two artifacts: the synchronous function and the asynchronous function A single deployment artifact that contains all code A single deployment artifact that contains all code A single deployment artifact, requires packaging all needed runtime files
Cost Most expensive as it incurs additional invocation cost as well as the overall duration of both functions is higher than having it in one Least expensive Least expensive Least expensive
Starting the async task Before returning from handler Anytime during the handler invocation Anytime during the handler invocation After returning the response to the client, unless you use a dedicated thread
Limitations Payload sent to the asynchronous function cannot exceed 256 KB Only supported with Node.js and custom runtimes. Requires Lambda Function URLs, cannot be used with API Gateway, always public
Additional benefits Better decoupling between synchronous and asynchronous code Ability to send response in stages. Supports payloads larger than 6 MB (at additional cost) The asynchronous task runs in its own thread, which can reduce overall duration and cost
Retries in case of failure in async code Managed by the Lambda service Responsibility of the developer Responsibility of the developer Responsibility of the developer

Choosing the right approach depends on your use case. If you write your function in Node.js and you invoke it using Lambda Function URLs, use response streaming. This is the easiest way to implement, and it is the most cost effective.

If there is a chance for a failure in the asynchronous task (for example, a database is not accessible), and you must ensure that the task completes, use the asynchronous Lambda invocation method. The Lambda service retries your asynchronous function until it succeeds. Eventually, if all retries fail, it invokes a Lambda destination so you can take action.

If you need a custom runtime because you need to use a programming language that Lambda does not natively support, use the custom runtime option. Otherwise, use the Lambda extensions option. It is more complex to implement, but it is cost effective. This allows you to package the code in a single artifact and start processing the asynchronous task before you send the response to the client.

For more serverless learning resources, visit Serverless Land.

Read Entire Article