Skip to content
This repository was archived by the owner on Jan 8, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Mandatory properties in <b>bold</b>
| max.rows | 10000| Max rows to import per query |
| read.only | false| Sets read only session with DDBB |
| custom.query | - | Custom query to force a special request to the DB, be carefull. Check below explanation of this property. |
| increase.column | -1 | Custom query with increase column to get the current index. |
| hibernate.connection.driver_class | -| Driver class to use by hibernate, if not specified the framework will auto asign one |
| hibernate.dialect | - | Dialect to use by hibernate, if not specified the framework will auto asign one. Check https://docs.jboss.org/hibernate/orm/4.3/manual/en-US/html/ch03.html#configuration-optional-dialects for a complete list of available dialects |
| hibernate.connection.provider_class | - | Set to org.hibernate.connection.C3P0ConnectionProvider to use C3P0 connection pool (recommended for production) |
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/org/keedio/flume/source/HibernateHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,13 @@ public List<List<Object>> executeQuery() throws InterruptedException {
}

if (!rowsList.isEmpty()){
sqlSourceHelper.setCurrentIndex(Integer.toString((Integer.parseInt(sqlSourceHelper.getCurrentIndex())
+ rowsList.size())));
int currentIndex = Integer.parseInt(sqlSourceHelper.getCurrentIndex());
if (0 <= sqlSourceHelper.getIncreaseColumn()){
currentIndex = Integer.parseInt(rowsList.get(rowsList.size()-1).get(sqlSourceHelper.getIncreaseColumn()).toString());
}else {
currentIndex = currentIndex + rowsList.size();
}
sqlSourceHelper.setCurrentIndex(Integer.toString(currentIndex));
}

return rowsList;
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/org/keedio/flume/source/SQLSourceHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class SQLSourceHelper {
private static final Logger LOG = LoggerFactory.getLogger(SQLSourceHelper.class);

private File file, directory;
private int runQueryDelay, batchSize, maxRows;
private int runQueryDelay, batchSize, maxRows, increaseColumn;
private String startFrom, currentIndex;
private String statusFilePath, statusFileName, connectionURL, table,
columnsToSelect, customQuery, query, sourceName, delimiterEntry, connectionUserName, connectionPassword,
Expand Down Expand Up @@ -72,6 +72,7 @@ public class SQLSourceHelper {
private static final String LAST_INDEX_STATUS_FILE = "LastIndex";
private static final String QUERY_STATUS_FILE = "Query";
private static final String DEFAULT_CHARSET_RESULTSET = "UTF-8";
private static final int DEFAULT_INCREASE_COLUMN = -1;

/**
* Builds an SQLSourceHelper containing the configuration parameters and
Expand Down Expand Up @@ -104,6 +105,7 @@ public SQLSourceHelper(Context context, String sourceName) {
encloseByQuotes = context.getBoolean("enclose.by.quotes", DEFAULT_ENCLOSE_BY_QUOTES);
statusFileJsonMap = new LinkedHashMap<String, String>();
defaultCharsetResultSet = context.getString("default.charset.resultset", DEFAULT_CHARSET_RESULTSET);
increaseColumn = context.getInteger("increase.column", DEFAULT_INCREASE_COLUMN);

checkMandatoryProperties();

Expand Down Expand Up @@ -388,4 +390,8 @@ public String getConnectionPassword() {
public String getDefaultCharsetResultSet() {
return defaultCharsetResultSet;
}

public int getIncreaseColumn() {
return increaseColumn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public void setup() {
when(context.getString("start.from", "0")).thenReturn("0");
when(context.getString("hibernate.connection.user")).thenReturn("user");
when(context.getString("hibernate.connection.password")).thenReturn("password");
when(context.getInteger("increase.column",-1)).thenReturn(-1);
}

/*
Expand Down Expand Up @@ -97,6 +98,12 @@ public void getCustomQuery() {
SQLSourceHelper sqlSourceHelper = new SQLSourceHelper(context,"Source Name");
assertEquals("SELECT column FROM table",sqlSourceHelper.getQuery());
}

@Test
public void getIncreaseColumn() {
SQLSourceHelper sqlSourceHelper = new SQLSourceHelper(context,"Source Name");
assertEquals(-1,sqlSourceHelper.getIncreaseColumn());
}

@Test
public void chekGetAllRowsWithNullParam() {
Expand Down