Kinesis Client Library record processor failure


According to AWS docs:

The worker invokes record processor methods using Java ExecutorService tasks. If a task fails, the worker retains control of the shard that the record processor was processing. The worker starts a new record processor task to process that shard. For more information, see Read Throttling.

According to another page on AWS docs:

The Kinesis Client Library (KCL) relies on your processRecords code to
handle any exceptions that arise from processing the data records. Any
exception thrown from processRecords is absorbed by the KCL. To avoid
infinite retries on a recurring failure, the KCL does not resend the
batch of records processed at the time of the exception. The KCL then
calls processRecords for the next batch of data records without
restarting the record processor. This effectively results in consumer
applications observing skipped records. To prevent skipped records,
handle all exceptions within processRecords appropriately.

Aren’t these 2 statements contradictory? One says that the record processor restarts, and the other says that the records are skipped.
What exactly does the KCL do when a record processor fails? And how does a KCL worker come to know that a record processor failed?


Based on my experience writing, debugging, and supporting KCL-based applications, the second statement is the clearer, more accurate, and more useful description of how you should think about error handling.

First, a bit of background:

  • KCL record processing is designed to run across multiple hosts. Say you have 3 hosts and 12 shards to process: each host runs a single worker and owns processing for 4 of the shards.
  • If, during processing for one of those shards, an exception is thrown, KCL will absorb the exception and treat it as if all records were processed – effectively “skipping” any records that weren’t processed.
    • Remember, this is your code that threw the exception, so you can catch and handle it before it escapes to the KCL.
  • When a KCL worker itself fails or is stopped, its shards are transferred to another worker. For example, if you scale down from three hosts to two, the 4 shards that were being worked by the third worker are transferred to the remaining two.
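The second bullet is the one that surprises people, so here is a small, self-contained sketch of that control flow. To be clear, this does not use the real KCL API – `workerLoop`, `naiveRun`, and `safeRun` are hypothetical names that only model the “absorb the exception and move on to the next batch” behavior described in the docs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simulation of the KCL's "absorb and move on" behavior.
// None of this is real KCL API; it only models the control flow.
public class KclSkipDemo {

    // Stand-in for the worker loop: call processRecords per batch, absorb
    // any exception, and continue with the NEXT batch (the failed batch
    // is not resent).
    static void workerLoop(List<List<String>> batches,
                           Consumer<List<String>> processRecords) {
        for (List<String> batch : batches) {
            try {
                processRecords.accept(batch);
            } catch (RuntimeException e) {
                // Exception absorbed: the records in this batch are skipped.
            }
        }
    }

    static final List<List<String>> BATCHES = List.of(
            List.of("r1", "r2"), List.of("poison"), List.of("r3"));

    // Naive processor: lets the exception escape, so the "poison" batch is lost.
    static List<String> naiveRun() {
        List<String> seen = new ArrayList<>();
        workerLoop(BATCHES, batch -> {
            for (String r : batch) {
                if (r.equals("poison")) throw new RuntimeException("bad record");
                seen.add(r);
            }
        });
        return seen;
    }

    // Defensive processor: handles the failure per record INSIDE
    // processRecords, so nothing is silently dropped.
    static List<String> safeRun() {
        List<String> seen = new ArrayList<>();
        workerLoop(BATCHES, batch -> {
            for (String r : batch) {
                try {
                    if (r.equals("poison")) throw new RuntimeException("bad record");
                    seen.add(r);
                } catch (RuntimeException e) {
                    seen.add(r + "(dead-lettered)");
                }
            }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(naiveRun()); // [r1, r2, r3] -- "poison" silently skipped
        System.out.println(safeRun());  // [r1, r2, poison(dead-lettered), r3]
    }
}
```

In a real application, the catch block inside your processRecords would do something deliberate with the failed record – log it, send it to a dead-letter queue, or retry with backoff – rather than letting the KCL swallow it.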

The first statement is trying (not very clearly) to say that when a KCL task fails, that instance of the worker keeps control of the shards it is processing (rather than transferring them to another worker).
