天天看点

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十四)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析

参考《在Kafka中使用Avro编码消息:Consumer篇》、《在Kafka中使用Avro编码消息:Producter篇》

在了解如何avro发送到kafka,再从kafka解析avro数据之前,我们可以先看下如何使用操作字符串:

producer:

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十四)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析
Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十四)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析

View Code

consumer:

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十四)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析
Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十四)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析

Avro操作工程pom.xml:

需要依赖于avro的包,同时这里是需要使用kafka api。

在使用 Avro 之前,我们需要先定义模式(schemas)。模式通常使用 JSON 来编写,我们不需要再定义相关的类,这篇文章中,我们将使用如下的模式:

上面的模式中,我们定义了一种 record 类型的对象,名字为 <code>Iteblog</code>,这个对象包含了两个字符串和一个 int 类型的fields。定义好模式之后,我们可以使用 avro 提供的相应方法来解析这个模式:

这里的 <code>USER_SCHEMA</code> 变量存储的就是上面定义好的模式。

解析好模式定义的对象之后,我们需要将这个对象序列化成字节数组,或者将字节数组转换成对象。Avro 提供的 API 不太易于使用,所以本文使用 twitter 开源的 Bijection 库来方便地实现这些操作。我们先创建 <code>Injection</code> 对象来讲对象转换成字节数组:

现在我们可以根据之前定义好的模式来创建相关的 Record,并使用 <code>recordInjection</code> 来序列化这个 Record :

有了上面的介绍之后,我们现在就可以在 Kafka 中使用 Avro 来序列化我们需要发送的消息了:

因为我们使用到 Avro 和 Bijection 类库,所有我们需要在 <code>pom.xml</code> 文件里面引入以下依赖:

从 Kafka 中读取 Avro 格式的消息和读取其他类型的类型一样,都是创建相关的流,然后迭代:

关键在于如何将读出来的 Avro 类型字节数组转换成我们要的数据。这里还是使用到我们之前介绍的模式解释器:

上面的 <code>USER_SCHEMA</code> 就是上边介绍的消息模式,我们创建了一个 <code>recordInjection</code> 对象,这个对象就可以利用刚刚解析好的模式将读出来的字节数组反序列化成我们写入的数据:

然后我们就可以通过下面方法获取写入的数据:

测试:

打印结果:

该信息在调试,想查看avro对象内容时,十分实用。

avro schema:

测试程序:

经过测试,可以正常运行,输出信息为:

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十四)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析

 但是如果把代码中的byte转化为字符代码修改为:

就抛出错误了:

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十四)定义一个avro schema使用comsumer发送avro字符流,producer接受avro字符流并解析

基础才是编程人员应该深入研究的问题,比如:

1)List/Set/Map内部组成原理|区别

2)mysql索引存储结构&amp;如何调优/b-tree特点、计算复杂度及影响复杂度的因素。。。

3)JVM运行组成与原理及调优

4)Java类加载器运行原理

5)Java中GC过程原理|使用的回收算法原理

6)Redis中hash一致性实现及与hash其他区别

7)Java多线程、线程池开发、管理Lock与Synchroined区别

8)Spring IOC/AOP 原理;加载过程的。。。

【+加关注】。