diff --git a/src/core/ezcurl_core.ml b/src/core/ezcurl_core.ml index 20f8b12..6fe6d9d 100644 --- a/src/core/ezcurl_core.ml +++ b/src/core/ezcurl_core.ml @@ -144,9 +144,21 @@ let _apply_config (self : t) (config : Config.t) : unit = opt_iter authmethod ~f:(Curl.set_httpauth self.curl); opt_iter username ~f:(Curl.set_username self.curl); opt_iter password ~f:(Curl.set_password self.curl); - Curl.set_nosignal self.curl (_get_no_signal ()); () +let _eq_case a b = + let low = String.lowercase_ascii in + String.equal (low a) (low b) + +let _add_header_nodup (h : string * string) (headers : _ list ref) : unit = + let sq = List.to_seq !headers in + let k, v = h in + if not (List.exists (fun (tk,tv) -> _eq_case k tk && _eq_case v tv) !headers) then + headers := h :: !headers; + +let _contains_resp_headers (h : string) (headers : string list) : bool = + List.exists (_eq_case h) headers + let _set_headers (self : t) (headers : _ list) : unit = let headers = List.map (fun (k, v) -> k ^ ": " ^ v) headers in Curl.set_httpheader self.curl headers; @@ -217,6 +229,20 @@ let string_of_meth = function let pp_meth out m = Format.pp_print_string out (string_of_meth m) +type sse_frame = { + event: string option; + id: string option; + data: string option; + retry: int option; + empties: string list; (* Lines without a ':' *) +} + +type sse_state = + | Frame of sse_frame + | End_of_stream + +type sse_callback = sse_state -> bool + module type IO = sig type 'a t @@ -237,6 +263,7 @@ module type S = sig ?range:string -> ?content:[ `String of string | `Write of bytes -> int -> int ] -> ?headers:(string * string) list -> + ?callback:[ `Sse_event of sse_callback ] -> url:string -> meth:meth -> unit -> @@ -275,6 +302,7 @@ module type S = sig ?range:string -> ?content:[ `String of string | `Write of bytes -> int -> int ] -> ?headers:(string * string) list -> + ?callback:[ `Sse_event of sse_callback ] -> url:string -> meth:meth -> write_into:#input_stream -> @@ -289,6 +317,7 @@ module type S = sig ?config:Config.t -> ?range:string -> ?headers:(string * string) list -> + ?callback:[ `Sse_event of sse_callback ] -> url:string -> unit -> (string response, Curl.curlCode * string) result io @@ -301,6 +330,7 @@ module type S = sig ?client:t -> ?config:Config.t -> ?headers:(string * string) list -> + ?callback:[ `Sse_event of sse_callback ] -> url:string -> content:[ `String of string | `Write of bytes -> int -> int ] -> unit -> @@ -315,6 +345,7 @@ module type S = sig ?config:Config.t -> ?headers:(string * string) list -> ?content:[ `String of string | `Write of bytes -> int -> int ] -> + ?callback:[ `Sse_event of sse_callback ] -> params:Curl.curlHTTPPost list -> url:string -> unit -> @@ -353,6 +384,166 @@ let mk_res (self : t) headers body : (_ response, _) result = Ok { headers; code; body; info } with Parse_error (e, msg) -> Error (e, Curl.strerror e ^ ": " ^ msg) +let sse_frame_with_event sse_f v = + { + (!sse_f) with + event = Some v; + } + +let sse_frame_with_id sse_f v = + { + event = !sse_f.event; + id = Some v; + data = !sse_f.data; + retry = !sse_f.retry; + empties = !sse_f.empties; + } + +let sse_frame_append_data sse_f v = + let data = + match !sse_f.data with + | None -> Some v + | Some vv -> Some (vv ^ "\n" ^ v) + in + { + event = !sse_f.event; + id = !sse_f.id; + data; + retry = !sse_f.retry; + empties = !sse_f.empties; + } + +let sse_frame_with_retry sse_f v = + let retry = int_of_string_opt v in + { + event = !sse_f.event; + id = !sse_f.id; + data = !sse_f.data; + retry; + empties = !sse_f.empties; + } + +let sse_frame_append_empties sse_f v = + let empties = + match !sse_f.empties with + | [] -> [ v ] + | vv -> v :: vv + in + { + event = !sse_f.event; + id = !sse_f.id; + data = !sse_f.data; + retry = !sse_f.retry; + empties; + } + +let sse_process_pair k v sse_f = + match k with + | "event" -> sse_f := sse_frame_with_event sse_f v + | "id" -> sse_f := sse_frame_with_id sse_f v + | "data" -> sse_f := sse_frame_append_data sse_f v + | "retry" -> sse_f := sse_frame_with_retry sse_f v + | "" -> () (* The field is ignored *) + | _ -> sse_f := sse_frame_append_empties sse_f k + +let sse_split_line s = + let l = String.length s in + let sq = String.to_seq s in + match Seq.find_index (fun c -> c = ':') sq with + | None -> s :: [] + | Some p -> [ String.sub s 0 p; String.sub s (p + 1) (l - p - 1) ] + +let sse_parse_line line sse_f = + match sse_split_line line with + | [ k; v ] -> + let k = String.trim k in + let v = String.trim v in + sse_process_pair k v sse_f + | [ k ] -> + let k = String.trim k in + sse_process_pair k "" sse_f + | _ -> + (); + () + +let sse_extract_next_line body = + let len = Buffer.length body in + let bf_seq = Buffer.to_seq body in + (* Search for some complete line *) + match Seq.find_index (fun c -> c = '\n') bf_seq with + (* Then no complete line available for now *) + | None -> None + (* Oh nice a complete line found *) + | Some pivot -> + (* Extract line except ending LF *) + let bf_line = Bytes.create pivot in + Buffer.blit body 0 bf_line 0 pivot; + let line = String.trim (Bytes.to_string bf_line) in + (* Now shift left the remaining after LF *) + let pivot = pivot + 1 in + let rem = len - pivot in + let bf_after = Bytes.create rem in + if rem > 0 then Buffer.blit body pivot bf_after 0 rem; + Buffer.reset body; + Buffer.add_bytes body bf_after; + (* Here the line finally *) + Some line + +let sse_parse_lines body sse_f = + let rec loop body sse_f = + match sse_extract_next_line body with + | None -> false (* Nothing for now *) + | Some line -> + (match line with + (* Ready to send event *) + | "" -> true + (* Try next line *) + | line -> + sse_parse_line line sse_f; + loop body sse_f) + in + loop body sse_f + +let sse_handle_post_write callback body sse_f = + match sse_f with + | None -> true (* Stream can continue (No SSE content) *) + | Some _sse_f -> + (match callback with + | None -> true (* Stream can continue (No SSE callback) *) + | Some (`Sse_event sse_cb) -> + let sse_cb_clean sse_cb _sse_f = + (* Send callback to user *) + let r = sse_cb (Frame !_sse_f) in + (* And reset internal event data *) + _sse_f := + { event = None; id = None; data = None; retry = None; empties = [] }; + r + in + let rec loop body sse_f = + match sse_parse_lines body sse_f with + (* Stream can continue *) + | false -> true + (* Must dispatch event now*) + | true -> + (match sse_cb_clean sse_cb sse_f with + (* Stream must close now *) + | false -> false + (* Else continue parse *) + | true -> loop body sse_f) + in + loop body _sse_f) + +let sse_handle_post_finish callback body sse_f = + match sse_f with + | None -> () + | Some _ -> + (match callback with + | None -> () + | Some (`Sse_event sse_cb) -> + let _ = sse_handle_post_write callback body sse_f in + let _ = sse_cb End_of_stream in + ()) + module Make (IO : IO) : S with type 'a io = 'a IO.t = struct open IO @@ -388,10 +579,11 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct do_cleanup: bool; mutable resp_headers: string list; mutable resp_headers_done: bool; + mutable sse_frame: sse_frame ref option; } let http_setup_ ?client ?(config = Config.default) ?range ?content - ?(headers = []) ~url ~meth () : http_state_ = + ?(headers = []) ?callback ~url ~meth () : http_state_ = let headers = ref headers in let do_cleanup, self = match client with @@ -412,6 +604,14 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct | Some size, POST _ -> Curl.set_postfieldsize self.curl size | Some size, _ -> Curl.set_infilesize self.curl size); + (* Add more pre-determined request headers depend of feature *) + (match callback with + | None -> () + | Some (`Sse_event _) -> + _add_header_nodup ("Cache-control", "no-cache") headers; + _add_header_nodup ("Accept", "text/event-stream") headers; + ()); + (* local state *) let st = { @@ -419,6 +619,7 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct client = self; resp_headers = []; resp_headers_done = false; + sse_frame = None; } in @@ -442,9 +643,27 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct Curl.set_headerfunction self.curl (fun s0 -> let s = String.trim s0 in (* Printf.printf "got header %S\n%!" s0; *) - if s0 = "\r\n" then - st.resp_headers_done <- true - else ( + if s0 = "\r\n" then ( + st.resp_headers_done <- true ; + (* Validate headers for user callback *) + match callback with + | None -> () + | Some (`Sse_event _) -> + if + _contains_resp_headers "Content-type: text/event-stream" + st.resp_headers + then + st.sse_frame <- + Some + (ref + { + event = None; + id = None; + data = None; + retry = None; + empties = []; + }) + ) else ( (* redirection: drop previous headers *) if st.resp_headers_done then ( st.resp_headers_done <- false; @@ -457,22 +676,27 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct st - let http ?(tries = 1) ?client ?config ?range ?content ?headers ~url ~meth () : + let http ?(tries = 1) ?client ?config ?range ?content ?headers ?callback + ~url ~meth () : (string response, _) result io = (* at least one attempt *) let tries = max tries 1 in let st = - http_setup_ ?client ?config ?range ?content ?headers ~url ~meth () + http_setup_ ?client ?config ?range ?content ?headers ?callback ~url ~meth () in let body = Buffer.create 64 in Curl.set_writefunction st.client.curl (fun s -> Buffer.add_string body s; - String.length s); + match sse_handle_post_write callback body st.sse_frame with + | true -> String.length s (* Continue write *) + | false -> 0xFFFFFFFE + (* Stop stream, not-forked libcurl has no CURL_WRITE_FUNC_ABORT *)); let rec loop i = IO.perform st.client.curl >>= function | Curl.CURLE_OK -> + let _ = sse_handle_post_finish callback body st.sse_frame in let r = mk_res st.client (List.rev st.resp_headers) (Buffer.contents body) in @@ -480,45 +704,64 @@ module Make (IO : IO) : S with type 'a io = 'a IO.t = struct return r | Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *) | c -> + let _ = sse_handle_post_finish callback body st.sse_frame in if st.do_cleanup then Curl.cleanup st.client.curl; return (Error (c, Curl.strerror c)) in loop tries - let http_stream ?(tries = 1) ?client ?config ?range ?content ?headers ~url - ~meth ~(write_into : #input_stream) () : (unit response, _) result io = + let http_stream ?(tries = 1) ?client ?config ?range ?content ?headers + ?callback ~url ~meth ~(write_into : #input_stream) () : (unit response, _) result io = let tries = max tries 1 in let st = - http_setup_ ?client ?config ?range ?content ?headers ~url ~meth () + http_setup_ ?client ?config ?range ?content ?headers ?callback ~url ~meth () in + let body_sse_ = + match callback with + | None -> None + | Some (`Sse_event _) -> Some (Buffer.create 64) in Curl.set_writefunction st.client.curl (fun s -> let n = String.length s in write_into#on_input (Bytes.unsafe_of_string s) 0 n; - n); + match body_sse_ with + | None -> n + | Some body -> + Buffer.add_string body s; + (match sse_handle_post_write callback body st.sse_frame with + | true -> n (* Continue write *) + | false -> 0xFFFFFFFE + (* Stop stream, not-forked libcurl has no CURL_WRITE_FUNC_ABORT *)) + ); let rec loop i = IO.perform st.client.curl >>= function | Curl.CURLE_OK -> + (match body_sse_ with + | None -> () + | Some body -> let _ = sse_handle_post_finish callback body st.sse_frame in ()); let r = mk_res st.client (List.rev st.resp_headers) () in write_into#on_close (); if st.do_cleanup then Curl.cleanup st.client.curl; return r | Curl.CURLE_AGAIN when i > 1 -> loop (i - 1) (* try again *) | c -> + (match body_sse_ with + | None -> () + | Some body -> let _ = sse_handle_post_finish callback body st.sse_frame in ()); write_into#on_close (); if st.do_cleanup then Curl.cleanup st.client.curl; return (Error (c, Curl.strerror c)) in loop tries - let get ?tries ?client ?config ?range ?headers ~url () : _ result io = - http ?tries ?client ?config ?range ?headers ~url ~meth:GET () + let get ?tries ?client ?config ?range ?headers ?callback ~url () : _ result io = + http ?tries ?client ?config ?range ?headers ?callback ~url ~meth:GET () - let post ?tries ?client ?config ?headers ?content ~params ~url () : + let post ?tries ?client ?config ?headers ?content ?callback ~params ~url () : _ result io = - http ?tries ?client ?config ?headers ?content ~url ~meth:(POST params) () + http ?tries ?client ?config ?headers ?content ?callback ~url ~meth:(POST params) () - let put ?tries ?client ?config ?headers ~url ~content () : _ result io = - http ?tries ?client ?config ?headers ~url ~content ~meth:PUT () + let put ?tries ?client ?config ?headers ?callback ~url ~content () : _ result io = + http ?tries ?client ?config ?headers ?callback ~url ~content ~meth:PUT () end diff --git a/src/core/ezcurl_core.mli b/src/core/ezcurl_core.mli index 1a3620a..cf0e65b 100644 --- a/src/core/ezcurl_core.mli +++ b/src/core/ezcurl_core.mli @@ -114,6 +114,20 @@ type meth = val pp_meth : Format.formatter -> meth -> unit val string_of_meth : meth -> string +type sse_frame = { + event: string option; + id: string option; + data: string option; + retry: int option; + empties: string list; (* Lines without a ':' *) +} + +type sse_state = + | Frame of sse_frame + | End_of_stream + +type sse_callback = sse_state -> bool + (** {2 Underlying IO Monad} *) module type IO = sig type 'a t @@ -136,6 +150,7 @@ module type S = sig ?range:string -> ?content:[ `String of string | `Write of bytes -> int -> int ] -> ?headers:(string * string) list -> + ?callback:[ `Sse_event of sse_callback ] -> url:string -> meth:meth -> unit -> @@ -158,6 +173,12 @@ module type S = sig It must return [0] when the content is entirely written, and not before. @param headers headers of the query + @param callback callback to use on received body, either + a [None] to keep normal Curl write behavior, or [`Sse_event f] + to enable {{: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events} + Server-sent events } processing, where [f] is a callback type [sse_callback] + and returns boolean to indicate if the internal write callback can + continue to proceed process or else close the incoming infinite stream. *) (** Push-based stream of bytes @@ -174,6 +195,7 @@ module type S = sig ?range:string -> ?content:[ `String of string | `Write of bytes -> int -> int ] -> ?headers:(string * string) list -> + ?callback:[ `Sse_event of sse_callback ] -> url:string -> meth:meth -> write_into:#input_stream -> @@ -191,6 +213,7 @@ module type S = sig ?config:Config.t -> ?range:string -> ?headers:(string * string) list -> + ?callback:[ `Sse_event of sse_callback ] -> url:string -> unit -> (string response, Curl.curlCode * string) result io @@ -203,6 +226,7 @@ module type S = sig ?client:t -> ?config:Config.t -> ?headers:(string * string) list -> + ?callback:[ `Sse_event of sse_callback ] -> url:string -> content:[ `String of string | `Write of bytes -> int -> int ] -> unit -> @@ -217,6 +241,7 @@ module type S = sig ?config:Config.t -> ?headers:(string * string) list -> ?content:[ `String of string | `Write of bytes -> int -> int ] -> + ?callback:[ `Sse_event of sse_callback ] -> params:Curl.curlHTTPPost list -> url:string -> unit ->