thestick613/mongoqueue

 
 


mongoqueue

Properties

  • Isolation

    Do not let different consumers process the same message.

  • Reliability

    Do not let a failed consumer cause an item to be lost.

  • Atomic

    Operations on the queue are atomic.

Usage

A queue can be instantiated with a mongo collection and a consumer identifier. The consumer identifier helps distinguish multiple queue consumers that are taking jobs from the queue:

>> from pymongo import MongoClient
>> from mongoqueue import MongoQueue
>> queue = MongoQueue(
...   MongoClient().test_db.doctest_queue,
...   consumer_id="consumer-1",
...   timeout=300,
...   max_attempts=3,
...   retry_after=0)

The timeout parameter of the MongoQueue class specifies how long, in seconds, a job may be held by a consumer before it is considered failed.

A job which times out or errors more than max_attempts times is considered permanently failed, and will no longer be processed.

The retry_after parameter is the default waiting time, in seconds, between two attempts of the same job, when the first one failed. This property supersedes scheduling and priority.
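How these three parameters interact can be illustrated with a small, database-free sketch. It is not the library's implementation: the `classify` function and the record keys `attempts`, `locked_at`, and `retry_at` are hypothetical names chosen for illustration, and the boundary (a job is permanently failed once its attempts exceed max_attempts) is an assumption read from the wording above.

```python
def classify(job, now, timeout=300, max_attempts=3):
    """Return 'failed', 'locked', 'waiting', or 'ready' for a job record.

    `job` is a plain dict with hypothetical keys (not the library's schema):
      attempts  - number of failed or timed-out attempts so far
      locked_at - time the current consumer took the job, or absent
      retry_at  - earliest time the job may be attempted again, or absent
    """
    # Assumption: more than max_attempts failures means permanently failed.
    if job["attempts"] > max_attempts:
        return "failed"
    # A lock only counts while it is younger than `timeout` seconds.
    locked_at = job.get("locked_at")
    if locked_at is not None and now - locked_at < timeout:
        return "locked"
    # retry_after delays the next attempt regardless of priority.
    if job.get("retry_at", 0) > now:
        return "waiting"
    return "ready"
```

With `timeout=300`, a job locked 100 seconds ago is still held by its consumer, while a job locked 400 seconds ago is treated as abandoned and becomes eligible again.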

New jobs/items can be placed in the queue by passing a dictionary:

>> queue.put({"foobar": 1})

A job priority can be specified as an integer when putting the job, which will cause it to be processed before lower priority items:

>> queue.put({"foobar": 0}, priority=1)

By default, put checks that the job is unique (no_dupe=True). Pass no_dupe=False to disable this check.

An item can be fetched out by calling the next method on a queue. This returns a Job object:

>> job = queue.next()
>> job.payload
{"foobar": 1}

You can also select jobs from the queue that have certain properties by passing a filter to next. The filter is translated directly into mongo query syntax. Remember that the saved message is stored under the "payload" key:

>> queue.put({"type":"alert"})
>> queue.put({"type":"message"})
>> job = queue.next({"payload.type":"message"})
>> job.payload
{"type":"message"}
>> job.complete()
>> job = queue.next({"payload.type":"message"})
>> job
None
>> job = queue.next()
>> job.payload
{"type":"alert"}
>> job.complete()
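Because every filter key has to carry the "payload." prefix, a tiny helper can build the query from a plain dictionary. This is only a convenience sketch; `payload_query` is a hypothetical name and not part of mongoqueue.

```python
def payload_query(fields):
    """Prefix each key with 'payload.' so it matches the stored message."""
    return {"payload." + key: value for key, value in fields.items()}


# Intended use (requires a live queue, so it is left as a comment):
#   job = queue.next(payload_query({"type": "message"}))
```

The helper only handles top-level keys; nested mongo operators would still need to be written by hand.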

The Job class exposes control methods for marking progress, completion, and errors, or for releasing the job back into the queue.

  • complete Marks a job as complete and removes it from the queue.
    The call does not succeed if the job has run for more than timeout seconds; in that case the queueing mechanism assumes that the job has failed.
  • error Releases the job back to the queue, increments its attempts counter, and stores the optional error message on the job.
    It has an optional parameter, custom_retry_after, which overrides the queue's retry_after property for that one call only.
  • progress Notes progress on the job, optionally taking an integer progress count, and resets the lock timeout.
  • release Releases a job back to the pool. The attempts counter is modified.
    It has an optional parameter, custom_retry_after, which overrides the queue's retry_after property for that one call only.
  • defer Releases a job back to the pool. The attempts counter is not modified.
    It has an optional parameter, custom_retry_after, which overrides the queue's retry_after property for that one call only.
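The methods above combine naturally into a consumer loop. The sketch below is one possible shape, not a pattern prescribed by the library: `drain` and `handler` are hypothetical names, and the code relies only on `queue.next()`, `job.payload`, `job.complete()`, and `job.error(message)` as described in this document.

```python
def drain(queue, handler):
    """Process jobs until queue.next() returns None.

    Returns a (completed, errored) pair of counts.
    """
    completed = errored = 0
    while True:
        job = queue.next()
        if job is None:
            break  # nothing left that this consumer can take
        try:
            handler(job.payload)
        except Exception as exc:
            # Hand the job back with the error message recorded on it.
            job.error(str(exc))
            errored += 1
        else:
            job.complete()
            completed += 1
    return completed, errored
```

With retry_after=0 a job that errors may be handed out again immediately, so the same job can be counted as errored several times before it reaches max_attempts.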

As a convenience, the job supports the context manager protocol:

>> with job as data:
...   print(data['payload'])

{"foobar": 0}

If the context block exits without an exception, the job is marked complete. If an exception is raised, the error is stored on the job.
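The behaviour just described can be mimicked with a small stand-in class, which may help when testing consumer code without a database. `SketchJob` and its attributes are hypothetical and are not the library's Job class; in particular, letting the exception propagate after recording it is an assumption.

```python
class SketchJob:
    """Hypothetical stand-in mimicking the context-manager behaviour above."""

    def __init__(self, payload):
        self.data = {"payload": payload}
        self.status = "locked"
        self.last_error = None

    def complete(self):
        self.status = "complete"

    def error(self, message=None):
        self.status = "error"
        self.last_error = message

    def __enter__(self):
        # The with-statement binds the job data, as in the example above.
        return self.data

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self.complete()
        else:
            self.error(str(exc))
        # Assumption: the exception is recorded but not suppressed.
        return False
```

Returning False from `__exit__` keeps failures visible to the caller, which can still decide whether to continue with the next job.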

Inspired By

Running Tests

Unit tests can be run with

$ python setup.py nosetests

Changes

  • 0.8.4 - Feb 3rd, 2016 - Added a hack_priority parameter inside next_free_fastest
  • 0.8.2 - Jan 30th, 2016 - Add a next_free_fastest function, which ignores scheduled jobs and priority and just returns one valid job
  • 0.8.1 - Jan 27th, 2016 - Add a next_free_fast function, which ignores scheduled jobs
  • 0.8.0 - Jan 08th, 2016 - Fix release function and create defer.
  • 0.7.9 - Jan 08th, 2016 - Added no_dupe parameter in put.
  • 0.7.7 - Dec 29th, 2015 - Added function to repair stale locks on sharded clusters.
  • 0.7.6 - Dec 19th, 2015 - Allow to delay failed or re-released jobs.
  • 0.7.5 - Nov 30th, 2015 - Allow to query by partial payload message.
  • 0.6.0 - Feb 4th, 2013 - Isolate passed in data from metadata in Job.
  • 0.5.2 - Dec 9th, 2012 - Fix for regression in sort parameters from pymongo 2.4
  • 0.5.1 - Dec 2nd, 2012 - Packaging fix for readme data file.

Credits

  • Kapil Thangavelu, author & maintainer
  • Dustin Laurence, sort fix for pymongo 2.4
  • Jonathan Sackett, Job data isolation.

About

Simple python library for distributed queues in mongodb
