Apache ORC 格式文件使用 Java 读写

本贴最后更新于 1312 天前,其中的信息可能已经水流花落

背景

接前文 Apache ORC 格式简介和使用工具读写

Java 类设计

VectorizedRowBatch

ORC 中的数据,在内存中的类表示为 org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch,如下:

public class VectorizedRowBatch implements Writable {
  public int numCols;           // number of columns
  public ColumnVector[] cols;   // a vector for each column
  public int size;              // number of rows that qualify (i.e. haven't been filtered out)
  ...
}

ColumnVector

每列的内容存储在 org.apache.hadoop.hive.ql.exec.vector.ColumnVector 中,如下:

public abstract class ColumnVector {

  public final Type type;
  
  // If the whole column vector has no nulls, this is true, otherwise false.
  public boolean noNulls;

  /*
   * If hasNulls is true, then this array contains true if the value
   * is null, otherwise false. The array is always allocated, so a batch can be re-used
   * later and nulls added.
   */
  public boolean[] isNull;

  /*
   * True if same value repeats for whole column vector.
   * If so, vector[0] holds the repeating value.
   */
  public boolean isRepeating;
  ...
}

可见对于没有 null 的情景,和值全相同的情景,做了特殊优化。

其子类型包括:

ORC 类型 列类型
array ListColumnVector
binary BytesColumnVector
bigint LongColumnVector
boolean LongColumnVector
char BytesColumnVector
date LongColumnVector
decimal DecimalColumnVector
double DoubleColumnVector
float DoubleColumnVector
int LongColumnVector
map MapColumnVector
smallint LongColumnVector
string BytesColumnVector
struct StructColumnVector
timestamp TimestampColumnVector
tinyint LongColumnVector
uniontype UnionColumnVector
varchar BytesColumnVector

LongColumnVector

LongColumnVector 处理所有的整数类型,包括 boolean, bigint, date, int, smallinttinyint

public class LongColumnVector extends ColumnVector {
  public long[] vector;
  ...
}

TimestampColumnVector

TimestampColumnVector 处理时间戳类型

public class TimestampColumnVector extends ColumnVector {

  /*
   * The values from Timestamp.getTime().
   */
  public long[] time;

  /*
   * The values from Timestamp.getNanos().
   */
  public int[] nanos;
  ...      
}

DoubleColumnVector

DoubleColumnVector 处理所有的浮点数,包括 double 和 flat

public class DoubleColumnVector extends ColumnVector {
  public double[] vector;
  public  short  scale;
  public  short  precision;
  ...
}

DecimalColumnVector

DecimalColumnVector 处理十进制小数类型。但是其实现不是为性能设计的,以后可能会变化

public class DecimalColumnVector extends ColumnVector {

  /**
   * A vector of HiveDecimalWritable objects.
   *
   * For high performance and easy access to this low-level structure,
   * the fields are public by design (as they are in other ColumnVector
   * types).
   */
  public HiveDecimalWritable[] vector;
  ...
}

BytesColumnVector

BytesColumnVector 处理所有的二进制类型,包括 binary, char, string 和 varchar。

public class BytesColumnVector extends ColumnVector {
  public byte[][] vector;
  public int[] start;          // start offset of each field

  /*
   * The length of each field. If the value repeats for every entry, then it is stored
   * in vector[0] and isRepeating from the superclass is set to true.
   */
  public int[] length;

StructColumnVector

StructColumnVector 是结构体类型,其实现是一个 ColumnVector 的数组,巧妙不过自然。

public class StructColumnVector extends ColumnVector {

  public ColumnVector[] fields;
  ...
}

UnionColumnVector

UnionColumnVector 是联合类型,其中的元素共享存储空间。实现如下:

public class UnionColumnVector extends ColumnVector {

  public int[] tags;
  public ColumnVector[] fields;
  ...
}

其中的 tags 来标识元素子类型,fields[tag]标识对应的值。

MultiValuedColumnVector

MultiValuedColumnVector 为多值列,需要记录每个值的起始点和长度。

public abstract class MultiValuedColumnVector extends ColumnVector {

  public long[] offsets;
  public long[] lengths;
  // the number of children slots used
  public int childCount;
  ...
}

ListColumnVector

ListColumnVector 处理数组列

public class ListColumnVector extends MultiValuedColumnVector {

  public ColumnVector child;
  ...
}

其中 list[i]的值为 offsets[i]..offsets[i]+lengths[i]-1 inclusive.

MapColumnVector

MapColumnVector 处理 map 列

public class MapColumnVector extends MultiValuedColumnVector {

  public ColumnVector keys;
  public ColumnVector values;
  ...
}

其它类型不再罗列了,具体可以参看官方文档 Apache ORC: Using Core Java

Java 写 ORC 文件

快速上手示例

步骤

