How the Druid Kafka Indexing Service works - a detailed explanation

Kafka Indexing Service (KIS) is an extension for Druid that enables both real-time and historical data ingestion from Kafka topics with exactly-once guarantees.

It is meant to replace the old Druid Real-Time nodes as a more reliable and flexible solution. Unlike the Real-Time node mechanism or the newer Tranquility feature, KIS is also able to read non-recent events from Kafka and is not subject to the window period considerations imposed on other ingestion mechanisms.

The Kafka Indexing Service uses the Overlord and MiddleManager nodes to run its ingestion and indexing tasks.
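KIS ingestion is configured by submitting a supervisor spec to the Overlord. A minimal sketch is shown below; the datasource, topic name, broker address, and tuning values are placeholders, the dataSchema is truncated (a real spec also needs timestamp and dimension configuration), and exact field names vary by Druid version:

```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "page-events",
    "granularitySpec": {
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "ioConfig": {
    "topic": "page-events",
    "consumerProperties": { "bootstrap.servers": "kafka01:9092" },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 75000
  }
}
```

The taskDuration, segmentGranularity, and maxRowsInMemory settings referenced in the steps below all come from this spec.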

Below we explain the detailed data flow, from ingestion to querying, when using the Kafka Indexing Service.

  1. A KIS task reads the event stream from Kafka and creates a set of partitions stored in memory. The number of partitions created equals the number of Kafka partitions multiplied by the number of segmentGranularity intervals covered by the data.

  2. These partitions are saved (spilled) to disk whenever maxRowsInMemory is reached. Bear in mind that all of this data (both in-memory and spilled to disk) is available for query requests.

  3. When taskDuration is reached, the KIS task stops reading from Kafka and merges the spilled partitions together into a final set of partitions.

  4. This final set of partitions is moved into deep storage (S3, HDFS, NFS) and the segment descriptors are written into the metadata table. The task waits for handoff.

  5. Up to this moment, those segments are not available to query via Historical nodes, because they have not been loaded there yet. Query requests for this data are therefore handled by the MiddleManagers.

  6. One of the Coordinator's duties is to periodically check the metadata database for new, unserved segments; when it detects such segments, it instructs the Historical nodes to load them according to a cost balancing algorithm.

  7. The Historical nodes pull the segments from deep storage into their local segment cache, from which they are loaded into Historical server memory. The name "local segment cache" can be misleading, as both Historical and Broker nodes also have their own, separate query caches. Here, the local segment cache simply means the set of segments stored on a Historical node and available for querying.

  8. Once the Historical nodes have loaded all the segments, the indexing task is notified that it can stop serving queries for the segments it generated, since the Historicals now take over that responsibility. The task is then complete and its process terminates.
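The partition and spill arithmetic in steps 1 and 2 can be sketched as follows; the function names and inputs are illustrative, not Druid APIs:

```python
def in_memory_partition_count(kafka_partitions, event_timestamps, granularity_secs):
    """Step 1: one in-memory partition per (Kafka partition, segmentGranularity
    interval) pair touched by the incoming events."""
    # Bucket each event timestamp (seconds) into its segmentGranularity interval.
    intervals = {ts // granularity_secs for ts in event_timestamps}
    return kafka_partitions * len(intervals)


def spill_count(rows_ingested, max_rows_in_memory):
    """Step 2: a spill to disk happens each time maxRowsInMemory is reached."""
    return rows_ingested // max_rows_in_memory


# 4 Kafka partitions, events spanning 3 hourly intervals -> 12 partitions
print(in_memory_partition_count(4, [0, 1800, 3700, 7300], 3600))  # 12

# 1,000,000 rows with maxRowsInMemory = 75,000 -> 13 spills to disk
print(spill_count(1_000_000, 75_000))  # 13
```

This also shows why a wide segmentGranularity combined with late-arriving data can multiply the number of partitions a single task must manage.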