Skip to content

Commit c7a68a7

Browse files
author
erdemcer
committed
Start SCN deployment
1 parent 79f491e commit c7a68a7

File tree

4 files changed

+16
-4
lines changed

4 files changed

+16
-4
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ In order to execute connector successfully, connector must be started with privi
5353
|table.whitelist|String|A comma separated list of database schema or table names which will be captured.<br />For all schema capture **<SCHEMA_NAME>.*** <br /> For table capture **<SCHEMA_NAME>.<TABLE_NAME>** must be specified.|
5454
|parse.dml.data|Boolean|If it is true , captured sql DML statement will be parsed into fields and values.If it is false only sql DML statement is published.
5555
|reset.offset|Boolean|If it is true , offset value will be set to current SCN of database when connector started.If it is false connector will start from last offset value.
56+
|start.scn|Long|If it is set , offset value will be set this specified value and logminer will start at this SCN.If connector would like to be started from desired SCN , this property can be used.
5657
|||
5758

5859

config/OracleSourceConnector.properties

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,5 @@ db.user.password=kminerpass
2626
db.fetch.size=1
2727
table.whitelist=TEST.*,TEST2.TABLE2
2828
parse.dml.data=true
29-
reset.offset=true
29+
reset.offset=true
30+
start.scn=

src/main/java/com/ecer/kafka/connect/oracle/OracleSourceConnectorConfig.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class OracleSourceConnectorConfig extends AbstractConfig {
2121
public static final String PARSE_DML_DATA = "parse.dml.data";
2222
public static final String DB_FETCH_SIZE = "db.fetch.size";
2323
public static final String RESET_OFFSET = "reset.offset";
24+
public static final String START_SCN = "start.scn";
2425

2526
public OracleSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
2627
super(config, parsedConfig);
@@ -42,7 +43,8 @@ public static ConfigDef conf() {
4243
.define(TABLE_WHITELIST,Type.STRING,Importance.HIGH,"TAbles will be mined")
4344
.define(PARSE_DML_DATA,Type.BOOLEAN,Importance.HIGH,"Parse DML Data")
4445
.define(DB_FETCH_SIZE,Type.INT,Importance.HIGH,"Database Record Fetch Size")
45-
.define(RESET_OFFSET,Type.BOOLEAN,Importance.HIGH,"Reset Offset");
46+
.define(RESET_OFFSET,Type.BOOLEAN,Importance.HIGH,"Reset Offset")
47+
.define(START_SCN,Type.STRING,"",Importance.LOW,"Start SCN");
4648
}
4749

4850
/*public String getMy(){
@@ -59,4 +61,5 @@ public static ConfigDef conf() {
5961
public Boolean getParseDmlData(){return this.getBoolean(PARSE_DML_DATA);}
6062
public int getDbFetchSize(){return this.getInt(DB_FETCH_SIZE);}
6163
public Boolean getResetOffset(){return this.getBoolean(RESET_OFFSET);}
64+
public String getStartScn(){return this.getString(START_SCN);}
6265
}

src/main/java/com/ecer/kafka/connect/oracle/OracleSourceTask.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,11 @@ public static void closeDbConn() throws SQLException{
6969
@Override
7070
public void start(Map<String, String> map) {
7171
//TODO: Do things here that are required to start your task. This could be open a connection to a database, etc.
72-
config=new OracleSourceConnectorConfig(map);
72+
config=new OracleSourceConnectorConfig(map);
7373
topic=config.getTopic();
7474
dbName=config.getDbNameAlias();
75-
parseDmlData=config.getParseDmlData();
75+
parseDmlData=config.getParseDmlData();
76+
String startSCN = config.getStartScn();
7677
log.info("Oracle Kafka Connector is starting on {}",config.getDbNameAlias());
7778
try {
7879
log.info("Connecting to database");
@@ -89,6 +90,12 @@ public void start(Map<String, String> map) {
8990
Object lastRecordedOffset = offset.get(POSITION_FIELD);
9091
streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
9192
}
93+
94+
if (!startSCN.equals("")){
95+
log.info("Resetting offset with specified SCN:{}",startSCN);
96+
streamOffset=Long.parseLong(startSCN);
97+
streamOffset-=1;
98+
}
9299

93100
if (config.getResetOffset()){
94101
log.info("Resetting offset with new SCN");

0 commit comments

Comments
 (0)