Complete metrics collections and analytics with Apache Karaf Decanter, Apache Kafka and Apache Druid

In this blog post, I will show how to extend the Karaf Decanter as log and metrics collection with storage and analytic powered by Apache Druid. The idea is to collect machine metrics (using Decanter OSHI collector for instance), send to a Kafka broker and aggregate and analyze the metrics on Druid.

Apache Kafka

We can ingest data in Apache Druid using several channels (in streaming mode or batch mode). For this blog post, I will use streaming mode with Apache Kafka. For the purpose of the blog, I will simply start a zookeeper:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

and kafka 2.6.1 broker:

$ bin/kafka-server-start.sh config/server.properties
...
[2021-01-19 14:57:26,528] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

I'm create a decanter topic where we gonna send the metrics:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic decanter --partitions 2
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic decanter
Topic: decanter PartitionCount: 2       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: decanter Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: decanter Partition: 1    Leader: 0       Replicas: 0     Isr: 0

Apache Karaf Decanter

Now we have Kafka decanter topic ready, we can collect metrics using Decanter and sending to Kafka. I'm starting a Apache Karaf 4.3.0 runtime and load Decanter:

$ bin/karaf
        __ __                  ____      
       / //_/____ __________ _/ __/      
      / ,<  / __ `/ ___/ __ `/ /_        
     / /| |/ /_/ / /  / /_/ / __/        
    /_/ |_|\__,_/_/   \__,_/_/         

  Apache Karaf (4.3.0)

Hit '' for a list of available commands
and '[cmd] --help' for help on a specific command.
Hit '' or type 'system:shutdown' or 'logout' to shutdown Karaf.

karaf@root()> feature:repo-add decanter 2.7.0-SNAPSHOT
Adding feature url mvn:org.apache.karaf.decanter/apache-karaf-decanter/2.7.0-SNAPSHOT/xml/features
karaf@root()>   

We first add the decanter-appender-kafka that will send the collected data (metrics, log, etc, depending of the collectors) to our Kafka broker:

karaf@root()> feature:install decanter-appender-kafka

For the purpose of this blog, I will use the Decanter OSHI collector to retrieve system metrics (CPU, memory, filesystems, ...):

karaf@root()> feature:install decanter-collector-oshi

That's it ! Decanter will periodically (every minute by default) poll system metrics (thanks to the OSHI collector) and send to kafka (thanks to the kafka appender). We can see data coming in the decanter topic:

