Kafka is the de facto choice for message queues, but I feel that the Kafka clients and their documentation are not friendly to first-time users. Here are the three points that bother me:
- The documentation exposes internal details in an unclear way.
- Kafka clients ignore errors in a counterintuitive way.
- The interactions between configuration options are not documented.
And here are the mails related to the points above:
- If timeout is short, the first poll does not return records
- The relation between min.insync.replicas and offsets.topic.replication.factor
- What is meaning of buffer in KafkaConsumer’s Javadoc?
- The asynchronous sending of a message returns no error if the Kafka server is not started
API Behaviour
Synchronously send a message to a stopped bootstrap server. After 60000 ms, the following exception is thrown:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Synchronously send a message to a running bootstrap server when the target topic does not exist. The following exception is thrown:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
For asynchronous message sending, the following exception is thrown when the bootstrap.servers value is an unresolvable hostname such as localhostx.
org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
For the following values of bootstrap.servers, no exception is thrown for asynchronous message sending:
- An unreachable hostname such as www.google.com.
- An unreachable IP.
- A reachable hostname on which the Kafka server is stopped.
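One plausible reading of the cases above is that an asynchronous send reports its failure through the returned Future (or a callback) instead of throwing, so a fire-and-forget caller never sees it. The sketch below is a stdlib-only simulation of that pattern. AsyncSendSketch and its send method are hypothetical names and are not the Kafka producer API, and the error message is simply copied from the synchronous case.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;

// Stdlib-only simulation; AsyncSendSketch is hypothetical and is NOT the Kafka API.
class AsyncSendSketch {

    /** Mimics a send() that never throws: any failure is stored inside the Future. */
    static Future<Long> send(String message, boolean brokerUp) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        if (brokerUp) {
            result.complete(0L); // pretend the record landed at offset 0
        } else {
            result.completeExceptionally(
                new TimeoutException("Failed to update metadata after 60000 ms."));
        }
        return result;
    }

    /** Returns the error message hidden in the Future, or null if the send succeeded. */
    static String errorOf(Future<Long> future) {
        try {
            future.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Fire-and-forget: nothing is thrown at the call site.
        Future<Long> ignored = send("hello", false);
        System.out.println("send() returned without throwing");
        // The failure only becomes visible once the Future is inspected.
        System.out.println("error found by get(): " + errorOf(ignored));
    }
}
```

With a real producer, the equivalent check would be to call get() on the returned Future or to pass a callback to send, at the cost of blocking or of extra error-handling code.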
poll(long timeoutMs) will block forever if the bootstrap server is stopped.
poll(java.time.Duration timeout) will return after the timeout. No exception is thrown even if the bootstrap server is down or the topic does not exist on the bootstrap server.
But if a topic that has been polled is deleted, the following error shows up:
2019-03-07 21:03:04,241 ERROR [main] o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=test] Offset commit failed on partition test-topic-0 at offset 0: This server does not host this topic-partition.
MISC
Check the committed offset for a consumer group even if there are no active members in the consumer group.
kafka-consumer-groups
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group mygroup
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group console-consumer-88295
kafka-console-producer and kafka-console-consumer
kafka-console-producer sends messages with a null key by default, and it does not support a customized key serializer or value serializer. For details, see:
- kafka-console-producer ignores value serializer?
- Console Producer / Consumer’s serde config is not working
The --line-reader option can be used to achieve the same result, but a line reader needs to implement the trait MessageReader, which is not included in kafka-clients.
kafka-console-producer has the --broker-list option. kafka-console-consumer has --bootstrap-server. These two options have the same meaning. Very bad software engineering.
Usage of parse.key and key.separator for kafka-console-producer:
kafka-topics --zookeeper localhost:2181 \
--create --topic test-topic \
--partitions 1 --replication-factor 1
kafka-console-producer \
--broker-list localhost:9092 \
--topic test-topic \
--property "parse.key=true" \
--property "key.separator=:"
>key1:value1
kafka-console-consumer --property print.key=true \
--bootstrap-server localhost:9092 \
--topic test-topic \
--from-beginning
key1 value1
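To make the effect of parse.key and key.separator concrete, here is a minimal sketch of how a line could be split into a key and a value. It assumes the split happens at the first occurrence of the separator, which is my understanding of the default line reader and not something I verified against every Kafka version. KeyedLine is a hypothetical name, not a Kafka class.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Illustrative sketch only; KeyedLine is a hypothetical name, not a Kafka class.
class KeyedLine {

    /** Splits a console line into (key, value) at the first occurrence of the separator. */
    static Map.Entry<String, String> parse(String line, String separator) {
        int index = line.indexOf(separator);
        if (index < 0) {
            // Assumption: a line without the separator is rejected instead of sent with a null key.
            throw new IllegalArgumentException("No key found on line: " + line);
        }
        String key = line.substring(0, index);
        String value = line.substring(index + separator.length());
        return new SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> record = parse("key1:value1", ":");
        // Prints the key and the value separated by a tab.
        System.out.println(record.getKey() + "\t" + record.getValue());
    }
}
```

Under this assumption, a line such as a:b:c with separator : yields the key a and the value b:c, so only the key is constrained not to contain the separator.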
Usage of the --consumer.config option:
# Terminal 1
$ cat cfg.properties
group.id=test-group
$ kafka-console-consumer --bootstrap-server localhost:9092 \
--consumer.config cfg.properties \
--topic test-topic --from-beginning
# Terminal 2
$ kafka-consumer-groups --bootstrap-server localhost:9092 \
--list
test-group
The --consumer.config and --consumer-property options are used to pass normal consumer configuration options. They cannot be used to pass kafka-console-consumer's own options such as --topic. Here is the text from Kafka: The Definitive Guide:
In addition to the basic command-line options, it is possible to pass any normal consumer configuration options to the console consumer as well.
The --property option is only used for passing configurations to the message formatter, and not the client itself.
The same applies to the --producer.config, --producer-property, and --property options of kafka-console-producer.
The committed offset is the offset of the next record to be consumed.
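The off-by-one is easy to get wrong when committing manually, so here is a small arithmetic sketch. OffsetMath is a hypothetical helper, not part of kafka-clients, and the lag formula assumes the log end offset is the offset that the next produced record will receive.

```java
// Illustrative sketch; OffsetMath is a hypothetical helper, not part of kafka-clients.
class OffsetMath {

    /** The offset to commit after processing a record: the position of the next record. */
    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    /** Records still unread, assuming logEndOffset is the offset the next produced record gets. */
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        long committed = offsetToCommit(41); // the last processed record had offset 41
        System.out.println("commit " + committed + ", lag " + lag(50, committed));
    }
}
```

So after processing the record at offset 41, the value to commit is 42, and a group whose committed offset equals the log end offset has a lag of 0.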
offsets.retention.minutes:
After a consumer group loses all its consumers (i.e. becomes empty) its offsets will be kept for this retention period before getting discarded. For standalone consumers (using manual assignment), offsets will be expired after the time of last commit plus this retention period.
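As a worked example of the rule quoted above, the sketch below computes when offsets would be discarded. OffsetExpiry is a hypothetical helper, the reference time is whichever event the quote names (the group becoming empty, or the last commit for a standalone consumer), and 10080 minutes is only a sample retention value, not a claim about your broker's setting.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative sketch; OffsetExpiry is a hypothetical helper, not a Kafka class.
class OffsetExpiry {

    /** Reference time plus the retention period from offsets.retention.minutes. */
    static Instant expiresAt(Instant referenceTime, long retentionMinutes) {
        return referenceTime.plus(Duration.ofMinutes(retentionMinutes));
    }

    /** True once "now" has reached the expiry instant. */
    static boolean isExpired(Instant referenceTime, long retentionMinutes, Instant now) {
        return !now.isBefore(expiresAt(referenceTime, retentionMinutes));
    }

    public static void main(String[] args) {
        Instant groupBecameEmpty = Instant.parse("2019-03-07T00:00:00Z");
        long retentionMinutes = 10080; // sample value: 7 days
        System.out.println("offsets kept until " + expiresAt(groupBecameEmpty, retentionMinutes));
    }
}
```

The broker decides when it actually removes expired offsets, so the computed instant is best read as the earliest point at which they may disappear.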
group.id of kafka-console-consumer:
The default for console consumer’s enable.auto.commit property when no group.id is provided is now set to false. This is to avoid polluting the consumer coordinator cache as the auto-generated group is not likely to be used by other consumers.
RecordMetadata.toString prints the topic, partition, and offset:
classroom-input-0@11308
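Because topic names may themselves contain dashes, the string is easiest to read from the right. The sketch below parses it back into its parts, assuming the layout is topic, dash, partition, at sign, offset. MetadataString is a hypothetical helper, not part of kafka-clients.

```java
// Illustrative sketch; MetadataString is a hypothetical helper, not part of kafka-clients.
class MetadataString {
    final String topic;
    final int partition;
    final long offset;

    MetadataString(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    /** Parses text assumed to look like topic-partition@offset. */
    static MetadataString parse(String text) {
        int at = text.lastIndexOf('@');
        // Search backwards from '@' so that dashes inside the topic name are left alone.
        int dash = at < 0 ? -1 : text.lastIndexOf('-', at);
        if (at < 0 || dash < 0) {
            throw new IllegalArgumentException("Unexpected format: " + text);
        }
        String topic = text.substring(0, dash);
        int partition = Integer.parseInt(text.substring(dash + 1, at));
        long offset = Long.parseLong(text.substring(at + 1));
        return new MetadataString(topic, partition, offset);
    }

    public static void main(String[] args) {
        MetadataString m = parse("classroom-input-0@11308");
        System.out.println(m.topic + " / " + m.partition + " / " + m.offset);
    }
}
```

For the sample string, this gives the topic classroom-input, partition 0, and offset 11308.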
kafka-run-class
Check topic offsets:
kafka-run-class kafka.tools.GetOffsetShell \
--topic topic-name \
--broker-list localhost:9092
kafka-streams-application-reset
The following command does not reset committed offsets for user topics. It only deletes internal topics. If these internal topics are not deleted, then even if the state store is cleaned up by KafkaStreams#cleanUp() or by manual deletion, the state store will be restored from its backup changelog topics, which are internal topics.
kafka-streams-application-reset --bootstrap-servers localhost:9092 \
--application-id my-id