Skip to content

Commit

Permalink
RavenDB-21341 Added retry mechanism for InitTransactions of Kafka pro…
Browse files Browse the repository at this point in the history
…ducer. Added configuration option.
  • Loading branch information
arekpalinski committed Sep 13, 2023
1 parent d4e2098 commit 3521730
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
6 changes: 6 additions & 0 deletions src/Raven.Server/Config/Categories/EtlConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,11 @@ public class EtlConfiguration : ConfigurationCategory
[DefaultValue(64 * 1024)]
[ConfigurationEntry("ETL.OLAP.MaxNumberOfExtractedDocuments", ConfigurationEntryScope.ServerWideOrPerDatabase)]
public int? OlapMaxNumberOfExtractedDocuments { get; protected set; }

[Description("Timeout to initialize transactions for the Kafka producer")]
[DefaultValue(60)]
[TimeUnit(TimeUnit.Seconds)]
[ConfigurationEntry("ETL.Queue.Kafka.InitTransactionsTimeoutInSec", ConfigurationEntryScope.ServerWideOrPerDatabase)]
public TimeSetting KafkaInitTransactionsTimeout { get; set; }
}
}
25 changes: 24 additions & 1 deletion src/Raven.Server/Documents/ETL/Providers/Queue/Kafka/KafkaEtl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,30 @@ protected override int PublishMessages(List<QueueWithItems<KafkaItem>> itemsPerT

try
{
producer.InitTransactions(TimeSpan.FromSeconds(60));
var retries = 3;

do
{
try
{
producer.InitTransactions(Database.Configuration.Etl.KafkaInitTransactionsTimeout.AsTimeSpan);
break;
}
catch (KafkaRetriableException e)
{
if (--retries > 0)
{
if (Logger.IsOperationsEnabled)
{
Logger.Operations($"ETL process: {Name}. Failed to init transactions for the producer instance. Retries: {retries}", e);
}
}
else
{
throw;
}
}
} while (true);
}
catch (Exception e)
{
Expand Down

0 comments on commit 3521730

Please sign in to comment.