Real-Time Streaming Salesforce Updates to PubSub
TL/DR
The working implementation is on Third Attempt
Intro
A couple of months ago when we started building our data lake, one of the requirements was to try and get real-time data in. Most of our external and internal sources supported that, I had a bit more knowledge on Salesforce so I started researching how we can stream data from Salesforce in real-time.
Luckily I stumbled upon Salesforce Change Data Capture. At a first glance, it looks good, great, I thought, this is exactly what we need. So I started building a POC.
And thus, started most of my problems. Since our entire stack is on Google, and we already have set up some streaming jobs using Dataflow (with python), I thought to try and leverage that. So I tried searching how I can build a streaming pipeline using Dataflow aaaaand…. unfortunately, is not possible…
You see Dataflow in Python has connectors to limited Streaming Sources (only Kafka and PubSub). Ok, I thought, I will build an Unbounded Source for Salesforce myself… wrong again… In Python building custom Unbounded sources is not supported.
First attempt: Salesforce CDC and “Hacking” Dataflow
Even though it seems that I couldn’t use Dataflow, I wanted to try it anyway. So I build a pipeline and put the code which listens for messages from Salesforce inside ParDo. Surprisingly it worked! I let it run for 5–10 days without running to any issues, the streaming was working, it was pushing messages to PubSub all good.
import argparse
import asyncio
import loggingimport apache_beam as beam
from aiosfstream import SalesforceStreamingClient
from apache_beam.options.pipeline_options import PipelineOptionsasync def stream_events():
# connect to Streaming API
password = ‘my_password’
security_token = ‘my_security_token’
client = SalesforceStreamingClient(
consumer_key=’app_consumer_key’,
consumer_secret=’app_consumer_secret’,
username=’alex_test@salesforce_domain.com.sandbox’,
password=f’{password}{security_token}’,
sandbox=True
)
await client.open() # subscribe to topics
await client.subscribe(“/data/ContactChangeEvent”) logging.info(“waiting for messages”)
# listen for incoming messages
async for message in client:
topic = message[“channel”]
data = message[“data”]
logging.info(f”{topic}: {data}”)
push_data_to_pubsub(data) #sample implementationclass ReadData(beam.DoFn):
def process(self, nothing):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(stream_events())class SalesforceStreamingOptions(PipelineOptions):def run(argv=None): with beam.Pipeline(options=pipeline_options) as pipeline:
(pipeline
| ‘Start’ >> beam.Create([None])
| ‘Parse Data’ >> beam.ParDo(ReadData())
)if __name__ == ‘__main__’:
run()
But here are a few of the downsides.
- We can’t use yield to push any messages downstream because the streaming happens on a different thread (see ReadData method)
- All the code has to be written inside one block of ParDo, which is not ideal
- The code inside the ParDo is supposed to manipulate one element at a time, modify the message and then pass it on to the next step, so even though the code works fine, it’s not supposed to be used like that
- That’s why if you see the logs are flooded with messages like this.
Dataflow thinks that we’re processing the same element for 25 hours straight…
Soo, after that I went back to brainstorming what would be the next thing to try. After taking a look at all the available GCP products (and eliminated a few like CloudFunctions and CloudRun), I thought that the only way is to set up a microservice that will run on our already-set-up GKE cluster which will do only one thing, listen for messages from salesforce and push them to PubSub. (the stream_events function from above)
It might sound like a good idea but here comes the second requirement. Salesforce CDC pushes only the changed fields from the table we are watching, we wanted to get the whole row, and as much as I searched, the only way to achieve this is to make a get call back to Salesforce API on every event message we got.
In order to get the whole row on every change event, we would have to make a get call back to Salesforce after every change event
In theory that could work too, but it wouldn’t be scalable and we would hit the API limits pretty soon. So back to brainstorming again…
At this point, I started asking people from other teams if they have any ideas, and this is where the solution came from. Some people from the backend/DevOps team proposed that instead of listening for messages and hitting back the Salesforce API, we could push them out of Salesforce. They proposed writing an Apex Trigger which should push data to a cloud function which was going to push the data to PubSub
Turns out, we had already set up a repo with Apex code I could leverage. I could write my Apex Trigger there and on every change to an underlying table we would get an event with the changed rows which we could push to a cloud function and from there push them to PubSub. We already had set up CI/CD on the repo for deploying the code and querying Salesforce Database from within the APEX Script doesn’t count towards the API Limits!! So the only thing remaining was to write the code. The downside? Write Apex Code!
Querying Salesforce Database from Apex doesn’t count towards the API Limits
Second attempt: APEX Triggers and CloudFunction
So here I am again, searching for how to write APEX Triggers. Luckily I know Java so all the code looked familiar but one of the negatives of APEX is that we can’t use standard Java Libraries. So in my POC in order to call the cloud function from within Salesforce, I used Apex Callouts.
The POC was successful but thinking about production we don’t want anyone with the URL to be able to call our cloud function, so we want to find a way to restrict access by authenticating to GCP (using a service account) and then call the function.
But wait…. If we can authenticate to Google with a service account, why not push directly to PubSub?
Third attempt: APEX Triggers and PubSub
So here we are, third and last attempt. Directly pushing data from Salesforce to PubSub. After researching online and finding many useful articles and especially one written by Google’s Engineers I was able to make it work.
It’s a bit complicated because we need to set up some things in Salesforce before we can try our code, but I’ll try to explain step-by-step
Salesforce Setup
Generate JKS key for the Service Account
First things first, assuming we have set up PubSub and we have a service account with publishing access to a topic. We need to generate a key and import it to Salesforce. Unfortunately, Salesforce only accepts jks keys so we’ll have to make one.
- In the Google Cloud Console, navigate to Identity -> Service Accounts
- Find the service account you’re going to use to push data to PubSub, click the 3 little dots under the “Actions” column and hit Create Key
- In the Popup menu select P12 and click Create (The default password is
notasecret
) - Once you hit Create the P12 file will be downloaded locally
Now we need to transform the P12 key to jks. In order to do that we’ll use the cloud shell.
- In the Google Cloud Console click Activate Cloud Shell on the toolbar
- Click More select Upload File and select the P12 file you’ve just downloaded
- Rename the key to a friendlier name so you can use it easier
mv ${DEVSHELL_PROJECT_ID}-NNNNNN.p12 sf-pubsub-sa-key.p12
4. Now run the following command to generate a jks version of the key
keytool \
-importkeystore \
-srckeystore $HOME/sf-pubsub-sa-key.p12 \
-destkeystore $HOME/salesforce.jks \
-srcstoretype pkcs12 \
-srcstorepass notasecret \
-deststorepass notasecret \
-deststoretype jks \
-destalias google_cloud \
-srcalias privatekey
5. Now click More and click Download File
6. In the popup window enter salesforce.jks
and click Download
7. This will download the salesforce.jks
file to your local computer
Import Service Account Key to Salesforce
Now that we have the jks key we need to import it to Salesforce.
- Login to Salesforce and in Setup use the Quick Find and search for Identity Provider
- Click Enable Identity Provider and then click Save to accept the certificate.
- Again using Quick Find, search for Certificate and Key Management
- Click Import from Keystore
- Click Choose File and select the jks file you downloaded earlier
- In the Keystore Password enter
notasecret
- Click Save
- Make sure that the newly added certificate is labelled as google_cloud because that’s what we’re going to use later in the Apex script
Setting up a Custom Metadata Type Object
This part is not necessary but it’s better to use a custom metadata object to store the PubSub URL, the GCP Authentication URL and the service account email instead of hardcoding them into code. This is also helpful because with the same code we can access different URLs and service account emails in a sandbox and in prod.
The following code creates the Custom Object force-app/main/default/objects/GCP_Key__mdt/GCP_Key__mdt.object-meta.xml
<?xml version="1.0" encoding="UTF-8"?>
<CustomObject xmlns="http://soap.sforce.com/2006/04/metadata">
<label>GCP Key</label>
<pluralLabel>GCP Keys</pluralLabel>
<visibility>Public</visibility>
</CustomObject>
And with the following metadata file, we add an entry to the object we just createdforce-app/main/default/customMetadata/GCP_Key.Stage_GCP_PubSub_Cases.md-meta.xml
<?xml version="1.0" encoding="UTF-8"?>
<CustomMetadata xmlns="http://soap.sforce.com/2006/04/metadata" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<label>Stage_GCP_PubSub_Cases</label>
<protected>false</protected>
<values>
<field>Client_Email__c</field>
<value xsi:type="xsd:string">stage-salesforce-alex@alex-stage-project.iam.gserviceaccount.com</value>
</values>
<values>
<field>Pubsub_Endpoint__c</field>
<value xsi:type="xsd:string">https://pubsub.googleapis.com/v1/projects/alex-stage-project/topics/stage-alex-salesforce-cases:publish</value>
</values>
<values>
<field>Token_Endpoint__c</field>
<value xsi:type="xsd:string">https://www.googleapis.com/oauth2/v4/token</value>
</values>
</CustomMetadata>
Notice in the code
- the service account email should be the same as the one we have on the service account key we generated and uploaded earlier
- The URL we for authenticating to Google is always
https://www.googleapis.com/oauth2/v4/token
- And the PubSub URL would be
https://pubsub.googleapis.com/v1/{pubsub_topic_url}:publish
For Example:https://pubsub.googleapis.com/v1/projects/alex-test-project/topics/salesforce-cases:publish
Remote Site
In order for Salesforce to be able to make external calls to Google APIs we need to add the URLs as Remote Site. In our case, we need to add the Google Auth Url and the PubSub Urlforce-app/main/default/remoteSiteSettings/GCP_Auth.remoteSite-meta.xml
<?xml version="1.0" encoding="UTF-8"?>
<RemoteSiteSetting xmlns="http://soap.sforce.com/2006/04/metadata">
<fullName>GCP_Auth</fullName>
<description>To authenticate to google services</description>
<disableProtocolSecurity>false</disableProtocolSecurity>
<isActive>true</isActive>
<url>https://accounts.google.com</url>
</RemoteSiteSetting>
force-app/main/default/remoteSiteSettings/GCP_PubSub.remoteSite-meta.xml
<?xml version="1.0" encoding="UTF-8"?>
<RemoteSiteSetting xmlns="http://soap.sforce.com/2006/04/metadata">
<fullName>GCP_PubSub</fullName>
<description>Used to publish messages to PubSub</description>
<disableProtocolSecurity>false</disableProtocolSecurity>
<isActive>true</isActive>
<url>https://pubsub.googleapis.com</url>
</RemoteSiteSetting>
Cache Partition
The final thing we need to set up is the cache. When we authenticate to Google we’ll get back a bearer token, which then we’ll add to our headers to push data to PubSub. This token expires after 1 hour. So for this hour, we can re-use the old token.
Setting up the cache partition is similar
force-app/main/default/cachePartitions/gcpPartition.cachePartition-meta.xml
<?xml version=”1.0" encoding=”UTF-8"?>
<PlatformCachePartition xmlns=”http://soap.sforce.com/2006/04/metadata">
<description>Partition for storing GCP Access token </description>
<isDefaultPartition>false</isDefaultPartition>
<masterLabel>gcpPartition</masterLabel>
<platformCachePartitionTypes>
<allocatedCapacity>0</allocatedCapacity>
<cacheType>Session</cacheType>
</platformCachePartitionTypes>
<platformCachePartitionTypes>
<allocatedCapacity>1</allocatedCapacity>
<cacheType>Organization</cacheType>
</platformCachePartitionTypes>
</PlatformCachePartition>
In the above code, we allocate 1MB to the gcpPartition. Since we only plan to store one token it should be enough.
Apex Code
Ok Finally we have finished with the Salesforce setup, now we can move to write the actual Apex Code. For this example, we’ll create 1 trigger and 2 helper functions.
CaseTrigger.trigger
Our trigger is pretty simple. We want to get updates after every insert, update, delete and undelete on the Case table, we serialize the data to a JSON object and we use System.enqueueJob to start the background service which is going to push the data to PubSub.
trigger CaseTrigger on Case (after insert,after update,after delete,after undelete) {
if(Trigger.isAfter){
List<Case> cases; if(Trigger.isDelete){
cases = trigger.old;
}else{
cases = trigger.new;
}
String serialisedCases = JSON.serialize(cases);
PubSub gcpService = new PubSub(serialisedCases, String.valueOf(Trigger.operationType), PubSub.GCP_CASES_METADATA_NAME);
ID jobID = System.enqueueJob(gcpService);
}
}
PubSub.cls
For the PubSub class, we use the Queueable interface to be able to run the code as a background service. This is because making Callouts is not allowed in the main thread, also since it’s running in the background it won’t affect Salesforce’s performance
public class PubSub implements Queueable, Database.AllowsCallouts {
public static final String GCP_CASES_METADATA_NAME = 'GCP_PubSub_Cases'; // Serialised data coming from triggers
private String serializedData; // Action to be performed in GCP, can be 'create' in this example
private String action; // Metadata name in order to get the appropriate entry from sf metadata
private String metadata_name;
// Constructor
public PubSub(String serializedData, String action, String metadata_name) {
this.serializedData = serializedData;
this.action = action;
this.metadata_name = metadata_name;
} @TestVisible
private void pushDataToPubsub(String token, String pubsubEndpoint) {
List<SObject> deserialisedData = (List<SObject>) JSON.deserialize(serializedData, List<SObject>.class); if(!deserialisedData.get(0) instanceof Case){
throw new GCPServiceAccountException('The object you are trying to push to PubSub is not supported');
} PubsubRequest requestMessage = new PubsubRequest(deserialisedData, this.action); HTTP http = new HTTP();
HTTPRequest req = new HTTPRequest();
req.setHeader('Content-Type','application/json');
req.setHeader('Authorization', 'Bearer ' + token);
req.setEndpoint(pubsubEndpoint);
req.setMethod('POST');
req.setBody(JSON.serialize(requestMessage)); HTTPResponse response=new HttpResponse();
System.debug('POST request to ' + pubsubEndpoint);
if (!Test.isRunningTest()) {
response = http.send(req);
}
if (response.getStatusCode() == 200) {
System.debug(pubsubEndpoint + ' called successfully');
} else {
System.debug(LoggingLevel.ERROR, 'Error occured ' + response.getStatus() + ' ' + response.getStatusCode());
}
}public void execute(QueueableContext context) {
GoogleUtils utils = new GoogleUtils(this.metadata_name);
if (utils.get_token() != null) {
this.pushDataToPubsub(utils.get_token(),
utils.get_audience());
}
}public class GCPServiceAccountException extends Exception {}}
GoogleUtils.cls
The following function will query the custom Metadata Object we made and get us the GCP Key object.
Then we can use the following function to authenticate to Google
public class GoogleUtils { private String authToken = '';
private String audience = ''; public GoogleUtils(String metadata_name){
GCP_Key__mdt serviceAccount = getGCPServiceAccount(metadata_name);
audience = serviceAccount.Pubsub_Endpoint__c;
authToken = getGCPAuthToken(serviceAccount);
}
public String get_token(){
return authToken;
}
public String get_audience(){
return audience;
}// For authenticating to Google and saving the token to cache
public String getGCPAuthToken(GCP_Key__mdt serviceAccount) {
String cachedAccessToken = (String)
Cache.Org.get(CACHE_ACCESS_TOKEN_KEY);
if (cachedAccessToken != null) {
System.debug('Returning cached access token');
return cachedAccessToken;
} else {
GoogleAuthResponse result = new GoogleAuthResponse(); Auth.JWT jwt = new Auth.JWT();
jwt.setAud(serviceAccount.Token_Endpoint__c);
jwt.setIss(serviceAccount.client_email__c); // Additional claims to set scope
Map<String, Object> claims = new Map<String, Object>();
claims.put('scope', 'https://www.googleapis.com/auth/pubsub');
jwt.setAdditionalClaims(claims); // Create the object that signs the JWT bearer token
Auth.JWS jws = new Auth.JWS(jwt, 'google_cloud'); // Get the resulting JWS in case debugging is required
String token = jws.getCompactSerialization(); //Set the token endpoint that the JWT bearer token is posted to
String tokenEndpoint = serviceAccount.Token_Endpoint__c; Auth.JWTBearerTokenExchange bearer = new
Auth.JWTBearerTokenExchange(tokenEndpoint, jws); if (!Test.isRunningTest()) {
System.HttpResponse response = bearer.getHTTPResponse();
result =(GoogleAuthResponse)
JSON.deserialize(response.getBody(), GoogleAuthResponse.class);
} else {
result.access_token = 'IN TEST';
}
cacheAccessToken(result.access_token);
return result.access_token;
}
}// For getting the service account metadata from the custom object we created earlier
public GCP_Key__mdt getGCPServiceAccount(String metadata_name) {
List<GCP_Key__mdt> gcp_sa_keys = new List<GCP_Key__mdt>();
gcp_sa_keys = [SELECT Client_Email__c,
Pubsub_Endpoint__c,
Token_Endpoint__c
FROM GCP_Key__mdt
WHERE MasterLabel = :metadata_name];
if (gcp_sa_keys.size() > 0) {
return gcp_sa_keys[0];
} else {
throw new GCPServiceAccountException('Cannot find GCP Service
Account Keys');
}
}// For the cache
private final static String CACHE_ACCESS_TOKEN_KEY = 'local.gcpPartition.googleCloudAccessToken';public static void cacheAccessToken(String accessToken) {
// Cache the access token in the default org cache with a TTL of 30 seconds less than its expiration
Cache.Org.put(CACHE_ACCESS_TOKEN_KEY, accessToken, 3600 - 30);
}
public static Boolean clearAccessToken() {
// Method to clear the cached token from our local cache
return Cache.Org.remove(CACHE_ACCESS_TOKEN_KEY);
}
}
And that’s it. Our trigger should work fine. Looking at the Logs in the Salesforce Developer Console we’re able to see that our post called successfully
and the message arrived in PubSub
You can find the full code here: https://github.com/aFrag/SalesforceToPubSub
Finally
- When I started this POC I had no idea what direction to go, I tried, failed and tried again
- You never know where the solution will come from. On my occasion, it was brainstorming with people from completely different teams who had knowledge that I didn’t.
- Just because it’s working doesn’t mean it’s good (my First attempt with Dataflow)
References
- https://cloud.google.com/solutions/calling-protected-cloud-functions
- https://github.com/GoogleCloudPlatform/salesforce-cloud-functions-crm-tutorial
- https://trailhead.salesforce.com/en/content/learn/modules/apex_integration_services/apex_integration_rest_callouts
- https://salesforce.stackexchange.com/questions/13301/connect-apex-and-google-api-using-jwt-to-retrieve-oauth-2-0-token/71442#71442
- https://medium.com/@kumar385/google-pubsub-rest-api-61d96d3deae9
- https://developer.salesforce.com/docs/atlas.en-us.apexcode.meta/apexcode/apex_platform_cache_org_examples.htm
- https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_trigger_intro.htm
- https://developer.salesforce.com/docs/atlas.en-us.change_data_capture.meta/change_data_capture/cdc_trigger_example.htm