Real-Time Streaming Salesforce Updates to PubSub

Alex Fragotsis · Published in Inside League · Dec 12, 2020

TL;DR

The working implementation is described in the Third attempt section.

Intro

A couple of months ago, when we started building our data lake, one of the requirements was to try to get real-time data in. Most of our external and internal sources supported that, and since I had a bit more knowledge of Salesforce, I started researching how we could stream data from Salesforce in real time.

Luckily, I stumbled upon Salesforce Change Data Capture. At first glance it looked great. This is exactly what we need, I thought, so I started building a POC.

And that's where most of my problems started. Since our entire stack is on Google, and we had already set up some streaming jobs using Dataflow (with Python), I thought I'd try to leverage that. So I searched for how to build such a streaming pipeline with Dataflow and… unfortunately, it's not possible.

You see, the Python SDK for Dataflow only has connectors for a limited set of streaming sources (just Kafka and PubSub). OK, I thought, I'll build an unbounded source for Salesforce myself… wrong again: building custom unbounded sources isn't supported in Python.

First attempt: Salesforce CDC and “Hacking” Dataflow

Even though it seemed I couldn't use Dataflow, I wanted to try it anyway. So I built a pipeline and put the code that listens for messages from Salesforce inside a ParDo. Surprisingly, it worked! I let it run for 5–10 days without running into any issues; the streaming worked and it kept pushing messages to PubSub.

import argparse
import asyncio
import logging

import apache_beam as beam
from aiosfstream import SalesforceStreamingClient
from apache_beam.options.pipeline_options import PipelineOptions


async def stream_events():
    # connect to the Salesforce Streaming API
    password = 'my_password'
    security_token = 'my_security_token'
    client = SalesforceStreamingClient(
        consumer_key='app_consumer_key',
        consumer_secret='app_consumer_secret',
        username='alex_test@salesforce_domain.com.sandbox',
        password=f'{password}{security_token}',
        sandbox=True
    )
    await client.open()

    # subscribe to the Change Data Capture channel
    await client.subscribe("/data/ContactChangeEvent")
    logging.info("waiting for messages")

    # listen for incoming messages
    async for message in client:
        topic = message["channel"]
        data = message["data"]
        logging.info(f"{topic}: {data}")
        push_data_to_pubsub(data)  # sample implementation


class ReadData(beam.DoFn):
    def process(self, nothing):
        # run the async listener inside the ParDo; this call never returns
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(stream_events())


class SalesforceStreamingOptions(PipelineOptions):
    pass  # custom pipeline options would be declared here


def run(argv=None):
    pipeline_options = SalesforceStreamingOptions()
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (pipeline
         | 'Start' >> beam.Create([None])
         | 'Parse Data' >> beam.ParDo(ReadData())
        )


if __name__ == '__main__':
    run()

But here are a few of the downsides:

  • We can't use yield to push messages downstream, because the streaming happens inside the asyncio event loop rather than in the DoFn's process method (see the ReadData class)
  • All the code has to live inside a single ParDo, which is not ideal
  • Code inside a ParDo is supposed to process one element at a time, transform it and pass it on to the next step; so even though this works, it isn't how a ParDo is meant to be used
  • That's also why the logs get flooded with messages like the one below.

Dataflow thinks that we’re processing the same element for 25 hours straight…

So after that, I went back to brainstorming about what to try next. After looking at all the available GCP products (and eliminating a few, like Cloud Functions and Cloud Run), I thought the only option was to set up a microservice on our already-running GKE cluster that would do just one thing: listen for messages from Salesforce and push them to PubSub (essentially the stream_events function from above).

That might sound like a good idea, but here comes the second requirement. Salesforce CDC pushes only the changed fields of the table we're watching, while we wanted the whole row, and as much as I searched, the only way to achieve that is to make a GET call back to the Salesforce API for every event message we receive.

In order to get the whole row on every change event, we would have to make a get call back to Salesforce after every change event

In theory that could work too, but it wouldn't scale and we would hit the API limits pretty quickly. So back to brainstorming again…

At this point I started asking people from other teams if they had any ideas, and this is where the solution came from. Some people from the backend/DevOps team proposed that instead of listening for messages and calling back into the Salesforce API, we could push the data out of Salesforce. They suggested writing an Apex trigger that would push data to a Cloud Function, which in turn would push the data to PubSub.

It turns out we already had a repo with Apex code I could leverage. I could write my Apex trigger there, and on every change to an underlying table we would get an event with the changed rows, which we could push to a Cloud Function and from there to PubSub. The repo already had CI/CD set up for deploying the code, and querying the Salesforce database from within an Apex script doesn't count towards the API limits! So the only thing remaining was to write the code. The downside? Writing Apex code!

Querying Salesforce Database from Apex doesn’t count towards the API Limits

Second attempt: APEX Triggers and CloudFunction

So here I am again, searching for how to write Apex triggers. Luckily I know Java, so the code looked familiar, but one downside of Apex is that we can't use standard Java libraries. So in my POC, in order to call the Cloud Function from within Salesforce, I used Apex callouts; a rough sketch is below.
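
For reference, here's a minimal sketch of what that POC callout could look like. The Cloud Function URL, the class name and the @future wrapper are assumptions for illustration only, not the code we ended up keeping.

public class CloudFunctionCallout {
    // Callouts can't run in the trigger context itself, so run them asynchronously
    @future(callout=true)
    public static void send(String payload) {
        HttpRequest req = new HttpRequest();
        // Hypothetical Cloud Function endpoint, used only for illustration
        req.setEndpoint('https://us-central1-my-project.cloudfunctions.net/salesforce-to-pubsub');
        req.setMethod('POST');
        req.setHeader('Content-Type', 'application/json');
        req.setBody(payload);

        HttpResponse res = new Http().send(req);
        System.debug('Cloud Function responded with status ' + res.getStatusCode());
    }
}

(Calling it would also require adding the Cloud Function URL as a Remote Site, the same way we do for the Google APIs later on.)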

The POC was successful, but thinking about production, we don't want anyone who has the URL to be able to call our Cloud Function. So we needed a way to restrict access by authenticating to GCP (using a service account) and only then calling the function.

But wait…. If we can authenticate to Google with a service account, why not push directly to PubSub?

Third attempt: APEX Triggers and PubSub

So here we are, the third and last attempt: pushing data directly from Salesforce to PubSub. After researching online and finding many useful articles, especially one written by Google's engineers, I was able to make it work.

It's a bit involved because we need to set a few things up in Salesforce before we can try our code, but I'll try to explain it step by step.

Salesforce Setup

Generate JKS key for the Service Account

First things first: assuming we have already set up PubSub and have a service account with publish access to a topic, we need to generate a key and import it into Salesforce. Unfortunately, Salesforce only accepts JKS keystores, so we'll have to make one.

  1. In the Google Cloud Console, navigate to Identity -> Service Accounts
  2. Find the service account you’re going to use to push data to PubSub, click the 3 little dots under the “Actions” column and hit Create Key
  3. In the Popup menu select P12 and click Create (The default password is notasecret)
  4. Once you hit Create the P12 file will be downloaded locally

Now we need to convert the P12 key to JKS. To do that, we'll use Cloud Shell.

  1. In the Google Cloud Console click Activate Cloud Shell on the toolbar
  2. Click More, select Upload File, and choose the P12 file you've just downloaded
  3. Rename the key to a friendlier name so it's easier to work with
mv ${DEVSHELL_PROJECT_ID}-NNNNNN.p12 sf-pubsub-sa-key.p12

4. Now run the following command to generate a jks version of the key

keytool \
-importkeystore \
-srckeystore $HOME/sf-pubsub-sa-key.p12 \
-destkeystore $HOME/salesforce.jks \
-srcstoretype pkcs12 \
-srcstorepass notasecret \
-deststorepass notasecret \
-deststoretype jks \
-destalias google_cloud \
-srcalias privatekey

5. Now click More and click Download File
6. In the popup window enter salesforce.jks and click Download
7. This will download the salesforce.jks file to your local computer

Import Service Account Key to Salesforce

Now that we have the jks key we need to import it to Salesforce.

  1. Log in to Salesforce, and in Setup use Quick Find to search for Identity Provider
  2. Click Enable Identity Provider and then click Save to accept the certificate.
  3. Again using Quick Find, search for Certificate and Key Management
  4. Click Import from Keystore
  5. Click Choose File and select the jks file you downloaded earlier
  6. In the Keystore Password enter notasecret
  7. Click Save
  8. Make sure that the newly added certificate is labelled as google_cloud because that’s what we’re going to use later in the Apex script

Setting up a Custom Metadata Type Object

This part is not strictly necessary, but it's better to use a Custom Metadata Type to store the PubSub URL, the GCP authentication URL and the service account email instead of hardcoding them in the code. This is also helpful because the same code can then point to different URLs and service account emails in a sandbox and in prod.

The following code creates the Custom Object
force-app/main/default/objects/GCP_Key__mdt/GCP_Key__mdt.object-meta.xml

<?xml version="1.0" encoding="UTF-8"?>
<CustomObject xmlns="http://soap.sforce.com/2006/04/metadata">
<label>GCP Key</label>
<pluralLabel>GCP Keys</pluralLabel>
<visibility>Public</visibility>
</CustomObject>

And with the following metadata file, we add an entry to the object we just created
force-app/main/default/customMetadata/GCP_Key.Stage_GCP_PubSub_Cases.md-meta.xml

<?xml version="1.0" encoding="UTF-8"?>
<CustomMetadata xmlns="http://soap.sforce.com/2006/04/metadata" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<label>Stage_GCP_PubSub_Cases</label>
<protected>false</protected>
<values>
<field>Client_Email__c</field>
<value xsi:type="xsd:string">stage-salesforce-alex@alex-stage-project.iam.gserviceaccount.com</value>
</values>
<values>
<field>Pubsub_Endpoint__c</field>
<value xsi:type="xsd:string">https://pubsub.googleapis.com/v1/projects/alex-stage-project/topics/stage-alex-salesforce-cases:publish</value>
</values>
<values>
<field>Token_Endpoint__c</field>
<value xsi:type="xsd:string">https://www.googleapis.com/oauth2/v4/token</value>
</values>
</CustomMetadata>

Notice that the entry references three custom fields (Client_Email__c, Pubsub_Endpoint__c and Token_Endpoint__c), which need to exist on the GCP_Key__mdt object, and that the PubSub endpoint already includes the target topic and the :publish action.

Remote Site

In order for Salesforce to be able to make external calls to Google APIs, we need to add the URLs as Remote Sites. In our case, we need to add the Google auth URL and the PubSub URL.
force-app/main/default/remoteSiteSettings/GCP_Auth.remoteSite-meta.xml

<?xml version="1.0" encoding="UTF-8"?>
<RemoteSiteSetting xmlns="http://soap.sforce.com/2006/04/metadata">
<fullName>GCP_Auth</fullName>
<description>To authenticate to google services</description>
<disableProtocolSecurity>false</disableProtocolSecurity>
<isActive>true</isActive>
<url>https://accounts.google.com</url>
</RemoteSiteSetting>

force-app/main/default/remoteSiteSettings/GCP_PubSub.remoteSite-meta.xml

<?xml version="1.0" encoding="UTF-8"?>
<RemoteSiteSetting xmlns="http://soap.sforce.com/2006/04/metadata">
<fullName>GCP_PubSub</fullName>
<description>Used to publish messages to PubSub</description>
<disableProtocolSecurity>false</disableProtocolSecurity>
<isActive>true</isActive>
<url>https://pubsub.googleapis.com</url>
</RemoteSiteSetting>

Cache Partition

The final thing we need to set up is the cache. When we authenticate to Google we get back a bearer token, which we then add to our request headers when pushing data to PubSub. The token expires after one hour, so within that hour we can re-use it instead of re-authenticating on every event.
The cache partition is set up with a metadata file, just like the previous pieces:

force-app/main/default/cachePartitions/gcpPartition.cachePartition-meta.xml

<?xml version="1.0" encoding="UTF-8"?>
<PlatformCachePartition xmlns="http://soap.sforce.com/2006/04/metadata">
    <description>Partition for storing the GCP access token</description>
    <isDefaultPartition>false</isDefaultPartition>
    <masterLabel>gcpPartition</masterLabel>
    <platformCachePartitionTypes>
        <allocatedCapacity>0</allocatedCapacity>
        <cacheType>Session</cacheType>
    </platformCachePartitionTypes>
    <platformCachePartitionTypes>
        <allocatedCapacity>1</allocatedCapacity>
        <cacheType>Organization</cacheType>
    </platformCachePartitionTypes>
</PlatformCachePartition>

In the code above we allocate 1 MB of Organization cache to the gcpPartition. Since we only plan to store a single token, that should be enough.

Apex Code

OK, we're finally finished with the Salesforce setup; now we can move on to writing the actual Apex code. For this example, we'll create one trigger and two helper classes.

CaseTrigger.trigger

Our trigger is pretty simple. We want to get updates after every insert, update, delete and undelete on the Case table. We serialize the records to JSON and use System.enqueueJob to start the background job that is going to push the data to PubSub.

trigger CaseTrigger on Case (after insert, after update, after delete, after undelete) {
    if (Trigger.isAfter) {
        List<Case> cases;
        if (Trigger.isDelete) {
            cases = Trigger.old;
        } else {
            cases = Trigger.new;
        }

        String serialisedCases = JSON.serialize(cases);
        PubSub gcpService = new PubSub(serialisedCases, String.valueOf(Trigger.operationType), PubSub.GCP_CASES_METADATA_NAME);
        ID jobID = System.enqueueJob(gcpService);
    }
}

PubSub.cls

For the PubSub class, we implement the Queueable interface (together with Database.AllowsCallouts) so the code runs as a background job. This is because callouts aren't allowed directly from a trigger, and since the job runs asynchronously it won't affect Salesforce's performance.

public class PubSub implements Queueable, Database.AllowsCallouts {
    public static final String GCP_CASES_METADATA_NAME = 'GCP_PubSub_Cases';
    // Serialised data coming from triggers
    private String serializedData;
    // The Salesforce operation that fired the trigger (e.g. AFTER_INSERT)
    private String action;
    // Metadata name used to look up the appropriate GCP_Key__mdt entry
    private String metadata_name;

    // Constructor
    public PubSub(String serializedData, String action, String metadata_name) {
        this.serializedData = serializedData;
        this.action = action;
        this.metadata_name = metadata_name;
    }

    @TestVisible
    private void pushDataToPubsub(String token, String pubsubEndpoint) {
        List<SObject> deserialisedData = (List<SObject>) JSON.deserialize(serializedData, List<SObject>.class);
        if (!(deserialisedData.get(0) instanceof Case)) {
            throw new GCPServiceAccountException('The object you are trying to push to PubSub is not supported');
        }

        PubsubRequest requestMessage = new PubsubRequest(deserialisedData, this.action);
        HTTP http = new HTTP();
        HTTPRequest req = new HTTPRequest();
        req.setHeader('Content-Type', 'application/json');
        req.setHeader('Authorization', 'Bearer ' + token);
        req.setEndpoint(pubsubEndpoint);
        req.setMethod('POST');
        req.setBody(JSON.serialize(requestMessage));

        HTTPResponse response = new HttpResponse();
        System.debug('POST request to ' + pubsubEndpoint);
        if (!Test.isRunningTest()) {
            response = http.send(req);
        }

        if (response.getStatusCode() == 200) {
            System.debug(pubsubEndpoint + ' called successfully');
        } else {
            System.debug(LoggingLevel.ERROR, 'Error occurred ' + response.getStatus() + ' ' + response.getStatusCode());
        }
    }

    public void execute(QueueableContext context) {
        GoogleUtils utils = new GoogleUtils(this.metadata_name);

        if (utils.get_token() != null) {
            this.pushDataToPubsub(utils.get_token(), utils.get_audience());
        }
    }

    public class GCPServiceAccountException extends Exception {}
}
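
One piece the class above references but the article doesn't show is PubsubRequest. Based on the PubSub publish REST API, which expects a messages array with base64-encoded data, a minimal sketch could look like the following; the inner structure and field handling are assumptions, not the exact class from the repo.

public class PubsubRequest {
    public List<PubsubMessage> messages;

    public PubsubRequest(List<SObject> records, String action) {
        PubsubMessage msg = new PubsubMessage();
        // PubSub requires the message payload to be base64-encoded
        msg.data = EncodingUtil.base64Encode(Blob.valueOf(JSON.serialize(records)));
        // Pass the trigger operation along as a message attribute
        msg.attributes = new Map<String, String>{ 'action' => action };
        this.messages = new List<PubsubMessage>{ msg };
    }

    public class PubsubMessage {
        public String data;
        public Map<String, String> attributes;
    }
}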

GoogleUtils.cls
The GoogleUtils class queries the Custom Metadata object we created to fetch the GCP key entry, and then uses it to authenticate to Google and cache the resulting access token.

public class GoogleUtils {
    private String authToken = '';
    private String audience = '';

    public GoogleUtils(String metadata_name) {
        GCP_Key__mdt serviceAccount = getGCPServiceAccount(metadata_name);
        audience = serviceAccount.Pubsub_Endpoint__c;
        authToken = getGCPAuthToken(serviceAccount);
    }

    public String get_token() {
        return authToken;
    }

    public String get_audience() {
        return audience;
    }

    // For authenticating to Google and saving the token to the cache
    public String getGCPAuthToken(GCP_Key__mdt serviceAccount) {
        String cachedAccessToken = (String) Cache.Org.get(CACHE_ACCESS_TOKEN_KEY);
        if (cachedAccessToken != null) {
            System.debug('Returning cached access token');
            return cachedAccessToken;
        } else {
            GoogleAuthResponse result = new GoogleAuthResponse();
            Auth.JWT jwt = new Auth.JWT();
            jwt.setAud(serviceAccount.Token_Endpoint__c);
            jwt.setIss(serviceAccount.Client_Email__c);
            // Additional claims to set the scope
            Map<String, Object> claims = new Map<String, Object>();
            claims.put('scope', 'https://www.googleapis.com/auth/pubsub');
            jwt.setAdditionalClaims(claims);
            // Create the object that signs the JWT bearer token
            // ('google_cloud' is the certificate we imported earlier)
            Auth.JWS jws = new Auth.JWS(jwt, 'google_cloud');
            // Get the resulting JWS in case debugging is required
            String token = jws.getCompactSerialization();
            // Set the token endpoint that the JWT bearer token is posted to
            String tokenEndpoint = serviceAccount.Token_Endpoint__c;
            Auth.JWTBearerTokenExchange bearer = new Auth.JWTBearerTokenExchange(tokenEndpoint, jws);
            if (!Test.isRunningTest()) {
                System.HttpResponse response = bearer.getHTTPResponse();
                result = (GoogleAuthResponse) JSON.deserialize(response.getBody(), GoogleAuthResponse.class);
            } else {
                result.access_token = 'IN TEST';
            }
            cacheAccessToken(result.access_token);
            return result.access_token;
        }
    }

    // For getting the service account metadata from the custom object we created earlier
    public GCP_Key__mdt getGCPServiceAccount(String metadata_name) {
        List<GCP_Key__mdt> gcp_sa_keys = new List<GCP_Key__mdt>();
        gcp_sa_keys = [SELECT Client_Email__c,
                              Pubsub_Endpoint__c,
                              Token_Endpoint__c
                       FROM GCP_Key__mdt
                       WHERE MasterLabel = :metadata_name];
        if (gcp_sa_keys.size() > 0) {
            return gcp_sa_keys[0];
        } else {
            // GCPServiceAccountException is defined in the PubSub class
            throw new PubSub.GCPServiceAccountException('Cannot find GCP Service Account Keys');
        }
    }

    // For the cache
    private final static String CACHE_ACCESS_TOKEN_KEY = 'local.gcpPartition.googleCloudAccessToken';

    public static void cacheAccessToken(String accessToken) {
        // Cache the access token in the org cache with a TTL 30 seconds shorter than its expiration
        Cache.Org.put(CACHE_ACCESS_TOKEN_KEY, accessToken, 3600 - 30);
    }

    public static Boolean clearAccessToken() {
        // Clear the cached token from the org cache
        return Cache.Org.remove(CACHE_ACCESS_TOKEN_KEY);
    }
}
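
The GoogleAuthResponse wrapper used in getGCPAuthToken isn't shown in the article either. A minimal sketch, assuming we only care about the access_token field of Google's OAuth token response:

public class GoogleAuthResponse {
    // Only the field we actually read from Google's token response
    public String access_token;
}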

And that's it, our trigger should work fine. Looking at the logs in the Salesforce Developer Console, we can see that our POST call succeeded

and the message arrived in PubSub
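
If you want to exercise the trigger by hand, one quick option (a sketch; the field values are just examples) is to insert a Case from the Developer Console's Execute Anonymous window and watch the debug logs:

// Inserting a Case fires CaseTrigger, which enqueues the PubSub job
Case c = new Case(Subject = 'PubSub streaming test', Origin = 'Web');
insert c;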

You can find the full code here: https://github.com/aFrag/SalesforceToPubSub

Finally

  • When I started this POC I had no idea which direction to go in; I tried, failed and tried again
  • You never know where the solution will come from. In my case, it came from brainstorming with people from completely different teams who had knowledge that I didn't.
  • Just because something works doesn't mean it's good (see my first attempt with Dataflow)

References

  1. https://cloud.google.com/solutions/calling-protected-cloud-functions
  2. https://github.com/GoogleCloudPlatform/salesforce-cloud-functions-crm-tutorial
  3. https://trailhead.salesforce.com/en/content/learn/modules/apex_integration_services/apex_integration_rest_callouts
  4. https://salesforce.stackexchange.com/questions/13301/connect-apex-and-google-api-using-jwt-to-retrieve-oauth-2-0-token/71442#71442
  5. https://medium.com/@kumar385/google-pubsub-rest-api-61d96d3deae9
  6. https://developer.salesforce.com/docs/atlas.en-us.apexcode.meta/apexcode/apex_platform_cache_org_examples.htm
  7. https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_trigger_intro.htm
  8. https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_trigger_example.htm
