Converting txt Files to ORC Files with the Java API

ORC is an important file format in Hive and is widely used across the big-data ecosystem. ORC files are binary, so they cannot be read or written directly as text. This article shows how to convert an ordinary, regularly delimited text file into an ORC file with the Java API, and how to read the ORC file back out to the console. The ORC format itself is not covered in detail here; if you need the background, see the official ORC website.

1. Add the Dependencies

First, add the required dependencies (taken from the official ORC website). A working Hadoop environment is assumed when running this program.

<dependencies>
  <dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.0</version>
  </dependency>
</dependencies>

2. Determine the Input Format

Next, determine the format of the data to be converted. The test data I used has the layout described below.

The data was generated randomly by a Java program: the first column is a row index, the second a random word, the third a random five-digit number, and the fourth a random Chinese character. For my tests I generated about 520 MB of data.
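
A few hypothetical lines in this layout (the real test data was random, so these exact values are made up):

1 apple 48291 中
2 stream 10385 国
3 window 77012 文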

3. Development

package com.atguigu.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;

public class WriteBigTextToOrc {

    public static void main(String[] args) throws IOException {

        String filePath = "data/normal/cyhData500.txt";
        writeOrc(filePath);
    }
    public static void writeOrc(String fileName) throws IOException {
        Configuration conf = new Configuration();
        // Define the data type of each column
        TypeDescription schema = TypeDescription.createStruct()
                .addField("filed1",TypeDescription.createInt())
                .addField("filed2",TypeDescription.createString())
                .addField("filed3",TypeDescription.createInt())
                .addField("filed4",TypeDescription.createString());

        // Local output path for the ORC file
        String path = "data/orc/zlib/cyhOrc500.orc";
        // Configure the writer options
        Writer writer = OrcFile.createWriter(new Path(path),OrcFile.writerOptions(conf)
                .setSchema(schema)
                .stripeSize(67108864)
                .bufferSize(64*1024)
                .blockSize(128*1024*1024)
                .rowIndexStride(10000)
                .blockPadding(true)
                // ZLIB is the default codec; compared with Snappy it achieves a
                // higher compression ratio (smaller files) but spends more CPU time compressing
                .compress(CompressionKind.ZLIB));

        File file = new File(fileName);
        BufferedReader reader = new BufferedReader(new FileReader(file));
        VectorizedRowBatch batch = schema.createRowBatch();
        // Get a reference to each column vector
        LongColumnVector a = (LongColumnVector) batch.cols[0];
        BytesColumnVector b = (BytesColumnVector) batch.cols[1];
        LongColumnVector c = (LongColumnVector) batch.cols[2];
        BytesColumnVector d = (BytesColumnVector) batch.cols[3];
        String tempString = null;
        // Time the conversion
        long start = System.currentTimeMillis();
        // Convert the file line by line into the binary ORC format
        while ((tempString = reader.readLine())!=null){
            int row = batch.size++;
            String[] contents = tempString.split(" ");
            // For numeric columns (int/long/double), assign through vector[row]
            a.vector[row] = Integer.parseInt(contents[0]);
            // For string columns, use setVal
            b.setVal(row,contents[1].getBytes());
            c.vector[row] = Integer.parseInt(contents[2]);
            d.setVal(row,contents[3].getBytes());
            // NOTE: this flushes a one-row batch per input line; it works, but
            // forfeits vectorized batching (the optimized version in section 6 fixes this)
            writer.addRowBatch(batch);
            batch.reset();

        }
        long stop = System.currentTimeMillis();
        System.out.println("Converting the text file to ORC took " + (stop - start) / 1000 + " seconds");

        reader.close();
        writer.close();

    }
}
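
Note that this first version calls writer.addRowBatch(batch) after every single line, so each batch carries only one row. The resulting ORC file is correct, but this forfeits the point of vectorized batches; the optimized version in section 6 fills each batch up to its maximum size (1024 rows by default) before flushing.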

4. Verify the Result


Running the program produces a .orc file and a .crc file. The .orc file is our target; opening it shows nothing but binary content. To verify it, we read the ORC file back out. This could also be done with the Java API (a minimal reading sketch is given at the end of this section), but here I recommend reading it through Hive, as follows:

  • 1. Configure Hive on Hadoop (there are plenty of tutorials online; the process is easy).
  • 2. Create the table:
create table orcfile(
filed1 int,
filed2 string,
filed3 int,
filed4 string
) stored as orc;

3. Load the data. First copy the generated ORC file onto the machine (VM), then run: load data local inpath 'path to the orc file' overwrite into table orcfile;


4. Query the data. Finally, run select * from orcfile;
If any values come back NULL, check that the field names and types in the CREATE TABLE statement match those used in the code.
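
For completeness, here is a minimal sketch of reading the ORC file back to the console with the Java API instead of Hive, matching the four-column schema above (the path is the one used in section 3; treat the details as illustrative rather than production-ready):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

import java.io.IOException;

public class ReadOrcToConsole {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Open the ORC file written in section 3
        Reader reader = OrcFile.createReader(new Path("data/orc/zlib/cyhOrc500.orc"),
                OrcFile.readerOptions(conf));
        RecordReader rows = reader.rows();
        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        // Column vectors mirror the write-side schema: int, string, int, string
        LongColumnVector a = (LongColumnVector) batch.cols[0];
        BytesColumnVector b = (BytesColumnVector) batch.cols[1];
        LongColumnVector c = (LongColumnVector) batch.cols[2];
        BytesColumnVector d = (BytesColumnVector) batch.cols[3];
        while (rows.nextBatch(batch)) {
            for (int r = 0; r < batch.size; r++) {
                System.out.println(a.vector[r] + " " + b.toString(r) + " "
                        + c.vector[r] + " " + d.toString(r));
            }
        }
        rows.close();
    }
}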

5. Code Walkthrough

The implementation proceeds as follows:
1. Define each column's data type and add it to the schema (addField).
2. Set the output path for the ORC file.
3. Configure the writer options (stripe size, buffer size, block size, compression, and so on).
4. Obtain a reference to each column vector of the row batch.
5. Loop over the input lines, filling the column vectors through those references. A distilled, properly batched version of this loop is sketched below.
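
As a self-contained illustration of these steps, with the batching that the section 3 code skips, here is a minimal sketch for a hypothetical two-column input file (the class name, paths, and field names are made up for the example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class BatchedWriteSketch {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Step 1: define the column types
        TypeDescription schema = TypeDescription.createStruct()
                .addField("id", TypeDescription.createLong())
                .addField("word", TypeDescription.createString());
        // Steps 2 and 3: output path and writer options (defaults used elsewhere)
        Writer writer = OrcFile.createWriter(new Path("data/orc/sketch.orc"),
                OrcFile.writerOptions(conf).setSchema(schema));
        // Step 4: references to the column vectors
        VectorizedRowBatch batch = schema.createRowBatch(); // 1024 rows by default
        LongColumnVector id = (LongColumnVector) batch.cols[0];
        BytesColumnVector word = (BytesColumnVector) batch.cols[1];
        // Step 5: fill the batch, flushing only when it is full
        try (BufferedReader reader = new BufferedReader(new FileReader("data/normal/sketch.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(" ");
                int row = batch.size++;
                id.vector[row] = Long.parseLong(parts[0]);
                word.setVal(row, parts[1].getBytes());
                if (batch.size == batch.getMaxSize()) {
                    writer.addRowBatch(batch);
                    batch.reset();
                }
            }
        }
        if (batch.size != 0) { // flush the tail
            writer.addRowBatch(batch);
        }
        writer.close();
    }
}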

6. An Optimized Version

package com.demo.orcadapter;

import com.magic.orcadapter.utils.CustomUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@Slf4j
public class TextToOrc {

    public static void main(String[] args) throws IOException {

        String filePath = "src/main/resources/cyhData500.txt";

        String outOrcFileDir = "src/main/resources/output/";

        String delimiter = " "; // default delimiter: a single space

        convertOrc(filePath, outOrcFileDir, delimiter);
    }

    /**
     * Converts a text file to an ORC file.
     *
     * @param fileName
     * @param outOrcFileDir
     * @param delimiter
     * @throws IOException
     */
    public static void convertOrc(String fileName, String outOrcFileDir, String delimiter) throws IOException {

        // Create the Hadoop configuration
        Configuration conf = new Configuration();

        // Build the output path with an .orc suffix, e.g. /out/test.orc
        String orcFilePath = outOrcFileDir + CustomUtils.ToOrcFileName(fileName);

        // Delete the output file if it already exists
        CustomUtils.deleteIfExist(orcFilePath);

        try (BufferedReader reader = new BufferedReader(new FileReader(fileName))) {

            // Read the first line; it is used to build the ORC schema
            String firstLine = reader.readLine();
            TypeDescription schema = createSchema(firstLine, delimiter);

            // Create the ORC writer
            Writer writer = OrcFile.createWriter(new Path(orcFilePath), OrcFile.writerOptions(conf)
                    .setSchema(schema)
                    .compress(CompressionKind.SNAPPY));

            // Create the row batch used for writing
            VectorizedRowBatch batch = schema.createRowBatch();

            // Infer each field's data type from the first line
            Map<String, TypeDescription> fieldMap = getFieldMap(firstLine, delimiter);

            // Read the file line by line, parse each line, and write it out
            String lineTempString;
            long lineNo = 1;  // line number; the first line is handled just below
            long start = System.currentTimeMillis();

            // The first line is data too, so write it as the first row
            int row = batch.size++;
            processLine(firstLine, delimiter, batch, row, fieldMap, lineNo);

            while ((lineTempString = reader.readLine()) != null) {
                row = batch.size++;
                lineNo += 1;

                processLine(lineTempString, delimiter, batch, row, fieldMap, lineNo);

                // Flush once the batch is full (1024 rows by default) to bound memory usage
                if (batch.size == batch.getMaxSize()) {
                    writer.addRowBatch(batch);
                    batch.reset();
                }
            }

            // Flush any remaining rows
            if (batch.size != 0) {
                writer.addRowBatch(batch);
            }

            writer.close();
            long stop = System.currentTimeMillis();
            log.debug("Time taken to convert text file to orc file: " + (stop - start) / 1000 + " seconds");
        } catch (Exception e) {
            log.error("Read file content fail, error msg: " + e.getMessage());
        }

    }

    /**
     * Parses one line and fills the row's column vectors.
     *
     * @param line
     * @param delimiter
     * @param batch
     * @param row
     * @param fieldMap
     * @param lineNo
     */
    private static void processLine(String line, String delimiter, VectorizedRowBatch batch, int row, Map<String, TypeDescription> fieldMap, long lineNo) {
        String[] tokens = line.split(delimiter);
        for (int i = 0; i < tokens.length; i++) {
            String cellValue = tokens[i];
            TypeDescription cellType = fieldMap.get("field" + (i + 1));

            if (cellType.equals(TypeDescription.createLong())) {
                processLongValue(cellValue, batch, i, row, lineNo);
            } else if (cellType.equals(TypeDescription.createDouble())) {
                processDoubleValue(cellValue, batch, i, row, lineNo);
            } else {
                processStringValue(cellValue, batch, i, row);
            }
        }
    }

    /**
     * Handles a long-typed cell value.
     *
     * @param cellValue
     * @param batch
     * @param columnIndex
     * @param row
     * @param lineNo
     */
    private static void processLongValue(String cellValue, VectorizedRowBatch batch, int columnIndex, int row, long lineNo) {
        long value;
        try {
            value = Long.parseLong(cellValue);
        } catch (NumberFormatException e) {
            log.warn(String.format("[line %d / column %d] NumberFormatException: value is %s", lineNo, columnIndex, cellValue));
            value = 0L;
        }
        ((LongColumnVector) batch.cols[columnIndex]).vector[row] = value;
    }

    /**
     * Handles a double-typed cell value.
     *
     * @param cellValue
     * @param batch
     * @param columnIndex
     * @param row
     * @param lineNo
     */
    private static void processDoubleValue(String cellValue, VectorizedRowBatch batch, int columnIndex, int row, long lineNo) {
        double value;
        try {
            if (StringUtils.isBlank(cellValue) || "NaN".equals(cellValue)) {
                value = Double.NaN;
            } else {
                value = Double.parseDouble(cellValue);
            }
        } catch (NumberFormatException e) {
            log.warn(String.format("[line %d / column %d] NumberFormatException: value is %s", lineNo, columnIndex, cellValue));
            value = Double.NaN;
        }
        ((DoubleColumnVector) batch.cols[columnIndex]).vector[row] = value;
    }

    private static void processStringValue(String cellValue, VectorizedRowBatch batch, int columnIndex, int row) {
        byte[] valueBytes = cellValue.getBytes();
        ((BytesColumnVector) batch.cols[columnIndex]).setVal(row, valueBytes, 0, valueBytes.length);
    }

    /**
     * Builds the ORC schema from the first line.
     *
     * @param firstLine
     * @return
     */
    private static TypeDescription createSchema(String firstLine, String delimiter) {
        String[] fields = firstLine.split(delimiter);
        TypeDescription schema = TypeDescription.createStruct();
        for (int i = 0; i < fields.length; i++) {
            TypeDescription fieldType = inferFieldType(fields[i]);
            schema.addField("field" + (i + 1), fieldType);
        }
        return schema;
    }

    /**
     * Infers a field's type from a sample value.
     *
     * @param value
     * @return
     */
    private static TypeDescription inferFieldType(String value) {
        if (value.matches("-?\\d+")) {
            return TypeDescription.createLong();
        } else if (value.matches("-?\\d*\\.\\d+")) {
            return TypeDescription.createDouble();
        }  else {
            return TypeDescription.createString();
        }
    }
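
    // Note: type inference looks only at the first data line, so a value like
    // "007" maps its whole column to bigint, while scientific notation
    // ("1.5e3") or mixed tokens ("12ab") fall through to string.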

    /**
     * Parses the first line, infers each field's type, and builds a field-name-to-type map.
     *
     * @param firstLine
     * @return
     */
    private static Map<String, TypeDescription> getFieldMap(String firstLine, String delimiter) {
        String[] fields = firstLine.split(delimiter);
        Map<String, TypeDescription> fieldMap = new HashMap<>();
        for (int i = 0; i < fields.length; i++) {
            fieldMap.put("field" + (i + 1), inferFieldType(fields[i]));
        }
        return fieldMap;
    }

    /**
     * Parses a cell value according to its field type (currently unused by the
     * conversion path; kept as a general helper).
     *
     * @param content
     * @param fieldType
     * @return
     */
    private static Object parseValue(String content, TypeDescription fieldType) {
        try {
            switch (fieldType.getCategory()) {
                case BOOLEAN:
                    return Boolean.parseBoolean(content);
                case BYTE:
                    return Byte.parseByte(content);
                case SHORT:
                    return Short.parseShort(content);
                case INT:
                    return Integer.parseInt(content);
                case LONG:
                    return Long.parseLong(content);
                case FLOAT:
                    return Float.parseFloat(content);
                case DOUBLE:
                    return Double.parseDouble(content);
                case STRING:
                    return content;
                default:
                    // Unknown or unsupported type: return the raw string
                    return content;
            }
        } catch (NumberFormatException e) {
            // Parsing failed: fall back to the raw string
            return content;
        }
    }

    /**
     * Converts a text file to an ORC file (earlier version, kept for comparison).
     *
     * @param fileName
     * @param outOrcFileDir
     * @param delimiter
     * @throws IOException
     */
    public static void convertOrcV1(String fileName, String outOrcFileDir, String delimiter) throws IOException {

        // Create the Hadoop configuration
        Configuration conf = new Configuration();

        // Build the output path with an .orc suffix, e.g. /out/test.orc
        String orcFilePath = outOrcFileDir + CustomUtils.ToOrcFileName(fileName);

        // Delete the output file if it already exists
        CustomUtils.deleteIfExist(orcFilePath);

        try(BufferedReader reader = new BufferedReader(new FileReader(fileName))) {

            // Read the first line; it is used to build the ORC schema
            String firstLine = reader.readLine();
            TypeDescription schema = createSchema(firstLine, delimiter);

            // Create the ORC writer
            Writer writer = OrcFile.createWriter(new Path(orcFilePath), OrcFile.writerOptions(conf)
                    .setSchema(schema)
                    .compress(CompressionKind.SNAPPY));

            // Create the row batch used for writing
            VectorizedRowBatch batch = schema.createRowBatch();

            // Infer each field's data type from the first line
            Map<String, TypeDescription> fieldMap = getFieldMap(firstLine, delimiter);

            // Read the file line by line, parse each line, and write it out
            String lineTempString = null;
            String[] lineToken;
            long lineNo = 0;  // line number
            // NOTE: unlike convertOrc above, this version uses the first line only
            // for schema inference and never writes it as a data row
            long start = System.currentTimeMillis();
            while ((lineTempString = reader.readLine()) != null) {
                lineNo += 1;
                int row = batch.size++;

                // Split on the custom delimiter
                lineToken = lineTempString.split(delimiter);
                byte[] cellValueByte = null;
                for (int i = 0; i < lineToken.length; i++) {
                    String cellValue = lineToken[i];

                    // Dispatch on the inferred type
                    TypeDescription cellType = fieldMap.get("field" + (i + 1));
                    if(cellType.equals(TypeDescription.createLong())) {
                        try{
                            ((LongColumnVector) batch.cols[i]).vector[row] = Long.parseLong(cellValue);
                        } catch (NumberFormatException e){
                            String message = String.format("[line %d / column %d] NumberFormatException: value is %s, line is %s", lineNo, i, cellValue, lineTempString);
                            log.warn(message);
                            ((LongColumnVector) batch.cols[i]).vector[row] = 0L;
                        }
                    } else if(cellType.equals(TypeDescription.createDouble())) {
                        try{
                            if(StringUtils.isBlank(cellValue) || "NaN".equals(cellValue)){
                                ((DoubleColumnVector) batch.cols[i]).vector[row] = Double.NaN;
                            }else{
                                ((DoubleColumnVector) batch.cols[i]).vector[row] = Double.parseDouble(cellValue);
                            }
                        } catch (NumberFormatException e){
                            String message = String.format("[line %d / column %d] NumberFormatException: value is %s, line is %s", lineNo, i, cellValue, lineTempString);
                            log.warn(message);
                            ((DoubleColumnVector) batch.cols[i]).vector[row] = Double.NaN;
                        }
                    } else {
                        cellValueByte = cellValue.getBytes();
                        ((BytesColumnVector) batch.cols[i]).setVal(row, cellValueByte, 0, cellValueByte.length);
                    }
                }

                if (batch.size == batch.getMaxSize()) {
                    writer.addRowBatch(batch);
                    batch.reset();
                }
            }

            // Flush any remaining rows
            if (batch.size != 0) {
                writer.addRowBatch(batch);
            }

            writer.close();
            long stop = System.currentTimeMillis();
            log.debug("Time taken to convert text file to orc file: " + (stop - start) / 1000 + " seconds");
        } catch (Exception e){
            log.error("Read file content fail, error msg: " + e.getMessage());
        }

    }
}
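
Compared with the section 3 version, this one fills each batch up to 1024 rows before calling addRowBatch instead of flushing one row at a time, infers the schema from the first data line instead of hard-coding it, tolerates malformed numeric cells (logging a warning and writing 0 or NaN instead of crashing), and compresses with SNAPPY, trading some compression ratio for speed.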
