How do Apache Kafka streams work

Introduction to Apache Kafka

Motivation

Apache Kafka is a persistent, distributed message broker and a platform for streaming and data integration. It is often used as a central data hub and as the streaming component of a fast data platform.

Overview and terminology

Kafka collects messages and persists them on disk. Messages are data that are published and received (publish-subscribe).

Kafka is distributed across several nodes called brokers in order to scale and to ensure reliability. A Kafka cluster is coordinated by Apache ZooKeeper.

Messages are grouped in named containers called topics. New messages are appended to the end of a topic and are assigned an incrementing number called the offset. Kafka is therefore also referred to as a log, in the sense of a continuous sequence of messages. A topic is replicated to several brokers.

A topic is divided into partitions. The number of partitions is set when the topic is created, and it determines how the topic scales. The messages of a topic are distributed across the partitions. The offset applies per partition.

Producers publish messages to topics. Consumers subscribe to topics and read the incoming messages as a stream.

Persistence

Kafka stores every message it receives on disk in segments. A segment is a file that holds a certain, configurable amount of messages before a new segment is opened.

Despite this persistence, Kafka achieves very high performance. This is accomplished, among other things, by always appending messages to the end of the currently open segment file and by scaling across partitions. In addition, writes go to the Linux page cache, and reads are served from the page cache via a network socket. Unlike in a database, no user-defined indexes are updated when writing; only the offset is incremented and the mapping from offset to message position is updated.

SSDs are generally recommended, but rotating hard disks are also very efficient thanks to this type of access.

Unlike many message queues, data in Kafka is not removed after it has been consumed.

Topics

Overview

A topic is a collection of messages with a unique name. Kafka does not specify what kind of data ends up in a topic; for Kafka it is just a container that has a name and certain configuration parameters. A topic is always written serially at the end (append-only).

A topic is explicitly created with a replication factor and a number of partitions. Many configuration parameters in Kafka are topic-specific or can be overridden at the topic level.
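A topic with its partition count and replication factor can also be created programmatically via the admin client. The following is a minimal sketch, assuming a broker reachable at localhost:9092; the topic name, partition count, and replication factor are illustrative values:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // any broker of the cluster

        try (AdminClient admin = AdminClient.create(props)) {
            // Topic "events" with 6 partitions, each replicated to 3 brokers.
            NewTopic topic = new NewTopic("events", 6, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```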

Partitions

Each topic is divided into sub-units, i.e. it is partitioned. Partitions are the basic mechanism by which both scaling and replication work. Whenever a topic is written to or read from, a specific partition is always involved. The offset, too, applies per partition.

Each partition is ordered by its offset. When you write a message to a topic, you have the option of specifying a key. The hash of this key ensures that all messages with the same key end up in the same partition. Within a partition, the order of incoming messages is guaranteed.
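As a sketch of how keys determine partitions: Kafka's default partitioner hashes the serialized key (murmur2) and takes the result modulo the number of partitions. The simplified version below uses a generic hash purely for illustration, so the concrete partition numbers will differ from Kafka's, but the property that equal keys map to the same partition is the same:

```java
import java.util.Arrays;

public class KeyPartitioningSketch {
    // Simplified stand-in for Kafka's key hashing (Kafka actually uses murmur2).
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        // Mask the sign bit so the result is non-negative, then take the modulo.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 6;
        // The same key always lands in the same partition ...
        System.out.println(partitionFor("user-42".getBytes(), partitions));
        System.out.println(partitionFor("user-42".getBytes(), partitions)); // identical result
        // ... while a different key may land elsewhere.
        System.out.println(partitionFor("user-7".getBytes(), partitions));
    }
}
```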

Segments

Ultimately, a partition corresponds to several segment files on disk. New messages are stored in the active segment. When the segment reaches the configured size or time limit, it is closed, and a new segment is created, which then becomes the active segment.

Each segment file is named after the offset of the first message it contains and ends with .log. There is also a matching index file that maps offsets to the positions of the messages in the segment file.

The following folder structure would exist on a broker that hosts partitions 0 and 2 of the “Test” topic with 100 messages per segment.
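The listing below is an illustrative reconstruction of such a layout, assuming the usual naming scheme in which each partition gets its own directory and each segment is named after its first offset (zero-padded), with a matching index file:

```
Test-0/
  00000000000000000000.log
  00000000000000000000.index
  00000000000000000100.log
  00000000000000000100.index
  00000000000000000200.log      <- active segment
  00000000000000000200.index
Test-2/
  00000000000000000000.log
  00000000000000000000.index
  00000000000000000100.log
  00000000000000000100.index
  00000000000000000200.log      <- active segment
  00000000000000000200.index
```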

A high number of partitions means, on the one hand, that more parallelism is possible when writing and reading, but also that more file handles are kept open and more threads are occupied. If there are few partitions, parallelism may be too low.

If only a few messages are to be processed, a low number of partitions is sufficient. You usually start with a small value and then adjust gradually. Increasing the number of partitions is straightforward; decreasing it is not possible, as that would result in data loss.

Log retention

For each topic, a restriction can be set in the form of a time-to-live (TTL, or retention in Kafka's terminology): by time (parameter retention.ms), by size (parameter retention.bytes), or both. Often storage space is the limiting factor, i.e. topics may well have a TTL of one year. If time-based retention is set, Kafka removes the closed segments of a partition in which all messages have exceeded the maximum age. With a size limit, closed segments are removed as soon as the total size in bytes of the partition exceeds the limit. The active segment remains unaffected in both cases.
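Retention can be set per topic, for example via the admin client. A minimal sketch, assuming the illustrative topic "events" from above and placeholder limits of seven days and roughly 1 GiB per partition:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class SetRetentionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "events");
            // Keep messages for 7 days OR until a partition exceeds ~1 GiB, whichever comes first.
            List<AlterConfigOp> ops = List.of(
                new AlterConfigOp(new ConfigEntry("retention.ms", "604800000"), AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("retention.bytes", "1073741824"), AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}
```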

There are also compacted topics, which only keep the latest message for each key. Older messages with the same key are removed at regular intervals. A message with no content (= null), a so-called tombstone, deletes all messages with that key.
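A tombstone is simply a record with a null value. A minimal sketch, assuming a compacted topic named "users-compacted" and string keys (both illustrative):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class TombstoneExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A null value marks all earlier messages with this key for removal
            // the next time the compacted topic is cleaned.
            producer.send(new ProducerRecord<>("users-compacted", "user-42", null));
        }
    }
}
```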

Replication, leaders and replicas

As mentioned, Kafka is distributed across several brokers and replicates topics for reliability. Here, too, partitions are the means that make straightforward replication possible. For each partition, a broker is either the leader or a replica.

The replication factor of a topic states the number of brokers to which each partition of the topic is replicated. A factor of 2, for example, means that each partition is stored on one replica in addition to its leader. A partition with factor r therefore exists r times in the cluster.

It is not the topic but each individual partition that has a leader and r-1 followers. The leader partition is simply the copy of the partition on the assigned leader broker. Which broker is leader for which partition is chosen automatically in round-robin fashion when the topic is created, but it can also be set manually using partition assignments. This is even necessary if the number of partitions of a topic is to be changed later.

In-sync replicas (ISR) play a special role. They are the replicas that are up to date with the leader. If a leader fails, a new one is elected from the pool of ISRs.

This means that up to r-1 replicas can fail without data loss occurring.

When creating a topic, the replication factor cannot be higher than the number of brokers in the cluster. At first that may not sound worth mentioning, but it also means that while brokers are down, a topic that could otherwise be created may be rejected because there are temporarily too few brokers. If brokers are down and the replication factor of an existing topic can no longer be met, the affected partitions are marked as under-replicated.

Messages

A message consists of a sequence of bytes. A message can also be assigned a key, also consisting of a byte sequence. In addition, metadata such as the offset, a checksum, message size, compression and timestamp are available for the message. You can also add your own headers.

The content of the message and the key, including their format, are therefore entirely the responsibility of the user. The Kafka client API only ships with (de)serializers for strings and byte arrays. A custom (de)serializer must ultimately always read or produce a byte array so that Kafka can process the data.
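As a sketch of such a hand-rolled serializer (the domain type and the semicolon-separated wire format are made up for illustration); whatever format you choose, the result must be a byte array:

```java
import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.StandardCharsets;

// Hypothetical domain type, used only for this example.
record SensorReading(String sensorId, double value) {}

// Turns a SensorReading into the byte array that Kafka ultimately stores.
class SensorReadingSerializer implements Serializer<SensorReading> {
    @Override
    public byte[] serialize(String topic, SensorReading reading) {
        if (reading == null) {
            return null; // null values stay null (tombstones)
        }
        String wireFormat = reading.sensorId() + ";" + reading.value();
        return wireFormat.getBytes(StandardCharsets.UTF_8);
    }
}
```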

Messages or batches of messages can be compressed. Kafka is able to save the compressed data directly without unpacking it beforehand. The compression format is stored in the metadata of the messages. Compression represents one of the greatest performance gains when using Kafka.

Messages also have headers which, as in HTTP, consist of key-value pairs. Headers are useful whenever you want to decide on the further processing of the data without knowing the content of the message. For example, the data format could be stored in a content-type header (JSON, Avro, Protobuf, etc.). Or the payload may not be accessed at all, for example because the data is encrypted or must not be accessed for data protection reasons, e.g. data generated in Europe that should not be replicated to a data center in the USA.
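A short sketch of attaching such a header on the producer side (topic name, key, and payload are illustrative):

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;

public class HeaderExample {
    public static void main(String[] args) {
        ProducerRecord<String, byte[]> record = new ProducerRecord<>(
            "events", "user-42", "{\"action\":\"login\"}".getBytes(StandardCharsets.UTF_8));
        // Consumers can inspect this header and route or skip the message
        // without deserializing the payload.
        record.headers().add("content-type", "application/json".getBytes(StandardCharsets.UTF_8));
        System.out.println(record.headers());
    }
}
```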

Producer

Overview

A producer writes messages to topics.

When a message is sent, a key can be set, whose hash Kafka uses to determine the partition into which the producer writes the message. If no key is set, the producer distributes messages across the partitions. At startup, the producer connects to any broker in the cluster, which informs it of the leader of the partition to which the producer then transmits the message.
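A minimal producer sketch, assuming a local broker and an illustrative topic "events"; the key "user-42" determines the partition:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;

public class SimpleProducerExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // any broker of the cluster
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("events", "user-42", "page_view");
            // send() returns a Future; get() blocks until the broker has acknowledged the write.
            RecordMetadata meta = producer.send(record).get();
            System.out.printf("written to partition %d at offset %d%n",
                meta.partition(), meta.offset());
        }
    }
}
```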

A Kafka producer works asynchronously and writes its messages in batches. The batch size or a maximum waiting time can be configured in order to optimize for latency or throughput.

Durability

An important configuration parameter is acks. It controls the trade-off between reliability and throughput. If it is set to 0, messages are sent in fire-and-forget mode: there is no guarantee that a message was written, but throughput is at its maximum. This mode is relevant, for example, for unit tests that should run as quickly as possible. With acks set to 1, an acknowledgment from the leader is enough. Any other number n then requires n acknowledgments, i.e. from the leader plus n-1 replicas. The value all means that the configured minimum number of ISRs (broker parameter min.insync.replicas) must have acknowledged the write.
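The setting is a plain producer property. A small sketch of the three modes (the chosen value is illustrative):

```java
import java.util.Properties;

public class ProducerDurabilityConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // acks=0   : fire-and-forget, maximum throughput, no delivery guarantee
        // acks=1   : the partition leader must acknowledge the write
        // acks=all : the configured minimum number of in-sync replicas must acknowledge
        props.put("acks", "all");
    }
}
```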

Overall, the more confirmations, the more secure the write, but also the lower the performance. If the requirements of acks cannot be met, an exception is thrown.

Compression

Compression (compression.type) should always be activated. Snappy is a common default; depending on the environment, other algorithms such as LZ4 can offer better performance. Topics themselves can also be configured for compression. If you choose the value producer for the topic's compression (i.e. in the broker or topic configuration), not only is the same algorithm used as on the producer, but Kafka also saves CPU time because it does not have to decompress the compressed messages first and can store them directly using zero-copy.
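On the producer this is again a single property. A sketch, with snappy chosen as an illustrative value:

```java
import java.util.Properties;

public class ProducerCompressionConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Compress batches on the producer; lz4 or zstd may perform better in some setups.
        props.put("compression.type", "snappy");
        // On the topic or broker, compression.type=producer keeps the producer's format,
        // so the broker can store the batches without recompressing them.
    }
}
```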

Consumer

Overview

A consumer reads messages from one or more topics.

A Kafka consumer is not notified by push; instead it polls for new messages and thus determines the speed at which new data is delivered to it.
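A minimal consumer sketch with such a poll loop, assuming a local broker, a topic "events", and an illustrative group id:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "events-processor"); // consumer group membership
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events"));
            while (true) {
                // The consumer pulls batches of records; it decides how fast it reads.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                        record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```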

The key of a message is also relevant when processing data. If the data is partitioned by user ID, for example, a consumer knows that it receives all of a user's data and that none of it sits in other partitions that may be assigned to another consumer.

Offsets

The current position of a consumer in the log is managed via the message offset. The offset is used to track which messages have already been processed. So that messages are not processed twice after a crash or restart of a consumer, the last processed offset must be stored in order to then jump back to the correct place in the log.

If the consumer lags behind in reading the messages, i.e. if the current offset of the consumer is smaller than the highest offset in the partition, this is called consumer lag: messages are produced faster than they are consumed.

Kafka can store the offset automatically in an internal topic. This is particularly convenient because Kafka then takes care of jumping to the last offset when the consumer is restarted. In addition, this method allows transactional writing from one topic to another.
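A sketch of committing offsets back to Kafka manually, so the position is stored only after processing succeeded (broker address, topic, and group id are illustrative):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "events-processor");
        props.put("enable.auto.commit", "false"); // commit only after processing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("processing " + record.value()); // stand-in for real work
                }
                // Store the offsets of the last poll in Kafka, so a restart
                // resumes after the last committed position.
                consumer.commitSync();
            }
        }
    }
}
```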

Alternatively, you can take over offset management entirely yourself. For example, it can be useful to store the offset in a database that can be queried by monitoring tools. You can also reset the offset there directly if processing failed and should be carried out again (replay). Last but not least, storing the offset together with the processed data can help avoid processing messages repeatedly.
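A sketch of self-managed offsets: the consumer is assigned a partition manually and seeks to a position read from an external store (the offset value, topic, and partition here are placeholders):

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ReplayFromStoredOffsetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        long storedOffset = 12345L; // would be read from your own store, e.g. a database

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("events", 0);
            consumer.assign(Collections.singletonList(partition)); // manual assignment, no group rebalancing
            consumer.seek(partition, storedOffset);                // jump to the externally stored position
            consumer.poll(Duration.ofSeconds(1)).forEach(record ->
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value()));
        }
    }
}
```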

Groups

Consumers are organized in groups. Membership in a group is controlled via the group.id configuration parameter. If a consumer is started that is part of a group, the partitions are redistributed among the active consumers of this group (rebalance). The same thing happens when a consumer crashes. In Kafka, not all consumers in a group get all the data; instead, the data is distributed to the group members. This is achieved by assigning each partition to exactly one consumer in the group. A consumer can read from one or more partitions.

The number of partitions is therefore also the upper limit of parallelism and is thus essential for high-performance processing of messages. If you want to read with a high degree of parallelism, there must be a corresponding number of partitions. Once each partition is assigned to a consumer, an additional consumer in the same group would simply not receive any data. A very high number of partitions with few consumers can increase memory pressure, especially on the JVM, on the one hand because a Kafka consumer starts a thread per partition and on the other hand because a consumer that does not process data in real time loads batches of messages from several partitions in parallel.

If you want to process the data several times in different scenarios, the processing is split into separate groups. For example, one consumer group can process the stream immediately in real time, while another is started only once at night and works in batch mode.

Ecosystem

Kafka itself consists only of the core of persistent messages and the APIs for producers and consumers. Around Kafka there are a number of tools, in particular for monitoring, cross-data-center replication, and for filling and processing streams.

Tools provided

MirrorMaker acts as producer and consumer and mirrors topics from one cluster to another.

Kafka Connect is an ETL tool that loads data into and out of Kafka. A common use case of Connect is the connection of relational data sources, the content of which is completely or partially loaded into Kafka Topics. For this purpose, connectors for JDBC or the Change Data Capture (CDC) mechanism of many databases are provided.

Kafka Streams is a stream processing framework that only works with Kafka. It is integrated as a library in JVM applications and is particularly suitable for event-based microservices. A special feature is the provision of local state, e.g. a table that is built up from a compacted topic.
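A minimal Kafka Streams sketch that reads one topic, transforms each value, and writes the result to another topic (application id and topic names are illustrative):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

public class UppercaseStreamExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-app"); // also acts as the consumer group id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Read from one topic, transform each value, write the result to another topic.
        builder.<String, String>stream("events")
               .mapValues(value -> value.toUpperCase())
               .to("events-uppercased");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```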

KSQL allows data from topics to be queried and transformed via SQL. Since new messages are constantly arriving in the topics, the queries are continuous, streaming queries. KSQL runs as its own service alongside the Kafka cluster and is particularly suitable for processing that can be easily formulated in SQL.

Separate tools

Kafka Manager is an external web tool that can be used to manage a Kafka cluster.

Trifecta is an external web tool that can be used to view the data in topics.

Use cases

With the described properties, components, and the surrounding ecosystem, the following use cases can be implemented particularly well with Kafka.

Messaging (publish-subscribe) - A messaging platform decouples two systems from each other and allows messages to be delivered asynchronously, i.e. the recipient can retrieve them later and does not have to react immediately. This is one of the core functions of Kafka and is achieved through topics and the separation into producers and consumers.

Stream processing, Complex Event Processing (CEP) - Kafka is a streaming platform and offers the low latency needed to process messages continuously and without delay. In this way, it is possible to react immediately to events as they occur.

Integration - Kafka can be used to collect and process data from the company's various source systems centrally. Another term for this would be ETL (Extract-Transform-Load) from the business intelligence world. Kafka enables ETL in the stream, with Kafka Connect taking over the extract and load parts and Kafka Streams and KSQL the transform part.

Tracking, recording metrics, log aggregation - This is about collecting a lot of data in different formats and forms. Kafka allows a high throughput of many messages, accepts any kind of data and is format-agnostic.

Event sourcing - Event sourcing refers to deriving the current application state from individual events. The events can be stored as messages in Kafka. When the application is restarted, the messages can be read in again (replay). Another topic with snapshots can be derived from the topic with the individual events.

Alternatives

In addition to many other message buses, there are two systems in particular that are similar to Kafka.

Kinesis is a cloud-based message queue and streaming platform that is part of Amazon Web Services (AWS). Kinesis is ideal if you don't want to worry about the infrastructure and want to use other, complementary AWS services such as Lambda or Spark on EMR. Kafka basically scales better than Kinesis and is therefore cheaper. If you don't need Kinesis-specific features, you can also use Amazon Managed Streaming for Apache Kafka (MSK).

Apache Pulsar is somewhat younger than Kafka and offers a few features that Kafka is (still) missing, including the separation of reads and writes, geo-replication, and generally more functionality directly in the broker.

Kafka in the Fast Data platform

If you enjoyed this introduction to Apache Kafka, you might also be interested in my book Fast Data in Action. It contains this post in a more detailed version and builds on it in the chapter “Kafka in the Fast Data platform”, which specifically addresses the use of Kafka in Fast Data practice. There will also be chapters on performance, topic versioning, migrations, replay, and more.