Using Argo Events With Pub/Sub to Trigger Kubernetes Jobs in Google Cloud

Ruben Blazquez Cob
Google Cloud - Community
5 min read · Feb 9, 2023

Event-based systems are becoming increasingly popular in modern app development. They allow you to quickly respond to events and make decisions in real-time, giving your organization the agility and responsiveness it needs to stay ahead in today’s fast-moving digital world.

Despite the growing importance of event-based systems, there is currently no native solution for triggering the creation of resources in Google Kubernetes Engine (GKE) from Pub/Sub messages. This can be a challenge for organizations looking to build event-driven applications.

This is where Argo comes in with Argo Events, allowing us to connect a container to Pub/Sub, listen for incoming messages, and trigger containerized jobs.

Argo Events supports many other use cases and event sources. You can read more about them in the official Argo Events documentation.

In this article, we’ll look at a specific use case — triggering Kubernetes Jobs from Google Cloud Pub/Sub messages — and I’ll walk you through how to set it up.

This guide assumes that you have the gcloud CLI and kubectl installed.

Create cluster and Pub/Sub

Here we will create the prerequisites for our Argo Events setup: a GKE cluster and a Pub/Sub topic and subscription.

This guide uses an Autopilot cluster, but a Standard cluster works just as well.

1. Create the cluster. This creates an Autopilot cluster:

gcloud container clusters create-auto CLUSTER_NAME \
--region REGION \
--project=PROJECT_ID

2. Connect to your cluster

gcloud container clusters get-credentials CLUSTER_NAME \
--region REGION \
--project=PROJECT_ID

3. Create Pub/Sub topic

gcloud pubsub topics create TOPIC_NAME

4. Create a Pub/Sub subscription

gcloud pubsub subscriptions create SUBSCRIPTION_NAME --topic=TOPIC_NAME

Installing Argo Events

For a more in-depth and up-to-date guide on installing Argo Events, follow the official instructions.

1. Create the namespace:

kubectl create namespace argo-events

2. Deploy the Argo Events service account, ClusterRoles, and the controller for the Sensor, EventBus, and EventSource resources:

kubectl apply -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install.yaml

3. Deploy the EventBus:

kubectl apply -n argo-events -f https://raw.githubusercontent.com/argoproj/argo-events/stable/examples/eventbus/native.yaml

Create the Kubernetes service account and roles

Make sure you have Workload Identity configured beforehand so the EventSource can authenticate to Pub/Sub, and that you create the resources in the right namespace.

If you are not using Workload Identity, create a JSON key for the Google Service Account and uncomment the credentialSecret parameter in the EventSource. This option is only recommended for testing. Leave everything else as is.
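If you take the key-file route, the secret referenced by credentialSecret can be created along these lines. This is a sketch: the secret name argo-sa-json and key creds match the commented-out example in the EventSource further down, and key.json is just a local file name.

```shell
# Only for testing: export a JSON key for the Google Service Account
# and store it as the Kubernetes secret the EventSource will reference.
gcloud iam service-accounts keys create key.json \
  --iam-account=GSA_NAME@GSA_PROJECT.iam.gserviceaccount.com

kubectl create secret generic argo-sa-json \
  --from-file=creds=key.json \
  --namespace=argo-events
```

Remember to delete the key (and the secret) once you are done testing.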

1. Create the Google Service Account:

gcloud iam service-accounts create GSA_NAME \
--project=GSA_PROJECT

2. Allow the Kubernetes service account to impersonate the Google Service Account by adding an IAM policy binding between the two:

gcloud iam service-accounts add-iam-policy-binding GSA_NAME@GSA_PROJECT.iam.gserviceaccount.com \
--role roles/iam.workloadIdentityUser \
--member "serviceAccount:PROJECT_ID.svc.id.goog[NAMESPACE/KSA_NAME]"

3. Grant your Google Service Account the Pub/Sub subscriber role:

gcloud projects add-iam-policy-binding PROJECT_ID \
--member "serviceAccount:GSA_NAME@GSA_PROJECT.iam.gserviceaccount.com" \
--role "roles/pubsub.subscriber"

4. Create the following YAML file to configure the Kubernetes Service Account:

apiVersion: v1
kind: ServiceAccount
metadata:
  annotations:
    # Replace with your Google Service Account configured with Workload Identity
    iam.gke.io/gcp-service-account: <SA-ID>@<PROJECT_ID>.iam.gserviceaccount.com
  name: argo-events-ksa
  namespace: argo-events
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: argo-events
  name: role-argo-events-ksa
rules:
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: ["create"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: role-argo-events-ksa-binding
  namespace: argo-events
subjects:
  - kind: ServiceAccount
    name: argo-events-ksa
    namespace: argo-events
roleRef:
  kind: Role
  name: role-argo-events-ksa
  apiGroup: rbac.authorization.k8s.io

Save the file as ksa.yaml and deploy it:

kubectl apply -f ksa.yaml

Create the EventSource resource

This creates a workload that listens for incoming Pub/Sub messages.

As of the time of writing, setting jsonBody to false results in receiving base64-encoded strings.
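To illustrate: with jsonBody disabled, a payload like the one published in the test section later in this article would arrive base64-encoded, and you would have to decode it yourself:

```shell
# With jsonBody: false, the event body arrives base64-encoded.
# Decoding the sample payload used later in this article:
echo 'eyJIZWxsbyI6IldvcmxkISJ9' | base64 -d
# prints {"Hello":"World!"}
```

Setting jsonBody to true, as in the manifest below, avoids this step.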

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: gcp-pubsub
  namespace: argo-events
spec:
  template:
    serviceAccountName: argo-events-ksa
  pubSub:
    example:
      # (optional) jsonBody specifies that all event body payload coming from this
      # source will be JSON
      jsonBody: true
      # (optional) GCP project ID for the subscription.
      # Required if you run Argo Events outside of GKE/GCE
      # (otherwise, the default value is its project).
      projectID: <your-project-id>
      # (optional) GCP project ID for the topic.
      # By default, it is the same as projectID.
      # topicProjectID: "project-id"
      # (optional) Topic to which the subscription should belong.
      # Required if you want the EventSource to create a new subscription.
      # If you specify this field along with an existing subscription,
      # it will be verified whether it actually belongs to the specified topic.
      topic: <your-topic>
      # (optional) ID of the subscription.
      # Required if you use an existing subscription.
      # The default value is an auto-generated hash based on this EventSource
      # setting, so the subscription might be recreated every time you update
      # the setting, which has a possibility of event loss.
      subscriptionID: <your-subscriptionID>
      # (optional) Refers to a k8s secret which contains the credential JSON
      # to access Pub/Sub. If it is missing, it implicitly uses Workload Identity.
      # credentialSecret:
      #   name: argo-sa-json
      #   key: creds

Save the file as eventsource.yaml and deploy it:

kubectl apply -f eventsource.yaml

Create the Sensor resource

The Sensor fires whenever a new message is received and creates a Kubernetes Job for each message.

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: gcp-pubsub
  namespace: argo-events
  labels:
    app: cowsay
spec:
  template:
    serviceAccountName: argo-events-ksa
  dependencies:
    - name: test-dep
      eventSourceName: gcp-pubsub
      eventName: example
  triggers:
    - template:
        name: webhook-pod-trigger
        k8s:
          operation: create
          source:
            resource:
              apiVersion: batch/v1
              kind: Job
              metadata:
                generateName: cowsay-
              spec:
                ttlSecondsAfterFinished: 100
                template:
                  metadata:
                    labels:
                      app: cowsay
                  spec:
                    containers:
                      - name: ok
                        image: docker/whalesay:latest
                        command: [cowsay]
                        # args: [""] gets filled with the body of the Pub/Sub message
                    restartPolicy: Never
                backoffLimit: 4
          parameters:
            - src:
                dependencyName: test-dep
                dataKey: body
                value: messageNotCaptured
              dest: spec.template.spec.containers.0.args.0

Save the file as sensor.yaml and deploy it:

kubectl apply -f sensor.yaml
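To make the parameters section of the Sensor concrete, here is a rough shell sketch (not real Argo Events code) of what the substitution does: the event’s body (dataKey: body) is copied into the Job’s first container argument, and the literal value field acts as a fallback when the body is missing.

```shell
# Sketch of the Sensor's parameter substitution, assuming a captured event body.
# dataKey "body" is written into spec.template.spec.containers.0.args.0;
# "value" (messageNotCaptured) is used when no body is present.
BODY='{"Hello":"World!"}'            # event body captured by test-dep
ARG0="${BODY:-messageNotCaptured}"   # fallback mirrors the "value" field
echo "cowsay $ARG0"                  # roughly the command the Job ends up running
# prints cowsay {"Hello":"World!"}
```

In other words, each Pub/Sub message becomes a Job that runs cowsay with the message body as its argument.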

Test out the solution

1. Publish a message (it must be valid JSON, since jsonBody is enabled):

gcloud pubsub topics publish TOPIC_NAME --message='{"Hello":"World!"}'

2. Check the logs:

kubectl logs -l app=cowsay -n argo-events --tail 30

3. Success!

Summary

After scratching my head for a while, failing to find an “ultimate guide”, and getting lost in the documentation, I decided to write down the exact steps needed to replicate this setup.

Hopefully, this serves you well and you give it a try!
