11package com .codingchili .Controller ;
22
3- import com .codingchili .ApplicationLauncher ;
3+ import java .io .PrintWriter ;
4+ import java .io .StringWriter ;
5+ import java .util .Iterator ;
6+ import java .util .logging .Level ;
7+ import java .util .logging .Logger ;
8+
49import com .codingchili .Model .Configuration ;
510import com .codingchili .Model .ElasticWriter ;
611import com .codingchili .Model .FileParser ;
712import com .codingchili .Model .ParserException ;
8- import io .vertx .core .*;
13+ import static com .codingchili .ApplicationLauncher .VERSION ;
14+ import static com .codingchili .Model .Configuration .INDEXING_ELASTICSEARCH ;
15+ import static com .codingchili .Model .ElasticWriter .*;
16+ import static com .codingchili .Model .FileParser .INDEX ;
17+
18+ import io .vertx .core .AbstractVerticle ;
19+ import io .vertx .core .Context ;
20+ import io .vertx .core .Future ;
21+ import io .vertx .core .MultiMap ;
22+ import io .vertx .core .Vertx ;
923import io .vertx .core .buffer .Buffer ;
1024import io .vertx .core .eventbus .DeliveryOptions ;
25+ import io .vertx .core .eventbus .MessageConsumer ;
26+ import io .vertx .core .json .JsonObject ;
1127import io .vertx .ext .web .FileUpload ;
1228import io .vertx .ext .web .Router ;
1329import io .vertx .ext .web .handler .BodyHandler ;
1430import io .vertx .ext .web .handler .StaticHandler ;
1531import io .vertx .ext .web .handler .TemplateHandler ;
1632import io .vertx .ext .web .templ .JadeTemplateEngine ;
1733
18- import java .io .PrintWriter ;
19- import java .io .StringWriter ;
20- import java .util .Iterator ;
21- import java .util .logging .Level ;
22- import java .util .logging .Logger ;
23-
24- import static com .codingchili .Model .ElasticWriter .INDEXING_TIMEOUT ;
25- import static com .codingchili .Model .FileParser .INDEX ;
26-
2734/**
2835 * @author Robin Duda
29- * <p>
30- * Manages the web interface and handles file uploads.
36+ * <p>
37+ * Manages the web interface and handles file uploads.
3138 */
3239public class Website extends AbstractVerticle {
3340 public static final String MAPPING = "mapping" ;
41+ public static final String UPLOAD_ID = "uploadId" ;
3442 private Logger logger = Logger .getLogger (getClass ().getName ());
3543 private static final String DONE = "/done" ;
3644 private static final String ERROR = "/error" ;
@@ -57,7 +65,7 @@ public void start(Future<Void> start) {
5765
5866 // adds values used in the template to all routes.
5967 router .route ("/*" ).handler (context -> {
60- context .put ("version" , ApplicationLauncher . version );
68+ context .put ("version" , VERSION );
6169 context .put ("esVersion" , ElasticWriter .getElasticVersion ());
6270 context .put ("esURL" , Configuration .getElasticURL ());
6371 context .put ("connected" , ElasticWriter .isConnected ());
@@ -66,7 +74,25 @@ public void start(Future<Void> start) {
6674
6775 router .route ("/*" ).handler (TemplateHandler .create (JadeTemplateEngine .create ()));
6876
69- vertx .createHttpServer ().requestHandler (router ::accept ).listen (Configuration .getWebPort (), done -> {
77+ vertx .createHttpServer ().websocketHandler (websock -> {
78+ websock .writeFinalTextFrame (new JsonObject ().put ("message" , "websocket connected to excelastic " + VERSION +
79+ " using ElasticSearch " + getElasticVersion ()).encode ());
80+
81+ Atomic <String > uploadId = new Atomic <>("" );
82+ MessageConsumer <JsonObject > consumer = vertx .eventBus ().consumer (IMPORT_PROGRESS , data -> {
83+ try {
84+ if (uploadId .get ().equals (data .body ().getString (UPLOAD_ID ))) {
85+ websock .writeFinalTextFrame (data .body ().encode ());
86+ }
87+ } catch (Throwable e ) {
88+ websock .close ();
89+ }
90+ });
91+ websock .handler (handler -> uploadId .set (handler .toJsonObject ().getString (UPLOAD_ID )));
92+ websock .closeHandler (closed -> consumer .unregister ());
93+ websock .exceptionHandler (sock -> consumer .unregister ());
94+
95+ }).requestHandler (router ::accept ).listen (Configuration .getWebPort (), done -> {
7096 if (done .succeeded ()) {
7197 Configuration .setWebPort (done .result ().actualPort ());
7298 logger .info ("Started website on port " + Configuration .getWebPort ());
@@ -79,35 +105,37 @@ public void start(Future<Void> start) {
79105
80106 /**
81107 * Adds the upload route to the given router
108+ *
82109 * @param router the upload route is added to the given router
83110 */
84111 private void setRouterAPI (Router router ) {
85112 router .route ("/api/upload" ).handler (context -> {
86113 Iterator <FileUpload > iterator = context .fileUploads ().iterator ();
87114
88115 if (iterator .hasNext ()) {
89- logger .info ("Receiving uploaded file.. " );
116+ MultiMap params = context .request ().params ();
117+ logger .info ("Receiving uploaded file with request id " + params .get (UPLOAD_ID ));
90118 FileUpload upload = context .fileUploads ().iterator ().next ();
91119
92120 vertx .fileSystem ().readFile (upload .uploadedFileName (), file -> {
93- parse (file .result (), context . request (). params () , upload .fileName (),
94- Future .<Integer >future ().setHandler (result -> {
95- if (result .succeeded ()) {
96- String index = context .request ().params ().get (INDEX );
97- logger .info (String .format ("Imported file '%s' successfully into '%s'." ,
98- upload .fileName (), index ));
99-
100- context .put (INDEX , index );
101- context .put (FILE , upload .fileName ());
102- context .put (IMPORTED , result .result ());
103- context .reroute (DONE );
104- } else {
105- context .put (MESSAGE , traceToText (result .cause ()));
106- logger .log (Level .SEVERE , String .format ("Failed to parse file '%s'." ,
107- upload .fileName ()), result .cause ());
108- context .reroute (ERROR );
109- }
110- }));
121+ parse (file .result (), params , upload .fileName (),
122+ Future .<Integer >future ().setHandler (result -> {
123+ if (result .succeeded ()) {
124+ String index = context .request ().params ().get (INDEX );
125+ logger .info (String .format ("Imported file '%s' successfully into '%s'." ,
126+ upload .fileName (), index ));
127+
128+ context .put (INDEX , index );
129+ context .put (FILE , upload .fileName ());
130+ context .put (IMPORTED , result .result ());
131+ context .reroute (DONE );
132+ } else {
133+ context .put (MESSAGE , traceToText (result .cause ()));
134+ logger .log (Level .SEVERE , String .format ("Failed to parse file '%s'." ,
135+ upload .fileName ()), result .cause ());
136+ context .reroute (ERROR );
137+ }
138+ }));
111139 });
112140 } else {
113141 context .put (MESSAGE , NO_FILE_WAS_UPLOADED );
@@ -118,6 +146,7 @@ private void setRouterAPI(Router router) {
118146
119147 /**
120148 * converts a throwables stack trace into a string.
149+ *
121150 * @param throwable the throwable to be converted.
122151 * @return a textual representation of the throwables trace,
123152 * may be used in the app to display errors.
@@ -131,26 +160,28 @@ private String traceToText(Throwable throwable) {
131160 /**
132161 * Parses a file upload request, converting the excel payload into json and waits
133162 * for elasticsearch to complete indexing.
134- * @param buffer contains the excel file data
135- * @param params upload parameters
163+ *
164+ * @param buffer contains the excel file data
165+ * @param params upload parameters
136166 * @param fileName the name of the uploaded file
137- * @param future callback on completed parse + indexing.
167+ * @param future callback on completed parse + indexing.
138168 */
139169 private void parse (Buffer buffer , MultiMap params , String fileName , Future <Integer > future ) {
140170 vertx .<Integer >executeBlocking (blocking -> {
141171 try {
142172 int columnRow = Integer .parseInt (params .get (OFFSET ));
143173 FileParser parser = new FileParser (buffer .getBytes (), columnRow , fileName );
144- vertx .eventBus ().send (Configuration .INDEXING_ELASTICSEARCH ,
145- parser .toImportable (params .get (INDEX ), getMappingByParams (params )),
146- new DeliveryOptions ().setSendTimeout (INDEXING_TIMEOUT ),
147- reply -> {
148- if (reply .succeeded ()) {
149- blocking .complete (parser .getImportedItems ());
150- } else {
151- blocking .fail (reply .cause ());
152- }
153- });
174+ JsonObject data = parser .toImportable (params .get (INDEX ), getMappingByParams (params ))
175+ .put (UPLOAD_ID , params .get (UPLOAD_ID ));
176+
177+ vertx .eventBus ().send (INDEXING_ELASTICSEARCH , data , new DeliveryOptions ().setSendTimeout (INDEXING_TIMEOUT ),
178+ reply -> {
179+ if (reply .succeeded ()) {
180+ blocking .complete (parser .getImportedItems ());
181+ } else {
182+ blocking .fail (reply .cause ());
183+ }
184+ });
154185 } catch (ParserException | NumberFormatException e ) {
155186 blocking .fail (new ParserException (e ));
156187 }
0 commit comments