Announcing Apache Airflow operators for Google generative AI

The data analytics landscape is evolving rapidly, and generative AI is a driving force behind this transformation. By generating meaningful content from data, generative models are changing how we approach data-driven decision-making. Google Cloud's Vertex AI, a unified AI development platform, offers powerful generative models such as Gemini that lead this innovation.

It has never been easier to integrate Vertex AI's generative models into data pipelines orchestrated by Apache Airflow and Cloud Composer, Google Cloud’s fully managed workflow orchestration service. In the latest release of the apache-airflow-providers-google package (version 10.21.0), we’ve added three brand new Airflow operators to interact with Vertex AI's generative models.

Let’s explore some use cases and then show how to use the new operators.

Generative AI-powered data pipelines

This integration unlocks new possibilities for data analytics pipelines, including, but not limited to:

  • Automated insights: Generate summaries, reports, and other valuable insights from raw data, saving time and resources for data analysts.

  • Data enrichment: Enhance datasets with synthetic data via generative models, expanding the scope of your analysis and improving downstream applications.

  • Advanced anomaly detection: Leverage generative models to identify unusual patterns and outliers in your data, strengthening your anomaly detection systems.

  • Text embeddings: Take a huge corpus of unstructured text and turn it into a structured form, making it possible to objectively compare, dissect, and derive insights from all that text.

  • Content generation: Provide DAG metadata such as descriptions, tags, and doc values. Or customize emails, alerts, and more based on contextual pipeline awareness.

  • Translation: Translate text, files, and other content into more than 35 different languages supported by Gemini.

Using the new Airflow operators

To generate a prediction via a language model, you can use TextGenerationModelPredictOperator.

Example:

```python
from airflow.providers.google.cloud.operators.vertex_ai.generative_model import (
    TextGenerationModelPredictOperator,
)

predict_task = TextGenerationModelPredictOperator(
    task_id="predict_task",
    project_id="your-project",
    location="your-location",
    prompt="Explain the difference between a text generation model and a generative model.",
    pretrained_model="text-bison",
)
```

To generate text embeddings you can use TextEmbeddingModelGetEmbeddingsOperator.

Example:

```python
from airflow.providers.google.cloud.operators.vertex_ai.generative_model import (
    TextEmbeddingModelGetEmbeddingsOperator,
)

generate_embeddings_task = TextEmbeddingModelGetEmbeddingsOperator(
    task_id="generate_embeddings_task",
    project_id="your-project",
    location="your-location",
    prompt="What are the benefits of generating text embeddings?",
    pretrained_model="textembedding-gecko",
)
```
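Once embeddings exist, comparing two pieces of text usually comes down to comparing their vectors. The following is a minimal sketch of cosine similarity in plain Python. It assumes each embedding is available as a list of floats, and the three short vectors are made-up stand-ins, not real model output.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Return the cosine of the angle between two embedding vectors."""
    if len(a) != len(b):
        raise ValueError("embeddings must have the same dimensionality")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)


# Toy 3-dimensional vectors standing in for real embeddings (made-up values).
invoice_text = [1.0, 0.0, 1.0]
billing_text = [1.0, 0.0, 1.0]
weather_text = [0.0, 1.0, 0.0]

# Vectors pointing the same way score close to 1; orthogonal vectors score 0.
print(cosine_similarity(invoice_text, billing_text))
print(cosine_similarity(invoice_text, weather_text))
```

Real embeddings have hundreds of dimensions, so a vectorized library or a vector database is the more likely choice at scale; the arithmetic is the same.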

To generate content with a generative model, you can use GenerativeModelGenerateContentOperator.

Example:

```python
from airflow.providers.google.cloud.operators.vertex_ai.generative_model import (
    GenerativeModelGenerateContentOperator,
)

generate_content_task = GenerativeModelGenerateContentOperator(
    task_id="generate_content_task",
    project_id="your-project",
    location="your-location",
    contents=[
        "Explain how to integrate Generative AI into an Airflow DAG in 25 words or less."
    ],
    pretrained_model="gemini-1.5-pro",
)
```

Each operator returns the model’s response in XCom under the `model_response` key.
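A downstream task can read that value with the task instance's `xcom_pull` method. The sketch below shows one way a Python callable might do this. The function name `summarize_model_response` and the `FakeTaskInstance` stand-in are hypothetical and exist only so the example runs without an Airflow installation. It also assumes the pushed value is text or can be converted to text.

```python
from typing import Any


def summarize_model_response(ti: Any) -> str:
    """Pull the upstream model response from XCom and return a log line.

    `ti` is the task instance Airflow passes to a Python callable;
    only its `xcom_pull` method is used here.
    """
    response = ti.xcom_pull(task_ids="generate_content_task", key="model_response")
    if response is None:
        raise ValueError("no model_response found in XCom")
    text = str(response).strip()
    return f"Model response ({len(text)} chars): {text}"


class FakeTaskInstance:
    """Hypothetical stand-in for an Airflow task instance, for local runs only."""

    def xcom_pull(self, task_ids: str, key: str) -> str:
        return "  Add the operator as a task and read its XCom output.  "


print(summarize_model_response(FakeTaskInstance()))
```

In a real DAG, the same function could be passed as the `python_callable` of a PythonOperator placed downstream of `generate_content_task`, and Airflow would supply `ti` from the task context.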

Potential real-world applications

Expanding upon the use cases mentioned above, you can bring Vertex AI Generative Models, Apache Airflow, and Google Cloud together for potential solutions such as:

Targeted marketing: Use Airflow to schedule and orchestrate an email campaign optimization process. Extract customer data from Google Sheets and store it in Google Cloud Storage on a weekly or monthly basis. Use a Google Generative Model Airflow Operator to analyze the customer data and create multiple personalized subject lines and content options for each customer segment.

Data cleansing: Build an Airflow DAG to process batches of raw customer data from a staging area in Google Cloud Storage. Leverage Google Generative Model Airflow Operators to validate and standardize addresses, correcting errors and filling in missing information when possible. Flag any addresses that require manual review and load the cleansed data into BigQuery.
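The "flag for manual review" step in the data cleansing idea can be sketched as a small post-processing function. This example assumes the prompt asked the model to return a JSON array of address objects. The field names in `REQUIRED_FIELDS` and the function `split_cleansed_addresses` are hypothetical choices for illustration, not part of the provider package.

```python
import json
from typing import Any, List, Tuple

# Hypothetical schema: the fields an address needs before loading to BigQuery.
REQUIRED_FIELDS = ("street", "city", "postal_code", "country")


def split_cleansed_addresses(model_output: str) -> Tuple[List[Any], List[Any]]:
    """Split model-standardized addresses into (ready_to_load, needs_review)."""
    try:
        records = json.loads(model_output)
    except json.JSONDecodeError:
        # Unparseable output goes to review as raw text rather than being dropped.
        return [], [{"raw_output": model_output}]
    if not isinstance(records, list):
        return [], [{"raw_output": model_output}]

    ready: List[Any] = []
    review: List[Any] = []
    for record in records:
        # A record is complete only if every required field is non-empty.
        complete = isinstance(record, dict) and all(
            str(record.get(field) or "").strip() for field in REQUIRED_FIELDS
        )
        (ready if complete else review).append(record)
    return ready, review


sample_output = json.dumps(
    [
        {"street": "1 Main St", "city": "Springfield", "postal_code": "12345", "country": "US"},
        {"street": "2 High St", "city": "Shelbyville", "postal_code": "", "country": "US"},
    ]
)
ready, review = split_cleansed_addresses(sample_output)
print(len(ready), "ready to load;", len(review), "flagged for manual review")
```

Checking completeness in code, instead of trusting the model to flag its own gaps, keeps the load step deterministic even when the model's output varies.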

Anomaly detection for cost optimization: Set up an Airflow DAG to collect cloud resource usage data from monitoring APIs on a daily or hourly basis. Deploy a Google Generative Model trained on historical usage patterns and reference the model in your Google Generative Model Airflow Operators to analyze the data and identify unusual spikes in CPU usage, network traffic, or storage consumption. If significant anomalies are detected, send alerts to the infrastructure team for investigation and corrective action.

Representing visual content in new ways: Create an Airflow DAG that triggers when image/video files are uploaded to Google Cloud Storage. Use the multimodal capabilities of Google Generative Model Airflow Operators to generate tabular data that represents these files (for example, file metadata, time-series object detection, audio transcript data, frame analysis). Load the new tabular data into BigQuery and gain further insights.

Coalescing reports: Use Google Generative Model Airflow Operators to read hundreds or thousands of related PDF files and coalesce them into summarized reports. Reduce the need for manual document write-up, reviews, and internal approvals. Export results to Google Cloud Storage. Evaluate results using the Rapid evaluation API service.

Automating feedback for customer service: On a daily basis, export CCAI customer service transcripts to Google Cloud Storage. Execute Google Generative Model Airflow Operators to analyze these transcripts and provide feedback on where the customer service can be improved. Export results to BigQuery or as a daily email to the customer service team.

Improving Airflow DAG and task alerts: When a DAG fails, prompt Google Generative Model Airflow Operators with the error messages and related DAG information and use the response to add contextual understanding to a Cloud Logging log-based alerting strategy.
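For the alerting idea, most of the work is assembling a compact prompt from the failure details. The sketch below shows one possible shape. The function `build_failure_prompt`, its parameters, the prompt wording, and the sample DAG and task names are all hypothetical; the resulting string could then be supplied as `contents` to GenerativeModelGenerateContentOperator.

```python
from typing import Iterable


def build_failure_prompt(
    dag_id: str,
    task_id: str,
    error_lines: Iterable[str],
    max_lines: int = 20,
) -> str:
    """Build a prompt asking a generative model to explain a task failure."""
    if max_lines < 1:
        raise ValueError("max_lines must be at least 1")
    # Drop blank lines and keep only the tail of the log, where the
    # traceback usually sits, so the prompt stays short.
    tail = [line.rstrip() for line in error_lines if line.strip()][-max_lines:]
    return "\n".join(
        [
            "An Apache Airflow task failed. Explain the likely cause in two "
            "sentences and suggest one next step.",
            f"dag_id: {dag_id}",
            f"task_id: {task_id}",
            "error log (most recent lines):",
            *tail,
        ]
    )


prompt = build_failure_prompt(
    dag_id="daily_sales_load",
    task_id="load_to_bigquery",
    error_lines=["INFO - starting load", "", "ERROR - table not found: sales.raw"],
)
print(prompt)
```

Capping the number of log lines keeps the request small and avoids sending an entire task log to the model when only the last few lines describe the error.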

By combining these three technologies, businesses can develop innovative solutions across various domains and use cases.

Learn more

If you’re new to Airflow and Cloud Composer, check out our Quickstart: Run an Apache Airflow DAG in Cloud Composer 2. For even more information, check out the latest updates on the Google Cloud Airflow provider package, Google Cloud Vertex AI operators in Airflow, Cloud Composer, and generative AI on Vertex AI.

We look forward to seeing how you integrate generative AI into your Airflow DAGs!