  1. 定义 schema
  2. 使用 OrcFile 类创建 Writer

pom 文件

		<dependency>
			<groupId>org.apache.orc</groupId>
			<artifactId>orc-core</artifactId>
			<version>1.5.1</version>
		</dependency>

示例如下

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class WriteExample {
	public static void main(String[] args) throws IllegalArgumentException, IOException {
		Configuration conf = new Configuration();

		// 定义schema
		TypeDescription schema = TypeDescription.fromString("struct<x:int,y:int>");

		// 创建writer
		Writer writer = OrcFile.createWriter(new Path("my-file.orc"), OrcFile.writerOptions(conf).setSchema(schema));

		// 写文件
		VectorizedRowBatch batch = schema.createRowBatch();
		LongColumnVector x = (LongColumnVector) batch.cols[0];
		LongColumnVector y = (LongColumnVector) batch.cols[1];

		// 模拟10000行数据
		for (int r = 0; r < 10000; ++r) {
			int row = batch.size++;
			x.vector[row] = r;
			y.vector[row] = r * 3;

			// 默认每个batch为1024行,如果满了,则新起一个batch.
			if (batch.size == batch.getMaxSize()) {
				writer.addRowBatch(batch);
				batch.reset();
			}
		}
		if (batch.size != 0) {
			writer.addRowBatch(batch);
			batch.reset();
		}
		writer.close();
	}
}

高级示例

写入 Map 类型

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class AdvancedWriteExample {
	public static void main(String[] args) throws IllegalArgumentException, IOException {
		Configuration conf = new Configuration();

		// 定义schema
		TypeDescription schema = TypeDescription.fromString("struct<first:int,second:int,third:map<string,int>>");

		// 创建writer
		Writer writer = OrcFile.createWriter(new Path("advanced-example.orc"),
				OrcFile.writerOptions(conf).setSchema(schema));

		// 写文件
		VectorizedRowBatch batch = schema.createRowBatch();

		// 定义前两列
		LongColumnVector first = (LongColumnVector) batch.cols[0];
		LongColumnVector second = (LongColumnVector) batch.cols[1];

		// 定义map列,对key和value要做cast
		MapColumnVector map = (MapColumnVector) batch.cols[2];
		BytesColumnVector mapKey = (BytesColumnVector) map.keys;
		LongColumnVector mapValue = (LongColumnVector) map.values;

		// 每个map包含5个元素
		final int MAP_SIZE = 5;
		final int BATCH_SIZE = batch.getMaxSize();

		// 确保map的空间充足
		mapKey.ensureSize(BATCH_SIZE * MAP_SIZE, false);
		mapValue.ensureSize(BATCH_SIZE * MAP_SIZE, false);

		// 增加1500行到文件中
		for (int r = 0; r < 1500; ++r) {
			int row = batch.size++;

			// 处理前两列
			first.vector[row] = r;
			second.vector[row] = r * 3;

			// 处理map列偏移
			map.offsets[row] = map.childCount;
			map.lengths[row] = MAP_SIZE;
			map.childCount += MAP_SIZE;

			// 处理map列的值
			for (int mapElem = (int) map.offsets[row]; mapElem < map.offsets[row] + MAP_SIZE; ++mapElem) {
				String key = "row " + r + "." + (mapElem - map.offsets[row]);
				mapKey.setVal(mapElem, key.getBytes(StandardCharsets.UTF_8));
				mapValue.vector[mapElem] = mapElem;
			}

			// 默认每个batch为1024行,如果满了,则新起一个batch.
			if (row == BATCH_SIZE - 1) {
				writer.addRowBatch(batch);
				batch.reset();
			}
		}
		if (batch.size != 0) {
			writer.addRowBatch(batch);
			batch.reset();
		}
		writer.close();
	}
}

Java 读 ORC 文件

步骤

  1. 使用 OrcFile 创建 Reader
  2. 读取文件

示例

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class ReaderExample {
	public static void main(String[] args) throws IOException {
		Configuration conf = new Configuration();

		// 使用OrcFile创建Reader
		Reader reader = OrcFile.createReader(new Path("my-file.orc"), OrcFile.readerOptions(conf));

		// 读取文件
		RecordReader rows = reader.rows();
		VectorizedRowBatch batch = reader.getSchema().createRowBatch();

		while (rows.nextBatch(batch)) {
			System.out.println(batch.size);
			for (int r = 0; r < batch.size; ++r) {
				// ... process row r from batch
				// System.out.println(r);
			}
		}
		rows.close();
	}
}

结果

1024
1024
1024
1024
1024
1024
1024
1024
1024
784

参考

  • 存储
    21 引用 • 28 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...