11import sys
22import click
33import logging
4+ import uuid
45from pprint import pprint
6+ from random import randrange
7+
58from ftmstore import get_dataset
6- from servicelayer .cache import get_redis , get_fakeredis
9+ from servicelayer .cache import get_redis
710from servicelayer .logs import configure_logging
8- from servicelayer .jobs import Job , Dataset
11+ from servicelayer .taskqueue import Dataset , Task
912from servicelayer import settings as sl_settings
1013from servicelayer .archive .util import ensure_path
14+ from servicelayer import settings as sls
1115from servicelayer .tags import Tags
1216
1317from ingestors import settings
1418from ingestors .manager import Manager
1519from ingestors .directory import DirectoryIngestor
1620from ingestors .analysis import Analyzer
17- from ingestors .worker import IngestWorker , OP_ANALYZE , OP_INGEST
21+ from ingestors .worker import get_worker
1822
1923log = logging .getLogger (__name__ )
20- STAGES = [OP_ANALYZE , OP_INGEST ]
2124
2225
2326@click .group ()
@@ -30,7 +33,7 @@ def cli():
3033def process (sync ):
3134 """Start the queue and process tasks as they come. Blocks while waiting"""
3235 num_threads = None if sync else sl_settings .WORKER_THREADS
33- worker = IngestWorker ( stages = STAGES , num_threads = num_threads )
36+ worker = get_worker ( num_threads = num_threads )
3437 code = worker .run ()
3538 sys .exit (code )
3639
@@ -50,11 +53,22 @@ def killthekitten():
5053 conn .flushall ()
5154
5255
53- def _ingest_path (db , conn , dataset , path , languages = []):
56+ def _ingest_path (db , dataset , path , languages = []):
5457 context = {"languages" : languages }
55- job = Job .create (conn , dataset )
56- stage = job .get_stage (OP_INGEST )
57- manager = Manager (db , stage , context )
58+
59+ priority = priority = randrange (1 , sls .RABBITMQ_MAX_PRIORITY + 1 )
60+
61+ task = Task (
62+ task_id = uuid .uuid4 ().hex ,
63+ job_id = uuid .uuid4 ().hex ,
64+ collection_id = dataset ,
65+ delivery_tag = "" ,
66+ operation = settings .STAGE_INGEST ,
67+ priority = priority ,
68+ context = context ,
69+ payload = {},
70+ )
71+ manager = Manager (db , task )
5872 path = ensure_path (path )
5973 if path is not None :
6074 if path .is_file ():
@@ -76,15 +90,14 @@ def _ingest_path(db, conn, dataset, path, languages=[]):
7690@click .argument ("path" , type = click .Path (exists = True ))
7791def ingest (path , dataset , languages = None ):
7892 """Queue a set of files for ingest."""
79- conn = get_redis ()
80- db = get_dataset (dataset , OP_INGEST )
81- _ingest_path (db , conn , dataset , path , languages = languages )
93+ db = get_dataset (dataset , settings .STAGE_INGEST )
94+ _ingest_path (db , dataset , path , languages = languages )
8295
8396
8497@cli .command ()
8598@click .option ("--dataset" , required = True , help = "Name of the dataset" )
8699def analyze (dataset ):
87- db = get_dataset (dataset , OP_ANALYZE )
100+ db = get_dataset (dataset , settings . STAGE_ANALYZE )
88101 analyzer = None
89102 for entity in db .partials ():
90103 if analyzer is None or analyzer .entity .id != entity .id :
@@ -102,13 +115,20 @@ def analyze(dataset):
102115@click .argument ("path" , type = click .Path (exists = True ))
103116def debug (path , languages = None ):
104117 """Debug the ingest for the given path."""
105- conn = get_fakeredis ()
106118 settings .fts .DATABASE_URI = "sqlite:////tmp/debug.sqlite3"
107- db = get_dataset ("debug" , origin = OP_INGEST , database_uri = settings .fts .DATABASE_URI )
119+
120+ # collection ID that is meant for testing purposes only
121+ debug_datatset_id = 100
122+
123+ db = get_dataset (
124+ debug_datatset_id ,
125+ origin = settings .STAGE_INGEST ,
126+ database_uri = settings .fts .DATABASE_URI ,
127+ )
108128 db .delete ()
109- _ingest_path (db , conn , "debug" , path , languages = languages )
110- worker = IngestWorker ( conn = conn , stages = STAGES )
111- worker .sync ( )
129+ _ingest_path (db , debug_datatset_id , path , languages = languages )
130+ worker = get_worker ( )
131+ worker .process ( blocking = False )
112132 for entity in db .iterate ():
113133 pprint (entity .to_dict ())
114134
0 commit comments