Converting a txt File to an ORC File with the Java API
ORC is an important file format in Hive and is widely used in big data scenarios. Because ORC files are binary, they cannot be read or written directly as plain text. This article shows how to convert a regular, well-formed text file into an ORC file with the Java API and how to read the ORC file back out to the console. The ORC file format itself is not covered in detail here; if you need background, refer to other articles or the official ORC website.
1. Add Dependencies
First, add the following dependencies (taken from the official ORC website). A working Hadoop environment is required when testing this program.
<dependencies>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-mapreduce</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.0</version>
</dependency>
</dependencies>
2. Determine the Input Data Format
Next, determine the format of the data to be converted. The data I used for testing looks like this: it was randomly generated by a Java program; the first column is a row index, the second column a randomly generated word, the third column a randomly generated five-digit number, and the fourth column a randomly generated Chinese character. For my own tests I generated about 520 MB of data.
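The generator program itself is not shown in this article. As a rough sketch of what it might look like — the word list, line count and output path below are placeholder assumptions, not the author's actual code:

import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

public class GenerateSampleData {
    public static void main(String[] args) throws IOException {
        // Placeholder vocabulary; the real generator's word list is not given in the article
        String[] words = {"apple", "orange", "banana", "grape", "melon"};
        Random rnd = new Random();
        // Assumes the data/normal directory already exists
        try (FileWriter out = new FileWriter("data/normal/cyhData500.txt")) {
            for (int i = 1; i <= 1_000_000; i++) {
                String word = words[rnd.nextInt(words.length)];
                int fiveDigits = 10000 + rnd.nextInt(90000);                  // random five-digit number
                char hanzi = (char) (0x4E00 + rnd.nextInt(0x9FA6 - 0x4E00));  // random CJK ideograph
                out.write(i + " " + word + " " + fiveDigits + " " + hanzi + "\n");
            }
        }
    }
}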
3. Implementation
package com.atguigu.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
public class WriteBigTextToOrc {
public static void main(String[] args) throws IOException {
String filePath = "data/normal/cyhData500.txt";
writeOrc(filePath);
}
public static void writeOrc(String fileName) throws IOException {
Configuration conf = new Configuration();
// Determine the data type of each column
TypeDescription schema = TypeDescription.createStruct()
.addField("filed1",TypeDescription.createInt())
.addField("filed2",TypeDescription.createString())
.addField("filed3",TypeDescription.createInt())
.addField("filed4",TypeDescription.createString());
// Output path of the ORC file on the local file system
String path = "data/orc/zlib/cyhOrc500.orc";
// Configure the writer options
Writer writer = OrcFile.createWriter(new Path(path),OrcFile.writerOptions(conf)
.setSchema(schema)
.stripeSize(67108864)
.bufferSize(64*1024)
.blockSize(128*1024*1024)
.rowIndexStride(10000)
.blockPadding(true)
// The default codec is ZLIB; compared with Snappy, ZLIB compresses the data further (smaller files) but spends more time compressing
.compress(CompressionKind.ZLIB));
File file = new File(fileName);
BufferedReader reader = new BufferedReader(new FileReader(file));
VectorizedRowBatch batch = schema.createRowBatch();
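// The batch created above holds up to 1024 rows by default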
// Get a reference to each column vector
LongColumnVector a = (LongColumnVector) batch.cols[0];
BytesColumnVector b = (BytesColumnVector) batch.cols[1];
LongColumnVector c = (LongColumnVector) batch.cols[2];
BytesColumnVector d = (BytesColumnVector) batch.cols[3];
String tempString = null;
// Measure the conversion time
long start = System.currentTimeMillis();
// Convert the text line by line into the binary ORC file
while ((tempString = reader.readLine())!=null){
int row = batch.size++;
String[] contents = tempString.split(" ");
// For int, long, double and similar types, assign through columnVector.vector
a.vector[row] = Integer.parseInt(contents[0]);
// For string-like types, use columnVector.setVal
b.setVal(row,contents[1].getBytes());
c.vector[row] = Integer.parseInt(contents[2]);
d.setVal(row,contents[3].getBytes());
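// Note: the batch is flushed after every single row here; the optimized version in section 6 fills the batch before flushing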
writer.addRowBatch(batch);
batch.reset();
}
long stop = System.currentTimeMillis();
System.out.println("将text文件转换为orc文件花费的时间是 "+(stop-start)/1000+"秒");
reader.close();
writer.close();
}
}
4. Verify the Result
Running the program produces a .orc file and a .crc file. The .orc file is our target file; opening it shows nothing but binary content. To verify it, we read the ORC file back out. We could keep using the Java API for this (a read sketch is given at the end of this section), but here I recommend reading it with Hive instead, as follows:
- 1. Configure Hive on Hadoop (plenty of tutorials online; the process is easy).
- 2. Create the table:
create table orcfile(
filed1 int,
filed2 string,
filed3 int,
filed4 string
) stored as orc;
- 3. Load the data. Copy the generated ORC file into the VM first, then run: load data local inpath '<path of the orc file>' overwrite into table orcfile;
- 4. Query the data: finally, run select * from orcfile;
If you see null values, check that the field names in the CREATE TABLE statement match those used in the code (filed1 ... filed4).
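For completeness, here is what the Java API route mentioned above could look like. This is a minimal read sketch based on the org.apache.orc Reader and RecordReader APIs, assuming the four-column file produced in section 3; it prints the rows back to the console:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import java.io.IOException;

public class ReadOrcToConsole {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Open the ORC file written by WriteBigTextToOrc
        Reader reader = OrcFile.createReader(new Path("data/orc/zlib/cyhOrc500.orc"),
                OrcFile.readerOptions(conf));
        RecordReader rows = reader.rows();
        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        while (rows.nextBatch(batch)) {
            LongColumnVector col1 = (LongColumnVector) batch.cols[0];
            BytesColumnVector col2 = (BytesColumnVector) batch.cols[1];
            LongColumnVector col3 = (LongColumnVector) batch.cols[2];
            BytesColumnVector col4 = (BytesColumnVector) batch.cols[3];
            for (int r = 0; r < batch.size; r++) {
                // BytesColumnVector stores raw bytes plus per-row start/length offsets
                String word = new String(col2.vector[r], col2.start[r], col2.length[r]);
                String hanzi = new String(col4.vector[r], col4.start[r], col4.length[r]);
                System.out.println(col1.vector[r] + " " + word + " " + col3.vector[r] + " " + hanzi);
            }
        }
        rows.close();
    }
}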
5. Code Walkthrough
The implementation works as follows: 1) determine the data type of each column and build the schema (addField); 2) set the output path of the ORC file; 3) configure the writer options; 4) get a reference to each column vector of the row batch; 5) loop over the input lines, fill the column vectors and write the batch. (To be refined.)
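As a side note on step 1: with recent orc-core releases the same schema can also be written as a type string and parsed with TypeDescription.fromString, which is sometimes handier than chaining addField calls. A small sketch (the field names mirror the code above):

import org.apache.orc.TypeDescription;

public class SchemaFromString {
    public static void main(String[] args) {
        // Equivalent to the createStruct().addField(...) chain used in WriteBigTextToOrc
        TypeDescription schema =
                TypeDescription.fromString("struct<filed1:int,filed2:string,filed3:int,filed4:string>");
        System.out.println(schema); // prints the same struct<...> type string
    }
}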
6. Optimized Implementation
The version below improves on the code from section 3: it infers the schema from the first line of the input file, supports a configurable delimiter, handles long, double and string columns with basic error handling, and flushes a row batch only when it is full instead of after every row.
package com.demo.orcadapter;
import com.magic.orcadapter.utils.CustomUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@Slf4j
public class TextToOrc {
public static void main(String[] args) throws IOException {
String filePath = "src/main/resources/cyhData500.txt";
String outOrcFileDir = "src/main/resources/output/";
String delimiter = " "; // default delimiter: a single space
convertOrc(filePath, outOrcFileDir, delimiter);
}
/**
* Convert a text file to an ORC file
*
* @param fileName
* @param outOrcFileDir
* @param delimiter
* @throws IOException
*/
public static void convertOrc(String fileName, String outOrcFileDir, String delimiter) throws IOException {
// Create the Hadoop configuration
Configuration conf = new Configuration();
// Build the output file name with an .orc suffix, e.g. /out/test.orc
String orcFilePath = outOrcFileDir + CustomUtils.ToOrcFileName(fileName);
// Delete the output file if it already exists
CustomUtils.deleteIfExist(orcFilePath);
try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
// Read the first line of the text file; it is used to build the ORC schema
String firstLine = reader.readLine();
TypeDescription schema = createSchema(firstLine, delimiter);
// Create the ORC writer
Writer writer = OrcFile.createWriter(new Path(orcFilePath), OrcFile.writerOptions(conf)
.setSchema(schema)
.compress(CompressionKind.SNAPPY));
// Create the row batch used to write to the ORC file
VectorizedRowBatch batch = schema.createRowBatch();
// Parse the first line and infer the data type of each field
Map<String, TypeDescription> fieldMap = getFieldMap(firstLine, delimiter);
// Read the text file line by line, parse each line and write it to the ORC file
String lineTempString;
long lineNo = 1; // line number; the first line is handled separately below
long start = System.currentTimeMillis();
// The first line is also a data row, so process it as well
int row = batch.size++;
processLine(firstLine, delimiter, batch, row, fieldMap, lineNo);
while ((lineTempString = reader.readLine()) != null) {
row = batch.size++;
lineNo += 1;
processLine(lineTempString, delimiter, batch, row, fieldMap, lineNo);
// Flush a full batch (default capacity 1024 rows) to keep memory usage bounded
if (batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
}
// Write any rows remaining in the last, partially filled batch
if (batch.size != 0) {
writer.addRowBatch(batch);
}
writer.close();
long stop = System.currentTimeMillis();
log.debug("Time taken to convert text file to orc file: " + (stop - start) / 1000 + " seconds");
} catch (Exception e) {
log.error("Read file content fail, error msg: " + e.getMessage());
}
}
/**
* Process one line of input
*
* @param line
* @param delimiter
* @param batch
* @param row
* @param fieldMap
* @param lineNo
*/
private static void processLine(String line, String delimiter, VectorizedRowBatch batch, int row, Map<String, TypeDescription> fieldMap, long lineNo) {
String[] tokens = line.split(delimiter);
for (int i = 0; i < tokens.length; i++) {
String cellValue = tokens[i];
TypeDescription cellType = fieldMap.get("field" + (i + 1));
if (cellType.equals(TypeDescription.createLong())) {
processLongValue(cellValue, batch, i, row, lineNo);
} else if (cellType.equals(TypeDescription.createDouble())) {
processDoubleValue(cellValue, batch, i, row, lineNo);
} else {
processStringValue(cellValue, batch, i, row);
}
}
}
/**
* Handle a long-typed value
*
* @param cellValue
* @param batch
* @param columnIndex
* @param row
* @param lineNo
*/
private static void processLongValue(String cellValue, VectorizedRowBatch batch, int columnIndex, int row, long lineNo) {
long value;
try {
value = Long.parseLong(cellValue);
} catch (NumberFormatException e) {
log.warn(String.format("[line %d / column %d] NumberFormatException: value is %s", lineNo, columnIndex, cellValue));
value = 0L;
}
((LongColumnVector) batch.cols[columnIndex]).vector[row] = value;
}
/**
* Handle a double-typed value
*
* @param cellValue
* @param batch
* @param columnIndex
* @param row
* @param lineNo
*/
private static void processDoubleValue(String cellValue, VectorizedRowBatch batch, int columnIndex, int row, long lineNo) {
double value;
try {
if (StringUtils.isBlank(cellValue) || "NaN".equals(cellValue)) {
value = Double.NaN;
} else {
value = Double.parseDouble(cellValue);
}
} catch (NumberFormatException e) {
log.warn(String.format("[line %d / column %d] NumberFormatException: value is %s", lineNo, columnIndex, cellValue));
value = Double.NaN;
}
((DoubleColumnVector) batch.cols[columnIndex]).vector[row] = value;
}
private static void processStringValue(String cellValue, VectorizedRowBatch batch, int columnIndex, int row) {
byte[] valueBytes = cellValue.getBytes();
((BytesColumnVector) batch.cols[columnIndex]).setVal(row, valueBytes, 0, valueBytes.length);
}
/**
* Create the ORC schema from the first line of the file
*
* @param firstLine
* @return
*/
private static TypeDescription createSchema(String firstLine, String delimiter) {
String[] fields = firstLine.split(delimiter);
TypeDescription schema = TypeDescription.createStruct();
for (int i = 0; i < fields.length; i++) {
TypeDescription fieldType = inferFieldType(fields[i]);
schema.addField("field" + (i + 1), fieldType);
}
return schema;
}
/**
* Infer the field type from a string value
*
* @param value
* @return
*/
private static TypeDescription inferFieldType(String value) {
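// All-digit values (with an optional sign) become long, decimals such as -3.14 or .5 become double, everything else is treated as string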
if (value.matches("-?\\d+")) {
return TypeDescription.createLong();
} else if (value.matches("-?\\d*\\.\\d+")) {
return TypeDescription.createDouble();
} else {
return TypeDescription.createString();
}
}
/**
* Parse the first line, infer the data type of each field and build a field-name-to-type map
*
* @param firstLine
* @return
*/
private static Map<String, TypeDescription> getFieldMap(String firstLine, String delimiter) {
String[] fields = firstLine.split(delimiter);
Map<String, TypeDescription> fieldMap = new HashMap<>();
for (int i = 0; i < fields.length; i++) {
fieldMap.put("field" + (i + 1), inferFieldType(fields[i]));
}
return fieldMap;
}
/**
* Parse a field value and convert it according to the field's data type
*
* @param content
* @param fieldType
* @return
*/
private static Object parseValue(String content, TypeDescription fieldType) {
try {
switch (fieldType.getCategory()) {
case BOOLEAN:
return Boolean.parseBoolean(content);
case BYTE:
return Byte.parseByte(content);
case SHORT:
return Short.parseShort(content);
case INT:
return Integer.parseInt(content);
case LONG:
return Long.parseLong(content);
case FLOAT:
return Float.parseFloat(content);
case DOUBLE:
return Double.parseDouble(content);
case STRING:
return content;
default:
// Unknown or unsupported type: return the raw string
return content;
}
} catch (NumberFormatException e) {
// Conversion failed: fall back to the original string content
return content;
}
}
/**
* Convert a text file to an ORC file (V1, without the extracted helper methods)
*
* @param fileName
* @param outOrcFileDir
* @param delimiter
* @throws IOException
*/
public static void convertOrcV1(String fileName, String outOrcFileDir, String delimiter) throws IOException {
// Create the Hadoop configuration
Configuration conf = new Configuration();
// Build the output file name with an .orc suffix, e.g. /out/test.orc
String orcFilePath = outOrcFileDir + CustomUtils.ToOrcFileName(fileName);
// Delete the output file if it already exists
CustomUtils.deleteIfExist(orcFilePath);
try(BufferedReader reader = new BufferedReader(new FileReader(fileName))) {
// Read the first line of the text file; it is used to build the ORC schema
String firstLine = reader.readLine();
TypeDescription schema = createSchema(firstLine, delimiter);
// Create the ORC writer
Writer writer = OrcFile.createWriter(new Path(orcFilePath), OrcFile.writerOptions(conf)
.setSchema(schema)
.compress(CompressionKind.SNAPPY));
// Create the row batch used to write to the ORC file
VectorizedRowBatch batch = schema.createRowBatch();
// Parse the first line and infer the data type of each field
Map<String, TypeDescription> fieldMap = getFieldMap(firstLine, delimiter);
// Read the text file line by line, parse each line and write it to the ORC file
String lineTempString = null;
String[] lineToken;
long lineNo = 0; // line number
long start = System.currentTimeMillis();
while ((lineTempString = reader.readLine()) != null) {
lineNo += 1;
int row = batch.size++;
// Split the line with the custom delimiter
lineToken = lineTempString.split(delimiter);
byte[] cellValueByte = null;
for (int i = 0; i < lineToken.length; i++) {
String cellValue = lineToken[i];
// Determine the column type
TypeDescription cellType = fieldMap.get("field" + (i + 1));
if(cellType.equals(TypeDescription.createLong())) {
try{
((LongColumnVector) batch.cols[i]).vector[row] = Long.parseLong(cellValue);
} catch (NumberFormatException e){
String message = String.format("[line %d / column %d] NumberFormatException: value is %s, line is %s", lineNo, i, cellValue, lineTempString);
log.warn(message);
((LongColumnVector) batch.cols[i]).vector[row] = 0L;
}
} else if(cellType.equals(TypeDescription.createDouble())) {
try{
if(StringUtils.isBlank(cellValue) || "NaN".equals(cellValue)){
((DoubleColumnVector) batch.cols[i]).vector[row] = Double.NaN;
}else{
((DoubleColumnVector) batch.cols[i]).vector[row] = Double.parseDouble(cellValue);
}
} catch (NumberFormatException e){
String message = String.format("[line %d / column %d] NumberFormatException: value is %s, line is %s", lineNo, i, cellValue, lineTempString);
log.warn(message);
((DoubleColumnVector) batch.cols[i]).vector[row] = Double.NaN;
}
} else {
cellValueByte = cellValue.getBytes();
((BytesColumnVector) batch.cols[i]).setVal(row, cellValueByte, 0, cellValueByte.length);
}
}
if (batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
}
// Write any rows remaining in the last, partially filled batch
if (batch.size != 0) {
writer.addRowBatch(batch);
}
writer.close();
long stop = System.currentTimeMillis();
log.debug("Time taken to convert text file to orc file: " + (stop - start) / 1000 + " seconds");
} catch (Exception e){
log.error("Read file content fail, error msg: " + e.getMessage());
}
}
}
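The CustomUtils helper class referenced above (ToOrcFileName and deleteIfExist) is not shown in the original code. A minimal sketch that would satisfy those two calls could look like the following; treat it as an illustrative stand-in rather than the author's actual utility class:

package com.magic.orcadapter.utils;

import java.io.File;

public class CustomUtils {

    // Strip the directory and swap the extension for .orc, e.g. src/main/resources/cyhData500.txt -> cyhData500.orc
    public static String ToOrcFileName(String fileName) {
        String baseName = new File(fileName).getName();
        int dot = baseName.lastIndexOf('.');
        return (dot > 0 ? baseName.substring(0, dot) : baseName) + ".orc";
    }

    // Delete the target file if it already exists, so the ORC writer can recreate it
    public static void deleteIfExist(String filePath) {
        File file = new File(filePath);
        if (file.exists()) {
            file.delete();
        }
    }
}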