天天看点

使用JAVA API读取HDFS的文件数据出现乱码的解决方案

使用JAVA api读取HDFS文件乱码踩坑

想写一个读取HFDS上的部分文件数据做预览的接口,根据网上的博客实现后,发现有时读取信息会出现乱码,例如读取一个csv时,字符串之间被逗号分割

  • 英文字符串aaa,能正常显示
  • 中文字符串“你好”,能正常显示
  • 中英混合字符串如“aaa你好”,出现乱码

查阅了众多博客,解决方案大概都是:使用xxx字符集解码。抱着不信的想法,我依次尝试,果然没用。

解决思路

因为HDFS支持6种字符集编码,每个本地文件编码方式又是极可能不一样的,我们上传本地文件的时候其实就是把文件编码成字节流上传到文件系统存储。那么在GET文件数据时,面对不同文件、不同字符集编码的字节流,肯定不是一种固定字符集解码就能正确解码的吧。

那么解决方案其实有两种

  • 固定HDFS的编解码字符集。比如我选用UTF-8,那么在上传文件时统一编码,即把不同文件的字节流都转化为UTF-8编码再进行存储。这样的话在获取文件数据的时候,采用UTF-8字符集解码就没什么问题了。但这样做的话仍然会在转码部分存在诸多问题,且不好实现。
  • 动态解码。根据文件的编码字符集选用对应的字符集对解码,这样的话并不会对文件的原生字符流进行改动,基本不会乱码。

我选用动态解码的思路后,其难点在于如何判断使用哪种字符集解码。

好在看到了一篇博客

https://blog.csdn.net/smallnetvisitor/article/details/84682867

Google提供了检测字节流编码方式的包。那么方案就很明了了,先读一些文件字节流,用工具检测编码方式,再对应进行解码即可。

具体代码

pom

<dependency>
	<groupId>net.sourceforge.jchardet</groupId>
	<artifactId>jchardet</artifactId>
	<version>1.0</version>
</dependency>
           

从HDFS读取部分文件做预览的逻辑

// 获取文件的部分数据做预览
    public List<String> getFileDataWithLimitLines(String filePath, Integer limit) {
        FSDataInputStream fileStream = openFile(filePath);
        return readFileWithLimit(fileStream, limit);
    }

    // 获取文件的数据流
    private FSDataInputStream openFile(String filePath) {
        FSDataInputStream fileStream = null;
        try {
            fileStream = fs.open(new Path(getHdfsPath(filePath)));
        } catch (IOException e) {
            logger.error("fail to open file:{}", filePath, e);
        }
        return fileStream;
    }
    
    // 读取最多limit行文件数据
    private List<String> readFileWithLimit(FSDataInputStream fileStream, Integer limit) {
        byte[] bytes = readByteStream(fileStream);
        String data = decodeByteStream(bytes);
        if (data == null) {
            return null;
        }

        List<String> rows = Arrays.asList(data.split("\\r\\n"));
        return rows.stream().filter(StringUtils::isNotEmpty)
                .limit(limit)
                .collect(Collectors.toList());
    }

    // 从文件数据流中读取字节流
    private byte[] readByteStream(FSDataInputStream fileStream) {
        byte[] bytes = new byte[1024*30];
        int len;
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        try {
            while ((len = fileStream.read(bytes)) != -1) {
                stream.write(bytes, 0, len);
            }
        } catch (IOException e) {
            logger.error("read file bytes stream failed.", e);
            return null;
        }
        return stream.toByteArray();
    }

    // 解码字节流
    private String decodeByteStream(byte[] bytes) {
        if (bytes == null) {
            return null;
        }

        String encoding = guessEncoding(bytes);
        String data = null;
        try {
            data = new String(bytes, encoding);
        } catch (Exception e) {
            logger.error("decode byte stream failed.", e);
        }
        return data;
    }

    // 根据Google的工具判别编码
    private String guessEncoding(byte[] bytes) {
        UniversalDetector detector = new UniversalDetector(null);
        detector.handleData(bytes, 0, bytes.length);
        detector.dataEnd();
        String encoding = detector.getDetectedCharset();
        detector.reset();

        if (StringUtils.isEmpty(encoding)) {
            encoding = "UTF-8";
        }
        return encoding;
    }