- The central component of Kafka architecture is the broker.
- Brokers are the servers that make up a Kafka cluster; a cluster consists of one or more brokers.
- Producers and consumers communicate with brokers in order to publish and consume messages.
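- Kafka has traditionally depended on an underlying technology called Zookeeper (newer Kafka releases can instead run in KRaft mode, which removes this dependency).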
- Zookeeper is a generalized cluster management tool.
- It manages the cluster and provides a consistent, distributed place to store cluster configuration.
- Zookeeper coordinates communication throughout the cluster, adds and removes brokers, and monitors the status of nodes in the cluster.
- It is often installed alongside Kafka, but can be maintained on a completely separate set of servers.
- Kafka clients and brokers communicate using a simple binary protocol over TCP.
- In a Kafka cluster, one broker is dynamically designated as the Controller.
- The controller coordinates the process of assigning partitions and data replicas to nodes in the cluster.
- Every cluster has exactly ONE controller.
- If the controller goes down, another node will automatically become the controller.
Publisher/Subscriber Messaging in Kafka
- Topics are at the core of everything you can do in Kafka.
- A topic is a data feed to which data records are published and from which they can be consumed.
- Publishers send data to a topic, and subscribers read data from the topic.
- This is known as publisher/subscriber messaging, or simply pub/sub for short.
The Topic Log
- Kafka topics each maintain a log.
- The log is an ordered, immutable list of data records.
- The log for a Kafka topic is usually divided into multiple partitions. This is what allows Kafka to process data efficiently and in a scalable fashion.
- Each record in a partition has a sequential, unique ID called an offset.
- A Producer is the publisher in the pub/sub model.
- The producer is simply an application that communicates with the cluster, and can be in a separate process or on a different server.
- Producers write data records to the Kafka topic.
- For each new record, the Producer determines which partition to write to, often in a simple round-robin fashion.
- You can customize this to use a more sophisticated algorithm for determining which partition to write a new record to.
- A Consumer is the subscriber in the pub/sub model.
- Like producers, consumers are external applications that can be in a separate process or on a different server from Kafka itself.
- Consumers read data from Kafka topics.
- Each consumer controls the offset it is currently reading for each partition, and consumers normally read records in order based on the offset.
- You can have any number of consumers for a topic, and they can all process the same records.
- Records are not deleted when they are consumed. They are only deleted based upon a configurable retention period.
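The log, producer, and consumer behaviour described above can be sketched with a small in-memory model. This is plain Python, not the Kafka client API: the `Topic` and `Consumer` classes and their methods are hypothetical names chosen for illustration, and round-robin partition selection is assumed.

```python
from itertools import count


class Topic:
    """Toy in-memory model of a partitioned topic log (not the Kafka API)."""

    def __init__(self, num_partitions):
        # Each partition is an append-only list; a record's index is its offset.
        self.partitions = [[] for _ in range(num_partitions)]
        self._next = count()  # drives round-robin partition selection

    def publish(self, value):
        """Append a record and return its (partition, offset)."""
        partition = next(self._next) % len(self.partitions)
        self.partitions[partition].append(value)
        return partition, len(self.partitions[partition]) - 1


class Consumer:
    """Tracks its own read position (offset) for every partition."""

    def __init__(self, topic):
        self.topic = topic
        self.offsets = [0] * len(topic.partitions)

    def poll(self):
        """Return all unread records, partition by partition, in offset order."""
        records = []
        for p, log in enumerate(self.topic.partitions):
            records.extend(log[self.offsets[p]:])
            self.offsets[p] = len(log)
        return records


topic = Topic(num_partitions=2)
placements = [topic.publish(v) for v in ["a", "b", "c"]]

first, second = Consumer(topic), Consumer(topic)
seen_by_first = first.poll()
seen_by_second = second.poll()  # reading does not delete records
```

Because each consumer keeps its own offsets, both consumers see every record, and a second `poll()` on the same consumer returns nothing new.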
By default, all consumers will process all records, but what if you want to scale your record processing so that multiple instances can process the data without two instances processing the same record? You can place consumers into Consumer Groups.
- Each record will be consumed by exactly one consumer per consumer group.
- With consumer groups, Kafka dynamically assigns each partition to exactly one consumer in the group.
- If you have more consumers than partitions, some of the consumers will be idle and will not process records.
- Topic: A named data feed that data can be written to and read from.
- Log: The data structure used to store a topic’s data. The log is a partitioned, immutable sequence of data records.
- Partition: A section of a topic’s log.
- Offset: The sequential and unique ID of a data record within a partition.
- Producer: Something that writes data to a topic.
- Consumer: Something that reads data from a topic.
- Consumer group: A group of multiple consumers. Normally, multiple consumers can all consume the same record from a topic, but only one consumer in a consumer group will consume each record.
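The consumer group rules (each partition is owned by exactly one consumer in the group, and surplus consumers sit idle) can be illustrated with a minimal sketch. Kafka's real assignment strategies are configurable; the round-robin scheme and the `assign_partitions` helper below are assumptions made for illustration only.

```python
def assign_partitions(partitions, consumers):
    """Give each partition to exactly one consumer, round-robin.

    Returns a dict mapping every consumer to its list of partitions;
    a consumer that receives no partition stays idle (empty list).
    """
    assignment = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(partitions):
        owner = consumers[i % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Four partitions shared by two consumers: each gets two.
balanced = assign_partitions([0, 1, 2, 3], ["c1", "c2"])

# Two partitions, three consumers: the third consumer is idle.
oversized = assign_partitions([0, 1], ["c1", "c2", "c3"])
```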
Partitions and Replication
- Kafka is designed with fault tolerance in mind. As a result, it includes built-in support for replication.
- Replication means storing multiple copies of any given piece of data.
- In Kafka, every topic is given a configurable replication factor.
- The replication factor is the number of replicas that will be kept on different brokers for each partition in the topic.
- In order to ensure that messages in a partition are kept in a consistent order across all replicas, Kafka chooses a leader for each partition.
- By default, the leader handles all reads and writes for the partition.
- The leader is dynamically selected and if the leader goes down, the cluster attempts to choose a new leader through a process called leader election.
- The leader is the source of truth.
- Kafka maintains a list of In-Sync Replicas (ISR) for each partition.
- ISRs are replicas that are up-to-date with the leader.
- If a leader dies, the new leader is elected from among the ISRs.
- By default, if there are no remaining ISRs when a leader dies, Kafka waits until one becomes available. This means that producers will be on hold until a new leader can be elected.
- You can turn on unclean leader election, allowing the cluster to elect a non-in-sync replica in this scenario.
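The election rules above can be summarised in a short sketch. It is a simplified model rather than Kafka's actual controller logic: the `elect_leader` function, its arguments, and the "first eligible replica wins" rule are assumptions for illustration.

```python
def elect_leader(replicas, isr, failed_leader, allow_unclean=False):
    """Pick a new leader for a partition after `failed_leader` goes down.

    replicas: broker ids holding a copy of the partition, in preference order
    isr:      set of broker ids that were in sync with the old leader
    Returns the new leader's broker id, or None if the partition must wait.
    """
    survivors = [b for b in replicas if b != failed_leader]
    in_sync = [b for b in survivors if b in isr]
    if in_sync:
        return in_sync[0]
    if allow_unclean and survivors:
        # Unclean election: favours availability, but records may be lost.
        return survivors[0]
    return None


clean = elect_leader([1, 2, 3], isr={1, 3}, failed_leader=1)
waiting = elect_leader([1, 2, 3], isr={1}, failed_leader=1)
unclean = elect_leader([1, 2, 3], isr={1}, failed_leader=1, allow_unclean=True)
```

In the second call no in-sync replica survives, so the partition has no leader until one returns; the third call shows the trade-off that unclean leader election accepts.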
The Life of a Message
- Producer publishes a message to a partition within a topic.
- The message is added to the partition on the leader.
- The message is copied to the replicas of that partition on other brokers.
- Consumers read the message and process it.
- When the retention period for the message is reached, the message is deleted (the default retention period is 7 days).
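The steps above can be traced with a toy model of a single replicated partition. This sketch makes simplifying assumptions: replication is shown as an immediate copy, and expiry is applied per record, whereas a real broker replicates asynchronously and removes data in larger chunks. The `ReplicatedPartition` class is a hypothetical name.

```python
RETENTION_MS = 7 * 24 * 60 * 60 * 1000  # 7 days, the default mentioned above


class ReplicatedPartition:
    """Toy partition with one leader log and follower copies."""

    def __init__(self, num_followers):
        self.leader = []  # list of (timestamp_ms, value)
        self.followers = [[] for _ in range(num_followers)]

    def publish(self, timestamp_ms, value):
        # The record is added to the partition on the leader...
        self.leader.append((timestamp_ms, value))
        # ...and copied to the replicas on the other brokers.
        for follower in self.followers:
            follower.append((timestamp_ms, value))

    def read(self):
        # Consumers read from the leader; reading does not delete anything.
        return [value for _, value in self.leader]

    def expire(self, now_ms):
        """Drop records whose age has reached the retention period."""
        def keep(record):
            return now_ms - record[0] < RETENTION_MS

        self.leader = [r for r in self.leader if keep(r)]
        self.followers = [[r for r in f if keep(r)] for f in self.followers]


partition = ReplicatedPartition(num_followers=2)
partition.publish(0, "old")
partition.publish(RETENTION_MS, "new")
before = partition.read()
partition.expire(now_ms=RETENTION_MS + 1)
after = partition.read()
```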
The Kafka Java APIs
- Producer API: Allows you to build producers that publish messages to Kafka.
- Consumer API: Allows you to build consumers that read Kafka messages.
- Streams API: Allows you to read from input topics, transform data, and output it to output topics.
- Connect API: Allows you to build custom connectors, which pull from or push to specific external systems.
- AdminClient API: Allows you to manage and inspect higher-level objects like topics and brokers.
What are Streams?
- So far, we have discussed using Kafka for messaging (reliably passing data between two applications).
- Kafka Streams allows us to build applications that process Kafka data in real-time with ease.
- A Kafka Streams application is an application where both the input and the output are stored in Kafka topics.
- Kafka Streams is a client library (API) that makes it easy to build these applications.
Kafka Streams Transformations
Kafka Streams provides a robust set of tools for processing and transforming data. The Kafka cluster itself serves as the backend for data management and storage.
There are two types of data transformations in Kafka Streams:
- Stateless transformations do not require any additional storage to manage the state.
- Stateful transformations require a state store to manage the state.
- Branch: Splits a stream into multiple streams based on a predicate.
- Filter: Removes messages from the stream based on a condition.
- Flat Map: Takes input records and turns them into a different set of records.
- Foreach: Performs an arbitrary stateless operation on each record. This is a terminal operation and stops further processing.
- GroupBy/GroupByKey: Groups records by their key. This is required to perform stateful transformations.
- Map: Allows you to read a record and produce a new, modified record.
- Merge: Merges two streams into one stream.
- Peek: Similar to Foreach, but does not stop processing.
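To make the semantics of these stateless operations concrete, the sketch below mimics them on ordinary Python lists of `(key, value)` pairs. It is not the Kafka Streams API; the sample data and the `peek` and `foreach` helpers are hypothetical and only illustrate what each transformation does to a sequence of records.

```python
records = [("a", 1), ("b", 2), ("a", 3), ("c", 4)]
lines = [("u1", "hello world"), ("u2", "kafka")]

# Filter: keep records that satisfy the condition, drop the rest.
evens = [(k, v) for k, v in records if v % 2 == 0]

# Map: produce exactly one new, modified record per input record.
doubled = [(k, v * 2) for k, v in records]

# Flat Map: each input record may turn into zero or more output records.
words = [(k, w) for k, line in lines for w in line.split()]

# Branch: split one stream into several based on predicates.
small = [r for r in records if r[1] < 3]
large = [r for r in records if r[1] >= 3]

# Merge: combine two streams into one.
merged = small + large


def peek(stream, action):
    """Run `action` on each record and pass the stream on unchanged."""
    for record in stream:
        action(record)
    return stream


def foreach(stream, action):
    """Run `action` on each record; terminal, so nothing is returned."""
    for record in stream:
        action(record)


seen = []
passed_on = peek(records, seen.append)
terminal = foreach(records, seen.append)
```

None of these operations needs to remember anything between records, which is what makes them stateless.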
Kafka Streams Aggregations
- Stateless transformations such as groupByKey and groupBy can be used to group records that share the same key.
- Aggregations are stateful transformations that always operate on these groups of records sharing the same key.
- Aggregate: Generates a new record from a calculation involving the grouped records.
- Count: Counts the number of records for each grouped key.
- Reduce: Combines the grouped records into a single record.
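The three aggregations can be compared side by side on a small grouped data set. The sketch below uses plain Python dictionaries as a stand-in for a state store; it is not the Kafka Streams API, and the sample records are made up for illustration.

```python
from collections import defaultdict
from functools import reduce

records = [("apple", 3), ("pear", 1), ("apple", 2), ("pear", 5)]

# Grouping step: collect the values of records that share a key.
groups = defaultdict(list)
for key, value in records:
    groups[key].append(value)

# Count: the number of records for each key.
counts = {key: len(values) for key, values in groups.items()}

# Reduce: combine the grouped values into one value of the same type.
totals = {key: reduce(lambda a, b: a + b, values) for key, values in groups.items()}

# Aggregate: start from an initial value; the result may have a different
# type than the inputs (here a (sum, count) pair built from integers).
stats = {
    key: reduce(lambda acc, v: (acc[0] + v, acc[1] + 1), values, (0, 0))
    for key, values in groups.items()
}
```

The `groups` dictionary has to persist between records, which is why aggregations are stateful and need a state store.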
Kafka Streams Joins
Joins are used to combine streams into one new stream.
When joining streams, the data must be co-partitioned:
- Same number of partitions for input topics.
- Same partitioning strategies for producers.
You can avoid the need for co-partitioning by using a GlobalKTable.
With GlobalKTables, all instances of your streams application will populate the local table with data from all partitions of the input topic.
Kafka Streams Join types
- Inner Join: The new stream will contain only records that have a match in both joined streams.
- Left Join: The new stream will contain all records from the first stream, but only matching records from the joined stream.
- Outer Join: The new stream will contain all records from both streams.
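The difference between the three join types comes down to which keys survive. The sketch below shows this on two keyed record sets held in dictionaries. It deliberately ignores time, so it is only a simplified model of the key-matching logic; the `join` function and the sample data are hypothetical.

```python
def join(left, right, how="inner"):
    """Join two keyed record sets (dicts of key -> value).

    Returns key -> (left_value, right_value); a missing side is None.
    """
    if how == "inner":
        keys = left.keys() & right.keys()   # keys present on both sides
    elif how == "left":
        keys = left.keys()                  # every key from the first side
    elif how == "outer":
        keys = left.keys() | right.keys()   # every key from either side
    else:
        raise ValueError(f"unknown join type: {how}")
    return {k: (left.get(k), right.get(k)) for k in keys}


orders = {"alice": "book", "bob": "pen"}
payments = {"bob": 5, "carol": 9}

inner = join(orders, payments, "inner")
left = join(orders, payments, "left")
outer = join(orders, payments, "outer")
```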
Kafka Streams Windowing
Windows are similar to groups in that they deal with a set of records with the same key. However, windows further subdivide groups into “time buckets.”
- Tumbling Time Windows: Windows are based on time periods that never overlap or have gaps between them.
- Hopping Time Windows: Time-based like tumbling windows, but each new window starts after an advance interval that can be smaller than the window size, so windows can overlap.
- Sliding Time Windows: These windows are dynamically based on the timestamps of records rather than a fixed point in time. They are only used in joins.
- Session Windows: Creates windows based on periods of activity. A group of records around the same timestamp will form a session window, whereas a period of “idle time” with no records in the group will not have a window.
- In real-world scenarios, it is always possible to receive out-of-order data.
- Records that arrive after the end of the grace period of the time window they belong to are known as late-arriving records.
- You can specify a retention period for a window. Kafka Streams will retain old window buckets during this period so that late-arriving records can still be processed.
- Any records that arrive after the retention period has expired will not be processed.
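Window assignment is easiest to see as arithmetic on record timestamps. The following sketch computes which tumbling or hopping window(s) a timestamp falls into and applies a simplified lateness rule. It assumes non-negative integer timestamps and windows aligned to zero; the function names are hypothetical and this is not the Kafka Streams API.

```python
def tumbling_window(timestamp, size):
    """Return the single [start, end) tumbling window containing `timestamp`."""
    start = timestamp - timestamp % size
    return (start, start + size)


def hopping_windows(timestamp, size, advance):
    """Return every [start, end) hopping window containing `timestamp`.

    Windows start at multiples of `advance`; when advance < size they overlap,
    so one record can belong to several windows.
    """
    windows = []
    start = timestamp - timestamp % advance  # latest window start <= timestamp
    while start >= 0 and start + size > timestamp:
        windows.append((start, start + size))
        start -= advance
    return sorted(windows)


def is_late(window_end, grace, stream_time):
    """Simplified rule: a record is late once stream time reaches end + grace."""
    return stream_time >= window_end + grace


one_window = tumbling_window(7, size=5)
two_windows = hopping_windows(7, size=10, advance=5)
```

A hopping window whose advance equals its size behaves exactly like a tumbling window.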
Streams vs. Tables
Kafka Streams models data in two primary ways: streams and tables.
- Streams: Each record is a self-contained piece of data in an unbounded set of data. New records do not replace an existing piece of data with a new value.
- Tables: Records represent a current state that can be overwritten or updated.
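The two models can be contrasted by feeding the same records into a list and a dictionary. This is a plain-Python analogy with made-up sample data, not the Kafka Streams API.

```python
events = [("alice", 100), ("bob", 50), ("alice", 70)]

# Stream view: every record is kept as its own self-contained fact.
stream = list(events)

# Table view: a later record for a key overwrites the earlier value,
# so only the current state per key remains.
table = {}
for key, value in events:
    table[key] = value
```

The stream still holds both records for `alice`, while the table holds only her latest value.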
Example use cases for streams:
- Credit card transactions in real time.
- A real-time log of attendees checking in to a conference.
- A log of customer purchases which represent the removal of items from a store’s inventory.
Example use cases for tables:
- A user’s current available credit card balance.
- A list of conference attendee names with a value indicating whether or not they have checked in.
- A set of data containing the quantity of each item in a store’s inventory.
- Kafka objects such as brokers, topics, producers, and consumers can all be customized using configuration.
- Kafka uses the property file format for configuration. All configuration options are provided as key-value pairs of the form key=value.
- With Kafka, you can configure the following:
- Brokers
- Topics
- Clients (Producers, Consumers, Streams Applications, etc.)
- You can configure your Kafka broker using server.properties, the command line, or even programmatically using the AdminClient API.
- Some broker configs can be changed dynamically (without a broker restart). Check the documentation to see which configs can be dynamically updated.
- read-only: These configs require a broker restart in order to be updated.
- per-broker: These can be dynamically updated for each individual broker.
- cluster-wide: These configs can be updated per-broker, but the cluster-wide default can also be dynamically updated.
- Topics can be configured using the command line tools (e.g., kafka-topics or kafka-configs), as well as programmatically.
- All topic configurations have a broker-wide default. The default values will be used unless an override is specified for a particular topic.
- Use the --config argument with kafka-topics to override default configuration values when creating topics from the command line.
- When using kafka-configs, you have to use the
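The key-value format and the "default unless overridden" rule for topic settings can be sketched as follows. This is a minimal illustration, not Kafka's own configuration loader: the `parse_properties` and `effective_topic_config` helpers are hypothetical, the values are illustrative, and topic-level property names are used for both sides to keep the example short (broker-side default properties generally have different names).

```python
def parse_properties(text):
    """Parse key=value lines, skipping blank lines and # comments."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def effective_topic_config(defaults, topic_overrides):
    """Topic overrides win; anything not overridden falls back to the default."""
    return {**defaults, **topic_overrides}


defaults = parse_properties(
    """
    # cluster defaults (illustrative values)
    retention.ms=604800000
    max.message.bytes=1048588
    """
)

# Hypothetical per-topic override: keep data for one day only.
effective = effective_topic_config(defaults, {"retention.ms": "86400000"})
```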
When using Kafka to address a given use case, it is important to design your topics for maximum performance.
The two main considerations when designing topics are partitions and replication factor.
Some questions to consider when designing your topics:
- How many brokers do you have? The number of brokers limits the number of replicas.
- What is your need for fault tolerance? A higher replication factor means greater fault tolerance.
- How many consumers do you want to place in a consumer group for parallel processing? You will need at least as many partitions as the number of consumers you expect to have on a single group.
- How much memory is available on each broker? Kafka requires memory to process messages. The configuration setting replica.fetch.max.bytes (default ~1 MB) determines the rough amount of memory you will need for each partition on a broker.
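The design questions above translate into a few simple checks and one rough estimate. The sketch below is a back-of-the-envelope aid under stated assumptions: the helper names are hypothetical, and the memory figure is only the coarse "partitions times replica.fetch.max.bytes" rule of thumb from the text, not a sizing guarantee.

```python
REPLICA_FETCH_MAX_BYTES = 1_048_576  # default of roughly 1 MB


def validate_topic(brokers, partitions, replication_factor, consumers_in_group):
    """Return a list of design problems; an empty list means none were found."""
    problems = []
    if replication_factor > brokers:
        problems.append("replication factor exceeds the number of brokers")
    if partitions < consumers_in_group:
        problems.append("fewer partitions than consumers; some consumers will idle")
    return problems


def replica_fetch_memory_bytes(partitions_on_broker,
                               fetch_max_bytes=REPLICA_FETCH_MAX_BYTES):
    """Rough memory estimate: one fetch buffer per partition on the broker."""
    return partitions_on_broker * fetch_max_bytes


ok = validate_topic(brokers=3, partitions=6, replication_factor=3,
                    consumers_in_group=4)
bad = validate_topic(brokers=2, partitions=2, replication_factor=3,
                     consumers_in_group=4)
estimate = replica_fetch_memory_bytes(100)  # 100 partitions on one broker
```

With the default setting, 100 partitions on a broker correspond to roughly 100 MB by this estimate.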
Setting up Kafka locally with docker-compose

    version: '2'
    services:
      zookeeper-1:
        image: confluentinc/cp-zookeeper:latest
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
        ports:
          - 22181:2181

      zookeeper-2:
        image: confluentinc/cp-zookeeper:latest
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
        ports:
          - 32181:2181

      kafka-1:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper-1
          - zookeeper-2
        ports:
          - 29092:29092
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

      kafka-2:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper-1
          - zookeeper-2
        ports:
          - 39092:39092
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1