Complete metrics collection and analytics with Apache Karaf Decanter, Apache Kafka and Apache Druid
In this blog post, I will show how to extend Karaf Decanter, used for log and metrics collection, with storage and analytics powered by Apache Druid.
The idea is to collect machine metrics (using the Decanter OSHI collector for instance), send them to a Kafka broker, and then aggregate and analyze the metrics in Druid.
Apache Kafka
We can ingest data in Apache Druid using several channels (in streaming mode or batch mode). For this blog post, I will use streaming mode with Apache Kafka. For the purpose of the blog, I will simply start a ZooKeeper instance:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
and a Kafka 2.6.1 broker:
$ bin/kafka-server-start.sh config/server.properties
...
[2021-01-19 14:57:26,528] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
I create a decanter topic where we are going to send the metrics:
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic decanter --partitions 2
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic decanter
Topic: decanter PartitionCount: 2       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: decanter Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: decanter Partition: 1    Leader: 0       Replicas: 0     Isr: 0
Apache Karaf Decanter
Now that we have the Kafka decanter topic ready, we can collect metrics using Decanter and send them to Kafka.
I start an Apache Karaf 4.3.0 runtime and load Decanter:
$ bin/karaf
        __ __                  ____      
       / //_/____ __________ _/ __/      
      / ,<  / __ `/ ___/ __ `/ /_        
     / /| |/ /_/ / /  / /_/ / __/        
    /_/ |_|\__,_/_/   \__,_/_/         
  Apache Karaf (4.3.0)
Hit '<tab>' for a list of available commands
and '[cmd] --help' for help on a specific command.
Hit '<ctrl-d>' or type 'system:shutdown' or 'logout' to shutdown Karaf.
karaf@root()> feature:repo-add decanter 2.7.0-SNAPSHOT
Adding feature url mvn:org.apache.karaf.decanter/apache-karaf-decanter/2.7.0-SNAPSHOT/xml/features
karaf@root()>   
  
We first install the decanter-appender-kafka feature, which sends the collected data (metrics, logs, etc., depending on the collectors) to our Kafka broker:
karaf@root()> feature:install decanter-appender-kafka
For the purpose of this blog, I will use the Decanter OSHI collector to retrieve system metrics (CPU, memory, filesystems, ...):
karaf@root()> feature:install decanter-collector-oshi
That's it! Decanter will periodically (every minute by default) poll system metrics (thanks to the OSHI collector) and send them to Kafka (thanks to the Kafka appender).
We can see data coming in the decanter topic:
$ bin/kafka-console-consumer.sh --topic decanter --bootstrap-server localhost:9092 --from-beginning
{"@timestamp":"2021-01-19T14:10:35,169Z","diskStore_1_partition_0_minor":8,"usbDevice_1_name":"Apple Internal Keyboard / Trackpad","networkIF_2_ndisPhysicalMediumType":0,"networkIF_1_prefixLengths":"[64]","powerSource_0_voltage":12.878,"diskStore_1_partition_0_major":1,....
Apache Druid
Now, let's focus on Apache Druid. Apache Druid is a real-time analytics database. It is most often used to power use cases where real-time ingestion, fast query performance, and high uptime are important. It's a good platform for our use case: metrics storage and analytics. Apache Druid combines features from:
- data warehouses
- timeseries databases
- search systems
The main characteristics of Druid are:
- Druid storage is column oriented. It means that Druid stores and compresses each column individually, and only needs to read the ones needed for a particular query, which supports fast scans, rankings, and groupBy.
- Druid has built-in search indexes. Druid especially uses inverted indexes. An inverted index is an index data structure storing a mapping from content, such as words or numbers, to its location in a document or set of documents. The purpose of an inverted index is to allow fast full-text searches, at the cost of increased processing when a document is added to the database. Druid creates inverted indexes for string values for faster search and filter operations.
- Druid has a flexible ingestion layer, loading data in batch mode (from filesystems such as HDFS or S3, for instance) or in streaming mode (from Kafka, for instance).
- Similar to other timeseries databases, Druid intelligently partitions data by time to enable fast time-oriented queries.
- Druid supports SQL, which makes it easy to query a Druid database.
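To make the inverted index idea more concrete, here is a minimal sketch in Python. It is only an illustration of the principle (a mapping from each value to the rows containing it), not how Druid implements it internally. The rows and the build_inverted_index helper are hypothetical.

```python
from collections import defaultdict


def build_inverted_index(rows, column):
    """Map each distinct value of `column` to the list of row numbers holding it."""
    index = defaultdict(list)
    for row_number, row in enumerate(rows):
        index[row[column]].append(row_number)
    return dict(index)


# Hypothetical rows, loosely shaped like Decanter events (not real data).
rows = [
    {"hostName": "host-a", "threadCount": 1533},
    {"hostName": "host-b", "threadCount": 1595},
    {"hostName": "host-a", "threadCount": 1587},
]

index = build_inverted_index(rows, "hostName")

# A filter such as hostName = 'host-a' only touches the rows listed in the index,
# instead of scanning every row.
matching = [rows[i]["threadCount"] for i in index.get("host-a", [])]
print(index)
print(matching)
```

The cost mentioned above is visible here: every new row has to be added to the index at ingestion time, in exchange for cheaper filters at query time.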
Apache Druid processes
Apache Druid processes are grouped into three server types: master, query and data.
Druid Master manages data availability and data ingestion. For that, it runs:
- coordinator: manages data availability on the cluster
- overlord: controls the assignment of data ingestion workloads
Druid Query handles queries from external clients. It runs:
- broker: handles queries from external clients
- router (optional): can route requests to brokers, coordinators, and overlords
Druid Data executes ingestion workloads and stores queryable data. It runs:
- historical: stores queryable data
- middlemanager: responsible for ingesting data
Apache Druid designs
Druid data is stored in "datasources" (similar to tables in a regular database). Each datasource is partitioned by time (and optionally by other attributes). Each time range is a chunk (for instance one chunk per day). In a chunk, data is partitioned into one or more segments. So basically, the storage structure is: a datasource is a set of time chunks, and each time chunk is a set of segments. A segment is a single file, created by the middlemanager as a mutable and uncommitted segment. When the middlemanager creates a segment, it performs:
- conversion to columnar format
- indexing with bitmap indexes
- compression
Segments are then handed off to the historical processes. On the indexing side:
- An indexing task starts running and building a new segment. The segment identifier is created before starting to build it.
- If the indexing task is a realtime task (like a Kafka task), then the segment is immediately queryable at this point. It's available, but unpublished.
- When the indexing task has finished reading data for the segment, it pushes it to deep storage and then publishes it by writing a record into the metadata store.
- If the indexing task is a realtime task, at this point it waits for a Druid Historical process to load the segment. If the indexing task is not a realtime task, it exits immediately.
On the coordinator and historical side:
- The Druid Coordinator polls the metadata store periodically (by default, every minute) for newly published segments.
- When the Druid Coordinator finds a segment that is published and used, but unavailable, it chooses a Druid Historical process to load that segment and instructs that Druid Historical to do so.
- The Druid Historical loads the segment and begins serving it.
- At this point, if the indexing task was waiting for handoff, it exits.
Each segment has a four-part identifier:
- datasource name
- time interval (for the time chunk containing the segment, corresponding to the segmentGranularity specified at ingestion time)
- version number (which is actually the timestamp when the segment has been created)
- partition number (an integer, unique within a datasource, interval and version)
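As a rough illustration of how the four parts fit together, the following Python sketch computes the hourly time chunk of an event timestamp and assembles an identifier from it. The underscore-separated layout and the helper names are assumptions made for the example; the identifiers actually produced by Druid should be checked in the sys.segments table.

```python
from datetime import datetime, timedelta, timezone


def iso(ts):
    """Format a UTC datetime as an ISO 8601 string with millisecond precision."""
    return ts.strftime("%Y-%m-%dT%H:%M:%S.") + f"{ts.microsecond // 1000:03d}Z"


def hourly_chunk(ts):
    """Return the (start, end) of the hourly time chunk containing ts."""
    start = ts.replace(minute=0, second=0, microsecond=0)
    return start, start + timedelta(hours=1)


def segment_id(datasource, ts, version, partition):
    """Join the four parts: datasource, time interval, version, partition number."""
    start, end = hourly_chunk(ts)
    return "_".join([datasource, iso(start), iso(end), iso(version), str(partition)])


# Event timestamp taken from the Decanter sample above; the version is hypothetical.
event_time = datetime(2021, 1, 19, 14, 10, 35, 169000, tzinfo=timezone.utc)
version = datetime(2021, 1, 19, 14, 0, 2, tzinfo=timezone.utc)

print(segment_id("decanter", event_time, version, 0))
```

With an hourly segment granularity, every event between 14:00 and 15:00 ends up in the same time chunk, and therefore in segments sharing the same interval.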
The lifecycle of a segment involves the metadata store, deep storage and availability for querying:
- Segment metadata is a small JSON document stored in the metadata store. Publishing is the action of inserting a record for a segment into the metadata store. These metadata records have a flag named used, controlling whether the segment is intended to be queryable or not.
- Segment data files are pushed to deep storage once a segment is constructed.
- Segments are available for querying on some Druid data server, like a realtime task or a Druid Historical process.
The sys.segments table contains the segments metadata. In this table, we can see the segment ID and the following flags:
- is_published: true if segment metadata has been published to the metadata store and used is true.
- is_available: true if the segment is currently available for querying, either on a realtime task or a Druid Historical process.
- is_realtime: true if the segment is only available on realtime tasks. For datasources that use realtime ingestion, this will generally start off true and then become false as the segment is published and handed off.
- is_overshadowed: true if the segment is published (used is true) and is fully overshadowed by some other published segments. Generally this is a transient state, and segments in this state will soon have their used flag automatically set to false.
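The flags can be inspected with a SQL query on sys.segments. The sketch below, in Python, holds such a query and a small helper that turns the flags of a row into a readable state, following the definitions above. The describe helper and the sample rows are hypothetical, and I assume here that the flags come back as 1 or 0.

```python
SEGMENTS_SQL = (
    "SELECT segment_id, is_published, is_available, is_realtime, is_overshadowed "
    "FROM sys.segments WHERE datasource = 'decanter'"
)


def describe(row):
    """Summarize a sys.segments row using the flag definitions above."""
    if row["is_overshadowed"]:
        return "overshadowed, will soon be marked unused"
    if row["is_realtime"]:
        return "only served by a realtime task, not handed off yet"
    if row["is_published"] and row["is_available"]:
        return "published and served by a historical"
    if row["is_published"]:
        return "published, waiting for a historical to load it"
    return "neither published nor available"


# Hypothetical rows, shaped like the result of SEGMENTS_SQL (flags as 1 or 0).
rows = [
    {"segment_id": "seg-1", "is_published": 0, "is_available": 1, "is_realtime": 1, "is_overshadowed": 0},
    {"segment_id": "seg-2", "is_published": 1, "is_available": 1, "is_realtime": 0, "is_overshadowed": 0},
    {"segment_id": "seg-3", "is_published": 1, "is_available": 0, "is_realtime": 0, "is_overshadowed": 0},
]

for row in rows:
    print(row["segment_id"], "->", describe(row))
```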
To maximize query performance, Druid combines three techniques:
- pruning which segments are accessed for each query
- within each segment, using indexes to identify which rows must be accessed
- within each segment, only reading the specific rows and columns that are relevant to a particular query.
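The first technique, pruning segments by time, can be sketched in a few lines of Python: only the segments whose time interval overlaps the interval of the query are kept. This is a simplified model with hypothetical hourly segments, not Druid's actual implementation.

```python
from datetime import datetime, timedelta, timezone


def hourly_segments(first_hour, count):
    """Build `count` consecutive one-hour segments starting at first_hour."""
    return [
        {"start": first_hour + timedelta(hours=i), "end": first_hour + timedelta(hours=i + 1)}
        for i in range(count)
    ]


def prune(segments, query_start, query_end):
    """Keep only the segments whose interval overlaps [query_start, query_end)."""
    return [s for s in segments if s["start"] < query_end and query_start < s["end"]]


day = datetime(2021, 1, 19, tzinfo=timezone.utc)
segments = hourly_segments(day, 24)

# A query on 13:30 - 14:30 only needs the 13:00 and 14:00 segments.
selected = prune(segments, day.replace(hour=13, minute=30), day.replace(hour=14, minute=30))
print(len(segments), "segments,", len(selected), "selected")
```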
Starting Apache Druid
For the purpose of this blog, I will start Druid on a single machine. Druid provides convenient scripts to start the Druid processes in a simple way. As my laptop is a decent one, I'm using the micro sizing of Druid processes:
$ bin/start-micro-quickstart
start-micro-quickstart will start Druid processes on the local machine:
- default zookeeper
- coordinator and overlord
- broker
- router
- historical
- middlemanager
Data Ingestion
Now, we can plug Druid into our Kafka broker to retrieve the data sent by Decanter. We can do that directly in the Druid web console.
To add the Kafka ingestion, we go to Data Ingestion.
Then, we define the Kafka bootstrap server and topic name. Druid loads messages from Kafka to define the data format.
Druid tries to detect the format and displays the data as rows.
We define the time column. Fortunately, Decanter cleanly populates the timestamp, so Druid detects it automatically.
The parsing of the raw data is now done. Optionally, we can transform or filter the data on a per-row basis.
Finally the schema is ready, and we define the query granularity. NB: in my use case (metrics), I'm using a query granularity of 1 second to keep a fine-grained time resolution.
We now define the segment granularity (hourly in my case).
The input tuning is important because, as we use Kafka (streaming mode), we want to use the earliest offset.
We name the datasource decanter.
Basically, the Druid web console creates the JSON spec to load data. We can review the JSON spec and then submit it.
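To give an idea of what such a spec contains, here is a hedged sketch that builds a Kafka supervisor spec as a Python dictionary and prints it as JSON. It follows the choices described above (decanter topic and datasource, hourly segments, 1 second query granularity, earliest offset). The exact field set generated by the console may differ; the "auto" timestamp format, the empty dimensions list and the operatingSystem_threadCount input field are assumptions on my side.

```python
import json

# Sketch of a Kafka supervisor spec; not the exact document produced by the console.
spec = {
    "type": "kafka",
    "spec": {
        "dataSchema": {
            "dataSource": "decanter",
            # Decanter events carry their time in the "@timestamp" field.
            "timestampSpec": {"column": "@timestamp", "format": "auto"},
            # Assumption: no explicit dimension list is given here.
            "dimensionsSpec": {"dimensions": []},
            # Assumption: the input field name is inferred from the column name
            # sum_operatingSystem_threadCount used later in the queries.
            "metricsSpec": [
                {
                    "type": "longSum",
                    "name": "sum_operatingSystem_threadCount",
                    "fieldName": "operatingSystem_threadCount",
                }
            ],
            "granularitySpec": {
                "type": "uniform",
                "segmentGranularity": "HOUR",
                "queryGranularity": "SECOND",
                "rollup": True,
            },
        },
        "ioConfig": {
            "type": "kafka",
            "topic": "decanter",
            "consumerProperties": {"bootstrap.servers": "localhost:9092"},
            "inputFormat": {"type": "json"},
            "useEarliestOffset": True,
        },
        "tuningConfig": {"type": "kafka"},
    },
}

print(json.dumps(spec, indent=2))
```

The interesting parts are the granularitySpec (segment and query granularity) and the ioConfig (Kafka broker, topic and starting offset), which map directly to the steps of the console wizard.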
Once we have submitted the ingestion, we can quickly see:
- a supervisor is created to "control" the Kafka ingestion tasks
- the kafka tasks
Analytics with Druid and Grafana
Now that we have the data loaded in Druid, we can start to use its analytics capabilities. It means that we can easily and efficiently query data using SQL syntax (powered by Apache Calcite). For instance, we can query Druid to get the max thread count during the last hour, using the max and interval operators.
It's also possible to use avg on the full data.
Thanks to this layer, it's pretty easy to analyze the data.
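The two queries described above could look like the following. This is a sketch, assuming the sum_operatingSystem_threadCount column of the decanter datasource and the __time column that Druid uses for timestamps; the aliases max_threads and avg_threads are mine. The small Python helper simply wraps a query in the JSON payload expected by the Druid SQL endpoint.

```python
import json

# Max thread count during the last hour (max and interval operators).
MAX_LAST_HOUR = (
    'SELECT MAX("sum_operatingSystem_threadCount") AS max_threads '
    'FROM "decanter" '
    "WHERE \"__time\" >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR"
)

# Average thread count on the full data.
AVG_ALL = 'SELECT AVG("sum_operatingSystem_threadCount") AS avg_threads FROM "decanter"'


def sql_payload(sql):
    """Wrap a SQL statement in the JSON document accepted by the SQL endpoint."""
    return json.dumps({"query": sql})


print(MAX_LAST_HOUR)
print(AVG_ALL)
print(sql_payload(AVG_ALL))
```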
It's also possible to directly plug Grafana into Druid, which allows us to create dashboards.
First, we add the Druid plugin for Grafana (available on https://github.com/grafadruid/druid-grafana).
A release is available here: https://github.com/grafadruid/druid-grafana/releases/download/v1.0.0/grafadruid-druid-datasource-1.0.0.zip.
We can see the Druid plugin loaded at Grafana startup:
INFO[01-20|18:35:23] Registering plugin                       logger=plugins id=grafadruid-druid-datasource
Now, in Grafana, we can create a Druid datasource.
Once the Druid datasource is created in Grafana, in the explore view, we can directly execute queries on Druid (as we do in the Druid web console).
Now, we can create a dashboard with some panels.
We create a panel for the thread count and another panel for the CPU temperature.
Together, these panels make up our dashboard.
Druid Queries Scheduler with Decanter Druid Collector
It's possible to execute queries on Druid via HTTP, using http://druid_broker:8082/druid/v2/sql/. In my case, as I'm running all Druid parts on my laptop, I can use the http://localhost:8888/druid/v2/sql/ URL. The JSON to send (using POST) is pretty simple. For instance, we can create the following query.json file:
{ "query": "select * from decanter" }
And then execute the query using curl:
$ curl -X POST -H "Content-Type: application/json" http://localhost:8888/druid/v2/sql/ -d @query.json
Then we get the query result.
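The same call can be done from a program. Here is a minimal sketch in Python using only the standard library; the build_sql_request and run_sql helper names are mine, and run_sql obviously needs a running Druid behind the URL (I assume the response body is a JSON document).

```python
import json
import urllib.request

DRUID_SQL_URL = "http://localhost:8888/druid/v2/sql/"


def build_sql_request(sql, url=DRUID_SQL_URL):
    """Prepare the POST request, equivalent to the curl command above."""
    body = json.dumps({"query": sql}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_sql(sql, url=DRUID_SQL_URL):
    """Send the query and decode the JSON response (requires a running Druid)."""
    with urllib.request.urlopen(build_sql_request(sql, url)) as response:
        return json.loads(response.read().decode("utf-8"))


request = build_sql_request("select * from decanter")
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```

Calling run_sql("select * from decanter") would then return the decoded rows, provided that Druid is reachable on localhost:8888.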
It's possible to use the Decanter REST collector to periodically execute this kind of query.
For convenience, I created a dedicated Druid collector: https://github.com/apache/karaf-decanter/pull/226.
This Druid Decanter collector is pretty basic: you define the Druid broker location and queries set in the etc/org.apache.karaf.decanter.collector.druid.cfg configuration file:
druid.broker.location=http://localhost:8888/druid/v2/sql/
query.all=select * from decanter
query.threadCount=select sum_operatingSystem_threadCount from decanter
unmarshaller.target=(dataFormat=json)
Then, the Karaf scheduler periodically executes the queries and sends each query result to the dispatcher.
For instance, using the log appender, we can see the data retrieved from Druid:
18:53:13.935 INFO [EventAdminAsyncThread #26] {"@timestamp":"2021-01-21T17:53:13,931Z","hostName":"LT-C02R90TRG8WM","component_name":"org.apache.karaf.decanter.collector.druid","query":"foo","felix_fileinstall_filename":"file:/Users/jbonofre/Downloads/apache-karaf-4.3.0/etc/org.apache.karaf.decanter.collector.druid.cfg","unmarshaller_target":"(dataFormat=json)","druid_broker_location":"http://localhost:8888/druid/v2/sql/","decanter_collector_name":"druid","scheduler_period":60,"service_pid":"org.apache.karaf.decanter.collector.druid","result":[{"sum_operatingSystem_threadCount":1533},{"sum_operatingSystem_threadCount":1595},{"sum_operatingSystem_threadCount":1587},{"sum_operatingSystem_threadCount":1592},{"sum_operatingSystem_threadCount":1594},{"sum_operatingSystem_threadCount":1600},{"sum_operatingSystem_threadCount":1572},{"sum_operatingSystem_threadCount":1604},{"sum_operatingSystem_threadCount":1585},{"sum_operatingSystem_threadCount":1576},{"sum_operatingSystem_threadCount":1585},{"sum_operatingSystem_threadCount":1576},{"sum_operatingSystem_threadCount":1562},{"sum_operatingSystem_threadCount":1559},{"sum_operatingSystem_threadCount":1568}],"scheduler_concurrent":false,"component_id":11,"karafName":"root","hostAddress":"192.168.134.100","query_foo":"select sum_operatingSystem_threadCount from decanter","scheduler_name":"decanter-collector-druid","timestamp":1611251593931,"event_topics":"decanter/collect/druid"}
It means that the Druid queries are scheduled, and that we can use any Decanter appender to store the query execution results in any backend (Elasticsearch, Cassandra, ...).
A possible use case is to create analytic queries and send the results into Elasticsearch, for instance, with Kibana on top of it.
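As an illustration of what a consumer of these events could do with them, the following Python sketch parses an event and summarizes the values found in its result field. The event is a trimmed copy of the one logged above (only a few fields and the first three result rows are kept), and the summarize helper is hypothetical.

```python
import json

# Trimmed copy of the logged event: only a few fields and three result rows are kept.
event_json = (
    '{"query":"foo","decanter_collector_name":"druid",'
    '"result":[{"sum_operatingSystem_threadCount":1533},'
    '{"sum_operatingSystem_threadCount":1595},'
    '{"sum_operatingSystem_threadCount":1587}]}'
)


def summarize(event, column):
    """Compute simple statistics on one column of the query result rows."""
    values = [row[column] for row in event["result"]]
    return {
        "count": len(values),
        "min": min(values),
        "max": max(values),
        "avg": sum(values) / len(values),
    }


event = json.loads(event_json)
summary = summarize(event, "sum_operatingSystem_threadCount")
print(event["query"], summary)
```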






















 
 