Apache Parquet 初体验

本贴最后更新于 2373 天前,其中的信息可能已经天翻地覆

背景

Parquet 是以 Twitter 和 Cloudera 为主,在 2013 年推出的列存储格式。

Parquet 使用了 Dremel paper 中提到的日志编码算法。

虽然已经出现了 5 年,但是目前仍然是主流的两大列存储格式之一,值得调研一下。

生成第一个 parquet 文件

使用 Apache Parquet 命令行工具,从 CSV 文件生成一个 parquet 文件。

准备工具

下载

wget https://github.com/apache/parquet-mr/archive/apache-parquet-1.10.0.tar.gz
tar xvf parquet-mr-apache-parquet-1.10.0.tar.gz

编译

cd parquet-cli
mvn clean install -DskipTests

运行

mvn dependency:copy-dependencies
java -cp 'target/*:target/dependency/*' org.apache.parquet.cli.Main

使用工具

准备 CSV 文件

sample.csv, 内容如下:

a,b,c
0,a,0.0
1,b,1.1
2,c,2.2
3,d,
4,,4.4
,f,5.5
,,
7,h,7.7
8,i,8.8
9,j,9.9

生成 parquet 文件

java -cp target/parquet-cli-1.10.0.jar:target/dependency/parquet-avro-1.10.0.jar:target/dependency/* org.apache.parquet.cli.Main convert-csv sample.csv -o sample.parquet

可见目录下多了一个 sample.parquet 文件。

这里没有指定 avsc 格式的 schema 文件,cli 会生成默认的 schema 文件。

生成 CSV schema(可选)

schema 文件长什么样子呢?我们可以通过 cli 来生成单独的 schema 文件。

java -cp 'target/*:target/dependency/*' org.apache.parquet.cli.Main csv-schema sample.csv --record-name sample -o sample.avsc

结果保存在 sample.avsc 中,内容如下:

$ cat sample.avsc
{
  "type" : "record",
  "name" : "sample",
  "fields" : [ {
    "name" : "a",
    "type" : [ "null", "long" ],
    "doc" : "Type inferred from '0'",
    "default" : null
  }, {
    "name" : "b",
    "type" : [ "null", "string" ],
    "doc" : "Type inferred from 'a'",
    "default" : null
  }, {
    "name" : "c",
    "type" : [ "null", "double" ],
    "doc" : "Type inferred from '0.0'",
    "default" : null
  } ]
}

工具集安装和使用

mac 下工具集安装

brew install parquet-tools

工具集包含两个文件

$ brew list parquet-tools
/usr/local/Cellar/parquet-tools/1.9.0/bin/parquet-tools
/usr/local/Cellar/parquet-tools/1.9.0/libexec/parquet-tools-1.9.0.jar

这两个工具的效果是相同的,下面以 parquet-tools 来举例。

parquet-tools 使用

其中包含 6 个方法

  • usage: parquet-tools cat [option...] <input>
  • usage: parquet-tools head [option...] <input>
  • usage: parquet-tools schema [option...] <input>
  • usage: parquet-tools meta [option...] <input>
  • usage: parquet-tools dump [option...] <input>
  • usage: parquet-tools merge [option...] <input> [<input> ...] <output>

cat

parquet-tools cat sample.parquet
a = 0
b = a
c = 0.0

a = 1
b = b
c = 1.1

a = 2
b = c
c = 2.2

a = 3
b = d

a = 4
b =
c = 4.4

b = f
c = 5.5

b =

a = 7
b = h
c = 7.7

a = 8
b = i
c = 8.8

a = 9
b = j
c = 9.9
parquet-tools head sample.parquet
a = 0
b = a
c = 0.0

a = 1
b = b
c = 1.1

a = 2
b = c
c = 2.2

a = 3
b = d

a = 4
b =
c = 4.4

schema

parquet-tools schema sample.parquet
message sample {
  optional int64 a;
  optional binary b (UTF8);
  optional double c;
}

meta

$ parquet-tools meta sample.parquet
file:        file:/Users/note/tmp/parquet-mr-apache-parquet-1.10.0/parquet-cli/sample.parquet
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra:       parquet.avro.schema = {"type":"record","name":"sample","fields":[{"name":"a","type":["null","long"],"doc":"Type inferred from '0'","default":null},{"name":"b","type":["null","string"],"doc":"Type inferred from 'a'","default":null},{"name":"c","type":["null","double"],"doc":"Type inferred from '0.0'","default":null}]}
extra:       writer.model.name = avro

file schema: sample
--------------------------------------------------------------------------------
a:           OPTIONAL INT64 R:0 D:1
b:           OPTIONAL BINARY O:UTF8 R:0 D:1
c:           OPTIONAL DOUBLE R:0 D:1

row group 1: RC:10 TS:346 OFFSET:4
--------------------------------------------------------------------------------
a:            INT64 GZIP DO:0 FPO:4 SZ:109/133/1.22 VC:10 ENC:BIT_PACKED,PLAIN,RLE
b:            BINARY GZIP DO:0 FPO:113 SZ:76/80/1.05 VC:10 ENC:BIT_PACKED,PLAIN,RLE
c:            DOUBLE GZIP DO:0 FPO:189 SZ:120/133/1.11 VC:10 ENC:BIT_PACKED,PLAIN,RLE

dump

parquet-tools dump sample.parquet
row group 0
--------------------------------------------------------------------------------
a:  INT64 GZIP DO:0 FPO:4 SZ:109/133/1.22 VC:10 ENC:PLAIN,RLE,BIT_PACKED
b:  BINARY GZIP DO:0 FPO:113 SZ:76/80/1.05 VC:10 ENC:PLAIN,RLE,BIT_PACKED
c:  DOUBLE GZIP DO:0 FPO:189 SZ:120/133/1.11 VC:10 ENC:PLAIN,RLE,BIT_PACKED

    a TV=10 RL=0 DL=1
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[min: 0, max: 9, num_nulls: 2] [more]...

    b TV=10 RL=0 DL=1
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[no stats for this column] [more]...

    c TV=10 RL=0 DL=1
    ----------------------------------------------------------------------------
    page 0:  DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[min: 0.00000, max: 9 [more]... VC:10

INT64 a
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 10 ***
value 1:  R:0 D:1 V:0
value 2:  R:0 D:1 V:1
value 3:  R:0 D:1 V:2
value 4:  R:0 D:1 V:3
value 5:  R:0 D:1 V:4
value 6:  R:0 D:0 V:<null>
value 7:  R:0 D:0 V:<null>
value 8:  R:0 D:1 V:7
value 9:  R:0 D:1 V:8
value 10: R:0 D:1 V:9

BINARY b
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 10 ***
value 1:  R:0 D:1 V:a
value 2:  R:0 D:1 V:b
value 3:  R:0 D:1 V:c
value 4:  R:0 D:1 V:d
value 5:  R:0 D:1 V:
value 6:  R:0 D:1 V:f
value 7:  R:0 D:1 V:
value 8:  R:0 D:1 V:h
value 9:  R:0 D:1 V:i
value 10: R:0 D:1 V:j

DOUBLE c
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 10 ***
value 1:  R:0 D:1 V:0.0
value 2:  R:0 D:1 V:1.1
value 3:  R:0 D:1 V:2.2
value 4:  R:0 D:0 V:<null>
value 5:  R:0 D:1 V:4.4
value 6:  R:0 D:1 V:5.5
value 7:  R:0 D:0 V:<null>
value 8:  R:0 D:1 V:7.7
value 9:  R:0 D:1 V:8.8
value 10: R:0 D:1 V:9.9

merge

merge 两个 parquet 文件

$ parquet-tools merge sample.parquet sample.parquet merge.parquet

prove

$ parquet-tools cat sample.parquet | grep ^a | wc -l
       8

$ parquet-tools cat merge.parquet | grep ^a | wc -l
      16

使用 Java 读写 parquet 文件

一般不会直接使用 Java 写 parquet 文件,而是直接建一张 hive 表,表格式设置为 parquet,通过 hive 写入数据保存成 parquet 格式。

所以这段代码没有实用价值,只有用来学习 parquet 格式使用。

pom 文件

	<dependencies>
		<dependency>
			<groupId>org.apache.parquet</groupId>
			<artifactId>parquet-avro</artifactId>
			<version>1.10.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-core</artifactId>
			<version>0.23.1</version>
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>2.7.1</version>
		</dependency>
	</dependencies>

写文件

WriteExample

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class WriteExample {
	public static void main(String[] args) throws IllegalArgumentException, IOException {

		List<Field> fields = new ArrayList<Field>();
		Object defaultValue = null;
		fields.add(new Field("x", Schema.create(Type.INT), "x", defaultValue));
		fields.add(new Field("y", Schema.create(Type.INT), "y", defaultValue));

		Schema schema = Schema.createRecord("name", "doc", "namespace", false, fields);

		try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(
				new Path("my-file.parquet")).withSchema(schema).withCompressionCodec(CompressionCodecName.SNAPPY)
				.build()) {

			// 模拟10000行数据
			for (int r = 0; r < 10000; ++r) {
				Record record = new Record(schema);
				record.put(0, r);
				record.put(1, r * 3);
				writer.write(record);
			}
		}
	}
}

读文件

import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ReadExample {
	public static void main(String[] args) throws IllegalArgumentException, IOException {

		ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path("my-file.parquet"))
				.build();
		GenericRecord record;

		while ((record = reader.read()) != null) {
			System.out.println(record);
		}
	}
}

部分结果

{"x": 0, "y": 0}
{"x": 1, "y": 3}
{"x": 2, "y": 6}
{"x": 3, "y": 9}
{"x": 4, "y": 12}
{"x": 5, "y": 15}

注意:orc 和 parquet 依赖的包可能有冲突。本例子中原来引了 orc 的包导致依赖冲突,后来把 orc 的包移除掉就好了。

参考

  • 存储
    22 引用 • 28 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...