Skip to content

Commit bf8ffe6

Browse files
nakahjainharsh98
andauthored
Added OAuthBearer authentication mode (#437)
Co-authored-by: Hakan Altinbasak <hakan.altinbasak@avanade.com> Co-authored-by: jainharsh98 <jainh@microsoft.com>
1 parent 648884e commit bf8ffe6

File tree

9 files changed

+193
-3
lines changed

9 files changed

+193
-3
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,12 @@ Both, trigger and output, can connect to a secure Kafka broker. The following at
344344
|SslKeyPassword|ssl.key.password|Password for client's certificate|
345345
|SslCertificateLocation|ssl.certificate.location|Path to client's certificate|
346346
|SslCaLocation|ssl.ca.location|Path to CA certificate file for verifying the broker's certificate|
347+
|OAuthBearerMethod|sasl.oauthbearer|OAuth bearer method. Only 'default' or 'oidc'. AuthenticationMode must be set to OAuthBearer
348+
|OAuthBearerClientId|sasl.oauthbearer.client.id|OIDC ClientId
349+
|OAuthBearerClientSecret|sasl.oauthbearer.client.secret|OIDC ClientSecret
350+
|OAuthBearerScope|sasl.oauthbearer.scope|OIDC Scope
351+
|OAuthBearerTokenEndpointUrl|sasl.oauthbearer.token.endpoint.url|Token endpoint URL
352+
|OAuthBearerExtensions|sasl.oauthbearer.extensions|Comma separated key/value pair required by Confluent Kafka
347353

348354
Username and password should reference a Azure function configuration variable and not be hardcoded.
349355

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Config/BrokerAuthenticationMode.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public enum BrokerAuthenticationMode
1616
Gssapi,
1717
Plain,
1818
ScramSha256,
19-
ScramSha512
19+
ScramSha512,
20+
OAuthBearer
2021
}
2122
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Text;
7+
8+
namespace Microsoft.Azure.WebJobs.Extensions.Kafka.Config
9+
{
10+
/// <summary>
11+
/// Defines the OAuth bearer method
12+
/// </summary>
13+
public enum OAuthBearerMethod
14+
{
15+
Default,
16+
Oidc
17+
}
18+
}

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,14 @@ private ConsumerConfig GetConsumerConfiguration()
190190
SslKeyLocation = this.listenerConfiguration.SslKeyLocation,
191191
SslKeyPassword = this.listenerConfiguration.SslKeyPassword,
192192

193+
// OAuthBearer config
194+
SaslOauthbearerMethod = this.listenerConfiguration.SaslOAuthBearerMethod,
195+
SaslOauthbearerClientId = this.listenerConfiguration.SaslOAuthBearerClientId,
196+
SaslOauthbearerClientSecret = this.listenerConfiguration.SaslOAuthBearerClientSecret,
197+
SaslOauthbearerScope = this.listenerConfiguration.SaslOAuthBearerScope,
198+
SaslOauthbearerTokenEndpointUrl = this.listenerConfiguration.SaslOAuthBearerTokenEndpointUrl,
199+
SaslOauthbearerExtensions = this.listenerConfiguration.SaslOAuthBearerExtensions,
200+
193201
// Values from host configuration
194202
StatisticsIntervalMs = this.options.StatisticsIntervalMs,
195203
ReconnectBackoffMs = this.options.ReconnectBackoffMs,

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaAttribute.cs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using Microsoft.Azure.WebJobs.Description;
6+
using Microsoft.Azure.WebJobs.Extensions.Kafka.Config;
67

