Skip to content

Commit af28d46

Browse files
committed
Streaming URI fix for 2.0.5, bug #1201
1 parent 9219e10 commit af28d46

File tree

1 file changed

+15
-12
lines changed

1 file changed

+15
-12
lines changed

marklogic-data-hub/src/main/java/com/marklogic/hub/collector/impl/CollectorImpl.java

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.springframework.http.HttpMethod;
2323
import org.springframework.http.MediaType;
2424
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
25+
import org.springframework.web.client.RequestCallback;
26+
import org.springframework.web.client.ResponseExtractor;
2527
import org.springframework.web.client.RestTemplate;
2628

2729
import javax.naming.InvalidNameException;
@@ -40,10 +42,7 @@
4042
import java.security.cert.Certificate;
4143
import java.security.cert.CertificateParsingException;
4244
import java.security.cert.X509Certificate;
43-
import java.util.ArrayList;
44-
import java.util.Collection;
45-
import java.util.List;
46-
import java.util.Map;
45+
import java.util.*;
4746

4847
public class CollectorImpl implements Collector {
4948
private DatabaseClient client = null;
@@ -118,18 +117,22 @@ public DiskQueue<String> run(String jobId, String entity, String flow, int threa
118117
uriString += "&options=" + URLEncoder.encode(objectMapper.writeValueAsString(options), "UTF-8");
119118
}
120119
URI uri = new URI(uriString);
121-
HttpHeaders headers = new HttpHeaders();
122-
headers.set("Accept", MediaType.TEXT_PLAIN_VALUE);
123-
Resource responseBody = template.exchange(uri, HttpMethod.GET, new HttpEntity<>(headers), Resource.class).getBody();
124-
if(responseBody != null) {
125-
InputStream inputStream = responseBody.getInputStream();
126-
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
120+
RequestCallback requestCallback = request -> request.getHeaders()
121+
.setAccept(Arrays.asList(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL));
122+
123+
// Streams the response instead of loading it all in memory
124+
ResponseExtractor<Void> responseExtractor = response -> {
125+
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(response.getBody()));
127126
String line;
128127
while((line = bufferedReader.readLine()) != null) {
129128
results.add(line);
130129
}
131-
inputStream.close();
132-
}
130+
bufferedReader.close();
131+
return null;
132+
};
133+
134+
template.execute(uri, HttpMethod.GET, requestCallback, responseExtractor);
135+
133136
return results;
134137
}
135138
catch(Exception e) {

0 commit comments

Comments
 (0)