keepalive does not work with multiprocessing #7

@gromgull

Description

I've been trying to use the SPARQLStore in a multiprocessing setup: I fork several processes, each gets its own graph/store and makes some queries.

They all use this package underneath.

My first bug was this: RDFLib/sparqlwrapper#84 (which made the sub-process hang)

After that was fixed it got much worse :)

I don't actually understand how, but somehow, when keepalive is installed, multiple processes get each other's HTTP responses mixed up.

That is, the main process forks into process A and process B, and each makes a SPARQL query. Sometimes process A gets back the result meant for process B! And sometimes the XML document being read changes halfway through parsing, producing weird XML parsing errors.

I can only assume the different processes somehow share a buffer for reading from the network, but I really don't understand how this happens; normally, setting up shared memory between forks is lots of hassle :)
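One plausible mechanism (an assumption on my part, not something verified against the SPARQLStore code) is that the forked workers inherit the parent's open keep-alive socket as a shared file descriptor. A minimal sketch with a pipe standing in for that socket shows how the kernel then hands each buffered byte to exactly one reader, so queued replies get split between processes:

```python
import os

# Sketch of the assumed failure mode: two children inherit the same
# open file descriptor, and each buffered "response" is consumed by
# whichever process happens to read it first.
r, w = os.pipe()
os.write(w, b'AAAABBBB')  # two 4-byte "responses" queued on one connection
os.close(w)

res_r, res_w = os.pipe()  # channel for the children to report what they saw
pids = []
for _ in range(2):
    pid = os.fork()
    if pid == 0:
        # child: read one "response" from the shared descriptor
        os.write(res_w, os.read(r, 4))
        os._exit(0)
    pids.append(pid)

for pid in pids:
    os.waitpid(pid, 0)
os.close(res_w)

chunks = os.read(res_r, 8)
# each child saw exactly one response, in nondeterministic order
print(sorted([chunks[:4], chunks[4:]]))  # -> [b'AAAA', b'BBBB']
```

Neither child ever sees both responses, which matches the symptom of one process receiving data meant for another.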

I condensed the problem to the program below. My db has a bunch of resources, each with a single ID property that matches the id in its URL, so I can fork one process per resource and check that I get the right ID back.

I don't know how to even start fixing this, and it's not very urgent for me (I've replaced the code with requests); it's mainly a heads-up for other people using this package!

from multiprocessing import Pool

from rdflib import ConjunctiveGraph, Namespace, RDF

BASE = 'http://api.nifs.no/'
NIFS_MATCHES = Namespace(BASE + 'matches/')
NIFS = Namespace('http://api.nifs.no/schema/')


def test(m):
    # each worker opens its own store after the fork
    g = ConjunctiveGraph('SPARQLStore')
    g.open('http://localhost:3030/ntb/query')

    # the numeric id at the end of the resource URL...
    i = int(m[m.rindex('/') + 1:])

    # ...should equal the NIFS.id property the endpoint returns for it
    qi = g.value(m, NIFS.id).value

    assert i == qi, '%s != %s' % (i, qi)


if __name__ == '__main__':
    import coloredlogs
    coloredlogs.install()

    g = ConjunctiveGraph('SPARQLStore')
    g.open('http://localhost:3030/ntb/query')
    matches = list(g[:RDF.type:NIFS.Match])

    print(len(matches), 'matches')

    p = Pool(10)
    p.map(test, matches)
