天天看點

使用JAVA API讀取HDFS的檔案資料出現亂碼的解決方案

使用JAVA api讀取HDFS檔案亂碼踩坑

想寫一個讀取HFDS上的部分檔案資料做預覽的接口,根據網上的部落格實作後,發現有時讀取資訊會出現亂碼,例如讀取一個csv時,字元串之間被逗号分割

  • 英文字元串aaa,能正常顯示
  • 中文字元串“你好”,能正常顯示
  • 中英混合字元串如“aaa你好”,出現亂碼

查閱了衆多部落格,解決方案大概都是:使用xxx字元集解碼。抱着不信的想法,我依次嘗試,果然沒用。

解決思路

因為HDFS支援6種字元集編碼,每個本地檔案編碼方式又是極可能不一樣的,我們上傳本地檔案的時候其實就是把檔案編碼成位元組流上傳到檔案系統存儲。那麼在GET檔案資料時,面對不同檔案、不同字元集編碼的位元組流,肯定不是一種固定字元集解碼就能正确解碼的吧。

那麼解決方案其實有兩種

  • 固定HDFS的編解碼字元集。比如我選用UTF-8,那麼在上傳檔案時統一編碼,即把不同檔案的位元組流都轉化為UTF-8編碼再進行存儲。這樣的話在擷取檔案資料的時候,采用UTF-8字元集解碼就沒什麼問題了。但這樣做的話仍然會在轉碼部分存在諸多問題,且不好實作。
  • 動态解碼。根據檔案的編碼字元集選用對應的字元集對解碼,這樣的話并不會對檔案的原生字元流進行改動,基本不會亂碼。

我選用動态解碼的思路後,其難點在于如何判斷使用哪種字元集解碼。

好在看到了一篇部落格

https://blog.csdn.net/smallnetvisitor/article/details/84682867

Google提供了檢測位元組流編碼方式的包。那麼方案就很明了了,先讀一些檔案位元組流,用工具檢測編碼方式,再對應進行解碼即可。

具體代碼

pom

<dependency>
	<groupId>net.sourceforge.jchardet</groupId>
	<artifactId>jchardet</artifactId>
	<version>1.0</version>
</dependency>
           

從HDFS讀取部分檔案做預覽的邏輯

// 擷取檔案的部分資料做預覽
    public List<String> getFileDataWithLimitLines(String filePath, Integer limit) {
        FSDataInputStream fileStream = openFile(filePath);
        return readFileWithLimit(fileStream, limit);
    }

    // 擷取檔案的資料流
    private FSDataInputStream openFile(String filePath) {
        FSDataInputStream fileStream = null;
        try {
            fileStream = fs.open(new Path(getHdfsPath(filePath)));
        } catch (IOException e) {
            logger.error("fail to open file:{}", filePath, e);
        }
        return fileStream;
    }
    
    // 讀取最多limit行檔案資料
    private List<String> readFileWithLimit(FSDataInputStream fileStream, Integer limit) {
        byte[] bytes = readByteStream(fileStream);
        String data = decodeByteStream(bytes);
        if (data == null) {
            return null;
        }

        List<String> rows = Arrays.asList(data.split("\\r\\n"));
        return rows.stream().filter(StringUtils::isNotEmpty)
                .limit(limit)
                .collect(Collectors.toList());
    }

    // 從檔案資料流中讀取位元組流
    private byte[] readByteStream(FSDataInputStream fileStream) {
        byte[] bytes = new byte[1024*30];
        int len;
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        try {
            while ((len = fileStream.read(bytes)) != -1) {
                stream.write(bytes, 0, len);
            }
        } catch (IOException e) {
            logger.error("read file bytes stream failed.", e);
            return null;
        }
        return stream.toByteArray();
    }

    // 解碼位元組流
    private String decodeByteStream(byte[] bytes) {
        if (bytes == null) {
            return null;
        }

        String encoding = guessEncoding(bytes);
        String data = null;
        try {
            data = new String(bytes, encoding);
        } catch (Exception e) {
            logger.error("decode byte stream failed.", e);
        }
        return data;
    }

    // 根據Google的工具判别編碼
    private String guessEncoding(byte[] bytes) {
        UniversalDetector detector = new UniversalDetector(null);
        detector.handleData(bytes, 0, bytes.length);
        detector.dataEnd();
        String encoding = detector.getDetectedCharset();
        detector.reset();

        if (StringUtils.isEmpty(encoding)) {
            encoding = "UTF-8";
        }
        return encoding;
    }