Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Need authentication when sending tcp heartbeat and keepalive in out_forward #2945

Merged
merged 2 commits into from
Apr 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,7 @@ def standby?

def verify_connection
connect do |sock, ri|
if ri.state != :established
establish_connection(sock, ri)
if ri.state != :established
raise "Failed to establish connection to #{@host}:#{@port}"
end
end
ensure_established_connection(sock, ri)
end
end

Expand Down Expand Up @@ -652,14 +647,7 @@ def send_data_actual(sock, tag, chunk)
def send_data(tag, chunk)
ack = @ack_handler && @ack_handler.create_ack(chunk.unique_id, self)
connect(nil, ack: ack) do |sock, ri|
if ri.state != :established
establish_connection(sock, ri)

if ri.state != :established
raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end
end

ensure_established_connection(sock, ri)
send_data_actual(sock, tag, chunk)
end

Expand All @@ -684,7 +672,9 @@ def send_heartbeat

case @sender.heartbeat_type
when :transport
connect(dest_addr) do |_ri, _sock|
connect(dest_addr) do |sock, ri|
ensure_established_connection(sock, ri)

## don't send any data to not cause a compatibility problem
# sock.write FORWARD_TCP_HEARTBEAT_DATA

Expand Down Expand Up @@ -776,6 +766,16 @@ def heartbeat(detect=true)

private

def ensure_established_connection(sock, request_info)
if request_info.state != :established
establish_connection(sock, request_info)

if request_info.state != :established
raise ConnectionClosedError, "failed to establish connection with node #{@name}"
end
end
end

def connect(host = nil, ack: false, &block)
@connection_manager.connect(host: host || resolved_host, port: port, hostname: @hostname, ack: ack, &block)
end
Expand Down
4 changes: 2 additions & 2 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ def plugin_id_for_test?
e = assert_raise Fluent::UnrecoverableError do
d.instance_start
end
assert_match(/Failed to establish connection/, e.message)
assert_match(/failed to establish connection/, e.message)
end
end

Expand Down Expand Up @@ -1092,7 +1092,7 @@ def plugin_id_for_test?
d.instance_start
end

assert_match(/Failed to establish connection/, e.message)
assert_match(/failed to establish connection/, e.message)
end
end

Expand Down