diff --git a/continuous/README.md b/continuous/README.md new file mode 100644 index 0000000..db9c0e1 --- /dev/null +++ b/continuous/README.md @@ -0,0 +1,21 @@ +This ingests data into a Mongo database in a form that can be used by the Entity Alignment app (with some modifications still required). + +So far, this can ingest Instagram data as pulled from Elasticsearch (such as that supplied with the July data), and Twitter data in either GNIP format or the Twitter Public JSON format (as supplied with the July data). It will not work on the original Instagram data that was pulled from parquet files and saved as CSV. It should be easy to extend it to that format, though that format has less data for comments and likes, so the result won't be as rich. + +Links are created from mentions, comments (Instagram only), likes (Instagram only), and replies (Twitter only). Because of the comments and likes, Instagram refers to a vastly larger network of entities than Twitter. Instagram also has fewer messages per entity, so the Instagram network is sparser than the Twitter network. + +Ingest and metric computation are fully parallelizable: multiple ingests can run concurrently, each metric can be computed in a separate process, and the computation of a single metric can be split across multiple processes as well. + +Four metrics have been implemented so far. These are: + +1hop - This is quick to compute, as we maintain a list of neighbors within each entity record. If the entity is updated, it is recomputed. The stored value is the number of 1-hop neighbors, EXCLUSIVE of the root entity itself. + +2hop - This requires visiting all of the 1-hop neighbors. Since we don't know which of those neighbors has been updated, any entity update requires that we recheck all 2-hop values. We store both the number of 2-hop-only neighbors and the number of 2-hop plus 1-hop neighbors. These counts never include the root entity itself. + +substring - This does a longest-common-substring match between ALL entities, keeping the top-k matches for each service and subset. Top-k lists are maintained separately for user names, full names, and the combination of the two; for entities with multiple names, the highest score across all name combinations is kept. When an entity is added or updated, the metric only needs to be recomputed for the modified entity with respect to the other entities to maintain the top-k lists, but this is still slow on a large collection. + +levenshtein - This computes the Levenshtein metric between ALL entities, just like substring. If available, the python-Levenshtein library is used, as it is implemented in C and is faster than the pure-python implementation. As with substring, top-k lists are maintained separately for user names, full names, and the combination of the two, and when an entity is added, the metric can be updated incrementally. + +The substring and levenshtein metrics would benefit from more CPU resources, but would benefit even more from excluding entities from the candidate match set. For instance, if the substring match is 0 (or below some other threshold), we could skip computing the levenshtein metric for that pair. + +See `ingest.py --help` for usage.
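As a rough illustration of the candidate-filtering idea suggested at the end of the README (skip the expensive comparison when two names share little or no common substring), here is a minimal sketch. It is not part of this patch: `filtered_similarity`, `min_overlap`, and the pure-python `distance` fallback are hypothetical names introduced only for this example, while `Levenshtein.distance` and `difflib.SequenceMatcher` are real library calls (the former is the same C-backed routine metriclevenshtein.py prefers).

```python
from difflib import SequenceMatcher

try:
    from Levenshtein import distance      # C implementation, much faster
except ImportError:
    def distance(a, b):                   # pure-python fallback (assumption)
        # Standard dynamic-programming edit distance, computed row by row.
        prev = list(range(len(b) + 1))
        for i, ca in enumerate(a, 1):
            cur = [i]
            for j, cb in enumerate(b, 1):
                cur.append(min(prev[j] + 1,                 # deletion
                               cur[j - 1] + 1,              # insertion
                               prev[j - 1] + (ca != cb)))   # substitution
            prev = cur
        return prev[len(b)]


def levenshtein_similarity(a, b):
    """Normalized Levenshtein score: 0 is no overlap, 1 is a perfect match."""
    total = float(len(a) + len(b))
    return (total - distance(a, b)) / total if total else 1.0


def filtered_similarity(a, b, min_overlap=1):
    """Skip the Levenshtein computation when the longest common substring is
    shorter than min_overlap; return None in that case."""
    overlap = SequenceMatcher(None, a, b).find_longest_match(
        0, len(a), 0, len(b)).size
    if overlap < min_overlap:
        return None
    return levenshtein_similarity(a, b)
```

Raising `min_overlap` above 1 would skip progressively more pairs, at the cost of missing near-matches that share no literal substring.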
diff --git a/continuous/conf.json b/continuous/conf.json new file mode 100644 index 0000000..c4bd644 --- /dev/null +++ b/continuous/conf.json @@ -0,0 +1,30 @@ +{ + "db": { + "entity": { + "dbUri": "mongodb://127.0.0.1:27017/syria", + "collection": "entity" + }, + "link": { + "dbUri": "mongodb://127.0.0.1:27017/syria", + "collection": "link" + }, + "metrics": { + "dbUri": "mongodb://127.0.0.1:27017/syria", + "collection": "metrics" + }, + "msg": { + "dbUri": "mongodb://127.0.0.1:27017/syria", + "collection": "msg" + } + }, + "topk": { + "k": 25, + "extra": 25 + }, + "metrics": { + "1hop": true, + "2hop": true, + "substring": true, + "levenshtein": true + } +} diff --git a/continuous/ingest.py b/continuous/ingest.py new file mode 100755 index 0000000..9a09a83 --- /dev/null +++ b/continuous/ingest.py @@ -0,0 +1,1236 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# This includes general utils for the continual ingest process. + +import argparse +import bz2 +import calendar +import glob +import HTMLParser +import json +import math +import os +import pprint +import pymongo +import re +import sys +import time +import xml.sax.saxutils +from bson.objectid import ObjectId + +import metric + + +TwitterMentionPattern = re.compile('@[a-zA-Z0-9_]{1,15}') +InstagramMentionPattern = re.compile( + '@[a-zA-Z0-9_][a-zA-Z0-9_.]{0,28}[a-zA-Z0-9_]?') + +HTMLDecoder = HTMLParser.HTMLParser() +# Used for doing timing tests +Timers = {'lastreport': time.time()} + + +# -------- General Functions -------- + +def bufferedQuery(buffer, key, coll, query, fields=None, maxBuffer=2000000, + verbose=0, normalizeNames=False): + """ + Buffer some queries we expect to be repeated. When using a buffered query, + the results are a list instead of a cursor. + + :param buffer: a dictionary to store the buffered parameters and results. + :param key: the key to use for the query buffering. + :param coll: the collection to query. + :param query: the query. + :param fields: the fields to return from the query. + :param maxBuffer: max count to buffer. + :param verbose: log verbosity. + :param normalizeNames: if True, normalize names. + :returns: either a cursor or a list. + """ + if (key in buffer and coll == buffer[key]['coll'] and + query == buffer[key]['query'] and + fields == buffer[key]['fields'] and buffer[key]['last']): + return buffer[key]['last'] + cursor = coll.find(query, fields, timeout=False) + if cursor.count() <= maxBuffer: + if verbose >= 2: + print 'Buffered', key, query, fields + newlist = [] + allmsg = [] + for row in cursor: + if len(row.get('msgs', [])): + msgs = [] + for msg in row['msgs']: + if msg not in msgs: + if msg not in allmsg: + allmsg.append(msg) + msg = allmsg[allmsg.index(msg)] + msgs.append(msg) + row['msgs'] = msgs + if normalizeNames: + metric.normalizeNames(row) + del row['name'] + del row['fullname'] + row = row.copy() # can reduce memory footprint + newlist.append(row) + if verbose >= 2 and not len(newlist) % 10000: + print '%d' % len(newlist), len(allmsg) + buffer[key] = { + 'coll': coll, 'query': query, 'fields': fields, 'last': newlist + } + cursor = newlist + return cursor + + +def castObjectId(id): + """ + Make sure an obejct is an ObjectId or None. + + :param id: the object to check. This may be a string, ObjectId, or a + dictionary with an _id field. + :returns: an ObjectId or None. 
+ """ + if isinstance(id, dict): + id = id.get('_id') + if id is None or type(id) is ObjectId: + return id + return ObjectId(id) + + +def calculateMetrics(state, entities=None, nonDbEntities=None, callback=None): + """ + Calculate metrics for dirty entities. + + :param state: the state dictionary with the config values. + :param entities: a list of entities to calculate the metrics for. If None, + calculate the metrics for all entities. + :param nonDbEntities: a list of entity-like records that didn't come from + our database. If this is not None, the entities are + not used. + :param callback: a function to call after calculating a metric. + callback(state, entity, metClass, metric) is called. + """ + metricDict = state['config'].get('metrics', {}) + if state['args']['metric']: + metricDict = dict.fromkeys(state['args']['metric']) + for met in metricDict: + metric.loadMetric(met, metricDict[met]) + if entities is not None: + entities = [castObjectId(entity) for entity in entities] + entityColl = getDb('entity', state) + linkColl = getDb('link', state) + latestEntity = entityColl.find(timeout=False).sort([ + ('date_updated', pymongo.DESCENDING)]).limit(-1).next() + if not latestEntity: + # If there are no entities, we have nothing to do + return + latestEntity = latestEntity['date_updated'] + latestLink = linkColl.find(timeout=False).sort([ + ('date_updated', pymongo.DESCENDING)]).limit(-1).next() + latestLink = 0 if not latestLink else latestLink['date_updated'] + idQuery = {} if entities is None else {'_id': {'$in': entities}} + if nonDbEntities is not None: + cursor = nonDbEntities + numEntities = len(nonDbEntities) + else: + cursor = entityColl.find(idQuery, timeout=False).sort( + [('_id', pymongo.ASCENDING if not + state['args'].get('recent', False) else pymongo.DESCENDING)]) + numEntities = cursor.count() + if state['args'].get('offset'): + cursor.skip(state['args']['offset']) + if state['args'].get('limit'): + cursor.limit(state['args']['limit']) + calculateMetricsLoop(state, cursor, nonDbEntities, callback, numEntities, + latestEntity, latestLink) + + +def calculateMetricsLoop(state, cursor, nonDbEntities, callback, numEntities, + latestEntity, latestLink): + """ + Loop through the entities and metrics and perform the actual calculations. + + :param state: the state dictionary with the config values. + :param cursor: a Mongo cursor or list of entities to calculate metrics for. + + :param nonDbEntities: a list of entity-like records that didn't come from + our database. + :param callback: a function to call after calculating a metric. + callback(state, entity, metClass, metric) is called. + :param numEntities: the number of entities that we are computing metrics + for. + :param latestEntity: the epoch of the most recently changed entity record. + :param latestLink: the epoch of the most recently changed link record. 
+ """ + metColl = getDb('metrics', state) + entityColl = getDb('entity', state) + linkColl = getDb('link', state) + count = numCalc = 0 + starttime = lastreport = time.time() + queryBuffer = {} + for entity in cursor: + for met in metric.LoadedMetrics: + metClass = metric.LoadedMetrics[met] + if nonDbEntities is None: + date = entity['date_updated'] + if metClass.onEntities or not metClass.onEntitiesOnlyNew: + date = max(date, latestEntity) + if metClass.onLinks or not metClass.onLinksOnlyNew: + date = max(date, latestLink) + oldMetric = metColl.find_one({ + 'entity': castObjectId(entity), + 'metric': metClass.name + }, timeout=False) + # Already up to date + if oldMetric and oldMetric['date_updated'] >= date: + continue + else: + oldMetric = None + metricDoc = calculateOneMetric( + entityColl, linkColl, metColl, metClass, entity, oldMetric, + state, starttime, queryBuffer) + if nonDbEntities is not None: + if 'metrics' not in entity: + entity['metrics'] = {} + entity['metrics'][metClass.name] = metricDoc + numCalc += 1 + if callback: + callback(state, entity, metClass, metricDoc) + count += 1 + if state['args']['verbose'] >= 1: + curtime = time.time() + if curtime - lastreport > 10.0 / state['args']['verbose']: + print '%d %d %d %5.3f' % (count, numCalc, numEntities, + curtime - starttime) + lastreport = curtime + + +def calculateOneMetric(entityColl, linkColl, metColl, metClass, entity, + metricDoc, state, updateTime, queryBuffer=None): + """ + Calculate the value of a single metric for a single entity. + + :param entityColl: the database entity collection. + :param linkColl: the database link collection. + :param metColl: the database metrics collection. + :param metClass: the instance of the metrics class used for computation. + :param entity: the entity for which the metric is being computed. + :param metricDoc: if the metric was previously computed, that previous + record. May be None to force a completely fresh + computation. + :param state: the state dictionary with the config values. + :param updateTime: the time to mark the metric as updated. + :param queryBuffer: if set, a buffer that can be used to skip some repeated + queries. + """ + entityId = castObjectId(entity) + if metricDoc is None: + metricDoc = { + 'metric': metClass.name, + 'entity': entityId, + 'date_updated': 0 + } + work = metricDoc.get('work', {}) + kwargs = { + 'work': work, + 'entityColl': entityColl, + 'linkColl': linkColl, + 'old': metricDoc, + 'state': state, + } + # The time has to be before we do the computation, as data could be added + # during the comptation. 
+ metricDoc['date_updated'] = updateTime or time.time() + refresh = (metClass.saveWork and work == {}) + if metClass.onEntities: + query = ({} if not metClass.onEntitiesOnlyNew or refresh else + {'date_updated': {'$gte': metricDoc['date_updated']}}) + fields = getattr(metClass, 'entityFields', None) + bufkey = 'metric' + ( + 'norm' if getattr(metClass, 'normalizeNames') else '') + res = bufferedQuery( + queryBuffer, bufkey, entityColl, query, fields, + verbose=state['args']['verbose'], + normalizeNames=getattr(metClass, 'normalizeNames')) + if ((isinstance(res, list) and len(res)) or + (not isinstance(res, list) and res.count())): + metClass.calcEntityPrep(entity, **kwargs) + for gb in res: + if castObjectId(gb) != entityId: + if (not isinstance(res, list) and + getattr(metClass, 'normalizeNames')): + metric.normalizeNames(gb) + metClass.calcEntity(entity, gb, **kwargs) + if metClass.onLinks: + query = ({} if not metClass.onLinksOnlyNew or refresh else + {'date_updated': {'$gte': metricDoc['date_updated']}}) + query['ga'] = entityId + res = linkColl.find(query, timeout=False) + if res.count(): + for link in res: + gb = entityColl.find_one(castObjectId(link['gb']), + timeout=False) + metClass.calcLink(entity, gb, link, **kwargs) + value = metClass.calc(entity, **kwargs) + if metClass.saveWork: + metricDoc['work'] = work + metricDoc['value'] = value + if entityId is not None: + metColl.save(metricDoc) + return metricDoc + + +def convertInstagramESToMsg(inst, subset='unknown'): + """ + Convert an instragam Elasticsearch record to our message format. This + normalizes the format so that other routines can handle the data + generically. + + :param inst: the instagram record. + :param subset: a subset name to attach to the record. This is probably the + Elasticsearch _type. + :returns: a message record or None for failed. 
+ """ + msg = { + 'service': 'instagram', + 'subset': subset, + 'user_name': inst.get('user', {}).get('username', None), + 'user_fullname': inst.get('user', {}).get('full_name', None), + 'user_id': inst.get('user', {}).get('id', None), + 'msg_date': float(inst['created_time']), + 'msg_id': inst['link'].strip('/').rsplit('/', 1)[-1], + 'url': inst['link'], + } + if msg['user_fullname'] == '': + msg['user_fullname'] = None + if msg['user_name'] == '': + msg['user_name'] = None + if msg['user_name']: + msg['user_name'] = msg['user_name'].lower() + if ('location' in inst and + inst['location'].get('latitude', None) is not None): + msg['latitude'] = inst['location']['latitude'] + msg['longitude'] = inst['location']['longitude'] + if inst.get('caption', None): + msg['msg'] = inst['caption']['text'] + if '@' in msg['msg']: + msg['mentions'] = set(mention[1:].lower() for mention in + InstagramMentionPattern.findall(msg['msg'])) + if inst.get('comments', None) and inst['comments'].get('data', None): + msg['comments'] = {} + for comment in inst['comments']['data']: + if 'from' in comment and 'id' in comment['from']: + record = comment['from'] + msg['comments'][record['id']] = { + 'user_name': (record['username'] + if 'username' in record else None), + 'user_fullname': record.get('full_name', None), + } + if inst.get('likes', None) and inst['likes'].get('data', None): + msg['likes'] = {} + for record in inst['likes']['data']: + if 'id' in record: + msg['likes'][record['id']] = { + 'user_name': (record['username'] + if 'username' in record else None), + 'user_fullname': record.get('full_name', None), + } + return msg + + +def convertTwitterGNIPToMsg(gnip): + """ + Convert a Twitter GNIP record to our message format. This normalizes the + format so that other routines can handle the data generically. + + :param gnip: the twitter gnip record. + :returns: a message record or None for failed. 
+ """ + if 'postedTime' not in gnip: + return None + msg = { + 'service': 'twitter', + 'subset': 'unknown', + 'user_name': gnip['actor'].get('preferredUsername', None), + 'user_fullname': gnip['actor'].get('displayName', None), + 'user_id': gnip['actor']['id'].split(':')[-1], + 'msg_date': int(calendar.timegm(time.strptime( + gnip['postedTime'], "%Y-%m-%dT%H:%M:%S.000Z"))), + 'msg_id': gnip['object']['id'].split(':')[-1], + 'msg': xml.sax.saxutils.unescape(gnip['body']), + } + if msg['user_name']: + msg['user_name'] = msg['user_name'].lower() + msg['url'] = 'http://twitter.com/%s/statuses/%s' % ( + msg['user_id'], msg['msg_id']) + if ('geo' in gnip and gnip['geo'] and 'coordinates' in gnip['geo'] and + len(gnip['geo']['coordinates']) >= 2): + # gnip using latitude, longitude for geo (but twitter used long, lat + # for coordinates) + msg['latitude'] = gnip['geo']['coordinates'][0] + msg['longitude'] = gnip['geo']['coordinates'][1] + if ('twitter_entities' in gnip and 'media' in gnip['twitter_entities'] and + len(gnip['twitter_entities']['media']) > 0 and + 'media_url_https' in gnip['twitter_entities']['media'][0]): + msg['image_url'] = gnip['twitter_entities']['media'][0][ + 'media_url_https'] + if ('instagram' in gnip['generator'].get('link', '') and 'gnip' in gnip and + 'urls' in gnip['gnip'] and len(gnip['gnip']['urls']) and + 'expanded_url' in gnip['gnip']['urls'][0] and + 'instagram.com/p/' in gnip['gnip']['urls'][0]['expanded_url']): + msg['source'] = { + 'instagram': gnip['gnip']['urls'][0]['expanded_url'].rstrip( + '/').rsplit('/')[-1] + } + if ('twitter_entities' in gnip and + 'user_metions' in gnip['twitter_entities']): + msg['mentions'] = {} + for mention in gnip['twitter_entities']['user_mentions']: + if mention.get('id_str', None): + msg['mentions'][mention['id_str']] = { + 'user_name': mention.get('screen_name', None), + 'user_fullname': mention.get('name', None), + } + if ('inReplyTo' in gnip and 'link' in gnip['inReplyTo'] and + '/statuses/' in gnip['inReplyTo']['link']): + msg['replies'] = [gnip['inReplyTo']['link'].split( + '/statuses/')[0].rsplit('/', 1)[-1]] + return msg + + +def convertTwitterJSONToMsg(tw): + """ + Convert a Twitter firehose JSON record to our message format. This + normalizes the format so that other routines can handle the data + generically. + + :param tw: the twitter record. + :returns: a message record or None for failed. 
+ """ + if tw.get('user') is None: + return + msg = { + 'service': 'twitter', + 'subset': 'unknown', + 'user_name': tw['user']['screen_name'].lower(), + 'user_fullname': tw['user']['name'], + 'user_id': tw['user']['id_str'], + 'msg_date': int(calendar.timegm(time.strptime( + tw['created_at'][4:], "%b %d %H:%M:%S +0000 %Y"))), + 'msg_id': tw['id_str'], + 'msg': HTMLDecoder.unescape(tw['text']), + } + msg['url'] = 'http://twitter.com/%s/statuses/%s' % ( + msg['user_id'], msg['msg_id']) + if (tw.get('coordinates') and 'coordinates' in tw['coordinates'] and + len(tw['coordinates']['coordinates']) >= 2): + msg['latitude'] = tw['coordinates']['coordinates'][1] + msg['longitude'] = tw['coordinates']['coordinates'][0] + if ('entities' in tw and 'media' in tw['entities'] and + len(tw['entities']['media']) > 0 and + 'media_url_https' in tw['entities']['media'][0]): + msg['image_url'] = tw['entities']['media'][0]['media_url_https'] + if ('Instagram' in tw.get('source', '') and 'entities' in tw and + 'urls' in tw['entities'] and len(tw['entities']['urls']) > 0 and + 'expanded_url' in tw['entities']['urls'][0] and + 'instagram.com' in tw['entities']['urls'][0]['expanded_url']): + msg['source'] = { + 'instagram': tw['entities']['urls'][0]['expanded_url'].rstrip( + '/').rsplit('/')[-1] + } + if ('entities' in tw and 'user_mentions' in tw['entities'] and + len(tw['entities']['user_mentions']) > 0): + msg['mentions'] = {} + for mention in tw['entities']['user_mentions']: + if mention.get('id_str', None): + msg['mentions'][mention['id_str']] = { + 'user_name': mention.get('screen_name', None), + 'user_fullname': mention.get('name', None), + } + if tw.get('in_reply_to_user_id_str', None) is not None: + msg['replies'] = { + tw['in_reply_to_user_id_str']: { + 'user_name': tw.get('in_reply_to_screen_name', None), + } + } + return msg + + +def getDb(dbName, state): + """ + Check if a DB has been connected to. If not, connect to it and ensure that + the appropriate indices are present. + + :param dbName: the internal key name of the database. + :param state: the state dictionary with the config values and a place to + store the result. + :returns: the collection. + """ + coll = state.get('db', {}).get(dbName, {}).get('coll', None) + if coll: + return coll + if 'db' not in state: + state['db'] = {} + state['db'][dbName] = getDbConnection(**state['config']['db'][dbName]) + coll = state['db'][dbName]['coll'] + indices = { + 'entity': [ + [('user_id', pymongo.ASCENDING)], + [('name', pymongo.ASCENDING)], + [('date_updated', pymongo.ASCENDING)], + ], + 'link': [ + [('ga', pymongo.ASCENDING)], + [('gb', pymongo.ASCENDING)], + [('date_updated', pymongo.ASCENDING)], + ], + 'metrics': [ + [('entity', pymongo.ASCENDING)], + ], + 'msg': [ + [('msg_id', pymongo.ASCENDING)], + ], + } + for index in indices.get(dbName, []): + coll.create_index(index) + return coll + + +def getDbConnection(dbUri, **kwargs): + """ + Get a connection to a mongo DB. The adds a connection timeout. + + :param dbUri: the uri to connect to. Usually something like + mongodb://(host):27017/(db) + :param database: if specified, connect to thsi database. Otherwise, + use the default database. + :param collection: the default collection to connect to. + :returns: a dictionary with 'connection', 'database', and 'coll'. 
+ """ + clientOptions = { + 'connectTimeoutMS': 15000, + } + result = { + 'connection': pymongo.MongoClient(dbUri, **clientOptions) + } + if kwargs.get('database', None): + result['database'] = result['connection'].get_database( + kwargs['database']) + else: + result['database'] = result['connection'].get_default_database() + if 'collection' in kwargs: + result['coll'] = result['database'][kwargs['collection']] + return result + + +def getEntityByName(state, entity): + """ + Given some known information which can include _id (our id), service, + user_id, name, and fullname, ensure that the specified entity is in the + database and the associated ObjectId of the entity or None. + + :param state: includes the database connection. + :param entity: a dictionary of _id, service, user_id, name, and fullname. + if _id is unspecified, service is required and at least one + of user_id or name. If the entity doesn't exist, a + neighbors key may be present to populate the new entity, + otherwise this key is ignored. + :returns: an entity document or None. + :returns: updated: True if the document was changed in any way. 'new' if + the entity was added. + """ + entityColl = getDb('entity', state) + if entity.get('_id', None): + id = castObjectId(entity['_id']) + return entityColl.find_one({'_id': id}, timeout=False), False + spec = {'service': entity['service']} + specUserId = {'service': entity['service']} + specName = {'service': entity['service']} + for key in ['name', 'fullname']: + if entity.get(key, None) is not None and entity[key].strip() != '': + spec[key] = entity[key] + else: + entity[key] = None + hasUserId = entity.get('user_id', None) is not None + if hasUserId: + spec['user_id'] = specUserId['user_id'] = entity['user_id'] + if entity['name'] is not None: + specName['name'] = entity['name'] + doc = entityColl.find_one(spec, timeout=False) + if doc: + # We have an entity that matches all of our information + return doc, False + doc = entityColl.find_one(specUserId if hasUserId else specName, + timeout=False) + if not doc and hasUserId and entity['name'] is not None: + doc = entityColl.find_one(specName, timeout=False) + return getEntityByNameAdd(state, doc, entity, specName, hasUserId) + + +def getEntityByNameAdd(state, doc, entity, specName, hasUserId): + """ + Add or update an entity for which we did not have complete information. + + :param state: includes the database connection. + :param doc: an existing document for the entity to be updated or None + if this is a new entity. + :param entity: a dictionary of _id, service, user_id, name, and fullname. + if _id is unspecified, service is required and at least one + of user_id or name. + :param specName: a mongo query to get an extant entity. + :param hasUserId: True if we know the user Id (for the service) of this + entity. + + :returns: an entity document or None. + :returns: updated: True if the document was changed in any way. 'new' if + the entity was added. + """ + entityColl = getDb('entity', state) + curtime = time.time() + if doc: + # We have this user id, but not all of its aliases. + if (entity['name'] is not None and hasUserId and + entity['name'] not in doc['name']): + knownName = entityColl.find_one(specName, timeout=False) + if knownName: + # Merge this with the main doc + mergeEntities(state, doc, knownName) + doc['date_updated'] = curtime + updated = True + else: + # We've never seen this entity, so add it. 
+ doc = { + 'service': entity['service'], + 'name': [], + 'fullname': [], + 'date_added': curtime, + 'date_updated': curtime, + 'msgs': [], + 'neighbors': entity.get('neighbors', []), + } + updated = 'new' + if hasUserId: + doc['user_id'] = entity['user_id'] + # Update the names and full names. + for key in [key for key in ['name', 'fullname'] + if entity.get(key, None) is not None]: + if entity[key] not in doc[key]: + doc[key].append(entity[key]) + doc['_id'] = entityColl.save(doc) + return doc, updated + + +def ingestMessage(state, msg): + """ + Check if we have already ingested a message. If not ingest it. This + checks if the (service, user_id) is present in our database. If not, we + add it, possibly by converting (service, user_name) or + (service, user_fullname). Once the user is present, we check if this + msg_id is listed in their known messages. If it is, we are done. If not, + add it to the list and ensure that all referenced users are present, too. + Update appropriate graph edges for referenced users. + + :param state: includes the database connection. + :param msg: our standard message format. + :returns: True if ingested, False if already present, None if we cannot + ingest this sort of record. + """ + if not msg.get('service', None) or not msg.get('user_id', None): + pprint.pprint(msg) + sys.exit(0) + return None + curtime = time.time() + msgColl = getDb('msg', state) + # Assume if we have processed this message, then we have everything we care + # about in our database. This might not be true -- a message could get + # reposted with new information. + if msgColl.find_one({ + 'service': msg['service'], 'msg_id': msg['msg_id'], + }, {'_id': True}, limit=1, timeout=False): + return False + entityColl = getDb('entity', state) + entity, changed = getEntityByName(state, { + 'service': msg['service'], + 'user_id': msg['user_id'], + 'name': msg.get('user_name', None), + 'fullname': msg.get('user_fullname', None), + }) + + found = False + for knownMsg in entity['msgs']: + if (knownMsg['service'] == msg['service'] and + knownMsg['msg_id'] == msg['msg_id']): + if msg['subset'] not in knownMsg['subset']: + knownMsg['subset'].append(msg['subset']) + entity['date_updated'] = curtime + entityColl.save(entity) + found = True + break + if not found: + newmsg = { + 'service': msg['service'], + 'subset': [msg['subset']], + 'msg_id': msg['msg_id'], + 'date': msg['msg_date'] + } + for key in ('latitude', 'longitude', 'source'): + if msg.get(key, None) is not None: + newmsg[key] = msg[key] + entity['msgs'].append(newmsg) + entity['date_updated'] = curtime + # update neighbors and edges + ingestMessageEdges(state, entity, msg) + entityColl.save(entity) + # Mark descendants as dirty (for when we merge nodes) + # ##DWM:: + msgColl.save(msg) + if found and not changed: + return False + return True + + +def ingestMessageEdges(state, entity, msg): + """ + Update all of the edges associated with a message. Add any new neighbors + to the entity's neighbor list. + + :param state: includes the database connection. + :param entity: the entity document. Changed. + :param msg: our standard message format. 
+ """ + entityId = castObjectId(entity) + entityColl = getDb('entity', state) + linkColl = getDb('link', state) + for (linktype, linkdir) in [('mentions', 'out'), ('likes', 'in'), + ('comments', 'in'), ('replies', 'out')]: + if linktype not in msg: + continue + links = msg[linktype] + if isinstance(links, dict): + links = [{ + 'service': entity['service'], + 'user_id': key, + 'name': (links[key]['user_name'].lower() + if links[key].get('user_name', None) is not None else + None), + 'fullname': links[key].get('user_fullname', None), + } for key in links if key != entity.get('user_id', None)] + else: + links = [{ + 'service': entity['service'], + 'name': key + } for key in links] + for link in links: + link['neighbors'] = [entityId] + linkEntity, linkChanged = getEntityByName(state, link) + linkId = castObjectId(linkEntity) + isNew = linkChanged == 'new' + # Don't link to ourselves + if linkId == entityId: + continue + if linkId not in entity['neighbors']: + entity['neighbors'].append(linkId) + if not isNew: + # The linked item is now a neighbor + updateResult = entityColl.update({ + '_id': linkId, + 'neighbors': {'$ne': entityId}, + }, { + '$set': {'date_updated': time.time()}, + '$addToSet': {'neighbors': entityId}, + }) + # If we added this link as a neighbor, then we know the edges + # are new edges and not increased weights to existing edges. + # Using nModified or n allows use of pymongo 2 or pymongo 3 + # (which is independent of the version of Mongo). + if updateResult.get('nModified', updateResult.get('n')): + isNew = True + # We are currently bidirectional on everything + addLink(linkColl, entityId, linkId, linktype, isNew=isNew, + bidir=True) + + +def addLink(linkColl, ga, gb, linktype=None, weight=1, isNew=False, + bidir=False): + """ + Add a link or increase its weight. + + :param linkColl: mongo collection for links. + :param ga: ga _id of the link. + :param gb: gb _id of the link. + :param linktype: named type of the link. + :param weight: weight to add to the link. If the link doesn't exist, this + will be the entire weight. + :param isNew: if True, the link doesn't exist yet. If False, the link may + or may not already exist. + :param bidir: True to add a bidirectional link. False to only add a single + direction. + """ + curtime = time.time() + if isNew: + docs = [{ + 'ga': ga, 'gb': gb, 'linktype': linktype, + 'date_updated': curtime, 'weight': weight + }] + if bidir: + docs.append({ + 'ga': gb, 'gb': ga, 'linktype': linktype, + 'date_updated': curtime, 'weight': weight + }) + linkColl.insert(docs) + else: + if bidir: + query = {'linktype': linktype, '$or': [ + {'ga': ga, 'gb': gb}, {'ga': gb, 'gb': ga}]} + else: + query = {'ga': ga, 'gb': gb, 'linktype': linktype} + linkColl.update( + query, { + '$set': {'date_updated': curtime}, + '$inc': {'weight': weight} + }, upsert=True, multi=True) + + +def loadConfig(filename=None): + """ + Load the config file. This will load an arbitrary json file, then ensure + that certain minimum standards are met. + + :param filename: the name of the file to load. None to load conf.json + in the script directory. + :return: a config dictionary. + """ + if not filename: + filename = os.path.join(os.path.realpath(sys.path[0]), 'conf.json') + config = json.load(open(filename)) + return config + + +def mergeEntities(state, mainId, secondId): + """ + Merge two entity records by discarding the secondary record and converting + all references to the secondary record to the main record. All such + records are marked as updated (dirty). 
+ + :param state: includes the database connection. + :param mainId: the main entity _id. + :param secondId: the secondary entity _id. + """ + entityColl = getDb('entity', state) + mainId = castObjectId(mainId) + secondId = castObjectId(secondId) + if state['args']['verbose'] >= 2: + print 'merge:', mainId, secondId + main = entityColl.find_one({'_id': mainId}, timeout=False) + second = entityColl.find_one({'_id': secondId}, timeout=False) + main['msgs'].extend(second['msgs']) + main['date_updated'] = time.time() + if secondId in main['neighbors']: + main['neighbors'].remove(secondId) + entityColl.save(main) + # update everyone's neighbors that include secondId to mainId + entityColl.update( + {'neighbors': secondId}, {'$addToSet': {'neighbors': mainId}}, + multi=True) + entityColl.update( + {'neighbors': secondId}, {'$pull': {'neighbors': secondId}}, + multi=True) + # update links + linkColl = getDb('link', state) + for link in linkColl.find({'ga': secondId}, timeout=False): + addLink(linkColl, mainId, link['gb'], link['linktype'], link['weight']) + linkColl.remove({'ga': secondId}) + for link in linkColl.find({'gb': secondId}, timeout=False): + addLink(linkColl, link['ga'], mainId, link['linktype'], link['weight']) + linkColl.remove({'gb': secondId}) + # Don't allow self link + linkColl.remove({'ga': mainId, 'gb': mainId}) + # Find all descendants and convert to the mainId and mark them as dirty + # ##DWM:: + entityColl.remove({'_id': secondId}) + + +# -------- Functions for stand-alone use -------- + +def checkForDuplicateNames(state): + """ + Use a brute-force appoach to see if any service has duplicate user names. + + :param state: a state object used for passing config information, database + connections, and other data. + """ + entityColl = getDb('entity', state) + names = {} + for entity in entityColl.find({}, timeout=False): + service = entity['service'] + if service not in names: + names[service] = {} + for name in entity['name']: + if (name in names[service] and + entity['_id'] != names[service][name]): + print 'Duplicate name %s %r %r' % (name, names[service][name], + entity['_id']) + else: + names[service][name] = entity['_id'] + + +def ingestInstagramFile(filename, state, region=None): + """ + Ingest an Instagram file. The files are expected to be in the + elasticsearch output format with lines of json, each of which contains a + _source key that contains the instagram data. + + :param filename: a file to ingest. This may be compressed with gzip or + bzip2. + :param state: a state object used for passing config information, database + connections, and other data. + :param region: if not None, the region to use for this data. 
+ """ + state['filesProcessed'] = state.get('filesProcessed', 0) + 1 + linesProcessed = state.get('linesProcessed', 0) + linesIngested = state.get('linesIngested', 0) + fptr = openFile(filename) + for line in fptr: + line = line.strip().strip(',[]') + if not len(line): + continue + showProgress(linesProcessed, state, filename) + linesProcessed += 1 + try: + inst = json.loads(line) + except ValueError: + continue + msg = convertInstagramESToMsg(inst.get('_source', {}), + inst.get('_type', 'unknown')) + if not msg: + continue + if region is not None: + msg['subset'] = region + for retry in xrange(3): + try: + if ingestMessage(state, msg): + linesIngested += 1 + break + except pymongo.errors.OperationFailure: + if state['args']['verbose'] >= 1: + print 'retrying' + state['linesProcessed'] = linesProcessed + state['linesIngested'] = linesIngested + + +def ingestTwitterFile(filename, state, region=None): + """ + Ingest a Twitter file. The file may contain gnip or firehose json. + + :param filename: a file to ingest. This may be compressed with gzip or + bzip2. + :param state: a state object used for passing config information, database + connections, and other data. + :param region: if not None, the region to use for this data. + """ + state['filesProcessed'] = state.get('filesProcessed', 0) + 1 + linesProcessed = state.get('linesProcessed', 0) + linesIngested = state.get('linesIngested', 0) + fptr = openFile(filename) + for line in fptr: + line = line.strip().strip(',[]') + if not len(line): + continue + showProgress(linesProcessed, state, filename) + linesProcessed += 1 + try: + twit = json.loads(line) + except ValueError: + continue + if 'gnip' in twit: + msg = convertTwitterGNIPToMsg(twit) + else: + msg = convertTwitterJSONToMsg(twit) + if not msg: + continue + msg['subset'] = region if region is not None else msg.get( + 'subset', 'unknown') + for retry in xrange(3): + try: + if ingestMessage(state, msg): + linesIngested += 1 + break + except pymongo.errors.OperationFailure: + if state['args']['verbose'] >= 1: + print 'retrying' + state['linesProcessed'] = linesProcessed + state['linesIngested'] = linesIngested + + +def logarithmicBin(items): + """ + Convert a dictionary of the form (key): (sum) where the keys are all non- + negative integers into a binned dictionary with logarithmic-based bins. + + :param items: the dictionary of initial bins. + :return: the binned dictionary. + """ + bins = {} + for val in items: + if val <= 5: + bin = val + else: + logval = math.log10(val) + frac = 10 ** (logval - math.floor(logval)) + for start in [10, 9, 8, 7, 6, 5, 4, 3, 2, 1.5, 1]: + if frac >= start: + bin = int(10 ** math.floor(logval) * start) + break + bins[bin] = bins.get(bin, 0) + items[val] + return bins + + +def openFile(filename): + """ + Check if a file is gzip or bzip2 and open it with decompression. If not, + just open it. + + :param filename: name of the file to open. + :returns: a stream pointer. 
+ """ + fileHeaders = { + '\x1f\x8b\x08': 'gunzip < %s', # gzip.open could be used + '\x42\x5a\x68': bz2.BZ2File, + } + filename = os.path.realpath(os.path.expanduser(filename)) + start = open(filename, 'rb').read(max(len(key) for key in fileHeaders)) + for key in fileHeaders: + if start[:len(key)] == key: + if isinstance(fileHeaders[key], basestring): + return os.popen(fileHeaders[key] % filename) + return fileHeaders[key](filename) + # Reopen it, since we may not be able to rewind it + return open(filename, 'rb') + + +def resetMetrics(state): + """ + Mark metrics as dirty or delete known values. + + :param state: the state dictionary with the config values. + """ + metricDict = state['config'].get('metrics', {}) + if state['args']['metric']: + metricDict = dict.fromkeys(state['args']['metric']) + for met in metricDict: + metric.loadMetric(met, metricDict[met]) + metColl = getDb('metrics', state) + for met in metric.LoadedMetrics: + metClass = metric.LoadedMetrics[met] + if state['args'].get('clear'): + metColl.remove({'metric': metClass.name}) + elif state['args'].get('recalc'): + metColl.update({'metric': metClass.name}, {'date_updated': 0}) + + +def showEntityStatistics(entityColl): + """ + Report on distributions and statistics of the entity collection. + + :param entityColl: the entity collection. + """ + counts = entityColl.aggregate([ + {'$project': {'count': {'$size': '$msgs'}}}, + {'$group': {'_id': '$count', 'count': {'$sum': 1}}}, + {'$sort': {'_id': 1}}, + ]) + msgs = {count['_id']: count['count'] for count in counts['result']} + msgCount = sum([key * msgs[key] for key in msgs]) + msgs = logarithmicBin(msgs) + print 'Message distribution:' + pprint.pprint(msgs) + counts = entityColl.aggregate([ + {'$project': {'count': {'$size': '$neighbors'}}}, + {'$group': {'_id': '$count', 'count': {'$sum': 1}}}, + {'$sort': {'_id': 1}}, + ]) + neighbors = {count['_id']: count['count'] for count in counts['result']} + neighbors = logarithmicBin(neighbors) + print 'Neighbor distribution:' + pprint.pprint(neighbors) + senders = sum(msgs.values()) - msgs.get(0, 0) + total = entityColl.count() + if total and msgCount and senders: + print '%d senders (%4.2f%%), %5.3f msg/sender, %5.3f entities/msg' % ( + senders, 100.0 * senders / total, float(msgCount) / senders, + float(total) / msgCount) + print '%d messages, %d entities' % (msgCount, total) + + +def showProgress(linesProcessed, state, filename): + """ + Show progress if the verbosity is appropriate. + + :param linesProcessed: the number of lines processed. + :param state: a state object used for passing config information, database + connections, and other data. + :param filename: filename to report. 
+ """ + if state['args']['verbose'] < 1 or linesProcessed % 1000: + return + if 'starttime' not in state: + state['starttime'] = time.time() + if (state['args']['verbose'] >= 1 and + filename != state.get('lastFilename', None)): + print filename + state['lastFilename'] = filename + entityColl = getDb('entity', state) + linkColl = getDb('link', state) + state['lastcounts'] = { + 'entity': entityColl.count(), 'link': linkColl.count() + } + print('%d %d %d %5.3f' % ( + linesProcessed, state['lastcounts']['entity'], + state['lastcounts']['link'], time.time() - state['starttime'])) + if state['args']['verbose'] >= 2 and not linesProcessed % 100000: + if state.get('laststatcounts', {}) != state['lastcounts']: + showEntityStatistics(entityColl) + state['laststatcounts'] = state['lastcounts'] + + +def timer(name, action='toggle', report='auto', data=None): + """ + Track timing of functions to help optimize code. + + :param name: name of the timer. Required. + :param action: 'start': start the timer, 'stop': stop the timer, 'toggle': + switch between start and stop, anything else doesn't affect + the timer (can be used for reporting). + :param report: 'auto' to report all timer states no more than once every 10 + seconds, otherwise 'all' to report all timer states, True to + report just the specified timer, or anything else to not + report time. + :param data: if present, store this as some example data for the process. + """ + curtime = time.time() + if name not in Timers and action in ('start', 'stop', 'toggle'): + Timers[name] = {'count': 0, 'tally': 0, 'start': 0, 'data': None} + if action == 'start' or (action == 'toggle' and not Timers[name]['start']): + Timers[name]['start'] = curtime + elif (action == 'stop' and Timers[name]['start']) or action == 'toggle': + Timers[name]['count'] += 1 + Timers[name]['tally'] += curtime - Timers[name]['start'] + Timers[name]['start'] = 0 + if name in Timers and data is not None: + Timers[name]['data'] = data + if report == 'auto': + if curtime - Timers['lastreport'] > 10: + report = 'all' + if report == 'all' or report is True: + keys = sorted(Timers.keys()) + for key in keys: + if (key != 'lastreport' and (report == 'all' or key == name) and + Timers[key]['count']): + data = '' + if Timers[key]['data'] is not None: + data = ' ' + str(Timers[key]['data']) + print ('%s %d %5.3f %8.6f%s' % ( + key, Timers[key]['count'], Timers[key]['tally'], + Timers[key]['tally'] / Timers[key]['count'], data)) + if report == 'all': + Timers['lastreport'] = curtime + + +class AppendRegionAction(argparse.Action): + """Append an item to a list with the current value of region.""" + def __init__(self, option_strings, dest, nargs=None, const=None, + default=None, type=None, choices=None, required=False, + help=None, metavar=None): + super(AppendRegionAction, self).__init__( + option_strings=option_strings, dest=dest, nargs=nargs, const=const, + default=default, type=type, choices=choices, required=required, + help=help, metavar=metavar) + + def __call__(self, parser, namespace, values, option_string=None): + items = argparse._copy.copy(argparse._ensure_value( + namespace, self.dest, [])) + items.append((values, getattr(namespace, 'region', None))) + setattr(namespace, self.dest, items) + + +# -------- stand-alone program -------- + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Load messages into the entity graph database') + parser.add_argument( + '--calculate', '--calc', '-c', help='Calculate metrics.', + action='store_true', dest='calc') + 
parser.add_argument( + '--checknames', help='Check for duplicate names.', action='store_true') + parser.add_argument( + '--clear', '--clear-metrics', help='Clear metrics so that they must ' + 'be recalculated completely.', action='store_true') + parser.add_argument( + '--config', '--conf', help='The path to the config file') + parser.add_argument( + '--instagram', '-i', '--inst', + help='Ingest one or more files that contain Instagram messages in the ' + 'elasticsearch format. These may be compressed with gzip or bzip2, ' + 'and names with wildcards are allowed. The file is expected to ' + 'contain one record per line. Those records with _source keys are ' + 'ingested.', action=AppendRegionAction, dest='inst') + parser.add_argument( + '--limit', '-l', help='Limit the number of entities for metrics ' + 'calculations.', type=int) + parser.add_argument( + '--metric', '-m', help='Explicitly choose which metrics are ' + 'calculated. Multiple metrics may be specified. Multiple processes ' + 'can be run in parallel with different metrics to increase overall ' + 'processing speed.', action='append') + parser.add_argument( + '--offset', '-o', help='Offset within entities for metrics ' + 'calculations.', type=int) + parser.add_argument( + '--recalculate', '--recalc', help='Mark metrics as dirty so that they ' + 'are all updated. This is less aggresive than clearing, as they are ' + 'not deleted and may reuse partial work.', action='store_true', + dest='recalc') + parser.add_argument( + '--recent', help='Calculate metrics for the most recently added ' + 'entities first.', action='store_true') + parser.add_argument( + '--region', '-r', help='Subsequent input files will use this as ' + 'their region or subset. Set to blank to revert to parsing regions ' + 'if possible.') + parser.add_argument( + '--twitter', '-t', '--twit', + help='Ingest one or more files that contain Twitter messages in ' + 'either gnip or firehose json format. These may be compressed with ' + 'gzip or bzip2, and names with wildcards are allowed.', + action=AppendRegionAction, dest='twit') + parser.add_argument('--verbose', '-v', help='Increase verbosity', + action='count') + args = vars(parser.parse_args()) + state = { + 'args': args, + 'config': loadConfig(args['config']) + } + if args.get('checknames', False): + checkForDuplicateNames(state) + if args.get('inst', None): + for filespec, region in args['inst']: + for filename in sorted(glob.iglob(os.path.expanduser(filespec))): + ingestInstagramFile(filename, state, region) + if args.get('twit', None): + for filespec, region in args['twit']: + for filename in sorted(glob.iglob(os.path.expanduser(filespec))): + ingestTwitterFile(filename, state, region) + if args.get('calc', False): + if args.get('clear') or args.get('recalc'): + resetMetrics(state) + calculateMetrics(state) + if state['args']['verbose'] >= 1: + pprint.pprint(state) +# TODO: +# Ingest zip files +# ingest json from zip files +# Ingest elasticsearch instagram data from xdata +# Run metrics against entities not in the database diff --git a/continuous/metric.py b/continuous/metric.py new file mode 100644 index 0000000..d08283b --- /dev/null +++ b/continuous/metric.py @@ -0,0 +1,320 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# This has the general classes and functions for metric computation + +import collections +import importlib +import pprint +import unicodedata + +LoadedMetrics = collections.OrderedDict() + + +class Metric(object): + """ + Subclass this to compute a metric on an entity. 
+ """ + name = 'metric' + + def __init__(self, **kwargs): + self.args = kwargs + self.dependencies = [] # metrics that must run before this one + # does this need to operate on all links to the entity? + self.onLinks = False + # If we have to visit links, only visit updated links if this is True, + # otherwise visit all the links when recomputing the metric. + # This also determines if a metric should be recomputed if any link + # was updated, even if we aren't using the links. + self.onLinksOnlyNew = True + # does this need to operate on all other entities in combination with + # this entity? + self.onEntities = False + # If we have to visit entities, only visit updated entities if this is + # True, otherwise visit all the entities when recomputing the metric. + # This also determines if a metric should be recomputed if any entity + # was updated, even if we aren't using the entities. + self.onEntitiesOnlyNew = True + # If self.entityFields is specified, only those fields are retreived + # when visiting entities. This can be used to reduce data transfer + # from the database. + # self.entityFields = {'name': True} + # If saveWork is True, the work record is saved along with the value of + # the metric + self.saveWork = False + + def calc(self, ga, work={}, **kwargs): + """ + Calculate the metric based on the accumulated or stand-alone data. + + :param ga: the entity for which we are computing the metric. + :param work: an object for working on the metric. Results should be + stored here. + :returns: the value of the metric. + """ + return + + def calcEntity(self, ga, gb, work={}, **kwargs): + """ + Subclass this to handle partial calculations based on a second entity. + + :param ga: the entity for which we are computing the metric. + :param gb: the secondary entity. + :param work: an object for working on the metric. Results should be + stored here. + """ + return + + def calcEntityPrep(self, ga, work={}, **kwargs): + """ + Subclass this to handle partial calculations based on a second entity. + This is called before calcEntity is called on each second entity. + + :param ga: the entity for which we are computing the metric. + :param work: an object for working on the metric. Results should be + stored here. + """ + return + + def calcLink(self, ga, gb, link, work={}, **kwargs): + """ + Subclass this to handle partial calculations based on a link. + + :param ga: the entity for which we are computing the metric. + :param gb: the secondary entity. + :param link: the link between the two entities. + :param work: an object for working on the metric. Results should be + stored here. + """ + return + + def calcLinkPrep(self, ga, work={}, **kwargs): + """ + Subclass this to handle partial calculations based on a link. + This is called before calcLink is called on each link. + + :param ga: the entity for which we are computing the metric. + :param work: an object for working on the metric. Results should be + stored here. + """ + return + + +def loadMetric(metricClass, initVal=None): + """ + Load a metric and its dependencies. Keep the list of loaded metrics + ordered so that if we walk the list, all dependencies will be met. + + :param metricClass: the class of the metric to load or a string with the + name of a metric we should attempt to located. + :param initVal: a value to pass to the class when initializing it. + :returns: True if success, None if failed to load dependencies. 
+ """ + if isinstance(metricClass, basestring): + className = 'Metric' + metricClass.capitalize() + if globals().get(className): + metricClass = globals().get(className) + else: + moduleName = 'metric' + metricClass + module = importlib.import_module(moduleName) + # We expect metrics to register themselves, but if they + # don't, we try the expected class name again. + if metricClass not in LoadedMetrics: + if getattr(module, className, None): + metricClass = getattr(module, className) + if isinstance(metricClass, basestring): + return None + if metricClass.name not in LoadedMetrics: + if initVal is None: + initVal = {} + elif not isinstance(initVal, dict): + initVal = {'value': initVal} + newMetric = metricClass(**initVal) + for dependency in newMetric.dependencies: + if dependency not in LoadedMetrics: + loadMetric(dependency) + LoadedMetrics[metricClass.name] = newMetric + return True + + +def normalizeAndLower(text): + """ + Convert some text so that it is normalized and lowercased unicode. + + :param text: the text to alter. + :returns: the normalized and lower-cased text. + """ + if isinstance(text, str): + text = text.decode('utf8') + text = unicodedata.normalize('NFC', text) + return text.lower() + + +def normalizeNames(entity): + """ + Normalize the name and fullname lists in an entity. + + :param entity: the entity to modify. + """ + entity['normname'] = list({normalizeAndLower(name) + for name in entity['name']}) + entity['normfullname'] = list({normalizeAndLower(name) + for name in entity['fullname']}) + + +def topKCategories(entity): + """ + Return a set of categories used for tracking topk. + + :param entity: the entity for which to extract categories. + :returns: a set of categories. + """ + cat = set() + if entity.get('service'): + cat.add(entity['service']) + for msg in entity.get('msgs', []): + cat.update([msg['service'] + '-' + subset for subset in msg['subset']]) + return cat + + +def topKSetsToLists(topkDict): + """ + Convert the sets in the topk list to lists so we can store them. This also + removes the id dictionary, as it is not necessary. + + :param topkDict: the dictionary of topk results. Modified. + """ + if 'topk' not in topkDict: + return + topk = topkDict['topk'] + if len(topk) and isinstance(topk[0][-1], set): + for pos in xrange(len(topk)): + topk[pos] = (topk[pos][0], topk[pos][1], list(topk[pos][2])) + if 'ids' in topkDict: + del topkDict['ids'] + + +def trackTopK(topkDict, value, id, cats, state): + """ + Track the top-k values for various services and subsets. Each service and + subset will have at least k entries in the list. + + :param topkDict: a dictionary to store the top-k in. + :param value: the value associated with an item. Higher values are kept. + :param id: the id of the item. If the id is already present, it will be + replaced. + :param cats: a set of categories to track for this item. + :param state: the state dictionary with the definition of k. + :return: True is we added this item into the top-k. False if it was too + minor. + """ + if 'topk' not in topkDict: + topkDict['topk'] = [] + topkDict['cats'] = {} + topkDict['ids'] = {} + topkDict['processed'] = 0 + if (state and 'config' in state and 'topk' in state['config'] and + state['config']['topk'].get('k')): + topkDict['k'] = (state['config']['topk']['k'] + + state['config']['topk'].get('extra', 0)) + else: + topkDict['k'] = 25 + topk = topkDict['topk'] + k = topkDict['k'] + topkDict['processed'] += 1 + # When we get our dictionary out of storage, it contains lists, not sets. 
+ # We want to operate on sets. Also, recreate our ids dictionary. + if len(topk) and isinstance(topk[0][-1], list): + for pos in xrange(len(topk)): + topk[pos] = (topk[pos][0], topk[pos][1], set(topk[pos][2])) + topkDict['ids'] = dict.fromkeys([row[1] for row in topk]) + if not cats or not len(cats): + cats = set(['default']) + # If we already have this id, remove it + if id in topkDict['ids']: + for pos in xrange(len(topk)): + rval, rid, rcats = topk[pos] + if rid == id: + if rid in topkDict['ids']: + del topkDict['ids'][rid] + for cat in rcats: + topkDict['cats'][cat] -= 1 + topk[pos:pos+1] = [] + break + # Skip this one if we can tell it shouldn't be added. + if (len(topk) >= k and value < topk[-1][0] and + not any(topkDict['cats'].get(cat, 0) < k for cat in cats)): + return False + # Add the entry to the list + entry = (value, id, cats) + topk.append(entry) + topk.sort(reverse=True) + topkDict['ids'][id] = True + for cat in cats: + topkDict['cats'][cat] = topkDict['cats'].get(cat, 0) + 1 + kept = trackTopKRemove(topkDict, entry) + if kept and state['args']['verbose'] >= 3: + pprint.pprint(topkDict) + return kept + + +def trackTopKRemove(topkDict, entry): + """ + Check if we need to remove any entries from the top-k list. Because of + keeping the top-k for multiple categories, one addition can result in + removing multiple rows. + + :param topkDict: the dictionary of topk results. + :param entry: a tuple of (value, id, cats) that was just added to the topk + list. + :return: True if we kept the item that was added into the top-k. False if + it was removed. + """ + k = topkDict['k'] + topk = topkDict['topk'] + kept = True + cats = entry[2] + if len(topk) > k: + while True: + counts = {cat: 0 for cat in cats} + remove = False + for pos in xrange(len(topk)): + rval, rid, rcats = topk[pos] + for cat in cats: + if cat in rcats: + counts[cat] += 1 + if (min(counts.values()) > k and rcats.issubset(cats)): + if topk[pos] == entry: + kept = False + if rid in topkDict['ids']: + del topkDict['ids'][rid] + for cat in rcats: + topkDict['cats'][cat] -= 1 + topk[pos:pos+1] = [] + remove = True + break + if not remove: + break + return kept + + +def trackTopKWorst(topkDict, cats, low): + """ + Determine the worst value that we need to care about for tracking the + specified categories. + + :param topkDict: a dictionary with the top-k. + :param cats: a set of categories for a potential item. + :param low: a fall-back low value. + :return: The worst value that could be added to the top-k for these + categories. + """ + if not cats or not len(cats) or 'topk' not in topkDict: + return low + topk = topkDict['topk'] + k = topkDict['k'] + if len(topk) < k or isinstance(topk[0][-1], list): + return low + if any(topkDict['cats'].get(cat, 0) < k for cat in cats): + return low + return topk[-1][0] diff --git a/continuous/metric1hop.py b/continuous/metric1hop.py new file mode 100644 index 0000000..b6abab0 --- /dev/null +++ b/continuous/metric1hop.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import metric + + +class Metric1hop(metric.Metric): + name = '1hop' + + def __init__(self, **kwargs): + super(Metric1hop, self).__init__(**kwargs) + + def calc(self, ga, **kwargs): + """ + Calculate the number of 1-hop neighbors. Since we already have the + neighbors in the entity record, this is just that length. + + :param ga: the entity for which we are computing the metric. + :returns: the number of 1-hop neighbors.
+ """ + # This does not include the node itself + return len(ga.get('neighbors', [])) + + +metric.loadMetric(Metric1hop) diff --git a/continuous/metric2hop.py b/continuous/metric2hop.py new file mode 100644 index 0000000..d906999 --- /dev/null +++ b/continuous/metric2hop.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import metric + + +class Metric2hop(metric.Metric): + name = '2hop' + + def __init__(self, **kwargs): + super(Metric2hop, self).__init__(**kwargs) + self.onLinksOnlyNew = False + + def calc(self, ga, entityColl, **kwargs): + """ + Calculate the number of 2 hop neighbors. + + :param ga: the entity for which we are computing the metric. + :param entityColl: the database collection used for querying neighbors. + :returns: the number of 2-hop neighbors, both excluding 1-hop-only + neighbors and including 1-hop-only neighbors. The central + node is never counted. + """ + if not len(ga.get('neighbors', [])): + return {'2hop': 0, 'lte2hop': 0} + neighbors = set() + for gb in entityColl.find({'_id': {'$in': list(neighbors)}}, + {'neighbors': True}, timeout=False): + neighbors.update(gb.get('neighbors', [])) + neighbors.discard(ga['_id']) + result = {'2hop': len(neighbors)} + neighbors.update(ga['neighbors']) + result['lte2hop'] = len(neighbors) + return result + + +metric.loadMetric(Metric2hop) diff --git a/continuous/metriclevenshtein.py b/continuous/metriclevenshtein.py new file mode 100644 index 0000000..6cf7e33 --- /dev/null +++ b/continuous/metriclevenshtein.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +try: + import Levenshtein +except Exception: + Levenshtein = None +import metric + + +# This is a straightforward implementation of a well-known algorithm, and thus +# probably shouldn't be covered by copyright to begin with. But in case it is, +# the author (Magnus Lie Hetland) has, to the extent possible under law, +# dedicated all copyright and related and neighboring rights to this software +# to the public domain worldwide, by distributing it under the CC0 license, +# version 1.0. This software is distributed without any warranty. For more +# information, see +def levenshtein(a, b): + """ + Calculates the Levenshtein distance between a and b. + + :param a: one string to compare. + :param b: the second string to compare. + :returns: the Levenshtein distance between the two strings. + """ + n, m = len(a), len(b) + if n > m: + # Make sure n <= m, to use O(min(n,m)) space + a, b = b, a + n, m = m, n + + current = range(n + 1) + for i in range(1, m + 1): + previous, current = current, [i] + [0] * n + for j in range(1, n + 1): + add, delete = previous[j] + 1, current[j - 1] + 1 + change = previous[j - 1] + if a[j - 1] != b[i - 1]: + change = change + 1 + current[j] = min(add, delete, change) + + return current[n] + + +def levenshteinSimilarity(s1, s2): + """ + Calculate and normalize the Levenshtein metric to a result of 0 (poor) to 1 + (perfect). + + :param s1: the first string + :param s2: the second string + :return: the normalized result. 1 is a perfect match. 
+ """ + totalLen = float(len(s1) + len(s2)) + # The C-module version of Levenshtein is vastly faster + if Levenshtein is None: + return (totalLen - levenshtein(s1, s2)) / totalLen + return (totalLen - Levenshtein.distance(s1, s2)) / totalLen + + +class MetricLevenshtein(metric.Metric): + name = 'levenshtein' + + def __init__(self, **kwargs): + super(MetricLevenshtein, self).__init__(**kwargs) + self.onEntities = True + self.entityFields = { + 'name': True, + 'fullname': True, + 'service': True, + 'msgs.service': True, + 'msgs.subset': True + } + self.normalizeNames = True + self.saveWork = True + + def calc(self, ga, work, **kwargs): + """ + Calculate the Levenshtein top-k relations. + + :param ga: the entity for which we are computing the metric. + :returns: the top-k table of relations. + """ + res = {} + for key in ('name', 'fullname', 'name_fullname'): + metric.topKSetsToLists(work[key]) + if 'topk' in work[key]: + res[key] = work[key]['topk'] + return res + + def calcEntity(self, ga, gb, work={}, state={}, **kwargs): + """ + Calculate the Levenshtein similarity between these two entities. If + appopriate, add this to the top-k list. + + :param ga: the entity for which we are computing the metric. + :param gb: the secondary entity. + :param work: an object for working on the metric. This includes the + top-k data. + :param state: the state dictionary. + """ + # We actually calculate the BEST levenshtein similarity between any + # name of ga with any name of gb and use that. + cat = metric.topKCategories(gb) + simName = simFull = simBoth = 0 + gaNames = ga['normname'] + gbNames = gb['normname'] + gaFullnames = ga['normfullname'] + gbFullnames = gb['normfullname'] + for gaName in gaNames: + for gbName in gbNames: + # Note: both gaName and gbName are lowercase. + simName = max(simName, levenshteinSimilarity(gaName, gbName)) + for gbName in gbFullnames: + simBoth = max(simBoth, levenshteinSimilarity(gaName, gbName)) + for gaName in gaFullnames: + for gbName in gbFullnames: + simFull = max(simFull, levenshteinSimilarity(gaName, gbName)) + for gbName in gbNames: + simBoth = max(simBoth, levenshteinSimilarity(gaName, gbName)) + simBoth = max(simBoth, simName, simFull) + if simName: + metric.trackTopK(work['name'], simName, gb['_id'], + cat, state) + if simFull: + metric.trackTopK(work['fullname'], simFull, gb['_id'], + cat, state) + if simBoth: + metric.trackTopK(work['name_fullname'], simBoth, gb['_id'], + cat, state) + + def calcEntityPrep(self, ga, work={}, **kwargs): + """ + This is called before calcEntity is called on each second entity. + + :param ga: the entity for which we are computing the metric. + :param work: an object for working on the metric. Results should be + stored here. + """ + metric.normalizeNames(ga) + for key in ('name', 'fullname', 'name_fullname'): + if key not in work: + work[key] = {} + + +metric.loadMetric(MetricLevenshtein) diff --git a/continuous/metricsubstring.py b/continuous/metricsubstring.py new file mode 100644 index 0000000..cca24a7 --- /dev/null +++ b/continuous/metricsubstring.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import metric + + +def longestCommonSubstring(s1, s2, low=0): + """ + Return the longest common substring between two strings. + + :param s1: the first string + :param s2: the second string + :param low: one less than the length of the shortest string we need to + consider. + :return: the longest common substring. 
+ """ + if len(s2) > len(s1): + s1, s2 = s2, s1 + while len(s2) > low: + if s2[0] in s1: + break + s2 = s2[1:] + while len(s2) > low: + if s2[-1] in s1: + break + s2 = s2[:-1] + lens2p1 = len(s2) + 1 + for l in xrange(len(s2), low, -1): + for s in xrange(lens2p1 - l): + substr = s2[s: s + l] + if substr in s1: + return substr + return '' + + +def substringSimilarity(s1, s2, low=0): + """ + Determine the longest common substring between two strings and normalize + the results to a scale of 0 to 1. + + :param s1: the first string + :param s2: the second string + :param low: the lowest value we need to consider. + :return: the normalized result. 1 is a perfect match. + """ + lens1s2 = len(s1) + len(s2) + return (2.0 * len(longestCommonSubstring(s1, s2, int(low * lens1s2 / 2))) / + (lens1s2)) + + +class MetricSubstring(metric.Metric): + name = 'substring' + + def __init__(self, **kwargs): + super(MetricSubstring, self).__init__(**kwargs) + self.onEntities = True + self.entityFields = { + 'name': True, + 'fullname': True, + 'service': True, + 'msgs.service': True, + 'msgs.subset': True + } + self.normalizeNames = True + self.saveWork = True + + def calc(self, ga, work, **kwargs): + """ + Calculate the substring top-k relations. + + :param ga: the entity for which we are computing the metric. + :returns: the top-k table of relations. + """ + res = {} + for key in ('name', 'fullname', 'name_fullname'): + metric.topKSetsToLists(work[key]) + if 'topk' in work[key]: + res[key] = work[key]['topk'] + return res + + def calcEntity(self, ga, gb, work={}, state={}, **kwargs): + """ + Calculate the substring similarity between these two entities. If + appopriate, add this to the top-k list. + + :param ga: the entity for which we are computing the metric. + :param gb: the secondary entity. + :param work: an object for working on the metric. This includes the + top-k data. + :param state: the state dictionary. + """ + # We actually calculate the BEST substring similarity between any name + # of ga with any name of gb and use that. + cat = metric.topKCategories(gb) + lowName = metric.trackTopKWorst(work['name'], cat, 0) + lowFull = metric.trackTopKWorst(work['fullname'], cat, 0) + lowBoth = metric.trackTopKWorst(work['name_fullname'], cat, 0) + simName = simFull = simBoth = 0 + gaNames = ga['normname'] + gbNames = gb['normname'] + gaFullnames = ga['normfullname'] + gbFullnames = gb['normfullname'] + for gaName in gaNames: + for gbName in gbNames: + # Note: both gaName and gbName are lowercase. + simName = max(simName, substringSimilarity( + gaName, gbName, lowName)) + if simName < 1: + for gbName in gbFullnames: + simBoth = max(simBoth, substringSimilarity( + gaName, gbName, lowBoth)) + for gaName in gaFullnames: + for gbName in gbFullnames: + simFull = max(simFull, substringSimilarity( + gaName, gbName, lowFull)) + if simFull < 1 and simName < 1: + for gbName in gbNames: + simBoth = max(simBoth, substringSimilarity( + gaName, gbName, lowBoth)) + simBoth = max(simBoth, simName, simFull) + if simName and simName >= lowName: + metric.trackTopK(work['name'], simName, gb['_id'], + cat, state) + if simFull and simFull >= lowFull: + metric.trackTopK(work['fullname'], simFull, gb['_id'], + cat, state) + if simBoth and simBoth >= lowBoth: + metric.trackTopK(work['name_fullname'], simBoth, gb['_id'], + cat, state) + + def calcEntityPrep(self, ga, work={}, **kwargs): + """ + This is called before calcEntity is called on each second entity. + + :param ga: the entity for which we are computing the metric. 
+ :param work: an object for working on the metric. Results should be + stored here. + """ + metric.normalizeNames(ga) + for key in ('name', 'fullname', 'name_fullname'): + if key not in work: + work[key] = {} + + +metric.loadMetric(MetricSubstring) diff --git a/continuous/xmlmetric.py b/continuous/xmlmetric.py new file mode 100644 index 0000000..b33162c --- /dev/null +++ b/continuous/xmlmetric.py @@ -0,0 +1,237 @@ +import elasticsearch +import ingest +import pprint +import sys +import time +import urllib3 +import xml.etree.ElementTree + + +def convertXMLToObject(node): + """ + Convert an ElementTree node to a more pythonic object. + + :param node: the element to convert. + :returns: a python object. + """ + if hasattr(node, 'getroot'): + node = node.getroot() + value = None + children = list(node) + if children: + value = {} + for child in children: + childval = convertXMLToObject(child) + if childval is not None: + if child.tag not in value: + value[child.tag] = [] + value[child.tag].append(childval) + for tag in value: + if len(value[tag]) == 1 and not isinstance(value[tag][0], dict): + value[tag] = value[tag][0] + if not len(value): + value = None + else: + if node.text: + value = node.text.strip() + elif hasattr(node, 'attribute'): + value = node.attribute + if value is not None and not len(value): + value = None + return value + + +dbref = None + + +def getDb(): + """ + Get a connection to Elasticsearch. + + :returns: a database connection. + :returns: the default index we want to use. + """ + global dbref + index = 'test1' + if dbref is None: + dbref = elasticsearch.Elasticsearch(hosts=[{ + 'host': '127.0.0.1', + 'port': 9200, + 'url_prefix': '/', + 'timeout': urllib3.Timeout(read=150, connect=10) + }]) + return dbref, index + + +def printAndStoreMetric(state, entity, metClass, metric): + """ + Print and store the metrics we computed for an entity. + + :param state: the state dictionary. + :param entity: the current entity. + :param metClass: the metric class. + :param metric: the computed metric value. + """ + db, index = getDb() + rankings = [] + for sub in ['name', 'fullname']: + if sub not in metric['value']: + continue + metricName = metClass.name + '-' + sub + kept = 0 + for met in metric['value'][sub]: + if met[0] < 0.85 and kept >= 3: + continue + gb = entityColl.find_one(met[1]) + namelist = gb['name'][:] + namelist.extend(gb['fullname']) + ranking = { + 'entityId': [entity['id']], + 'documentId': gb['user_id'], + 'documentSource': 'twitter', + 'documentLink': 'http://twitter.com/intent/user?' 
+                            'user_id=' + gb['user_id'],
+                'name': metricName,
+                'entityNames': namelist,
+                'score': met[0],
+                'date_updated': time.time(),
+                'info': {
+                    'name': gb['name'],
+                    'fullname': gb['fullname']
+                }
+            }
+            # Not really necessary, but it lets me be lazy about deleting old
+            # results
+            id = entity['id'] + '-' + ranking['documentId']
+            try:
+                old = db.get(index=index, doc_type='ranking', id=id)
+            except elasticsearch.NotFoundError:
+                old = None
+            if old is None or old['_source'] != ranking:
+                db.index(index=index, doc_type='ranking', body=ranking, id=id)
+                state['rankings'] = state.get('rankings', 0) + 1
+            kept += 1
+            rankings.append(ranking)
+    if 'fullname' in metric['value']:
+        met = metric['value']['fullname'][0]
+        gb = entityColl.find_one(met[1])
+        logstr = '%d %6.4f %r;%r %s %r;%r' % (
+            state.get('rankings', 0), met[0], ','.join(entity['name']),
+            ','.join(entity['fullname']), met[1], ','.join(gb['name']),
+            ','.join(gb['fullname']))
+        if met[0] > 0.85:
+            print logstr
+            sys.stderr.write(logstr + '\n')
+            # sys.stderr.write('%r\n' % entity)
+            sys.stderr.write(pprint.pformat(rankings) + '\n')
+            sys.stderr.flush()
+    del entity['metrics']
+
+
+if __name__ == '__main__':  # noqa
+    reverse = False
+    offset = 0
+    filename = None
+    help = False
+    for arg in sys.argv[1:]:
+        if arg.startswith('--offset='):
+            offset = int(arg.split('=', 1)[1])
+        elif arg == '-r':
+            reverse = True
+        elif arg.startswith('-') or filename:
+            help = True
+        else:
+            filename = arg
+    if help or not filename:
+        print """Load xml person list and compute metrics.
+
+Syntax: xmlmetric.py (xml file) [-r] [--offset=(offset)]
+
+-r reverse the processing order.
+--offset skips entities at the beginning of the processing order.
+"""
+        sys.exit(0)
+    starttime = lastupdate = time.time()
+    metricDict = {
+        'levenshtein-name': {
+            'longname': 'Levenshtein User Name',
+            'description': 'User name similarity based Levenshtein distance. 
' + 'This is based on the email user name, Twitter ' + 'handle, or other service name.', + 'version': '0.1' + }, + 'levenshtein-fullname': { + 'longname': 'Levenshtein Full Name', + 'description': 'Full name similarity based Levenshtein distance.', + 'version': '0.1' + }, + } + state = { + 'args': { + 'metric': ['levenshtein'], + 'verbose': 2, + }, + 'config': ingest.loadConfig(None) + } + tree = xml.etree.ElementTree.iterparse(filename) + for _, el in tree: + if '}' in el.tag: + el.tag = el.tag.split('}', 1)[1] + root = tree.root + print 'parsed' + entityColl = ingest.getDb('entity', state) + entities = [] + persons = [] + db, index = getDb() + for key in metricDict: + id = key + metricDict[key]['name'] = key + try: + old = db.get(index=index, doc_type='metrics', id=id) + except elasticsearch.NotFoundError: + old = None + if old is None or old['_source'] != metricDict[key]: + db.index(index=index, doc_type='metrics', body=metricDict[key], + id=id) + count = updated = 0 + for personNode in root.findall('Person'): + person = convertXMLToObject(personNode) + persons.append(person) + + id = person['PersonGUID'] + try: + old = db.get(index=index, doc_type='entity', id=id) + except elasticsearch.NotFoundError: + old = None + if old is None or old['_source'] != person: + db.index(index=index, doc_type='entity', body=person, id=id) + updated += 1 + entity = { + 'id': person['PersonGUID'], + 'name': [], + 'fullname': [], + 'service': 'xml', + } + for ident in person.get('Identity', []): + for name in ident.get('Name', []): + if ('FullName' in name and + name['FullName'] not in entity['fullname']): + entity['fullname'].append(name['FullName']) + for email in ident.get('Email', []): + if 'Username' in email: + namelower = email['Username'].lower() + if namelower not in entity['name']: + entity['name'].append(namelower) + entities.append(entity) + count += 1 + curtime = time.time() + if curtime - lastupdate > 10: + print '%d %d %4.2f' % (count, updated, curtime - starttime) + lastupdate = curtime + root = tree = None + if reverse: + entities.reverse() + if offset: + entities = entities[offset:] + print 'start %4.2f' % (time.time() - starttime) + ingest.calculateMetrics(state, None, entities, printAndStoreMetric)
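+
+# Example invocation (illustrative; the file name is hypothetical):
+#   python xmlmetric.py persons.xml --offset=100 -r
+# This parses the XML person list, mirrors the person records and metric
+# definitions into Elasticsearch, then computes the configured levenshtein
+# metric for each XML-derived entity against the Mongo entity collection,
+# storing the resulting top-k rankings back into Elasticsearch.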