19 July 2018

Kafka is defacto choice for message queues. But I feel that Kafka clients and its documentation are not friendly to first-time users. Here are two points I feel.

  1. The documentation exposes internal details in an unclear way.
  2. Kafka clients ignore errors in a counterintuitive way.
  3. The interactions between configuration options are not documented.

And here are the mails related to the above two points:

API Hehaviour

Synchronously send a message to a stopped bootstrap server. After 60000 ms, the following exception is thrown:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Synchronously send a message to a runing bootstrap server. But the target topic does exist. The following exception is thrown:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

For asynchronous message sending, the following exception is thrown when the bootstrap.servers value is unresolvable hostname such as localhostx.

org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers

For the following value for bootstrap.servers, no exceptions for asynchronous message sending:

  1. Unreachable hostname such www.google.com.
  2. Unreachable IP.
  3. Reachable hostname. But the Kafka server is stopped.

poll(long timeoutMs) will block forever if the bootstrap server is stopped.

poll(java.time.Duration timeout) will return after timeout. No exceptions even the bootstrap server is down or the topic does exist on the bootstrap server. But if a topci which has been polled is deleted, the following error will show up:

2019-03-07 21:03:04,241 ERROR [main] o.a.k.c.c.i.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=test] Offset commit failed on partition test-topic-0 at offset 0: This server does not host this topic-partition.

MISC

Check the committed offset for a consumer group even if there are no active members in the consumer group.

kafka-consumer-groups

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group mygroup
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group console-consumer-88295

kafka-console-producer and kafka-console-consumer

kafka-console-producer sends messages with a null key by default. And it does not support cusotimzed key seriliazer and value serializer. For details, see

kafka-console-producer has --broker-list option. kafka-console-consumer has --bootstrap-server. These two options has the same meaning. Very bad software engineering.

Usage of parse.key and key.separator for kafka-console-producer:

kafka-topics --zookeepr localhost:2181 \
  --create --topic test-topic \
  --partitions 1 --replication-factor 1

kafka-console-producer \
  --broker-list localhost:9092 \
  --topic test-topic \
  --property "parse.key=true" \
  --property "key.separator=:"
>key1:value1

kafka-console-consumer --property print.key=true \
  --bootstrap-server localhost:9092 \
  --topic test-topic \
  --from-beginning
key1	value1

Usage of --consumer.config option:

# Terminal 1
$ cat cfg.properties
group.id=test-group
$ kafka-console-consumer --bootstrap-server localhost:9092 \
  --consumer.config cfg.properties \
  --topic test-topic --from-beginning
# Terminal 2
$ kafka-consumer-groups --bootstrap-server localhost:9092 \
  --list
test-group

--consumer.config and --consumer-property options are used to normal consumer configuration options. They can be used to pass kafka-console-consumer own option such as --topic. Here is the text from Kafka: The Definitive Guide:

In addition to the basic command-line options, it is possible to pass any normal consumer configuration options to the console consumer as well.

The –property option is only used for passing configurations to the message formatter, and not the client itself.

The above argument applies to --producer.config, --producer-property and --property options for kafka-console-producer.

commit offset is the record for the next record to be consumed.

offsets.retention.minutes:

After a consumer group loses all its consumers (i.e. becomes empty) its offsets will be kept for this retention period before getting discarded. For standalone consumers (using manual assignment), offsets will be expired after the time of last commit plus this retention period.

group.id of kafka-console-consumer:

The default for console consumer’s enable.auto.commit property when no group.id is provided is now set to false. This is to avoid polluting the consumer coordinator cache as the auto-generated group is not likely to be used by other consumers.

RecordMetadata.toString
classroom-input-0@11308

kafka-run-class

Check topic offsets:

kafka-run-class kafka.tools.GetOffsetShell \
  --topic topic-name \
  --broker-list localhost:9092

kafka-streams-application-reset

The following command does not reset committed offsets for user topics. It only deletes internal topics. If these internal topics are not deleted, even if the state store is cleaned up by KafkaStreams#cleanUp() or manual deletion. The state store will be restored by its backup changelog topics which are internal topics.

kafka-streams-application-reset --bootstrap-servers localhost:9092 \
  --application-id my-id