-
Notifications
You must be signed in to change notification settings - Fork 83
/
Copy pathKafkaAttribute.cs
155 lines (133 loc) · 5.69 KB
/
KafkaAttribute.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using Avro.Specific;
using Confluent.Kafka;
using Microsoft.Azure.WebJobs.Description;
namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
/// <summary>
/// Setup an 'output' binding to an Kafka topic. This can be any output type compatible with an IAsyncCollector.
/// </summary>
[AttributeUsage(AttributeTargets.Parameter | AttributeTargets.ReturnValue)]
[Binding]
public sealed class KafkaAttribute : Attribute
{
/// <summary>
/// Initialize a new instance of the <see cref="KafkaAttribute"/>
/// </summary>
/// <param name="brokerList">Broker list</param>
/// <param name="topic">Topic name</param>
public KafkaAttribute(string brokerList, string topic)
{
BrokerList = brokerList;
Topic = topic;
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.Kafka.KafkaAttribute"/> class.
/// </summary>
public KafkaAttribute()
{
}
/// <summary>
/// The topic name hub.
/// </summary>
[AutoResolve]
public string Topic { get; private set; }
/// <summary>
/// Gets or sets the Broker List.
/// </summary>
// [ConnectionString]
public string BrokerList { get; set; }
/// <summary>
/// Gets or sets the Avro schema.
/// Should be used only if a generic record should be generated
/// </summary>
public string AvroSchema { get; set; }
/// <summary>
/// Gets or sets the Maximum transmit message size. Default: 1MB
/// </summary>
public int MaxMessageBytes { get; set; } = 1_000_000;
/// <summary>
/// Maximum number of messages batched in one MessageSet. default: 10000
/// </summary>
public int BatchSize { get; set; } = 10_000;
/// <summary>
/// When set to `true`, the producer will ensure that messages are successfully produced exactly once and in the original produce order. default: false
/// </summary>
public bool EnableIdempotence { get; set; } = false;
/// <summary>
/// Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. default: 300000
/// </summary>
public int MessageTimeoutMs { get; set; } = 300_000;
/// <summary>
/// The ack timeout of the producer request in milliseconds. default: 5000
/// </summary>
public int RequestTimeoutMs { get; set; } = 5_000;
/// <summary>
/// How many times to retry sending a failing Message. **Note:** default: 2147483647
/// </summary>
/// <remarks>Retrying may cause reordering unless <c>EnableIdempotence</c> is set to <c>true</c>.</remarks>
public int MaxRetries { get; set; } = int.MaxValue;
/// <summary>
/// SASL mechanism to use for authentication.
/// Allowed values: Gssapi, Plain, ScramSha256, ScramSha512
/// Default: Plain
///
/// sasl.mechanism in librdkafka
/// </summary>
public BrokerAuthenticationMode AuthenticationMode { get; set; } = BrokerAuthenticationMode.NotSet;
/// <summary>
/// SASL username for use with the PLAIN and SASL-SCRAM-.. mechanisms
/// Default: ""
///
/// 'sasl.username' in librdkafka
/// </summary>
public string Username { get; set; }
/// <summary>
/// SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism
/// Default: ""
///
/// sasl.password in librdkafka
/// </summary>
public string Password { get; set; }
/// <summary>
/// Gets or sets the security protocol used to communicate with brokers
/// Default is plain text
///
/// security.protocol in librdkafka
/// </summary>
public BrokerProtocol Protocol { get; set; } = BrokerProtocol.NotSet;
/// <summary>
/// Path to client's private key (PEM) used for authentication.
/// Default: ""
/// ssl.key.location in librdkafka
/// </summary>
public string SslKeyLocation { get; set; }
/// <summary>
/// Path to CA certificate file for verifying the broker's certificate.
/// ssl.ca.location in librdkafka
/// </summary>
public string SslCaLocation { get; set; }
/// <summary>
/// Path to client's certificate.
/// ssl.certificate.location in librdkafka
/// </summary>
public string SslCertificateLocation { get; set; }
/// <summary>
/// Password for client's certificate.
/// ssl.key.password in librdkafka
/// </summary>
public string SslKeyPassword { get; set; }
/// <summary>
/// Compression level parameter for algorithm selected by configuration property <see cref="CompressionType"/>
/// compression.level in librdkafka
/// </summary>
public int CompressionLevel { get; set; } = -1;
/// <summary>
/// Compression codec to use for compressing message sets.
/// compression.codec in librdkafka
/// </summary>
public MessageCompressionType CompressionType { get; set; } = MessageCompressionType.NotSet;
}
}