Consume Slack’s Events API with Cloudera Data Flow For Public Cloud

Ferenc Kis
Cloudera
10 min read · Feb 8, 2023

Introduction

In this blog post we will see how to consume Slack’s Events API with Cloudera DataFlow. This is the second part of the Consume Slack’s Events API with Cloudera … blog post series.
In the previous post, Consume Slack’s Events API with Cloudera Flow Management, we set up a Slack account for testing purposes and created a CFM flow to consume events from Slack.
Now we will see how to use Cloudera DataFlow for Public Cloud (CDF-PC) to create a production-grade deployment for the same use case.

What is Cloudera Data Flow

Cloudera DataFlow for Public Cloud (CDF-PC) is a cloud-native service that enables self-serve deployments of Apache NiFi data flows from a central catalog. DataFlow Deployments provides a cloud-native runtime to run your Apache NiFi flows through auto-scaling Kubernetes clusters, and centralised monitoring and alerting capabilities for the deployments.

Why Cloudera Data Flow

By using Cloudera DataFlow we gain the following benefits:

  • High availability: Nodes in the CDF cluster are constantly monitored. If a node runs into issues, it is automatically replaced, so the cluster stays available.
  • Auto scaling: CDF can scale NiFi clusters automatically. It is enough to configure the range of the node count. If the cluster gets overwhelmed, CDF scales it up to serve the increased load. When node utilisation drops below a particular threshold, CDF scales the cluster down and decommissions the unnecessary nodes.
  • Security: CDF clusters are secured out of the box. Using a CDF cluster requires authentication and authorisation. Additionally, traffic between cluster nodes is secured.
  • Cost effectiveness: With the one-flow-per-cluster approach there is no need to provision large monolithic clusters anymore. CDF provisions a cluster that is just large enough for your flow. Additionally, with auto scaling CDF can react to changing resource needs.
  • No maintenance: You don’t need to provision and maintain clusters anymore. CDF will automatically allocate the necessary resources, and deploy the flow with

Update the CFM flow

We will need to make some changes to the CFM flow created in the previous blog post in order to make it CDF compatible and production ready.

We will discuss only the changes in the flow structure and processor configuration. Everything else remains the same.

Here is what the updated flow looks like:

  • An UpdateAttribute processor is added between the EvaluateJsonPath and HandleHttpResponse processors. The failure relationship of EvaluateJsonPath (which means that no JSON payload was available) is connected to the UpdateAttribute processor, which then adds a flowfile attribute with the name valid_event and the value false.
    We don’t want to pass these flowfiles on for downstream processing, so we mark them to be able to filter them out later.
  • A RouteOnAttribute processor is connected to the relationships of the HandleHttpResponse processor. We use RouteOnAttribute to split the flow into two branches, valid and invalid events, based on the attribute added in the previous step.
  • PutCDPObjectStore processors are connected to the relationships of the RouteOnAttribute processor to persist events to the appropriate locations. Depending on your use case, these can be replaced by PutHDFS, PublishKafkaRecord, PutHBase, etc.
  • A parameter context is created to make the flow customisable. When a flow is deployed in CDF, the values of these parameters can be overridden, which lets you customise the flow as per your needs. Create a parameter context with the following parameters. The default values can be anything.

Processor configuration

HandleHttpRequest

Parameters were introduced to replace hard coded values to make the flow customisable. The default values of the parameters can be overridden during flow deployment time.

  • Listening Port: Replaced the hard coded port with the #{inbound_port} parameter reference to make it configurable.
  • SSL Context Service: Create a new StandardRestrictedSSLContextService, and name it “Inbound SSL Context Service”. No other configuration is required. The SSL context service will be created and fields will be populated during cluster deployment.
  • Allowed paths: Replaced the hard coded path with the #{allowed_paths} parameter reference to make it configurable.
  • Allow POST: Slack Events API uses HTTP POST requests when sending data, so this needs to be set to true. For security reasons it is recommended to set all of the other allowed HTTP methods to false.
  • Client authentication: Replaced the hard coded value with the
    #{client_authentication} parameter reference to make it configurable.
    Slack Events API integration currently does not support mTLS authentication, so the default value is No Authentication, which allows Slack to push requests over HTTPS without client authentication.
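
To make the idea of parameter references more concrete, here is a minimal Python sketch of how a `#{name}` reference could be resolved against default values and deployment-time overrides. It only illustrates the concept and is not how NiFi or CDF implement it. The `resolve` helper is hypothetical, and the default values are taken from the example URL shown later in this post.

```python
import re
from typing import Dict, Optional

# Matches parameter references of the form #{name}.
PARAM_REF = re.compile(r"#\{([^}]+)\}")


def resolve(value: str,
            defaults: Dict[str, str],
            overrides: Optional[Dict[str, str]] = None) -> str:
    """Replace each #{name} with its override, falling back to the default."""
    merged = {**defaults, **(overrides or {})}

    def substitute(match):
        name = match.group(1)
        if name not in merged:
            raise KeyError(f"parameter {name!r} is not defined")
        return merged[name]

    return PARAM_REF.sub(substitute, value)


# Assumed defaults for illustration; the real ones live in the parameter context.
defaults = {"inbound_port": "9876", "allowed_paths": "/events"}

print(resolve("#{inbound_port}", defaults))                            # uses the default
print(resolve("#{inbound_port}", defaults, {"inbound_port": "8443"}))  # uses the override
```

The same flow definition can therefore be deployed several times with different ports or paths without touching the processors themselves.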

EvaluateJsonPath

There were no changes in this processor’s configuration.

UpdateAttribute

Add a dynamic property with the name “valid_event” and the value “false”. The UpdateAttribute processor adds this dynamic property to the flowfile as a flowfile attribute. We will use this attribute later to direct flowfiles to the appropriate sink processors.

HandleHttpResponse

There were no changes in this processor’s configuration.

RouteOnAttribute

RouteOnAttribute processor uses the flowfile attributes to route the flowfile to the appropriate destination. Flowfiles tagged with valid_event=false attribute and flowfiles having the Slack challenge token as attribute are routed to the invalid_event relationship. All other flowfiles are routed to the default unmatched relationship.
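
The routing rule described above can be sketched in a few lines of Python. This is only an illustration of the decision, not the actual RouteOnAttribute configuration. In particular, the attribute name `challenge` is an assumption about where the Slack challenge token was extracted to, and the `route` function is hypothetical.

```python
from typing import Dict


def route(attributes: Dict[str, str]) -> str:
    """Return the relationship a flowfile with these attributes would follow."""
    # Flowfiles that failed JSON evaluation were tagged by UpdateAttribute.
    marked_invalid = attributes.get("valid_event") == "false"
    # Assumption: the challenge token, if present, is stored in "challenge".
    is_challenge = bool(attributes.get("challenge"))
    if marked_invalid or is_challenge:
        return "invalid_event"
    # Everything else falls through to the default relationship.
    return "unmatched"


print(route({"valid_event": "false"}))  # invalid_event
print(route({"challenge": "abc123"}))   # invalid_event
print(route({}))                        # unmatched
```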

PutCDPObjectStore

The flow is terminated by two PutCDPObjectStore processors: one for the “valid” flowfiles and one for the flowfiles marked as invalid events. The Directory properties are set to /tmp/events and /tmp/invalid_events respectively. The CDP Username and CDP Password are injected from the parameter context. A machine user must be registered in the CDP Management Console so that it can be used here as the CDP user to access the default cloud storage.
If you need more information about CDF, please consult the official Cloudera documentation or contact Cloudera Support.

Export the CFM Flow

Right-click on the canvas in CFM. Go to Download flow definition, and choose Without external services. Give the file a descriptive name and save it.
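
Before importing the file, it can be useful to check that it contains the processors you expect. The Python sketch below walks a flow definition and lists the processor names. It assumes that the exported JSON keeps the root process group under a `flowContents` key, with `processors` and nested `processGroups` lists. Treat that layout as an assumption and verify it against your own export. The sample dictionary stands in for the result of `json.load` on the saved file.

```python
from typing import Any, Dict, Iterator


def processor_names(group: Dict[str, Any]) -> Iterator[str]:
    """Yield processor names from a process group and its nested groups."""
    for processor in group.get("processors", []):
        yield processor.get("name", "<unnamed>")
    for child in group.get("processGroups", []):
        yield from processor_names(child)


# Stand-in for json.load(open(<your exported file>)); the layout is assumed.
definition = {
    "flowContents": {
        "processors": [
            {"name": "HandleHttpRequest"},
            {"name": "EvaluateJsonPath"},
        ],
        "processGroups": [
            {"processors": [{"name": "PutCDPObjectStore"}]},
        ],
    }
}

print(sorted(processor_names(definition.get("flowContents", {}))))
```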

Flow deployment prerequisites

Before deploying the flow make sure that the following criteria are fulfilled.

  • You have access to Cloudera Data Platform
  • You have created an Environment and a Data Lake
  • Your environment has public subnets and public connectivity is enabled
  • Cloudera DataFlow is enabled
  • Public Endpoint is enabled for Cloudera DataFlow

If any of the criteria above are not fulfilled, please consult the official Cloudera documentation or contact Cloudera Support.

Import the flow

Before the flow can be deployed, we need to import the flow exported in the previous section into Cloudera DataFlow Catalog.

Open Cloudera DataFlow in a browser. Click on Catalog on the left menu, then on the Import Flow Definition button in the upper right corner.

Give an arbitrary name to the flow, and select the flow definition file you have exported previously. Once done, click on Import.

Find the imported flow in the catalog, and select it. Then click on the Deploy button.

Select the Target Environment in the window that appears, then click on Continue.

You will be redirected to the Flow Deployment Wizard.

Flow Deployment Wizard

Overview

Give a name to the deployment. It can be any arbitrary name. Then click on Next.

NiFi Configuration

On the NiFi Configuration tab, enable Allow NiFi to receive data in the Inbound Connections section. CDF will automatically create an endpoint, which we will use as the request URL when we update the Slack configuration. CDF also recognises the port from the flow configuration.
In my case it looks like this:

Copy and paste the Endpoint Hostname into a text editor. We will need it in the next step.

Click on the Next button.

Parameters

On the Parameters page we can see all the parameters with their default values, exactly as we defined them in the Parameter Context.
This is how a CDF flow becomes configurable. If you wish, you can change the values here.

Now we have everything we need to build our Slack Events API request URL. Copy the values of the inbound_port and allowed_paths parameters and append them to the previously copied endpoint hostname to form the full URL.
Mine looks like this:
https://slackeventsdeployment.inbound.dfx.o7fe0mjh.xcu2-8y8x.dev.cldr.work:9876/events

Save this URL in a text editor. We will need it later. Click on the Next button.
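
If you prefer to assemble the request URL programmatically, a small Python sketch could look like this. The `build_request_url` helper is hypothetical, and it assumes that allowed_paths holds a single literal path rather than a pattern matching several paths.

```python
def build_request_url(endpoint_hostname: str, inbound_port: int, allowed_path: str) -> str:
    """Combine the endpoint hostname, port, and path into an HTTPS URL."""
    # Make sure the path starts with exactly one leading slash.
    path = "/" + allowed_path.lstrip("/")
    return f"https://{endpoint_hostname}:{inbound_port}{path}"


url = build_request_url(
    "slackeventsdeployment.inbound.dfx.o7fe0mjh.xcu2-8y8x.dev.cldr.work",
    9876,
    "/events",
)
print(url)
```

With the values from my deployment this prints the same URL as shown above.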

Sizing & Scaling

The size and the number of the NiFi nodes can be configured on this page. We can also turn auto scaling on. In my case I went with the Extra Small node type and allowed Auto Scaling up to 3 NiFi instances. This way the CDF flow can react to changes in the traffic: it allocates more resources when more processing power is necessary, and it releases resources once they become unnecessary. As a result, we can achieve high availability while optimising our costs. Depending on your use case you may need different NiFi node types and node counts.

Click on the Next button.

Key Performance Indicators

We can select pre-defined metrics and configure alerts with particular criteria. This comes in really handy in a production environment, as we get real-time feedback about how our flow is performing.
We will use the Data In metric and define an alert: if the inbound data volume stays under a threshold value for a given time, CDF should trigger an alert.

Click on Add new KPI.

Fill in the form in a similar way. We intentionally used such small values to get almost instant feedback that the configured alert works.
In real life these should be much bigger values.
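
The alert condition, a metric staying under a threshold for a given time, can be sketched in Python as follows. This illustrates the rule only and is not how CDF evaluates KPIs internally. The `first_alert_time` function, the sample values, and the units are all made up for the example.

```python
from typing import Iterable, Optional, Tuple


def first_alert_time(samples: Iterable[Tuple[float, float]],
                     threshold: float,
                     duration: float) -> Optional[float]:
    """Return the timestamp at which the metric has stayed below `threshold`
    for at least `duration` seconds, or None if that never happens.

    `samples` are (timestamp_in_seconds, metric_value) pairs in time order.
    """
    below_since = None
    for timestamp, value in samples:
        if value < threshold:
            if below_since is None:
                below_since = timestamp  # start of the quiet period
            if timestamp - below_since >= duration:
                return timestamp
        else:
            below_since = None  # traffic recovered, reset the timer
    return None


# Made-up samples: traffic stops after the first minute.
quiet = [(0, 500), (60, 0), (120, 0), (180, 0), (240, 0)]
print(first_alert_time(quiet, threshold=100, duration=120))  # 180
```

Larger threshold and duration values make the alert less sensitive to short pauses in the traffic.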

Once done click on the Add button, then click on Next.

Review

Review the settings we have given, and if everything looks okay, click on Deploy.
The deployment will take some time. The progress can be tracked in the window that appears.

Once the deployment is done, click on the Manage Deployment button.

Verify the deployment

Click on the Actions button in the top right corner, then on View in NiFi.

You should see the same NiFi flow that was exported earlier. All processors should be in the running state.

Update Slack Configuration

Open https://api.slack.com/apps and go to Event Subscriptions on the left side. Click on the Change button next to the Request URL field, and insert the request URL we saved earlier.

Click on Save Changes once done.
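
When the Request URL is changed, Slack verifies it by sending a POST request whose JSON body has the type url_verification and a challenge value, and it expects the endpoint to send the challenge back. The Python sketch below mirrors that handshake, so you can reason about what the flow has to return. It is a simplified illustration, not the flow's implementation, and the `challenge_response` helper and the sample payload values are made up.

```python
import json
from typing import Optional


def challenge_response(request_body: bytes) -> Optional[str]:
    """Return the challenge to echo back, or None for any other request."""
    try:
        payload = json.loads(request_body)
    except ValueError:
        # Not valid JSON: corresponds to the "failure" branch in the flow.
        return None
    if isinstance(payload, dict) and payload.get("type") == "url_verification":
        return payload.get("challenge")
    return None


# Made-up verification request body.
body = json.dumps(
    {"token": "example-token", "challenge": "abc123", "type": "url_verification"}
).encode("utf-8")

print(challenge_response(body))         # abc123
print(challenge_response(b"not json"))  # None
```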

Put it all together

Slack accepted the new Request URL in the previous section, which means that the CDF flow was able to respond to Slack’s verification request. We still want to confirm that the flow receives and processes other events from Slack as well.
I have sent three direct messages in Slack.

In the CDF flow I can see that three flowfiles have piled up in the queue before the PutCDPObjectStore processor that persists the valid flowfiles.
Note: The PutCDPObjectStore processor was intentionally stopped, so that the flowfiles could queue up in the connection and their content could be easily checked.

Checking the content of one of the flowfiles shows that it is correct.

After starting the stopped PutCDPObjectStore processor, we can see that the flowfiles are persisted into the configured directory in the default CDP bucket.

Now it’s time to check how the configured KPI and alert behave. To test this we should send some additional messages from Slack, then stop sending. If we go back to Dashboard in the left menu and select our deployment, we can see that the inbound data is measured as expected.

After having stopped sending messages from Slack and waiting for some time, we can see that the related alert kicks in.

We have successfully deployed our flow to Cloudera DataFlow, and managed to set up integration with Slack.

Congratulations! Job well done! :)

Conclusion

In this blog post we have seen how to modify the CFM flow from the previous blog post and deploy it to Cloudera DataFlow.
By applying these changes we created a highly available, scalable, secure and cost effective deployment.
I hope you enjoyed the post and found it useful.
