From 96bb4a102952928eb774f04799cc3ba3f507a2ba Mon Sep 17 00:00:00 2001 From: Krassi Date: Fri, 12 Jun 2015 12:50:09 -0400 Subject: [PATCH 1/6] added support for: + atomic counters with the resulting counter value returned to the caller + support for partial data collection; the complete data is returned upon completion fixed: + multiple caches being initialized, one for each worker baseline ops per sec ~ 10K/sec --- lib/LRUCacheProxy.js | 27 +++++++- lib/lru-cache-cluster.js | 118 ++++++++++++++++++++------------- test/clustered.js | 5 ++ test/worker.js | 139 +++++++++++++++++++++++++-------------- 4 files changed, 193 insertions(+), 96 deletions(-) diff --git a/lib/LRUCacheProxy.js b/lib/LRUCacheProxy.js index 26680ce..8e564bc 100644 --- a/lib/LRUCacheProxy.js +++ b/lib/LRUCacheProxy.js @@ -43,14 +43,14 @@ var LRUCacheProxy = function LRUCacheProxy(options) { }; LRUCacheProxy.prototype.get = function(key, callback) { - var failSafe = setTimeout(function() { failSafe = undefined; callback(undefined); }, 100); + var failSafe = setTimeout(function() { failSafe = undefined; callback&&callback(undefined); }, 100); send(this.namespace, 'get', key, function(result) { if(!failSafe) return; clearTimeout(failSafe); - callback(result.value); + callback&&callback(result.value); }); } @@ -58,6 +58,29 @@ LRUCacheProxy.prototype.set = function(key, value) { send(this.namespace, 'set', key, value, function() {}); } + +LRUCacheProxy.prototype.complete = function( key, seq, total, partialData, callback ) { + var failSafe = setTimeout(function() { failSafe = undefined; callback&&callback(undefined); }, 100); + send(this.namespace, 'complete', key, seq, total, partialData, function(result) { + if(!failSafe) return; + + clearTimeout(failSafe); + + callback&&callback(result.value); + }); +} + +LRUCacheProxy.prototype.count = function( key, inc, callback ) { + var failSafe = setTimeout(function() { failSafe = undefined; callback&&callback(undefined); }, 100); + send(this.namespace, 'count', key, inc, function(result) { + if(!failSafe) return; + + clearTimeout(failSafe); + + callback&&callback(result.value); + }); +} + Object.defineProperty(LRUCacheProxy.prototype, 'pendingMessages', { get : function () { return Object.keys(pending).length; }, enumerable : true }); module.exports = LRUCacheProxy; diff --git a/lib/lru-cache-cluster.js b/lib/lru-cache-cluster.js index e23b1d0..b83a8e9 100644 --- a/lib/lru-cache-cluster.js +++ b/lib/lru-cache-cluster.js @@ -1,57 +1,87 @@ var cluster = require('cluster'); var LRUCache = require('lru-cache'); -var caches = {}; +var caches = {}, messages = 0, ts; if (cluster.isMaster) { cluster.on('fork', function(worker) { - worker.on('message', function(msg) { - if (msg.source !== 'lru-cache-cluster') return; - var lru = caches[msg.namespace]; - - function send(cmd, data){ - data.source = 'lru-cache-cluster'; - data.cmd = cmd; - data.namespace = lru.namespace; - data.id = msg.id; - - worker.send(data); - } - - var switcheroo = { - max: function () {}, - lengthCalculator: function () {}, - length: function () {}, - itemCount: function () {}, - forEach: function () {}, - keys: function () {}, - values: function () {}, - reset: function () {}, - dump: function () {}, - dumpLru: function () {}, - set: function (args) { - lru.set(args[0], args[1]); - - send('setResponse', {}); - }, - has: function () {}, - get: function (args) { + var lru, f; + + var send = function(cmd, data, msg){ + data.source = 'lru-cache-cluster'; + data.cmd = cmd; + data.namespace = lru.namespace; + data.id = msg.id; + worker.send( data ); }; + + var nop = function () {}; + var set = function ( args, msg ) { + lru.set( args[0], args[1] ); + send( 'setResponse', {}, msg ); }; + var get = function ( args, msg ) { var value = lru.get(args[0]); + send( 'getResponse', { value: value }, msg ); }; + var count = function( args, msg ) { + var key = args[ 0 ], inc = args[ 1 ]; + var it = lru.get( key ); + if ( isNaN ( it ) ) { + it = 0; } + it += inc; lru.set( key, it ); + send( 'getCount', { value: it }, msg ); }; + var complete = function( args, msg ) { + var key = args[ 0 ], seq = args[ 1 ], total = args[ 2 ], partialData = args[ 3 ]; + var set = false, it = lru.get( key ) || ( ( set = true ) && { c: [], w: 0 } ); + ++it.w; it.c[ seq ] = partialData; + // console.log( args, it ); + if ( it.w === total ) { + if ( !set ) { + lru.del( key ); } + send( 'getComplete', { value: it.c.join( "" ) }, msg ); } + else if ( set ) { + // console.log( "setting " ); + lru.set( key, it ); + send( 'getComplete', { value: "" }, msg ); } }; + var constructor = function( args, msg ) { + var ns = msg.namespace; + if ( !( lru = caches[ ns ] ) ) { + caches[ ns ] = lru = LRUCache(args[0]); } + lru.namespace = ns; + send('constructorResponse', {}, msg); }; - send('getResponse', { value: value }); - }, - peek: function () {}, - pop: function () {}, - del: function () {}, - constructor: function(args) { - lru = caches[msg.namespace] = LRUCache(args[0]); - lru.namespace = msg.namespace; + var switcheroo = { + max: nop, + lengthCalculator: nop, + length: nop, + itemCount: nop, + forEach: nop, + keys: nop, + values: nop, + reset: nop, + dump: nop, + dumpLru: nop, + set: set, // -- + has: nop, + get: get, // -- + complete: complete, // -- + count: count, // -- + peek: nop, + pop: nop, + del: nop, + constructor: constructor // -- + }; + + worker.on('message', function(msg) { + + // some basic performace stats + // if ( !( ts ) ) { ts = ( new Date( ) ).valueOf(); } + // if ( !( ( ++messages ) % 1000 ) ) console.log( messages, messages / ( ( new Date( ) ).valueOf() - ts ) ); + + if (msg.source !== 'lru-cache-cluster') return; - send('constructorResponse', {}); - } - }; + lru = caches[ msg.namespace ]; + ( f = msg.cmd ) && ( f = switcheroo[ f ] ) && f( msg.arguments, msg ); + f = undefined; - msg.cmd && switcheroo[msg.cmd] && switcheroo[msg.cmd](msg.arguments); }); }); } diff --git a/test/clustered.js b/test/clustered.js index 60b24f8..ca87d8e 100644 --- a/test/clustered.js +++ b/test/clustered.js @@ -8,3 +8,8 @@ cluster.setupMaster({ }); cluster.fork(); +cluster.fork(); +// cluster.fork(); +// cluster.fork(); +// cluster.fork(); +// cluster.fork(); diff --git a/test/worker.js b/test/worker.js index c2835e8..4087bb4 100644 --- a/test/worker.js +++ b/test/worker.js @@ -42,60 +42,99 @@ setTimeout(function(){ */ var test = require('tap').test; -test("basic", function (t) { - var cache = new LRU({max: 10}) - cache.set("key", "value") - - cache.get('key', function(value) { - t.equal(value, 'value'); - }); - - cache.get('nada', function(value) { - t.equal(value, undefined); - }); - - setTimeout(function(){ t.end(); }, 1000); -}) - -test("least recently set", function (t) { - var cache = new LRU(2) - cache.set("a", "A") - cache.set("b", "B") - cache.set("c", "C") - - cache.get('c', function(value) { - t.equal(value, 'C'); - }); - - cache.get('b', function(value) { - t.equal(value, 'B'); - }); - - cache.get('a', function(value) { - t.equal(value, undefined); - }); +// test("basic", function (t) { +// var cache = new LRU({max: 10}) +// cache.set("key", "value") + +// cache.get('key', function(value) { +// t.equal(value, 'value'); +// }); + +// cache.get('nada', function(value) { +// t.equal(value, undefined); +// }); + +// setTimeout(function(){ t.end(); }, 1000); +// }) + +// test("least recently set", function (t) { +// var cache = new LRU(2) +// cache.set("a", "A") +// cache.set("b", "B") +// cache.set("c", "C") + +// cache.get('c', function(value) { +// t.equal(value, 'C'); +// }); + +// cache.get('b', function(value) { +// t.equal(value, 'B'); +// }); + +// cache.get('a', function(value) { +// t.equal(value, undefined); +// }); + +// setTimeout(function(){ t.end(); }, 1000); +// }) + +// test("lru recently gotten", function (t) { +// var cache = new LRU(2) +// cache.set("a", "A") +// cache.set("b", "B") +// cache.get("a") +// cache.set("c", "C") + +// cache.get('c', function(value) { +// t.equal(value, 'C'); +// }); + +// cache.get('b', function(value) { +// t.equal(value, undefined); +// }); + +// cache.get('a', function(value) { +// t.equal(value, 'A'); +// }); + +// setTimeout(function(){ t.end(); }, 1000); +// }) + +var cluster = require( "cluster" ); +var cache = new LRU( 1000 ); +test("lru complete test", function (t) { + + var pieces = 12, items = 10000, workers = 2; + for( var i = cluster.worker.workerID - 1; i < pieces; i += workers ) { + for ( var j = 0, l = items; j < l; j++ ) { + cache.complete( ( "0000" + j.toString( 16 ) ).slice( -4 ), i, pieces, String( i ) + " ", function( data ) { + if ( !data ) return; + cache.count( "++++", 1 ); + } ); + } + } - setTimeout(function(){ t.end(); }, 1000); -}) + setTimeout( function() { + cache.count( "++++", 0, function( value ) { + console.log( value ); + t.equal( value, items ); } ); }, 10000 ); -test("lru recently gotten", function (t) { - var cache = new LRU(2) - cache.set("a", "A") - cache.set("b", "B") - cache.get("a") - cache.set("c", "C") + // cache.set("a", "A") + // cache.set("b", "B") + // cache.get("a") + // cache.set("c", "C") - cache.get('c', function(value) { - t.equal(value, 'C'); - }); + // cache.get('c', function(value) { + // t.equal(value, 'C'); + // }); - cache.get('b', function(value) { - t.equal(value, undefined); - }); + // cache.get('b', function(value) { + // t.equal(value, undefined); + // }); - cache.get('a', function(value) { - t.equal(value, 'A'); - }); + // cache.get('a', function(value) { + // t.equal(value, 'A'); + // }); - setTimeout(function(){ t.end(); }, 1000); + setTimeout(function(){ t.end(); }, 10000); }) From 984e82ae94c62385730b94bc1b86696f09d216fb Mon Sep 17 00:00:00 2001 From: Krassi Date: Fri, 12 Jun 2015 12:53:05 -0400 Subject: [PATCH 2/6] incrementing the revision version --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index d1c56bc..9667982 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "lru-cache-cluster", "description": "Cluster aware LRU cache. Master process maintains cache and uses worker messaging to deliver items machine-local.", - "version": "1.0.0", + "version": "1.0.1", "main": "lib/lru-cache-cluster.js", "devDependencies": { "tap": "~0.4.8" From f0bc7f45797b3918e8fd5b21bc935a7b29d3ca2d Mon Sep 17 00:00:00 2001 From: Krassi Date: Fri, 12 Jun 2015 16:59:35 -0400 Subject: [PATCH 3/6] fix bindings in the non-cluster case --- lib/LRUCacheProxy.js | 17 ++++-- lib/lru-cache-cluster.js | 22 ++----- lib/lru-cache-extensions.js | 35 +++++++++++ test/clustered.js | 4 -- test/worker.js | 117 +++++++++++++++--------------------- 5 files changed, 101 insertions(+), 94 deletions(-) create mode 100644 lib/lru-cache-extensions.js diff --git a/lib/LRUCacheProxy.js b/lib/LRUCacheProxy.js index 8e564bc..8ee3103 100644 --- a/lib/LRUCacheProxy.js +++ b/lib/LRUCacheProxy.js @@ -1,4 +1,4 @@ -var LRUCache = require('lru-cache'); +var LRUCache = require('./lru-cache-extensions'); var cluster = require('cluster'); var uuid = require('node-uuid'); var pending = {}; @@ -24,12 +24,19 @@ function send(namespace, cmd) { var LRUCacheProxy = function LRUCacheProxy(options) { if (!(this instanceof LRUCacheProxy)) { if(cluster.isMaster) { + var lru = new LRUCache(options); - var lruGet = LRUCache.prototype.get; - lru.get = function(key, callback) { - callback(lruGet.apply(lru, arguments)); - }; + var methodCtor = function( baseMethod ) { + return function( ) { + var cb; + if ( typeof ( cb = arguments[ arguments.length - 1 ] ) !== "function" ) { + cb = undefined; } + var value = baseMethod.apply( this, arguments ); + cb && cb( value ); }; } + + for( var p in { "get": 1, "count": 1, "complete": 1} ) { + lru[ p ] = methodCtor( LRUCache.prototype[ p ] ); } return lru; } diff --git a/lib/lru-cache-cluster.js b/lib/lru-cache-cluster.js index b83a8e9..9a3d752 100644 --- a/lib/lru-cache-cluster.js +++ b/lib/lru-cache-cluster.js @@ -22,25 +22,11 @@ if (cluster.isMaster) { var value = lru.get(args[0]); send( 'getResponse', { value: value }, msg ); }; var count = function( args, msg ) { - var key = args[ 0 ], inc = args[ 1 ]; - var it = lru.get( key ); - if ( isNaN ( it ) ) { - it = 0; } - it += inc; lru.set( key, it ); - send( 'getCount', { value: it }, msg ); }; + var value = lru.count(args[0],args[1]); + send( 'getCount', { value: value }, msg ); }; var complete = function( args, msg ) { - var key = args[ 0 ], seq = args[ 1 ], total = args[ 2 ], partialData = args[ 3 ]; - var set = false, it = lru.get( key ) || ( ( set = true ) && { c: [], w: 0 } ); - ++it.w; it.c[ seq ] = partialData; - // console.log( args, it ); - if ( it.w === total ) { - if ( !set ) { - lru.del( key ); } - send( 'getComplete', { value: it.c.join( "" ) }, msg ); } - else if ( set ) { - // console.log( "setting " ); - lru.set( key, it ); - send( 'getComplete', { value: "" }, msg ); } }; + var value = lru.complete( args[ 0 ], args[ 1 ], args[ 2 ], args[ 3 ] ); + send( 'getComplete', { value: value }, msg ); }; var constructor = function( args, msg ) { var ns = msg.namespace; if ( !( lru = caches[ ns ] ) ) { diff --git a/lib/lru-cache-extensions.js b/lib/lru-cache-extensions.js new file mode 100644 index 0000000..2ce9409 --- /dev/null +++ b/lib/lru-cache-extensions.js @@ -0,0 +1,35 @@ +var LRUCache = require('lru-cache'); + +LRUCache.prototype.count = function(key, inc){ + var lru = this; + var value = lru.get( key ); + var it = lru.get( key ); + if ( isNaN ( it ) ) { + it = 0; + } + it += inc; lru.set( key, it ); + return it; +}; + +LRUCache.prototype.complete = function( key, seq, total, partialData ) { + + var lru = this; + var set = false; + var it = lru.get( key ) || ( ( set = true ) && { c: [], w: 0 } ); + + ++it.w; it.c[ seq ] = partialData; + + if ( it.w === total ) { + if ( !set ) { + lru.del( key ); } + it = it.c.join( "" ); + } + else if ( set ) { + lru.set( key, it ); + it = ""; + } + + return it; +}; + +module.exports = LRUCache; diff --git a/test/clustered.js b/test/clustered.js index ca87d8e..860cca8 100644 --- a/test/clustered.js +++ b/test/clustered.js @@ -9,7 +9,3 @@ cluster.setupMaster({ cluster.fork(); cluster.fork(); -// cluster.fork(); -// cluster.fork(); -// cluster.fork(); -// cluster.fork(); diff --git a/test/worker.js b/test/worker.js index 4087bb4..05d669c 100644 --- a/test/worker.js +++ b/test/worker.js @@ -40,101 +40,84 @@ setTimeout(function(){ }, 250); }, 1000); */ +var cluster = require( "cluster" ); var test = require('tap').test; -// test("basic", function (t) { -// var cache = new LRU({max: 10}) -// cache.set("key", "value") +test("basic", function (t) { + var cache = LRU( { max: 10, namespace: "basic " + ( ( cluster.worker || { } ).workerID || 0 ) } ); + cache.set("key", "value") -// cache.get('key', function(value) { -// t.equal(value, 'value'); -// }); + cache.get('key', function(value) { + t.equal(value, 'value'); + }); -// cache.get('nada', function(value) { -// t.equal(value, undefined); -// }); + cache.get('nada', function(value) { + t.equal(value, undefined); + }); -// setTimeout(function(){ t.end(); }, 1000); -// }) + setTimeout(function(){ t.end(); }, 1000); +}) -// test("least recently set", function (t) { -// var cache = new LRU(2) -// cache.set("a", "A") -// cache.set("b", "B") -// cache.set("c", "C") +test("least recently set", function (t) { + var cache = LRU( { max: 2, namespace: "least recently set " + ( ( cluster.worker || { } ).workerID || 0 ) } ) + cache.set("a", "A") + cache.set("b", "B") + cache.set("c", "C") -// cache.get('c', function(value) { -// t.equal(value, 'C'); -// }); + cache.get('c', function(value) { + t.equal(value, 'C'); + }); -// cache.get('b', function(value) { -// t.equal(value, 'B'); -// }); + cache.get('b', function(value) { + t.equal(value, 'B'); + }); -// cache.get('a', function(value) { -// t.equal(value, undefined); -// }); + cache.get('a', function(value) { + t.equal(value, undefined); + }); -// setTimeout(function(){ t.end(); }, 1000); -// }) + setTimeout(function(){ t.end(); }, 1000); +}) -// test("lru recently gotten", function (t) { -// var cache = new LRU(2) -// cache.set("a", "A") -// cache.set("b", "B") -// cache.get("a") -// cache.set("c", "C") +test("lru recently gotten", function (t) { + var cache = LRU( { max: 2, namespace: "lru recently gotten " + ( ( cluster.worker || { } ).workerID || 0 ) } ) + cache.set("a", "A") + cache.set("b", "B") + cache.get("a") + cache.set("c", "C") -// cache.get('c', function(value) { -// t.equal(value, 'C'); -// }); + cache.get('c', function(value) { + t.equal(value, 'C'); + }); -// cache.get('b', function(value) { -// t.equal(value, undefined); -// }); + cache.get('b', function(value) { + t.equal(value, undefined); + }); -// cache.get('a', function(value) { -// t.equal(value, 'A'); -// }); + cache.get('a', function(value) { + t.equal(value, 'A'); + }); -// setTimeout(function(){ t.end(); }, 1000); -// }) + setTimeout(function(){ t.end(); }, 1000); +}) -var cluster = require( "cluster" ); -var cache = new LRU( 1000 ); test("lru complete test", function (t) { + var cache = LRU( { max: 1000, namespace: "complete" } ); + var pieces = 12, items = 10000, workers = 2; - for( var i = cluster.worker.workerID - 1; i < pieces; i += workers ) { + for( var i = ( cluster.worker || {} ).workerID - 1; i < pieces; i += workers ) { for ( var j = 0, l = items; j < l; j++ ) { cache.complete( ( "0000" + j.toString( 16 ) ).slice( -4 ), i, pieces, String( i ) + " ", function( data ) { if ( !data ) return; - cache.count( "++++", 1 ); + cache.count( "++++", 1 ); } ); } } setTimeout( function() { cache.count( "++++", 0, function( value ) { - console.log( value ); t.equal( value, items ); } ); }, 10000 ); - // cache.set("a", "A") - // cache.set("b", "B") - // cache.get("a") - // cache.set("c", "C") - - // cache.get('c', function(value) { - // t.equal(value, 'C'); - // }); - - // cache.get('b', function(value) { - // t.equal(value, undefined); - // }); - - // cache.get('a', function(value) { - // t.equal(value, 'A'); - // }); - - setTimeout(function(){ t.end(); }, 10000); + setTimeout(function(){ t.end(); }, 5000); }) From 77302853a2926cc9cbdf2c790413d202b97a6610 Mon Sep 17 00:00:00 2001 From: Krassi Date: Fri, 12 Jun 2015 17:27:39 -0400 Subject: [PATCH 4/6] fix: forgotten return value in the master case. --- lib/LRUCacheProxy.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/LRUCacheProxy.js b/lib/LRUCacheProxy.js index 8ee3103..0c87f3f 100644 --- a/lib/LRUCacheProxy.js +++ b/lib/LRUCacheProxy.js @@ -33,7 +33,8 @@ var LRUCacheProxy = function LRUCacheProxy(options) { if ( typeof ( cb = arguments[ arguments.length - 1 ] ) !== "function" ) { cb = undefined; } var value = baseMethod.apply( this, arguments ); - cb && cb( value ); }; } + cb && cb( value ); + return value; }; } for( var p in { "get": 1, "count": 1, "complete": 1} ) { lru[ p ] = methodCtor( LRUCache.prototype[ p ] ); } From dced3eac340cdd63395e6aca48f0a4bd8beec47a Mon Sep 17 00:00:00 2001 From: Krassi Date: Fri, 12 Jun 2015 18:33:24 -0400 Subject: [PATCH 5/6] fix: complete may return internal object instead of empty string --- lib/lru-cache-extensions.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/lru-cache-extensions.js b/lib/lru-cache-extensions.js index 2ce9409..0b5b519 100644 --- a/lib/lru-cache-extensions.js +++ b/lib/lru-cache-extensions.js @@ -28,6 +28,9 @@ LRUCache.prototype.complete = function( key, seq, total, partialData ) { lru.set( key, it ); it = ""; } + else { + it = ""; + } return it; }; From 9376359747e16c456e7cbada33a3d63a62ee3daf Mon Sep 17 00:00:00 2001 From: Krassi Date: Wed, 17 Jun 2015 20:28:15 +0000 Subject: [PATCH 6/6] added performace coutner --- test/clustered.js | 4 ++-- test/worker.js | 14 ++++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/test/clustered.js b/test/clustered.js index 860cca8..1bafd80 100644 --- a/test/clustered.js +++ b/test/clustered.js @@ -7,5 +7,5 @@ cluster.setupMaster({ silent : false }); -cluster.fork(); -cluster.fork(); +for( var i = 0, l = require("os").cpus().length >> 2; i < l; i++ ) { + cluster.fork(); } diff --git a/test/worker.js b/test/worker.js index 05d669c..847dff6 100644 --- a/test/worker.js +++ b/test/worker.js @@ -40,9 +40,11 @@ setTimeout(function(){ }, 250); }, 1000); */ + var cluster = require( "cluster" ); var test = require('tap').test; +/* test("basic", function (t) { var cache = LRU( { max: 10, namespace: "basic " + ( ( cluster.worker || { } ).workerID || 0 ) } ); cache.set("key", "value") @@ -100,16 +102,18 @@ test("lru recently gotten", function (t) { setTimeout(function(){ t.end(); }, 1000); }) - +*/ test("lru complete test", function (t) { - var cache = LRU( { max: 1000, namespace: "complete" } ); + var dt = (new Date()).valueOf(), dtMax = dt + 1; + var cache = LRU( { max: 10000000, namespace: "complete" } ); - var pieces = 12, items = 10000, workers = 2; + var pieces = 12, items = 10000, workers = require("os").cpus().length >> 2; for( var i = ( cluster.worker || {} ).workerID - 1; i < pieces; i += workers ) { for ( var j = 0, l = items; j < l; j++ ) { cache.complete( ( "0000" + j.toString( 16 ) ).slice( -4 ), i, pieces, String( i ) + " ", function( data ) { - if ( !data ) return; + dtMax = (new Date()).valueOf(); + if ( !data ) return; cache.count( "++++", 1 ); } ); } @@ -117,6 +121,8 @@ test("lru complete test", function (t) { setTimeout( function() { cache.count( "++++", 0, function( value ) { + console.log( ( dtMax - dt ) ); + console.log( Math.floor( pieces /* workers */ ) * items / ( dtMax - dt ) * 1000 ); t.equal( value, items ); } ); }, 10000 ); setTimeout(function(){ t.end(); }, 5000);