78
namespace Microsoft.Azure.WebJobs.Extensions.Kafka
89
{
@@ -82,7 +83,7 @@ public KafkaAttribute()
8283

8384
/// <summary>
8485
/// SASL mechanism to use for authentication.
85-
/// Allowed values: Gssapi, Plain, ScramSha256, ScramSha512
86+
/// Allowed values: Gssapi, Plain, ScramSha256, ScramSha512, OAuthBearer
8687
/// Default: Plain
8788
///
8889
/// sasl.mechanism in librdkafka
@@ -158,5 +159,49 @@ public KafkaAttribute()
158159
/// Password for the Avro Schema Registry
159160
/// </summary>
160161
public string SchemaRegistryPassword { get; set; }
162+
163+
/// <summary>
164+
/// OAuth Bearer method.
165+
/// Either 'default' or 'oidc'
166+
/// sasl.oauthbearer in librdkafka
167+
/// </summary>
168+
public OAuthBearerMethod OAuthBearerMethod { get; set; }
169+
170+
/// <summary>
171+
/// OAuth Bearer Client Id
172+
/// Specify only when OAuthBearerMethod is 'oidc'
173+
/// sasl.oauthbearer.client.id in librdkafka
174+
/// </summary>
175+
public string OAuthBearerClientId { get; set; }
176+
177+
/// <summary>
178+
/// OAuth Bearer Client Secret
179+
/// Specify only when OAuthBearerMethod is 'oidc'
180+
/// sasl.oauthbearer.client.secret in librdkafka
181+
/// </summary>
182+
public string OAuthBearerClientSecret { get; set; }
183+
184+
/// <summary>
185+
/// OAuth Bearer scope.
186+
/// Client use this to specify the scope of the access request to the broker.
187+
/// Specify only when OAuthBearerMethod is 'oidc'
188+
/// sasl.oauthbearer.extensions in librdkafka
189+
/// </summary>
190+
public string OAuthBearerScope { get; set; }
191+
192+
/// <summary>
193+
/// OAuth Bearer token endpoint url.
194+
/// Specify only when OAuthBearerMethod is 'oidc'
195+
/// sasl.oauthbearer.token.endpoint.url in librdkafka
196+
/// </summary>
197+
public string OAuthBearerTokenEndpointUrl { get; set; }
198+
199+
/// <summary>
200+
/// OAuth Bearer extensions.
201+
/// Allow additional information to be provided to the broker.
202+
/// Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea"
203+
/// sasl.oauthbearer.extensions in librdkafka
204+
/// </summary>
205+
public string OAuthBearerExtensions { get; set; }
161206
}
162207
}

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
134134
Debug = kafkaOptions?.LibkafkaDebug,
135135
MetadataMaxAgeMs = kafkaOptions?.MetadataMaxAgeMs,
136136
SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable,
137-
LingerMs = entity.Attribute.LingerMs
137+
LingerMs = entity.Attribute.LingerMs,
138138
};
139139

140140
if (entity.Attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet)
@@ -147,6 +147,16 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
147147
conf.SecurityProtocol = (SecurityProtocol)entity.Attribute.Protocol;
148148
}
149149

150+
if (entity.Attribute.AuthenticationMode == BrokerAuthenticationMode.OAuthBearer)
151+
{
152+
conf.SaslOauthbearerMethod = (SaslOauthbearerMethod)entity.Attribute.OAuthBearerMethod;
153+
conf.SaslOauthbearerClientId = entity.Attribute.OAuthBearerClientId;
154+
conf.SaslOauthbearerClientSecret = entity.Attribute.OAuthBearerClientSecret;
155+
conf.SaslOauthbearerScope = entity.Attribute.OAuthBearerScope;
156+
conf.SaslOauthbearerTokenEndpointUrl = entity.Attribute.OAuthBearerTokenEndpointUrl;
157+
conf.SaslOauthbearerExtensions = entity.Attribute.OAuthBearerExtensions;
158+
}
159+
150160
return conf;
151161
}
152162
}

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaListenerConfiguration.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33

44
using Confluent.Kafka;
5+
using Microsoft.Azure.WebJobs.Extensions.Kafka.Config;
56

