Skip to content

Commit

Permalink
Refactor error handling logic
Browse files Browse the repository at this point in the history
  • Loading branch information
linhr committed Apr 20, 2023
1 parent 610b7ef commit 2e86172
Show file tree
Hide file tree
Showing 14 changed files with 402 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void open() throws CatalogException {
try {
this.metaClient = metaConnectionProvider.getMetaClient();
} catch (UnknownHostException | ClientServerIncompatibleException e) {
LOG.error("nebula get meta client error, ", e);
LOG.error("nebula get meta client error", e);
throw new CatalogException("nebula get meta client error.", e);
}

Expand All @@ -108,8 +108,8 @@ public void open() throws CatalogException {
graphConnectionProvider.getPassword(), true);
} catch (NotValidConnectionException | IOErrorException | AuthFailedException
| ClientServerIncompatibleException | UnknownHostException e) {
LOG.error("failed to get graph session, ", e);
throw new CatalogException("get graph session error, ", e);
LOG.error("failed to get graph session", e);
throw new CatalogException("get graph session error.", e);
}
}

Expand Down Expand Up @@ -137,7 +137,7 @@ public List<String> listDatabases() throws CatalogException {
spaceNames.add(new String(space.getName()));
}
} catch (TException | ExecuteFailedException | ClientServerIncompatibleException e) {
LOG.error("failed to connect meta service vis {} ", address, e);
LOG.error("failed to connect meta service via " + address, e);
throw new CatalogException("nebula meta service connect failed.", e);
}
return spaceNames;
Expand All @@ -152,7 +152,7 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistE
props.put("spaceId",
String.valueOf(metaClient.getSpace(databaseName).getSpace_id()));
} catch (TException | ExecuteFailedException e) {
LOG.error("get spaceId error, ", e);
LOG.error("get spaceId error", e);
}
return new CatalogDatabaseImpl(props, databaseName);
} else {
Expand Down Expand Up @@ -186,13 +186,13 @@ public void createDatabase(String dataBaseName,
)
);
if (!newProperties.containsKey(NebulaConstant.CREATE_VID_TYPE)) {
LOG.error("failed to create graph space {}, missing VID type param", properties);
LOG.error("failed to create graph space {}, missing VID type", properties);
throw new CatalogException("nebula create graph space failed, missing VID type.");
}
String vidType = newProperties.get(NebulaConstant.CREATE_VID_TYPE);
if (!NebulaUtils.checkValidVidType(vidType)) {
LOG.error("VID type not satisfy {}", vidType);
throw new CatalogException("nebula graph dont support VID type.");
LOG.error("invalid VID type: {}", vidType);
throw new CatalogException("nebula does not support the specified VID type.");
}
NebulaSpace space = new NebulaSpace(
dataBaseName,catalogDatabase.getComment(), newProperties);
Expand All @@ -210,7 +210,7 @@ public void createDatabase(String dataBaseName,
LOG.debug("create space success.");
} else {
LOG.error("create space failed: {}", execResult.getErrorMessage());
throw new CatalogException("create space failed, " + execResult.getErrorMessage());
throw new CatalogException("create space failed: " + execResult.getErrorMessage());
}
}

Expand All @@ -227,7 +227,7 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
try {
return (listTables(graphSpace).contains(table));
} catch (DatabaseNotExistException e) {
throw new CatalogException("failed to call tableExists function, ", e);
throw new CatalogException("failed to call tableExists function.", e);
}
}

Expand All @@ -248,7 +248,7 @@ public List<String> listTables(String graphSpace) throws DatabaseNotExistExcepti
try {
metaClient.connect();
} catch (TException | ClientServerIncompatibleException e) {
LOG.error("failed to connect meta service vis {} ", address, e);
LOG.error("failed to connect meta service via " + address, e);
throw new CatalogException("nebula meta service connect failed.", e);
}
List<String> tables = new ArrayList<>();
Expand All @@ -260,7 +260,7 @@ public List<String> listTables(String graphSpace) throws DatabaseNotExistExcepti
tables.add("EDGE" + NebulaConstant.POINT + new String(edge.edge_name));
}
} catch (TException | ExecuteFailedException e) {
LOG.error("get tags or edges error,", e);
LOG.error("get tags or edges error", e);
}
return tables;
}
Expand All @@ -284,7 +284,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
try {
metaClient.connect();
} catch (TException | ClientServerIncompatibleException e) {
LOG.error("failed to connect meta service vis {} ", address, e);
LOG.error("failed to connect meta service via " + address, e);
throw new CatalogException("nebula meta service connect failed.", e);
}

