Skip to content

Commit

Permalink
[INLONG-10761][Agent] Delete reader related code (#10762)
Browse files Browse the repository at this point in the history
* [INLONG-10761][Agent] Delete reader related code

* [INLONG-10761][Agent] Delete reader related code

* [INLONG-10761][Agent] Delete reader related code

* [INLONG-10761][Agent] Delete reader related code

* [INLONG-10761][Agent] Delete reader related code
  • Loading branch information
justinwwhuang authored Aug 8, 2024
1 parent 150fd4c commit d8c2bf4
Show file tree
Hide file tree
Showing 33 changed files with 11 additions and 3,823 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,13 @@
package org.apache.inlong.agent.plugin.file;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.plugin.Message;

import java.util.List;

/**
* Source can be split into multiple reader.
* The source class is used to collect data from a data source
*/
public interface Source {

/**
* Split source into a list of readers.
*
* @param conf job conf
* @return list of reader
*/
List<Reader> split(TaskProfile conf);

/**
* Read message
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,12 @@
package org.apache.inlong.agent.plugin.sources;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import org.apache.inlong.agent.plugin.sources.reader.SqlReader;
import org.apache.inlong.agent.utils.AgentDbUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -39,56 +34,11 @@ public class DatabaseSqlSource extends AbstractSource {

private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseSqlSource.class);

private static final String JOB_DATABASE_SQL = "job.sql.command";

private static final String DATABASE_SOURCE_TAG_NAME = "AgentDatabaseSourceMetric";

private static AtomicLong metricsIndex = new AtomicLong(0);

public DatabaseSqlSource() {
}

/**
* Use SQL to read data.
*
* @param sqlPattern sql pattern
* @return list of readers or null if sql is not correct.
*/
private List<Reader> splitSqlJob(String sqlPattern) {
String[] sqlList = AgentDbUtils.replaceDynamicSeq(sqlPattern);
if (sqlList != null) {
List<Reader> result = new ArrayList<>();
for (String sql : sqlList) {
result.add(new SqlReader(sql));
}
return result;
}
return null;
}

/**
* Use SQL or binlog to read data.
*
* @return reader list or null if database type is not correct.
*/
@Override
public List<Reader> split(TaskProfile conf) {
String sqlPattern = conf.get(JOB_DATABASE_SQL, "").toLowerCase();
List<Reader> readerList = null;
if (!sqlPattern.isEmpty()) {
readerList = splitSqlJob(sqlPattern);
}
if (readerList != null) {
// increment the count of successful sources
sourceMetric.sourceSuccessCount.incrementAndGet();
} else {
// database type or sql is incorrect
// increment the count of failed sources
sourceMetric.sourceFailCount.incrementAndGet();
}
return readerList;
}

@Override
protected String getThreadName() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.inlong.agent.plugin.sources;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;

import org.apache.commons.lang3.ObjectUtils;
Expand Down Expand Up @@ -168,11 +166,6 @@ protected boolean isRunnable() {
return runnable;
}

@Override
public List<Reader> split(TaskProfile conf) {
return null;
}

@Override
public boolean sourceExist() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.OffsetProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.DataCollectType;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.core.FileStaticManager;
import org.apache.inlong.agent.core.FileStaticManager.FileStatic;
import org.apache.inlong.agent.core.task.OffsetManager;
import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
import org.apache.inlong.agent.utils.AgentUtils;
Expand Down Expand Up @@ -315,11 +313,6 @@ public boolean sourceExist() {
return fileExist;
}

@Override
public List<Reader> split(TaskProfile jobConf) {
return null;
}

@Override
protected void releaseSource() {
if (randomAccessFile != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;

import io.debezium.connector.mongodb.MongoDbConnector;
Expand Down Expand Up @@ -130,11 +128,6 @@ private void handleConsumerEvent(List<ChangeEvent<String, String>> records,
committer.markBatchFinished();
}

@Override
public List<Reader> split(TaskProfile conf) {
return null;
}

@Override
protected String getThreadName() {
return "mongo-source-" + taskId + "-" + instanceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,10 @@
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import org.apache.inlong.agent.plugin.sources.reader.OracleReader;

import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
Expand All @@ -49,7 +46,14 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import static org.apache.inlong.agent.constant.TaskConstants.*;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_ORACLE_DBNAME;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_ORACLE_HOSTNAME;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_ORACLE_PASSWORD;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_ORACLE_PORT;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_ORACLE_SCHEMA_INCLUDE_LIST;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_ORACLE_SNAPSHOT_MODE;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_ORACLE_TABLE_INCLUDE_LIST;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_ORACLE_USER;

/**
* Oracle SQL source
Expand All @@ -71,15 +75,6 @@ public class OracleSource extends AbstractSource {
public OracleSource() {
}

@Override
public List<Reader> split(TaskProfile conf) {
Reader oracleReader = new OracleReader();
List<Reader> readerList = new ArrayList<>();
readerList.add(oracleReader);
sourceMetric.sourceSuccessCount.incrementAndGet();
return readerList;
}

@Override
protected String getThreadName() {
return "oracle-source-" + taskId + "-" + instanceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.AgentConstants;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;

import io.debezium.connector.postgresql.PostgresConnector;
Expand Down Expand Up @@ -145,11 +143,6 @@ private void handleConsumerEvent(List<ChangeEvent<String, String>> records,
committer.markBatchFinished();
}

@Override
public List<Reader> split(TaskProfile conf) {
return null;
}

@Override
protected String getThreadName() {
return "postgres-source-" + taskId + "-" + instanceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package org.apache.inlong.agent.plugin.sources;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;

import org.apache.commons.lang3.ObjectUtils;
Expand Down Expand Up @@ -165,11 +163,6 @@ protected void releaseSource() {
}
}

@Override
public List<Reader> split(TaskProfile conf) {
return null;
}

@Override
public boolean sourceExist() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,12 @@
package org.apache.inlong.agent.plugin.sources;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import org.apache.inlong.agent.plugin.sources.reader.RedisReader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
Expand All @@ -41,15 +37,6 @@ public RedisSource() {

}

@Override
public List<Reader> split(TaskProfile conf) {
RedisReader redisReader = new RedisReader();
List<Reader> readerList = new ArrayList<>();
readerList.add(redisReader);
sourceMetric.sourceSuccessCount.incrementAndGet();
return readerList;
}

@Override
protected String getThreadName() {
return null;
Expand Down
Loading

0 comments on commit d8c2bf4

Please sign in to comment.