Background
Java Class Design
VectorizedRowBatch
In memory, ORC data is represented by the class org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch, shown below:
public class VectorizedRowBatch implements Writable {
  public int numCols;           // number of columns
  public ColumnVector[] cols;   // a vector for each column
  public int size;              // number of rows that qualify (i.e. haven't been filtered out)
  ...
}
ColumnVector
The contents of each column are stored in org.apache.hadoop.hive.ql.exec.vector.ColumnVector, shown below:
public abstract class ColumnVector {
  public final Type type;

  // If the whole column vector has no nulls, this is true, otherwise false.
  public boolean noNulls;

  /*
   * If hasNulls is true, then this array contains true if the value
   * is null, otherwise false. The array is always allocated, so a batch can be re-used
   * later and nulls added.
   */
  public boolean[] isNull;

  /*
   * True if same value repeats for whole column vector.
   * If so, vector[0] holds the repeating value.
   */
  public boolean isRepeating;
  ...
}
As you can see, two cases receive special optimization: columns that contain no nulls (noNulls) and columns where every value is identical (isRepeating).
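For example, a consumer reading an integer column (a LongColumnVector, described below) has to honor both flags. A minimal sketch, where getLong is a hypothetical helper:

// Reads one row from a long column, honoring the noNulls and isRepeating flags.
static long getLong(LongColumnVector col, int row, long defaultIfNull) {
  int idx = col.isRepeating ? 0 : row;    // a repeating vector stores its value at index 0
  if (!col.noNulls && col.isNull[idx]) {  // isNull is only meaningful when noNulls is false
    return defaultIfNull;                 // caller-supplied substitute for SQL NULL
  }
  return col.vector[idx];
}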
Its subclasses include:
ORC type | ColumnVector class
---|---
array | ListColumnVector
binary | BytesColumnVector
bigint | LongColumnVector
boolean | LongColumnVector
char | BytesColumnVector
date | LongColumnVector
decimal | DecimalColumnVector
double | DoubleColumnVector
float | DoubleColumnVector
int | LongColumnVector
map | MapColumnVector
smallint | LongColumnVector
string | BytesColumnVector
struct | StructColumnVector
timestamp | TimestampColumnVector
tinyint | LongColumnVector
uniontype | UnionColumnVector
varchar | BytesColumnVector
LongColumnVector
LongColumnVector handles all of the integer types, including boolean, bigint, date, int, smallint, and tinyint.
public class LongColumnVector extends ColumnVector {
  public long[] vector;
  ...
}
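All of these types share the same long[] backing array, so non-long values are simply encoded as integers: booleans as 0/1, dates as days since the epoch, and so on. A minimal sketch, with setBoolean being a hypothetical helper:

// Booleans are encoded as 0/1 in the shared long[] vector.
static void setBoolean(LongColumnVector col, int row, boolean value) {
  col.vector[row] = value ? 1L : 0L;
}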
TimestampColumnVector
TimestampColumnVector handles the timestamp type.
public class TimestampColumnVector extends ColumnVector {
  /*
   * The values from Timestamp.getTime().
   */
  public long[] time;

  /*
   * The values from Timestamp.getNanos().
   */
  public int[] nanos;
  ...
}
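Rather than filling time and nanos by hand, callers can use the set method, which populates both arrays from a java.sql.Timestamp. A minimal sketch, with setTimestamp being a hypothetical helper:

import java.sql.Timestamp;

// Fills time[row] and nanos[row] from a java.sql.Timestamp.
static void setTimestamp(TimestampColumnVector col, int row, Timestamp value) {
  col.set(row, value);
}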
DoubleColumnVector
DoubleColumnVector handles all of the floating point types, including double and float.
public class DoubleColumnVector extends ColumnVector {
  public double[] vector;
  ...
}
DecimalColumnVector
DecimalColumnVector handles decimal columns. Note that its current representation is not designed for performance and is likely to change in the future.
public class DecimalColumnVector extends ColumnVector {
  /**
   * A vector of HiveDecimalWritable objects.
   *
   * For high performance and easy access to this low-level structure,
   * the fields are public by design (as they are in other ColumnVector
   * types).
   */
  public HiveDecimalWritable[] vector;
  public short scale;
  public short precision;
  ...
}
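Values are most conveniently written through the set method, which copies a HiveDecimal into the pre-allocated writable at that position. A minimal sketch, with setDecimal being a hypothetical helper:

import org.apache.hadoop.hive.common.type.HiveDecimal;

// Parses a decimal string and stores it at the given row.
static void setDecimal(DecimalColumnVector col, int row, String value) {
  col.set(row, HiveDecimal.create(value));
}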
BytesColumnVector
BytesColumnVector handles all of the binary-based types, including binary, char, string, and varchar.
public class BytesColumnVector extends ColumnVector {
  public byte[][] vector;
  public int[] start;  // start offset of each field
  /*
   * The length of each field. If the value repeats for every entry, then it is stored
   * in vector[0] and isRepeating from the superclass is set to true.
   */
  public int[] length;
  ...
}
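Rather than manipulating vector, start, and length directly, callers normally go through setVal, which copies the bytes into the vector's own buffer (setRef, which stores a reference to the caller's array, avoids the copy). A minimal sketch, with setString being a hypothetical helper:

import java.nio.charset.StandardCharsets;

// Copies the string's UTF-8 bytes into the vector's buffer at the given row.
static void setString(BytesColumnVector col, int row, String value) {
  col.setVal(row, value.getBytes(StandardCharsets.UTF_8));
}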
StructColumnVector
StructColumnVector handles struct columns. Its implementation is simply an array of ColumnVectors, one per field, which is clever yet natural.
public class StructColumnVector extends ColumnVector {
  public ColumnVector[] fields;
  ...
}
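Each field is written through its own child vector at the same row index. A minimal sketch, assuming a hypothetical schema struct&lt;a:int,b:string&gt; and batch/row variables as in the write examples below:

import java.nio.charset.StandardCharsets;

// Assumes batch.cols[0] is a struct<a:int,b:string> column.
StructColumnVector st = (StructColumnVector) batch.cols[0];
((LongColumnVector) st.fields[0]).vector[row] = 42L;                               // field a
((BytesColumnVector) st.fields[1]).setVal(row, "hello".getBytes(StandardCharsets.UTF_8)); // field b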
UnionColumnVector
UnionColumnVector handles union columns, whose alternatives share storage. Its implementation:
public class UnionColumnVector extends ColumnVector {
  public int[] tags;
  public ColumnVector[] fields;
  ...
}
Here tags[row] identifies which alternative the row uses, and fields[tags[row]] holds the corresponding value.
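A minimal sketch, assuming a hypothetical uniontype&lt;int,string&gt; column and batch/row variables as in the write examples below:

UnionColumnVector u = (UnionColumnVector) batch.cols[0];
u.tags[row] = 0;                                    // this row uses alternative 0 (the int)
((LongColumnVector) u.fields[0]).vector[row] = 7L;  // write the value into that alternative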
MultiValuedColumnVector
MultiValuedColumnVector is the base class for multi-valued columns, which must record the start offset and length of each value.
public abstract class MultiValuedColumnVector extends ColumnVector {
  public long[] offsets;
  public long[] lengths;
  // the number of children slots used
  public int childCount;
  ...
}
ListColumnVector
ListColumnVector handles array columns.
public class ListColumnVector extends MultiValuedColumnVector {
  public ColumnVector child;
  ...
}
The elements of list[i] are stored in child at positions offsets[i] through offsets[i] + lengths[i] - 1, inclusive.
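A minimal sketch of appending one three-element list, assuming batch.cols[0] is a hypothetical array&lt;int&gt; column and batch/row variables as in the write examples below (the map example later follows the same offsets/lengths/childCount pattern):

ListColumnVector list = (ListColumnVector) batch.cols[0];
LongColumnVector elems = (LongColumnVector) list.child;
list.offsets[row] = list.childCount;  // this row's elements start after the existing children
list.lengths[row] = 3;
list.childCount += 3;
for (int i = 0; i < 3; ++i) {
  elems.vector[(int) list.offsets[row] + i] = i;  // element values 0, 1, 2
}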
MapColumnVector
MapColumnVector handles map columns.
public class MapColumnVector extends MultiValuedColumnVector {
  public ColumnVector keys;
  public ColumnVector values;
  ...
}
The remaining types are not enumerated here; see the official documentation, Apache ORC: Using Core Java, for details.
Writing ORC Files in Java
Quick-start example
Steps
- Define the schema
- Create a Writer with the OrcFile class
Maven dependency
<dependency>
  <groupId>org.apache.orc</groupId>
  <artifactId>orc-core</artifactId>
  <version>1.5.1</version>
</dependency>
Full example:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class WriteExample {
  public static void main(String[] args) throws IllegalArgumentException, IOException {
    Configuration conf = new Configuration();
    // Define the schema
    TypeDescription schema = TypeDescription.fromString("struct<x:int,y:int>");
    // Create the writer
    Writer writer = OrcFile.createWriter(new Path("my-file.orc"),
        OrcFile.writerOptions(conf).setSchema(schema));
    // Write the file
    VectorizedRowBatch batch = schema.createRowBatch();
    LongColumnVector x = (LongColumnVector) batch.cols[0];
    LongColumnVector y = (LongColumnVector) batch.cols[1];
    // Generate 10000 rows of data
    for (int r = 0; r < 10000; ++r) {
      int row = batch.size++;
      x.vector[row] = r;
      y.vector[row] = r * 3;
      // A batch holds 1024 rows by default; when it is full, flush it and start over.
      if (batch.size == batch.getMaxSize()) {
        writer.addRowBatch(batch);
        batch.reset();
      }
    }
    if (batch.size != 0) {
      writer.addRowBatch(batch);
      batch.reset();
    }
    writer.close();
  }
}
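OrcFile.writerOptions exposes further knobs, such as the compression codec and the target stripe size. A minimal sketch of a tuned createWriter call (the values here are chosen for illustration only):

import org.apache.orc.CompressionKind;

// Same writer as above, with an explicit codec and target stripe size.
Writer writer = OrcFile.createWriter(new Path("my-file.orc"),
    OrcFile.writerOptions(conf)
        .setSchema(schema)
        .compress(CompressionKind.ZLIB)   // compression codec
        .stripeSize(64L * 1024 * 1024));  // target stripe size in bytes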
Advanced example
Writing a map column
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class AdvancedWriteExample {
  public static void main(String[] args) throws IllegalArgumentException, IOException {
    Configuration conf = new Configuration();
    // Define the schema
    TypeDescription schema = TypeDescription
        .fromString("struct<first:int,second:int,third:map<string,int>>");
    // Create the writer
    Writer writer = OrcFile.createWriter(new Path("advanced-example.orc"),
        OrcFile.writerOptions(conf).setSchema(schema));
    // Write the file
    VectorizedRowBatch batch = schema.createRowBatch();
    // The first two columns
    LongColumnVector first = (LongColumnVector) batch.cols[0];
    LongColumnVector second = (LongColumnVector) batch.cols[1];
    // The map column; its keys and values must be cast to their concrete types
    MapColumnVector map = (MapColumnVector) batch.cols[2];
    BytesColumnVector mapKey = (BytesColumnVector) map.keys;
    LongColumnVector mapValue = (LongColumnVector) map.values;
    // Each map contains 5 elements
    final int MAP_SIZE = 5;
    final int BATCH_SIZE = batch.getMaxSize();
    // Ensure the map child vectors have enough room
    mapKey.ensureSize(BATCH_SIZE * MAP_SIZE, false);
    mapValue.ensureSize(BATCH_SIZE * MAP_SIZE, false);
    // Add 1500 rows to the file
    for (int r = 0; r < 1500; ++r) {
      int row = batch.size++;
      // The first two columns
      first.vector[row] = r;
      second.vector[row] = r * 3;
      // The map column's offset and length
      map.offsets[row] = map.childCount;
      map.lengths[row] = MAP_SIZE;
      map.childCount += MAP_SIZE;
      // The map column's keys and values
      for (int mapElem = (int) map.offsets[row]; mapElem < map.offsets[row] + MAP_SIZE; ++mapElem) {
        String key = "row " + r + "." + (mapElem - map.offsets[row]);
        mapKey.setVal(mapElem, key.getBytes(StandardCharsets.UTF_8));
        mapValue.vector[mapElem] = mapElem;
      }
      // A batch holds 1024 rows by default; when it is full, flush it and start over.
      if (row == BATCH_SIZE - 1) {
        writer.addRowBatch(batch);
        batch.reset();
      }
    }
    if (batch.size != 0) {
      writer.addRowBatch(batch);
      batch.reset();
    }
    writer.close();
  }
}
Reading ORC Files in Java
Steps
- Create a Reader with the OrcFile class
- Read the file batch by batch
Example
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class ReaderExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Create the reader with OrcFile
    Reader reader = OrcFile.createReader(new Path("my-file.orc"), OrcFile.readerOptions(conf));
    // Read the file batch by batch
    RecordReader rows = reader.rows();
    VectorizedRowBatch batch = reader.getSchema().createRowBatch();
    while (rows.nextBatch(batch)) {
      System.out.println(batch.size);
      for (int r = 0; r < batch.size; ++r) {
        // ... process row r from batch
        // System.out.println(r);
      }
    }
    rows.close();
  }
}
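As an illustration, here is what "process row r" might look like for the struct&lt;x:int,y:int&gt; file written earlier; a sketch that would replace the inner for loop above, honoring the isRepeating optimization:

LongColumnVector x = (LongColumnVector) batch.cols[0];
LongColumnVector y = (LongColumnVector) batch.cols[1];
for (int r = 0; r < batch.size; ++r) {
  long xValue = x.vector[x.isRepeating ? 0 : r];  // repeating vectors store the value at index 0
  long yValue = y.vector[y.isRepeating ? 0 : r];
  System.out.println(xValue + ", " + yValue);
}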
Output:
1024
1024
1024
1024
1024
1024
1024
1024
1024
784