Kafka record
An event contains:
value
: required
key
: used to determine the partition
timestamp
: used for message ordering
headers
: store HTTP-style headers and metadata
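The shape of a record can be sketched as a plain value object (a simplified stand-in for Kafka's `ProducerRecord`, so no broker or client library is needed; the field names mirror the real class):

```java
import java.util.Map;

public class RecordShape {
    // Simplified stand-in for a Kafka record: key, value, timestamp, headers.
    record KafkaRecord(String key, String value, long timestamp, Map<String, String> headers) {}

    public static void main(String[] args) {
        KafkaRecord r = new KafkaRecord(
                "user-42",                      // key: drives partition choice
                "{\"event\":\"click\"}",        // value: the required payload
                System.currentTimeMillis(),     // timestamp: ordering / retention
                Map.of("trace-id", "abc123"));  // headers: metadata, e.g. tracing info
        System.out.println(r.key());
    }
}
```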
Message storage process
Two steps:
Partition Determination
: message –hash(key)–> partition (order is preserved at the partition level)
Broker Assignment
: find the broker that holds that partition.
The mapping of partitions to brokers is managed by Kafka cluster metadata – the Kafka controller (a role within the broker cluster). Producers use this metadata to send each message to the broker that holds the target partition.
flowchart LR
A[message]
B[partition ID]
C[kafka controller]
D[broker]
A -->|"hash(key)"| B
B <-->|look up broker for partition| C
B -->|write to| D
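The partition-determination step can be sketched as a tiny function (a simplified stand-in for Kafka's default partitioner, which hashes the serialized key with murmur2; plain `hashCode` is used here for brevity):

```java
public class PartitionDemo {
    // Hash the key and take it modulo the partition count, as Kafka's
    // default partitioner does (with a different hash function).
    static int partitionFor(String key, int numPartitions) {
        int hash = key.hashCode() & 0x7fffffff; // force non-negative
        return hash % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("user-42", 6);
        int p2 = partitionFor("user-42", 6);
        // The same key always maps to the same partition, which is
        // what preserves per-key ordering at the partition level.
        System.out.println(p1 == p2); // true
    }
}
```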
Partition
Each partition in Kafka functions essentially as an append-only
log file. Messages are sequentially added to the end of this log, which is why Kafka is commonly described as a distributed commit log.
Immutability
: messages cannot be altered or deleted; this avoids consistency issues and speeds up recovery
Efficiency
: appending at the end minimizes disk seek time
Scalability
: partitions can be scaled horizontally
The offset within a partition identifies a message's position.
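The append-only log and its offsets can be modeled in a few lines (an in-memory sketch, not real Kafka storage; here the list index plays the role of the offset):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionLog {
    // A partition modeled as an append-only list; each entry's index is its offset.
    private final List<String> log = new ArrayList<>();

    // Append to the end of the log and return the new message's offset.
    public long append(String message) {
        log.add(message);
        return log.size() - 1;
    }

    // Read the message at a given offset; entries are never altered or deleted.
    public String read(long offset) {
        return log.get((int) offset);
    }

    public static void main(String[] args) {
        PartitionLog p = new PartitionLog();
        System.out.println(p.append("first"));  // offsets are assigned sequentially
        System.out.println(p.append("second"));
    }
}
```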
Leader-Follower Replication
Leader
: handles all reads and writes for a partition (leaders are assigned by the cluster controller and distributed evenly across brokers)
Follower Replication
: follower replicas receive messages from the leader replica
Synchronization and Consistency
: if the leader fails, a follower replica that has been fully synced can be quickly promoted to be the new leader, minimizing downtime and data loss
Controller's Role in Replication
: the controller monitors the health of all brokers and manages leadership changes (historically coordinated via ZooKeeper; newer Kafka versions use KRaft instead)
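The promotion step can be sketched as follows (a hypothetical illustration of the controller's decision, not Kafka's actual election code; real Kafka tracks candidates via the in-sync replica (ISR) set):

```java
import java.util.List;
import java.util.Set;

public class LeaderElection {
    // Pick the first fully synced follower as the new leader when the
    // current leader fails; broker names here are illustrative.
    static String electNewLeader(String failedLeader, List<String> replicas, Set<String> inSync) {
        for (String replica : replicas) {
            if (!replica.equals(failedLeader) && inSync.contains(replica)) {
                return replica;
            }
        }
        throw new IllegalStateException("no in-sync replica available");
    }

    public static void main(String[] args) {
        List<String> replicas = List.of("broker-1", "broker-2", "broker-3");
        Set<String> isr = Set.of("broker-1", "broker-3"); // broker-2 is lagging
        // broker-1 fails; broker-2 is skipped because it is not in sync.
        System.out.println(electNewLeader("broker-1", replicas, isr)); // broker-3
    }
}
```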
Scaling - QPS per broker
Max disk write speed ≈ 1 GB/s
Message size = 10 KB
Max messages/sec ≈ 1 GB/s ÷ 10 KB ≈ 100,000 messages/sec
A single broker on high-performance hardware can handle roughly 100k messages per second if each message is 10 KB.
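The back-of-envelope math, assuming binary units (1 GB = 1024³ bytes, 1 KB = 1024 bytes):

```java
public class ThroughputEstimate {
    public static void main(String[] args) {
        long maxBytesPerSec = 1024L * 1024 * 1024; // ~1 GB/s sequential disk write
        long messageSize = 10 * 1024;              // 10 KB per message
        long messagesPerSec = maxBytesPerSec / messageSize;
        System.out.println(messagesPerSec); // 104857, i.e. roughly 100k msgs/sec
    }
}
```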
Scaling:
- add more brokers
- adjust the partitioning strategy
Hot partitions
- Random partitioning with no key
: even distribution, but you lose the ability to guarantee message order
- Random salting
: add a random number or timestamp to the ad ID when generating the partition key
- Use a compound key
: ID + geo
- Back pressure
: slow down the producer
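The salting and compound-key ideas above can be sketched as key builders (illustrative helpers, not a Kafka API; the salt-bucket count and the `geo` dimension are assumptions):

```java
import java.util.concurrent.ThreadLocalRandom;

public class HotPartitionKeys {
    // Random salting: append a random suffix so one hot ad ID is spread
    // across several partitions instead of hammering a single one.
    static String saltedKey(String adId, int saltBuckets) {
        int salt = ThreadLocalRandom.current().nextInt(saltBuckets);
        return adId + ":" + salt;
    }

    // Compound key: combine the ID with another dimension such as geography.
    static String compoundKey(String adId, String geo) {
        return adId + ":" + geo;
    }

    public static void main(String[] args) {
        System.out.println(saltedKey("ad-123", 8));           // e.g. ad-123:5
        System.out.println(compoundKey("ad-123", "us-east")); // ad-123:us-east
    }
}
```

The trade-off: salting spreads load but a consumer of "ad-123" must now read several key variants, and per-ID ordering is only preserved within each salted key.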
Fault Tolerance and Durability
Setting acks=all ensures that the message is acknowledged only when all in-sync replicas have received it, guaranteeing maximum durability.
The default is acks=1 in older producer clients (clients since Kafka 3.0 default to acks=all).
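A durability-focused producer configuration might look like this (string keys are used so the snippet runs without the kafka-clients jar; they match the real config names):

```java
import java.util.Properties;

public class DurabilityConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("acks", "all");                // wait for all in-sync replicas
        props.put("enable.idempotence", "true"); // avoid duplicates on retry
        System.out.println(props.getProperty("acks"));
    }
}
```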
What happens when a consumer goes down?
- Offset Management
: commit offsets only after processing a message
- Rebalancing
: when a consumer in a consumer group goes down, Kafka redistributes its partitions among the remaining consumers so that all partitions are still being processed
Handling Retries and Errors
Producer Retries
: retries cover network issues, broker unavailability, and other transient failures
import com.zhicheng.avro.User;
import org.apache.kafka.clients.producer.*;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Properties;

public class AvroKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Use Confluent's Avro serializer for both key and value
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

        // Schema Registry URL (required)
        props.put("schema.registry.url", "http://localhost:8081");

        // Idempotence and retry configuration
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);

        KafkaProducer<Object, Object> producer = new KafkaProducer<>(props);

        // Build the Avro object
        User user = User.newBuilder()
                .setId("u123")
                .setName("Zhicheng")
                .setAge(30)
                .build();

        ProducerRecord<Object, Object> record = new ProducerRecord<>("user-topic", user.getId(), user);

        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                System.out.printf("Sent to topic %s partition %d offset %d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            } else {
                exception.printStackTrace();
            }
        });

        producer.flush();
        producer.close();
    }
}
Consumer Retries
One common pattern is to set up a custom topic that we can move failed messages to and then have a separate consumer that processes these messages.
If a given message is retried too many times, we can move it to a dead letter queue (DLQ). DLQs are just a place to store failed messages so that we can investigate them later.
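The retry-then-DLQ routing decision can be sketched as a small function (a hypothetical illustration; the `.retry`/`.dlq` topic-name suffixes and the retry budget are assumptions, not a Kafka convention):

```java
public class RetryRouter {
    static final int MAX_RETRIES = 3;

    // Route a failed message: to the retry topic while the attempt budget
    // lasts, then to the dead letter queue for later investigation.
    static String targetTopic(String topic, int attempts) {
        if (attempts < MAX_RETRIES) {
            return topic + ".retry";
        }
        return topic + ".dlq";
    }

    public static void main(String[] args) {
        System.out.println(targetTopic("user-topic", 1)); // user-topic.retry
        System.out.println(targetTopic("user-topic", 3)); // user-topic.dlq
    }
}
```

The attempt count itself is typically carried in a message header so the consumer of the retry topic can increment it on each failure.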
Performance Optimizations
One approach is to batch messages in the producer before sending them to Kafka. In the Java producer this is controlled by batch.size and linger.ms:
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // maximum batch size in bytes
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);    // maximum time to wait before sending a batch
Compressing the messages
: this can be done by setting compression.type in the producer configuration. Kafka supports several compression algorithms, including GZIP, Snappy, and LZ4.
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
Retention Policies
Kafka topics have a retention policy that determines how long messages are retained in the log. This is configured via the retention.ms and retention.bytes settings. The default retention policy is to keep messages for 7 days.
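Retention can be changed per topic with the standard kafka-configs tool (the topic name, bootstrap address, and size cap below are placeholders):

```shell
# Set a topic's retention to 7 days (604800000 ms) and cap its size at ~1 GB.
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name user-topic \
  --add-config retention.ms=604800000,retention.bytes=1073741824
```

Whichever limit is hit first (time or size) triggers deletion of the oldest log segments.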