Kafka Rest Proxy¶
Installation¶
dependencies {
implementation(platform("org.http4k:http4k-connect-bom:5.22.1.0"))
implementation("org.http4k:http4k-connect-kafka-rest")
implementation("org.http4k:http4k-connect-kafka-rest-fake")
}
There are two distinct APIs in the REST proxy:
v2¶
The main KafkaRest connector provides the following v2 Actions (a sketch of calling the connector follows the list):
- CreateConsumer
- DeleteConsumer
- GetOffsets
- GetPartitions
- SeekOffsets
- CommitOffsets
- ConsumeRecords
- ProduceMessages
- SubscribeToTopics
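Each Action maps to an extension function on the connector. As a rough sketch (the client construction is the same as in the production example further down this page; the getPartitions call shape is an assumption based on http4k-connect's convention of one lowerCamelCase function per Action):
val kafkaRest = KafkaRest.Http(
    Credentials("user", "password"), Uri.of("http://restproxy"), JavaHttpClient()
)

// Assumed call shape: one lowerCamelCase function per Action listed above
val partitions = kafkaRest.getPartitions(Topic.of("topic"))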
In addition, you can use a KafkaRestConsumer, which provides the following Actions (see the consumer lifecycle sketch after this list):
- ConsumeRecords
- Delete
- GetOffsets
- SeekOffsets
- CommitOffsets
- SubscribeToTopics
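A minimal consumer lifecycle might look like the sketch below. The constructor and call signatures here are assumptions made for illustration; consult the connector source for the real parameter types:
// Sketch only: the calls mirror the Action names above, but the exact
// signatures (including the Http() constructor arguments) are assumptions.
val consumer = KafkaRestConsumer.Http(
    Credentials("user", "password"), Uri.of("http://restproxy"), JavaHttpClient()
)
consumer.subscribeToTopics(listOf(Topic.of("topic")))  // SubscribeToTopics
val records = consumer.consumeRecords()                // ConsumeRecords
consumer.commitOffsets()                               // CommitOffsets
consumer.delete()                                      // Delete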
Record formats¶
The following formats of Kafka record are currently supported. The PartitionId is optional and defaults to null:
JSON¶
All keys and messages will be auto-marshalled to JSON using the standard Moshi instance (which supports most common JDK types):
Records.Json(listOf(Record("123", "value", PartitionId.of(123))))
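Because marshalling goes through Moshi, keys and values are not limited to strings. For example (MyEvent is a hypothetical application class):
// MyEvent is a hypothetical data class, auto-marshalled to JSON by Moshi
data class MyEvent(val id: Int, val name: String)

Records.Json(listOf(Record("key1", MyEvent(1, "foo"), PartitionId.of(0))))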
AVRO¶
Support is provided for GenericContainer classes (auto-generated from a schema). The key and value schemas are extracted from the key and value and sent with the message automatically.
Records.Avro(
    listOf(
        Record(
            RandomEvent(UUID.nameUUIDFromBytes("123".toByteArray())),
            RandomEvent(UUID(0, 0)),
            PartitionId.of(123)
        )
    )
)
Binary¶
Record contents are specified using the Base64Blob type for wire transport:
Records.Binary(listOf(Record(Base64Blob.encode("123"), Base64Blob.encode("456"), PartitionId.of(123))))
Notes on message production¶
Messages can be sent to the broker with or without PartitionIds. If you want to use a strategy for partitioning, the Partitioner interface can be implemented and used as below. RoundRobin and Sticky (key-hash % partitions) strategies come out of the box.
val kafkaRest = KafkaRest.Http(
Credentials("user", "password"), Uri.of("http://restproxy"), JavaHttpClient()
)
kafkaRest.produceMessages(Topic.of("topic"), Records.Json(listOf(Record("123", ""))), ::RoundRobinRecordPartitioner)
To keep things simple with respect to partition allocation and rebalancing, the above code fetches the available partitions on each send to the REST proxy using the /topics/$topic/partitions call. This is obviously not very efficient, but it can be reimplemented with whatever caching strategy you need.
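As a sketch of a custom strategy (this assumes, from the way ::RoundRobinRecordPartitioner is passed by reference above, that the partitioner argument is a factory from the available PartitionIds to a per-record partition chooser; check the Partitioner interface for the exact signature):
// Hypothetical strategy: always pick the first available partition.
// The factory shape (List<PartitionId>) -> Partitioner<K, V> is an assumption.
fun <K, V> FirstPartitionPartitioner(partitions: List<PartitionId>) =
    Partitioner<K, V> { _, _ -> partitions.first() }

kafkaRest.produceMessages(Topic.of("topic"), Records.Json(listOf(Record("123", ""))), ::FirstPartitionPartitioner)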
v3 (Confluent API)¶
The main KafkaRest connector provides the following v3 Actions (a sketch of calling the connector follows the list):
- GetPartitions
- GetTopic
- GetTopics
- ProduceRecords
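As with v2, each Action maps to an extension function on the connector. A rough sketch (the getTopics call shape is an assumption; clusterId is assumed to be defined, as in the production example below):
val kafkaRest = KafkaRest.Http(
    Credentials("user", "password"), Uri.of("http://restproxy"), JavaHttpClient()
)

// Assumed call shape for a v3 Action: list the topics in a cluster
val topics = kafkaRest.getTopics(clusterId)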
Record formats¶
The following formats of Kafka record are currently supported. The PartitionId is optional and defaults to null:
JSON¶
All keys and messages will be auto-marshalled to JSON using the standard Moshi instance (which supports most common JDK types):
Record(Json(mapOf("key" to "value")))
Binary¶
Record contents are specified using the Base64Blob type for wire transport:
Record(Binary(Base64Blob.encode("foo1")))
Notes on message production¶
Messages can be sent to the broker with or without PartitionIds. If you want to use a strategy for partitioning, the Partitioner interface can be implemented and used as below. RoundRobin and Sticky (key-hash % partitions) strategies come out of the box.
val kafkaRest = KafkaRest.Http(
Credentials("user", "password"), Uri.of("http://restproxy"), JavaHttpClient()
)
kafkaRest.produceRecordsWithPartitions(
    topic,
    clusterId,
    listOf(Record(Binary(Base64Blob.encode("foo1")))),
    ::RoundRobinRecordPartitioner
)
To keep things simple with respect to partition allocation and rebalancing, the above code fetches the available partitions on each send to the REST proxy using the /kafka/v3/clusters/$id/topics/$topic/partitions call. This is obviously not very efficient, but it can be reimplemented with whatever caching strategy you need.
Fake¶
The Fake provides all of the endpoints listed above for v2 and v3, which is enough for the basic consumer lifecycle and for producing and consuming records. Note that consumers start at the beginning of the topic stream by default, although offsets can be committed.
"auto.commit.enable" is enabled by default, but can be set to "false" for manual committing of offsets.
Default Fake port: 30091¶
To start:
FakeKafkaRest().start()
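Since the Fake is a standard http4k HttpHandler, it can also be passed to the connector directly in place of a real HTTP client, so tests need no network at all (a minimal sketch; the Uri host is arbitrary):
// Bind the Fake straight into the client: no ports, no network traffic
val kafkaRest = KafkaRest.Http(
    Credentials("user", "password"), Uri.of("http://kafka-rest"), FakeKafkaRest()
)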