@@ -1,15 +1,11 @@
 package com.codingchili.Model;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.logging.Logger;
-
-import io.vertx.core.AbstractVerticle;
-import io.vertx.core.Context;
-import io.vertx.core.Future;
-import io.vertx.core.Vertx;
+import io.vertx.core.*;
 import io.vertx.core.json.JsonArray;
 import io.vertx.core.json.JsonObject;
 
+import java.util.logging.Logger;
+
 import static com.codingchili.Model.FileParser.ITEMS;
 
 /**
@@ -49,35 +45,57 @@ private void startSubmitListener() {
         vertx.eventBus().consumer(Configuration.INDEXING_ELASTICSEARCH, handler -> {
             JsonObject data = (JsonObject) handler.body();
             JsonArray items = data.getJsonArray(ITEMS);
-            String index = data.getString(INDEX);
-            CountDownLatch latch = new CountDownLatch(getBatchCount(items.size()));
+            Future<Void> starter = Future.future();
+            Future<Void> next = starter;
 
-            // performs one bulk request for each bucket of MAX_BATCH
+            // performs one bulk request for each bucket of MAX_BATCH serially.
             for (int i = 0; i < items.size(); i += MAX_BATCH) {
-                final int max = ((i + MAX_BATCH < items.size()) ? i + MAX_BATCH : items.size());
                 final int current = i;
-                vertx.createHttpClient().post(
-                        Configuration.getElasticPort(), Configuration.getElasticHost(), index + BULK)
-                        .handler(response -> {
-                            logger.info(String.format("Submitted items [%d -> %d] with result [%d] %s",
-                                    current, max - 1, response.statusCode(), response.statusMessage()));
-
-                            response.bodyHandler(body -> {
-                                latch.countDown();
-
-                                // complete successfully when all buckets are inserted.
-                                if (latch.getCount() == 0) {
-                                    handler.reply(null);
-                                }
-                            });
-                        }).exceptionHandler(exception -> handler.fail(500, exception.getMessage()))
-                        .end(bulkQuery(items, index, max, current));
+                final int max = ((current + MAX_BATCH < items.size()) ? current + MAX_BATCH : items.size());
+                next = next.compose(v -> submitForIndexing(items, data.getString(INDEX), current, max));
             }
+
+            // when the final submission completes, complete the handler.
+            next.setHandler((v) -> {
+                if (v.succeeded()) {
+                    handler.reply(null);
+                } else {
+                    handler.fail(500, v.cause().getMessage());
+                }
+            });
+            starter.complete();
         });
     }
 
+    /**
+     * Submits a subset of the given json array for indexing.
+     *
+     * @param items   items to be indexed
+     * @param index   the name of the index to use
+     * @param current the low bound for the given json items to import
+     * @param max     the high bound for the given json items to import
+     * @return a future completed when the indexing of the specified elements has completed.
+     */
+    private Future<Void> submitForIndexing(JsonArray items, String index, int current, int max) {
+        Future<Void> future = Future.future();
+        vertx.createHttpClient().post(
+                Configuration.getElasticPort(), Configuration.getElasticHost(), index + BULK)
+                .handler(response -> response.bodyHandler(body -> {
+
+                    float percent = (max * 1.0f / items.size()) * 100;
+                    logger.info(
+                            String.format("Submitted items [%d -> %d] of %d with result [%d] %s into '%s' [%.1f%%]",
+                                    current, max - 1, items.size(), response.statusCode(), response.statusMessage(), index, percent));
+
+                    future.complete();
+                })).exceptionHandler(exception -> future.fail(exception.getMessage()))
+                .end(bulkQuery(items, index, max, current));
+        return future;
+    }
+
     /**
      * Determines the number of batches to bulk insert.
+     *
      * @param size the total number of items to insert
      * @return the number of batches required to submit the size
      */
@@ -87,9 +105,10 @@ private int getBatchCount(int size) {
 
     /**
      * Builds a bulk query for insertion into elasticsearch
-     * @param list full list of all elements
-     * @param index the current item anchor
-     * @param max max upper bound of items to include in the bulk
+     *
+     * @param list    full list of all elements
+     * @param index   the current item anchor
+     * @param max     max upper bound of items to include in the bulk
      * @param current lower bound of items to include in the bulk
      * @return a payload encoded as json-lines.
      */
@@ -111,21 +130,23 @@ private String bulkQuery(JsonArray list, String index, int max, int current) {
     /**
      * Polls the elasticsearch server for version information. Sets connected if the server
      * is available.
+     *
      * @param id the id of the timer that triggered the request, not used.
      */
     private void pollElasticServer(Long id) {
         vertx.createHttpClient().get(Configuration.getElasticPort(), Configuration.getElasticHost(), "/",
                 response -> response.bodyHandler(buffer -> {
-                   version = buffer.toJsonObject().getJsonObject("version").getString("number");
-                   if (!connected) {
-                       logger.info(String.format("Connected to elasticsearch server %s at %s:%d",
-                               version, Configuration.getElasticHost(), Configuration.getElasticPort()));
-                       connected = true;
-                       vertx.eventBus().send(ES_STATUS, connected);
-                   }
-               })).exceptionHandler(event -> {
+                    version = buffer.toJsonObject().getJsonObject("version").getString("number");
+                    if (!connected) {
+                        logger.info(String.format("Connected to elasticsearch server %s at %s:%d",
+                                version, Configuration.getElasticHost(), Configuration.getElasticPort()));
+                        connected = true;
+                        vertx.eventBus().send(ES_STATUS, connected);
+                    }
+                })).exceptionHandler(event -> {
                     connected = false;
                     logger.severe(event.getMessage());
+                    vertx.eventBus().send(ES_STATUS, connected);
                 }).end();
     }
 
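
Note on the pattern: the commit replaces the CountDownLatch with a chain of composed futures, so each bulk request is only sent once the previous one has finished. Below is a minimal, self-contained sketch of that serial-composition pattern, assuming the Vert.x 3.x Future API; the class name, batch count, and submit() helper are illustrative stand-ins, not part of this repository.

import io.vertx.core.Future;
import io.vertx.core.Vertx;

/**
 * Sketch of chaining futures with compose() to run asynchronous batches serially.
 */
public class SerialBatchSketch {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        Future<Void> starter = Future.future();
        Future<Void> next = starter;

        // build the chain: batch i is submitted only after batch i - 1 completes.
        for (int batch = 0; batch < 3; batch++) {
            final int current = batch;
            next = next.compose(v -> submit(vertx, current));
        }

        // observe the outcome of the whole chain.
        next.setHandler(done -> {
            System.out.println(done.succeeded() ? "all batches submitted" : "failed: " + done.cause());
            vertx.close();
        });

        // kick off the chain by completing the first future.
        starter.complete();
    }

    // stand-in for submitForIndexing(): completes asynchronously after a short delay.
    private static Future<Void> submit(Vertx vertx, int batch) {
        Future<Void> future = Future.future();
        vertx.setTimer(100, id -> {
            System.out.println("submitted batch " + batch);
            future.complete();
        });
        return future;
    }
}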