Stream Platform Events directly from Salesforce into Data Cloud using the Ingestion API

A fully “on platform” approach to stream Salesforce platform events directly into the Data Cloud Ingestion API.

Justus van den Berg
10 min read · Nov 10, 2023

Let’s say we have multiple platform event types we want to stream in real-time to Data Cloud. This could be from multiple orgs and potentially stream to multiple Data Cloud instances.

This requires a flexible, reusable solution: a single implementation that can map any type of platform event to any Data Cloud Ingestion API streaming endpoint. So I decided to build a mini event bridge…

And there is more… Once you can map platform event objects, you can apply exactly the same logic to any type of Salesforce object. This is great because it allows Flows, LWCs and Apex to stream events directly to Data Cloud with minimal code required.

Requirements

Let’s start by outlining what it is we want to achieve exactly:

  • Map any type of Platform Event or sObject to any Data Lake Object
  • A configurable setup that can be reused from multiple orgs
  • A solution that requires as little code as possible yet is very flexible
  • The process will look like this:

What do we need?

  • A secure connection from any spoke org to one or more Data Cloud instances that leverages Named Credentials for security reasons.
    I have written a custom auth provider package to achieve this. Please see the instructions on how to set this up.
  • A custom metadata configuration that holds a mapping between sObject fields and Ingestion API Object fields.
    In the configuration we also need to keep track of the target Ingestion API connector and target Object name.
    Lastly we need to define the Named Credential.
  • The Ingestion API allows multiple records to be ingested at the same time, so as per Apex Best Practices the solution has to be bulkified.
  • The code will look something like this:
trigger PlatformEventTrigger on PlatformEvent (after insert) {

    // Stream the event records based on a user defined configuration
    // This logic can be called from any class / method
    // The Async method is required for triggers only
    // Future methods cannot take sObjects, so the records are serialized to JSON
    // This is all the code an end user should have to worry about
    utl.Dc.streamRecordsToDataCloudAsync(
        JSON.serialize(Trigger.new),
        '[CONFIG_NAME]'
    );
}

Limits to consider

  • Ingestion API streaming payloads cannot exceed 200 KB, so we cannot send massive batches of records in a single call. In practice a platform event trigger usually only handles one, or a handful of, events at a time.
  • Calling the Ingestion API will eat into your Data Cloud Limits.
  • Outbound calls do not consume any API limits in Salesforce.
  • Subscribing to a platform event using an Apex trigger does not count against your subscriber limits.
  • Creating a platform event does still count towards your (hourly) limit.
  • Because we use an Apex trigger, the API call to Data Cloud has to fire asynchronously, which is done through a future method. When a future method fires depends on resource availability: usually it executes almost instantly, but in theory it can take a few minutes, making this a near real-time solution. The streaming Ingestion API itself processes records roughly every two minutes, so I don’t see this as an actual issue, but keep it in mind. A minimal sketch of the async pattern follows below.
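
To illustrate why the records have to be serialized before the asynchronous call, here is a minimal sketch of what a future-based callout wrapper conceptually looks like. The class and method names below are hypothetical and are not part of the utility package; the real implementation lives inside utl.Dc.

public with sharing class DataCloudStreamExample {

    // Future methods cannot take sObject lists as arguments, which is why
    // the trigger passes in a JSON serialized version of the records
    @future(callout=true)
    public static void streamAsync(String serializedRecords, String mdtConfigName) {

        // Restore the records after the round-trip serialization
        List<Object> records = (List<Object>) JSON.deserializeUntyped(serializedRecords);

        // Placeholder for the mapping and Ingestion API callout that the
        // utility package performs; shown here only to illustrate the pattern
        System.debug(records.size() + ' record(s) would be streamed using config ' + mdtConfigName);
    }
}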

Note on the platform event trigger callout architecture

Relaying an event through a web service call could be considered a bit of an anti-pattern. It has the advantage that you don’t need a (relatively complicated) subscriber client like CometD or gRPC and it does not eat into your subscriber limits, but it has some downsides as well.
If a callout fails there is no way to leverage the ReplayId and fetch the sent events again. Depending on how robust your solution needs to be, you might need to implement a custom error handling and retry mechanism; a rough sketch of what that could look like is shown below. I personally think the pros outweigh the cons in many scenarios, but be well aware of the implications of this type of implementation.
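
What a fallback should look like depends entirely on how robust your solution needs to be. As a rough, hypothetical sketch (the Data_Cloud_Retry__c object and the doStreamCallout method below do not exist in the utility; they are placeholders only), you could catch the callout failure and persist the payload for a scheduled retry job:

public with sharing class StreamWithRetryExample {

    // Hypothetical wrapper: attempt the callout and store the payload on failure
    public static void streamWithRetry(String serializedRecords, String mdtConfigName) {
        try {
            doStreamCallout(serializedRecords, mdtConfigName);
        } catch (Exception e) {
            // Persist the failed payload so a scheduled job can retry it later
            insert new Data_Cloud_Retry__c(
                Payload__c     = serializedRecords,
                Config_Name__c = mdtConfigName,
                Error__c       = e.getMessage()
            );
        }
    }

    private static void doStreamCallout(String serializedRecords, String mdtConfigName) {
        // Placeholder for the actual mapping and Ingestion API callout,
        // e.g. a call into the utility package
    }
}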

Important

Security is no easy subject: Before implementing this (or any) solution, always validate what you’re doing with a certified security expert and your certified implementation partner.

TL;DR :: A Data Cloud Utility

I created a Data Cloud Utility that contains the mapping object and a sync and an async method to stream records to Data Cloud from Apex or from an Apex trigger.
The Utility also comes with a set of tools for bulk ingestion and YAML creation. Throughout the detailed setup I’ll be referencing the methods from the utility; a quick preview of how they are called follows after the repository link below.

The GitHub repository can be found here:
https://github.com/jfwberg/lightweight-data-cloud-util
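
As a quick preview, the two entry points look roughly like this. The async variant and its arguments are covered in detail in the steps below; the exact signature of the synchronous variant may differ slightly, so treat it as an assumption and check the repository for the definitive API (records is a List<sObject>).

// From a platform event trigger: the async (future) variant, with records serialized to JSON
utl.Dc.streamRecordsToDataCloudAsync(JSON.serialize(Trigger.new), '[CONFIG_NAME]');

// From regular Apex (e.g. behind a Flow or LWC): the synchronous variant taking the sObject list
// Assumed signature - verify the exact method name and parameter order in the repository
utl.Dc.streamRecordsToDataCloud('[CONFIG_NAME]', records);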

Step by Step Example Setup

OK, enough talk, let’s set up an example. In this example I have a platform event that represents a smart device event. This could be turning on a light, scheduling the washing machine to run at a time when electricity is cheap, or enabling the heating when the temperature drops below a set threshold.

Note: This whole example is built around platform events, but it will work with any type of Salesforce object, and with a little Apex it can easily be made to work with Flows and LWCs.

A fully configured demo is available in the Git repo.

01 :: Spoke Org :: Install packages

Update: I have written a detailed step-by-step guide on the setup here: Salesforce Data Cloud Utility and Ingestion Api UI Setup Instructions. Please install all packages listed in the first step of those instructions.

02 :: Set up Connected App and Named Credential

In order to connect we need to set up a Named Credential on the spoke org and a connected app on the Data Cloud org. Please follow the instructions.
If you want multiple spoke orgs or multiple Data Cloud Instances, simply repeat the steps on all targets.

03 :: Spoke Org :: Create a platform event

In Setup, create a platform event; in this example the event is called Smart_Device_Event__e to represent an event that is sent from a smart home device.
We create a few fields to bring our example to life: Action__c (is it switched on or off?), Device_Type__c (e.g. a lightbulb), Status_Code__c to test a number field, Status_Reason__c for a short description and Timestamp__c to test a date/time field value.
To make each event unique we can use the standard EventUuid field. Set the publish behaviour to Publish Immediately.

04 :: Data Cloud :: Create Ingestion API Connector

  • Open Data Cloud and go to Data Cloud Setup through the settings cog
  • In the left menu bar click the “Ingestion API” button
  • Click the “New” button in the top right corner and give the ingestion connector a name. I call it “Smart Device Event Connector” because the ingestion source will be smart device events from our platform event.
  • Now create a YAML file that represents the object we’ll map the event data to. For formatting details please see the documentation. Save the file somewhere on your drive. Note that the Utility also has an sObject-to-YAML generator that can be leveraged here.
openapi: 3.0.3
components:
  schemas:
    Smart_Device_Event:
      type: object
      properties:
        ReplayId:
          type: string
        EventUuid:
          type: string
        Action:
          type: string
        Device_Type:
          type: string
        Status_Code:
          type: number
        Status_Reason:
          type: string
        Timestamp:
          type: string
          format: date-time
  • In Setup, on the details page of your newly created Ingestion API connector, press the “Upload files” button and upload your YAML file. You should see a preview with your fields. Press “Save”.
    Take note of these field names as these are the ones we are going to use in our mapping configuration later. An example of the payload shape they result in is shown below.
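
For context, once this schema is deployed, the streaming Ingestion API accepts payloads shaped like the schema, wrapped in a “data” array and POSTed to the connector’s streaming endpoint. The utility builds and sends this for you; the snippet below is only an illustration of the payload shape (verify the endpoint path against the current Ingestion API documentation):

// Illustration only: the utility builds and sends this payload for you
Map<String, Object> payload = new Map<String, Object>{
    'data' => new List<Object>{
        new Map<String, Object>{
            'EventUuid'     => 'example-uuid',
            'Action'        => 'Switch State',
            'Device_Type'   => 'Lightbulb',
            'Status_Code'   => 1,
            'Status_Reason' => 'Switched On (1)',
            'Timestamp'     => '2023-11-10T12:00:00.000Z'
        }
    }
};

// POSTed to /api/v1/ingest/sources/Smart_Device_Event_Connector/Smart_Device_Event
System.debug(JSON.serialize(payload));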

05 :: Data Cloud :: Create a new Data Stream

  • The initial setup is now complete. Go back to the Data Cloud app and navigate to the Data Streams tab.
  • Click “New”, select the “Ingestion API” tile and click “Next”.
  • Select the Ingestion API you just created and select the object from the YAML file. Now click “Next”.
  • Select the event category you want. In this case it is engagement data, so I select Engagement. For the primary key I select the unique identifier of the event, the EventUuid field, and for the timestamp our custom Timestamp field. When ready, click “Next”.
  • Optionally set your data space name or keep it at “default” and press “Deploy”.
  • That’s it: Data Cloud is now all set up and ready to receive streaming data.

06 :: Spoke Org :: Create a configuration and mapping

We now go back to the spoke org and, in Setup, open Custom Metadata Types to create the configuration and mapping custom metadata records.

  • Click “Manage Records” next to the “Data Cloud Ingestion API Configuration” custom metadata type and click “New”.
  • For the label choose something that matches the platform event, in this case “Smart Device Event Configuration”. Note that the resulting API name is what we will need in our Apex code later as the identifier.
  • In Named Credential Name specify the API name of the Data Cloud Named Credential (make sure you have tested that it works). If you want to stream to multiple Data Cloud instances, you have to duplicate all these records. I don’t think this is a common use case, so I have not built anything for that specifically.
  • Set the Ingestion API connector name to the connector name you have just set up, in this case “Smart_Device_Event_Connector”.
  • And lastly set the Target Object name to the name as specified in the Ingestion API / YAML file, in this case “Smart_Device_Event”.
  • Press “Save”.
  • Go back to Custom Metadata Types in Setup and click “Manage Records” next to the “Data Cloud Ingestion API Field Mapping” metadata type.
  • Click “New”.
  • For each of the fields create a mapping record. Make sure the mapping name is a unique key (unfortunately this is required and manual).
  • Select your source (platform event field) and target (Ingestion API connector field) and populate the lookup to the configuration record accordingly.
  • When all done, your mapping table should look something like the sketch below.
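
The original screenshot is not reproduced here, but based on the fields defined above an indicative mapping would be:

Source (platform event field)    Target (Ingestion API field)
EventUuid                        EventUuid
ReplayId                         ReplayId
Action__c                        Action
Device_Type__c                   Device_Type
Status_Code__c                   Status_Code
Status_Reason__c                 Status_Reason
Timestamp__c                     Timestamp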

07 :: Spoke Org :: Create a trigger

The configuration is now complete so it’s time to create a trigger on our platform event.

trigger SmartDeviceEventTrigger on Smart_Device_Event__e (after insert) {

    // Stream the data to Data Cloud
    utl.Dc.streamRecordsToDataCloudAsync(
        JSON.serialize(Trigger.new),
        'Smart_Device_Event_Configuration'
    );
}
  • In the trigger all we do is call the utl.Dc.streamRecordsToDataCloudAsync() method. This is a future method (hence the “Async”).
  • The first argument is a JSON serialized version of the platform event records. This is because future methods cannot take sObjects as arguments, so we do a round-trip serialization. If you use the non-async method, you pass in the sObject list directly.
  • The second argument is the API name of the metadata configuration record. It should probably live in a custom label, but for a demo this will do (see the sketch below).
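
For example, the same call with the configuration name maintained in a custom label (assuming a custom label named Smart_Device_Event_Config exists and holds the configuration API name):

// Same call as in the trigger above, but with the configuration name
// kept in a (hypothetical) custom label instead of a hardcoded string
utl.Dc.streamRecordsToDataCloudAsync(
    JSON.serialize(Trigger.new),
    Label.Smart_Device_Event_Config
);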

08 :: Spoke Org :: Create PlatformEventSubscriberConfig metadata

External credentials require that the user who calls them through a named credential has access to the Named Principal through a profile or permission set.
Unfortunately, platform event triggers execute as an automated process user that you cannot assign any access to, which makes callouts through a future method impossible.

The only way to solve this issue is to specify a running (system) user that executes the platform event trigger. This is done using a PlatformEventSubscriberConfig metadata record and cannot be done through Setup; it has to be done through the Metadata API.
The demo folder in the repository contains an example. Make sure to update the username before deploying.

Create an XML file that contains the running user info and the name of the platform event trigger you have created in step 07.
Name the file SmartDeviceEventTrigger.platformEventSubscriberConfig-meta.xml and deploy it using the metadata API or through VSCode.

<?xml version="1.0" encoding="UTF-8"?>
<PlatformEventSubscriberConfig xmlns="http://soap.sforce.com/2006/04/metadata">
    <platformEventConsumer>SmartDeviceEventTrigger</platformEventConsumer>
    <batchSize>100</batchSize>
    <masterLabel>SmartDeviceEventTriggerConfig</masterLabel>
    <user>test-382rc1gvovzk@example.com</user>
    <isProtected>false</isProtected>
</PlatformEventSubscriberConfig>

It’s important that on your platform event you go to Subscriptions, click “Manage”, and then suspend and resume the subscription for the new running user to take effect. If you deploy everything at once, like the demo, it should work straight away, but this might save you some headache figuring out why it does not work. Also make sure the running user has access to the External Credential.

09 :: Spoke Org :: Testing

Now that we are all set up, all we have to do is fire some platform events and watch them stream into Data Cloud. Here is an example of publishing the event. The EventUuid and ReplayId fields are populated automatically.

EventBus.publish(
    new Smart_Device_Event__e(
        Action__c        = 'Switch State',
        Device_Type__c   = 'Lightbulb',
        Status_Code__c   = 1,
        Status_Reason__c = 'Switched On (1)',
        Timestamp__c     = Datetime.now()
    )
);
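
Optionally, if you want to confirm the event was accepted by the event bus before checking Data Cloud, EventBus.publish returns a Database.SaveResult you can inspect (a small addition, not part of the original demo):

Database.SaveResult sr = EventBus.publish(
    new Smart_Device_Event__e(
        Action__c        = 'Switch State',
        Device_Type__c   = 'Lightbulb',
        Status_Code__c   = 1,
        Status_Reason__c = 'Switched On (1)',
        Timestamp__c     = Datetime.now()
    )
);
System.debug(sr.isSuccess() ? 'Event published' : sr.getErrors()[0].getMessage());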

In Data Cloud, go to the Data Explorer tab, select the Smart Device Event Connector Data Lake Object in the chosen data space, and behold the events streaming in…

10 :: Spoke Org :: Example

The full example configuration is available in the GitHub Repo. It contains an LWC to test the firing of platform events. You can play around with it to get a better idea of what it can do.

Conclusion

There you have it; a clear example of how we can stream data into Data Cloud with a single line of code (and some package magic) using custom metadata configurations.

It does take a little setup time, but it’s flexible, no boilerplate code has to be written, and the connection is taken care of through Named Credentials.

Final note

At the time of writing I work for Salesforce. The views and solutions presented here are strictly MY OWN and NOT necessarily the views or solutions Salesforce as a company would recommend. Again: always consult with your certified implementation partner before implementing anything you’ve read on the internet.
