-
Notifications
You must be signed in to change notification settings - Fork 3
Description
I've been trying to use the SPARQLStore in a multiprocessing setup, i.e. I fork more processes, they get their own graph/store, and make some queries.
They all use this package underneath.
My first bug was this: RDFLib/sparqlwrapper#84 (which made the sub-process hang)
After that was fixed it got much worse :)
I don't actually understand how, but somehow, when keepalive is installed, multiple processes will get mixed up in what they get back from their http requests.
I.e. the main process forks into process A and process B, they each make a SPARQL query. And then, sometimes, process A will get back the result meant for process B! And sometimes the xml document that is being read will be changed halfway through parsing it, and you get weird XML parsing errors.
I can only assume the different processes somehow share a buffer for reading stuff from the network, but I really don't understand how this happens. Normally, setting up shared memory between forks is lots of hassle :)
I condensed the problem to the program below. My db has a bunch of resources, with a single ID property, which is the same as the id in their URL - so I can fork one process per resource, and check that I get the right ID back.
I don't know how to even start fixing this - and it's not very urgent for me (I've replaced the code with requests), it's mainly as a heads up for other people using this!
import time
from multiprocessing import Pool
from rdflib import ConjunctiveGraph, URIRef, Namespace, RDF
BASE = 'http://api.nifs.no/'
NIFS_MATCHES = Namespace(BASE+'matches/')
NIFS = Namespace('http://api.nifs.no/schema/')
def test(m):
g = ConjunctiveGraph('SPARQLStore')
g.open('http://localhost:3030/ntb/query')
i = int(m[m.rindex('/')+1:])
qi = g.value(m, NIFS.id).value
assert i == qi, '%s != %s'%(i,qi)
if __name__ == '__main__':
import coloredlogs
coloredlogs.install()
g = ConjunctiveGraph('SPARQLStore')
g.open('http://localhost:3030/ntb/query')
matches = list(g[:RDF.type:NIFS.Match])
print len(matches), 'matches'
p = Pool(10)
p.map(test, matches)