Expand All @@ -296,7 +296,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
schema = metaClient.getEdge(graphSpace, label);
}
} catch (TException | ExecuteFailedException e) {
LOG.error("get tag or edge schema error, ", e);
LOG.error("get tag or edge schema error", e);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
package org.apache.flink.connector.nebula.sink;

import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.net.Session;
import java.io.IOException;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,41 +28,26 @@ public abstract class NebulaBatchExecutor<T> {
*
* @param session graph session
*/
public abstract void executeBatch(Session session);
public abstract void executeBatch(Session session) throws IOException;

protected static void executeStatement(Session session, String statement,
int maxRetries, int retryDelayMs) throws IOException {
if (maxRetries < 0) {
throw new IllegalArgumentException(
String.format("invalid max retries: %s", maxRetries));
}
public abstract void clearBatch();

public abstract boolean isBatchEmpty();

// The statement will be executed at most `maxRetries + 1` times.
for (int i = 0; i <= maxRetries; i++) {
ResultSet execResult;
try {
execResult = session.execute(statement);
if (execResult.isSucceeded()) {
LOG.debug("write success");
break;
} else {
throw new IOException(String.format(
"write data failed for statement %s: %s [%s]",
statement, execResult.getErrorMessage(), execResult.getErrorCode()));
}
} catch (Exception e) {
LOG.error(String.format("write data error (attempt %s)", i), e);
if (i >= maxRetries) {
throw new IOException(e);
} else if (i + 1 <= maxRetries) {
try {
Thread.sleep(retryDelayMs);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("interrupted", ex);
}
}
}
/**
 * Runs a single write statement on the given session.
 *
 * @param session graph session used to execute the statement
 * @param statement statement text to execute
 * @throws IOException if the session raises an {@code IOErrorException}, or if the
 *         returned result is not successful
 */
protected static void executeStatement(Session session, String statement) throws IOException {
    LOG.debug("write statement: {}", statement);

    final ResultSet result;
    try {
        result = session.execute(statement);
    } catch (IOErrorException cause) {
        // surface the client error as a plain IOException, keeping the cause
        throw new IOException(cause);
    }

    // guard clause: anything but a successful result is reported with full context
    if (!result.isSucceeded()) {
        String reason = String.format(
                "write data failed for statement %s: %s [%s]",
                statement, result.getErrorMessage(), result.getErrorCode());
        throw new IOException(reason);
    }
    LOG.debug("write success");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import java.io.Flushable;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand All @@ -29,6 +27,7 @@
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.connector.nebula.utils.FailureHandlerEnum;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,13 +41,14 @@ public abstract class NebulaBatchOutputFormat<T, OptionsT extends ExecutionOptio
protected final NebulaGraphConnectionProvider graphProvider;
protected final OptionsT executionOptions;
protected NebulaBatchExecutor<T> nebulaBatchExecutor;
private volatile AtomicLong numPendingRow;
private transient long numPendingRow;
private NebulaPool nebulaPool;
private Session session;

private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private transient volatile boolean closed = false;
private transient volatile Exception commitException;

public NebulaBatchOutputFormat(
NebulaGraphConnectionProvider graphProvider,
Expand All @@ -70,33 +70,20 @@ public void configure(Configuration configuration) {
public void open(int i, int i1) throws IOException {
try {
nebulaPool = graphProvider.getNebulaPool();
session = nebulaPool.getSession(graphProvider.getUserName(),
graphProvider.getPassword(), true);
} catch (UnknownHostException | NotValidConnectionException | AuthFailedException
| ClientServerIncompatibleException | IOErrorException e) {
LOG.error("failed to get graph session, ", e);
throw new IOException("get graph session error, ", e);
}
ResultSet resultSet;
try {
resultSet = session.execute("USE " + executionOptions.getGraphSpace());
} catch (IOErrorException e) {
LOG.error("switch space error, ", e);
throw new IOException("switch space error,", e);
}
if (!resultSet.isSucceeded()) {
LOG.error("switch space failed, {}", resultSet.getErrorMessage());
throw new RuntimeException("switch space failed, " + resultSet.getErrorMessage());
} catch (UnknownHostException e) {
LOG.error("failed to create connection pool", e);
throw new IOException("connection pool creation error", e);
}
renewSession();

try {
metaClient = metaProvider.getMetaClient();
} catch (TException | ClientServerIncompatibleException e) {
LOG.error("failed to get meta client, ", e);
throw new IOException("get metaClient error, ", e);
LOG.error("failed to get meta client", e);
throw new IOException("get meta client error", e);
}

numPendingRow = new AtomicLong(0);
numPendingRow = 0;
nebulaBatchExecutor = createNebulaBatchExecutor();
// start the schedule task: submit the buffer records every batchInterval.
// If batchIntervalMs is 0, do not start the scheduler task.
Expand All @@ -105,8 +92,12 @@ public void open(int i, int i1) throws IOException {
"nebula-write-output-format"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
synchronized (NebulaBatchOutputFormat.this) {
if (!closed) {
commit();
if (!closed && commitException == null) {
try {
commit();
} catch (Exception e) {
commitException = e;
}
}
} },
executionOptions.getBatchIntervalMs(),
Expand All @@ -115,41 +106,105 @@ public void open(int i, int i1) throws IOException {
}
}

/**
 * Rethrows the failure stored in {@code commitException}, if there is one.
 *
 * @throws RuntimeException wrapping the stored exception when it is non-null
 */
private void checkCommitException() {
    final Exception failure = commitException;
    if (failure == null) {
        return;
    }
    throw new RuntimeException("commit records failed", failure);
}

/**
 * Replaces the current graph session with a new one from the pool and switches it to the
 * graph space configured in the execution options.
 *
 * <p>An existing session is released before the new one is requested.
 *
 * @throws IOException if no session can be obtained, or if the {@code USE} statement
 *         raises an error or returns an unsuccessful result
 */
private void renewSession() throws IOException {
    // drop the previous session before asking the pool for a new one
    if (session != null) {
        session.release();
        session = null;
    }

    try {
        session = nebulaPool.getSession(
                graphProvider.getUserName(), graphProvider.getPassword(), true);
    } catch (NotValidConnectionException | AuthFailedException
            | ClientServerIncompatibleException | IOErrorException cause) {
        LOG.error("failed to get graph session", cause);
        throw new IOException("get graph session error", cause);
    }

    // a fresh session has no space selected, so select the configured one
    final String useStatement = "USE " + executionOptions.getGraphSpace();
    final ResultSet useResult;
    try {
        useResult = session.execute(useStatement);
    } catch (IOErrorException cause) {
        LOG.error("switch space error", cause);
        throw new IOException("switch space error", cause);
    }

    if (useResult.isSucceeded()) {
        return;
    }
    final String failure = "switch space failed: " + useResult.getErrorMessage();
    LOG.error(failure);
    throw new IOException(failure);
}

protected abstract NebulaBatchExecutor<T> createNebulaBatchExecutor();

/**
* write one record to buffer
*/
@Override
public final synchronized void writeRecord(T row) {
public final synchronized void writeRecord(T row) throws IOException {
checkCommitException();
nebulaBatchExecutor.addToBatch(row);
numPendingRow++;

if (numPendingRow.incrementAndGet() >= executionOptions.getBatchSize()) {
if (executionOptions.getBatchSize() > 0
&& numPendingRow >= executionOptions.getBatchSize()) {
commit();
}
}

/**
* commit batch insert statements
*/
private synchronized void commit() {
nebulaBatchExecutor.executeBatch(session);
long pendingRow = numPendingRow.get();
numPendingRow.compareAndSet(pendingRow, 0);
/**
 * Commits the buffered batch, executing it at most {@code maxRetries + 1} times.
 *
 * <p>After every failed attempt the session is renewed. Before each retry the thread
 * sleeps for the configured retry delay. When the last attempt fails, the batch is cleared
 * and the pending-row counter is reset; the exception of that last attempt is rethrown only
 * if the failure handler is {@code FailureHandlerEnum.FAIL}.
 *
 * @throws IOException if the last attempt fails and the failure handler is {@code FAIL},
 *         if renewing the session fails, or if the thread is interrupted while waiting
 *         for a retry
 * @throws IllegalArgumentException if the configured max retries is negative
 */
private synchronized void commit() throws IOException {
    int maxRetries = executionOptions.getMaxRetries();
    if (maxRetries < 0) {
        // With a negative value the loop below would never run: the batch would be
        // silently kept, numPendingRow would never be reset, and flush() would spin
        // forever. Reject the configuration instead.
        throw new IllegalArgumentException(
                String.format("invalid max retries: %s", maxRetries));
    }
    int retryDelayMs = executionOptions.getRetryDelayMs();
    boolean failOnError = executionOptions.getFailureHandler().equals(FailureHandlerEnum.FAIL);

    // execute the batch at most `maxRetries + 1` times
    for (int i = 0; i <= maxRetries; i++) {
        try {
            nebulaBatchExecutor.executeBatch(session);
            numPendingRow = 0;
            return;
        } catch (Exception e) {
            LOG.error(String.format("write data error (attempt %s)", i), e);
            if (i >= maxRetries) {
                // clear the batch on failure after all retries
                nebulaBatchExecutor.clearBatch();
                numPendingRow = 0;
                if (failOnError) {
                    throw e;
                }
            } else {
                // another attempt follows: wait before retrying
                try {
                    Thread.sleep(retryDelayMs);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new IOException("interrupted", ex);
                }
            }
            // We do not know whether the failure was due to an expired session or
            // an issue with the query, so we renew the session anyway to be more robust.
            renewSession();
        }
    }
}

/**
* commit the pending batch writes before releasing the connection
*/
@Override
public final synchronized void close() {
public final synchronized void close() throws IOException {
if (!closed) {
closed = true;
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
scheduler.shutdown();
}
if (numPendingRow != null && numPendingRow.get() > 0) {
if (numPendingRow > 0) {
commit();
}
if (session != null) {
Expand All @@ -169,7 +224,8 @@ public final synchronized void close() {
*/
@Override
public synchronized void flush() throws IOException {
while (numPendingRow.get() != 0) {
checkCommitException();
while (numPendingRow > 0) {
commit();
}
}
Expand Down
Loading

0 comments on commit 2e86172

Please sign in to comment.