Building a Custom ETL Pipeline for Security Data

Alexander Bolshakov
Exness Tech Blog
Published in
7 min read1 day ago

Summary

In the article, the SOC engineering team at Exness discusses creating a custom ETL pipeline for processing security events. Driven by the need for greater flexibility and performance, we aimed to surpass the standard methods of the Splunk ecosystem to handle complex data processing tasks more effectively.

Reflecting on data ingestion needs

The Splunk ecosystem offers tools and approaches for collecting events, normalizing data, and generating alerts, which work well for basic requirements. However, using standard Splunk methods to connect new data sources can slow down data queries and increase the complexity and number of knowledge objects used to process the data.

Lacking extensive Splunk experience, we at Exness were concerned about implementing advanced event processing. Despite available vendor recommendations and community solutions, we decided to develop our own ETL pipeline to better meet our needs.

Our main reasons for adopting a different approach to data ingestion were:

  • Minimizing Splunk knowledge objects to avoid unnecessary complexity.
  • Using a universal data processing stack not limited to the Splunk ecosystem.

Here are the key functional requirements for our data pipeline as our SOC evolved. We left out the mandatory steps of filtering and parsing events because their implementation didn’t influence our strategy choice.

Ingest-time data operations

Data processing is divided into two types based on when transformations occur relative to event indexing:

  • Ingest-time (before indexing)
  • Search-time (after indexing)

For more details, see the Splunk documentation on Index time versus search time.

Early on, we compared Splunk Forwarders with open-source event processing solutions. We preferred universal approaches and were willing to build our own stack. This led us to choose an ingest-time processing strategy, moving away from the Splunk ecosystem for event processing.

Performative contextual enrichment

Most corporate network components lack sufficient context for threat analytics within log events. For instance, an outbound network connection event won’t include GeoIP or ASN details, and a user accessing GitLab won’t show departmental affiliation from the HR system.

However, knowing that an IP address belongs to the hosting provider’s virtual machine address range and knowing an employee’s department affiliation clarifies the context for SOC analysts, indicating potential security incidents. Enriching the event context saves analysts time and speeds up the mean time to respond (MTTR).

Threat Intelligence (TI) lists with Indicators of Compromise (IoC) can also add threat context. While IoC-based detection isn’t accurate, it complements behavioral anomaly detection for identifying Tactics, Techniques, and Procedures (TTPs).

These anomalies underpin several scheduled Splunk rules, some using large data volumes. Threat Hunting also involves repeated manual analysis of large data sets. Optimizing queries by filtering events at search time to reduce the number of records for enrichment isn’t feasible since the enriched values themselves can be used in queries alongside the original event fields.

Thus, the event context must be added at ingest time to avoid resource-intensive enrichments during each search query.

Data purification

Events from sources can contain sensitive data, such as passwords or tokens. For example, a user’s command line might include explicit account credentials that end up in log values. Similarly, identity solution software like Okta can log passwords if users mistakenly enter them into the login field.

To reduce the risk of secrets appearing in Splunk, it’s essential to have a solution for finding and masking them at ingest-time. Initially, this can be done with regular expressions, which should be periodically updated as new secrets inevitably appear in the logs.

Log aggregation

In any SIEM licensing model, costs rise with the volume of ingested data. To manage this, it’s crucial to eliminate events that aren’t needed for analysis or regulatory reasons at ingest-time. Identical events from high-volume sources should be aggregated into a single event without losing important information. This is particularly effective for events from network devices and can significantly reduce the volume of indexed data. This approach is similar to the common practice of aggregating performance metric events in the SRE world.

Implementation of the strategy

In our ETL pipeline, we process events to their final form at ingest-time and save them in Splunk. This means we don’t need to transform events at search-time and don’t rely on built-in applications when connecting sources, except in rare cases. With our programming and data engineering skills, we are ultimately free to process events without any vendor restrictions.

We perform all event transformations in the intermediate ETL layer using popular open-source tools:

Custom programs help us collect data from sources that Vector doesn’t support. The ETL components are deployed in a Kubernetes cluster, providing benefits like scalability, isolation, resource management, fault tolerance, high availability, and simplified maintenance and updates.

Ingestion flow overview

Each source has its own set of microservices for the ETL process. The components and their functions are mostly standard across pipelines, except for some sources connected through Splunk applications.

Event Forwarders with functions:

  • Collect raw log data from the originating source.
  • Serialize raw log data into a log JSON event.
  • Filter out useless log events.
  • Load log events to Kafka as a batched payload.

Stream Consumers with functions:

  • Normalise log events according to the internal event schema.
  • Aggregate similar logs.
  • Purify log fields from secrets.
  • Enrich logs with contextual data.
  • Load processed logs to Splunk HEC.

Data transfer between components is handled by Apache Kafka, which enables horizontal scaling, protects against event loss, and improves consistency by separating event processing into two sequential stages.

Ingestion Flow Diagram

