Skip to content

Commit

Permalink
LOGGER
Browse files Browse the repository at this point in the history
  • Loading branch information
Luis Lázaro committed May 19, 2015
1 parent 8a5756f commit e6674f9
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ public class SourceFactory {

private KeedioSource keedioSource;
private static final Logger logger = LoggerFactory.getLogger(KeedioSource.class);
private final Integer discoverDelay = 10000;
private final boolean flushlinesDefault = true;
private final String folderDefault = System.getProperty("java.io.tmpdir");
private final Integer chunksizeDefault = 1024;
private static final Integer discoverDelay = 10000;
private static final boolean flushlinesDefault = true;
private static final String folderDefault = System.getProperty("java.io.tmpdir");
private static final Integer chunksizeDefault = 1024;

/**
* Create KeedioSource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public boolean isDirectory(ChannelSftp.LsEntry file) {
*/
public boolean isFile(ChannelSftp.LsEntry file) {
boolean isfile = false;
if ((!isDirectory(file)) & (!isLink(file))) {
if ((!isDirectory(file)) && (!isLink(file))) {
isfile = true;
} else {
isfile = false;
Expand Down Expand Up @@ -284,7 +284,7 @@ public String getLink(ChannelSftp.LsEntry file) {
try {
link = sftpClient.readlink(file.getFilename());
} catch (SftpException e) {
logger.error("Could not readLink to get name");
logger.error("Could not readLink to get name",e);
}
return link;
}
Expand All @@ -299,7 +299,7 @@ public String getDirectoryserver() throws IOException {
try {
printWorkingDirectory = sftpClient.pwd();
} catch (SftpException e) {
logger.error("Error getting printworkingdirectory for server -sftpsource");
logger.error("Error getting printworkingdirectory for server -sftpsource",e);
throw new IOException(e.getMessage());
}
return printWorkingDirectory;
Expand Down
40 changes: 20 additions & 20 deletions src/main/java/org/keedio/flume/source/ftp/source/Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class Source extends AbstractSource implements Configurable, PollableSour
private SourceFactory sourceFactory = new SourceFactory();
private KeedioSource keedioSource;

private static final Logger logger = LoggerFactory.getLogger(Source.class);
private static final Logger LOGGER = LoggerFactory.getLogger(Source.class);
private static final short ATTEMPTS_MAX = 3; // max limit attempts reconnection
private static final long EXTRA_DELAY = 10000;
private int counterConnect = 0;
Expand Down Expand Up @@ -71,7 +71,7 @@ public void configure(Context context) {
if (keedioSource.existFolder()) {
keedioSource.makeLocationFile();
} else {
logger.error("Folder " + keedioSource.getPathTohasmap().toString() + " not exists");
LOGGER.error("Folder " + keedioSource.getPathTohasmap().toString() + " not exists");
}
keedioSource.connect();
sourceCounter = new SourceCounter("SOURCE." + getName());
Expand All @@ -86,14 +86,14 @@ public void configure(Context context) {
public PollableSource.Status process() throws EventDeliveryException {

try {
logger.info("Actual dir: " + keedioSource.getDirectoryserver() + " files: "
LOGGER.info("Actual dir: " + keedioSource.getDirectoryserver() + " files: "
+ keedioSource.getFileList().size());

discoverElements(keedioSource, keedioSource.getDirectoryserver(), "", 0);
keedioSource.cleanList(); //clean list according existing actual files
keedioSource.getExistFileList().clear();
} catch (IOException e) {
logger.error("Exception thrown in proccess, try to reconnect " + counterConnect, e);
LOGGER.error("Exception thrown in proccess, try to reconnect " + counterConnect, e);

if (!keedioSource.connect()) {
counterConnect++;
Expand All @@ -104,12 +104,12 @@ public PollableSource.Status process() throws EventDeliveryException {
if (counterConnect < ATTEMPTS_MAX) {
process();
} else {
logger.error("Server connection closed without indication, reached limit reconnections " + counterConnect);
LOGGER.error("Server connection closed without indication, reached limit reconnections " + counterConnect);
try {
Thread.sleep(keedioSource.getRunDiscoverDelay() + EXTRA_DELAY);
counterConnect = 0;
} catch (InterruptedException ce) {
logger.error("InterruptedException", ce);
LOGGER.error("InterruptedException", ce);
}
}
}
Expand All @@ -119,7 +119,7 @@ public PollableSource.Status process() throws EventDeliveryException {
Thread.sleep(keedioSource.getRunDiscoverDelay());
return PollableSource.Status.READY; //source was successfully able to generate events
} catch (InterruptedException inte) {
logger.error("Exception thrown in process while putting to sleep", inte);
LOGGER.error("Exception thrown in process while putting to sleep", inte);
return PollableSource.Status.BACKOFF; //inform the runner thread to back off for a bit
}
}
Expand All @@ -129,8 +129,8 @@ public PollableSource.Status process() throws EventDeliveryException {
*/
@Override
public synchronized void start() {
logger.info("Starting Keedio source ...", this.getName());
logger.info("Source {} starting. Metrics: {}", getName(), sourceCounter);
LOGGER.info("Starting Keedio source ...", this.getName());
LOGGER.info("Source {} starting. Metrics: {}", getName(), sourceCounter);
super.start();
sourceCounter.start();
}
Expand Down Expand Up @@ -175,7 +175,7 @@ public <T> void discoverElements(KeedioSource keedioSource, String parentDir, St
String elementName = keedioSource.getObjectName(element);

if (keedioSource.isDirectory(element)) {
logger.info("[" + elementName + "]");
LOGGER.info("[" + elementName + "]");
keedioSource.changeToDirectory(parentDir);
discoverElements(keedioSource, dirToList, elementName, level + 1);

Expand All @@ -187,14 +187,14 @@ public <T> void discoverElements(KeedioSource keedioSource, String parentDir, St
if (!(keedioSource.getFileList().containsKey(dirToList + "/" + elementName))) { //new file
sourceCounter.incrementFilesCount(); //include all files, even not yet processed
position = 0L;
logger.info("Discovered: " + elementName + " ,size: " + keedioSource.getObjectSize(element));
LOGGER.info("Discovered: " + elementName + " ,size: " + keedioSource.getObjectSize(element));
} else { //known file
long prevSize = (long) keedioSource.getFileList().get(dirToList + "/" + elementName);
position = prevSize;
long dif = keedioSource.getObjectSize(element) - (long) keedioSource.getFileList().get(dirToList + "/" + elementName);

if (dif > 0) {
logger.info("Modified: " + elementName + " ,size: " + dif);
LOGGER.info("Modified: " + elementName + " ,size: " + dif);
} else if (dif < 0) { //known and full modified
keedioSource.getExistFileList().remove(dirToList + "/" + elementName); //will be rediscovered as new file
keedioSource.saveMap();
Expand Down Expand Up @@ -226,25 +226,25 @@ public <T> void discoverElements(KeedioSource keedioSource, String parentDir, St
sourceCounter.incrementFilesProcCount();
}

logger.info("Processed: " + elementName + " ,total files: " + this.keedioSource.getFileList().size() + "\n");
LOGGER.info("Processed: " + elementName + " ,total files: " + this.keedioSource.getFileList().size() + "\n");

} else {
handleProcessError(elementName);
}
} catch (IOException e) {
handleProcessError(elementName);
logger.error("Failed retrieving inputStream on discoverElements ", e);
LOGGER.error("Failed retrieving inputStream on discoverElements ", e);
continue;
}

keedioSource.changeToDirectory(dirToList);

} else if (keedioSource.isLink(element)) {
logger.info(elementName + " is a link of " + this.keedioSource.getLink(element) + " could not retrieve size");
LOGGER.info(elementName + " is a link of " + this.keedioSource.getLink(element) + " could not retrieve size");
keedioSource.changeToDirectory(parentDir);
continue;
} else {
logger.info(elementName + " unknown type of file");
LOGGER.info(elementName + " unknown type of file");
keedioSource.changeToDirectory(parentDir);
continue;
}
Expand Down Expand Up @@ -283,7 +283,7 @@ public boolean readStream(InputStream inputStream, long position) {
}
inputStream.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
LOGGER.error(e.getMessage(), e);
successRead = false;
}
} else {
Expand All @@ -303,7 +303,7 @@ public boolean readStream(InputStream inputStream, long position) {

inputStream.close();
} catch (IOException e) {
logger.error("on readStream", e);
LOGGER.error("on readStream", e);
successRead = false;

}
Expand All @@ -325,7 +325,7 @@ public void processMessage(byte[] lastInfo) {
try {
getChannelProcessor().processEvent(event);
} catch (ChannelException e) {
logger.error("ChannelException", e);
LOGGER.error("ChannelException", e);
}
sourceCounter.incrementCountSizeProc(message.length);
sourceCounter.incrementEventCount();
Expand All @@ -342,7 +342,7 @@ public void setListener(FTPSourceEventListener listener) {
* @param fileName
*/
public void handleProcessError(String fileName) {
logger.info("failed retrieving stream from file, will try in next poll :" + fileName);
LOGGER.info("failed retrieving stream from file, will try in next poll :" + fileName);
sourceCounter.incrementFilesProcCountError();
}

Expand Down

0 comments on commit e6674f9

Please sign in to comment.