合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
(1)编写Producer代码 *`KafkaProducer.scala`* ```scala import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} /** * @Date 2021/1/19 8:59 * * 消息的生产者 */ object KafkaProducer { def main(args: Array[String]): Unit = { /** *************** 1. 创建配置 ******************/ val props = new Properties() // 设置Kafka集群 // 如果有多个节点, // 则props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092") props.put("bootstrap.servers", "hadoop101:9092") // 设置ack // 0: 不需要leader partition确认接收成功, 将消息发送到leader partition即可 // 1:需要等待leader partition确认接收成功 // -1(all): 需要等待leader partition以及ISR列表中的follower都确认接收成功;速度最慢,但最安全。 props.put("acks", "all") // 设置key和value的序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") /** ************ 2. 创建生产者 ***************/ val producer = new KafkaProducer[String, String](props) /** ************ 3. 发送消息 ***************/ // new ProducerRecord[String, String](topic, key, value) producer.send(new ProducerRecord[String, String]("topic1", "1001", "zhangsan")) /** ************ 4. 关闭资源 *************/ producer.close() } } ``` (2)检查topic1是否已经存在 ```shell -- 检查topic1是否存在 [root@hadoop101 kafka]# bin/kafka-topics.sh --list --zookeeper hadoop101:2181 topic1 __consumer_offsets test topic1 -- 如果不存在则创建topic1 [root@hadoop101 kafka]# bin/kafka-topics.sh --create --topic topic1 --zookeeper hadoop101:2181 --replication-factor 1 --partitions 3 Created topic "topic1". -- 启动消费者控制台 [root@hadoop101 kafka]# bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic topic1 --from-beginning ``` (3)执行程序,你就可以在消费者的终端看到发送过来的消息了 ``` zhansan ```