Event Forwarders are categorized by the roles they perform and their deployment schemes.

  • Agent: A service installed on a host OS to collect events by reading local files.
  • Aggregator: A Kubernetes container that consumes the event stream and processes raw log data sent via TCP.
  • Exporter: A Kubernetes container that extracts events from remote APIs or storage.

Functions can be shifted between components based on needs. For example, events from network devices are aggregated by Event Forwarders before being sent to the Event Streamer to reduce the load on Kafka and the network.

We aim to simplify Event Forwarders, especially Agents, to minimize the load on the host. Complex transformations are handled by Stream Consumers, deployed on separate Kubernetes nodes.

We primarily use Vector.dev for ETL components. For sources that provide security events through unsupported interfaces, like REST APIs, we create custom Exporters in Golang and Python, with a preference for Golang due to its stability for long-running applications and better performance.

Case study: collecting Google Workspace activity logs

Using Google Workspace as an example, we’ll show how we implement log ingestion for this unconventional source.

According to the documentation, activity events can be collected using the Admin SDK: Reports API. To gather these events, we developed an Event Forwarder with the role of Exporter, since Vector.dev doesn’t support this source (Vector.dev: Sources reference).

You can find the source code for this application on GitHub: google-workspace-audit-collector.

Process flow:

  • Event Forwarder (Exporter) collects and JSON-serializes log events from the Google Workspace Reports API, then sends them to a dedicated Kafka topic.
  • Stream Consumer extracts events from the Kafka topic, normalizing and enriching them before sending them to Splunk HEC for indexing and analysis.

Here’s how a log event is transformed by the Stream Consumer.

Raw log event collected by the Event Forwarder from the Google Workspace Reports API:

{
"actor": {
"callerType": "USER",
"email": "XXXXX.XXXXX@exness.com",
"profileId": "XXXXXXXXXXXXXXXXXX976"
},
"etag": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXtGcjgWU/XXXXXXXXXXXXXXXXXXTt6fPzwu0",
"events": [
{
"name": "ALERT_CENTER_VIEW",
"type": "ALERT_CENTER"
}
],
"id": {
"applicationName": "admin",
"customerId": "XXXXXXXXX",
"time": "2024-07-05T13:01:34.494Z",
"uniqueQualifier": "115971224149089997"
},
"ipAddress": "XXX.XXX.XXX.XXX",
"kind": "admin#reports#activity"
}

Within the ingestion flow, audit events are parsed and mapped to the data model fields used for all sources connected to Splunk.

Here’s the same event after transformation:

{
"account_uid": "XXXXXXXXX",
"app_name": "admin",
"etag": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXtGcjgWU/XXXXXXXXXXXXXXXXXXTt6fPzwu0",
"event_name": [
"ALERT_CENTER_VIEW"
],
"event_type": [
"ALERT_CENTER"
],
"event_uid": "115971224149089997",
"event_utc_time": "2024-07-05T13:01:34.494Z",
"events": [
{
"name": "ALERT_CENTER_VIEW",
"type": "ALERT_CENTER"
}
],
"kind": "admin#reports#activity",
"ldap": {
"updated": "2024-XX-XXT02:09:22.114388",
"user_group": [
"XXXXX"
],
"user_is_active": true
},
"log_source": "gws-audit-logs-exporter",
"log_sourcetype": "gws_audit",
"orgchart": {
"user_company": "Exness",
"user_department": "XXXXX",
"user_displayname": "Alexander Bolshakov",
"user_division": "XXXXX",
"user_email": "XXXXX.XXXXX@exness.com",
"user_employment": "On Site",
"user_geo": "XXXXX",
"user_hire_time": "XXXX-XX-XXT00:00:00Z",
"user_is_active": true,
"user_manager_email": "XXXXX.XXXXX@exness.com",
"user_probation_end_time": "XXXX-XX-XXT00:00:00Z",
"user_status": "employed",
"user_team": "Security Operations Center",
"user_title": "XXXXX",
"user_uid": "XXXXX"
},
"src_ip": "XXX.XXX.XXX.XXX",
"user_email": "alexander.bolshakov@exness.com",
"user_name": "alexander.bolshakov",
"user_type": "USER",
"user_uid": "XXXXXXXXXXXXXXXXXX976"
}

The transformed event is then sent to Splunk HEC and indexed to be available for analytics via the Splunk interface.

Synthetic fields, such as orgchart and ldap, containing employee information from the corporate HR system and LDAP identity storage, were added to the event. This contextual enrichment is performed by the Stream Consumer. We will discuss the technical details in the next article.

Conclusion

The Exness SOC engineering team created a custom ETL pipeline for security data. Using open-source tools and custom code, we achieved better flexibility, reduced system complexity, and optimized performance. This approach allowed us to meet our specific needs, maintain control over our technology, and minimize reliance on vendor-specific solutions. It improved our ability to manage security data efficiently and provided a scalable framework for future growth.

--

--