Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
- [Failed jobs and retries](#failed-jobs-and-retries)
- [Error reporting on jobs](#error-reporting-on-jobs)
- [Puma plugin](#puma-plugin)
- [Health-check HTTP server](#health-check-http-server)
- [Jobs and transactional integrity](#jobs-and-transactional-integrity)
- [Recurring tasks](#recurring-tasks)
- [Inspiration](#inspiration)
Expand Down Expand Up @@ -603,6 +604,28 @@ that you set in production only. This is what Rails 8's default Puma config look

**Note**: phased restarts are not supported currently because the plugin requires [app preloading](https://github.com/puma/puma?tab=readme-ov-file#cluster-mode) to work.

## Health-check HTTP server

Solid Queue provides a tiny HTTP health-check server that now runs as a supervised process.

- Endpoints:
- `/` and `/health`: returns `200 OK` with body `OK`
- Any other path: returns `404 Not Found`
- Configure via `config/queue.yml` under `health_servers:`. Both `host` and `port` are required.

Enable and configure via process configuration:

```yml
production:
health_servers:
- host: 0.0.0.0
port: 9393
```

Note:
- This runs under the supervisor just like workers/dispatchers.
- When the Puma plugin is active (`plugin :solid_queue` in `puma.rb`), configured `health_servers` are skipped to avoid running multiple HTTP servers in the same process tree. A warning is logged. If you need the health server, run Solid Queue outside Puma (for example, via `bin/jobs`) or disable the plugin on that instance.

## Jobs and transactional integrity
:warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed and vice versa, and ensuring that your job won't be enqueued until the transaction within which you're enqueuing it is committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour, and in the future, you move to another active job backend, or if you simply move Solid Queue to its own database, and suddenly the behaviour changes under you. Because this can be quite tricky and many people shouldn't need to worry about it, by default Solid Queue is configured in a different database as the main app.

Expand Down
2 changes: 2 additions & 0 deletions lib/puma/plugin/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def start(launcher)

if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
launcher.events.on_booted do
SolidQueue.puma_plugin = true
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start
Expand All @@ -23,6 +24,7 @@ def start(launcher)
launcher.events.on_restart { stop_solid_queue }
else
launcher.events.after_booted do
SolidQueue.puma_plugin = true
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start
Expand Down
4 changes: 3 additions & 1 deletion lib/solid_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ module SolidQueue
mattr_accessor :clear_finished_jobs_after, default: 1.day
mattr_accessor :default_concurrency_control_period, default: 3.minutes

mattr_accessor :puma_plugin, default: false

delegate :on_start, :on_stop, :on_exit, to: Supervisor

[ Dispatcher, Scheduler, Worker ].each do |process|
[ Dispatcher, Scheduler, Worker, HealthServer ].each do |process|
define_singleton_method(:"on_#{process.name.demodulize.downcase}_start") do |&block|
process.on_start(&block)
end
Expand Down
38 changes: 34 additions & 4 deletions lib/solid_queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ class Configuration
validate :ensure_configured_processes
validate :ensure_valid_recurring_tasks
validate :ensure_correctly_sized_thread_pool
validate :ensure_valid_health_servers

class Process < Struct.new(:kind, :attributes)
def instantiate
"SolidQueue::#{kind.to_s.titleize}".safe_constantize.new(**attributes)
"SolidQueue::#{kind.to_s.camelize}".safe_constantize.new(**attributes)
end
end

Expand Down Expand Up @@ -38,7 +39,7 @@ def initialize(**options)
def configured_processes
if only_work? then workers
else
dispatchers + workers + schedulers
dispatchers + workers + schedulers + health_servers
end
end

Expand Down Expand Up @@ -129,6 +130,30 @@ def schedulers
end
end

def health_servers
if SolidQueue.puma_plugin
SolidQueue.logger&.warn("SolidQueue health server is configured but Puma plugin is active; skipping starting health server to avoid duplicate servers")
return []
end

health_servers_options.flat_map do |server_options|
processes = server_options.fetch(:processes, 1)
processes.times.map { Process.new(:health_server, server_options) }
end
end

def ensure_valid_health_servers
health_servers_options.each_with_index do |server_options, index|
unless server_options[:host].present?
errors.add(:base, "Health server ##{index + 1}: host is required")
end

unless server_options.key?(:port) && server_options[:port].present?
errors.add(:base, "Health server ##{index + 1}: port is required")
end
end
end

def workers_options
@workers_options ||= processes_config.fetch(:workers, [])
.map { |options| options.dup.symbolize_keys }
Expand All @@ -139,6 +164,11 @@ def dispatchers_options
.map { |options| options.dup.symbolize_keys }
end

def health_servers_options
@health_servers_options ||= processes_config.fetch(:health_servers, [])
.map { |options| options.dup.symbolize_keys }
end

def recurring_tasks
@recurring_tasks ||= recurring_tasks_config.map do |id, options|
RecurringTask.from_configuration(id, **options) if options&.has_key?(:schedule)
Expand All @@ -147,8 +177,8 @@ def recurring_tasks

def processes_config
@processes_config ||= config_from \
options.slice(:workers, :dispatchers).presence || options[:config_file],
keys: [ :workers, :dispatchers ],
options.slice(:workers, :dispatchers, :health_servers).presence || options[:config_file],
keys: [ :workers, :dispatchers, :health_servers ],
fallback: { workers: [ WORKER_DEFAULTS ], dispatchers: [ DISPATCHER_DEFAULTS ] }
end

Expand Down
118 changes: 118 additions & 0 deletions lib/solid_queue/health_server.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# frozen_string_literal: true

require "socket"
require "logger"

module SolidQueue
class HealthServer < Processes::Base
include Processes::Runnable

attr_reader :host, :port, :logger

def initialize(host:, port:, logger: nil, **options)
@host = host
@port = port
@logger = logger || default_logger
@server = nil

super(**options)
end

def metadata
super.merge(host: host, port: port)
end

def running?
@thread&.alive?
end

private
def run
begin
@server = TCPServer.new(host, port)
log_info("listening on #{host}:#{port}")

loop do
break if shutting_down?

readables, = IO.select([ @server, self_pipe[:reader] ].compact, nil, nil, 1)
next unless readables

if readables.include?(self_pipe[:reader])
drain_self_pipe
end

if readables.include?(@server)
handle_connection
end
end
rescue Exception => exception
handle_thread_error(exception)
ensure
SolidQueue.instrument(:shutdown_process, process: self) do
run_callbacks(:shutdown) { shutdown }
end
end
end

def handle_connection
socket = @server.accept_nonblock(exception: false)
return unless socket.is_a?(::TCPSocket)

begin
request_line = socket.gets
path = request_line&.split(" ")&.at(1) || "/"

if path == "/" || path == "/health"
body = "OK"
status_line = "HTTP/1.1 200 OK"
else
body = "Not Found"
status_line = "HTTP/1.1 404 Not Found"
end

headers = [
"Content-Type: text/plain",
"Content-Length: #{body.bytesize}",
"Connection: close"
].join("\r\n")

socket.write("#{status_line}\r\n#{headers}\r\n\r\n#{body}")
ensure
begin
socket.close
rescue StandardError
end
end
end

def shutdown
begin
@server&.close
rescue StandardError
ensure
@server = nil
end
end

def set_procline
procline "http #{host}:#{port}"
end

def default_logger
logger = Logger.new($stdout)
logger.level = Logger::INFO
logger.progname = "SolidQueueHTTP"
logger
end

def log_info(message)
logger&.info(message)
end

def drain_self_pipe
loop { self_pipe[:reader].read_nonblock(11) }
rescue Errno::EAGAIN, Errno::EINTR, IO::EWOULDBLOCKWaitReadable
end
end
end
126 changes: 126 additions & 0 deletions test/unit/health_server_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# frozen_string_literal: true

require "test_helper"
require "net/http"
require "socket"
require "stringio"

class HealthServerTest < ActiveSupport::TestCase
self.use_transactional_tests = false
def setup
@host = "127.0.0.1"
@port = available_port(@host)
@server = SolidQueue::HealthServer.new(host: @host, port: @port, logger: Logger.new(IO::NULL))
@server.start
wait_for_server
end

def teardown
@server.stop if defined?(@server)
end

def test_health_endpoint_returns_ok
response = http_get("/health")
assert_equal "200", response.code
assert_equal "OK", response.body
end

def test_root_endpoint_returns_ok
response = http_get("/")
assert_equal "200", response.code
assert_equal "OK", response.body
end

def test_unknown_path_returns_not_found
response = http_get("/unknown")
assert_equal "404", response.code
assert_equal "Not Found", response.body
end

def test_stop_stops_server
assert @server.running?, "server should be running before stop"
@server.stop
assert_not @server.running?, "server should not be running after stop"
ensure
# Avoid double-stop in teardown if we stopped here
@server = SolidQueue::HealthServer.new(host: @host, port: @port, logger: Logger.new(IO::NULL))
end

def test_supervisor_starts_health_server_from_configuration
@server.stop # ensure no unsupervised health server is registered
other_port = available_port(@host)
pid = run_supervisor_as_fork(health_servers: [ { host: @host, port: other_port } ], workers: [], dispatchers: [])
wait_for_registered_processes(2, timeout: 2) # supervisor + health server

assert_registered_processes(kind: "HealthServer", count: 1)

# Verify it responds to HTTP
wait_for_server_on(other_port)
response = http_get_on(other_port, "/health")
assert_equal "200", response.code
assert_equal "OK", response.body
ensure
terminate_process(pid) if pid
end

def test_supervisor_skips_health_server_when_puma_plugin_is_active
SolidQueue.puma_plugin = true

original_logger = SolidQueue.logger
SolidQueue.logger = ActiveSupport::Logger.new($stdout)

@server.stop # ensure no unsupervised health server is registered
pid = nil
pid = run_supervisor_as_fork(health_servers: [ { host: @host, port: available_port(@host) } ], workers: [], dispatchers: [])
# Expect only supervisor to register
wait_for_registered_processes(1, timeout: 2)
assert_equal 0, find_processes_registered_as("HealthServer").count
ensure
SolidQueue.logger = original_logger if defined?(original_logger)
SolidQueue.puma_plugin = false
terminate_process(pid) if pid
end

private
def http_get(path)
Net::HTTP.start(@host, @port) do |http|
http.get(path)
end
end

def wait_for_server
# Try to connect for up to 1 second
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 1.0
begin
Net::HTTP.start(@host, @port) { |http| http.head("/") }
rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH
raise if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
sleep 0.05
retry
end
end

def wait_for_server_on(port)
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 1.0
begin
Net::HTTP.start(@host, port) { |http| http.head("/") }
rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH
raise if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
sleep 0.05
retry
end
end

def http_get_on(port, path)
Net::HTTP.start(@host, port) do |http|
http.get(path)
end
end

def available_port(host)
tcp = TCPServer.new(host, 0)
port = tcp.addr[1]
tcp.close
port
end
end
Loading