Background
Parquet is a columnar storage format introduced in 2013, driven mainly by Twitter and Cloudera.
Parquet uses the record shredding and assembly algorithm described in Google's Dremel paper.
Although it has been around for five years, it is still one of the two mainstream columnar storage formats (alongside ORC), so it is worth a closer look.
Generating the first parquet file
Use the Apache Parquet command-line tool to generate a parquet file from a CSV file.
Preparing the tool
Download
wget https://github.com/apache/parquet-mr/archive/apache-parquet-1.10.0.tar.gz
tar xvf apache-parquet-1.10.0.tar.gz
Build
cd parquet-mr-apache-parquet-1.10.0/parquet-cli
mvn clean install -DskipTests
Run
mvn dependency:copy-dependencies
java -cp 'target/*:target/dependency/*' org.apache.parquet.cli.Main
Using the tool
Prepare a CSV file
Create sample.csv with the following content:
a,b,c
0,a,0.0
1,b,1.1
2,c,2.2
3,d,
4,,4.4
,f,5.5
,,
7,h,7.7
8,i,8.8
9,j,9.9
Generate the parquet file
java -cp target/parquet-cli-1.10.0.jar:target/dependency/parquet-avro-1.10.0.jar:target/dependency/* org.apache.parquet.cli.Main convert-csv sample.csv -o sample.parquet
A sample.parquet file now appears in the directory.
No .avsc schema file was specified here, so the CLI infers a default schema from the CSV.
Generate a CSV schema (optional)
What does that schema file look like? We can use the CLI to generate a standalone schema file.
java -cp 'target/*:target/dependency/*' org.apache.parquet.cli.Main csv-schema sample.csv --record-name sample -o sample.avsc
The result is saved to sample.avsc, with the following content:
$ cat sample.avsc
{
  "type" : "record",
  "name" : "sample",
  "fields" : [ {
    "name" : "a",
    "type" : [ "null", "long" ],
    "doc" : "Type inferred from '0'",
    "default" : null
  }, {
    "name" : "b",
    "type" : [ "null", "string" ],
    "doc" : "Type inferred from 'a'",
    "default" : null
  }, {
    "name" : "c",
    "type" : [ "null", "double" ],
    "doc" : "Type inferred from '0.0'",
    "default" : null
  } ]
}
Installing and using parquet-tools
Installation on macOS
brew install parquet-tools
The package contains two files:
$ brew list parquet-tools
/usr/local/Cellar/parquet-tools/1.9.0/bin/parquet-tools
/usr/local/Cellar/parquet-tools/1.9.0/libexec/parquet-tools-1.9.0.jar
The two behave identically; the examples below use parquet-tools.
Using parquet-tools
It provides six subcommands:
usage: parquet-tools cat [option...] <input>
usage: parquet-tools head [option...] <input>
usage: parquet-tools schema [option...] <input>
usage: parquet-tools meta [option...] <input>
usage: parquet-tools dump [option...] <input>
usage: parquet-tools merge [option...] <input> [<input> ...] <output>
cat
parquet-tools cat sample.parquet
a = 0
b = a
c = 0.0
a = 1
b = b
c = 1.1
a = 2
b = c
c = 2.2
a = 3
b = d
a = 4
b =
c = 4.4
b = f
c = 5.5
b =
a = 7
b = h
c = 7.7
a = 8
b = i
c = 8.8
a = 9
b = j
c = 9.9
head
parquet-tools head sample.parquet
a = 0
b = a
c = 0.0
a = 1
b = b
c = 1.1
a = 2
b = c
c = 2.2
a = 3
b = d
a = 4
b =
c = 4.4
schema
parquet-tools schema sample.parquet
message sample {
  optional int64 a;
  optional binary b (UTF8);
  optional double c;
}
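The same schema can also be read programmatically with parquet-mr. A minimal sketch (SchemaExample is an illustrative name; sample.parquet is the file generated above); it only opens the footer, so no column data is decoded:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class SchemaExample {
    public static void main(String[] args) throws Exception {
        // Reading the footer is enough to get the schema; no pages are decoded.
        try (ParquetFileReader reader = ParquetFileReader.open(
                HadoopInputFile.fromPath(new Path("sample.parquet"), new Configuration()))) {
            MessageType schema = reader.getFooter().getFileMetaData().getSchema();
            System.out.println(schema); // prints the same "message sample { ... }" layout
        }
    }
}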
meta
$ parquet-tools meta sample.parquet
file: file:/Users/note/tmp/parquet-mr-apache-parquet-1.10.0/parquet-cli/sample.parquet
creator: parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra: parquet.avro.schema = {"type":"record","name":"sample","fields":[{"name":"a","type":["null","long"],"doc":"Type inferred from '0'","default":null},{"name":"b","type":["null","string"],"doc":"Type inferred from 'a'","default":null},{"name":"c","type":["null","double"],"doc":"Type inferred from '0.0'","default":null}]}
extra: writer.model.name = avro
file schema: sample
--------------------------------------------------------------------------------
a: OPTIONAL INT64 R:0 D:1
b: OPTIONAL BINARY O:UTF8 R:0 D:1
c: OPTIONAL DOUBLE R:0 D:1
row group 1: RC:10 TS:346 OFFSET:4
--------------------------------------------------------------------------------
a: INT64 GZIP DO:0 FPO:4 SZ:109/133/1.22 VC:10 ENC:BIT_PACKED,PLAIN,RLE
b: BINARY GZIP DO:0 FPO:113 SZ:76/80/1.05 VC:10 ENC:BIT_PACKED,PLAIN,RLE
c: DOUBLE GZIP DO:0 FPO:189 SZ:120/133/1.11 VC:10 ENC:BIT_PACKED,PLAIN,RLE
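The row-group and column-chunk numbers in this output (RC, SZ, codec) are also available through the footer API. A minimal sketch along the same lines as above (MetaExample is an illustrative name):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class MetaExample {
    public static void main(String[] args) throws Exception {
        try (ParquetFileReader reader = ParquetFileReader.open(
                HadoopInputFile.fromPath(new Path("sample.parquet"), new Configuration()))) {
            for (BlockMetaData block : reader.getFooter().getBlocks()) {
                System.out.println("row count: " + block.getRowCount()); // RC in the meta output
                for (ColumnChunkMetaData column : block.getColumns()) {
                    // SZ in the meta output: compressed/uncompressed size per column chunk
                    System.out.println(column.getPath() + " " + column.getCodec()
                            + " " + column.getTotalSize() + "/" + column.getTotalUncompressedSize());
                }
            }
        }
    }
}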
dump
In the dump output, R and D are the repetition and definition levels from the Dremel encoding; for flat optional columns like these, D:0 marks a null value.
parquet-tools dump sample.parquet
row group 0
--------------------------------------------------------------------------------
a: INT64 GZIP DO:0 FPO:4 SZ:109/133/1.22 VC:10 ENC:PLAIN,RLE,BIT_PACKED
b: BINARY GZIP DO:0 FPO:113 SZ:76/80/1.05 VC:10 ENC:PLAIN,RLE,BIT_PACKED
c: DOUBLE GZIP DO:0 FPO:189 SZ:120/133/1.11 VC:10 ENC:PLAIN,RLE,BIT_PACKED
a TV=10 RL=0 DL=1
----------------------------------------------------------------------------
page 0: DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[min: 0, max: 9, num_nulls: 2] [more]...
b TV=10 RL=0 DL=1
----------------------------------------------------------------------------
page 0: DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[no stats for this column] [more]...
c TV=10 RL=0 DL=1
----------------------------------------------------------------------------
page 0: DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[min: 0.00000, max: 9 [more]... VC:10
INT64 a
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 10 ***
value 1: R:0 D:1 V:0
value 2: R:0 D:1 V:1
value 3: R:0 D:1 V:2
value 4: R:0 D:1 V:3
value 5: R:0 D:1 V:4
value 6: R:0 D:0 V:<null>
value 7: R:0 D:0 V:<null>
value 8: R:0 D:1 V:7
value 9: R:0 D:1 V:8
value 10: R:0 D:1 V:9
BINARY b
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 10 ***
value 1: R:0 D:1 V:a
value 2: R:0 D:1 V:b
value 3: R:0 D:1 V:c
value 4: R:0 D:1 V:d
value 5: R:0 D:1 V:
value 6: R:0 D:1 V:f
value 7: R:0 D:1 V:
value 8: R:0 D:1 V:h
value 9: R:0 D:1 V:i
value 10: R:0 D:1 V:j
DOUBLE c
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 10 ***
value 1: R:0 D:1 V:0.0
value 2: R:0 D:1 V:1.1
value 3: R:0 D:1 V:2.2
value 4: R:0 D:0 V:<null>
value 5: R:0 D:1 V:4.4
value 6: R:0 D:1 V:5.5
value 7: R:0 D:0 V:<null>
value 8: R:0 D:1 V:7.7
value 9: R:0 D:1 V:8.8
value 10: R:0 D:1 V:9.9
merge
Merge two parquet files (merge concatenates the row groups of the inputs; it does not rewrite them into larger row groups):
$ parquet-tools merge sample.parquet sample.parquet merge.parquet
Verify:
$ parquet-tools cat sample.parquet | grep ^a | wc -l
8
$ parquet-tools cat merge.parquet | grep ^a | wc -l
16
Reading and writing parquet files with Java
In practice you rarely write parquet files from Java directly. Instead you create a Hive table whose storage format is parquet (STORED AS PARQUET in the DDL) and write data through Hive. So the code below has little production value; it is only useful for studying the parquet format.
POM file
<dependencies>
  <dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.10.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <!-- keep the Hadoop artifacts on the same version to avoid conflicts -->
    <version>2.7.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.1</version>
  </dependency>
</dependencies>
Writing a file
WriteExample
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class WriteExample {
    public static void main(String[] args) throws IllegalArgumentException, IOException {
        // Build an Avro schema with two int columns, x and y.
        List<Field> fields = new ArrayList<Field>();
        Object defaultValue = null;
        fields.add(new Field("x", Schema.create(Type.INT), "x", defaultValue));
        fields.add(new Field("y", Schema.create(Type.INT), "y", defaultValue));
        Schema schema = Schema.createRecord("name", "doc", "namespace", false, fields);

        // try-with-resources closes the writer and flushes the footer.
        try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(
                new Path("my-file.parquet"))
                .withSchema(schema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .build()) {
            // Simulate 10,000 rows of data.
            for (int r = 0; r < 10000; ++r) {
                Record record = new Record(schema);
                record.put(0, r);
                record.put(1, r * 3);
                writer.write(record);
            }
        }
    }
}
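The builder also exposes the layout parameters that show up in the meta output. A hedged sketch (TunedWriteExample and tuned.parquet are names made up here; the sizes are illustrative, not tuned recommendations):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class TunedWriteExample {
    public static void main(String[] args) throws Exception {
        Schema schema = SchemaBuilder.record("name").namespace("namespace")
                .fields().requiredInt("x").requiredInt("y").endRecord();
        try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(
                new Path("tuned.parquet"))
                .withSchema(schema)
                .withCompressionCodec(CompressionCodecName.GZIP) // the codec seen in `parquet-tools meta`
                .withRowGroupSize(128 * 1024 * 1024)             // bytes per row group (illustrative)
                .withPageSize(1024 * 1024)                       // bytes per page (illustrative)
                .build()) {
            GenericData.Record record = new GenericData.Record(schema);
            record.put("x", 1);
            record.put("y", 3);
            writer.write(record);
        }
    }
}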
Reading a file
ReadExample
import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ReadExample {
    public static void main(String[] args) throws IllegalArgumentException, IOException {
        // try-with-resources closes the underlying file handle.
        try (ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(
                new Path("my-file.parquet")).build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                System.out.println(record);
            }
        }
    }
}
Partial output:
{"x": 0, "y": 0}
{"x": 1, "y": 3}
{"x": 2, "y": 6}
{"x": 3, "y": 9}
{"x": 4, "y": 12}
{"x": 5, "y": 15}
Note: the dependencies pulled in by orc and parquet can conflict. This example originally also included the orc packages, which caused a dependency conflict; removing the orc dependency fixed it.