Background
Java Class Design
VectorizedRowBatch
ORC data is represented in memory by org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch, as follows:
```java
public class VectorizedRowBatch implements Writable {
  public int numCols;          // number of columns
  public ColumnVector[] cols;  // a vector for each column
  public int size;             // number of rows that qualify (i.e. haven't been filtered out)
  ...
}
```
ColumnVector
The contents of each column are stored in org.apache.hadoop.hive.ql.exec.vector.ColumnVector, as follows:
```java
public abstract class ColumnVector {
  public final Type type;

  // If the whole column vector has no nulls, this is true, otherwise false.
  public boolean noNulls;

  /*
   * If hasNulls is true, then this array contains true if the value
   * is null, otherwise false. The array is always allocated, so a batch can be re-used
   * later and nulls added.
   */
  public boolean[] isNull;

  /*
   * True if same value repeats for whole column vector.
   * If so, vector[0] holds the repeating value.
   */
  public boolean isRepeating;
  ...
}
```
As you can see, there are special optimizations for two cases: a column with no nulls at all, and a column where every value is identical.
Its subtypes include:
ORC type | ColumnVector class
---|---
array | ListColumnVector
binary | BytesColumnVector
bigint | LongColumnVector
boolean | LongColumnVector
char | BytesColumnVector
date | LongColumnVector
decimal | DecimalColumnVector
double | DoubleColumnVector
float | DoubleColumnVector
int | LongColumnVector
map | MapColumnVector
smallint | LongColumnVector
string | BytesColumnVector
struct | StructColumnVector
timestamp | TimestampColumnVector
tinyint | LongColumnVector
uniontype | UnionColumnVector
varchar | BytesColumnVector
LongColumnVector
LongColumnVector handles all of the integer types, including boolean, bigint, date, int, smallint, and tinyint:
```java
public class LongColumnVector extends ColumnVector {
  public long[] vector;
  ...
}
```
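For illustration, here is a minimal sketch (not part of the original walkthrough) of how a consumer might read one value out of a LongColumnVector while honoring the noNulls / isNull / isRepeating flags described above; the helper name readLong is hypothetical:

```java
// Hypothetical helper: extract the value at logical row `row`,
// honoring the null and repeating-value optimizations from ColumnVector.
static Long readLong(LongColumnVector col, int row) {
  // When isRepeating is set, every row shares the value stored at index 0.
  int idx = col.isRepeating ? 0 : row;
  if (!col.noNulls && col.isNull[idx]) {
    return null; // this row holds a NULL
  }
  return col.vector[idx];
}
```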
TimestampColumnVector
TimestampColumnVector handles the timestamp type:
```java
public class TimestampColumnVector extends ColumnVector {
  /*
   * The values from Timestamp.getTime().
   */
  public long[] time;

  /*
   * The values from Timestamp.getNanos().
   */
  public int[] nanos;
  ...
}
```
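A hedged sketch of populating one timestamp cell directly through the public arrays (the helper name setTimestamp is an assumption; the time/nanos semantics follow the field comments above):

```java
import java.sql.Timestamp;

// Hypothetical helper: store `ts` at row `row`, mirroring the documented
// Timestamp.getTime() / Timestamp.getNanos() semantics of the two arrays.
static void setTimestamp(TimestampColumnVector col, int row, Timestamp ts) {
  col.time[row] = ts.getTime();   // milliseconds since the epoch
  col.nanos[row] = ts.getNanos(); // nanosecond fraction within the second
}
```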
DoubleColumnVector
DoubleColumnVector handles all of the floating point types, i.e. double and float:
```java
public class DoubleColumnVector extends ColumnVector {
  public double[] vector;
  ...
}
```
DecimalColumnVector
DecimalColumnVector handles the decimal type. Its current implementation is not designed for performance, however, and may change in the future:
```java
public class DecimalColumnVector extends ColumnVector {
  /**
   * A vector of HiveDecimalWritable objects.
   *
   * For high performance and easy access to this low-level structure,
   * the fields are public by design (as they are in other ColumnVector
   * types).
   */
  public HiveDecimalWritable[] vector;
  ...
}
```
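A small sketch of writing one decimal cell; it assumes, as createRowBatch() currently does, that the HiveDecimalWritable entries are pre-allocated so set() can mutate them in place (the helper name setDecimal is hypothetical):

```java
import org.apache.hadoop.hive.common.type.HiveDecimal;

// Hypothetical helper: parse `value` and store it at row `row`.
static void setDecimal(DecimalColumnVector col, int row, String value) {
  // Assumes col.vector[row] was pre-allocated by createRowBatch().
  col.vector[row].set(HiveDecimal.create(value));
}
```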
BytesColumnVector
BytesColumnVector handles all of the binary-based types, including binary, char, string, and varchar:
```java
public class BytesColumnVector extends ColumnVector {
  public byte[][] vector;
  public int[] start;  // start offset of each field

  /*
   * The length of each field. If the value repeats for every entry, then it is stored
   * in vector[0] and isRepeating from the superclass is set to true.
   */
  public int[] length;
  ...
}
```
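The usual way to fill a cell is setVal(), which copies the bytes into the vector's buffer and records the start offset and length for that row. A minimal sketch (the helper name setString is hypothetical):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: copy the UTF-8 bytes of `s` into row `row`.
static void setString(BytesColumnVector col, int row, String s) {
  col.setVal(row, s.getBytes(StandardCharsets.UTF_8));
}
```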
StructColumnVector
StructColumnVector represents struct columns. Its implementation is simply an array of ColumnVectors, one per field; nothing clever, just natural:
```java
public class StructColumnVector extends ColumnVector {
  public ColumnVector[] fields;
  ...
}
```
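A hedged sketch for a hypothetical struct<x:int,y:string> column: each struct field is just another ColumnVector inside fields, indexed in schema order:

```java
// Hypothetical helper for a struct<x:int,y:string> column.
static void setStructRow(StructColumnVector col, int row, long x, byte[] y) {
  ((LongColumnVector) col.fields[0]).vector[row] = x;  // field x
  ((BytesColumnVector) col.fields[1]).setVal(row, y);  // field y
}
```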
UnionColumnVector
UnionColumnVector represents union columns, whose elements share storage. The implementation:
```java
public class UnionColumnVector extends ColumnVector {
  public int[] tags;
  public ColumnVector[] fields;
  ...
}
```
Here tags[row] identifies each element's subtype, and fields[tag] holds the corresponding value.
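A hedged sketch for a hypothetical uniontype<int,string> column, writing the int branch of one row:

```java
// Hypothetical helper for uniontype<int,string>: tag 0 selects the int
// branch; only fields[tags[row]] is meaningful for a given row.
static void setUnionAsInt(UnionColumnVector col, int row, long value) {
  col.tags[row] = 0; // this row uses subtype 0 (the int branch)
  ((LongColumnVector) col.fields[0]).vector[row] = value;
}
```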
MultiValuedColumnVector
MultiValuedColumnVector is the base class for multi-valued columns, which need to record the start offset and length of each value:
```java
public abstract class MultiValuedColumnVector extends ColumnVector {
  public long[] offsets;
  public long[] lengths;

  // the number of children slots used
  public int childCount;
  ...
}
```
ListColumnVector
ListColumnVector handles array columns:
```java
public class ListColumnVector extends MultiValuedColumnVector {
  public ColumnVector child;
  ...
}
```
The values of list[i] are stored in child at positions offsets[i] through offsets[i] + lengths[i] - 1, inclusive.
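A hedged sketch for a hypothetical array<int> column, appending one row's list by claiming a contiguous run of child slots:

```java
// Hypothetical helper for an array<int> column.
static void setList(ListColumnVector col, int row, long[] values) {
  col.offsets[row] = col.childCount;          // list starts at the next free child slot
  col.lengths[row] = values.length;           // element count for this row
  col.childCount += values.length;            // claim the slots
  col.child.ensureSize(col.childCount, true); // grow the child vector if needed
  LongColumnVector child = (LongColumnVector) col.child;
  for (int i = 0; i < values.length; ++i) {
    child.vector[(int) col.offsets[row] + i] = values[i];
  }
}
```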
MapColumnVector
MapColumnVector handles map columns, storing keys and values in two parallel child vectors (the advanced write example below walks through populating one):
```java
public class MapColumnVector extends MultiValuedColumnVector {
  public ColumnVector keys;
  public ColumnVector values;
  ...
}
```
The remaining types are not listed here; for details, see the official documentation, Apache ORC: Using Core Java.
Writing ORC Files in Java
Quick-start Example
Steps
- Define the schema
- Create a Writer with the OrcFile class
pom file:
```xml
<dependency>
  <groupId>org.apache.orc</groupId>
  <artifactId>orc-core</artifactId>
  <version>1.5.1</version>
</dependency>
```
Example:
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class WriteExample {
  public static void main(String[] args) throws IllegalArgumentException, IOException {
    Configuration conf = new Configuration();

    // Define the schema
    TypeDescription schema = TypeDescription.fromString("struct<x:int,y:int>");

    // Create the writer
    Writer writer = OrcFile.createWriter(new Path("my-file.orc"),
        OrcFile.writerOptions(conf).setSchema(schema));

    // Write the file
    VectorizedRowBatch batch = schema.createRowBatch();
    LongColumnVector x = (LongColumnVector) batch.cols[0];
    LongColumnVector y = (LongColumnVector) batch.cols[1];

    // Generate 10000 rows of data
    for (int r = 0; r < 10000; ++r) {
      int row = batch.size++;
      x.vector[row] = r;
      y.vector[row] = r * 3;
      // Each batch holds 1024 rows by default; flush it when full and start a new one.
      if (batch.size == batch.getMaxSize()) {
        writer.addRowBatch(batch);
        batch.reset();
      }
    }
    if (batch.size != 0) {
      writer.addRowBatch(batch);
      batch.reset();
    }
    writer.close();
  }
}
```
Advanced Example
Writing a map column
```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class AdvancedWriteExample {
  public static void main(String[] args) throws IllegalArgumentException, IOException {
    Configuration conf = new Configuration();

    // Define the schema
    TypeDescription schema =
        TypeDescription.fromString("struct<first:int,second:int,third:map<string,int>>");

    // Create the writer
    Writer writer = OrcFile.createWriter(new Path("advanced-example.orc"),
        OrcFile.writerOptions(conf).setSchema(schema));

    // Write the file
    VectorizedRowBatch batch = schema.createRowBatch();

    // The first two columns
    LongColumnVector first = (LongColumnVector) batch.cols[0];
    LongColumnVector second = (LongColumnVector) batch.cols[1];

    // The map column; its keys and values need casts to their concrete types
    MapColumnVector map = (MapColumnVector) batch.cols[2];
    BytesColumnVector mapKey = (BytesColumnVector) map.keys;
    LongColumnVector mapValue = (LongColumnVector) map.values;

    // Each map holds 5 elements
    final int MAP_SIZE = 5;
    final int BATCH_SIZE = batch.getMaxSize();

    // Ensure the map child vectors have enough room
    mapKey.ensureSize(BATCH_SIZE * MAP_SIZE, false);
    mapValue.ensureSize(BATCH_SIZE * MAP_SIZE, false);

    // Add 1500 rows to the file
    for (int r = 0; r < 1500; ++r) {
      int row = batch.size++;

      // The first two columns
      first.vector[row] = r;
      second.vector[row] = r * 3;

      // The map column's offsets
      map.offsets[row] = map.childCount;
      map.lengths[row] = MAP_SIZE;
      map.childCount += MAP_SIZE;

      // The map column's keys and values
      for (int mapElem = (int) map.offsets[row];
           mapElem < map.offsets[row] + MAP_SIZE; ++mapElem) {
        String key = "row " + r + "." + (mapElem - map.offsets[row]);
        mapKey.setVal(mapElem, key.getBytes(StandardCharsets.UTF_8));
        mapValue.vector[mapElem] = mapElem;
      }

      // Each batch holds 1024 rows by default; flush it when full and start a new one.
      if (row == BATCH_SIZE - 1) {
        writer.addRowBatch(batch);
        batch.reset();
      }
    }
    if (batch.size != 0) {
      writer.addRowBatch(batch);
      batch.reset();
    }
    writer.close();
  }
}
```
Reading ORC Files in Java
Steps
- Create a Reader with OrcFile
- Read the file
Example
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class ReaderExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();

    // Create a Reader with OrcFile
    Reader reader = OrcFile.createReader(new Path("my-file.orc"),
        OrcFile.readerOptions(conf));

    // Read the file batch by batch
    RecordReader rows = reader.rows();
    VectorizedRowBatch batch = reader.getSchema().createRowBatch();
    while (rows.nextBatch(batch)) {
      System.out.println(batch.size);
      for (int r = 0; r < batch.size; ++r) {
        // ... process row r from batch
      }
    }
    rows.close();
  }
}
```
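To fill in the "process row r" placeholder, here is a hedged sketch that assumes the struct<x:int,y:int> schema written by the quick-start example above (it also needs the LongColumnVector import):

```java
// Hypothetical row-processing step for the struct<x:int,y:int> file above.
LongColumnVector x = (LongColumnVector) batch.cols[0];
LongColumnVector y = (LongColumnVector) batch.cols[1];
for (int r = 0; r < batch.size; ++r) {
  // The writer above set a distinct non-null value in every row,
  // so the isRepeating / isNull flags are not checked here.
  System.out.println(x.vector[r] + ", " + y.vector[r]);
}
```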
Output (the batch sizes printed by System.out.println(batch.size)):
```
1024
1024
1024
1024
1024
1024
1024
1024
1024
784
```