From 4258c23bfde355e47212be7ca92ee71fde278d9b Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Fri, 14 Nov 2025 18:15:09 +0100 Subject: [PATCH 1/4] PoC proxy with stream support --- tools/server/server-http.cpp | 84 ++++++++++++++++++++++++++++++++++++ tools/server/server-http.h | 26 +++++++++++ tools/server/server.cpp | 5 ++- 3 files changed, 114 insertions(+), 1 deletion(-) diff --git a/tools/server/server-http.cpp b/tools/server/server-http.cpp index 8200dbc7a17..7f036959ce0 100644 --- a/tools/server/server-http.cpp +++ b/tools/server/server-http.cpp @@ -371,3 +371,87 @@ void server_http_context::post(const std::string & path, server_http_context::ha }); } +// +// server_http_client +// + +class server_http_client::Impl { +public: + std::unique_ptr cli; +}; + +void server_http_client::ImplDeleter::operator()(Impl *p) { + delete p; +} + +server_http_client::server_http_client() + : pimpl(std::unique_ptr(new server_http_client::Impl())) +{} + +std::unique_ptr server_http_client::make_request( + const std::string & method, + int port, + const std::string & path, + const std::map & headers, + const std::string & body) { + + server_http_client * self = new server_http_client(); + + // wire up the receive end of the queue + // TODO: handle case where one of the end terminates unexpectedly + self->next = [self](std::string & out) -> bool { + std::unique_lock lock(self->mutex); + while (self->queue.empty() && self->is_running) { + lock.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + lock.lock(); + } + if (!self->is_running && self->queue.empty()) { + return false; + } + out = std::move(self->queue.front()); + self->queue.pop(); + return true; + }; + + // wire up the HTTP client + self->pimpl->cli = std::make_unique("127.0.0.1", port); + httplib::ResponseHandler response_handler = [self](const httplib::Response & response) { + std::unique_lock lock(self->mutex); + self->status = response.status; + for (const auto & [key, value] : response.headers) { + self->headers[key] = value; + } + self->queue.emplace(""); // flush the headers + return true; + }; + httplib::ContentReceiverWithProgress content_receiver = [self](const char * data, size_t data_length, size_t, size_t) { + std::unique_lock lock(self->mutex); + if (!self->is_running) { + return false; + } + self->queue.emplace(std::string(data, data_length)); + return true; + }; + httplib::Request req; + req.method = method; + req.path = path; + for (const auto & [key, value] : headers) { + req.set_header(key, value); + } + req.body = body; + req.response_handler = response_handler; + req.content_receiver = content_receiver; + + self->thread = std::thread([self, req]() { + self->pimpl->cli->send(std::move(req)); + self->is_running = false; + }); + self->thread.detach(); + + // wait for the first chunk (headers) + std::string output; + self->next(output); // ignore output, it's just to flush headers + + return std::unique_ptr(self); +} diff --git a/tools/server/server-http.h b/tools/server/server-http.h index 71fd5c83808..5d87ea15deb 100644 --- a/tools/server/server-http.h +++ b/tools/server/server-http.h @@ -7,6 +7,8 @@ #include #include #include +#include +#include // generator-like API for HTTP response generation // this object response with one of the 2 modes: @@ -70,3 +72,27 @@ struct server_http_context { void get(const std::string &, handler_t); void post(const std::string &, handler_t); }; + +// simple HTTP client with blocking API +struct server_http_client : server_http_res { + class Impl; + struct ImplDeleter + { + void operator()(Impl *p); + }; + std::unique_ptr pimpl; +public: + server_http_client(); + ~server_http_client() = default; + static std::unique_ptr make_request( + const std::string & method, + int port, + const std::string & path, + const std::map & headers, + const std::string & body); +private: + std::thread thread; + std::mutex mutex; + std::queue queue; + bool is_running = true; +}; diff --git a/tools/server/server.cpp b/tools/server/server.cpp index 9d41f16aec1..640f19d4f6b 100644 --- a/tools/server/server.cpp +++ b/tools/server/server.cpp @@ -5557,7 +5557,7 @@ int main(int argc, char ** argv) { ctx_http.post("/completions", ex_wrapper(routes.post_completions)); ctx_http.post("/v1/completions", ex_wrapper(routes.post_completions_oai)); ctx_http.post("/chat/completions", ex_wrapper(routes.post_chat_completions)); - ctx_http.post("/v1/chat/completions", ex_wrapper(routes.post_chat_completions)); + //ctx_http.post("/v1/chat/completions", ex_wrapper(routes.post_chat_completions)); ctx_http.post("/api/chat", ex_wrapper(routes.post_chat_completions)); // ollama specific endpoint ctx_http.post("/infill", ex_wrapper(routes.post_infill)); ctx_http.post("/embedding", ex_wrapper(routes.post_embeddings)); // legacy @@ -5576,6 +5576,9 @@ int main(int argc, char ** argv) { // Save & load slots ctx_http.get ("/slots", ex_wrapper(routes.get_slots)); ctx_http.post("/slots/:id_slot", ex_wrapper(routes.post_slots)); + ctx_http.post("/v1/chat/completions", [](const server_http_req & req) { + return server_http_client::make_request("POST", 8080, "/chat/completions", {}, req.body); + }); // // Start the server From 9afb23b3945a99d721881095ef38dc8ce4cd5c67 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Sat, 15 Nov 2025 14:04:31 +0100 Subject: [PATCH 2/4] handle all error cases --- tools/server/server-http.cpp | 119 +++++++++++++++++------------------ tools/server/server-http.h | 79 ++++++++++++++++++----- tools/server/server.cpp | 4 +- 3 files changed, 124 insertions(+), 78 deletions(-) diff --git a/tools/server/server-http.cpp b/tools/server/server-http.cpp index 7f036959ce0..8626309928e 100644 --- a/tools/server/server-http.cpp +++ b/tools/server/server-http.cpp @@ -375,83 +375,82 @@ void server_http_context::post(const std::string & path, server_http_context::ha // server_http_client // -class server_http_client::Impl { -public: - std::unique_ptr cli; -}; - -void server_http_client::ImplDeleter::operator()(Impl *p) { - delete p; -} - -server_http_client::server_http_client() - : pimpl(std::unique_ptr(new server_http_client::Impl())) -{} - -std::unique_ptr server_http_client::make_request( +server_http_client::server_http_client( const std::string & method, int port, const std::string & path, const std::map & headers, - const std::string & body) { - - server_http_client * self = new server_http_client(); - - // wire up the receive end of the queue - // TODO: handle case where one of the end terminates unexpectedly - self->next = [self](std::string & out) -> bool { - std::unique_lock lock(self->mutex); - while (self->queue.empty() && self->is_running) { - lock.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - lock.lock(); - } - if (!self->is_running && self->queue.empty()) { - return false; + const std::string & body, + const std::function should_stop) { + // shared between reader and writer threads + auto cli = std::make_shared("127.0.0.1", port); + auto pipe = std::make_shared>(); + + // setup Client + cli->set_connection_timeout(0, 200000); // 200 milliseconds + this->status = 500; // to be overwritten upon response + this->cleanup = [pipe]() { + pipe->close_read(); + pipe->close_write(); + }; + + // wire up the receive end of the pipe + this->next = [pipe, should_stop](std::string & out) -> bool { + msg_t msg; + bool has_next = pipe->read(msg, should_stop); + if (!msg.data.empty()) { + out = std::move(msg.data); } - out = std::move(self->queue.front()); - self->queue.pop(); - return true; + return has_next; }; // wire up the HTTP client - self->pimpl->cli = std::make_unique("127.0.0.1", port); - httplib::ResponseHandler response_handler = [self](const httplib::Response & response) { - std::unique_lock lock(self->mutex); - self->status = response.status; + // note: do NOT capture `this` pointer, as it may be destroyed before the thread ends + httplib::ResponseHandler response_handler = [pipe, cli](const httplib::Response & response) { + msg_t msg; + msg.status = response.status; for (const auto & [key, value] : response.headers) { - self->headers[key] = value; + msg.headers[key] = value; } - self->queue.emplace(""); // flush the headers + pipe->write(std::move(msg)); // send headers first return true; }; - httplib::ContentReceiverWithProgress content_receiver = [self](const char * data, size_t data_length, size_t, size_t) { - std::unique_lock lock(self->mutex); - if (!self->is_running) { - return false; - } - self->queue.emplace(std::string(data, data_length)); - return true; + httplib::ContentReceiverWithProgress content_receiver = [pipe](const char * data, size_t data_length, size_t, size_t) { + return pipe->write({{}, 0, std::string(data, data_length)}); // send data chunks }; + + // prepare the request to destination server httplib::Request req; - req.method = method; - req.path = path; - for (const auto & [key, value] : headers) { - req.set_header(key, value); + { + req.method = method; + req.path = path; + for (const auto & [key, value] : headers) { + req.set_header(key, value); + } + req.body = body; + req.response_handler = response_handler; + req.content_receiver = content_receiver; } - req.body = body; - req.response_handler = response_handler; - req.content_receiver = content_receiver; - self->thread = std::thread([self, req]() { - self->pimpl->cli->send(std::move(req)); - self->is_running = false; + // start the proxy thread + SRV_DBG("start proxy thread %s %s\n", req.method.c_str(), req.path.c_str()); + this->thread = std::thread([cli, pipe, req]() { + auto result = cli->send(std::move(req)); + if (result.error() != httplib::Error::Success) { + auto err_str = httplib::to_string(result.error()); + SRV_ERR("http client error: %s\n", err_str.c_str()); + pipe->write({{}, 500, ""}); // header + pipe->write({{}, 0, "proxy error: " + err_str}); // body + } + pipe->close_write(); // signal EOF to reader + SRV_DBG("%s", "client request thread ended\n"); }); - self->thread.detach(); + this->thread.detach(); // wait for the first chunk (headers) - std::string output; - self->next(output); // ignore output, it's just to flush headers - - return std::unique_ptr(self); + msg_t header; + pipe->read(header, should_stop); + SRV_DBG("%s", "received response headers\n"); + this->status = header.status; + this->headers = header.headers; } diff --git a/tools/server/server-http.h b/tools/server/server-http.h index 5d87ea15deb..40bcba30cc9 100644 --- a/tools/server/server-http.h +++ b/tools/server/server-http.h @@ -75,24 +75,69 @@ struct server_http_context { // simple HTTP client with blocking API struct server_http_client : server_http_res { - class Impl; - struct ImplDeleter - { - void operator()(Impl *p); - }; - std::unique_ptr pimpl; + std::function cleanup = nullptr; public: - server_http_client(); - ~server_http_client() = default; - static std::unique_ptr make_request( - const std::string & method, - int port, - const std::string & path, - const std::map & headers, - const std::string & body); + server_http_client(const std::string & method, + int port, + const std::string & path, + const std::map & headers, + const std::string & body, + const std::function should_stop); + ~server_http_client() { + if (cleanup) { + cleanup(); + } + } private: std::thread thread; - std::mutex mutex; - std::queue queue; - bool is_running = true; + struct msg_t { + std::map headers; + int status = 0; + std::string data; + }; + // simple implementation of a pipe + template + struct pipe_t { + std::mutex mutex; + std::condition_variable cv; + std::queue queue; + std::atomic writer_closed{false}; + std::atomic reader_closed{false}; + void close_write() { + writer_closed.store(true); + cv.notify_all(); + } + void close_read() { + reader_closed.store(true); + cv.notify_all(); + } + bool read(T & output, const std::function & should_stop) { + std::unique_lock lk(mutex); + constexpr auto poll_interval = std::chrono::milliseconds(500); + while (true) { + if (!queue.empty()) { + output = std::move(queue.front()); + queue.pop(); + return true; + } + if (writer_closed.load()) { + return false; // clean EOF + } + if (should_stop()) { + close_read(); // signal broken pipe to writer + return false; // cancelled / reader no longer alive + } + cv.wait_for(lk, poll_interval); + } + } + bool write(T && data) { + std::lock_guard lk(mutex); + if (reader_closed.load()) { + return false; // broken pipe + } + queue.push(std::move(data)); + cv.notify_one(); + return true; + } + }; }; diff --git a/tools/server/server.cpp b/tools/server/server.cpp index 640f19d4f6b..8a05ef2d72f 100644 --- a/tools/server/server.cpp +++ b/tools/server/server.cpp @@ -5577,7 +5577,9 @@ int main(int argc, char ** argv) { ctx_http.get ("/slots", ex_wrapper(routes.get_slots)); ctx_http.post("/slots/:id_slot", ex_wrapper(routes.post_slots)); ctx_http.post("/v1/chat/completions", [](const server_http_req & req) { - return server_http_client::make_request("POST", 8080, "/chat/completions", {}, req.body); + return std::unique_ptr(new server_http_client( + "POST", 8080, "/chat/completions", {}, req.body, req.should_stop + )); }); // From 289aa9a3a1152c39afffb368bbf2f0691193ac22 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Sat, 15 Nov 2025 21:33:50 +0100 Subject: [PATCH 3/4] add req.headers --- tools/server/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/server/server.cpp b/tools/server/server.cpp index 5f8ef31ab92..115cab4876e 100644 --- a/tools/server/server.cpp +++ b/tools/server/server.cpp @@ -5583,7 +5583,7 @@ int main(int argc, char ** argv) { ctx_http.post("/slots/:id_slot", ex_wrapper(routes.post_slots)); ctx_http.post("/v1/chat/completions", [](const server_http_req & req) { return std::unique_ptr(new server_http_client( - "POST", 8080, "/chat/completions", {}, req.body, req.should_stop + "POST", 8080, "/chat/completions", req.headers, req.body, req.should_stop )); }); From dc6592c0e258cccdb851bb83173a7ebc07c892a4 Mon Sep 17 00:00:00 2001 From: Xuan Son Nguyen Date: Sat, 15 Nov 2025 21:37:38 +0100 Subject: [PATCH 4/4] customizable proxy host --- tools/server/server-http.cpp | 3 ++- tools/server/server-http.h | 1 + tools/server/server.cpp | 4 ++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/tools/server/server-http.cpp b/tools/server/server-http.cpp index 2a4d3bf45e2..139643fc387 100644 --- a/tools/server/server-http.cpp +++ b/tools/server/server-http.cpp @@ -387,13 +387,14 @@ void server_http_context::post(const std::string & path, server_http_context::ha server_http_client::server_http_client( const std::string & method, + const std::string & host, int port, const std::string & path, const std::map & headers, const std::string & body, const std::function should_stop) { // shared between reader and writer threads - auto cli = std::make_shared("127.0.0.1", port); + auto cli = std::make_shared(host, port); auto pipe = std::make_shared>(); // setup Client diff --git a/tools/server/server-http.h b/tools/server/server-http.h index 5ad846d674e..0198f018931 100644 --- a/tools/server/server-http.h +++ b/tools/server/server-http.h @@ -82,6 +82,7 @@ struct server_http_client : server_http_res { std::function cleanup = nullptr; public: server_http_client(const std::string & method, + const std::string & host, int port, const std::string & path, const std::map & headers, diff --git a/tools/server/server.cpp b/tools/server/server.cpp index 115cab4876e..ca3f3e32633 100644 --- a/tools/server/server.cpp +++ b/tools/server/server.cpp @@ -5581,9 +5581,9 @@ int main(int argc, char ** argv) { // Save & load slots ctx_http.get ("/slots", ex_wrapper(routes.get_slots)); ctx_http.post("/slots/:id_slot", ex_wrapper(routes.post_slots)); - ctx_http.post("/v1/chat/completions", [](const server_http_req & req) { + ctx_http.post("/v1/chat/completions", [¶ms](const server_http_req & req) { return std::unique_ptr(new server_http_client( - "POST", 8080, "/chat/completions", req.headers, req.body, req.should_stop + "POST", params.hostname, 8080, "/chat/completions", req.headers, req.body, req.should_stop )); });