合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
Consumer分为自动提交和手动提交。 自动提交:自动提交的优点是方便,但是可能会重复处理消息。 手动提交:又分为同步提交commitSync和异步提交commitAsync。在实际开发中常用手动提交。 1. *`KafkaConsumer.scala`*(自动提交) ```scala import java.util import java.util.Properties import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer} /** * @Date 2021/1/19 9:34 * * 消息的消费者 */ object KafkaConsumer { def main(args: Array[String]): Unit = { /** ************* 1. 创建配置 **************/ val props = new Properties() // 配置Kafka集群的ip和端口号 // 如果有多个节点, // 则props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092") props.put("bootstrap.servers", "hadoop101:9092") // 设置消费者组的id, 如果有多个相同id的消费者程序, 那么他们将在一个组当中 // 这个可以乱填 props.put("group.id", "testGroup1") // 开启自动提交[默认就是true开启] props.put("enable.auto.commit", "true") // 每隔5000ms提交一次,默认值就是5000ms props.put("auto.commit.interval.ms", "1000") // key和value的反序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") /** ****************** 2. 创建Consumer客户端 ***************/ val consumer = new KafkaConsumer[String, String](props) // 添加要消费的topic到列表中 val topics = new util.ArrayList[String]() topics.add("topic1") // 订阅模式消费,还有一种consumer.assign()指定分区模式消费 consumer.subscribe(topics) // 为了能够一直从Kafka中消费数据, 使用 while true 死循环 while (true) { // 拉取数据,设置超时时间, 单位是毫秒, 返回一条数据 val records: ConsumerRecords[String, String] = consumer.poll(1000) // 对这些数据进行遍历输出 val iter: util.Iterator[ConsumerRecord[String, String]] = records.iterator() while (iter.hasNext) { val next: ConsumerRecord[String, String] = iter.next() println(s"partition = ${next.partition()}, offset=${next.offset()}\nkey = ${next.key()}, value = ${next.value()}") } } } } ``` 2. *`KafkaConsumerManualOffset.scala`*(手动提交) ```scala import java.util import java.util.Properties import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer} /** * @Date 2021/1/19 10:16 * * 消息的消费者 */ object KafkaConsumerManualOffset { def main(args: Array[String]): Unit = { /** ************* 1. 创建配置 **************/ val props = new Properties() // 配置Kafka集群的ip和端口号 props.put("bootstrap.servers", "hadoop101:9092") // 设置消费者组的id, 如果有多个相同id的消费者程序, 那么他们将在一个组当中 props.put("group.id", "testGroup1") // 关闭自动提交[默认就是开启] props.put("enable.auto.commit", "false") // key和value的反序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") /** ****************** 2. 创建Consumer客户端 ***************/ val consumer = new KafkaConsumer[String, String](props) // consumer消费 val topics = new util.ArrayList[String]() topics.add("topic1") // 订阅topic consumer.subscribe(topics) // 创建一个集合, 用来存放消息的个数 val buffer = new util.ArrayList[ConsumerRecord[String, String]]() // 为了能够一直从Kafka中消费数据, 使用 while true 死循环 while (true) { // 设置超时时间, 单位是毫秒, 返回一些条数据 val records: ConsumerRecords[String, String] = consumer.poll(1000) // 对这些数据进行遍历输出 val iter: util.Iterator[ConsumerRecord[String, String]] = records.iterator() while (iter.hasNext) { val next: ConsumerRecord[String, String] = iter.next() println(s"partition = ${next.partition()}, offset=${next.offset()}\nkey = ${next.key()}, value = ${next.value()}") buffer.add(next) } if (buffer.size() > 5) { // 手动提交offset有两种, 一种是同步阻塞方式, 一种是异步非阻塞方式 consumer.commitAsync() // consumer.commitSync() buffer.clear() } } } } ``` 运行上面两个Consumer程序中的一个,然后再运行【Producer API】提供的程序生产消息,上面的两个消费者程序的输出如下: ```scala partition = 0, offset=7 key = 1001, value = zhangsan ```