如何获取指定位移的消息?

2017-11-03 12:13 阅读 258 views 次 评论 0 条

如何获取指定位移的消息?

    很多用户都有这样的需求,即直接消费指定位移的消息。虽然Kafka鼓励顺序批量消费,但我们使用API依然可以获取特定位移的消息。代码如下:

Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");

        props.put("group.id", "test");

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("max.poll.records", 1); // 这里设置1表明我们只想消费一条消息

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList("test1"));

        consumer.poll(0);  // 一定要在seek之前先调用一次poll让订阅

        consumer.seek(new TopicPartition("test1", 0), 15);  // 这里你可以指定位移来消费特定位置的消息

        ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);

怎么样?简单吧~


版权声明:本文版权由木秀林网所有,转载请保留链接:如何获取指定位移的消息?
分类:Kafka解析 标签:

发表评论


表情