Skip to content

Commit

Permalink
[improvement](spark-connector) Throw an exception when the data push …
Browse files Browse the repository at this point in the history
…fails and there are too many retries (#7531)
  • Loading branch information
Aiden-Dong authored Jan 11, 2022
1 parent d418887 commit 6864a37
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public String toString() {
}
}

public void load(List<List<Object>> rows) throws StreamLoadException {
public String listToString(List<List<Object>> rows){
StringJoiner lines = new StringJoiner(LINE_DELIMITER);
for (List<Object> row : rows) {
StringJoiner line = new StringJoiner(FIELD_DELIMITER);
Expand All @@ -151,9 +151,14 @@ public void load(List<List<Object>> rows) throws StreamLoadException {
}
lines.add(line.toString());
}
load(lines.toString());
return lines.toString();
}


public void load(List<List<Object>> rows) throws StreamLoadException {
String records = listToString(rows);
load(records);
}
public void load(String value) throws StreamLoadException {
LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
LoadResponse loadResponse = loadBatch(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@ import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.slf4j.{Logger, LoggerFactory}

import java.io.IOException
import java.util

import org.apache.doris.spark.rest.RestService

import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.util.control.Breaks

private[sql] class DorisSourceProvider extends DataSourceRegister
with RelationProvider
with CreatableRelationProvider
with StreamSinkProvider {
with StreamSinkProvider
with Serializable {

private val logger: Logger = LoggerFactory.getLogger(classOf[DorisSourceProvider].getName)

Expand Down Expand Up @@ -97,14 +100,23 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
catch {
case e: Exception =>
try {
logger.warn("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
//If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
dorisStreamLoader.setHostPort(RestService.randomBackendV2(sparkSettings,logger))
Thread.sleep(1000 * i)
} catch {
case ex: InterruptedException =>
logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
Thread.currentThread.interrupt()
throw new IOException("unable to flush; interrupted while doing another attempt", e)
}
}
}

if(!rowsBuffer.isEmpty){
logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.slf4j.{Logger, LoggerFactory}

import java.io.IOException
import java.util

import org.apache.doris.spark.rest.RestService

import scala.util.control.Breaks

private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable {
Expand Down Expand Up @@ -81,14 +83,23 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
catch {
case e: Exception =>
try {
logger.warn("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
//If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
dorisStreamLoader.setHostPort(RestService.randomBackendV2(settings,logger))
Thread.sleep(1000 * i)
} catch {
case ex: InterruptedException =>
logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
Thread.currentThread.interrupt()
throw new IOException("unable to flush; interrupted while doing another attempt", e)
}
}
}

if(!rowsBuffer.isEmpty){
logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
}
}
}
})
Expand Down

0 comments on commit 6864a37

Please sign in to comment.