Making API calls exactly once when using Workflows


Introduction

One challenge with any distributed system, including Workflows, is ensuring that requests sent from one service to another are processed exactly once when that matters: for example, when placing a customer order in a shipping queue, withdrawing funds from a bank account, or processing a payment.

In this blog post, we’ll walk through an example of a website invoking Workflows, and Workflows in turn invoking a Cloud Function. We’ll show how to make sure that both the workflow and the Cloud Function logic run only once. We’ll also discuss how to invoke Workflows exactly once when using HTTP callbacks, Pub/Sub messages, or Cloud Tasks.

Invoke Workflows exactly once

Imagine you have an online store and you’re using Workflows to create new orders, save them to Firestore, and process payments by calling a Cloud Function:

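[Diagram: the website calls Workflows, which saves the order to Firestore and calls a Cloud Function to process the payment]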

A new customer order comes in, and the website makes an API call to Workflows but receives an error. There are two possible scenarios:

(1) The request is lost and the workflow is never invoked:

[Diagram: the website’s request is lost and the workflow never runs]

(2) The workflow is invoked and executes successfully, but the response is lost:

[Diagram: the workflow runs successfully but its response to the website is lost]

How can you make sure the workflow executes exactly once?

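Because the website can’t tell which scenario occurred, it retries the same request, so the workflow must detect whether it has already handled this order. One simple solution is to try to create the order document in Firestore and end the workflow early if the document already exists: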

```yaml
main:
  params: [args]
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - order_id: "12345" # In practice, read it from the runtime arguments, e.g. ${args.order_id}
          - firestore_collection: "orders"
          - URL: https://us-central1-<your_project_id>.cloudfunctions.net/processpayment
    - create_document:
        try:
          call: googleapis.firestore.v1.projects.databases.documents.createDocument
          args:
            collectionId: ${firestore_collection}
            parent: ${"projects/" + project_id + "/databases/(default)/documents"}
            query:
              documentId: ${order_id}
        except:
          as: e
          steps:
            - checkError:
                switch:
                  - condition: ${default(map.get(e, "code"), 0) == 409} # The order document already exists
                    steps:
                      - endEarly:
                          return: ${e}
            - rethrow:
                raise: ${e} # Any other error is unexpected: fail instead of silently skipping the payment
    - processPayment:
        ...
```

The processPayment step runs only if the document is created successfully. This is effectively a 1-bit state machine (the order document either exists or it doesn’t); it’s idempotent and a valid solution. The downside is that it isn’t extensible: you might want to do additional work in this handler before changing state, or add more states to the system. Next, let’s look at a more advanced solution to the same problem.
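To make the create-then-process logic concrete outside of Workflows, here is a minimal JavaScript sketch. It assumes a hypothetical in-memory `InMemoryCollection` that, like Firestore’s createDocument, rejects a duplicate document ID with code 409. It simulates the control flow only; it is not the Firestore API.

```javascript
// Hypothetical in-memory stand-in for a Firestore collection. createDocument
// fails with code 409 if the document ID is already taken, mirroring the
// "already exists" behavior the workflow relies on.
class InMemoryCollection {
  constructor() {
    this.docs = new Map();
  }

  createDocument(documentId, fields = {}) {
    if (this.docs.has(documentId)) {
      const err = new Error(`document ${documentId} already exists`);
      err.code = 409;
      throw err;
    }
    this.docs.set(documentId, {...fields});
  }
}

// Mirrors the workflow: create the order document first, and process the
// payment only if the create succeeded. Any other error is rethrown.
function runOrderWorkflow(orders, orderId, processPayment) {
  try {
    orders.createDocument(orderId);
  } catch (e) {
    if (e.code === 409) {
      return 'skipped: order already created';
    }
    throw e;
  }
  processPayment(orderId);
  return 'processed';
}

// Usage: the website retries the same request, but the payment runs once.
const orders = new InMemoryCollection();
let payments = 0;
const pay = () => { payments += 1; };
console.log(runOrderWorkflow(orders, '12345', pay)); // processed
console.log(runOrderWorkflow(orders, '12345', pay)); // skipped: order already created
console.log(payments); // 1
```

The check and the create are a single operation here, which is what makes the retry safe; a separate "read, then create" would leave a window for two executions to both see no document.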

Invoke Cloud Functions from Workflows exactly once

Let’s see what happens when the workflow uses a Cloud Function to process the payment. You might have the following step to call Cloud Functions:

```yaml
- processPayment:
    call: http.post
    args:
      url: https://us-central1-<your_project_id>.cloudfunctions.net/processpayment
      auth:
        type: OIDC
```

By default, Workflows makes HTTP requests with at-most-once delivery (no retries). That’s usually fine because more than 99.9% of the time the call succeeds and a response is received.

In the rare case of failure, a ConnectionError might be raised. As in the website-to-workflow case discussed previously, the workflow can’t tell whether the request was lost or only the response was lost. Here too, the first step is to add retries.

Let’s add a default retry policy to handle this:

```yaml
- processPayment:
    try:
      call: http.post
      args:
        url: https://us-central1-<your_project_id>.cloudfunctions.net/processpayment
        auth:
          type: OIDC
    retry: ${http.default_retry} # Retries up to 5 times, includes 'ConnectionError'
```

Now suppose the second scenario occurs: the Cloud Function receives the request, but the response is lost. With retries enabled, Workflows invokes the Cloud Function again, so the function runs more than once for the same order. How do you ensure that the payment logic in the Cloud Function runs only once?
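The following plain JavaScript simulation shows why retries alone aren’t enough. The helpers `makeFlakyTransport` and `callWithRetries` are hypothetical stand-ins for the network and for a retry policy such as http.default_retry (backoff delays are omitted). Because the handler runs on every attempt, a lost response followed by a retry performs the work twice.

```javascript
// Hypothetical simulation of the "response lost" scenario: the handler runs
// and does its work, but the caller sees a ConnectionError and retries.
function makeFlakyTransport(handler, lostResponses) {
  let remainingLosses = lostResponses;
  return (request) => {
    const response = handler(request); // The server-side work always happens.
    if (remainingLosses > 0) {
      remainingLosses -= 1;
      const err = new Error('ConnectionError: response lost');
      err.retryable = true;
      throw err;
    }
    return response;
  };
}

// Simplified retry loop, loosely modeled on a "retry up to N times" policy.
// Backoff delays are omitted to keep the sketch synchronous.
function callWithRetries(send, request, maxRetries = 5) {
  for (let attempt = 0; ; attempt++) {
    try {
      return send(request);
    } catch (e) {
      if (!e.retryable || attempt >= maxRetries) throw e;
    }
  }
}

// A naive, non-idempotent payment handler: every invocation charges again.
let charges = 0;
const naiveHandler = (req) => {
  charges += 1;
  return {status: 200, orderId: req.orderId};
};

const send = makeFlakyTransport(naiveHandler, 1);
const result = callWithRetries(send, {orderId: '12345'});
console.log(result.status); // 200
console.log(charges); // 2: the retry charged the customer a second time
```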

You’ll need to add extra logic to the Cloud Function to check and update the payment state in Firestore:

[Diagram: the Cloud Function checks and updates the payment state in Firestore]

Let’s also assume you want to track the workflow EXECUTION_ID in Firestore and use the following order_state enum to allow for additional flexibility in payment processing:

```
payment_not_processed  // Initial state when an order is created
payment_declined       // Payment was not successful
payment_successful     // Payment processed successfully
...
```
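One way to keep this enum extensible is to list the allowed transitions explicitly, so adding a state means adding a row rather than new branching logic. The sketch below is a hypothetical JavaScript transition table for the states above; treating payment_declined and payment_successful as terminal is an assumption for illustration.

```javascript
// Hypothetical transition table for the order_state values above.
const ORDER_STATES = Object.freeze({
  PAYMENT_NOT_PROCESSED: 'payment_not_processed',
  PAYMENT_DECLINED: 'payment_declined',
  PAYMENT_SUCCESSFUL: 'payment_successful',
});

const ALLOWED_TRANSITIONS = Object.freeze({
  [ORDER_STATES.PAYMENT_NOT_PROCESSED]: [
    ORDER_STATES.PAYMENT_DECLINED,
    ORDER_STATES.PAYMENT_SUCCESSFUL,
  ],
  // Assumed terminal states: no further transitions in this sketch.
  [ORDER_STATES.PAYMENT_DECLINED]: [],
  [ORDER_STATES.PAYMENT_SUCCESSFUL]: [],
});

// Returns true if moving from `current` to `next` is a legal state change.
// Unknown states have no allowed transitions.
function canTransition(current, next) {
  return (ALLOWED_TRANSITIONS[current] || []).includes(next);
}

console.log(canTransition('payment_not_processed', 'payment_successful')); // true
console.log(canTransition('payment_successful', 'payment_successful')); // false
```

A handler that checks `canTransition` before writing naturally ignores duplicate requests: the second request finds the order already in a terminal state.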

You can expand on the previous workflow and call a Cloud Function to process the payment:

```yaml
main:
  params: [args]
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - order_id: "12345" # In practice, read it from the runtime arguments, e.g. ${args.order_id}
          - firestore_collection: "orders"
          - URL: https://us-central1-<your_project_id>.cloudfunctions.net/processpayment
    - create_document:
        try:
          call: googleapis.firestore.v1.projects.databases.documents.createDocument
          args:
            collectionId: ${firestore_collection}
            parent: ${"projects/" + project_id + "/databases/(default)/documents"}
            query:
              documentId: ${order_id}
            body:
              fields:
                order_state: # Set the initial state
                  stringValue: "payment_not_processed"
                workflow_id: # Also track this workflow execution ID
                  stringValue: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
        except:
          as: e
          steps:
            - checkError:
                switch:
                  - condition: ${default(map.get(e, "code"), 0) == 409} # The order document already exists
                    steps:
                      - endEarly:
                          return: ${e}
            - rethrow:
                raise: ${e} # Any other error is unexpected: fail instead of silently skipping the payment
    - processPayment:
        try:
          call: http.post
          args:
            url: ${URL} # Might get called multiple times!
            auth:
              type: OIDC
            body:
              order_id: ${order_id}
          result: r
        retry: ${http.default_retry}
    - returnStep:
        return: ${r}
```

Here’s the Cloud Function (Node.js v20) that processes the payment:

```javascript
const functions = require('@google-cloud/functions-framework');
const {Firestore} = require('@google-cloud/firestore');

// Reuse one client per function instance instead of creating one per request.
const firestore = new Firestore();

functions.http('helloHttp', async (req, res) => {
  if (!req.body || !req.body.order_id) {
    res.status(400).send('missing order_id');
    return;
  }
  const docRef = firestore.doc('orders/' + req.body.order_id);
  try {
    // Read the current state and update it within the same transaction to make
    // this handler idempotent. Firestore may run this callback more than once
    // (for example, on contention), but its writes are committed only once.
    const outcome = await firestore.runTransaction(async (t) => {
      const doc = await t.get(docRef);
      if (!doc.exists) {
        return {code: 404, message: 'order not found'};
      }
      const state = doc.data().order_state;
      // Only process the order if we haven't already
      if (state === 'payment_not_processed') {
        // Do payment stuff, e.g. debit an account stored in another Firestore document
        // ...
        t.update(docRef, {order_state: 'payment_successful'});
        return {code: 200, message: 'payment_successful'};
      }
      return {code: 200, message: 'request ignored, state already: ' + state};
    });
    // Respond only after the transaction has committed, never from inside the
    // callback, which might run more than once.
    res.status(outcome.code).send(outcome.message);
  } catch (e) {
    console.error('Transaction failure:', e);
    res.status(500).send('transaction failed');
  }
});
```

package.json:

```json
{
  "dependencies": {
    "@google-cloud/functions-framework": "^3.3.0",
    "@google-cloud/firestore": "^7.6.0"
  }
}
```

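The key takeaway is that the payment state is read and updated within a single Firestore transaction, which makes the handler idempotent. Workflows retries might invoke the function several times, and Firestore might rerun the transaction callback under contention, but the transition from payment_not_processed to payment_successful is committed only once. Note that this guarantee covers Firestore reads and writes only: side effects outside Firestore, such as a call to an external payment provider, can repeat whenever the callback reruns, so they need their own idempotency protection.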
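A Firestore transaction only protects Firestore reads and writes, so a call to an external payment provider made inside the callback can repeat whenever the function or the transaction reruns. A common complement is an idempotency key derived from the order rather than from the attempt, so every retry reuses it. This sketch assumes a hypothetical `FakePaymentProvider` that deduplicates charges by key; real payment APIs that support idempotency keys each define their own interface.

```javascript
// Hypothetical payment provider that deduplicates charges by idempotency key.
// It only illustrates the idea; it is not any real provider's API.
class FakePaymentProvider {
  constructor() {
    this.chargesByKey = new Map();
  }

  charge(idempotencyKey, amountCents) {
    // A repeated key returns the original charge instead of charging again.
    if (this.chargesByKey.has(idempotencyKey)) {
      return this.chargesByKey.get(idempotencyKey);
    }
    const charge = {id: `ch_${this.chargesByKey.size + 1}`, amountCents};
    this.chargesByKey.set(idempotencyKey, charge);
    return charge;
  }
}

// Derive the key from the order, not from the attempt, so every retry for the
// same order (from Workflows or from a rerun transaction) reuses it.
function chargeOrder(provider, order) {
  return provider.charge(`order-${order.id}`, order.amountCents);
}

const provider = new FakePaymentProvider();
const order = {id: '12345', amountCents: 4999};
const first = chargeOrder(provider, order);
const retry = chargeOrder(provider, order);
console.log(first.id === retry.id); // true
console.log(provider.chargesByKey.size); // 1
```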

What about HTTP callbacks, Pub/Sub, Cloud Tasks?

So far, we’ve covered making website-to-Workflows and Workflows-to-Cloud Functions requests exactly once. There are other ways to invoke or resume a workflow, such as HTTP callbacks, Pub/Sub messages, or Cloud Tasks. How do you make those requests exactly once? Let’s take a look.

Callbacks

The good news is that Workflows HTTP callbacks are fully idempotent by default. It’s safe to retry a callback if it fails. For example:

```yaml
- createCallbackStep:
    call: events.create_callback_endpoint
    args:
      http_callback_method: "POST"
    result: callback_details
- sendOutURL:
    call: http.post
    args:
      url: "https://your-endpoint.com/foo"
      body:
        callback_to_use: ${callback_details.url}
...
- callbackWaitStep:
    call: events.await_callback
    args:
      callback: ${callback_details}
```

Suppose the external caller’s first callback request returns an error. From the error alone, the caller can’t tell whether the workflow received the callback, so it should retry. On the second callback, the caller receives one of the following HTTP status codes:

  • 429 (Too Many Requests) indicates that the first callback was received successfully and the second callback was rejected by the workflow.

  • 200 (OK) indicates that the second callback was received successfully. Either the first callback was never received, or it was received and processed successfully. In the latter case, the second callback isn’t processed, because await_callback is called only once; it’s discarded when the workflow ends.

  • 404 (Not Found) indicates that the callback is no longer available. Either the first callback was received and processed and the workflow has completed, or the workflow isn’t running (for example, because it failed). To find out which, send an API request to query the workflow execution state.

For more details, see Invoke a workflow exactly once using callbacks.
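On the caller’s side, the status codes of a retried callback can be mapped to a next action. The helper below is a hypothetical JavaScript sketch; treating any other status as "retry later" is an assumption, not documented Workflows behavior.

```javascript
// Hypothetical helper for an external caller that retried a callback after an
// ambiguous error. It maps the status code of the retry to a next action.
function interpretRetriedCallback(status) {
  switch (status) {
    case 200:
      // The retry was accepted; the workflow has the callback either way.
      return 'delivered';
    case 429:
      // The first callback was received; the retry was rejected.
      return 'delivered';
    case 404:
      // The callback is gone: the workflow either completed or isn't running.
      // Query the workflow execution state to find out which.
      return 'check_execution_state';
    default:
      // Assumption: treat any other status as a transient failure and retry.
      return 'retry_later';
  }
}

console.log(interpretRetriedCallback(429)); // delivered
console.log(interpretRetriedCallback(404)); // check_execution_state
```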

Pub/Sub messages

When you use Pub/Sub to trigger a new workflow execution, Pub/Sub delivers messages to Workflows at least once and retries on any delivery failure. Pub/Sub messages are automatically deduplicated within a 24-hour window, so you don’t need to worry about duplicate deliveries during that time.

Cloud Tasks

Cloud Tasks is commonly used to buffer workflow executions. It provides at-least-once delivery but doesn’t deduplicate messages, so the workflows it triggers should handle duplicate requests idempotently.
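Because Cloud Tasks doesn’t deduplicate, one way to make a handler idempotent is to store each result under a key derived from the request’s business identity (here, an order ID) and replay it for duplicates. The `IdempotentHandler` class below is a hypothetical in-memory sketch; a real deployment would need shared storage with an atomic create-if-absent write, such as the create-only Firestore document approach used earlier.

```javascript
// Hypothetical idempotent wrapper for a handler that Cloud Tasks may invoke
// more than once with the same payload. Results are stored per key, so a
// duplicate delivery gets the original result instead of redoing the work.
// In production this map would be shared storage with an atomic
// create-if-absent write, not process memory.
class IdempotentHandler {
  constructor(work) {
    this.work = work;
    this.results = new Map();
  }

  handle(key, payload) {
    if (this.results.has(key)) {
      return this.results.get(key); // Duplicate delivery: replay stored result.
    }
    // If work() throws, nothing is recorded, so a redelivery retries the work.
    const result = this.work(payload);
    this.results.set(key, result);
    return result;
  }
}

let executionsStarted = 0;
const handler = new IdempotentHandler((payload) => {
  executionsStarted += 1;
  return `execution-for-${payload.orderId}`;
});

// Cloud Tasks delivers the same task twice.
console.log(handler.handle('order-12345', {orderId: '12345'})); // execution-for-12345
console.log(handler.handle('order-12345', {orderId: '12345'})); // execution-for-12345
console.log(executionsStarted); // 1
```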

Conclusion

Exactly-once request processing is a hard problem. In this blog post, we’ve outlined some scenarios where you might need exactly-once request processing when you’re using Workflows. We also provided some ideas on how you can implement it. The exact solution will depend on the actual use case and the services involved.
