目录

kafka的入门使用

kafka的入门使用

kafka入门

  • 启动zookeeper

    1
    
    bin\windows\zookeeper-server-start.bat config\zookeeper.properties
    
  • 启动kafka

    1
    
    bin\windows\kafka-server-start.bat config\server.properties
    
  • 创建主题

    1
    
    kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    

springboot整合kafka

  • 引入依赖,spring-kafka

    1
    2
    3
    4
    
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
  • 配置kafka,配置server、consumer

    1
    2
    3
    4
    5
    6
    7
    
    # Spring Boot configuration for the Kafka client used by this application.
    spring:
        kafka:
            # Address of the Kafka broker to connect to (the local broker started above).
            bootstrap-servers: localhost:9092
            consumer:
              # Consumer group that this application's listeners join.
              group-id: test-consumer-group
              # Let the consumer commit offsets automatically instead of committing manually.
              enable-auto-commit: true
              # Interval between automatic offset commits.
              # NOTE(review): a bare number is presumably read as milliseconds (i.e. 3 s) - confirm against the Spring Boot docs.
              auto-commit-interval: 3000
    
  • 访问kafka

    • 生产者调用 kafkaTemplate.send(topic, content) 发送消息
    • 消费者使用 @KafkaListener(topics = {"test"}) 注解监听指定的主题;收到的消息会被封装成 ConsumerRecord 类型,并传入处理方法 handleMessage(ConsumerRecord consumerRecord)
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    
    /**
     * Spring Boot test that publishes a few messages to the {@code test} topic
     * through {@link KafkaProducer}.
     */
    @SpringBootTest(classes = NowcoderCommunityApplication.class)
    public class KafkaTest {

        @Autowired
        private KafkaProducer kafkaProducer;

        /**
         * Sends three messages to the {@code test} topic and then pauses for
         * five seconds before the test method returns.
         */
        @Test
        public void testKafka(){
            kafkaProducer.sendMessage("test","这是第一条kafkaProducer");
            kafkaProducer.sendMessage("test","hah");
            kafkaProducer.sendMessage("test","你好");
            try {
                // Presumably keeps the test alive long enough for the listener on
                // the "test" topic to receive and print the messages.
                Thread.sleep(1000*5);
            } catch (InterruptedException e) {
                // Restore the interrupt status instead of swallowing it, so the
                // test runner can still observe that this thread was interrupted.
                Thread.currentThread().interrupt();
            }

        }

    }
    
    // Producer
    /**
     * Spring component that publishes String messages to Kafka topics.
     */
    @Component
    class KafkaProducer{

        // Parameterized instead of raw: this class only ever sends String payloads.
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        /**
         * Sends {@code content} as a message to the given topic.
         *
         * @param topic   name of the Kafka topic to publish to
         * @param content message payload
         */
        public void sendMessage(String topic,String content){
            kafkaTemplate.send(topic,content);
        }

    }
    
    // Consumer
    /**
     * Spring component that listens on the {@code test} topic and prints the
     * value of every record it receives.
     */
    @Component
    class KafkaConsumer{

        /**
         * Handles one record delivered from the subscribed topic(s) by printing
         * its value to standard output.
         *
         * @param consumerRecord the record received from Kafka
         */
        // Listen on the listed topics
        @KafkaListener(topics = {"test"})
        public void handleMessage(ConsumerRecord<?, ?> consumerRecord){
            System.out.println(consumerRecord.value());
        }

    }