$ bin/kafka-console-consumer.sh --topic decanter --bootstrap-server localhost:9092 --from-beginning
{"@timestamp":"2021-01-19T14:10:35,169Z","diskStore_1_partition_0_minor":8,"usbDevice_1_name":"Apple Internal Keyboard / Trackpad","networkIF_2_ndisPhysicalMediumType":0,"networkIF_1_prefixLengths":"[64]","powerSource_0_voltage":12.878,"diskStore_1_partition_0_major":1,....

Apache Druid

Now, let's focus on Apache Druid. Apache Druid is a real-time analytics database. Druid is most often used as a database for powering use cases where real-time ingest, fast query performance, and high uptime are important. It's a good platform for our use case: metrics storage and analytics. Apache Druid covers features from:
  • data warehouse
  • timeseries database
  • search system
In details, Apache Druid provides the following features:
  • Druid storage is column oriented. It means that Druid stores and compresses each column individually, and only needs to read the ones needed for a particular query, which supports fast scans, rankings, and groupBy.
  • Druid has a builtin search indexes. Druid especially uses inverted indexes. An inverted index is an index data structure storing a mapping from content, such as words or numbers, to its location in a document or set of documents. The purpose of an inverted index is to allow fast full-text searches, at a cost of increased processing when a document is added to the database. Druid creates inverted indexes for string values for faster search and filter operations.
  • Druid has a "flexible" ingestion layer, allowing to get data using batch (from filesystems (HDFS, S3, ...) for instance) or streaming (from kafka for instance).
  • Similar to other timeseries databases, Druid intelligently partitions data by time to enable fast time-oriented queries.
  • Druid supports SQL allowing to easily query Druid database.

Apache Druid processes

Apache Druid has three processes: master, query, data. Druid Master manages data availability and data ingestion. For that, it runs:
  • coordinator manages data availability on the cluster
  • overlord controls the assignement of data ingestion workloads.
Druid Query handles queries from external clients. For that, it runs:
  • broker handles queries from external clients
  • router (optional) can route requests to brokers, coordinators, and overlords.
Druid Data executes ingestion workloads and deals with storage. For that, it runs:
  • historical stores queryable data
  • middlemanager responsible for ingesting data
Master, query and data are the Druid core components. Additional, Druid supports existing infrastructure via external dependencies. If you don't want to reuse existing infrastructure, you don't need these external dependencies. Deep storage is a shared filesystem accessible by Druid server (local filesystem, HDFS, S3, ...). Druid uses deep storage to store any data that has been ingested. Druid uses deep storage only as a backup of your data and transfer data in the background between Druid parts. To respond to the queries, Druid Historical do not read from deep storage, but instead read prefetched segments from their local disks. It means that Druid never uses deep storage for queries. Metadata storage holds various shared system metadata such as segment usage information and task information. It's typically a database (PostgreSQL, MySQL, Derby, ...). Zookeeper can be used for internal service discovery, coordination, and leader election.

Apache Druid designs

Druid data is stored as "datasources" (like a table in a regular database). Each datasource is partitioned by time (and optionally by other attributes). Each time range is a chunk (for instance one chunk per day). In a chunk, data is partitioned into one or more segments. So basically, the storage structure is: datasources is a set of time chunk which is a set of segments. A segment is a single file created by the middlemanager as a mutable and uncomitted segment. When the middlemanager creates a segment, it does:
  • conversion to columnar format
  • indexing with bitmap indexes
  • compression
Periodically, uncommitted segments are committed and published, and then, the committed segments go on the deep storage, becoming immutable. The committed segments are managed by Druid historical (not the middlemanager anymore). In Druid terms, indexing is the mechanism by which new segments are created, handoff is the mechanism by which segments are published and served by Druid historical. The indexing process is the following:
  1. An indexing task starts running and building a new segment. The segment identifier is created before starting to build it.
  2. If the indexing task is a readltime task (like a Kafka task) then the segment is immediately queryable at this point. It's available, but unpublished.
  3. When the indexing task has finished reading data for the segment, it pushes it to deep storage and then publishes it by writing a record into the metadata store.
  4. If the indexing task is a realtime task, at this point it waits for a Druid Historical process to load the segment. If the indexing task is not realtime task, it exists immediately.
On the coordinator/historical side, the process is:
  1. The Druid Coordinator polls metadata store peridically (by default, every 1 minute) for newly published segments.
  2. When the Druid Coordinator finds a segment that is published and used, but unavailable, it chooses a Druid Historical process to load that segment and instructs that Druid Historical to do so.
  3. The Druid Historical loads the segment and begins serving it.
  4. At this point, if the indexing task was waiting for handoff, it will exit.
Each segment has an unique identifier. This identifier has four parts:
  1. datasource name
  2. time interval (for the time chunk containing the segment aka segmentGranularity specified at ingestion time
  3. version number (which is actually the timestamp when the segment has been created)
  4. partition number (UUID)
Each segment has three areas:
  1. Segment metadata is a small JSON stored in the metadata store. Publishing is the action of inserting a record for a segment into the metadata store. These metadata records have a flag used controlling whether the segment is intended to be queryable or not.
  2. Segment data files are pushed to deep storage once a segment is constructed.
  3. Segments are available for querying on some Druid data server, like a realtime task or a Druid historical process.
As said before, the metadata store is actually a database (PostgreSQL, MySQL, Derby, ...). In the metadata store, the sys.segments table contains the segments metadata. In this table, we can see the segment ID and the following flags:
  • is_published: true if segment metadata has been published to the metadata store and used is true.
  • is_available: true if the segment is currently available for querying, either on a realtime task or Druid Historical process.
  • is_realtime: true if the segment is only available on realtime tasks. For datasources that use realtime ingestion, this will generally start off true and then become false as the segment is published and handed off.
  • is_overshadowed: true if the segment is published (used is true) and is fully overshadowed by some other published segments. Generally this is a transient state, and segments in this state will soon have their used flag automatically set to false.
Once the data is stored in Druid (in several segments), we can directly query the data in Druid. The queries first enter the Druid Broker, where the Broker will identify which segments have data that may pertain to that query. The list of segments is always pruned by the time (and may also be pruned by other attributes). The broker will then identify which historicals and middlemanagers are serving those segments and send a rewritten subquery to each of those processors. The historical/middlemanager processes will take the queries, process them and return results. The broker receives the results and merges them together to get the final answer, which it returns to the original caller. Broker pruning is an important way that Druid limits the amount of data that must be scanned for each query, but it's not the only way. For filters at a more granular level than what Druid Broker can use for pruning, indexing structures inside each segment allow Druid to figure out which (if any) rows match the filter set before looking at any row of data. Once Druid knows which rows match a particular query, it only accesses the specific columns it needs for that query. Within those columns, Druid can skip from row to row, avoiding reading data that doesn't match the query filter. So Druid uses three different techniques to maximize query performance:
  • pruning which segments are accessed for each query
  • within each segment, using indexes to identify which rows must be accessed
  • within each segment, only reading the specific rows and columns that are relevant to a particular query.

Starting Apache Druid

For this blog purpose, I will start Druid on a single machine. Druid provides convenient scripts to start the Druid processes in a simple way. As my laptop is a decent one, I'm using the micro sizing of Druid processes:

$ bin/start-micro-quickstart

start-micro-quickstart will start Druid processes on the local machine:
  • default zookeeper
  • coordinator and overload
  • broker
  • router
  • historical
  • middlemanager
Once started, you can access the Druid WebConsole on http://localhost:8888:

Data Ingestion

Now, we can plug Druid on our Kafka server to retrieve the data sent by Decanter. We can do that directly in the Druid webconsole. To add the Kafka ingestion, we go on Data Ingestion:
Then, we define the Kafka bootstrap server and topic name. Druid loads messages from Kafka to define the data format:
Druid tries to detect the format and data as rows:
We define the time column. Fortunately, Decanter cleanly populates the timestamp and so Druid detects it automatically:
The parsing of the raw data is now done. Optionally, we can transform per row basis:
or filter:
Finally the schema is ready, defining the query granularity:
NB: in my use case (metrics), I'm using query granulariy of 1 second to be able to have good data time range. We now define the segment granularity (hourly time basis in my case):
The input tuning is important because, as we use Kafka, we want to use earliest offset (streaming mode):
We name the datasource decanter:
Basically, Druid web console creates the json spec to load data. We can review the json spec and then we "submit" the spec:
Once we have submitted the ingestion, quickly, we can see:
  1. a supervisor is created to "control" the Kafka ingestion tasks
  2. the kafka tasks
We can see the Druid datasource created by the Kafka ingestion tasks:
We can see the different segments:

Analytic with Druid and Grafana

Now we have the data loaded in Druid, we can start to use the analytic capability of Druid. It means that we can easily and efficiently query data using SQL syntax (powered by Apache Calcite). For instance, we can query Druid to get the max thread count during the last hour:
You can see on this query that we use the max and interval operators. It's also possible to use avg on full data:
Thanks to this layer, it's pretty easy to analyze the data. It's also possible to directly plug Grafana on Druid, allowing to create dashboards. First, we add the Druid plugin for Grafana (available on https://github.com/grafadruid/druid-grafana). A release is available here: https://github.com/grafadruid/druid-grafana/releases/download/v1.0.0/grafadruid-druid-datasource-1.0.0.zip. We can see the Druid plugin loaded at Grafana startup:

INFO[01-20|18:35:23] Registering plugin                       logger=plugins id=grafadruid-druid-datasource

Now, in Grafana, we can create a Druid datasource:
Once the Druid datasource is created in Grafana, in the explore view, we can directly execute query on Druid (as we do in the Druid webconsole):
Now, we can create a dashboard with some panels. We create a panel for thread count:
Another panel for CPU temperature:
At the end, our dashboard can look like:

Druid Queries Scheduler with Decanter Druid Collector

It's possible to execute query on Druid via HTTP, using http://druid_broker:8082/druid/v2/sql/. In my case, as I'm running all Druid parts on my laption, I can using http://localhost:8888/druid/v2/sql/ URL. The JSON to send (using POST) is pretty simple. For instance, we can create the following query.json file:

{ "query": "select * from decanter" }

And then execute the query using curl:

$ curl -X POST -H "Content-Type: application/json" http://localhost:8888/druid/v2/sql/ -d @query.json

Then we get the query result. It's possible to use the Decanter REST collector to periodically execute this kind of queries. For convenience, I created a dedicated Druid collector: https://github.com/apache/karaf-decanter/pull/226. This Druid Decanter collector is pretty basic: you define the Druid broker location and queries set in the etc/org.apache.karaf.decanter.collector.druid.cfg configuration file:

druid.broker.location=http://localhost:8888/druid/v2/sql/

query.all=select * from decanter
query.threadCount=select sum_operatingSystem_threadCount from decanter

unmarshaller.target=(dataFormat=json)

Then, the Karaf scheduler periodically executes the queries and send each query result in the dispatcher. For instance, using the log appender, we can see the data retrieved from Druid (using the log appender):

18:53:13.935 INFO [EventAdminAsyncThread #26] {"@timestamp":"2021-01-21T17:53:13,931Z","hostName":"LT-C02R90TRG8WM","component_name":"org.apache.karaf.decanter.collector.druid","query":"foo","felix_fileinstall_filename":"file:/Users/jbonofre/Downloads/apache-karaf-4.3.0/etc/org.apache.karaf.decanter.collector.druid.cfg","unmarshaller_target":"(dataFormat=json)","druid_broker_location":"http://localhost:8888/druid/v2/sql/","decanter_collector_name":"druid","scheduler_period":60,"service_pid":"org.apache.karaf.decanter.collector.druid","result":[{"sum_operatingSystem_threadCount":1533},{"sum_operatingSystem_threadCount":1595},{"sum_operatingSystem_threadCount":1587},{"sum_operatingSystem_threadCount":1592},{"sum_operatingSystem_threadCount":1594},{"sum_operatingSystem_threadCount":1600},{"sum_operatingSystem_threadCount":1572},{"sum_operatingSystem_threadCount":1604},{"sum_operatingSystem_threadCount":1585},{"sum_operatingSystem_threadCount":1576},{"sum_operatingSystem_threadCount":1585},{"sum_operatingSystem_threadCount":1576},{"sum_operatingSystem_threadCount":1562},{"sum_operatingSystem_threadCount":1559},{"sum_operatingSystem_threadCount":1568}],"scheduler_concurrent":false,"component_id":11,"karafName":"root","hostAddress":"192.168.134.100","query_foo":"select sum_operatingSystem_threadCount from decanter","scheduler_name":"decanter-collector-druid","timestamp":1611251593931,"event_topics":"decanter/collect/druid"}

It means that we can use any Decanter appender to store the Druid query execution results in any backend (elasticsearch, cassandra, ...), and the Druid queries are scheduled. A possible use case is to create analytic queries and send data into elasticsearch for instance with a Kibana on top of it.

What's next

In this blog post, we saw how to build a metrics collection system including analytic and dashboard. I'm working on some Druid proposals and improvements in Decanter. I will share details soon.

Comments

Post a Comment

Popular posts from this blog

What's new in Apache Karaf 4.2.11 ?

What's new in Apache Karaf Decanter 2.7.0 ?