From 352173028037806038b2cae0229d450eedf97409 Mon Sep 17 00:00:00 2001 From: Arkadiusz Palinski Date: Wed, 13 Sep 2023 11:57:06 +0200 Subject: [PATCH] RavenDB-21341 Added retry mechanism for InitTransactions of Kafka producer. Added configuration option. --- .../Config/Categories/EtlConfiguration.cs | 6 +++++ .../ETL/Providers/Queue/Kafka/KafkaEtl.cs | 25 ++++++++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/Raven.Server/Config/Categories/EtlConfiguration.cs b/src/Raven.Server/Config/Categories/EtlConfiguration.cs index 713a54dbf3f4..1a6a2c9e920a 100644 --- a/src/Raven.Server/Config/Categories/EtlConfiguration.cs +++ b/src/Raven.Server/Config/Categories/EtlConfiguration.cs @@ -46,5 +46,11 @@ public class EtlConfiguration : ConfigurationCategory [DefaultValue(64 * 1024)] [ConfigurationEntry("ETL.OLAP.MaxNumberOfExtractedDocuments", ConfigurationEntryScope.ServerWideOrPerDatabase)] public int? OlapMaxNumberOfExtractedDocuments { get; protected set; } + + [Description("Timeout to initialize transactions for the Kafka producer")] + [DefaultValue(60)] + [TimeUnit(TimeUnit.Seconds)] + [ConfigurationEntry("ETL.Queue.Kafka.InitTransactionsTimeoutInSec", ConfigurationEntryScope.ServerWideOrPerDatabase)] + public TimeSetting KafkaInitTransactionsTimeout { get; set; } } } diff --git a/src/Raven.Server/Documents/ETL/Providers/Queue/Kafka/KafkaEtl.cs b/src/Raven.Server/Documents/ETL/Providers/Queue/Kafka/KafkaEtl.cs index 47e06657d24b..a73175d79c83 100644 --- a/src/Raven.Server/Documents/ETL/Providers/Queue/Kafka/KafkaEtl.cs +++ b/src/Raven.Server/Documents/ETL/Providers/Queue/Kafka/KafkaEtl.cs @@ -46,7 +46,30 @@ protected override int PublishMessages(List> itemsPerT try { - producer.InitTransactions(TimeSpan.FromSeconds(60)); + var retries = 3; + + do + { + try + { + producer.InitTransactions(Database.Configuration.Etl.KafkaInitTransactionsTimeout.AsTimeSpan); + break; + } + catch (KafkaRetriableException e) + { + if (--retries > 0) + { + if (Logger.IsOperationsEnabled) + { + Logger.Operations($"ETL process: {Name}. Failed to init transactions for the producer instance. Retries: {retries}", e); + } + } + else + { + throw; + } + } + } while (true); } catch (Exception e) {