1717
1818package io .minio ;
1919
20+ import com .google .common .collect .HashMultimap ;
21+ import com .google .common .collect .Multimap ;
22+ import com .google .common .collect .Multimaps ;
2023import com .google .common .io .ByteStreams ;
2124
2225import edu .umd .cs .findbugs .annotations .SuppressFBWarnings ;
5760import io .minio .messages .NotificationConfiguration ;
5861import io .minio .org .apache .commons .validator .routines .InetAddressValidator ;
5962
63+ import io .minio .notification .NotificationInfo ;
64+
6065import okhttp3 .HttpUrl ;
6166import okhttp3 .OkHttpClient ;
6267import okhttp3 .Request ;
6974import org .xmlpull .v1 .XmlPullParserException ;
7075import org .xmlpull .v1 .XmlPullParserFactory ;
7176
77+ import com .fasterxml .jackson .databind .ObjectMapper ;
78+
7279import java .io .BufferedInputStream ;
7380import java .io .IOException ;
7481import java .io .InputStream ;
@@ -815,8 +822,8 @@ public boolean verify(String hostname, SSLSession session) {
815822 * @param length Length of HTTP request body.
816823 */
817824 private Request createRequest (Method method , String bucketName , String objectName ,
818- String region , Map <String ,String > headerMap ,
819- Map <String ,String > queryParamMap , final String contentType ,
825+ String region , Multimap <String ,String > headerMap ,
826+ Multimap <String ,String > queryParamMap , final String contentType ,
820827 Object body , int length )
821828 throws InvalidBucketNameException , NoSuchAlgorithmException , InvalidKeyException , InsufficientDataException ,
822829 IOException , InternalException {
@@ -870,7 +877,7 @@ private Request createRequest(Method method, String bucketName, String objectNam
870877 }
871878
872879 if (queryParamMap != null ) {
873- for (Map .Entry <String ,String > entry : queryParamMap .entrySet ()) {
880+ for (Map .Entry <String ,String > entry : queryParamMap .entries ()) {
874881 urlBuilder .addEncodedQueryParameter (S3Escaper .encode (entry .getKey ()), S3Escaper .encode (entry .getValue ()));
875882 }
876883 }
@@ -880,7 +887,7 @@ private Request createRequest(Method method, String bucketName, String objectNam
880887 Request .Builder requestBuilder = new Request .Builder ();
881888 requestBuilder .url (url );
882889 if (headerMap != null ) {
883- for (Map .Entry <String ,String > entry : headerMap .entrySet ()) {
890+ for (Map .Entry <String ,String > entry : headerMap .entries ()) {
884891 requestBuilder .header (entry .getKey (), entry .getValue ());
885892 }
886893 }
@@ -998,9 +1005,29 @@ private HttpResponse execute(Method method, String region, String bucketName, St
9981005 throws InvalidBucketNameException , NoSuchAlgorithmException , InsufficientDataException , IOException ,
9991006 InvalidKeyException , NoResponseException , XmlPullParserException , ErrorResponseException ,
10001007 InternalException {
1001- String contentType = null ;
1008+
1009+ Multimap <String , String > queryParamMultiMap = null ;
1010+ if (queryParamMap != null ) {
1011+ queryParamMultiMap = Multimaps .forMap (queryParamMap );
1012+ }
1013+
1014+ Multimap <String , String > headerMultiMap = null ;
10021015 if (headerMap != null ) {
1003- contentType = headerMap .get ("Content-Type" );
1016+ headerMultiMap = Multimaps .forMap (headerMap );
1017+ }
1018+
1019+ return executeReq (method , region , bucketName , objectName , headerMultiMap , queryParamMultiMap , body , length );
1020+ }
1021+
1022+ private HttpResponse executeReq (Method method , String region , String bucketName , String objectName ,
1023+ Multimap <String ,String > headerMap , Multimap <String ,String > queryParamMap ,
1024+ Object body , int length )
1025+ throws InvalidBucketNameException , NoSuchAlgorithmException , InsufficientDataException , IOException ,
1026+ InvalidKeyException , NoResponseException , XmlPullParserException , ErrorResponseException ,
1027+ InternalException {
1028+ String contentType = null ;
1029+ if (headerMap != null && headerMap .get ("Content-Type" ) != null ) {
1030+ contentType = String .join (" " , headerMap .get ("Content-Type" ));
10041031 }
10051032 if (body != null && !(body instanceof InputStream || body instanceof RandomAccessFile || body instanceof byte [])) {
10061033 byte [] bytes = body .toString ().getBytes (StandardCharsets .UTF_8 );
@@ -2053,8 +2080,16 @@ public String getPresignedObjectUrl(Method method, String bucketName, String obj
20532080 body = new byte [0 ];
20542081 }
20552082
2083+ Multimap <String , String > queryParamMap = null ;
2084+ if (reqParams != null ) {
2085+ queryParamMap = HashMultimap .create ();
2086+ for (Map .Entry <String , String > m : reqParams .entrySet ()) {
2087+ queryParamMap .put (m .getKey (), m .getValue ());
2088+ }
2089+ }
2090+
20562091 String region = getRegion (bucketName );
2057- Request request = createRequest (method , bucketName , objectName , region , null , reqParams , null , body , 0 );
2092+ Request request = createRequest (method , bucketName , objectName , region , null , queryParamMap , null , body , 0 );
20582093 HttpUrl url = Signer .presignV4 (request , region , accessKey , secretKey , expires );
20592094 return url .toString ();
20602095 }
@@ -4303,6 +4338,63 @@ public void removeIncompleteUpload(String bucketName, String objectName)
43034338 }
43044339 }
43054340
4341+ /**
4342+ * Listen to bucket notifications.
4343+ *
4344+ * @param bucketName Bucket name.
4345+ * @param prefix Prefix of concerned objects events.
4346+ * @param suffix Suffix of concerned objects events.
4347+ * @param events List of events to watch.
4348+ * @param eventCallback Event handler.
4349+ *
4350+ */
4351+
4352+ public void listenBucketNotification (String bucketName , String prefix , String suffix , String [] events ,
4353+ BucketEventListener eventCallback )
4354+ throws InvalidBucketNameException , NoSuchAlgorithmException , InsufficientDataException , IOException ,
4355+ InvalidKeyException , NoResponseException , XmlPullParserException , ErrorResponseException ,
4356+ InternalException {
4357+
4358+ Multimap <String ,String > queryParamMap = HashMultimap .create ();
4359+ queryParamMap .put ("prefix" , prefix );
4360+ queryParamMap .put ("suffix" , suffix );
4361+ for (String event : events ) {
4362+ queryParamMap .put ("events" , event );
4363+ }
4364+
4365+ String bodyContent = "" ;
4366+ Scanner scanner = null ;
4367+ HttpResponse response = null ;
4368+ ObjectMapper mapper = new ObjectMapper ();
4369+
4370+ try {
4371+ response = executeReq (Method .GET , getRegion (bucketName ),
4372+ bucketName , "" , null , queryParamMap , null , 0 );
4373+ scanner = new Scanner (response .body ().charStream ());
4374+ scanner .useDelimiter ("\n " );
4375+ while (scanner .hasNext ()) {
4376+ bodyContent = scanner .next ().trim ();
4377+ if (bodyContent .equals ("" )) {
4378+ continue ;
4379+ }
4380+ NotificationInfo ni = mapper .readValue (bodyContent , NotificationInfo .class );
4381+ eventCallback .updateEvent (ni );
4382+ }
4383+ } catch (RuntimeException e ) {
4384+ throw e ;
4385+ } catch (Exception e ) {
4386+ throw e ;
4387+ } finally {
4388+ if (response != null ) {
4389+ response .body ().close ();
4390+ }
4391+ if (scanner != null ) {
4392+ scanner .close ();
4393+ }
4394+ }
4395+ }
4396+
4397+
43064398
43074399 /**
43084400 * Skips data of up to given length in given input stream.
@@ -4449,3 +4541,4 @@ public void traceOff() throws IOException {
44494541 this .traceStream = null ;
44504542 }
44514543}
4544+
0 commit comments