设为首页 - 加入收藏 ASP站长网(Aspzz.Cn)- 科技、建站、经验、云计算、5G、大数据,站长网!
热搜: 创业者 数据 手机
当前位置: 首页 > 服务器 > 搭建环境 > Unix > 正文

从未如此简单:10分钟带你逆袭Kafka!(16)

发布时间:2020-03-23 12:31 所属栏目:119 来源:站长网
导读:②消费者同步手动提交 前面的消费者都是以自动提交 Offset 的方式对 Broker 中的消息进行消费的,但自动提交 可能会出现消息重复消费的情况。 所以在生产环境下,很多时候需要对 Offset 进行手动提交, 以解决重复

从未如此简单:10分钟带你逆袭Kafka!

②消费者同步手动提交

前面的消费者都是以自动提交 Offset 的方式对 Broker 中的消息进行消费的,但自动提交 可能会出现消息重复消费的情况。

所以在生产环境下,很多时候需要对 Offset 进行手动提交, 以解决重复消费的问题。

手动提交又可以划分为同步提交、异步提交,同异步联合提交。这些提交方式仅仅是 doWork() 方法不相同,其构造器是相同的。

所以下面首先在前面消费者类的基础上进行构造器的修改,然后再分别实现三种不同的提交方式。

同步提交方式是,消费者向 Broker 提交 Offset 后等待 Broker 成功响应。若没有收到响应,则会重新提交,直到获取到响应。

而在这个等待过程中,消费者是阻塞的。其严重影响了消费者的吞吐量。

修改前面的 MyKafkaConsumer.java, 主要修改下面的配置:

import kafka.utils.ShutdownableThread; 

import org.apache.kafka.clients.consumer.ConsumerRecord; 

import org.apache.kafka.clients.consumer.ConsumerRecords; 

import org.apache.kafka.clients.consumer.KafkaConsumer; 

 

import java.util.Arrays; 

import java.util.Collections; 

import java.util.Properties; 

 

/** 

 * @ClassName MyKafkaConsumer 

 * @Description TODO 

 * @Author lingxiangxiang 

 * @Date 4:12 PM 

 * @Version 1.0 

 **/ 

public class MyKafkaConsumer extends ShutdownableThread { 

 

    private KafkaConsumer<Integer, String> consumer; 

 

    public MyKafkaConsumer() { 

        super("KafkaConsumerTest", false); 

        Properties properties = new Properties(); 

        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094"); 

        properties.put("group.id", "mygroup"); 

      // 这里要修改成手动提交 

        properties.put("enable.auto.commit", "false"); 

        // properties.put("auto.commit.interval.ms", "1000"); 

        properties.put("session.timeout.ms", "30000"); 

        properties.put("heartbeat.interval.ms", "10000"); 

        properties.put("auto.offset.reset", "earliest"); 

        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); 

        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

        this.consumer = new KafkaConsumer<Integer, String>(properties); 

    } 

    @Override 

    public void doWork() { 

        consumer.subscribe(Arrays.asList("test2")); 

        ConsumerRecords<Integer, String>records = consumer.poll(1000); 

        for (ConsumerRecord record : records) { 

            System.out.println("topic = " + record.topic()); 

            System.out.println("partition = " + record.partition()); 

            System.out.println("key = " + record.key()); 

            System.out.println("value = " + record.value()); 

 

          //手动同步提交 

          consumer.commitSync(); 

        } 

 

    } 

③消费者异步手工提交

手动同步提交方式需要等待 Broker 的成功响应,效率太低,影响消费者的吞吐量。

异步提交方式是,消费者向 Broker 提交 Offset 后不用等待成功响应,所以其增加了消费者的吞吐量。

import kafka.utils.ShutdownableThread; 

import org.apache.kafka.clients.consumer.ConsumerRecord; 

import org.apache.kafka.clients.consumer.ConsumerRecords; 

import org.apache.kafka.clients.consumer.KafkaConsumer; 

 

import java.util.Arrays; 

import java.util.Collections; 

import java.util.Properties; 

 

/** 

 * @ClassName MyKafkaConsumer 

 * @Description TODO 

 * @Author lingxiangxiang 

 * @Date 4:12 PM 

 * @Version 1.0 

 **/ 

public class MyKafkaConsumer extends ShutdownableThread { 

 

    private KafkaConsumer<Integer, String> consumer; 

 

    public MyKafkaConsumer() { 

        super("KafkaConsumerTest", false); 

        Properties properties = new Properties(); 

(编辑:ASP站长网)

网友评论
推荐文章
    热点阅读