Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ public class AuditRoute {
private String auditId;
private String inlongGroupIdsInclude;
private String inlongGroupIdsExclude;
private int priority;
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ public int getValue(String key, int defaultValue) {
return getValue(key, defaultValue, Integer::parseInt);
}

public boolean getValue(String key, boolean defaultValue) {
return getValue(key, defaultValue, Boolean::parseBoolean);
}

private boolean updatePropertiesHolder(Map<String, String> result,
String holderName, boolean addElseRemove) {
if (StringUtils.isNotEmpty(holderName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class AuditRouteCache {
private static final String AUDIT_ID_INCLUDE = "audit_id_include";
private static final String GROUP_ID_INCLUDE = "inlong_group_id_include";
private static final String GROUP_ID_EXCLUDE = "inlong_group_id_exclude";
private static final String PRIORITY = "priority";

@Getter
private static final AuditRouteCache instance = new AuditRouteCache();
Expand Down Expand Up @@ -115,6 +116,7 @@ private void updateAuditRouteCache(String querySQL) {
String auditId = StringUtils.trimToNull(resultSet.getString(AUDIT_ID_INCLUDE));
String includeGroupId = StringUtils.trimToNull(resultSet.getString(GROUP_ID_INCLUDE));
String excludeGroupId = StringUtils.trimToNull(resultSet.getString(GROUP_ID_EXCLUDE));
int priority = resultSet.getInt(PRIORITY);

if (!isValidRegexOrLog(auditId)
|| !isValidRegexOrLog(includeGroupId)
Expand All @@ -130,6 +132,7 @@ private void updateAuditRouteCache(String querySQL) {
data.setAuditId(auditId);
data.setInlongGroupIdsInclude(includeGroupId);
data.setInlongGroupIdsExclude(excludeGroupId);
data.setPriority(priority);

auditRoutes.computeIfAbsent(address, key -> new ArrayList<>()).add(data);
}
Expand All @@ -139,6 +142,9 @@ private void updateAuditRouteCache(String querySQL) {
}

if (!auditRoutes.isEmpty()) {
auditRoutes.values()
.forEach(routes -> routes.sort((r1, r2) -> Integer.compare(r2.getPriority(), r1.getPriority())));

auditRouteCache = auditRoutes;
LOGGER.info("AuditRouteCache update success. Cache size={}, Query size={}", auditRouteCache.size(),
auditRoutes.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,5 +255,5 @@ public class SqlConstants {

public static final String KEY_QUERY_AUDIT_ROUTE_SQL = "audit.query.route.sql";
public static final String DEFAULT_QUERY_AUDIT_ROUTE_SQL =
"SELECT address, audit_id_include, inlong_group_id_include, inlong_group_id_exclude FROM audit_route_config WHERE status=1";
"SELECT address, audit_id_include, inlong_group_id_include, inlong_group_id_exclude, priority FROM audit_route_config WHERE status=1";
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ public class ConfigConstants {
public static final int DEFAULT_AUDIT_SERVICE_TIMEOUT_MS = 30000;
public static final String KEY_AUDIT_SERVICE_ROUTE_API = "audit.service.route.api";
public static final String DEFAULT_AUDIT_SERVICE_ROUTE_API = "/audit/query/getAuditRoute";

public static final String KEY_AUDIT_STORE_ROUTE_ENABLED = "audit.store.route.enabled";
public static final boolean DEFAULT_AUDIT_STORE_ROUTE_ENABLED = false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@
import static org.apache.inlong.audit.store.config.ConfigConstants.DEFAULT_AUDIT_SERVICE_ADDR;
import static org.apache.inlong.audit.store.config.ConfigConstants.DEFAULT_AUDIT_SERVICE_ROUTE_API;
import static org.apache.inlong.audit.store.config.ConfigConstants.DEFAULT_AUDIT_SERVICE_TIMEOUT_MS;
import static org.apache.inlong.audit.store.config.ConfigConstants.DEFAULT_AUDIT_STORE_ROUTE_ENABLED;
import static org.apache.inlong.audit.store.config.ConfigConstants.KEY_AUDIT_SERVICE_ADDR;
import static org.apache.inlong.audit.store.config.ConfigConstants.KEY_AUDIT_SERVICE_ROUTE_API;
import static org.apache.inlong.audit.store.config.ConfigConstants.KEY_AUDIT_SERVICE_TIMEOUT_MS;
import static org.apache.inlong.audit.store.config.ConfigConstants.KEY_AUDIT_STORE_ROUTE_ENABLED;
import static org.apache.inlong.audit.utils.RouteUtils.extractAddress;

public class AuditRouteManager {
Expand All @@ -68,6 +70,13 @@ private AuditRouteManager() {
}

public void init(String jdbcUrl) {
boolean routeEnabled =
ConfigManager.getInstance().getValue(KEY_AUDIT_STORE_ROUTE_ENABLED, DEFAULT_AUDIT_STORE_ROUTE_ENABLED);
if (!routeEnabled) {
LOGGER.info("AuditRouteManager is disabled by configuration ({}=false)", KEY_AUDIT_STORE_ROUTE_ENABLED);
return;
}

String serviceAddr = ConfigManager.getInstance().getValue(KEY_AUDIT_SERVICE_ADDR, DEFAULT_AUDIT_SERVICE_ADDR);
String routeApi =
ConfigManager.getInstance().getValue(KEY_AUDIT_SERVICE_ROUTE_API, DEFAULT_AUDIT_SERVICE_ROUTE_API);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public void insert(AuditData msgBody, Consumer<byte[]> consumer, MessageId messa
AuditRouteManager.getInstance().getAuditRoutes())) {
MetricsManager.getInstance().filterSuccess();
PulsarUtils.acknowledge(consumer, messageId);
LOG.warn("The audit data does not match the routing rules and is filtered out: {} ", msgBody);
LOG.debug("The audit data does not match the routing rules and is filtered out: {} ", msgBody);
return;
}

Expand Down
1 change: 1 addition & 0 deletions inlong-audit/sql/apache_inlong_audit_mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ CREATE TABLE IF NOT EXISTS `audit_route_config` (
`inlong_group_id_include` VARCHAR(255) COMMENT 'Included Inlong group IDs (regular expression)',
`inlong_group_id_exclude` VARCHAR(255) COMMENT 'Excluded Inlong group IDs (regular expression)',
`status` TINYINT NOT NULL DEFAULT 1 COMMENT 'Status, 1=active, 0=inactive',
`priority` int(32) NOT NULL DEFAULT 1 COMMENT 'Priority level, default is 1',
`update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update timestamp',
PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = UTF8 COMMENT='Audit route configuration table';
Expand Down
Loading