Skip to content

Commit 10e767a

Browse files
author
Igal Tsoiref
committed
Adding option to create exchange and bind it to another one.
Adding ttl param to queue
1 parent fe08a3d commit 10e767a

File tree

3 files changed

+29
-4
lines changed

3 files changed

+29
-4
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@
88
/spec/reports/
99
/tmp/
1010
.gem
11+
.idea

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,16 @@ fluentd >= 0.14.0
4646
|durable|true|false|set durable flag of the queue|
4747
|exclusive|true|false|set exclusive flag of the queue|
4848
|auto_delete|true|false|set auto_delete flag of the queue|
49+
|ttl|60000|nil|queue ttl in ms|
4950
|prefetch_count|10|nil||
5051
|consumer_pool_size|5|nil||
5152
|include_headers|true|false|include headers in events|
5253
|headers_key|string|header|key name of headers|
54+
|create_exchange|true|false|create exchange or not|
55+
|exchange_to_bind|string|nil|exchange to bind created exchange|
56+
|exchange_type|direct|topic|type of created exchange|
57+
|exchange_routing_key|hoge|nil|created exchange routing key|
58+
|exchange_durable|true|false|durability of create exchange|
5359

5460
### Output
5561

lib/fluent/plugin/in_rabbitmq.rb

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class RabbitMQInput < Input
3434
config_param :user, :string, default: nil
3535
config_param :pass, :string, default: nil, secret: true
3636
config_param :vhost, :string, default: nil
37-
config_param :exchange, :string, default: nil
37+
3838
config_param :routing_key, :string, default: nil
3939
config_param :connection_timeout, :time, default: nil
4040
config_param :continuation_timeout, :integer, default: nil
@@ -47,6 +47,13 @@ class RabbitMQInput < Input
4747
end
4848
config_param :consumer_pool_size, :integer, default: nil
4949

50+
config_param :exchange, :string, default: nil
51+
config_param :create_exchange, :bool, default: false
52+
config_param :exchange_to_bind, :string, default: nil
53+
config_param :exchange_type, :string, default: "topic"
54+
config_param :exchange_routing_key, :string, default: nil
55+
config_param :exchange_durable, :bool, default: false
56+
5057
config_param :tls, :bool, default: false
5158
config_param :tls_cert, :string, default: nil
5259
config_param :tls_key, :string, default: nil
@@ -57,8 +64,7 @@ class RabbitMQInput < Input
5764
config_param :durable, :bool, default: false
5865
config_param :exclusive, :bool, default: false
5966
config_param :auto_delete, :bool, default: false
60-
61-
config_param :prefetch_count, :integer, default: nil
67+
config_param :ttl, :integer, default: nil
6268

6369
config_param :include_headers, :bool, default: false
6470
config_param :headers_key, :string, default: "headers"
@@ -106,11 +112,23 @@ def start
106112
@bunny.start
107113
channel = @bunny.create_channel(nil, @consumer_pool_size)
108114
channel.prefetch(@prefetch_count) if @prefetch_count
115+
if @create_exchange
116+
exchange_options = {
117+
durable: @exchange_durable,
118+
auto_delete: @auto_delete
119+
}
120+
@bunny_exchange = Bunny::Exchange.new(channel, @exchange_type, @exchange, exchange_options)
121+
if @exchange_to_bind
122+
@bunny_exchange.bind(@exchange_to_bind, routing_key: @exchange_routing_key)
123+
end
124+
end
125+
queue_arguments = {"x-message-ttl" => @ttl} if @ttl
109126
queue = channel.queue(
110127
@queue,
111128
durable: @durable,
112129
exclusive: @exclusive,
113-
auto_delete: @auto_delete
130+
auto_delete: @auto_delete,
131+
arguments: queue_arguments
114132
)
115133
if @exchange
116134
queue.bind(@exchange, routing_key: @routing_key)

0 commit comments

Comments
 (0)