11package com .codingchili .Controller ;
22
3- import java .io .PrintWriter ;
4- import java .io .StringWriter ;
3+ import com .codingchili .Model .*;
4+ import com .codingchili .logging .ApplicationLogger ;
5+ import io .vertx .core .*;
6+ import io .vertx .core .eventbus .DeliveryOptions ;
7+ import io .vertx .core .eventbus .MessageConsumer ;
8+ import io .vertx .core .http .HttpServer ;
9+ import io .vertx .core .json .JsonObject ;
10+ import io .vertx .ext .web .*;
11+ import io .vertx .ext .web .handler .*;
12+ import io .vertx .ext .web .templ .JadeTemplateEngine ;
13+
14+ import java .io .*;
515import java .util .Iterator ;
16+ import java .util .concurrent .atomic .AtomicReference ;
617import java .util .logging .Level ;
718import java .util .logging .Logger ;
819
9- import com .codingchili .Model .Configuration ;
10- import com .codingchili .Model .ElasticWriter ;
11- import com .codingchili .Model .FileParser ;
12- import com .codingchili .Model .ParserException ;
1320import static com .codingchili .ApplicationLauncher .VERSION ;
1421import static com .codingchili .Model .Configuration .INDEXING_ELASTICSEARCH ;
1522import static com .codingchili .Model .ElasticWriter .*;
1623import static com .codingchili .Model .FileParser .INDEX ;
1724
18- import io .vertx .core .AbstractVerticle ;
19- import io .vertx .core .Context ;
20- import io .vertx .core .Future ;
21- import io .vertx .core .MultiMap ;
22- import io .vertx .core .Vertx ;
23- import io .vertx .core .buffer .Buffer ;
24- import io .vertx .core .eventbus .DeliveryOptions ;
25- import io .vertx .core .eventbus .MessageConsumer ;
26- import io .vertx .core .json .JsonObject ;
27- import io .vertx .ext .web .FileUpload ;
28- import io .vertx .ext .web .Router ;
29- import io .vertx .ext .web .handler .BodyHandler ;
30- import io .vertx .ext .web .handler .StaticHandler ;
31- import io .vertx .ext .web .handler .TemplateHandler ;
32- import io .vertx .ext .web .templ .JadeTemplateEngine ;
33-
3425/**
3526 * @author Robin Duda
3627 * <p>
3728 * Manages the web interface and handles file uploads.
3829 */
3930public class Website extends AbstractVerticle {
40- public static final String MAPPING = "mapping" ;
4131 public static final String UPLOAD_ID = "uploadId" ;
42- public static final String OPTIONS = "options" ;
43- public static final String CLEAR = "clear" ;
44- private Logger logger = Logger .getLogger (getClass ().getName ());
4532 private static final String DONE = "/done" ;
4633 private static final String ERROR = "/error" ;
4734 private static final String MESSAGE = "message" ;
48- private static final String OFFSET = "offset" ;
4935 private static final String FILE = "file" ;
5036 private static final String IMPORTED = "imported" ;
5137 private static final String NO_FILE_WAS_UPLOADED = "No file was uploaded." ;
38+ private Logger logger = Logger .getLogger (getClass ().getName ());
5239 private Vertx vertx ;
5340
5441 @ Override
@@ -76,12 +63,28 @@ public void start(Future<Void> start) {
7663 });
7764
7865 router .route ("/*" ).handler (TemplateHandler .create (JadeTemplateEngine .create ()));
66+ startWebsite (start , router );
67+ }
68+
69+ private void startWebsite (Future <Void > start , Router router ) {
70+ setupStatusService ().requestHandler (router ::accept ).listen (Configuration .getWebPort (), done -> {
71+ if (done .succeeded ()) {
72+ Configuration .setWebPort (done .result ().actualPort ());
73+ logger .info ("Started website on port " + Configuration .getWebPort ());
74+ start .complete ();
75+ } else {
76+ start .fail (done .cause ());
77+ }
78+ });
79+ }
80+
81+ private HttpServer setupStatusService () {
82+ return vertx .createHttpServer ().websocketHandler (websock -> {
83+ websock .writeFinalTextFrame (new JsonObject ().put ("message" , getStatusServiceWelcomeMessage ()).encode ());
7984
80- vertx .createHttpServer ().websocketHandler (websock -> {
81- websock .writeFinalTextFrame (new JsonObject ().put ("message" , "websocket connected to excelastic " + VERSION +
82- " using ElasticSearch " + getElasticVersion ()).encode ());
85+ AtomicReference <String > uploadId = new AtomicReference <>("" );
8386
84- Atomic < String > uploadId = new Atomic <>( "" );
87+ // sets up an event bus consumer to listen for import progress.
8588 MessageConsumer <JsonObject > consumer = vertx .eventBus ().consumer (IMPORT_PROGRESS , data -> {
8689 try {
8790 if (uploadId .get ().equals (data .body ().getString (UPLOAD_ID ))) {
@@ -91,27 +94,28 @@ public void start(Future<Void> start) {
9194 websock .close ();
9295 }
9396 });
97+ // we only support one message from the client - to set the upload ID to listen to.
9498 websock .handler (handler -> uploadId .set (handler .toJsonObject ().getString (UPLOAD_ID )));
99+
100+ // when the websocket is closed we should stop listening for status messages.
95101 websock .closeHandler (closed -> consumer .unregister ());
96- websock .exceptionHandler (sock -> consumer .unregister ());
97102
98- }).requestHandler (router ::accept ).listen (Configuration .getWebPort (), done -> {
99- if (done .succeeded ()) {
100- Configuration .setWebPort (done .result ().actualPort ());
101- logger .info ("Started website on port " + Configuration .getWebPort ());
102- start .complete ();
103- } else {
104- start .fail (done .cause ());
105- }
103+ // when the websocket excepts we should also stop listening for status messages.
104+ websock .exceptionHandler (sock -> consumer .unregister ());
106105 });
107106 }
108107
108+ private String getStatusServiceWelcomeMessage () {
109+ return "websocket connected to excelastic " + VERSION + " using ElasticSearch " + getElasticVersion ();
110+ }
111+
109112 /**
110113 * Adds the upload route to the given router
111114 *
112115 * @param router the upload route is added to the given router
113116 */
114117 private void setRouterAPI (Router router ) {
118+ // API route for handling file uploads.
115119 router .route ("/api/upload" ).handler (context -> {
116120 Iterator <FileUpload > iterator = context .fileUploads ().iterator ();
117121
@@ -120,26 +124,7 @@ private void setRouterAPI(Router router) {
120124 logger .info ("Receiving uploaded file with request id " + params .get (UPLOAD_ID ));
121125 FileUpload upload = context .fileUploads ().iterator ().next ();
122126
123- vertx .fileSystem ().readFile (upload .uploadedFileName (), file -> {
124- parse (file .result (), params , upload .fileName (),
125- Future .<Integer >future ().setHandler (result -> {
126- if (result .succeeded ()) {
127- String index = context .request ().params ().get (INDEX );
128- logger .info (String .format ("Imported file '%s' successfully into '%s'." ,
129- upload .fileName (), index ));
130-
131- context .put (INDEX , index );
132- context .put (FILE , upload .fileName ());
133- context .put (IMPORTED , result .result ());
134- context .reroute (DONE );
135- } else {
136- context .put (MESSAGE , traceToText (result .cause ()));
137- logger .log (Level .SEVERE , String .format ("Failed to parse file '%s'." ,
138- upload .fileName ()), result .cause ());
139- context .reroute (ERROR );
140- }
141- }));
142- });
127+ parse (upload .uploadedFileName (), params , upload .fileName (), onComplete (context , upload .fileName ()));
143128 } else {
144129 context .put (MESSAGE , NO_FILE_WAS_UPLOADED );
145130 context .reroute (ERROR );
@@ -148,60 +133,65 @@ private void setRouterAPI(Router router) {
148133 }
149134
150135 /**
151- * converts a throwables stack trace into a string .
136+ * Creates a future that is called when the import completes either successfully or by an error .
152137 *
153- * @param throwable the throwable to be converted .
154- * @return a textual representation of the throwables trace,
155- * may be used in the app to display errors .
138+ * @param context the routing context the upload was initiated from .
139+ * @param fileName the file name of the file that was uplaoded.
140+ * @return a future to be completed when {@link #parse(String, MultiMap, String, Future)} completes} .
156141 */
157- private String traceToText (Throwable throwable ) {
158- StringWriter writer = new StringWriter ();
159- throwable .printStackTrace (new PrintWriter (writer ));
160- return writer .toString ();
142+ private Future <Integer > onComplete (RoutingContext context , String fileName ) {
143+ // when the file has been read from disk, parsed and imported.
144+ return Future .<Integer >future ().setHandler (result -> {
145+ if (result .succeeded ()) {
146+ String index = context .request ().params ().get (INDEX );
147+ logger .info (String .format ("Imported file '%s' successfully into '%s'." , fileName , index ));
148+
149+ context .put (INDEX , index );
150+ context .put (FILE , fileName );
151+ context .put (IMPORTED , result .result ());
152+ context .reroute (DONE );
153+ } else {
154+ // oops: the import has failed, make sure to emit the full error to clients.
155+ context .put (MESSAGE , ApplicationLogger .traceToText (result .cause ()));
156+ logger .log (Level .SEVERE , String .format ("Failed to parse file '%s'." , fileName ), result .cause ());
157+ context .reroute (ERROR );
158+ }
159+ });
161160 }
162161
163162 /**
164163 * Parses a file upload request, converting the excel payload into json and waits
165164 * for elasticsearch to complete indexing.
166165 *
167- * @param buffer contains the excel file data
166+ * @param uploadedFileName the actual file on disk that contains the uploaded file.
168167 * @param params upload parameters
169168 * @param fileName the name of the uploaded file
170169 * @param future callback on completed parse + indexing.
171170 */
172- private void parse (Buffer buffer , MultiMap params , String fileName , Future <Integer > future ) {
173- vertx .< Integer > executeBlocking (blocking -> {
171+ private void parse (String uploadedFileName , MultiMap params , String fileName , Future <Integer > future ) {
172+ vertx .executeBlocking (blocking -> {
174173 try {
175- int columnRow = Integer .parseInt (params .get (OFFSET ));
176- FileParser parser = new FileParser (buffer .getBytes (), columnRow , fileName );
177- JsonObject data = parser .toImportable (
178- params .get (INDEX ),
179- getMappingByParams (params ),
180- params .get (OPTIONS ).equals (CLEAR ));
181-
182- data .put (UPLOAD_ID , params .get (UPLOAD_ID ));
183-
184- vertx .eventBus ().send (INDEXING_ELASTICSEARCH , data , new DeliveryOptions ().setSendTimeout (INDEXING_TIMEOUT ),
185- reply -> {
186- if (reply .succeeded ()) {
187- blocking .complete (parser .getImportedItems ());
188- } else {
189- blocking .fail (reply .cause ());
190- }
191- });
192- } catch (ParserException | NumberFormatException e ) {
174+ ImportEvent event = ImportEvent .fromParams (params );
175+ FileParser parser = new FileParser (new File (uploadedFileName ), event .getOffset (), fileName );
176+ parser .assertFileParsable ();
177+ event .setParser (parser );
178+
179+ // submit an import event.
180+ vertx .eventBus ().send (INDEXING_ELASTICSEARCH , event , getDeliveryOptions (),
181+ reply -> {
182+ if (reply .succeeded ()) {
183+ blocking .complete (parser .getNumberOfElements ());
184+ } else {
185+ blocking .fail (reply .cause ());
186+ }
187+ });
188+ } catch (FileNotFoundException | ParserException | NumberFormatException e ) {
193189 blocking .fail (e );
194190 }
195- }, false , done -> {
196- if (done .succeeded ()) {
197- future .complete (done .result ());
198- } else {
199- future .fail (done .cause ());
200- }
201- });
191+ }, false , future );
202192 }
203193
204- private String getMappingByParams ( MultiMap params ) {
205- return ( params . get ( MAPPING ). length () == 0 ) ? "default" : params . get ( MAPPING );
194+ private DeliveryOptions getDeliveryOptions ( ) {
195+ return new DeliveryOptions (). setSendTimeout ( INDEXING_TIMEOUT );
206196 }
207- }
197+ }
0 commit comments