Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve connector failure handling logic #85

Merged
merged 6 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void open() throws CatalogException {
try {
this.metaClient = metaConnectionProvider.getMetaClient();
} catch (UnknownHostException | ClientServerIncompatibleException e) {
LOG.error("nebula get meta client error, ", e);
LOG.error("nebula get meta client error", e);
throw new CatalogException("nebula get meta client error.", e);
}

Expand All @@ -108,8 +108,8 @@ public void open() throws CatalogException {
graphConnectionProvider.getPassword(), true);
} catch (NotValidConnectionException | IOErrorException | AuthFailedException
| ClientServerIncompatibleException | UnknownHostException e) {
LOG.error("failed to get graph session, ", e);
throw new CatalogException("get graph session error, ", e);
LOG.error("failed to get graph session", e);
throw new CatalogException("get graph session error.", e);
}
}

Expand Down Expand Up @@ -137,7 +137,7 @@ public List<String> listDatabases() throws CatalogException {
spaceNames.add(new String(space.getName()));
}
} catch (TException | ExecuteFailedException | ClientServerIncompatibleException e) {
LOG.error("failed to connect meta service vis {} ", address, e);
LOG.error("failed to connect meta service via " + address, e);
throw new CatalogException("nebula meta service connect failed.", e);
}
return spaceNames;
Expand All @@ -152,7 +152,7 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistE
props.put("spaceId",
String.valueOf(metaClient.getSpace(databaseName).getSpace_id()));
} catch (TException | ExecuteFailedException e) {
LOG.error("get spaceId error, ", e);
LOG.error("get spaceId error", e);
}
return new CatalogDatabaseImpl(props, databaseName);
} else {
Expand Down Expand Up @@ -186,13 +186,13 @@ public void createDatabase(String dataBaseName,
)
);
if (!newProperties.containsKey(NebulaConstant.CREATE_VID_TYPE)) {
LOG.error("failed to create graph space {}, missing VID type param", properties);
LOG.error("failed to create graph space {}, missing VID type", properties);
throw new CatalogException("nebula create graph space failed, missing VID type.");
}
String vidType = newProperties.get(NebulaConstant.CREATE_VID_TYPE);
if (!NebulaUtils.checkValidVidType(vidType)) {
LOG.error("VID type not satisfy {}", vidType);
throw new CatalogException("nebula graph dont support VID type.");
LOG.error("invalid VID type: {}", vidType);
throw new CatalogException("nebula does not support the specified VID type.");
}
NebulaSpace space = new NebulaSpace(
dataBaseName,catalogDatabase.getComment(), newProperties);
Expand All @@ -210,7 +210,7 @@ public void createDatabase(String dataBaseName,
LOG.debug("create space success.");
} else {
LOG.error("create space failed: {}", execResult.getErrorMessage());
throw new CatalogException("create space failed, " + execResult.getErrorMessage());
throw new CatalogException("create space failed: " + execResult.getErrorMessage());
}
}

Expand All @@ -227,7 +227,7 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
try {
return (listTables(graphSpace).contains(table));
} catch (DatabaseNotExistException e) {
throw new CatalogException("failed to call tableExists function, ", e);
throw new CatalogException("failed to call tableExists function.", e);
}
}

Expand All @@ -248,7 +248,7 @@ public List<String> listTables(String graphSpace) throws DatabaseNotExistExcepti
try {
metaClient.connect();
} catch (TException | ClientServerIncompatibleException e) {
LOG.error("failed to connect meta service vis {} ", address, e);
LOG.error("failed to connect meta service via " + address, e);
throw new CatalogException("nebula meta service connect failed.", e);
}
List<String> tables = new ArrayList<>();
Expand All @@ -260,7 +260,7 @@ public List<String> listTables(String graphSpace) throws DatabaseNotExistExcepti
tables.add("EDGE" + NebulaConstant.POINT + new String(edge.edge_name));
}
} catch (TException | ExecuteFailedException e) {
LOG.error("get tags or edges error,", e);
LOG.error("get tags or edges error", e);
}
return tables;
}
Expand All @@ -284,7 +284,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
try {
metaClient.connect();
} catch (TException | ClientServerIncompatibleException e) {
LOG.error("failed to connect meta service vis {} ", address, e);
LOG.error("failed to connect meta service via " + address, e);
throw new CatalogException("nebula meta service connect failed.", e);
}

Expand All @@ -296,7 +296,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
schema = metaClient.getEdge(graphSpace, label);
}
} catch (TException | ExecuteFailedException e) {
LOG.error("get tag or edge schema error, ", e);
LOG.error("get tag or edge schema error", e);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,72 @@

package org.apache.flink.connector.nebula.sink;

import com.vesoft.nebula.ErrorCode;
import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.net.Session;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface NebulaBatchExecutor<T> {
public abstract class NebulaBatchExecutor<T> {

public static class ExecutionException extends IOException {
private final String statement;
private final String errorMessage;
private final int errorCode;

public ExecutionException(String statement, String errorMessage, int errorCode) {
this.statement = statement;
this.errorMessage = errorMessage;
this.errorCode = errorCode;
}

@Override
public String getMessage() {
return String.format("failed to execute statement %s: %s [%s]",
statement, errorMessage, errorCode);
}

public boolean isNonRecoverableError() {
return this.errorCode == ErrorCode.E_SEMANTIC_ERROR.getValue()
|| this.errorCode == ErrorCode.E_SYNTAX_ERROR.getValue();
}
}

private static final Logger LOG = LoggerFactory.getLogger(NebulaBatchExecutor.class);

/**
* put record into buffer
*
* @param record represent vertex or edge
*/
void addToBatch(T record);
public abstract void addToBatch(T record);

/**
* execute the statement
*
* @param session graph session
*/
String executeBatch(Session session);
public abstract void executeBatch(Session session) throws IOException;

public abstract void clearBatch();

public abstract boolean isBatchEmpty();

protected static void executeStatement(Session session, String statement) throws IOException {
LOG.debug("write statement: {}", statement);
ResultSet execResult;
try {
execResult = session.execute(statement);
} catch (IOErrorException e) {
throw new IOException(e);
}
if (execResult.isSucceeded()) {
LOG.debug("write success");
} else {
throw new ExecutionException(
statement, execResult.getErrorMessage(), execResult.getErrorCode());
}
}
}
Loading