Skip to content

Commit

Permalink
FTPReader supports reading data from multiple directories and multipl…
Browse files Browse the repository at this point in the history
…e files
  • Loading branch information
lijiangbo committed Dec 18, 2018
1 parent ddb519b commit ea7f103
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.types.Row;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

/**
Expand Down Expand Up @@ -70,7 +69,7 @@ public class FtpInputFormat extends RichInputFormat {

protected boolean isFirstLineHeader;

private transient BufferedReader br;
private transient FtpSeqBufferedReader br;

private transient FtpHandler ftpHandler;

Expand All @@ -93,7 +92,16 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOEx

@Override
public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
List<String> files = ftpHandler.getFiles(path);
List<String> files = new ArrayList<>();

if(path != null && path.length() > 0){
path = path.replace("\n","").replace("\r","");
String[] pathArray = path.split(",");
for (String p : pathArray) {
files.addAll(ftpHandler.getFiles(p.trim()));
}
}

int numSplits = (files.size() < minNumSplits ? files.size() : minNumSplits);
FtpInputSplit[] ftpInputSplits = new FtpInputSplit[numSplits];
for(int index = 0; index < numSplits; ++index) {
Expand All @@ -115,9 +123,15 @@ public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
public void openInternal(InputSplit split) throws IOException {
FtpInputSplit inputSplit = (FtpInputSplit)split;
List<String> paths = inputSplit.getPaths();
FtpSeqInputStream is = new FtpSeqInputStream(ftpHandler, paths);

br = new BufferedReader(new InputStreamReader(is, charsetName));
if (isFirstLineHeader){
br = new FtpSeqBufferedReader(ftpHandler,paths.iterator());
br.setFromLine(1);
} else {
br = new FtpSeqBufferedReader(ftpHandler,paths.iterator());
br.setFromLine(0);
}
br.setCharsetName(charsetName);

if(StringUtils.isNotBlank(monitorUrls) && this.bytes > 0) {
this.byteRateLimiter = new ByteRateLimiter(getRuntimeContext(), monitorUrls, bytes, 1);
Expand All @@ -128,12 +142,6 @@ public void openInternal(InputSplit split) throws IOException {
@Override
public boolean reachedEnd() throws IOException {
line = br.readLine();

// if first line is header,then read next line
if(isFirstLineHeader){
line = br.readLine();
isFirstLineHeader = false;
}
return line == null;
}

Expand All @@ -152,7 +160,7 @@ public Row nextRecordInternal(Row row) throws IOException {
Object value;
if(metaColumn.getValue() != null){
value = metaColumn.getValue();
} else if(metaColumn.getIndex() != null){
} else if(metaColumn.getIndex() != null && metaColumn.getIndex() < fields.length){
value = fields[metaColumn.getIndex()];
} else {
value = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.flinkx.ftp.reader;

import com.dtstack.flinkx.ftp.FtpHandler;

import java.io.*;
import java.util.Iterator;

/**
 * Reads several FTP files one after another as a single sequence of text lines.
 *
 * <p>Files are opened lazily: the next path is taken from the supplied iterator
 * only when the current file has been read to its end. For every file that is
 * opened, the first {@code fromLine} lines are discarded before any line is
 * returned to the caller (used to skip a header line in each file).
 *
 * <p>{@link #setFromLine(int)} and {@link #setCharsetName(String)} take effect
 * for files opened after the call, so they should be invoked before the first
 * {@link #readLine()}. This class performs no synchronization.
 *
 * @author jiangbo
 * @date 2018/12/18
 */
public class FtpSeqBufferedReader implements Closeable {

    /** Source of the per-file input streams. */
    private final FtpHandler ftpHandler;

    /** Remaining file paths; advanced each time a new file is opened. */
    private final Iterator<String> iter;

    /** Number of leading lines discarded from every file that is opened. */
    private int fromLine = 0;

    /** Reader over the file currently being consumed, or {@code null} if none is open. */
    private BufferedReader br;

    /** Charset name used to decode each file's bytes. */
    private String charsetName = "utf-8";

    /**
     * Creates a reader over the given files. No file is opened until the first
     * call to {@link #readLine()}.
     *
     * @param ftpHandler handler used to obtain an input stream for each path
     * @param iter iterator over the paths to read, in reading order
     */
    public FtpSeqBufferedReader(FtpHandler ftpHandler, Iterator<String> iter) {
        this.ftpHandler = ftpHandler;
        this.iter = iter;
    }

    /**
     * Returns the next line across all files, moving on to the next file
     * whenever the current one is exhausted.
     *
     * @return the next line, or {@code null} once every file has been read
     * @throws IOException if opening, reading or closing a file fails
     */
    public String readLine() throws IOException {
        // Loop instead of recursing so that a long run of empty files cannot
        // grow the call stack.
        while (true) {
            if (br == null) {
                nextStream();
                if (br == null) {
                    // The iterator has no more files.
                    return null;
                }
            }

            String line = br.readLine();
            if (line != null) {
                return line;
            }

            // The current file is exhausted: close it (rather than merely
            // dropping the reference) so its stream is released before the
            // next file is opened.
            close();
        }
    }

    /**
     * Closes the current file, if any, and opens the next one from the
     * iterator, discarding its first {@code fromLine} lines. Leaves {@code br}
     * as {@code null} when the iterator has no more paths.
     *
     * @throws IOException if the stream cannot be opened, decoded or read
     * @throws NullPointerException if the handler returns no stream for a path
     */
    private void nextStream() throws IOException {
        close();

        if (!iter.hasNext()) {
            return;
        }

        String file = iter.next();
        InputStream in = ftpHandler.getInputStream(file);
        if (in == null) {
            throw new NullPointerException("No input stream returned for ftp file: " + file);
        }

        try {
            br = new BufferedReader(new InputStreamReader(in, charsetName));
        } catch (UnsupportedEncodingException e) {
            // The raw stream is already open; do not leak it when the charset is invalid.
            try {
                in.close();
            } catch (IOException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }

        // Discard the leading lines; stop early if the file is shorter than fromLine.
        for (int i = 0; i < fromLine; i++) {
            if (br.readLine() == null) {
                break;
            }
        }
    }

    /**
     * Closes the reader of the file currently being read, if one is open.
     * Calling this when nothing is open has no effect.
     *
     * @throws IOException if closing the underlying reader fails
     */
    @Override
    public void close() throws IOException {
        if (br != null) {
            try {
                br.close();
            } finally {
                // Drop the reference even if close() fails so it is not retried.
                br = null;
            }
        }
    }

    /**
     * Sets how many leading lines are discarded from each file opened afterwards.
     *
     * @param fromLine number of lines to skip at the start of every file
     */
    public void setFromLine(int fromLine) {
        this.fromLine = fromLine;
    }

    /**
     * Sets the charset name used to decode files opened afterwards.
     *
     * @param charsetName name of a supported charset
     */
    public void setCharsetName(String charsetName) {
        this.charsetName = charsetName;
    }
}

0 comments on commit ea7f103

Please sign in to comment.