67
namespace Microsoft.Azure.WebJobs.Extensions.Kafka
78
{
@@ -100,6 +101,51 @@ public class KafkaListenerConfiguration
100101
/// </summary>
101102
public long LagThreshold { get; set; }
102103

104+
/// <summary>
105+
/// OAuth Bearer method.
106+
/// Either 'default' or 'oidc'
107+
/// sasl.oauthbearer in librdkafka
108+
/// </summary>
109+
public SaslOauthbearerMethod SaslOAuthBearerMethod { get; set; }
110+
111+
/// <summary>
112+
/// OAuth Bearer Client Id
113+
/// Specify only when OAuthBearerMethod is 'oidc'
114+
/// sasl.oauthbearer.client.id in librdkafka
115+
/// </summary>
116+
public string SaslOAuthBearerClientId { get; set; }
117+
118+
/// <summary>
119+
/// OAuth Bearer Client Secret
120+
/// Specify only when OAuthBearerMethod is 'oidc'
121+
/// sasl.oauthbearer.client.secret in librdkafka
122+
/// </summary>
123+
public string SaslOAuthBearerClientSecret { get; set; }
124+
125+
/// <summary>
126+
/// OAuth Bearer scope.
127+
/// Client use this to specify the scope of the access request to the broker.
128+
/// Specify only when OAuthBearerMethod is 'oidc'
129+
/// sasl.oauthbearer.extensions in librdkafka
130+
/// </summary>
131+
public string SaslOAuthBearerScope { get; set; }
132+
133+
/// <summary>
134+
/// OAuth Bearer token endpoint url.
135+
/// Specify only when OAuthBearerMethod is 'oidc'
136+
/// sasl.oauthbearer.token.endpoint.url in librdkafka
137+
/// </summary>
138+
public string SaslOAuthBearerTokenEndpointUrl { get; set; }
139+
140+
/// <summary>
141+
/// OAuth Bearer extensions.
142+
/// Allow additional information to be provided to the broker.
143+
/// Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea"
144+
/// sasl.oauthbearer.extensions in librdkafka
145+
/// </summary>
146+
public string SaslOAuthBearerExtensions { get; set; }
147+
148+
103149
internal void ApplyToConfig(ClientConfig conf)
104150
{
105151
if (this.SaslMechanism.HasValue)

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttribute.cs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using Avro.Specific;
55
using Microsoft.Azure.WebJobs.Description;
6+
using Microsoft.Azure.WebJobs.Extensions.Kafka.Config;
67
using System;
78

89
namespace Microsoft.Azure.WebJobs.Extensions.Kafka
@@ -129,6 +130,50 @@ public KafkaTriggerAttribute(string brokerList, string topic)
129130
/// </summary>
130131
public string SchemaRegistryPassword { get; set; }
131132

133+
/// <summary>
134+
/// OAuth Bearer method.
135+
/// Either 'default' or 'oidc'
136+
/// sasl.oauthbearer in librdkafka
137+
/// </summary>
138+
public OAuthBearerMethod OAuthBearerMethod { get; set; }
139+
140+
/// <summary>
141+
/// OAuth Bearer Client Id
142+
/// Specify only when OAuthBearerMethod is 'oidc'
143+
/// sasl.oauthbearer.client.id in librdkafka
144+
/// </summary>
145+
public string OAuthBearerClientId { get; set; }
146+
147+
/// <summary>
148+
/// OAuth Bearer Client Secret
149+
/// Specify only when OAuthBearerMethod is 'oidc'
150+
/// sasl.oauthbearer.client.secret in librdkafka
151+
/// </summary>
152+
public string OAuthBearerClientSecret { get; set; }
153+
154+
/// <summary>
155+
/// OAuth Bearer scope.
156+
/// Client use this to specify the scope of the access request to the broker.
157+
/// Specify only when OAuthBearerMethod is 'oidc'
158+
/// sasl.oauthbearer.extensions in librdkafka
159+
/// </summary>
160+
public string OAuthBearerScope { get; set; }
161+
162+
/// <summary>
163+
/// OAuth Bearer token endpoint url.
164+
/// Specify only when OAuthBearerMethod is 'oidc'
165+
/// sasl.oauthbearer.token.endpoint.url in librdkafka
166+
/// </summary>
167+
public string OAuthBearerTokenEndpointUrl { get; set; }
168+
169+
/// <summary>
170+
/// OAuth Bearer extensions.
171+
/// Allow additional information to be provided to the broker.
172+
/// Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea"
173+
/// sasl.oauthbearer.extensions in librdkafka
174+
/// </summary>
175+
public string OAuthBearerExtensions { get; set; }
176+
132177
bool IsValidValueType(Type value)
133178
{
134179
return

src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Reflection;
66
using System.Threading.Tasks;
77
using Confluent.Kafka;
8+
using Microsoft.Azure.WebJobs.Extensions.Kafka.Config;
89
using Microsoft.Azure.WebJobs.Extensions.Kafka.Trigger;
910
using Microsoft.Azure.WebJobs.Host.Bindings;
1011
using Microsoft.Azure.WebJobs.Host.Listeners;
@@ -115,6 +116,16 @@ private KafkaListenerConfiguration CreateConsumerConfiguration(KafkaTriggerAttri
115116
{
116117
consumerConfig.SecurityProtocol = (SecurityProtocol)attribute.Protocol;
117118
}
119+
120+
if (attribute.AuthenticationMode == BrokerAuthenticationMode.OAuthBearer)
121+
{
122+
consumerConfig.SaslOAuthBearerMethod = (SaslOauthbearerMethod) attribute.OAuthBearerMethod;
123+
consumerConfig.SaslOAuthBearerClientId = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerClientId);
124+
consumerConfig.SaslOAuthBearerClientSecret = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerClientSecret);
125+
consumerConfig.SaslOAuthBearerScope = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerScope);
126+
consumerConfig.SaslOAuthBearerTokenEndpointUrl = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerTokenEndpointUrl);
127+
consumerConfig.SaslOAuthBearerExtensions = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerExtensions);
128+
}
118129
}
119130

120131
return consumerConfig;

0 commit comments

Comments
 (0)