设为首页 - 加入收藏 ASP站长网(Aspzz.Cn)- 科技、建站、经验、云计算、5G、大数据,站长网!
热搜: 创业者 手机 数据
当前位置: 首页 > 服务器 > 搭建环境 > Unix > 正文

从未如此简单:10分钟带你逆袭Kafka!(18)

发布时间:2020-03-23 12:31 所属栏目:119 来源:站长网
导读:System.out.println(提交次数,offsets=+offsets); System.out.println(exception=+e); } }); } } } Spring Boot 使用 Kafka 现在大家的开发过程中,很多都用的是 Spring Boot 的项目,直接启动了,如果还是用原生的

              System.out.println("提交次数, offsets = " + offsets); 

              System.out.println("exception = " + e); 

            } 

          }); 

        } 

    } 

Spring Boot 使用 Kafka

现在大家的开发过程中,很多都用的是 Spring Boot 的项目,直接启动了,如果还是用原生的 API,就是有点 Low 了啊,那 Kafka 是如何和 Spring Boot 进行联合的呢?

maven 配置:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> 

   <dependency> 

     <groupId>org.apache.kafka</groupId> 

     <artifactId>kafka-clients</artifactId> 

     <version>2.1.1</version> 

   </dependency> 

添加配置文件,在 application.properties 中加入如下配置信息:

Kafka 连接地址:

spring.kafka.bootstrap-servers = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 

生产者:

spring.kafka.producer.acks = 0 

spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer 

spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer 

spring.kafka.producer.retries = 3 

spring.kafka.producer.batch-size = 4096 

spring.kafka.producer.buffer-memory = 33554432 

spring.kafka.producer.compression-type = gzip 

消费者:

spring.kafka.consumer.group-id = mygroup 

spring.kafka.consumer.auto-commit-interval = 5000 

spring.kafka.consumer.heartbeat-interval = 3000 

spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer 

spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer 

spring.kafka.consumer.auto-offset-reset = earliest 

spring.kafka.consumer.enable-auto-commit = true 

# listenner, 标识消费者监听的个数 

spring.kafka.listener.concurrency = 8 

# topic的名字 

kafka.topic1 = topic1 

生产者:

import lombok.extern.slf4j.Slf4j; 

import org.springframework.beans.factory.annotation.Value; 

import org.springframework.kafka.core.KafkaTemplate; 

 

@Service 

@Slf4j 

public class MyKafkaProducerServiceImpl implements MyKafkaProducerService { 

        @Resource 

    private KafkaTemplate<String, String> kafkaTemplate; 

        // 读取配置文件 

    @Value("${kafka.topic1}") 

    private String topic; 

 

    @Override 

    public void sendKafka() { 

      kafkaTemplate.send(topic, "hell world"); 

    } 

消费者:

@Component 

@Slf4j 

public class MyKafkaConsumer { 

  @KafkaListener(topics = "${kafka.topic1}") 

    public void listen(ConsumerRecord<?, ?> record) { 

        Optional<?> kafkaMessage = Optional.ofNullable(record.value()); 

        if (kafkaMessage.isPresent()) { 

            log.info("----------------- record =" + record); 

            log.info("------------------ message =" + kafkaMessage.get()); 

(编辑:ASP站长网)

网友评论
推荐文章
    热点阅读