设为首页 - 加入收藏 ASP站长网(Aspzz.Cn)- 科技、建站、经验、云计算、5G、大数据,站长网!
热搜: 数据 创业者 手机
当前位置: 首页 > 服务器 > 搭建环境 > Unix > 正文

从未如此简单:10分钟带你逆袭Kafka!(14)

发布时间:2020-03-23 12:31 所属栏目:119 来源:站长网
导读:finalProducerRecordInteger,Stringrecord=newProducerRecordInteger,String(test2,0,1,helloworld); producer.send(record); //有回调函数的调用 producer.send(record,newCallback(){ @Override publicvoidonComp

            final ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("test2", 0, 1, "hello world"); 

            producer.send(record); 

            // 有回调函数的调用 

            producer.send(record, new Callback() { 

                @Override 

                public void onCompletion(RecordMetadata recordMetadata, Exception e) { 

                    System.out.println(recordMetadata.topic()); 

                    System.out.println(recordMetadata.partition()); 

                    System.out.println(recordMetadata.offset()); 

                } 

            }); 

          // 自己定义一个类 

            producer.send(record, new MyCallback(record)); 

        } catch (Exception e) { 

            result = false; 

        } 

        return result; 

    } 

定义生产者发送成功的回调函数:

import org.apache.kafka.clients.producer.Callback; 

import org.apache.kafka.clients.producer.RecordMetadata; 

 

/** 

 * @ClassName MyCallback 

 * @Description TODO 

 * @Author lingxiangxiang 

 * @Date 3:51 PM 

 * @Version 1.0 

 **/ 

public class MyCallback implements Callback { 

    private Object msg; 

 

    public MyCallback(Object msg) { 

        this.msg = msg; 

    } 

 

    @Override 

    public void onCompletion(RecordMetadata metadata, Exception e) { 

        System.out.println("topic = " + metadata.topic()); 

        System.out.println("partiton = " + metadata.partition()); 

        System.out.println("offset = " + metadata.offset()); 

        System.out.println(msg); 

    } 

生产者测试类:在生产者测试类中,自己遇到一个坑,就是最后自己没有加 sleep,就是怎么检查自己的代码都没有问题,但是最后就是没法发送成功消息,最后加了一个 sleep 就可以了。

因为主函数 main 已经执行完退出,但是消息并没有发送完成,需要进行等待一下。当然,你在生产环境中可能不会遇到这样问题,呵呵!

代码如下:

import static java.lang.Thread.sleep; 

 

/** 

 * @ClassName MyKafkaProducerTest 

 * @Description TODO 

 * @Author lingxiangxiang 

 * @Date 3:46 PM 

 * @Version 1.0 

 **/ 

public class MyKafkaProducerTest { 

    public static void main(String[] args) throws InterruptedException { 

        MyKafkaProducer producer = new MyKafkaProducer(); 

        boolean result = producer.sendMsg(); 

        System.out.println("send msg " + result); 

        sleep(1000); 

    } 

消费者类:

import kafka.utils.ShutdownableThread; 

import org.apache.kafka.clients.consumer.ConsumerRecord; 

import org.apache.kafka.clients.consumer.ConsumerRecords; 

import org.apache.kafka.clients.consumer.KafkaConsumer; 

 

import java.util.Arrays; 

import java.util.Collections; 

import java.util.Properties; 

 

/** 

 * @ClassName MyKafkaConsumer 

 * @Description TODO 

 * @Author lingxiangxiang 

 * @Date 4:12 PM 

 * @Version 1.0 

 **/ 

public class MyKafkaConsumer extends ShutdownableThread { 

 

    private KafkaConsumer<Integer, String> consumer; 

 

    public MyKafkaConsumer() { 

        super("KafkaConsumerTest", false); 

        Properties properties = new Properties(); 

(编辑:ASP站长网)

网友评论
推荐文章
    热点阅读