Building a Data Cloud Ingestion API Utility on the Salesforce Platform

Unleash the power to ingest any type of data from any Salesforce Org directly into any Data Cloud Instance with a user-friendly UI.

Justus van den Berg
16 min read · Jan 16, 2024

A few weeks ago I wrote a Custom Auth Provider that allows us to directly call the native Data Cloud APIs securely, using Named Credentials.

I gave an example of how this could be leveraged by streaming platform events in (near) real-time to Data Cloud directly from any Salesforce Org to any Data Cloud Instance, without the need for any middleware.

I have extended the Custom Auth Provider with a library of Data Cloud Apex Utilities to simplify calling these APIs using Apex.
The main focus is on the Ingestion API, for both Bulk and Streaming Ingestion, but the Query and Data Graph APIs are part of the library as well.

Setting up the Ingestion API can be quite tedious: you have to create a YAML file with the object and field details. I have created a utility that converts one or more Salesforce objects to a YAML file that can be used directly when you set up a new Data Cloud Ingestion API.

In this article I will go into the details of the library, the utilities and their example UI implementations to show the value that flexible and scalable multi org connection can bring.

I start with the explanation of “the why” first, go on to the Apex methods and finish with the UI Application built with these methods. It’s quite a long article, so feel free to skip parts that are not of any interest to you.

This is an exploratory, art-of-the-possible approach. Always speak to your certified implementation partner before implementing anything you’ve read about on the internet.

Update: I have written a detailed step by step guide on the setup here: Salesforce Data Cloud Utility and Ingestion Api UI Setup Instructions

Why connect to the native Data Cloud API?

The advantage of calling the native APIs instead of using the Connect API, is that we can use the Ingestion API directly from Salesforce.

Another advantage is that we can connect any Salesforce Org to any Data Cloud Instance. This allows the use of Data Cloud Data on multiple remote Orgs in LWCs, Apex or Flows or to write data from any Org to Data Cloud.

Most importantly this gives us the ability to push data when we want to, in the format we want to. It allows for platform event streaming and to send data from triggers. This enables a full near real-time two way communication between triggers in an Org and Data Actions in Data Cloud.

Note: The Spring ‘24 release has brought the flow action “Send Data to Data Cloud”. If you don’t require Apex, this can be used instead of the custom solution I referred to earlier. It still uses the Ingestion API and can only connect to the home org.

TL;DR

The Apex Utility GitHub repository can be found here.

Why not use the Salesforce CRM Connector?

As always: if you can go with a standard approach, go with the standard approach. Connecting Salesforce data from multiple orgs into Data Cloud is best achieved through the native Salesforce CRM connector.

It has some limitations you need to keep in mind though:

  • Security: The CRM connector requires view all permissions on the objects it ingests. In heavily regulated industries this might be a problem and a lower level of access might be required.
  • There is no option to subscribe to platform events natively, so platform events will always have to be streamed into Data Cloud.
  • There are no options for parent-child relationship fields, sub queries or query filters. It’s all or none.
  • Not all sObjects seem to be supported; most standard and custom objects are, but I don’t seem to have all change history, feed and/or setup objects available.
    I have seen discrepancies between orgs, so this might be due to my config where not all objects have read all enabled on the security settings, but it is worth checking out.
  • No tooling API objects or Metadata API objects are supported (Why we would want this information in Data Cloud is a topic for a different day)
  • It is an all or none approach, that runs only once every 10 minutes. It used to be every hour, so great improvements there in the last release.
    If you only need refreshed data every 10 minutes, the CRM connector is the perfect solution for you.
  • Not really a limitation of the connector, but there is no option for an Admin or End User to simply upload a CSV file with data for ingestion or deletion.
    The Apex Utility I am writing about today is going to change that!

Clean data

For me the one that stands out is the fact that relationship data like “Owner.Profile.UserLicense.Name” is not possible, only the fields on the object itself.
Parent-child relationships are achieved by creating additional data streams for the parent/child data and harmonising that data based on the Ids. So in this case we would need a data stream for User, Profile and UserLicense. Doing that work in Data Cloud is not ideal when a simple query could take care of it.

Bringing in pre-harmonised and filtered data means fewer data cleaning actions and less processing power required in Data Cloud. The aim is better results from cleaner input data.
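
As a minimal sketch of that idea, the snippet below resolves the owner's license name through the relationship path in a single SOQL query and flattens every row into a plain map. The target keys (for example `OwnerLicenseName`) are hypothetical names chosen for illustration; whether they line up with your Data Cloud fields depends on the field mapping in your own configuration.

```apex
// Flatten relationship data at query time instead of harmonising it in Data Cloud.
// Assumption: the keys below are illustrative and not taken from any real mapping.
List<Map<String,Object>> rows = new List<Map<String,Object>>();

for (Account acc : [SELECT Id, Name, Owner.Profile.UserLicense.Name
                    FROM Account
                    LIMIT 100]) {
    rows.add(new Map<String,Object>{
        'Id'               => acc.Id,
        'Name'             => acc.Name,
        // The safe navigation operator guards against a missing parent record
        'OwnerLicenseName' => acc.Owner?.Profile?.UserLicense?.Name
    });
}

System.debug(rows.size() + ' flattened rows ready for ingestion');
```

A list of maps like this has the same shape as the `List<Map<String,Object>>` parameter accepted by the `utl.Dc.streamRecordsToDataCloud` overload listed under the streaming ingestion methods.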

Note: This is all use case dependent: There might be use cases where de-normalizing the data in advance might be a terrible idea. There might be use cases where you should always have all the data so for example your AI can make better decisions. I can see value for at least having the option to filter and combine data before ingestion over having multiple “all or none” type data streams.

The trade-off is between the processing power used in Salesforce and the desired architecture on one side and, on the other, the Admins or Data Experts who usually know their data model inside and out and have the platform skills to prepare the data.
Another question is whether any data pre-processing belongs on the platform at all. Answering these questions and finding a great, well-balanced solution is what makes being a Technical Architect / Salesforce Admin really fun :-)

Important note on limits and implementation

Although outbound API calls in Salesforce don’t consume any specific limits besides the callout governor limits, calling the Data Cloud API does count towards your Data Cloud usage. Always speak with your certified implementation partner and Account Executive about the impact that any implementation has on security and costs for your business.

Functionality

The utility is a layer built on top of the native Data Cloud API that in turn uses a Custom Auth Provider and Named Credentials.

All Apex Data Cloud Utility methods are called through the utl.Dc class. I like to keep common utility class names short and the method names descriptive. This is purely a personal choice.

Configuration metadata methods

To configure the connection, the custom metadata needs to be set up accordingly. A guide on how to set up the metadata can be found in the “Salesforce Data Cloud Utility and Ingestion Api UI Setup Instructions” article.

/**
* CONFIGURATION METADATA METHODS
*/
// Method to get all the metadata including child records based on the API Name
utl__Data_Cloud_Ingestion_API_Configuration__mdt configRecord = utl.Dc.getMetadataRecord(String mdtConfigName);

// Method to get the metadata configuration labels and names in a ready to use LWC picklist format
List<Map<String,String>> mdtOptions = utl.Dc.getConfigMetadataRecordsPicklistOptions();

// Method to get the metadata configuration labels and names in a ready to use LWC picklist format
// With a filter on a single named credential. This is the name of a DATA CLOUD named credential
List<Map<String,String>> mdtOptions = utl.Dc.getConfigMetadataRecordsPicklistOptions(String namedCredentialName);

Utility Methods

There are a few utility methods to make common actions easier. There is an option to get all your Data Cloud named credentials in a picklist format that can be used in an LWC.
There is an upgraded version now to get more detailed field metadata and the ability to sort a list of the FieldMetadata type by the placeInOrder field.

/**
* UTILITY METHODS
*/
// Method to get a list of picklist options with only Data Cloud Named Credentials in a ready to use LWC picklist format
// Works based on endpoints that end with "c360a.my.salesforce.com"
List<Map<String,String>> ncOptions = utl.Dc.getDataCloudNamedCredentialPicklistOptions();

// Method to get a list of field metadata from a Data Cloud Query result, sorted by column order
// When removePostfix is set to true, the __c is removed from the custom fields
// See the class format below
utl.Dc.FieldMetadata[] fieldMetadata = utl.Dc.getFieldMetadata(Map<String,Object> metadataMap, Boolean removePostfix);

// Method to create an ordered set of column names from a Data Cloud (query) Metadata response
// Uses the getFieldMetadata() method, see below for details
Set<String> columns = utl.Dc.getOrderedColumnNamesFromMetadata(Map<String,Object> metadataMap);


// Format of the utl.Dc.FieldMetadata Class
class FieldMetadata{
String name;
String type;
Integer typeCode;
Integer placeInOrder;
}

Query API Methods

The query methods allow you to query Data Cloud. If you select v2, the results are returned as a multidimensional array of rows and fields. Additional metadata is part of the API response; use the utility methods to handle these responses.
To get the column headers in the same order as the array, I created a utility method that gets the column names from the response metadata. With these two methods you have everything you need to handle and visualize query results.
Note that the utl.Rst class from the Lightweight — REST Util is used. This library can handle Data Cloud API error responses.

/**
* QUERY API METHODS
*/
// Method to execute a Data Cloud SQL query, uses v1 by default
// Note: This works based on the name of the Data Cloud Named Credential, not a configuration record
utl.Rst callout = utl.Dc.executeQuery(String namedCredentialName, String query);

// Method to execute a query against a specific query API version, valid values are "v1" or "v2"
// Note: This works based on the name of the Data Cloud Named Credential, not a configuration record
utl.Rst callout = utl.Dc.executeQuery(String namedCredentialName, String query, String apiVersion);
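
To illustrate why the column order has to be derived from the metadata, here is a small stand-alone sketch. It is not the library's implementation; it assumes the response metadata is a map of column name to a detail map that carries a `placeInOrder` value, which mirrors the fields of the `FieldMetadata` class above. The class name `ColumnOrderSketch` and the sample column names are hypothetical.

```apex
// Illustrative only: orders column names by their "placeInOrder" value.
// Assumption: metadataMap looks like { "col__c" : { "type": ..., "placeInOrder": 0, "typeCode": ... } }
public class ColumnOrderSketch {

    public static List<String> orderedColumns(Map<String,Object> metadataMap) {
        // Index every column name by its position in the result rows
        Map<Integer,String> nameByPosition = new Map<Integer,String>();
        for (String columnName : metadataMap.keySet()) {
            Map<String,Object> details = (Map<String,Object>) metadataMap.get(columnName);
            nameByPosition.put((Integer) details.get('placeInOrder'), columnName);
        }

        // Sort the positions and read the names back in that order
        List<Integer> positions = new List<Integer>(nameByPosition.keySet());
        positions.sort();

        List<String> ordered = new List<String>();
        for (Integer position : positions) {
            ordered.add(nameByPosition.get(position));
        }
        return ordered;
    }
}

// Usage from anonymous Apex once the class is saved in the org:
// Map<String,Object> metadataMap = (Map<String,Object>) JSON.deserializeUntyped(
//     '{"name__c":{"type":"VARCHAR","placeInOrder":1,"typeCode":12},' +
//     '"id__c":{"type":"VARCHAR","placeInOrder":0,"typeCode":12}}');
// System.debug(ColumnOrderSketch.orderedColumns(metadataMap)); // (id__c, name__c)
```

In a real implementation the `utl.Dc.getOrderedColumnNamesFromMetadata` method listed earlier already covers this; the sketch only shows the ordering principle.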

Data Graph Methods

Working with data graphs is now a lot easier by using the below methods. Note that these methods use the named credential name and not the mapping that is specified in the metadata.

/**
* DATA GRAPH METHODS
*/
// Method to get a list of all Data Graph Metadata
// Note: This works based on the name of the Data Cloud Named Credential, not a configuration record
List<Map<String,Object>> dgMetadataList = utl.Dc.getAllDataGraphMetadata(String namedCredentialName);

// Method to get a single instance of the Data Graph Metadata
// Note: This works based on the name of the Data Cloud Named Credential, not a configuration record
Map<String,Object> dgMetadata = utl.Dc.getDetailedDataGraphMetadata(String namedCredentialName, String dataGraphName);

// Method that queries the Data Graph and returns the JSON Blob from the Data Graph API response based on the data graph name and a record Id
// The record Id needs to be based on the primary key field
String dgJsonBlob = utl.Dc.getDataGraphJsonBlob(String namedCredentialName, String dataGraphName, String dataGraphRecordId);

// Method that queries the Data Graph and returns the RAW JSON Response based on the data graph name and a record Id
String dgResponse = utl.Dc.getDataGraph(String namedCredentialName, String dataGraphName, String dataGraphRecordId);

Streaming Ingestion Methods

The streaming methods are described in detail in the “Stream Platform Events directly into Data Cloud” article.

/**
* STREAMING INGESTION API METHODS
*/
// Method to asynchronously call the ingest API, in this case due to the "future" nature of callout
// we are required to serialize the sObjects using JSON.serialize(sObject[] records)
// Use this on (platform event) triggers
utl.Dc.streamRecordsToDataCloudAsync(String mdtConfigName, String serializedRecords);

// Method to synchronously call the ingest API with the records and the config from the metadata record
utl.Dc.streamRecordsToDataCloud(String mdtConfigName, sObject[] records);

// Method to synchronously call the ingest API with the records and the config from the metadata record
// This method takes a list of object maps as the record parameter
utl.Dc.streamRecordsToDataCloud(String mdtConfigName, List<Map<String,Object>> records);

// Method to call the ingestion API with a custom generated payload and the option to test the payload
// against the payload validation endpoint
utl.Dc.streamDataToDataCloud(String mdtConfigName, String payload, Boolean isTest);

// Method to generate a mapping between source and target object that can be used with the
// streaming API
Map<String,String> fieldMapping = utl.Dc.createFieldMapping(Data_Cloud_Ingestion_API_Field_Mapping__mdt[] fieldMappingRecords);

// Method to generate an ingestion API payload based on a mapping taking a list of SObjects as input
String payload = utl.Dc.createIngestStreamPayload(sObject[] records, Map<String,String> fieldMapping, Boolean prettyPrint);

// Method to generate an ingestion API payload based on a mapping taking a list of object maps as input
String payload = utl.Dc.createIngestStreamPayload(List<Map<String,Object>> records, Map<String,String> fieldMapping, Boolean prettyPrint);
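
As a rough sketch of how the asynchronous method could be wired into a platform event trigger: the event name `Smart_Demo_Event__e` and the configuration name `Smart_Demo_Configuration` are hypothetical placeholders; only the `streamRecordsToDataCloudAsync` signature comes from the list above.

```apex
// Hypothetical platform event trigger that forwards each batch of events to Data Cloud.
// Assumptions: Smart_Demo_Event__e and 'Smart_Demo_Configuration' are placeholder names.
trigger SmartDemoEventTrigger on Smart_Demo_Event__e (after insert) {

    // The async method takes a String, so the events are serialized first
    String serializedRecords = JSON.serialize(Trigger.new);

    // Hand the serialized events to the asynchronous streaming method
    utl.Dc.streamRecordsToDataCloudAsync('Smart_Demo_Configuration', serializedRecords);
}
```

Platform event triggers only support the `after insert` context, which is why no other trigger events are listed here.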

Bulk Ingestion Methods

The bulk ingestion methods cover each endpoint of the Bulk Ingestion API. They make life easier, as you do not have to use Postman or handle the results yourself.
In short: they give you the option to load CSV files and to view and update job statuses. This is the basis for our UI.

/**
* BULK INGESTION API METHODS
*/
// Method to orchestrate the bulk API ingestion from start to finish in a single transaction
// !! This is very limited in the size it can handle and exists for test purposes only !!
utl.Dc.ingestBulkCsvDataInDataCloud(String mdtConfigName, String operation, String[] csvFiles);

// Method to create a new bulk ingestion job, return the Id on success
// Valid operations are 'upsert' or 'delete'
// Note: this uses the metadata configuration as it needs the ingestion API details
String jobId = utl.Dc.createIngestionBulkJob(String mdtConfigName, String correlationId, String operation);

// Method to get a list of all Bulk ingestion jobs for a certain Ingestion API Connector
// Note: This works based on the name of the Data Cloud Named Credential, not a configuration record
List<Map<String,Object>> jobTable = utl.Dc.getBulkIngestionJobs(String namedCredentialName);

// Method to get all info for a specific job
Map<String,Object> jobInfo = utl.Dc.getBulkIngestionJobDetails(String namedCredentialName, String jobId);

// Method to add a CSV to the ingestion job
// Note: Uses Named credential
utl.Dc.addCsvToIngestionBulkJob(String namedCredentialName, String correlationId, String jobId, String csvData);

// Method to update the ingestion job state to 'UploadComplete' or 'Aborted'
// Note: Uses Named credential
utl.Dc.updateIngestionBulkJobState(String namedCredentialName, String correlationId, String jobId, String state);

// Method to delete a bulk job
// Note: Uses Named credential
utl.Dc.deleteIngestionBulkJob(String namedCredentialName, String correlationId, String jobId);
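
Put together, a bulk job could be driven step by step along these lines. This is a sketch under assumptions, not a tested recipe: the configuration name, the named credential name, the correlation Id value and the CSV content are all made up, and the correlation Id is assumed to be a free-form identifier that you choose yourself.

```apex
// Sketch of a create -> upload -> complete flow using the methods listed above.
// Assumptions: all names and values below are hypothetical placeholders.
String configName          = 'Smart_Demo_Configuration';
String namedCredentialName = 'Smart_Demo_Data_Cloud';
String correlationId       = 'bulk-demo-001';
String csvData             = 'id,name\n001,Example record';

// 1. Create the job; this call needs the metadata configuration for the Ingestion API details
String jobId = utl.Dc.createIngestionBulkJob(configName, correlationId, 'upsert');

// 2. Add the CSV data; from here on the calls work with the named credential
utl.Dc.addCsvToIngestionBulkJob(namedCredentialName, correlationId, jobId, csvData);

// 3. Mark the upload as complete so the job can be processed
utl.Dc.updateIngestionBulkJobState(namedCredentialName, correlationId, jobId, 'UploadComplete');

// 4. Inspect the job details afterwards
Map<String,Object> jobInfo = utl.Dc.getBulkIngestionJobDetails(namedCredentialName, jobId);
System.debug(jobInfo);
```

Each step is a separate callout, so the usual callout governor limits apply to the transaction as a whole.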

Data Cloud Utility UI

Detailed installation instructions can be found in “Salesforce Data Cloud Utility and Ingestion Api UI Setup Instructions”. When that is all set up, you can find an app called “Data Cloud Utility” in the App Launcher. The app consists of 4 tabs:

  • Data Cloud Ingestion: Components related to bulk and streaming Ingestion are here
  • Data Cloud Query: A component to query your Data Cloud Instances and a component to query the Data Graph. This is similar to the data explorer but this one works with multiple Data Cloud Instances instead of just the home org.
  • Data Cloud Utility: Utilities to create YAML files from sObjects and create queries from sObjects that align with Ingestion Headers are here.
  • Data Cloud JSON Utility: A JSON utility to test response payloads and their behaviour when transformed to different output formats.

I have to mention the limits again. When you use the UI, the synchronous Apex governor limits apply. This is a limitation we simply cannot work around when using the Salesforce UI. This means that CSV files cannot be larger than 6MB and previewing a query with 50k rows might crash your browser. Please keep these limits in mind.

Data Cloud Bulk Ingestion Utility

There is no native front-end in Salesforce to monitor, create or delete bulk ingestion jobs. For admins there is no user interface where you can simply load a CSV file into Data Cloud for either ingestion or deletion.
You will always require some sort of middleware or need to use Postman to use any of the API functionality.
Using this utility, less technical Admins or Data Experts can easily load or delete data from Data Cloud.

The Bulk Ingestion Utility solves these issues. In this utility you can create both upsert and delete bulk jobs, add CSV data directly or upload CSV files and abort or complete the jobs. You can also monitor past and current job statuses.
Based on the field mapping in the metadata configuration, it can generate a CSV header template file that can be used as the basis to create your own CSV file.

All the actions are based on the Data Cloud Configuration Metadata, so you can monitor multiple Ingestion Adapters for multiple Data Cloud instances from a single place.

Bulk Ingestion Utility Overview
The “Add CSV Data” option generates the expected header and sample data based on the target fields that are configured in the metadata
Upload a prepared CSV file from your local machine
The “Show mapping” button will give the information for the currently selected Configuration

Data Cloud Streaming Ingestion Utility

The Streaming Ingestion Utility is a tool to help you test an Ingestion Data Stream. It generates a sample payload based on the mapping you have configured in the metadata.

This auto-generated payload can then be sent to Data Cloud for (manual) testing and debugging purposes.

Auto generate a sample payload based on the metadata configuration and send it to the test endpoint or stream it live to Data Cloud

The Test button sends the payload to a synchronous endpoint to validate the payload.
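
The same generate-then-validate flow can be sketched in Apex with the streaming methods listed earlier. The field mapping, the sample record and the configuration name are hypothetical, and the mapping direction (Salesforce source field as key, Data Cloud target field as value) is an assumption.

```apex
// Sketch: build a payload from a sample record and send it to the validation endpoint.
// Assumptions: mapping direction is source => target; all names are placeholders.
Map<String,String> fieldMapping = new Map<String,String>{
    'Id'   => 'id',
    'Name' => 'name'
};

List<Map<String,Object>> sampleRecords = new List<Map<String,Object>>{
    new Map<String,Object>{ 'Id' => '001000000000001AAA', 'Name' => 'Example record' }
};

// prettyPrint = true makes the payload easier to read while debugging
String payload = utl.Dc.createIngestStreamPayload(sampleRecords, fieldMapping, true);
System.debug(payload);

// isTest = true targets the payload validation endpoint instead of streaming live
utl.Dc.streamDataToDataCloud('Smart_Demo_Configuration', payload, true);
```

Switching the last argument to `false` would stream the payload live, so it is worth validating first.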

Data Cloud Query Utility

The Query Utility gives you the option to run SQL queries against all your Data Cloud instances. This can be any Data Cloud instance that is configured through the metadata.

The Data Cloud instance that is specified in the “Data Cloud Named Credential” section in the configuration is the instance that the query runs against. The other fields in the configuration are ignored. So you could create a configuration record without any additional details specifically to run queries.

In the utility you simply select the metadata configuration and a result output format: this can be a Lightning data table, a CSV file or a RAW JSON file that can be exported or copied.

These queries can be used for checking your data or for exporting to a CSV file that you might want to use in a deletion job. It’s not a replacement for Data Explorer, but it does work with multiple Data Cloud instances using the Apex Utility, which is great for testing and debugging your implementation.

You can specify which API version, v1 or v2, you want to use. You can see the difference using the RAW response output.

Run a query against any Data Cloud object. In this example our Smart Demo Data Lake Object
A Data Cloud Query result in a Lightning Data Table
The Data Cloud Query result formatted in a CSV format with headers that can be used with Bulk Ingestion
A Data Cloud Query Result as RAW API response

Data Graph Utility

The Data Graph Utility can be used to run Data Graph queries. The utility works based on the Data Cloud Named Credentials.
You select a named credential in the list and all available data graphs will be shown in the picklist. After selecting a Data Graph, you select the primary key field, enter the record Id as specified in the PK field and press “Execute Data Graph” to run the Data Graph. You’ll get the response back in a modal where you can prettify, copy and save the results.
You can view the data graph metadata in a table view by pressing the “Metadata” button.

Data Graph Utility
The data blob value returned by the Data Graph Execution results as a key/value
The data blob value returned by the Data Graph Execution results
Metadata details (1/3)
Metadata details (2/3)
Metadata details (3/3)

Create Data Cloud Ingestion API YAML from sObject

To setup the objects and fields to use with the ingestion API you are required to upload a YAML file. If you are unfamiliar with the YAML syntax this can be quite tricky to get right.

The YAML from sObject Utility allows you to create a YAML file that is based on an existing sObject in the org with just a few clicks. Note that you can combine multiple objects in the same YAML file.

You select the sObject you want from the picklist; you can switch between labels and API names to make it easier to find your sObject.
Once you have picked your sObject, you select the fields you want to add to your YAML file and simply click the “Create YAML” button.

This creates the YAML file that is ready to use in Data Cloud. You can download the file directly or copy the file body.

If you want multiple objects you can run the creation process multiple times and merge the YAML files into a single file. The first three lines have to be removed from subsequent objects you’re adding. You need to keep the spacing the way it is to work.

This is not a perfect solution but it will work really well to create an Ingestion API YAML for platform events for example or to simply give you a syntax that you can edit manually afterwards.
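
For readers who want to see the idea behind such a conversion, here is a minimal sketch that builds a schema from the describe information of an sObject. It is not the utility's own code. It assumes an OpenAPI-style layout with a three line header (`openapi`, `components`, `schemas`), and the mapping from Salesforce field types to YAML types is a simplification chosen for illustration, so check the output against the Data Cloud documentation before uploading it. Object and field names are written as-is and may need adjusting.

```apex
// Illustrative sketch: generate an Ingestion API style YAML from sObject describe data.
// Assumptions: the OpenAPI-style layout and the type mapping below are simplifications.
public class IngestionYamlSketch {

    public static String toYaml(String sObjectName, List<String> fieldNames) {
        Map<String, Schema.SObjectField> fieldMap =
            Schema.getGlobalDescribe().get(sObjectName).getDescribe().fields.getMap();

        // The first three lines form the header that is only needed once per file
        List<String> lines = new List<String>{
            'openapi: 3.0.3',
            'components:',
            '  schemas:',
            '    ' + sObjectName + ':',
            '      type: object',
            '      properties:'
        };

        for (String fieldName : fieldNames) {
            Schema.DescribeFieldResult field = fieldMap.get(fieldName).getDescribe();
            Schema.DisplayType fieldType = field.getType();
            lines.add('        ' + field.getName() + ':');

            if (fieldType == Schema.DisplayType.BOOLEAN) {
                lines.add('          type: boolean');
            } else if (fieldType == Schema.DisplayType.INTEGER
                    || fieldType == Schema.DisplayType.DOUBLE
                    || fieldType == Schema.DisplayType.CURRENCY
                    || fieldType == Schema.DisplayType.PERCENT) {
                lines.add('          type: number');
            } else if (fieldType == Schema.DisplayType.DATETIME) {
                lines.add('          type: string');
                lines.add('          format: date-time');
            } else if (fieldType == Schema.DisplayType.DATE) {
                lines.add('          type: string');
                lines.add('          format: date');
            } else {
                // Everything else is treated as plain text in this sketch
                lines.add('          type: string');
            }
        }
        return String.join(lines, '\n');
    }
}

// Usage from anonymous Apex once the class is saved in the org:
// System.debug(IngestionYamlSketch.toYaml('Account', new List<String>{ 'Name', 'CreatedDate' }));
```

The fixed indentation in the sketch reflects the point made above: the spacing is part of the YAML syntax and has to stay intact when files are merged.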

Data Cloud SObject to CSV Utility

This library is all about ingesting your Salesforce platform data into Data Cloud. In practice this means that you convert your sObject data into CSV formatted data and send that to Data Cloud.

The sObject to CSV Utility creates a CSV file from a SOQL Query.

If you have set up a mapping between the sObject and the Data Source Object in your metadata configuration, the headers are automatically mapped between the Salesforce field name and the Data Cloud field name.

This allows a user to run a query and create a CSV file that is instantly usable by the Bulk Ingestion API Utility. By configuring the “Salesforce sObject Name” field in the configuration a sample query will be automatically generated on the selection of the metadata configuration. It is the easiest way to convert your Salesforce data into Data Cloud Ingestible data by an end user.
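
The header mapping step can be sketched as follows. This is an illustration rather than the utility's implementation: the class name `SObjectCsvSketch` is hypothetical, the mapping is assumed to run from Salesforce field name (key) to Data Cloud field name (value), and only fields directly on the record are handled, not relationship paths.

```apex
// Illustrative sketch: turn queried records into a CSV with Data Cloud header names.
// Assumption: fieldMapping maps Salesforce field name => Data Cloud field name.
public class SObjectCsvSketch {

    public static String toCsv(List<SObject> records, Map<String,String> fieldMapping) {
        // Fix the column order once so headers and values always line up
        List<String> sourceFields = new List<String>(fieldMapping.keySet());

        List<String> headers = new List<String>();
        for (String sourceField : sourceFields) {
            headers.add(fieldMapping.get(sourceField));
        }
        List<String> lines = new List<String>{ String.join(headers, ',') };

        for (SObject row : records) {
            List<String> cells = new List<String>();
            for (String sourceField : sourceFields) {
                Object value = row.get(sourceField);
                // escapeCsv() adds quotes when a value contains commas, quotes or line breaks
                cells.add(value == null ? '' : String.valueOf(value).escapeCsv());
            }
            lines.add(String.join(cells, ','));
        }
        return String.join(lines, '\n');
    }
}

// Usage from anonymous Apex once the class is saved in the org:
// String csv = SObjectCsvSketch.toCsv(
//     [SELECT Id, Name FROM Account LIMIT 10],
//     new Map<String,String>{ 'Id' => 'id', 'Name' => 'name' });
// System.debug(csv);
```

Date and date/time values are written with `String.valueOf` here, so their format may need adjusting to whatever the target fields expect.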

This example is limited to 2000 records (1000 for tooling queries) because that is the limit enforced by the REST API without requiring a “query more” solution. I have chosen the REST API because it supports the Tooling API and because, in the future, it will allow querying multiple Salesforce Orgs.

Example of querying the Smart_Demo__c sObject and convert the results to a CSV format
Query result as a CSV output

Final Note

Using a custom library we have simplified the way we can call the Data Cloud Ingestion API endpoints directly from Salesforce. We are limited by the governor limits but if we don’t have millions of records at the same time, it can handle very decent amounts of data.

The example UI implementations should give a good idea of how the libraries can be used.

At the time of writing I am a Salesforce employee. The above article describes my personal views and techniques only. They are in no way, shape or form official advice. It’s purely informative.
Nothing in this article is by definition the view of Salesforce as an Organization.
