How to Parse Apache Avro Data (with Examples)

How do you parse Apache Avro data? This article shows how to serialize data into Avro, how to deserialize and parse Avro data, and how to parse Avro data with Flink SQL. We hope you find it helpful!


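With the rapid development of the internet, cloud computing, big data, artificial intelligence (AI), and the Internet of Things have become the mainstream technologies of our time, powering e-commerce sites, facial recognition, autonomous driving, smart homes, smart cities, and more. These applications make daily life more convenient, and behind them large volumes of data are continuously collected, cleaned, and analyzed by all kinds of systems, which makes low latency, high throughput, and data security especially important. Apache Avro serializes data against a schema and transmits it in binary form: the compact encoding keeps transmission fast, and because the bytes carry no field names, they cannot be interpreted without the schema. Avro is used more and more widely across industries, so knowing how to process and parse Avro data matters. This article demonstrates how to serialize data into Avro and then parse it with Flink SQL.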

This article is a demo of Avro parsing. In the setup shown here, Flink SQL only handles simple (flat) Avro data; complex nested Avro data is not supported yet.

Scenario

This article covers three main topics:

  • How to serialize data into Avro

  • How to deserialize and parse Avro data

  • How to parse Avro data with Flink SQL

Prerequisites

  • Know what Avro is; see the Getting Started guide on the Apache Avro website

  • Be familiar with Avro's typical use cases

Procedure

1. Create an Avro Maven project and configure the POM dependencies


The pom file is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huawei.bigdata</groupId>
    <artifactId>avrodemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <!-- .avsc files are read from here ... -->
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <!-- ... and the generated Java classes are written here -->
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- Java 8; JDK 12 and later no longer accept source/target 1.6 -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

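Note: the pom above configures two paths for class generation, ${project.basedir}/src/main/avro/ and ${project.basedir}/src/main/java/. Because the plugin is bound to the generate-sources phase, any mvn command that reaches that phase (such as mvn compile) generates classes from the .avsc schema files in the first directory and writes them to the second. If the avro directory does not exist, create it manually.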

2. Define the schema

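Avro schemas are defined in JSON. A schema is built from primitive types (null, boolean, int, long, float, double, bytes, and string) and complex types (record, enum, array, map, union, and fixed). For example, the following defines a User schema: create an avro directory under main, then create the file user.avsc in that avro directory: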

{"namespace": "lancoo.ecbdc.pre",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number", "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
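To see what this schema means at the byte level, here is a minimal sketch, assuming only the JDK, that hand-encodes a User record following the binary encoding rules of the Avro specification: int and long values are zigzag-encoded variable-length integers, a string is its byte length followed by UTF-8 bytes, and a union is the zero-based index of the chosen branch followed by the value (nothing at all for null). UserBinaryEncoder and encodeUser are hypothetical names for illustration; in the project itself, the generated User class and SpecificDatumWriter do this work.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: hand-encodes the User schema from user.avsc
// with Avro's binary encoding rules, for illustration only.
class UserBinaryEncoder {

    // Zigzag-encode a long, then write it as a base-128 varint
    // (7 bits per byte, high bit set on every byte except the last).
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63); // zigzag: small magnitudes -> small codes
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    // A string is its UTF-8 byte length (as a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // Fields are written in schema order, with no field names or tags.
    static byte[] encodeUser(String name, Integer favoriteNumber, String favoriteColor) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, name);                 // "name": "string"
        if (favoriteNumber != null) {           // "favorite_number": ["int", "null"]
            writeLong(out, 0);                  // branch 0 = int
            writeLong(out, favoriteNumber);     // ints use the same zigzag varint as longs
        } else {
            writeLong(out, 1);                  // branch 1 = null, no payload
        }
        if (favoriteColor != null) {            // "favorite_color": ["string", "null"]
            writeLong(out, 0);                  // branch 0 = string
            writeString(out, favoriteColor);
        } else {
            writeLong(out, 1);                  // branch 1 = null
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodeUser("Alyssa", 256, null)) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
    }
}
```

Running main prints 0c 41 6c 79 73 73 61 00 80 04 02: 0c is the zigzag-encoded length 6 of "Alyssa", 00 80 04 selects the int branch and encodes 256, and the final 02 selects the null branch of favorite_color. Field names never appear in the bytes, which is why a reader needs the writer's schema to decode them.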


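3. Compile the schema

In the IDE's Maven Projects panel, run compile (or run mvn compile from the command line). This automatically creates the package path derived from the namespace and generates the User class.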


4. Serialize

Create a TestUser class that serializes the data:

import java.io.File;
import java.io.IOException;

import lancoo.ecbdc.pre.User;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;

public class TestUser {
    public static void main(String[] args) throws IOException {
        User user1 = new User();
        user1.setName("Alyssa");
        user1.setFavoriteNumber(256);
        // Leave favorite color null

        // Alternate constructor
        User user2 = new User("Ben", 7, "red");

        // Construct via builder
        User user3 = User.newBuilder()
                .setName("Charlie")
                .setFavoriteColor("blue")
                .setFavoriteNumber(null)
                .build();

        // Serialize user1, user2 and user3 to disk
        DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
        DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
        dataFileWriter.create(user1.getSchema(), new File("user_generic.avro"));
        dataFileWriter.append(user1);
        dataFileWriter.append(user2);
        dataFileWriter.append(user3);
        dataFileWriter.close();
    }
}
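The DataFileWriter calls above produce an Avro object container file rather than bare records. As a rough sketch of that layout, assuming only the JDK and following the container file section of the Avro specification: a header (the magic bytes Obj plus version byte 1, a metadata map holding the schema under avro.schema and the codec under avro.codec, then a 16-byte sync marker), followed by data blocks that each hold a record count, the byte size of the records, the records themselves, and the sync marker again. ContainerFileSketch and writeContainer are hypothetical names; the records are assumed to be already binary-encoded, only the uncompressed null codec and a single block are shown, and the real DataFileWriter generates its own sync marker instead of taking one as a parameter.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the object container file layout
// (uncompressed "null" codec, a single data block).
class ContainerFileSketch {

    // Zigzag + base-128 varint, as used for all Avro int/long values.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    // Avro bytes/string: a length prefix followed by the raw bytes.
    static void writeBytes(ByteArrayOutputStream out, byte[] bytes) {
        writeLong(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    static byte[] writeContainer(String schemaJson, byte[][] encodedRecords, byte[] sync) {
        if (sync.length != 16) {
            throw new IllegalArgumentException("sync marker must be 16 bytes");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Header: magic "Obj" followed by format version 1.
        out.write('O');
        out.write('b');
        out.write('j');
        out.write(1);
        // File metadata: one map block with two entries, then a 0 count to end the map.
        writeLong(out, 2);
        writeBytes(out, "avro.schema".getBytes(StandardCharsets.UTF_8));
        writeBytes(out, schemaJson.getBytes(StandardCharsets.UTF_8));
        writeBytes(out, "avro.codec".getBytes(StandardCharsets.UTF_8));
        writeBytes(out, "null".getBytes(StandardCharsets.UTF_8));
        writeLong(out, 0);
        out.write(sync, 0, sync.length);
        // One data block: record count, byte size of the records, records, sync marker.
        ByteArrayOutputStream block = new ByteArrayOutputStream();
        for (byte[] record : encodedRecords) {
            block.write(record, 0, record.length);
        }
        writeLong(out, encodedRecords.length);
        writeLong(out, block.size());
        out.write(block.toByteArray(), 0, block.size());
        out.write(sync, 0, sync.length);
        return out.toByteArray();
    }
}
```

Every block ends with the same sync marker, which is what lets a reader find block boundaries (for example, when splitting a large file) without decoding every record.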

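After you run the serialization program, the Avro data file user_generic.avro is generated in the project's root directory (the path in the code is relative to the working directory).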


Opened as text, user_generic.avro begins with a header that embeds the schema; the records that follow are binary:

Objavro.schema?{"type":"record","name":"User","namespace":"lancoo.ecbdc.pre","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}
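The text above is the file header: the magic bytes Obj followed by a metadata map whose avro.schema entry holds the writer's schema as JSON, with non-printable bytes such as length prefixes around the readable text. A minimal JDK-only sketch of reading that header is shown below; AvroHeaderReader and readMetadata are hypothetical names, and metadata values are decoded as UTF-8 text for simplicity.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical reader for the header of an Avro object container file.
class AvroHeaderReader {

    // Read a base-128 varint, then undo the zigzag mapping.
    static long readLong(ByteBuffer buf) {
        long n = 0;
        int shift = 0;
        int b;
        do {
            b = buf.get() & 0xFF;
            n |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (n >>> 1) ^ -(n & 1);
    }

    // Length-prefixed bytes, decoded as UTF-8 text.
    static String readString(ByteBuffer buf) {
        byte[] bytes = new byte[(int) readLong(buf)];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Checks the magic bytes and returns the file metadata (avro.schema, avro.codec, ...).
    static Map<String, String> readMetadata(ByteBuffer buf) {
        if (buf.get() != 'O' || buf.get() != 'b' || buf.get() != 'j' || buf.get() != 1) {
            throw new IllegalArgumentException("not an Avro object container file");
        }
        Map<String, String> meta = new LinkedHashMap<String, String>();
        long count;
        while ((count = readLong(buf)) != 0) { // the map is a series of blocks ending with a 0 count
            if (count < 0) {                   // negative count: a block byte size follows
                readLong(buf);
                count = -count;
            }
            for (long i = 0; i < count; i++) {
                String key = readString(buf);
                meta.put(key, readString(buf));
            }
        }
        return meta;                           // the 16-byte sync marker comes next
    }
}
```

Applied to the bytes of user_generic.avro (for example, read with java.nio.file.Files.readAllBytes and wrapped in a ByteBuffer), the returned map should contain the avro.schema entry shown above along with an avro.codec entry.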

At this point, the Avro data has been generated.

5. Deserialize

Parse the Avro data with the following deserialization code:

import java.io.File;

import lancoo.ecbdc.pre.User;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;

// Deserialize Users from disk
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = new DataFileReader<User>(new File("user_generic.avro"), userDatumReader);
User user = null;
while (dataFileReader.hasNext()) {
    // Reuse user object by passing it to next(). This saves us from
    // allocating and garbage collecting many objects for files with
    // many items.
    user = dataFileReader.next(user);
    System.out.println(user);
}
// Release the underlying file handle
dataFileReader.close();
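Conceptually, the DatumReader walks the schema field by field over the record bytes inside each block. Here is a minimal JDK-only sketch of that decoding for the User schema; UserBinaryDecoder, DecodedUser, and decodeUser are hypothetical names, and the sketch covers only the record bytes, not the container header, block counts, or sync markers that DataFileReader handles.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical decoder for the User schema in user.avsc, for illustration only.
class UserBinaryDecoder {

    // Plain holder for one decoded record (stands in for the generated User class).
    static final class DecodedUser {
        final String name;
        final Integer favoriteNumber;
        final String favoriteColor;

        DecodedUser(String name, Integer favoriteNumber, String favoriteColor) {
            this.name = name;
            this.favoriteNumber = favoriteNumber;
            this.favoriteColor = favoriteColor;
        }

        @Override
        public String toString() {
            return name + ", " + favoriteNumber + ", " + favoriteColor;
        }
    }

    // Read a base-128 varint, then undo the zigzag mapping.
    static long readLong(ByteBuffer buf) {
        long n = 0;
        int shift = 0;
        int b;
        do {
            b = buf.get() & 0xFF;
            n |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (n >>> 1) ^ -(n & 1);
    }

    // A string is a length (long) followed by that many UTF-8 bytes.
    static String readString(ByteBuffer buf) {
        byte[] bytes = new byte[(int) readLong(buf)];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Fields are read in schema order; each union starts with its branch index.
    static DecodedUser decodeUser(ByteBuffer buf) {
        String name = readString(buf);
        Integer favoriteNumber = readLong(buf) == 0 ? Integer.valueOf((int) readLong(buf)) : null; // ["int", "null"]
        String favoriteColor = readLong(buf) == 0 ? readString(buf) : null;                       // ["string", "null"]
        return new DecodedUser(name, favoriteNumber, favoriteColor);
    }

    public static void main(String[] args) {
        byte[] twoUsers = {
            0x0c, 0x41, 0x6c, 0x79, 0x73, 0x73, 0x61, 0x00, (byte) 0x80, 0x04, 0x02, // Alyssa, 256, null
            0x06, 0x42, 0x65, 0x6e, 0x00, 0x0e, 0x00, 0x06, 0x72, 0x65, 0x64        // Ben, 7, red
        };
        ByteBuffer buf = ByteBuffer.wrap(twoUsers);
        while (buf.hasRemaining()) { // records sit back to back, like the hasNext() loop above
            System.out.println(decodeUser(buf));
        }
    }
}
```

Running main prints Alyssa, 256, null and then Ben, 7, red, one per line.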

Run the deserialization code to parse user_generic.avro.


The Avro data is parsed successfully.

6. Upload user_generic.avro to an HDFS path

hdfs dfs -mkdir -p /tmp/lztest/
hdfs dfs -put user_generic.avro /tmp/lztest/


7. Configure FlinkServer

  • Prepare the Avro JAR files

Copy flink-sql-avro-*.jar and flink-sql-avro-confluent-registry-*.jar into the FlinkServer lib directory by running the following commands on every FlinkServer node:

cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/lib
cd /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/lib
chmod 500 flink-sql-avro*.jar
chown omm:wheel flink-sql-avro*.jar


  • Restart the FlinkServer instances, and after the restart completes, check that the Avro JARs have been uploaded:

    hdfs dfs -ls /FusionInsight_FlinkServer/8.1.2-312005/lib


8. Write the Flink SQL

CREATE TABLE testHdfs (
  name String,
  favorite_number int,
  favorite_color String
) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///tmp/lztest/user_generic.avro',
  'format' = 'avro'
);

CREATE TABLE KafkaTable (
  name String,
  favorite_number int,
  favorite_color String
) WITH (
  'connector' = 'kafka',
  'topic' = 'testavro',
  'properties.bootstrap.servers' = '96.10.2.1:21005',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'avro'
);

insert into
  KafkaTable
select
  *
from
  testHdfs;
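The column lists in the two CREATE TABLE statements mirror the fields of user.avsc. As a hypothetical helper (not part of Flink), the sketch below derives a column definition from an Avro field, assuming the type mapping documented for Flink's avro format: string maps to STRING, int to INT, long to BIGINT, and so on, and a union with null becomes a nullable column while a plain type becomes NOT NULL. Only primitive types are covered, in line with the note at the start that nested Avro data is out of scope here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not a Flink API): turns a flat Avro field into a Flink SQL column definition.
class AvroToFlinkColumns {

    // Avro primitive type -> Flink SQL type, following the Flink avro format's type mapping.
    static final Map<String, String> SQL_TYPES = new HashMap<String, String>();
    static {
        SQL_TYPES.put("boolean", "BOOLEAN");
        SQL_TYPES.put("int", "INT");
        SQL_TYPES.put("long", "BIGINT");
        SQL_TYPES.put("float", "FLOAT");
        SQL_TYPES.put("double", "DOUBLE");
        SQL_TYPES.put("bytes", "BYTES");
        SQL_TYPES.put("string", "STRING");
    }

    // unionWithNull is true for a field declared as ["T", "null"], false for a plain "T".
    static String column(String name, String avroType, boolean unionWithNull) {
        String sqlType = SQL_TYPES.get(avroType);
        if (sqlType == null) {
            // records, arrays, maps, etc. are out of scope for this flat-schema sketch
            throw new IllegalArgumentException("unsupported Avro type: " + avroType);
        }
        return name + " " + sqlType + (unionWithNull ? "" : " NOT NULL");
    }

    public static void main(String[] args) {
        // The three fields of user.avsc
        System.out.println(column("name", "string", false) + ",");
        System.out.println(column("favorite_number", "int", true) + ",");
        System.out.println(column("favorite_color", "string", true));
    }
}
```

For user.avsc this prints name STRING NOT NULL, favorite_number INT, and favorite_color STRING. The DDL above simply leaves every column nullable, which is the Flink SQL default.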


Save and submit the job.

9. Check whether the target topic contains data


Flink SQL parsed the Avro data successfully.

