Getting Started with Kafka
Kafka Basics
- Start ZooKeeper (run from the Kafka installation directory)
    bin\windows\zookeeper-server-start.bat config\zookeeper.properties
- Start Kafka
    bin\windows\kafka-server-start.bat config\server.properties
- Create a topic
    bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
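With --partitions 1, every record sent to the test topic lands in partition 0. With more partitions, a record that carries a key is assigned by hashing the key modulo the partition count. The sketch below is only a toy model of that idea: it uses String.hashCode where Kafka's default partitioner uses a murmur2 hash of the serialized key, and ToyPartitioner and partitionFor are hypothetical names.

```java
// Toy model of key-based partition selection (NOT Kafka's real partitioner).
class ToyPartitioner {

    // Maps a key to a partition index in [0, numPartitions).
    // Assumption: String.hashCode stands in for Kafka's murmur2 hash.
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // A single-partition topic such as "test" always yields partition 0.
        System.out.println(partitionFor("user-42", 1));
        // With three partitions the same key always maps to the same partition.
        System.out.println(partitionFor("user-42", 3));
    }
}
```

Records sent without a key, as with send(topic, content) later in this post, are spread across partitions by the producer instead.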
Integrating Kafka with Spring Boot
- Add the spring-kafka dependency
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
- Configure Kafka: the broker address and the consumer
    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: test-consumer-group
          enable-auto-commit: true
          auto-commit-interval: 3000
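For orientation, the Spring Boot keys above correspond to native Kafka consumer settings. The following is a minimal sketch, using only java.util.Properties, of the consumer configuration this YAML is expected to translate into. The names ConsumerSettings and fromYamlValues are made up for illustration, and the String deserializers are Spring Boot's defaults rather than something the configuration above sets.

```java
import java.util.Properties;

// Hypothetical helper: mirrors the YAML above as native Kafka consumer properties.
class ConsumerSettings {

    static Properties fromYamlValues() {
        Properties props = new Properties();
        // spring.kafka.bootstrap-servers
        props.setProperty("bootstrap.servers", "localhost:9092");
        // spring.kafka.consumer.group-id
        props.setProperty("group.id", "test-consumer-group");
        // spring.kafka.consumer.enable-auto-commit
        props.setProperty("enable.auto.commit", "true");
        // spring.kafka.consumer.auto-commit-interval (a bare number is read as milliseconds)
        props.setProperty("auto.commit.interval.ms", "3000");
        // Assumption: Spring Boot's default String deserializers are in effect.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print every setting; iteration order is not guaranteed.
        fromYamlValues().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

With auto-commit enabled, the consumer commits its offsets in the background roughly every 3 seconds, so a message handled shortly before a crash may be delivered again after a restart.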
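- Access Kafka
  - Producer: call kafkaTemplate.send(topic, content) to send a message.
  - Consumer: annotate a method with @KafkaListener(topics = {"test"}) to listen on one or more topics. Each message arrives wrapped in a ConsumerRecord, for example handleMessage(ConsumerRecord consumerRecord).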
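    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.junit.jupiter.api.Test; // assumes JUnit 5
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;

    // NowcoderCommunityApplication is assumed to live in the same package.
    @SpringBootTest(classes = NowcoderCommunityApplication.class)
    public class KafkaTest {

        @Autowired
        private KafkaProducer kafkaProducer;

        @Test
        public void testKafka() {
            kafkaProducer.sendMessage("test", "This is the first message from kafkaProducer");
            kafkaProducer.sendMessage("test", "hah");
            kafkaProducer.sendMessage("test", "hello");

            // Keep the test alive for 5 seconds so the listener has time to receive the messages.
            try {
                Thread.sleep(1000 * 5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    // Producer
    @Component
    class KafkaProducer {

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        public void sendMessage(String topic, String content) {
            kafkaTemplate.send(topic, content);
        }
    }

    // Consumer
    @Component
    class KafkaConsumer {

        // Listen on the given topics and print each message value.
        @KafkaListener(topics = {"test"})
        public void handleMessage(ConsumerRecord<String, String> consumerRecord) {
            System.out.println(consumerRecord.value());
        }
    }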
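The fixed Thread.sleep in the test only gives the listener time to receive the messages. One common alternative is to have the listener count down a java.util.concurrent.CountDownLatch and let the test wait on it with a timeout. The sketch below shows just that waiting pattern in plain Java: a background thread stands in for the @KafkaListener method, and LatchedListener with its methods is a hypothetical name, not part of spring-kafka.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a listener bean. In the real test, handleMessage
// would be the @KafkaListener method and would receive a ConsumerRecord.
class LatchedListener {
    private final CountDownLatch latch;
    private final List<String> received = new CopyOnWriteArrayList<>();

    LatchedListener(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    // Record the message first, then signal that one more message has arrived.
    void handleMessage(String value) {
        received.add(value);
        latch.countDown();
    }

    // Returns true if all expected messages arrived before the timeout.
    boolean awaitAll(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    List<String> received() {
        return received;
    }

    public static void main(String[] args) {
        LatchedListener listener = new LatchedListener(3);
        // Simulated delivery thread in place of the Kafka listener container.
        new Thread(() -> {
            for (String m : new String[] {"first", "hah", "hello"}) {
                listener.handleMessage(m);
            }
        }).start();
        boolean ok = listener.awaitAll(5, TimeUnit.SECONDS);
        System.out.println("all received: " + ok);
    }
}
```

The test then returns as soon as the last message arrives, and a timeout reports missing messages instead of passing silently.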