Build an Alerting System for Apache Kafka using Cloudera Streams Messaging Manager (SMM) and Apache Nifi

Or Bar Ilan
5 min read · Aug 31, 2020

Alerting & Monitoring Apache Kafka using Cloudera Streams Messaging Manager (SMM), Apache Nifi, ElasticSearch and Grafana

This article is intended for readers who are already familiar with Kafka and have a cluster managed by Cloudera Manager with the Streams Messaging Manager (SMM) and Nifi services installed. It shows how to build an alerting system for Kafka and explains all the steps along the way, including the storage and visualization systems (we chose ElasticSearch and Grafana).

We will briefly present every tool before using it.

Cloudera Streams Messaging Manager (SMM):

SMM is a project that originally belonged to Hortonworks. Today, after the company's merger with Cloudera, SMM is under the auspices of Cloudera.

According to Cloudera documentation: “Streams Messaging Manager (SMM) is an operations monitoring and management tool that provides end-to-end visibility in an enterprise Apache Kafka® environment.”

Moreover, SMM has the ability to create and modify rich alert policies and alerts. Unfortunately, its notifier options are pretty limited: there are only two providers, Email and a non-generic HTTP notifier, meaning the request's structure can't be changed (SMM also does not provide a way to inspect this structure through its UI). Our ultimate goal is to see the alerts as a time-dependent function, or to use their timestamps to build nice panels (in Grafana, for instance). To overcome this limitation, we decided to use Nifi, but more about that later.

First Step:

Now, we would like to generate dummy SMM alerts through Alert Policies. First, we have to create a new notifier using the top-right button called ADD NEW in the NOTIFIERS section. The HTTP provider's URL should point to our Nifi server, with an open port and a base path, for example:

URL: http://nifi.server.address:1231/contentListener

Then, add a new policy using the top-right button called ADD NEW in the ALERT POLICIES section. At first, it is better to create a policy that will fire an alert every X seconds so that we can test the flow, for example:

ALERT POLICY: IF ANY TOPIC has BYTES IN PER SEC = 0 (Assuming there is an idle topic)

Finally, use the ENABLE button to activate the alert policy.

Excellent, we have achieved a stream of alert data flowing into Nifi.

(Image: Majin Buu)

Apache Nifi:

Nifi was built to automate the flow of data between systems. In our case, we want to use it to transfer the data between SMM and our storage system (we chose ElasticSearch), while adding a proper timestamp field to each event (termed a Record in Nifi).

Second Step:

First, create a new processor group in Nifi with a ListenHTTP processor, configured as follows:

As you can see, the Listening Port should be equal to the port configured in the SMM notifier URL.
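For reference, a minimal ListenHTTP configuration matching the example notifier URL above would be (the remaining properties can stay at their defaults):

Listening Port: 1231
Base Path: contentListener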

Then, add a new EvaluateJsonPath processor connected to our ListenHTTP processor, configured as:

The reason we need to add a new attribute called “updateTimestamp” is that each flow file (each alert sent) contains the time it was actually created, and this value needs to be extracted so that we can perform a transformation on it.

IMPORTANT! DO NOT CONFUSE THE JSON FIELDS “updateTimestamp” AND “createTimestamp”: the field extracted is “$.updateTimestamp”.
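A minimal sketch of that EvaluateJsonPath configuration (the dynamic property name “updateTimestamp” is ours; the JSONPath is the one mentioned above):

Destination: flowfile-attribute
Return Type: auto-detect
updateTimestamp: $.updateTimestamp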

The transformation is necessary because, by default, ElasticSearch expects a timestamp in a specific format, such as:

“2020-08-30T20:00:00.000Z” (GMT)

not an epoch timestamp.

In order to achieve this format, add a new UpdateRecord processor connected to our EvaluateJsonPath processor and configured as:

new property “/@timestamp” with the value: ${updateTimestamp:format("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", "GMT")}

The new property “/@timestamp” takes care of adding a new field called “@timestamp” to the flow file. This field is “updateTimestamp” in the correct format.
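Putting the step together, the UpdateRecord processor would look roughly like this (a sketch only; JsonTreeReader and JsonRecordSetWriter are our assumption for reading and writing the JSON alerts, and the Literal Value strategy is what lets the Expression Language above be evaluated):

Record Reader: JsonTreeReader
Record Writer: JsonRecordSetWriter
Replacement Value Strategy: Literal Value
/@timestamp: ${updateTimestamp:format("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", "GMT")}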

After that, add a new PutElasticsearchRecord processor connected to our UpdateRecord processor, configured as:

The Client Service property is really important; we should configure it carefully with the correct HTTP Host property (this is the ElasticSearch address).
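A minimal sketch of the PutElasticsearchRecord setup (the host address is a placeholder for your ElasticSearch instance, and exact property names may vary slightly between Nifi versions):

Index: smm-alerts
Record Reader: JsonTreeReader
Client Service: ElasticSearchClientServiceImpl
HTTP Hosts (on the Client Service): http://elasticsearch.server.address:9200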

In the end, the processor group should look like:

Excellent, we have achieved a stream of alert data flowing into ElasticSearch.

ElasticSearch:

According to ElasticSearch Documentation: “Elasticsearch is a distributed, open-source search and analytics engine for all types of data, including textual, numerical, geospatial, structured, and unstructured… An Elasticsearch index is a collection of documents that are related to each other. Elasticsearch stores data as JSON documents.”

ElasticSearch is the “last stop” of our data before the visualization phase.

Third step:

In our ElasticSearch, we add a new index called “smm-alerts”, corresponding to the index we specified in PutElasticsearchRecord.
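A minimal sketch of creating that index with an explicit date mapping for the “@timestamp” field, assuming ElasticSearch 7.x (the host is a placeholder; adjust to your environment):

import requests

# Placeholder ElasticSearch address; adjust to your environment.
ES_URL = "http://elasticsearch.server.address:9200"

# Map "@timestamp" as a date so Grafana/Kibana can use it as the time field.
mapping = {"mappings": {"properties": {"@timestamp": {"type": "date"}}}}

response = requests.put(f"{ES_URL}/smm-alerts", json=mapping)
print(response.status_code, response.json())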

We can add a Kibana index pattern to show the data, or we can use Grafana.

Grafana:

Grafana is an open-source analytics & monitoring solution for every database. We will use it to create some nice dashboards and panels for our alerts data.

Fourth step:

First, we configure a brand new ElasticSearch data source as follows:
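For reference, the relevant ElasticSearch data source settings would be roughly as follows (the URL is a placeholder; the Version field should match your ElasticSearch installation):

URL: http://elasticsearch.server.address:9200
Index name: smm-alerts
Time field name: @timestamp
Version: 7.0+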

Then, we can create new dashboards to show our alerts on graphs (using this guide: Using Elasticsearch in Grafana, or the Grafana with Elasticsearch tutorial).
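For example, a simple panel that counts alerts over time can use a query along these lines (Lucene syntax, grouped by the timestamp field we added in Nifi):

Query: *
Metric: Count
Group by: Date Histogram on @timestamp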

Here is an example of a resulting dashboard:

Example for a dashboard with SMM alerts data in Grafana

Congratulations, we implemented a solution for SMM alerts visualization!

(Image: Barney Stinson)
