Background
Parquet is a columnar storage format introduced in 2013, driven primarily by Twitter and Cloudera.
Parquet uses the record shredding and assembly algorithm described in the Dremel paper.
Although it has been around for 5 years, it remains one of the two mainstream columnar storage formats, and is worth investigating.
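The core idea of that algorithm is to stripe row-oriented records into per-column value streams. The sketch below is conceptual only (the class and method names are my own invention): the real algorithm additionally records repetition and definition levels so that nested and null fields can be reassembled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ColumnStriping {
    // Stripe row-oriented records into per-column value lists.
    // Missing fields become null entries so every column has one slot per row.
    static Map<String, List<Object>> stripe(List<Map<String, Object>> rows,
                                            List<String> columns) {
        Map<String, List<Object>> result = new LinkedHashMap<>();
        for (String col : columns) {
            result.put(col, new ArrayList<>());
        }
        for (Map<String, Object> row : rows) {
            for (String col : columns) {
                result.get(col).add(row.get(col)); // null when the field is absent
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = new ArrayList<>();
        Map<String, Object> row1 = new LinkedHashMap<>();
        row1.put("a", 0L); row1.put("b", "a"); row1.put("c", 0.0);
        Map<String, Object> row2 = new LinkedHashMap<>();
        row2.put("a", 3L); row2.put("b", "d"); // c missing, like the "3,d," row below
        rows.add(row1);
        rows.add(row2);
        System.out.println(stripe(rows, Arrays.asList("a", "b", "c")));
        // prints {a=[0, 3], b=[a, d], c=[0.0, null]}
    }
}
```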
Generating your first parquet file
Use the Apache Parquet command-line tool to generate a parquet file from a CSV file.
Preparing the tool
Download
wget https://github.com/apache/parquet-mr/archive/apache-parquet-1.10.0.tar.gz
tar xvf apache-parquet-1.10.0.tar.gz
Build
cd parquet-mr-apache-parquet-1.10.0/parquet-cli
mvn clean install -DskipTests
Run
mvn dependency:copy-dependencies
java -cp 'target/*:target/dependency/*' org.apache.parquet.cli.Main
Using the tool
Prepare a CSV file
Create sample.csv with the following content:
a,b,c
0,a,0.0
1,b,1.1
2,c,2.2
3,d,
4,,4.4
,f,5.5
,,
7,h,7.7
8,i,8.8
9,j,9.9
Generate the parquet file
java -cp target/parquet-cli-1.10.0.jar:target/dependency/parquet-avro-1.10.0.jar:target/dependency/* org.apache.parquet.cli.Main convert-csv sample.csv -o sample.parquet
A new sample.parquet file now appears in the directory.
No avsc schema file was specified here; the cli generates a default schema.
Generate a CSV schema (optional)
What does the schema file look like? We can use the cli to generate a standalone schema file.
java -cp 'target/*:target/dependency/*' org.apache.parquet.cli.Main csv-schema sample.csv --record-name sample -o sample.avsc
The result is saved in sample.avsc, with the following content:
$ cat sample.avsc
{
  "type" : "record",
  "name" : "sample",
  "fields" : [ {
    "name" : "a",
    "type" : [ "null", "long" ],
    "doc" : "Type inferred from '0'",
    "default" : null
  }, {
    "name" : "b",
    "type" : [ "null", "string" ],
    "doc" : "Type inferred from 'a'",
    "default" : null
  }, {
    "name" : "c",
    "type" : [ "null", "double" ],
    "doc" : "Type inferred from '0.0'",
    "default" : null
  } ]
}
Installing and using the tool suite
Installing on macOS
brew install parquet-tools
The suite contains two files:
$ brew list parquet-tools
/usr/local/Cellar/parquet-tools/1.9.0/bin/parquet-tools
/usr/local/Cellar/parquet-tools/1.9.0/libexec/parquet-tools-1.9.0.jar
The two are equivalent in effect; the examples below use parquet-tools.
Using parquet-tools
It provides six subcommands:
usage: parquet-tools cat [option...] <input>
usage: parquet-tools head [option...] <input>
usage: parquet-tools schema [option...] <input>
usage: parquet-tools meta [option...] <input>
usage: parquet-tools dump [option...] <input>
usage: parquet-tools merge [option...] <input> [<input> ...] <output>
cat
parquet-tools cat sample.parquet
a = 0
b = a
c = 0.0

a = 1
b = b
c = 1.1

a = 2
b = c
c = 2.2

a = 3
b = d

a = 4
b =
c = 4.4

b = f
c = 5.5

b =

a = 7
b = h
c = 7.7

a = 8
b = i
c = 8.8

a = 9
b = j
c = 9.9
head
parquet-tools head sample.parquet
a = 0
b = a
c = 0.0

a = 1
b = b
c = 1.1

a = 2
b = c
c = 2.2

a = 3
b = d

a = 4
b =
c = 4.4
schema
parquet-tools schema sample.parquet
message sample {
  optional int64 a;
  optional binary b (UTF8);
  optional double c;
}
meta
$ parquet-tools meta sample.parquet
file:        file:/Users/note/tmp/parquet-mr-apache-parquet-1.10.0/parquet-cli/sample.parquet
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra:       parquet.avro.schema = {"type":"record","name":"sample","fields":[{"name":"a","type":["null","long"],"doc":"Type inferred from '0'","default":null},{"name":"b","type":["null","string"],"doc":"Type inferred from 'a'","default":null},{"name":"c","type":["null","double"],"doc":"Type inferred from '0.0'","default":null}]}
extra:       writer.model.name = avro

file schema: sample
--------------------------------------------------------------------------------
a:           OPTIONAL INT64 R:0 D:1
b:           OPTIONAL BINARY O:UTF8 R:0 D:1
c:           OPTIONAL DOUBLE R:0 D:1

row group 1: RC:10 TS:346 OFFSET:4
--------------------------------------------------------------------------------
a:            INT64 GZIP DO:0 FPO:4 SZ:109/133/1.22 VC:10 ENC:BIT_PACKED,PLAIN,RLE
b:            BINARY GZIP DO:0 FPO:113 SZ:76/80/1.05 VC:10 ENC:BIT_PACKED,PLAIN,RLE
c:            DOUBLE GZIP DO:0 FPO:189 SZ:120/133/1.11 VC:10 ENC:BIT_PACKED,PLAIN,RLE
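In the row-group section, the SZ field shows three numbers per column: compressed size, uncompressed size, and the compression ratio (uncompressed divided by compressed). For column a above, 109/133/1.22 can be checked by hand:

```java
public class CompressionRatio {
    public static void main(String[] args) {
        // Column "a" reports SZ:109/133/1.22, i.e.
        // compressed bytes / uncompressed bytes / ratio.
        long compressed = 109;
        long uncompressed = 133;
        double ratio = (double) uncompressed / compressed;
        System.out.printf("%.2f%n", ratio); // prints 1.22
    }
}
```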
dump
parquet-tools dump sample.parquet
row group 0
--------------------------------------------------------------------------------
a:  INT64 GZIP DO:0 FPO:4 SZ:109/133/1.22 VC:10 ENC:PLAIN,RLE,BIT_PACKED
b:  BINARY GZIP DO:0 FPO:113 SZ:76/80/1.05 VC:10 ENC:PLAIN,RLE,BIT_PACKED
c:  DOUBLE GZIP DO:0 FPO:189 SZ:120/133/1.11 VC:10 ENC:PLAIN,RLE,BIT_PACKED

    a TV=10 RL=0 DL=1
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[min: 0, max: 9, num_nulls: 2] [more]...

    b TV=10 RL=0 DL=1
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[no stats for this column] [more]...

    c TV=10 RL=0 DL=1
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[min: 0.00000, max: 9 [more]... VC:10

INT64 a
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 10 ***
value 1:  R:0 D:1 V:0
value 2:  R:0 D:1 V:1
value 3:  R:0 D:1 V:2
value 4:  R:0 D:1 V:3
value 5:  R:0 D:1 V:4
value 6:  R:0 D:0 V:<null>
value 7:  R:0 D:0 V:<null>
value 8:  R:0 D:1 V:7
value 9:  R:0 D:1 V:8
value 10: R:0 D:1 V:9

BINARY b
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 10 ***
value 1:  R:0 D:1 V:a
value 2:  R:0 D:1 V:b
value 3:  R:0 D:1 V:c
value 4:  R:0 D:1 V:d
value 5:  R:0 D:1 V:
value 6:  R:0 D:1 V:f
value 7:  R:0 D:1 V:
value 8:  R:0 D:1 V:h
value 9:  R:0 D:1 V:i
value 10: R:0 D:1 V:j

DOUBLE c
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 10 ***
value 1:  R:0 D:1 V:0.0
value 2:  R:0 D:1 V:1.1
value 3:  R:0 D:1 V:2.2
value 4:  R:0 D:0 V:<null>
value 5:  R:0 D:1 V:4.4
value 6:  R:0 D:1 V:5.5
value 7:  R:0 D:0 V:<null>
value 8:  R:0 D:1 V:7.7
value 9:  R:0 D:1 V:8.8
value 10: R:0 D:1 V:9.9
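In the dump, each value carries R (repetition level) and D (definition level). For a flat schema like this one, R is always 0, and D simply marks presence: D:1 for a stored value, D:0 for null. A minimal sketch of that rule (the helper name is my own), applied to column a above:

```java
import java.util.Arrays;
import java.util.List;

public class DefinitionLevels {
    // For a flat optional column, the definition level is 1 when the value
    // is present and 0 when it is null; the repetition level stays 0
    // because there is no nesting.
    static int definitionLevel(Object value) {
        return value == null ? 0 : 1;
    }

    public static void main(String[] args) {
        // Column "a" from sample.csv: nulls on rows 6 and 7, as in the dump.
        List<Long> a = Arrays.asList(0L, 1L, 2L, 3L, 4L, null, null, 7L, 8L, 9L);
        StringBuilder levels = new StringBuilder();
        for (Long v : a) {
            levels.append(definitionLevel(v));
        }
        System.out.println(levels); // prints 1111100111
    }
}
```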
merge
Merge two parquet files:
$ parquet-tools merge sample.parquet sample.parquet merge.parquet
Verify:
$ parquet-tools cat sample.parquet | grep ^a | wc -l
8
$ parquet-tools cat merge.parquet | grep ^a | wc -l
16
Reading and writing parquet files in Java
In practice you rarely write parquet files with Java directly. Instead, you create a Hive table whose storage format is set to parquet, and write data through Hive so that it is saved in the parquet format.
So the code below has no real practical value; it is only useful for learning the parquet format.
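As a sketch of that Hive workflow (the table and column names here are hypothetical, chosen to mirror sample.csv), a Parquet-backed table can be declared and written like this:

```sql
-- Hypothetical example: a Hive table whose data files are written as Parquet.
CREATE TABLE sample_parquet (
  a BIGINT,
  b STRING,
  c DOUBLE
)
STORED AS PARQUET;

-- Rows inserted through Hive are stored in the Parquet format on disk.
INSERT INTO sample_parquet VALUES (0, 'a', 0.0);
```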
pom file
<dependencies>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.10.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>0.23.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.1</version>
    </dependency>
</dependencies>
Writing a file
WriteExample
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class WriteExample {
    public static void main(String[] args) throws IllegalArgumentException, IOException {
        List<Field> fields = new ArrayList<Field>();
        Object defaultValue = null;
        fields.add(new Field("x", Schema.create(Type.INT), "x", defaultValue));
        fields.add(new Field("y", Schema.create(Type.INT), "y", defaultValue));
        Schema schema = Schema.createRecord("name", "doc", "namespace", false, fields);
        try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter
                .<GenericData.Record>builder(new Path("my-file.parquet"))
                .withSchema(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build()) {
            // Simulate 10000 rows of data
            for (int r = 0; r < 10000; ++r) {
                Record record = new Record(schema);
                record.put(0, r);
                record.put(1, r * 3);
                writer.write(record);
            }
        }
    }
}
Reading a file
import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ReadExample {
    public static void main(String[] args) throws IllegalArgumentException, IOException {
        ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(new Path("my-file.parquet"))
                .build();
        GenericRecord record;
        while ((record = reader.read()) != null) {
            System.out.println(record);
        }
    }
}
Partial output:
{"x": 0, "y": 0}
{"x": 1, "y": 3}
{"x": 2, "y": 6}
{"x": 3, "y": 9}
{"x": 4, "y": 12}
{"x": 5, "y": 15}
Note: the dependencies pulled in by orc and parquet may conflict. This project originally also depended on orc packages, which caused a dependency conflict; removing the orc packages fixed it.