From ea7f1030026a53ee70b52d6431cafd7cd190f57c Mon Sep 17 00:00:00 2001 From: lijiangbo <245730400@qq.com> Date: Tue, 18 Dec 2018 14:13:05 +0800 Subject: [PATCH] FTPReader supports reading data from multiple directories and multiple files --- .../flinkx/ftp/reader/FtpInputFormat.java | 34 +++--- .../ftp/reader/FtpSeqBufferedReader.java | 101 ++++++++++++++++++ 2 files changed, 122 insertions(+), 13 deletions(-) create mode 100644 flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpSeqBufferedReader.java diff --git a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java index baf24559ab..ec246e6ee9 100644 --- a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java +++ b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java @@ -33,9 +33,8 @@ import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.types.Row; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; +import java.util.ArrayList; import java.util.List; /** @@ -70,7 +69,7 @@ public class FtpInputFormat extends RichInputFormat { protected boolean isFirstLineHeader; - private transient BufferedReader br; + private transient FtpSeqBufferedReader br; private transient FtpHandler ftpHandler; @@ -93,7 +92,16 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOEx @Override public InputSplit[] createInputSplits(int minNumSplits) throws IOException { - List files = ftpHandler.getFiles(path); + List files = new ArrayList<>(); + + if(path != null && path.length() > 0){ + path = path.replace("\n","").replace("\r",""); + String[] pathArray = path.split(","); + for (String p : pathArray) { + files.addAll(ftpHandler.getFiles(p.trim())); + } + } + int numSplits = (files.size() < minNumSplits ? files.size() : minNumSplits); FtpInputSplit[] ftpInputSplits = new FtpInputSplit[numSplits]; for(int index = 0; index < numSplits; ++index) { @@ -115,9 +123,15 @@ public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) { public void openInternal(InputSplit split) throws IOException { FtpInputSplit inputSplit = (FtpInputSplit)split; List paths = inputSplit.getPaths(); - FtpSeqInputStream is = new FtpSeqInputStream(ftpHandler, paths); - br = new BufferedReader(new InputStreamReader(is, charsetName)); + if (isFirstLineHeader){ + br = new FtpSeqBufferedReader(ftpHandler,paths.iterator()); + br.setFromLine(1); + } else { + br = new FtpSeqBufferedReader(ftpHandler,paths.iterator()); + br.setFromLine(0); + } + br.setCharsetName(charsetName); if(StringUtils.isNotBlank(monitorUrls) && this.bytes > 0) { this.byteRateLimiter = new ByteRateLimiter(getRuntimeContext(), monitorUrls, bytes, 1); @@ -128,12 +142,6 @@ public void openInternal(InputSplit split) throws IOException { @Override public boolean reachedEnd() throws IOException { line = br.readLine(); - - // if first line is header,then read next line - if(isFirstLineHeader){ - line = br.readLine(); - isFirstLineHeader = false; - } return line == null; } @@ -152,7 +160,7 @@ public Row nextRecordInternal(Row row) throws IOException { Object value; if(metaColumn.getValue() != null){ value = metaColumn.getValue(); - } else if(metaColumn.getIndex() != null){ + } else if(metaColumn.getIndex() != null && metaColumn.getIndex() < fields.length){ value = fields[metaColumn.getIndex()]; } else { value = null; diff --git a/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpSeqBufferedReader.java b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpSeqBufferedReader.java new file mode 100644 index 0000000000..e8ee15fb5e --- /dev/null +++ b/flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpSeqBufferedReader.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.ftp.reader; + +import com.dtstack.flinkx.ftp.FtpHandler; + +import java.io.*; +import java.util.Iterator; + +/** + * The FtpSeqBufferedReader that read multiple ftp files one by one. + * + * @author jiangbo + * @date 2018/12/18 + */ +public class FtpSeqBufferedReader { + + private FtpHandler ftpHandler; + + private Iterator iter; + + private int fromLine = 0; + + private BufferedReader br; + + private String charsetName = "utf-8"; + + public FtpSeqBufferedReader(FtpHandler ftpHandler, Iterator iter) { + this.ftpHandler = ftpHandler; + this.iter = iter; + } + + public String readLine() throws IOException{ + if (br == null){ + nextStream(); + } + + if(br != null){ + String line = br.readLine(); + if (line == null){ + br = null; + return readLine(); + } + + return line; + } else { + return null; + } + } + + private void nextStream() throws IOException{ + close(); + + if(iter.hasNext()){ + String file = iter.next(); + InputStream in = ftpHandler.getInputStream(file); + if (in == null) { + throw new NullPointerException(); + } + + br = new BufferedReader(new InputStreamReader(in, charsetName)); + + for (int i = 0; i < fromLine; i++) { + br.readLine(); + } + } else { + br = null; + } + } + + public void close() throws IOException { + if (br != null){ + br.close(); + br = null; + } + } + + public void setFromLine(int fromLine) { + this.fromLine = fromLine; + } + + public void setCharsetName(String charsetName) { + this.charsetName = charsetName; + } +}