diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5fff1d9 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +pkg diff --git a/Gemfile b/Gemfile index de6d2c2..6a88fa0 100644 --- a/Gemfile +++ b/Gemfile @@ -10,5 +10,8 @@ group :development do gem "shoulda", ">= 0" gem "bundler", "~> 1.0.0" gem "jeweler", "~> 1.6.2" + gem 'rdoc' gem "rcov", ">= 0" + gem 'rspec', ">= 2" + gem 'timecop' end diff --git a/Gemfile.lock b/Gemfile.lock index 50c2490..51cb004 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -2,6 +2,7 @@ GEM remote: http://rubygems.org/ specs: bson (1.3.1) + diff-lcs (1.1.3) git (1.2.5) jeweler (1.6.2) bundler (~> 1.0) @@ -12,7 +13,18 @@ GEM bson (>= 1.3.1) rake (0.9.2) rcov (0.9.9) + rdoc (3.11) + json (~> 1.4) + rspec (2.7.0) + rspec-core (~> 2.7.0) + rspec-expectations (~> 2.7.0) + rspec-mocks (~> 2.7.0) + rspec-core (2.7.1) + rspec-expectations (2.7.0) + diff-lcs (~> 1.1.2) + rspec-mocks (2.7.0) shoulda (2.11.3) + timecop (0.3.5) PLATFORMS ruby @@ -23,4 +35,7 @@ DEPENDENCIES json mongo rcov + rdoc + rspec (>= 2) shoulda + timecop diff --git a/README.rdoc b/README.rdoc index be82167..c40cc00 100644 --- a/README.rdoc +++ b/README.rdoc @@ -26,6 +26,8 @@ Items popped from the queue must be confirmed complete, or they will be reissued queue = Mongo::Dequeue.new(collection, options) +If timeout is set to nil read timeout feature will be disabled. + === Pushing Items Items have a body, passed as the first argument to push(). A body can be a string, number, hash, or array. These values are preserved in the Mongo collection used to store the queue, allowing you to inspect the queue and see these values. @@ -93,4 +95,4 @@ but not confirmed. === TODO * Adjust auto generated duplication_key to be more accurate. -* Full examples \ No newline at end of file +* Full examples diff --git a/Rakefile b/Rakefile index 326b46f..2b04fae 100644 --- a/Rakefile +++ b/Rakefile @@ -25,7 +25,7 @@ Jeweler::Tasks.new do |gem| end Jeweler::RubygemsDotOrgTasks.new -require 'rake/rdoctask' +require 'rdoc/task' Rake::RDocTask.new do |rdoc| version = File.exist?('VERSION') ? File.read('VERSION') : "" @@ -34,3 +34,7 @@ Rake::RDocTask.new do |rdoc| rdoc.rdoc_files.include('README*') rdoc.rdoc_files.include('lib/**/*.rb') end + +require 'rspec/core/rake_task' +RSpec::Core::RakeTask.new(:spec) +task :default => :spec diff --git a/VERSION b/VERSION index 09a3acf..1a5ac0d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.6.0 \ No newline at end of file +0.6.9 diff --git a/lib/mongo-dequeue.rb b/lib/mongo-dequeue.rb index d344aff..441bc51 100644 --- a/lib/mongo-dequeue.rb +++ b/lib/mongo-dequeue.rb @@ -22,6 +22,13 @@ class Mongo::Dequeue # def initialize(collection, opts={}) @collection = collection + @collection.ensure_index([["locked", Mongo::ASCENDING], + ["complete", Mongo::ASCENDING], + ["priority", Mongo::DESCENDING], + ["inserted_at", Mongo::ASCENDING]]) + @collection.ensure_index([["duplicate_key", Mongo::ASCENDING], + ["complete", Mongo::ASCENDING], + ["locked_at", Mongo::ASCENDING]]) @config = DEFAULT_CONFIG.merge(opts) @batch = [] end @@ -32,7 +39,9 @@ def flush! end # Insert a new item into the queue. - # + # Valid options: + # - :priority: integer value, 3 by default. + # - :duplicate_key # Example: # queue.insert(:name => 'Billy', :email => 'billy@example.com', :message => 'Here is the thing you asked for') def push(body, item_opts = {}) @@ -48,6 +57,7 @@ def push(body, item_opts = {}) :body => body, :inserted_at => Time.now.utc, :complete => false, + :locked => false, :locked_till => nil, :completed_at => nil, :priority => item_opts[:priority] || @config[:default_priority], @@ -87,6 +97,7 @@ def batchprocess() 'body': e.body, 'inserted_at': nowutc, 'complete': false, + 'locked' : false, 'locked_till': null, 'completed_at': null, 'priority': e.priority, @@ -118,40 +129,84 @@ def batchprocess() # {:body=>"foo", :id=>"4e039c372b70275e345206e4"} def pop(opts = {}) - begin - timeout = opts[:timeout] || @config[:timeout] - cmd = BSON::OrderedHash.new - cmd['findandmodify'] = collection.name - cmd['update'] = {'$set' => {:locked_till => Time.now.utc+timeout}} - cmd['query'] = {:complete => false, '$or'=>[{:locked_till=> nil},{:locked_till=>{'$lt'=>Time.now.utc}}] } - cmd['sort'] = {:priority=>-1,:inserted_at=>1} - cmd['limit'] = 1 - cmd['new'] = true - result = collection.db.command(cmd) - rescue Mongo::OperationFailure => of - return nil - end - return { - :body => result['value']['body'], - :id => result['value']['_id'].to_s - } - end + timeout = opts[:timeout] || @config[:timeout] + locked_at = Time.now.utc + + cmd = BSON::OrderedHash.new + cmd['findandmodify'] = collection.name + if timeout + cmd['update'] = { '$set' => { :locked_at => locked_at, + :locked_till => locked_at + timeout, + :locked => true } } + cmd['query'] = {:complete => false, + '$or'=>[ {:locked => false}, + {:locked_till=> nil}, + {:locked_till=>{'$lt'=>Time.now.utc}}] } + else + cmd['update'] = { '$set' => { :locked => true, + :locked_at => locked_at } } + cmd['query'] = { "$and" => [{:complete => false}, {:locked => false }, + { "$or" => [{:locked_till => nil}, + {:locked_till => {'$lt' => Time.now.utc}}] + }]} + end + cmd['limit'] = 1 + cmd['new'] = true + + sort_directive = BSON::OrderedHash.new + sort_directive[:priority] = Mongo::DESCENDING + sort_directive[:inserted_at] = Mongo::ASCENDING + cmd['sort'] = sort_directive + + result = collection.db.command(cmd) + + if result['value'] + { :body => result['value']['body'], + :id => result['value']['_id'].to_s } + else + nil + end + rescue Mongo::OperationFailure => of + nil + end + + # "Re-add" the document to the queue + def unlock(id) + cmd = BSON::OrderedHash.new + cmd['findandmodify'] = collection.name + cmd['query'] = {:_id => BSON::ObjectId.from_string(id)} + cmd['update'] = {'$set' => {:locked => false, :locked_till => nil}} + collection.db.command(cmd) + rescue Mongo::OperationFailure => of + nil + end + + def lock_until(id, timeout) + cmd = BSON::OrderedHash.new + cmd['findandmodify'] = collection.name + cmd['query'] = {:_id => BSON::ObjectId.from_string(id)} + cmd['update'] = {'$set' => {:locked => false, + :locked_till => (Time.now.utc + timeout)}} + collection.db.command(cmd) + rescue Mongo::OperationFailure => of + nil + end # Remove the document from the queue. This should be called when the work is done and the document is no longer needed. # You must provide the process identifier that the document was locked with to complete it. def complete(id) - begin - cmd = BSON::OrderedHash.new - cmd['findandmodify'] = collection.name - cmd['query'] = {:_id => BSON::ObjectId.from_string(id)} - cmd['update'] = {'$set' => {:completed_at => Time.now.utc, :complete => true}, '$inc' => {:completecount => 1} } - cmd['limit'] = 1 - collection.db.command(cmd) - rescue Mongo::OperationFailure => of - #opfailure happens when item has been already completed - return nil - end - end + cmd = BSON::OrderedHash.new + cmd['findandmodify'] = collection.name + cmd['query'] = {:_id => BSON::ObjectId.from_string(id)} + cmd['update'] = {'$set' => {:completed_at => Time.now.utc, + :complete => true}, + '$inc' => {:completecount => 1} } + cmd['limit'] = 1 + collection.db.command(cmd) + rescue Mongo::OperationFailure => of + #opfailure happens when item has been already completed + nil + end # Removes completed job history def cleanup() @@ -167,10 +222,10 @@ def stats return db.eval( function(){ var nowutc = new Date(); - var a = db.#{collection.name}.count({'complete': false, '$or':[{'locked_till':null},{'locked_till':{'$lt':nowutc}}] }); + var a = db.#{collection.name}.count({'complete': false, '$or':[{'locked' : false}, {'locked_till':null},{'locked_till':{'$lt':nowutc}}] }); var c = db.#{collection.name}.count({'complete': true}); var t = db.#{collection.name}.count(); - var l = db.#{collection.name}.count({'complete': false, 'locked_till': {'$gte':nowutc} }); + var l = db.#{collection.name}.count({'complete': false, 'locked' : true, 'locked_till': {'$gte':nowutc} }); var rc = db.#{collection.name}.group({ 'key': {}, 'cond': {'complete':true}, @@ -236,15 +291,56 @@ def self.generate_duplicate_key(body) return Digest::MD5.hexdigest(body.to_json) #won't ever match a duplicate. Need a better way to handle hashes and arrays. end - def peek - firstfew = collection.find({ - :complete => false, - '$or'=>[{:locked_till=> nil},{:locked_till=>{'$lt'=>Time.now.utc}}] - }, - :sort => [[:priority, :descending],[:inserted_at, :ascending]], - :limit => 10) - return firstfew - end + def peek(opts = {}) + timeout = opts[:timeout] || @config[:timeout] + query = {:complete => false, } + + if timeout + query['$or'] = [ {:locked => false}, + {:locked_till=> nil}, + {:locked_till=>{'$lt'=>Time.now.utc}}] + else + query = { "$and" => [{:complete => false}, {:locked => false }, + { "$or" => [{:locked_till => nil}, + {:locked_till => {'$lt' => Time.now.utc}}] + }]} + + end + + collection.find( query, + :sort => [[:priority, :descending],[:inserted_at, :ascending]], + :limit => 10) + end + + # Set the priority of an item to a custom value + def change_item_priority obj_id, priority + cmd = BSON::OrderedHash.new + cmd['findandmodify'] = collection.name + cmd['update'] = { '$set' => { :priority => priority } } + cmd['query'] = { '_id' => obj_id } + + collection.db.command(cmd) + end + + # Increase the priority of an item by a given value + def increase_item_priority obj_id, step=1 + cmd = BSON::OrderedHash.new + cmd['findandmodify'] = collection.name + cmd['update'] = { '$inc' => { :priority => step } } + cmd['query'] = { '_id' => obj_id } + + collection.db.command(cmd) + end + + # Decrease the priority of an item by a given value + def decrease_item_priority obj_id, step=1 + cmd = BSON::OrderedHash.new + cmd['findandmodify'] = collection.name + cmd['update'] = { '$inc' => { :priority => - step } } + cmd['query'] = { '_id' => obj_id } + + collection.db.command(cmd) + end protected @@ -252,4 +348,4 @@ def value_of(result) #:nodoc: result['okay'] == 0 ? nil : result['value'] end -end \ No newline at end of file +end diff --git a/mongo-dequeue.gemspec b/mongo-dequeue.gemspec index 94d1d76..f2b865f 100644 --- a/mongo-dequeue.gemspec +++ b/mongo-dequeue.gemspec @@ -4,14 +4,14 @@ # -*- encoding: utf-8 -*- Gem::Specification.new do |s| - s.name = %q{mongo-dequeue} - s.version = "0.6.0" + s.name = "mongo-dequeue" + s.version = File.read("VERSION").strip s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version= s.authors = ["TelegramSam"] - s.date = %q{2011-08-29} - s.description = %q{A de-duplicating priority queue that uses mongodb as the storage engine.} - s.email = %q{telegramsam@gmail.com} + s.date = "2012-03-28" + s.description = "A de-duplicating priority queue that uses mongodb as the storage engine." + s.email = "telegramsam@gmail.com" s.extra_rdoc_files = [ "LICENSE.txt", "README.rdoc" @@ -25,22 +25,14 @@ Gem::Specification.new do |s| "VERSION", "lib/mongo-dequeue.rb", "mongo-dequeue.gemspec", - "pkg/mongo-dequeue-0.1.0.gem", - "pkg/mongo-dequeue-0.2.0.gem", - "pkg/mongo-dequeue-0.2.1.gem", - "pkg/mongo-dequeue-0.3.0.gem", - "pkg/mongo-dequeue-0.4.0.gem", - "pkg/mongo-dequeue-0.4.1.gem", - "pkg/mongo-dequeue-0.5.0.gem", - "pkg/mongo-dequeue-0.5.1.gem", "spec/mongo_dequeue_spec.rb", "spec/spec_helper.rb" ] - s.homepage = %q{http://github.com/TelegramSam/Dequeue} + s.homepage = "http://github.com/TelegramSam/Dequeue" s.licenses = ["MIT"] s.require_paths = ["lib"] - s.rubygems_version = %q{1.5.3} - s.summary = %q{Mongo based de-duplicating pritority queue} + s.rubygems_version = "1.8.15" + s.summary = "Mongo based de-duplicating pritority queue" if s.respond_to? :specification_version then s.specification_version = 3 @@ -51,14 +43,20 @@ Gem::Specification.new do |s| s.add_development_dependency(%q, [">= 0"]) s.add_development_dependency(%q, ["~> 1.0.0"]) s.add_development_dependency(%q, ["~> 1.6.2"]) + s.add_development_dependency(%q, [">= 0"]) s.add_development_dependency(%q, [">= 0"]) + s.add_development_dependency(%q, [">= 2"]) + s.add_development_dependency(%q, [">= 0"]) else s.add_dependency(%q, [">= 0"]) s.add_dependency(%q, [">= 0"]) s.add_dependency(%q, [">= 0"]) s.add_dependency(%q, ["~> 1.0.0"]) s.add_dependency(%q, ["~> 1.6.2"]) + s.add_dependency(%q, [">= 0"]) s.add_dependency(%q, [">= 0"]) + s.add_dependency(%q, [">= 2"]) + s.add_dependency(%q, [">= 0"]) end else s.add_dependency(%q, [">= 0"]) @@ -66,7 +64,10 @@ Gem::Specification.new do |s| s.add_dependency(%q, [">= 0"]) s.add_dependency(%q, ["~> 1.0.0"]) s.add_dependency(%q, ["~> 1.6.2"]) + s.add_dependency(%q, [">= 0"]) s.add_dependency(%q, [">= 0"]) + s.add_dependency(%q, [">= 2"]) + s.add_dependency(%q, [">= 0"]) end end diff --git a/pkg/mongo-dequeue-0.1.0.gem b/pkg/mongo-dequeue-0.1.0.gem deleted file mode 100644 index 1cd1978..0000000 Binary files a/pkg/mongo-dequeue-0.1.0.gem and /dev/null differ diff --git a/pkg/mongo-dequeue-0.2.0.gem b/pkg/mongo-dequeue-0.2.0.gem deleted file mode 100644 index f76a2a6..0000000 Binary files a/pkg/mongo-dequeue-0.2.0.gem and /dev/null differ diff --git a/pkg/mongo-dequeue-0.2.1.gem b/pkg/mongo-dequeue-0.2.1.gem deleted file mode 100644 index d700635..0000000 Binary files a/pkg/mongo-dequeue-0.2.1.gem and /dev/null differ diff --git a/pkg/mongo-dequeue-0.3.0.gem b/pkg/mongo-dequeue-0.3.0.gem deleted file mode 100644 index 310af10..0000000 Binary files a/pkg/mongo-dequeue-0.3.0.gem and /dev/null differ diff --git a/pkg/mongo-dequeue-0.4.0.gem b/pkg/mongo-dequeue-0.4.0.gem deleted file mode 100644 index ee016c3..0000000 Binary files a/pkg/mongo-dequeue-0.4.0.gem and /dev/null differ diff --git a/pkg/mongo-dequeue-0.4.1.gem b/pkg/mongo-dequeue-0.4.1.gem deleted file mode 100644 index d430a5f..0000000 Binary files a/pkg/mongo-dequeue-0.4.1.gem and /dev/null differ diff --git a/pkg/mongo-dequeue-0.5.0.gem b/pkg/mongo-dequeue-0.5.0.gem deleted file mode 100644 index dd16b6e..0000000 Binary files a/pkg/mongo-dequeue-0.5.0.gem and /dev/null differ diff --git a/pkg/mongo-dequeue-0.5.1.gem b/pkg/mongo-dequeue-0.5.1.gem deleted file mode 100644 index 205daf2..0000000 Binary files a/pkg/mongo-dequeue-0.5.1.gem and /dev/null differ diff --git a/spec/mongo_dequeue_spec.rb b/spec/mongo_dequeue_spec.rb index e8886be..6906f21 100644 --- a/spec/mongo_dequeue_spec.rb +++ b/spec/mongo_dequeue_spec.rb @@ -2,24 +2,24 @@ require 'pp' describe Mongo::Dequeue do - + def insert_and_inspect(body, options={}) - @queue.push(body,options) - @queue.send(:collection).find_one + @queue.push(body,options) + @collection.find_one end - - + + before(:all) do opts = { :timeout => 60} - @collection = Mongo::Connection.new('localhost', nil, :pool_size => 4).db('mongo_queue_spec').collection('spec') - @queue = Mongo::Dequeue.new(@collection, opts) + @collection = Mongo::Connection.new('localhost', nil, :pool_size => 4).db('mongo_queue_spec').collection('spec') + @queue = Mongo::Dequeue.new(@collection, opts) end - + before(:each) do @queue.flush! end - + describe "Configuration" do it "should set the connection" do @@ -29,301 +29,635 @@ def insert_and_inspect(body, options={}) it "should allow timeout option" do @queue.config[:timeout].should eql(60) end - + it "should have a sane set of defaults" do - q = Mongo::Dequeue.new(nil) + q = Mongo::Dequeue.new(@collection) q.config[:timeout].should eql 300 end end - + describe "Inserting a standard Job" do before(:all) do - @item = insert_and_inspect({:message => 'MongoQueueSpec', :foo => 5}) + @item = insert_and_inspect({:message => 'MongoQueueSpec', :foo => 5}) end - + it "should set priority to 3 by default" do @item['priority'].should be(3) end - + it "should not be complete" do @item['complete'].should be false end - + it "should have a null completed_at" do - @item['completed_at'].should be nil + @item['completed_at'].should be nil end - + it "should set a null locked_at" do @item['locked_at'].should be nil end - + it "should have no duplicates" do @item['count'].should be 1 end - + it "should have a duplicate_key" do @item['duplicate_key'].should_not be nil end - + it "should return struct body properly" do @item['body']['message'].should eql('MongoQueueSpec') @item['body']['foo'].should be(5) end end - + describe "bulk inserting multiple jobs" do - before(:all) do - @queue.batchpush({:message => 'MongoQueueSpec1', :foo => 5}) - @queue.batchpush({:message => 'MongoQueueSpec2', :foo => 5}) - @queue.batchpush({:message => 'MongoQueueSpec3', :foo => 5}) + before(:all) do + @queue.batchpush({:message => 'MongoQueueSpec1', :foo => 5}) + @queue.batchpush({:message => 'MongoQueueSpec2', :foo => 5}) + @queue.batchpush({:message => 'MongoQueueSpec3', :foo => 5}) end - + it "should correctly count items in batch" do - @queue.batch.length.should be(3) + @queue.batch.length.should be(3) end - + it "should correctly add items on process" do - @queue.batchprocess() - @queue.send(:collection).count.should == 3 - @queue.batch.length.should == 0 + @queue.batchprocess() + @collection.count.should == 3 + @queue.batch.length.should == 0 + end + + end + + describe "Inserting different body types" do + before(:each) do + @queue.flush! + end + + it "should handle a struct" do + i = insert_and_inspect({:message => 'MongoQueueSpec', :foo => 5}) + i['body']['message'].should eql('MongoQueueSpec') + i['body']['foo'].should be(5) + end + + it "should handle a string" do + i = insert_and_inspect("foobarbaz") + i['body'].should eql "foobarbaz" + end + + it "should handle a number" do + i = insert_and_inspect(42) + i['body'].should be 42 + end + end + + describe "Deduplicating messages" do + before(:each) do + @queue.flush! + end + + it "should combine identical bodies of type string" do + a = insert_and_inspect("foo") + b = insert_and_inspect("foo") + @collection.count.should be 1 + end + + it "should not combine different bodies of type string" do + a = insert_and_inspect("foo") + b = insert_and_inspect("bar") + @collection.count.should be 2 + b['count'].should be 1 + end + + it "should combine identical bodies of type struct" do + pending "Test after we have a better way of handling structs" + a = insert_and_inspect({:a=>'a',:b=>'b'}) + b = insert_and_inspect({:a=>'a',:b=>'b'}) + c = insert_and_inspect({:b=>'b',:a=>'a'}) + @collection.count.should be 1 + end + + it "should not combine different bodies of type struct" do + a = insert_and_inspect({:a=>'a',:b=>'b'}) + b = insert_and_inspect({:a=>'a',:c=>'c'}) + @collection.count.should be 2 + b['count'].should be 1 + end + + it "should combine based on duplication_key" do + a = insert_and_inspect({:a=>'a',:b=>'b'}, :duplicate_key => 'match') + b = insert_and_inspect({:a=>'a',:c=>'c'}, :duplicate_key => 'match') + @collection.count.should be 1 + b['count'].should be 2 + end + + it "should not combine based on duplication_key" do + a = insert_and_inspect("foo", :duplicate_key => 'match') + b = insert_and_inspect("foo", :duplicate_key => 'nomatch') + @collection.count.should be 2 + b['count'].should be 1 + end + + end + + describe "Popping messages" do + before(:each) do + @queue.flush! + end + + it "should return message" do + a = insert_and_inspect("foo") + m = @queue.pop + m[:body].should eq "foo" + @collection.count.should be 1 + end + + it "should unlock a queue item" do + body = "Unlock Me" + @queue.push body, {:priority => 1} + item = @queue.pop + @queue.unlock(item[:id]) + item2 = @queue.pop + item2[:body].should eq body + end + + it "should return an id" do + a = insert_and_inspect("foo") + m = @queue.pop + m[:id].should_not be nil + @collection.count.should be 1 + end + + it "should return nil when queue is empty" do + m = @queue.pop + m.should be nil + @collection.count.should be 0 + end + + it "should complete ok" do + a = insert_and_inspect("foo") + m = @queue.pop + @queue.complete(m[:id]) + m2 = @queue.pop + m2.should be nil + @collection.count.should be 1 + end + + it "should always set the locked_at attribute" do + expected = Time.now.utc + @queue.push 'test item 1' + @queue.push 'test item 2' + + Timecop.freeze expected do + item_id = @queue.pop[:id] + raw_item = @collection.find_one({ '_id' => BSON::ObjectId(item_id) }) + raw_item['locked_at'].to_i.should eq expected.to_i + + item_id = @queue.pop(:timeout => 60)[:id] + raw_item = @collection.find_one({ '_id' => BSON::ObjectId(item_id) }) + raw_item['locked_at'].to_i.should eq expected.to_i + end + + end + + it "should consider the timeout value" do + time = Time.now + timeout = 120 + item_id = nil + + Timecop.freeze time do + @queue.push 'test item', {:timeout => timeout} + item_id = @queue.pop[:id] + end + + Timecop.freeze(time + 10) do + @queue.pop.should eq nil + end + + Timecop.freeze(time + timeout + 10) do + actual = @queue.pop + actual.should_not be_nil + actual[:id].should eq item_id + end + end + + describe 'all the items have the same priority' do + it "should be a standard FIFO queue" do + items = %w(a b c d) + + items.each do |item| + @queue.push item + end + + items.each do |expected| + actual = @queue.pop + actual[:body].should eq expected + end + end + end + + describe 'items have different priority' do + it "should return items with higher priority first" do + p3 = 'priority 3' + p2 = 'priority 2' + p1 = 'priority 1' + + @queue.push p1, {:priority => 1} + @queue.push p3, {:priority => 3} + @queue.push p2, {:priority => 2} + + @queue.pop[:body].should eq p3 + @queue.pop[:body].should eq p2 + @queue.pop[:body].should eq p1 + end + + it "should sort by priority and then by insertion" do + p1 = 'priority 1' + p3 = 'priority 3' + p2_first = 'priority 2 - first insert' + p2_last = 'priority 2 - last insert' + + @queue.push p1, {:priority => 1} + @queue.push p2_first, {:priority => 2} + @queue.push p3, {:priority => 3} + @queue.push p2_last, {:priority => 2} + + @queue.pop[:body].should eq p3 + @queue.pop[:body].should eq p2_first + @queue.pop[:body].should eq p2_last + @queue.pop[:body].should eq p1 + end + + end + + end + + describe "Peek" do + it "should not remove items from the queue" do + expected = %w(a b c d) + + expected.each do |item| + @queue.push item + end + + 2.times do + actual = @queue.peek.map{|i| i["body"]} + actual.should eq expected + end + end + + it "should consider the timeout value" do + time = Time.now + timeout = 120 + item_id = nil + + Timecop.freeze time do + @queue.push 'test item', {:timeout => timeout} + item_id = @queue.pop[:id] + end + + Timecop.freeze(time + 10) do + @queue.pop.should eq nil + @queue.peek.count.should eq 0 + end + + Timecop.freeze(time + timeout + 10) do + actual = @queue.peek.first + actual.should_not be_nil + actual['_id'].to_s.should eq item_id + end + + end + + describe 'all the items have the same priority' do + it "should act as a FIFO queue" do + expected = %w(a b c d) + + expected.each do |item| + @queue.push item + end + actual = @queue.peek.map{|i| i["body"]} + actual.should eq expected + end end - + + describe 'items have different priority' do + it "should return items with higher priority first" do + p3 = 'priority 3' + p2 = 'priority 2' + p1 = 'priority 1' + expected = [p3, p2, p1] + + @queue.push p1, {:priority => 1} + @queue.push p3, {:priority => 3} + @queue.push p2, {:priority => 2} + + actual = @queue.peek.map{|i| i["body"]} + actual.should eq expected + end + + it "should sort by priority and then by insertion" do + p1 = 'priority 1' + p3 = 'priority 3' + p2_first = 'priority 2 - first insert' + p2_last = 'priority 2 - last insert' + expected = [p3, p2_first, p2_last, p1] + + @queue.push p1, {:priority => 1} + @queue.push p2_first, {:priority => 2} + @queue.push p3, {:priority => 3} + @queue.push p2_last, {:priority => 2} + + actual = @queue.peek.map{|i| i["body"]} + actual.should eq expected + end + end + + end + + describe "Completing" do + before(:all) do + @a = insert_and_inspect("a") + @b = insert_and_inspect("b") + @c = insert_and_inspect("c") + + @ap = @queue.pop + @queue.complete(@ap[:id]) + @ac = @collection.find_one({:_id => BSON::ObjectId.from_string(@ap[:id])}) + + @bp = @queue.pop + @bc = @collection.find_one({:_id => BSON::ObjectId.from_string(@bp[:id])}) + + @cp = @queue.pop + @queue.complete(@cp[:id]) + @queue.complete(@cp[:id]) + @queue.complete(@cp[:id]) + @cc = @collection.find_one({:_id => BSON::ObjectId.from_string(@cp[:id])}) + @stats = @queue.stats + + end + it "should count a single completion" do + @ac["completecount"].should eq 1 + + end + it "should report zero for uncompleted items" do + @bc["completecount"].should eq 0 + end + it "should count a single completion" do + @cc["completecount"].should eq 3 + end + it "should report stats correctly" do + @stats[:redundantcompletes].should == 2 + end + end + + describe "Stats" do + before(:all) do + @a = insert_and_inspect("a") + + @b = insert_and_inspect("b") + @c = insert_and_inspect("c") + @d = insert_and_inspect("d") + @e = insert_and_inspect({:task => "foo"}) + + @ap = @queue.pop(:timeout => 1) + @bp = @queue.pop + @cp = @queue.pop + + sleep(2) + + @queue.complete(@bp[:id]) + + @stats = @queue.stats + + end + #locked, complete, available, total + it "should count complete" do + @stats[:complete].should == 1 + end + it "should count total" do + @stats[:total].should == 5 + end + it "should count available" do + @stats[:available].should == 3 + end + it "should count locked" do + @stats[:locked].should == 1 + end + it "should count redundant completes" do + @stats[:redundantcompletes].should == 0 + end + it "should count priorities" do + #pp @stats[:priority] + @stats[:priority].should == [{"priority"=>3.0, "complete"=>1.0, "waiting"=>4.0}] + end + it "should count tasks" do + #pp @stats[:tasks] + @stats[:tasks].should == [ + {"body.task"=>nil, "complete"=>1.0, "waiting"=>3.0}, + {"body.task"=>"foo", "complete"=>0.0, "waiting"=>1.0} + ] + end + + end + + describe "disable item timeout" do + before(:each) do + opts = {:timeout => nil} + @queue = Mongo::Dequeue.new(@collection, opts) + @queue.flush! + end + + it "should work with pop" do + expected = "hello world" + @queue.push expected + item = @queue.pop + item[:body].should eq expected + + Timecop.travel(Time.local(Time.now.year + 10)) do + actual = @queue.pop + actual.should be_nil + end + end + + it "should work with peek" do + expected = "hello world" + @queue.push expected + item = @queue.pop + item[:body].should eq expected + + Timecop.travel(Time.local(Time.now.year + 10)) do + actual = @queue.peek + actual.count.should eq 0 + end + end + end - - - describe "Inserting different body types" do - before(:each) do - @queue.flush! - end - - it "should handle a struct" do - i = insert_and_inspect({:message => 'MongoQueueSpec', :foo => 5}) - i['body']['message'].should eql('MongoQueueSpec') - i['body']['foo'].should be(5) - end - - it "should handle a string" do - i = insert_and_inspect("foobarbaz") - i['body'].should eql "foobarbaz" - end - - it "should handle a number" do - i = insert_and_inspect(42) - i['body'].should be 42 - end - end - - describe "Deduplicating messages" do - before(:each) do - @queue.flush! - end - - it "should combine identical bodies of type string" do - a = insert_and_inspect("foo") - b = insert_and_inspect("foo") - @queue.send(:collection).count.should be 1 - end - - it "should not combine different bodies of type string" do - a = insert_and_inspect("foo") - b = insert_and_inspect("bar") - @queue.send(:collection).count.should be 2 - b['count'].should be 1 - end - - it "should combine identical bodies of type struct" do - pending "Test after we have a better way of handling structs" - a = insert_and_inspect({:a=>'a',:b=>'b'}) - b = insert_and_inspect({:a=>'a',:b=>'b'}) - c = insert_and_inspect({:b=>'b',:a=>'a'}) - @queue.send(:collection).count.should be 1 - end - - it "should not combine different bodies of type struct" do - a = insert_and_inspect({:a=>'a',:b=>'b'}) - b = insert_and_inspect({:a=>'a',:c=>'c'}) - @queue.send(:collection).count.should be 2 - b['count'].should be 1 - end - - it "should combine based on duplication_key" do - a = insert_and_inspect({:a=>'a',:b=>'b'}, :duplicate_key => 'match') - b = insert_and_inspect({:a=>'a',:c=>'c'}, :duplicate_key => 'match') - @queue.send(:collection).count.should be 1 - b['count'].should be 2 - end - - it "should not combine based on duplication_key" do - a = insert_and_inspect("foo", :duplicate_key => 'match') - b = insert_and_inspect("foo", :duplicate_key => 'nomatch') - @queue.send(:collection).count.should be 2 - b['count'].should be 1 - end - - end - - describe "Popping messages" do - before(:each) do - @queue.flush! - end - - it "should return message" do - a = insert_and_inspect("foo") - m = @queue.pop - m[:body].should eq "foo" - @queue.send(:collection).count.should be 1 - end - - it "should return an id" do - a = insert_and_inspect("foo") - m = @queue.pop - m[:id].should_not be nil - @queue.send(:collection).count.should be 1 - end - - it "should return nil when queue is empty" do - m = @queue.pop - m.should be nil - @queue.send(:collection).count.should be 0 - end - - it "should complete ok" do - a = insert_and_inspect("foo") - m = @queue.pop - @queue.complete(m[:id]) - m2 = @queue.pop - m2.should be nil - @queue.send(:collection).count.should be 1 - end - - it "should pop again after timeout" do - a = insert_and_inspect("foo") - m = @queue.pop(:timeout => 1) - sleep(2) - m2 = @queue.pop - m2[:id].should eq m[:id] - @queue.send(:collection).count.should be 1 - end - - it "should pop in order" do - @a = insert_and_inspect("a") - @b = insert_and_inspect("b") - @c = insert_and_inspect("c") - @d = insert_and_inspect("d") - - @ap = @queue.pop - @bp = @queue.pop - @cp = @queue.pop - @dp = @queue.pop - - @ap[:body].should eq "a" - @bp[:body].should eq "b" - @cp[:body].should eq "c" - @dp[:body].should eq "d" - end - - end - - describe "Peek" do - it "should peek properly" do - @a = insert_and_inspect("a") - @b = insert_and_inspect("b") - - @peek = [] - p = @queue.peek - p.each{|q| @peek << q } - - @peek.length.should == 2 - end - end - - describe "Completing" do - before(:all) do - @a = insert_and_inspect("a") - @b = insert_and_inspect("b") - @c = insert_and_inspect("c") - - - @ap = @queue.pop - @queue.complete(@ap[:id]) - @ac = @queue.send(:collection).find_one({:_id => BSON::ObjectId.from_string(@ap[:id])}) - - @bp = @queue.pop - @bc = @queue.send(:collection).find_one({:_id => BSON::ObjectId.from_string(@bp[:id])}) - - @cp = @queue.pop - @queue.complete(@cp[:id]) - @queue.complete(@cp[:id]) - @queue.complete(@cp[:id]) - @cc = @queue.send(:collection).find_one({:_id => BSON::ObjectId.from_string(@cp[:id])}) - @stats = @queue.stats - - end - it "should count a single completion" do - @ac["completecount"].should eq 1 - - end - it "should report zero for uncompleted items" do - @bc["completecount"].should eq 0 - end - it "should count a single completion" do - @cc["completecount"].should eq 3 - end - it "should report stats correctly" do - @stats[:redundantcompletes].should == 2 - end - end - - describe "Stats" do - before(:all) do - @a = insert_and_inspect("a") - - @b = insert_and_inspect("b") - @c = insert_and_inspect("c") - @d = insert_and_inspect("d") - @e = insert_and_inspect({:task => "foo"}) - - @ap = @queue.pop(:timeout => 1) - @bp = @queue.pop - @cp = @queue.pop - - sleep(2) - - @queue.complete(@bp[:id]) - - @stats = @queue.stats - - end - #locked, complete, available, total - it "should count complete" do - @stats[:complete].should == 1 - end - it "should count total" do - @stats[:total].should == 5 - end - it "should count available" do - @stats[:available].should == 3 - end - it "should count locked" do - @stats[:locked].should == 1 - end - it "should count redundant completes" do - @stats[:redundantcompletes].should == 0 - end - it "should count priorities" do - #pp @stats[:priority] - @stats[:priority].should == [{"priority"=>3.0, "complete"=>1.0, "waiting"=>4.0}] - end - it "should count tasks" do - #pp @stats[:tasks] - @stats[:tasks].should == [ - {"body.task"=>nil, "complete"=>1.0, "waiting"=>3.0}, - {"body.task"=>"foo", "complete"=>0.0, "waiting"=>1.0} - ] - end - - - end - - -end \ No newline at end of file + + describe "changing item priority" do + describe "set a custom value" do + it "work as expected" do + expected_priority = 3 + item = insert_and_inspect("Test", {:priority => 1}) + + @queue.change_item_priority item['_id'], expected_priority + + item = @collection.find_one + item["priority"].should eq expected_priority + end + + it "should not complain if item is not found" do + item = insert_and_inspect("Test", {:priority => 2}) + id = item['_id'] + + @collection.remove('_id' => id) + @queue.stats[:total].should eq 0 + + lambda do + @queue.change_item_priority(item['_id'], 1) + end.should_not raise_error + end + + end + + describe "increase the priority by steps" do + it "should use 1 as default step value" do + expected_priority = 3 + item = insert_and_inspect("Test", {:priority => 2}) + + @queue.increase_item_priority item['_id'] + + item = @collection.find_one + item["priority"].should eq expected_priority + end + + it "should allow custom values for step" do + expected_priority = 4 + item = insert_and_inspect("Test", {:priority => 2}) + + @queue.increase_item_priority item['_id'], 2 + + item = @collection.find_one + item["priority"].should eq expected_priority + end + + it "should not complain if item is not found" do + item = insert_and_inspect("Test", {:priority => 2}) + id = item['_id'] + + @collection.remove('_id' => id) + @queue.stats[:total].should eq 0 + + lambda do + @queue.increase_item_priority(item['_id']) + end.should_not raise_error + end + + end + + describe "decrease the priority by steps" do + it "should use 1 as default step value" do + expected_priority = 1 + item = insert_and_inspect("Test", {:priority => 2}) + + @queue.decrease_item_priority item['_id'] + + item = @collection.find_one + item["priority"].should eq expected_priority + end + + it "should allow custom values for step" do + expected_priority = 2 + item = insert_and_inspect("Test", {:priority => 4}) + + @queue.decrease_item_priority item['_id'], 2 + + item = @collection.find_one + item["priority"].should eq expected_priority + end + + it "should not complain if item is not found" do + item = insert_and_inspect("Test", {:priority => 2}) + id = item['_id'] + + @collection.remove('_id' => id) + @queue.stats[:total].should eq 0 + + lambda do + @queue.decrease_item_priority(item['_id']) + end.should_not raise_error + end + + end + + describe "locking an item until a certain time on a queue where timeout is disabled" do + before(:all) do + # ensure the item timeout feature is disabled + opts = {:timeout => nil} + @queue = Mongo::Dequeue.new(@collection, opts) + @queue.flush! + end + + before(:each) do + insert_and_inspect("Test") + item = @queue.pop + + @base_time = Time.now + Timecop.freeze(@base_time) do + @queue.lock_until(item[:id], 15) + end + + @raw_item = @collection.find_one(BSON::ObjectId.from_string(item[:id])) + end + + describe "item inside of the queue" do + it "should not be marked as completed" do + @raw_item['complete'].should be_false + end + + it "should not be locked" do + @raw_item['locked'].should be_false + end + end + + describe "popping" do + it "should not be popped out if the lock is not expired" do + Timecop.freeze(@base_time + 2) do + @queue.pop.should be_nil + end + end + + it "should be popped out once the lock expires"do + Timecop.freeze(@base_time + 60) do + item = @queue.pop + item.should_not be_nil + item[:id].should == @raw_item["_id"].to_s + end + end + end + + describe "peekin" do + it "should not be peeked out if the lock is not expired" do + Timecop.freeze(@base_time + 2) do + @queue.peek.count.should be_zero + end + end + + it "should be peeked once the lock expires"do + Timecop.freeze(@base_time + 60) do + items = @queue.peek + items.count.should == 1 + item = items.first + item.should_not be_nil + item["_id"].should == @raw_item["_id"] + end + end + + end + + end + + end + +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index c3beb2b..8454c10 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -3,4 +3,5 @@ require 'rubygems' require 'mongo' -require 'mongo-dequeue' \ No newline at end of file +require 'mongo-dequeue' +require 'timecop'