From 2a820797694f3c79ecf0c8d0a0c82729c216f93b Mon Sep 17 00:00:00 2001 From: Tyler Mauthe Date: Mon, 22 Feb 2016 00:15:27 -0700 Subject: [PATCH 1/4] :bug: Modify SSE connection handling for Puma - Keep track of events on per-client basis - Clients get a UUID - If a client's event queue grows too big it gets dropped - If a Puma thread tries to send the next batch of events to a client and it's event queue is dropped, the queue will be created - Updated tests to match above changes --- lib/dashing/app.rb | 24 +++++++++++++++++++----- test/app_test.rb | 17 ++++++----------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/lib/dashing/app.rb b/lib/dashing/app.rb index b11352d2..2ffe0360 100644 --- a/lib/dashing/app.rb +++ b/lib/dashing/app.rb @@ -1,4 +1,6 @@ +require 'securerandom' require 'sinatra' +require 'sinatra/streaming' require 'sprockets' require 'sinatra/content_for' require 'rufus/scheduler' @@ -34,7 +36,7 @@ def authenticated?(token) set :sprockets, Sprockets::Environment.new(settings.root) set :assets_prefix, '/assets' set :digest_assets, false -set server: 'thin', connections: [], history_file: 'history.yml' +set server: 'puma', client_events: {}, history_file: 'history.yml' set :public_folder, File.join(settings.root, 'public') set :views, File.join(settings.root, 'dashboards') set :default_dashboard, nil @@ -73,10 +75,15 @@ def authenticated?(token) get '/events', provides: 'text/event-stream' do protected! response.headers['X-Accel-Buffering'] = 'no' # Disable buffering for nginx - stream :keep_open do |out| - settings.connections << out + client_id = SecureRandom.uuid + stream do |out| out << latest_events - out.callback { settings.connections.delete(out) } + loop do + settings.client_events[client_id] = [] unless settings.client_events.has_key?(client_id) + while event = settings.client_events[client_id].shift do + out << event unless out.closed? + end + end end end @@ -138,7 +145,14 @@ def send_event(id, body, target=nil) body[:updatedAt] ||= Time.now.to_i event = format_event(body.to_json, target) Sinatra::Application.settings.history[id] = event unless target == 'dashboards' - Sinatra::Application.settings.connections.each { |out| out << event } + max_event_queue_size = Sinatra::Application.settings.history.length * 2 + Sinatra::Application.settings.client_events.each do |connection_id, events| + if events.length > max_event_queue_size + Sinatra::Application.settings.client_events.delete(connection_id) + else + events << event + end + end end def format_event(body, name=nil) diff --git a/test/app_test.rb b/test/app_test.rb index 36cd3100..dcf1ae3d 100644 --- a/test/app_test.rb +++ b/test/app_test.rb @@ -3,8 +3,7 @@ class AppTest < Dashing::Test def setup - @connection = [] - app.settings.connections = [@connection] + app.settings.client_events = {'tests' => []} app.settings.auth_token = nil app.settings.default_dashboard = nil app.settings.history_file = File.join(Dir.tmpdir, 'history.yml') @@ -49,8 +48,8 @@ def test_post_widgets_without_auth_token post '/widgets/some_widget', JSON.generate({value: 6}) assert_equal 204, last_response.status - assert_equal 1, @connection.length - data = parse_data @connection[0] + assert_equal 1, app.settings.client_events.length + data = parse_data app.settings.client_events.values.first.first assert_equal 6, data['value'] assert_equal 'some_widget', data['id'] assert data['updatedAt'] @@ -72,19 +71,15 @@ def test_get_events post '/widgets/some_widget', JSON.generate({value: 8}) assert_equal 204, last_response.status - get '/events' - assert_equal 200, last_response.status - assert_equal 8, parse_data(@connection[0])['value'] + assert_equal 8, parse_data(app.settings.client_events.values.first.first)['value'] end def test_dashboard_events post '/dashboards/my_super_sweet_dashboard', JSON.generate({event: 'reload'}) assert_equal 204, last_response.status - get '/events' - assert_equal 200, last_response.status - assert_equal 'dashboards', parse_event(@connection[0]) - assert_equal 'reload', parse_data(@connection[0])['event'] + assert_equal 'dashboards', parse_event(app.settings.client_events.values.first.first) + assert_equal 'reload', parse_data(app.settings.client_events.values.first.first)['event'] end def test_get_dashboard From 64d365c7414d43174c2531a58dc9a0308f35f1b3 Mon Sep 17 00:00:00 2001 From: Tyler Mauthe Date: Sun, 28 Feb 2016 00:38:55 -0700 Subject: [PATCH 2/4] :bug: Properly fix SSE connections in Puma! - Realized that Rufus can also be multi-threaded now - Added a mutex for each connection, allowing safe access during send_event - Don't need to track a queue of events per connection - should lower memory footprint - Connection termination detected by catching Puma::ConnectionError exceptions during send_event conntion writing - Updated tests accordingly --- lib/dashing/app.rb | 31 ++++++++++++++++++++----------- test/app_test.rb | 20 +++++++++++++------- 2 files changed, 33 insertions(+), 18 deletions(-) diff --git a/lib/dashing/app.rb b/lib/dashing/app.rb index 2ffe0360..b85028f6 100644 --- a/lib/dashing/app.rb +++ b/lib/dashing/app.rb @@ -36,7 +36,7 @@ def authenticated?(token) set :sprockets, Sprockets::Environment.new(settings.root) set :assets_prefix, '/assets' set :digest_assets, false -set server: 'puma', client_events: {}, history_file: 'history.yml' +set server: 'puma', connections: [], history_file: 'history.yml' set :public_folder, File.join(settings.root, 'public') set :views, File.join(settings.root, 'dashboards') set :default_dashboard, nil @@ -72,18 +72,23 @@ def authenticated?(token) redirect "/" + dashboard end + get '/events', provides: 'text/event-stream' do protected! response.headers['X-Accel-Buffering'] = 'no' # Disable buffering for nginx - client_id = SecureRandom.uuid stream do |out| out << latest_events + settings.connections << connection = {out: out, mutex: Mutex.new, terminated: false} + terminated = false + loop do - settings.client_events[client_id] = [] unless settings.client_events.has_key?(client_id) - while event = settings.client_events[client_id].shift do - out << event unless out.closed? + connection[:mutex].synchronize do + terminated = true if connection[:terminated] end + break if terminated end + + settings.connections.delete(connection) end end @@ -145,12 +150,16 @@ def send_event(id, body, target=nil) body[:updatedAt] ||= Time.now.to_i event = format_event(body.to_json, target) Sinatra::Application.settings.history[id] = event unless target == 'dashboards' - max_event_queue_size = Sinatra::Application.settings.history.length * 2 - Sinatra::Application.settings.client_events.each do |connection_id, events| - if events.length > max_event_queue_size - Sinatra::Application.settings.client_events.delete(connection_id) - else - events << event + Sinatra::Application.settings.connections.each do |connection| + connection[:mutex].synchronize do + begin + connection[:out] << event unless connection[:out].closed? + rescue Puma::ConnectionError + connection[:terminated] = true + rescue Exception => e + connection[:terminated] = true + puts e + end end end end diff --git a/test/app_test.rb b/test/app_test.rb index dcf1ae3d..353df44f 100644 --- a/test/app_test.rb +++ b/test/app_test.rb @@ -1,9 +1,16 @@ require 'test_helper' require 'haml' +class StreamStub < Array + def closed? + return false + end +end + class AppTest < Dashing::Test def setup - app.settings.client_events = {'tests' => []} + @connection = {out: StreamStub.new, mutex: Mutex.new, terminated: false} + app.settings.connections = [ @connection ] app.settings.auth_token = nil app.settings.default_dashboard = nil app.settings.history_file = File.join(Dir.tmpdir, 'history.yml') @@ -47,9 +54,8 @@ def test_errors_out_when_no_dashboards_available def test_post_widgets_without_auth_token post '/widgets/some_widget', JSON.generate({value: 6}) assert_equal 204, last_response.status - - assert_equal 1, app.settings.client_events.length - data = parse_data app.settings.client_events.values.first.first + assert_equal 1, @connection[:out].length + data = parse_data @connection[:out][0] assert_equal 6, data['value'] assert_equal 'some_widget', data['id'] assert data['updatedAt'] @@ -71,15 +77,15 @@ def test_get_events post '/widgets/some_widget', JSON.generate({value: 8}) assert_equal 204, last_response.status - assert_equal 8, parse_data(app.settings.client_events.values.first.first)['value'] + assert_equal 8, parse_data(@connection[:out][0])['value'] end def test_dashboard_events post '/dashboards/my_super_sweet_dashboard', JSON.generate({event: 'reload'}) assert_equal 204, last_response.status - assert_equal 'dashboards', parse_event(app.settings.client_events.values.first.first) - assert_equal 'reload', parse_data(app.settings.client_events.values.first.first)['event'] + assert_equal 'dashboards', parse_event(@connection[:out][0]) + assert_equal 'reload', parse_data(@connection[:out][0])['event'] end def test_get_dashboard From 34ec439d2623125c8e9375208e07d57ed891ec9d Mon Sep 17 00:00:00 2001 From: Tyler Mauthe Date: Sun, 28 Feb 2016 00:42:57 -0700 Subject: [PATCH 3/4] Cleanup imports --- lib/dashing/app.rb | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/lib/dashing/app.rb b/lib/dashing/app.rb index b85028f6..a1ac50c7 100644 --- a/lib/dashing/app.rb +++ b/lib/dashing/app.rb @@ -1,12 +1,11 @@ -require 'securerandom' +require 'coffee-script' +require 'json' +require 'rufus/scheduler' +require 'sass' require 'sinatra' +require 'sinatra/content_for' require 'sinatra/streaming' require 'sprockets' -require 'sinatra/content_for' -require 'rufus/scheduler' -require 'coffee-script' -require 'sass' -require 'json' require 'yaml' require 'thin' From ee64d7df2d5850d723807ab36735f2a80ebe14a3 Mon Sep 17 00:00:00 2001 From: Quentin Brossard Date: Tue, 9 Feb 2016 09:03:59 +0100 Subject: [PATCH 4/4] Switch server to puma, provide puma config and adapt command line - configure puma using config/puma.rb config file - add default puma.rb config file for new projects - adapt dashing cli to use the config file and correctly handle -d (daemonize) arg. - adapt cli_test.rb to the new setup - remove tmp dir from gitignore as we want to generate it in the project template - add tmp/pids/ dir in project template (for saving puma pid / state files. --- .gitignore | 1 - .travis.yml | 3 +- dashing.gemspec | 2 +- lib/dashing/app.rb | 11 ------ lib/dashing/cli.rb | 14 ++++--- templates/project/config/puma.rb | 42 +++++++++++++++++++++ templates/project/tmp/pids/.empty_directory | 1 + test/cli_test.rb | 24 ++++++++---- 8 files changed, 72 insertions(+), 26 deletions(-) create mode 100644 templates/project/config/puma.rb create mode 100644 templates/project/tmp/pids/.empty_directory diff --git a/.gitignore b/.gitignore index 65cbda82..fe7f5d53 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,5 @@ *.gem coverage/ log/ -tmp/ .ruby-version history.yml diff --git a/.travis.yml b/.travis.yml index f68ed763..84155ea3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,5 +3,6 @@ rvm: - 2.3.0 - 2.2.4 - 2.1.8 - + - jruby-19mode + - jruby-9.0.4.0 script: "rake test" diff --git a/dashing.gemspec b/dashing.gemspec index 33b5b17f..80b7db27 100644 --- a/dashing.gemspec +++ b/dashing.gemspec @@ -21,7 +21,7 @@ Gem::Specification.new do |s| s.add_dependency('execjs', '~> 2.0.2') s.add_dependency('sinatra', '~> 1.4.4') s.add_dependency('sinatra-contrib', '~> 1.4.2') - s.add_dependency('thin', '~> 1.6.1') + s.add_dependency('puma', '~> 2.16.0') s.add_dependency('rufus-scheduler', '~> 2.0.24') s.add_dependency('thor', '> 0.18.1') s.add_dependency('sprockets', '~> 2.10.1') diff --git a/lib/dashing/app.rb b/lib/dashing/app.rb index a1ac50c7..f153a33f 100644 --- a/lib/dashing/app.rb +++ b/lib/dashing/app.rb @@ -7,7 +7,6 @@ require 'sinatra/streaming' require 'sprockets' require 'yaml' -require 'thin' SCHEDULER = Rufus::Scheduler.new @@ -134,16 +133,6 @@ def authenticated?(token) end end -Thin::Server.class_eval do - def stop_with_connection_closing - Sinatra::Application.settings.connections.dup.each(&:close) - stop_without_connection_closing - end - - alias_method :stop_without_connection_closing, :stop - alias_method :stop, :stop_with_connection_closing -end - def send_event(id, body, target=nil) body[:id] = id body[:updatedAt] ||= Time.now.to_i diff --git a/lib/dashing/cli.rb b/lib/dashing/cli.rb index 4b93f892..374686a3 100644 --- a/lib/dashing/cli.rb +++ b/lib/dashing/cli.rb @@ -57,16 +57,20 @@ def install(gist_id, *args) desc "start", "Starts the server in style!" method_option :job_path, :desc => "Specify the directory where jobs are stored" def start(*args) - port_option = args.include?('-p') ? '' : ' -p 3030' + daemonize = args.include?('-d') args = args.join(' ') - command = "bundle exec thin -R config.ru start#{port_option} #{args}" + command = "bundle exec puma #{args}" command.prepend "export JOB_PATH=#{options[:job_path]}; " if options[:job_path] + command.prepend "export DAEMONIZE=true; " if daemonize run_command(command) end - desc "stop", "Stops the thin server" - def stop - command = "bundle exec thin stop" + desc "stop", "Stops the puma server (daemon mode only)" + def stop(*args) + args = args.join(' ') + # TODO correctly handle pidfile location change in puma config + daemon_pidfile = !args.include?('--pidfile') ? '--pidfile ./tmp/pids/puma.pid' : args + command = "bundle exec pumactl #{daemon_pidfile} stop" run_command(command) end diff --git a/templates/project/config/puma.rb b/templates/project/config/puma.rb new file mode 100644 index 00000000..26140fa4 --- /dev/null +++ b/templates/project/config/puma.rb @@ -0,0 +1,42 @@ +# For a complete list of puma configuration parameters, please see +# https://github.com/puma/puma + +# Puma can serve each request in a thread from an internal thread pool. +# The `threads` method setting takes two numbers a minimum and maximum. +# Any libraries that use thread pools should be configured to match +# the maximum value specified for Puma. Default is set to 5 threads for minimum +# and maximum. +# +threads_count = ENV.fetch("PUMA_MAX_THREADS") { 5 }.to_i +threads threads_count, threads_count + +# Specifies the `port` that Puma will listen on to receive requests, default is 2020. +# +port ENV.fetch("DASHING_PORT") { 3030 } + +# Specifies the `environment` that Puma will run in. +# +environment ENV.fetch("RACK_ENV") { "production" } + +# Daemonize the server into the background. Highly suggest that +# this be combined with "pidfile" and "stdout_redirect". +# +# The default is "false". +# +daemonize ENV.fetch("DAEMONIZE") { false } + +# Store the pid of the server in the file at "path". +# +pidfile './tmp/pids/puma.pid' + +# Use "path" as the file to store the server info state. This is +# used by "pumactl" to query and control the server. +# +state_path './tmp/pids/puma.state' + +# Redirect STDOUT and STDERR to files specified. The 3rd parameter +# ("append") specifies whether the output is appended, the default is +# "false". +# +# stdout_redirect '/u/apps/lolcat/log/stdout', '/u/apps/lolcat/log/stderr' +# stdout_redirect '/u/apps/lolcat/log/stdout', '/u/apps/lolcat/log/stderr', true \ No newline at end of file diff --git a/templates/project/tmp/pids/.empty_directory b/templates/project/tmp/pids/.empty_directory new file mode 100644 index 00000000..3da7f49b --- /dev/null +++ b/templates/project/tmp/pids/.empty_directory @@ -0,0 +1 @@ +.empty_directory \ No newline at end of file diff --git a/test/cli_test.rb b/test/cli_test.rb index 4d822969..c0efa8da 100644 --- a/test/cli_test.rb +++ b/test/cli_test.rb @@ -101,14 +101,24 @@ def test_install_task_warns_when_gist_not_found assert_includes output, 'Could not find gist at ' end - def test_start_task_starts_thin_with_default_port - command = 'bundle exec thin -R config.ru start -p 3030 ' + def test_start_task_starts_puma_with_default_port + command = 'bundle exec puma ' @cli.stubs(:run_command).with(command).once @cli.start end - def test_start_task_starts_thin_with_specified_port - command = 'bundle exec thin -R config.ru start -p 2020' + def test_start_task_starts_puma_in_daemon_mode + commands = [ + 'export DAEMONIZE=true; ', + 'bundle exec puma -d' + ] + + @cli.stubs(:run_command).with(commands.join('')).once + @cli.start('-d') + end + + def test_start_task_starts_puma_with_specified_port + command = 'bundle exec puma -p 2020' @cli.stubs(:run_command).with(command).once @cli.start('-p', '2020') end @@ -116,7 +126,7 @@ def test_start_task_starts_thin_with_specified_port def test_start_task_supports_job_path_option commands = [ 'export JOB_PATH=other_spot; ', - 'bundle exec thin -R config.ru start -p 3030 ' + 'bundle exec puma ' ] @cli.stubs(:options).returns(job_path: 'other_spot') @@ -124,8 +134,8 @@ def test_start_task_supports_job_path_option @cli.start end - def test_stop_task_stops_thin_server - @cli.stubs(:run_command).with('bundle exec thin stop') + def test_stop_task_stops_puma_server + @cli.stubs(:run_command).with('bundle exec pumactl --pidfile ./tmp/pids/puma.pid stop') @cli.stop end