From dfa240045dd4c903cca608743747131433d66494 Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Tue, 11 Nov 2025 18:36:17 +0100
Subject: [PATCH 1/8] server: (refactor) implement generator-based API for task results

---
 tools/server/server.cpp | 366 +++++++++++++++++++++-------------------
 tools/server/utils.hpp  |  26 ++-
 2 files changed, 210 insertions(+), 182 deletions(-)

diff --git a/tools/server/server.cpp b/tools/server/server.cpp
index 6bd4be3cc17c4..681ff766201e4 100644
--- a/tools/server/server.cpp
+++ b/tools/server/server.cpp
@@ -684,7 +684,7 @@ struct server_task_result {
     }
     virtual bool is_stop() {
         // only used by server_task_result_cmpl_*
-        return false;
+        return true;
     }
     virtual int get_index() {
         return -1;
@@ -3238,105 +3238,6 @@ struct server_context {
         queue_results.send(std::move(res));
     }
 
-    //
-    // Functions to create new task(s) and receive result(s)
-    //
-
-    void cancel_tasks(const std::unordered_set<int> & id_tasks) {
-        std::vector<server_task> cancel_tasks;
-        cancel_tasks.reserve(id_tasks.size());
-        for (const auto & id_task : id_tasks) {
-            SRV_WRN("cancel task, id_task = %d\n", id_task);
-
-            server_task task(SERVER_TASK_TYPE_CANCEL);
-            task.id_target = id_task;
-            queue_results.remove_waiting_task_id(id_task);
-            cancel_tasks.push_back(std::move(task));
-        }
-        // push to beginning of the queue, so it has highest priority
-        queue_tasks.post(std::move(cancel_tasks), true);
-    }
-
-    // receive the results from task(s)
-    void receive_multi_results(
-            const std::unordered_set<int> & id_tasks,
-            const std::function<void(std::vector<server_task_result_ptr> &)> & result_handler,
-            const std::function<void(json)> & error_handler,
-            const std::function<bool()> & is_connection_closed) {
-        std::vector<server_task_result_ptr> results(id_tasks.size());
-        for (int i = 0; i < (int) id_tasks.size(); i++) {
-            server_task_result_ptr result = queue_results.recv_with_timeout(id_tasks, HTTP_POLLING_SECONDS);
-
-            if (is_connection_closed()) {
-                cancel_tasks(id_tasks);
-                return;
-            }
-
-            if (result == nullptr) {
-                i--; // retry
-                continue;
-            }
-
-            if (result->is_error()) {
-                error_handler(result->to_json());
-                cancel_tasks(id_tasks);
-                return;
-            }
-
-            GGML_ASSERT(
-                dynamic_cast<server_task_result_cmpl_final *>(result.get()) != nullptr
-                || dynamic_cast<server_task_result_embd *>(result.get()) != nullptr
-                || dynamic_cast<server_task_result_rerank *>(result.get()) != nullptr
-            );
-            const size_t idx = result->get_index();
-            GGML_ASSERT(idx < results.size() && "index out of range");
-            results[idx] = std::move(result);
-        }
-        result_handler(results);
-    }
-
-    // receive the results from task(s), in stream mode
-    void receive_cmpl_results_stream(
-            const std::unordered_set<int> & id_tasks,
-            const std::function<bool(server_task_result_ptr &)> & result_handler,
-            const std::function<void(json)> & error_handler,
-            const std::function<bool()> & is_connection_closed) {
-        size_t n_finished = 0;
-        while (true) {
-            server_task_result_ptr result = queue_results.recv_with_timeout(id_tasks, HTTP_POLLING_SECONDS);
-
-            if (is_connection_closed()) {
-                cancel_tasks(id_tasks);
-                return;
-            }
-
-            if (result == nullptr) {
-                continue; // retry
-            }
-
-            if (result->is_error()) {
-                error_handler(result->to_json());
-                cancel_tasks(id_tasks);
-                return;
-            }
-
-            GGML_ASSERT(
-                dynamic_cast<server_task_result_cmpl_partial *>(result.get()) != nullptr
-                || dynamic_cast<server_task_result_cmpl_final *>(result.get()) != nullptr
-            );
-            if (!result_handler(result)) {
-                cancel_tasks(id_tasks);
-                break;
-            }
-
-            if (result->is_stop()) {
-                if (++n_finished == id_tasks.size()) {
-                    break;
-                }
-            }
-        }
-    }
-
     //
     // Functions to process the task
     //
@@ -4418,6 +4319,90 @@ struct server_context {
     }
 };
 
+// generator-like API for server responses
+struct server_response_generator {
+    std::unordered_set<int> id_tasks;
+    server_context & ctx_server;
+    size_t received_count = 0;
+    server_task_result_ptr error;
+    std::vector<server_task_result_ptr> results; // used by all()
+    bool cancelled = false;
+
+    server_response_generator(server_context & ctx_server) : ctx_server(ctx_server) {}
+    ~server_response_generator() {
+        SRV_DBG("%s", "deleting server_response_generator\n");
+        stop();
+    }
+
+    void post_tasks(std::vector<server_task> && tasks) {
+        id_tasks = server_task::get_list_id(tasks);
+        ctx_server.queue_results.add_waiting_tasks(tasks);
+        ctx_server.queue_tasks.post(std::move(tasks));
+        results.resize(id_tasks.size());
+    }
+
+    bool has_next() {
+        return !error && received_count < id_tasks.size();
+    }
+
+    bool has_error() {
+        return error != nullptr;
+    }
+
+    // can return nullptr for periodic check (checking if the connection is still alive)
+    server_task_result_ptr next() {
+        server_task_result_ptr result = ctx_server.queue_results.recv_with_timeout(id_tasks, HTTP_POLLING_SECONDS);
+        if (result != nullptr) {
+            if (result->is_error()) {
+                error = std::move(result);
+                // stop receiving further results on error
+                stop();
+                return nullptr;
+            }
+            if (result->is_stop()) {
+                received_count++;
+            }
+        }
+        return result;
+    }
+
+    // can return FALSE for periodic check (checking if the connection is still alive)
+    bool wait_for_all() {
+        while (has_next()) {
+            auto res = next();
+            if (res == nullptr) {
+                return false; // timeout
+            }
+            const size_t idx = res->get_index();
+            GGML_ASSERT(idx < results.size() && "index out of range");
+            GGML_ASSERT(results[idx] == nullptr && "duplicate result received");
+            results[idx] = std::move(res);
+        }
+        return true;
+    }
+
+    void stop() {
+        ctx_server.queue_results.remove_waiting_task_ids(id_tasks);
+        if (has_next() && !cancelled) {
+            // if tasks are not finished yet, cancel them
+            cancelled = true;
+            std::vector<server_task> cancel_tasks;
+            cancel_tasks.reserve(id_tasks.size());
+            for (const auto & id_task : id_tasks) {
+                SRV_WRN("cancel task, id_task = %d\n", id_task);
+                server_task task(SERVER_TASK_TYPE_CANCEL);
+                task.id_target = id_task;
+                ctx_server.queue_results.remove_waiting_task_id(id_task);
+                cancel_tasks.push_back(std::move(task));
+            }
+            // push to beginning of the queue, so it has highest priority
+            ctx_server.queue_tasks.post(std::move(cancel_tasks), true);
+        } else {
+            SRV_DBG("%s", "all tasks already finished, no need to cancel\n");
+        }
+    }
+};
+
 static void log_server_request(const httplib::Request & req, const httplib::Response & res) {
     // skip GH copilot requests when using default port
     if (req.path == "/v1/health") {
@@ -5001,7 +4986,10 @@ int main(int argc, char ** argv) {
         GGML_ASSERT(type == SERVER_TASK_TYPE_COMPLETION || type == SERVER_TASK_TYPE_INFILL);
 
         auto completion_id = gen_chatcmplid();
-        std::unordered_set<int> task_ids;
+        // need to store the generator as a pointer, so that it won't be destroyed when the handle returns
+        // use shared_ptr as it's shared between the chunked_content_provider() and on_complete()
+        const auto gen = std::make_shared<server_response_generator>(ctx_server);
+
         try {
             std::vector<server_task> tasks;
@@ -5050,9 +5038,7 @@ int main(int argc, char ** argv) {
                 tasks.push_back(std::move(task));
             }
 
-            task_ids = server_task::get_list_id(tasks);
-            ctx_server.queue_results.add_waiting_tasks(tasks);
-            ctx_server.queue_tasks.post(std::move(tasks));
+            gen->post_tasks(std::move(tasks));
         } catch (const std::exception & e) {
             res_error(res, format_error_response(e.what(), ERROR_TYPE_INVALID_REQUEST));
             return;
@@ -5061,54 +5047,81 @@ int main(int argc, char ** argv) {
         bool stream = json_value(data, "stream", false);
 
         if (!stream) {
-            ctx_server.receive_multi_results(task_ids, [&](std::vector<server_task_result_ptr> & results) {
-                if (results.size() == 1) {
-                    // single result
-                    res_ok(res, results[0]->to_json());
-                } else {
-                    // multiple results (multitask)
-                    json arr = json::array();
-                    for (auto & res : results) {
-                        arr.push_back(res->to_json());
-                    }
-                    res_ok(res, arr);
+            // non-stream, wait for the results
+            while (!gen->wait_for_all()) {
+                // periodically check for connection closed
+                if (is_connection_closed()) {
+                    gen->stop();
+                    return;
                 }
-            }, [&](const json & error_data) {
-                res_error(res, error_data);
-            }, is_connection_closed);
+            }
+
+            // collect results
+            if (gen->has_error()) {
+                res_error(res, gen->error->to_json());
+                return;
+            } else {
+                json arr = json::array();
+                for (auto & res : gen->results) {
+                    GGML_ASSERT(dynamic_cast<server_task_result_cmpl_final *>(res.get()) != nullptr);
+                    arr.push_back(res->to_json());
+                }
+                // if single request, return single object instead of array
+                res_ok(res, arr.size() == 1 ? arr[0] : arr);
+            }
 
-            ctx_server.queue_results.remove_waiting_task_ids(task_ids);
         } else {
-            const auto chunked_content_provider = [task_ids, &ctx_server, oaicompat](size_t, httplib::DataSink & sink) {
-                ctx_server.receive_cmpl_results_stream(task_ids, [&](server_task_result_ptr & result) -> bool {
-                    json res_json = result->to_json();
-                    if (res_json.is_array()) {
-                        for (const auto & res : res_json) {
-                            if (!server_sent_event(sink, res)) {
-                                // sending failed (HTTP connection closed), cancel the generation
-                                return false;
-                            }
-                        }
-                        return true;
-                    } else {
-                        return server_sent_event(sink, res_json);
+            // in streaming mode, the first error must be treated as non-stream response
+            // this is to match the OAI API behavior
+            server_task_result_ptr first_result = gen->next();
+            if (gen->has_error()) {
+                res_error(res, gen->error->to_json());
+                return;
+            }
+
+            // next responses are streamed
+            json first_result_json = first_result->to_json();
+            const auto chunked_content_provider = [first_result_json, gen, oaicompat](size_t, httplib::DataSink & sink) mutable -> bool {
+                // flush the first result as it's not an error
+                bool success = false;
+                if (!first_result_json.empty()) {
+                    success = server_sent_event(sink, first_result_json);
+                    first_result_json.clear(); // mark as sent
+                    if (!success) {
+                        return false; // sending failed, go to on_complete()
                     }
-                }, [&](const json & error_data) {
-                    server_sent_event(sink, json{{"error", error_data}});
-                }, [&sink]() {
-                    // note: do not use req.is_connection_closed here because req is already destroyed
-                    return !sink.is_writable();
-                });
-                if (oaicompat != OAICOMPAT_TYPE_NONE) {
-                    static const std::string ev_done = "data: [DONE]\n\n";
-                    sink.write(ev_done.data(), ev_done.size());
                 }
-                sink.done();
-                return false;
+
+                // receive subsequent results
+                auto result = gen->next();
+                if (result == nullptr) {
+                    // check for connection state; if false, go to on_complete()
+                    return sink.is_writable();
+                }
+
+                // send the results
+                json res_json = result->to_json();
+                success = server_sent_event(sink, res_json);
+                if (!success) {
+                    return false; // sending failed, go to on_complete()
+                }
+
+                // check if there is more data
+                if (!gen->has_next()) {
+                    if (oaicompat != OAICOMPAT_TYPE_NONE) {
+                        static const std::string ev_done = "data: [DONE]\n\n";
+                        sink.write(ev_done.data(), ev_done.size());
+                        sink.done();
+                    }
+                    return false; // no more data, go to on_complete()
+                }
+
+                // has next data, continue
+                return true;
             };
-            auto on_complete = [task_ids, &ctx_server] (bool) {
-                ctx_server.queue_results.remove_waiting_task_ids(task_ids);
+            auto on_complete = [gen](bool) {
+                gen->stop();
             };
             res.set_chunked_content_provider("text/event-stream", chunked_content_provider, on_complete);
@@ -5402,8 +5415,7 @@ int main(int argc, char ** argv) {
 
         // create and queue the task
        json responses = json::array();
-        bool error = false;
-        std::unordered_set<int> task_ids;
+        server_response_generator gen(ctx_server);
         {
             std::vector<server_task> tasks;
             for (size_t i = 0; i < tokenized_prompts.size(); i++) {
@@ -5419,27 +5431,27 @@ int main(int argc, char ** argv) {
                 tasks.push_back(std::move(task));
             }
+            gen.post_tasks(std::move(tasks));
+        }
 
-            task_ids = server_task::get_list_id(tasks);
-            ctx_server.queue_results.add_waiting_tasks(tasks);
-            ctx_server.queue_tasks.post(std::move(tasks));
+        // wait for the results
+        while (!gen.wait_for_all()) {
+            // periodically check for connection closed
+            if (req.is_connection_closed()) {
+                gen.stop();
+                return;
+            }
         }
 
-        // get the result
-        ctx_server.receive_multi_results(task_ids, [&](std::vector<server_task_result_ptr> & results) {
-            for (auto & res : results) {
+        // collect results
+        if (gen.has_error()) {
+            res_error(res, gen.error->to_json());
+            return;
+        } else {
+            for (auto & res : gen.results) {
                 GGML_ASSERT(dynamic_cast<server_task_result_embd *>(res.get()) != nullptr);
                 responses.push_back(res->to_json());
             }
-        }, [&](const json & error_data) {
-            res_error(res, error_data);
-            error = true;
-        }, req.is_connection_closed);
-
-        ctx_server.queue_results.remove_waiting_task_ids(task_ids);
-
-        if (error) {
-            return;
-        }
+        }
 
         // write JSON response
@@ -5493,8 +5505,7 @@ int main(int argc, char ** argv) {
 
         // create and queue the task
         json responses = json::array();
-        bool error = false;
-        std::unordered_set<int> task_ids;
+        server_response_generator gen(ctx_server);
         {
             std::vector<server_task> tasks;
             tasks.reserve(documents.size());
@@ -5506,24 +5517,27 @@ int main(int argc, char ** argv) {
                 task.tokens = std::move(tmp);
                 tasks.push_back(std::move(task));
             }
+            gen.post_tasks(std::move(tasks));
+        }
 
-            task_ids = server_task::get_list_id(tasks);
-            ctx_server.queue_results.add_waiting_tasks(tasks);
-            ctx_server.queue_tasks.post(std::move(tasks));
+        // wait for the results
+        while (!gen.wait_for_all()) {
+            // periodically check for connection closed
+            if (req.is_connection_closed()) {
+                gen.stop();
+                return;
+            }
         }
 
-        ctx_server.receive_multi_results(task_ids, [&](std::vector<server_task_result_ptr> & results) {
-            for (auto & res : results) {
+        // collect results
+        if (gen.has_error()) {
+            res_error(res, gen.error->to_json());
+            return;
+        } else {
+            for (auto & res : gen.results) {
                 GGML_ASSERT(dynamic_cast<server_task_result_rerank *>(res.get()) != nullptr);
                 responses.push_back(res->to_json());
             }
-        }, [&](const json & error_data) {
-            res_error(res, error_data);
-            error = true;
-        }, req.is_connection_closed);
-
-        if (error) {
-            return;
-        }
+        }
 
         // write JSON response
diff --git a/tools/server/utils.hpp b/tools/server/utils.hpp
index e9d4431ddfb80..b1ecc5af5ed0a 100644
--- a/tools/server/utils.hpp
+++ b/tools/server/utils.hpp
@@ -453,15 +453,29 @@ static std::string tokens_to_output_formatted_string(const llama_context * ctx,
     return out;
 }
 
+// note: if data is a json array, it will be sent as multiple events, one per item
 static bool server_sent_event(httplib::DataSink & sink, const json & data) {
-    const std::string str =
-        "data: " +
-        data.dump(-1, ' ', false, json::error_handler_t::replace) +
-        "\n\n"; // required by RFC 8895 - A message is terminated by a blank line (two line terminators in a row).
+    static auto send_single = [](httplib::DataSink & sink, const json & data) -> bool {
+        const std::string str =
+            "data: " +
+            data.dump(-1, ' ', false, json::error_handler_t::replace) +
+            "\n\n"; // required by RFC 8895 - A message is terminated by a blank line (two line terminators in a row).
+
+        LOG_DBG("data stream, to_send: %s", str.c_str());
+        return sink.write(str.c_str(), str.size());
+    };
 
-    LOG_DBG("data stream, to_send: %s", str.c_str());
+    if (data.is_array()) {
+        for (const auto & item : data) {
+            if (!send_single(sink, item)) {
+                return false;
+            }
+        }
+    } else {
+        return send_single(sink, data);
+    }
 
-    return sink.write(str.c_str(), str.size());
+    return true;
 }
 
 //
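
[Editor's note: the sketch below is not part of the patch. It shows how a non-streaming handler is expected to drive the server_response_generator introduced in PATCH 1/8; make_tasks() is a hypothetical stand-in for the per-endpoint task-building code, while res, res_ok(), res_error() and is_connection_closed() come from the surrounding handler.]

    // Sketch only (assumes PATCH 1/8 as applied above):
    server_response_generator gen(ctx_server);
    gen.post_tasks(make_tasks());             // hypothetical helper returning std::vector<server_task>
    while (!gen.wait_for_all()) {             // false => recv timed out; poll the connection
        if (is_connection_closed()) {
            gen.stop();                       // cancels the in-flight tasks
            return;
        }
    }
    if (gen.has_error()) {
        res_error(res, gen.error->to_json()); // a single error aborts the whole batch
        return;
    }
    json arr = json::array();
    for (auto & r : gen.results) {            // results are stored by task index
        arr.push_back(r->to_json());
    }
    res_ok(res, arr.size() == 1 ? arr[0] : arr);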
From 88277d80a9d10aa7f09f3d594bf77c5953bafffb Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Tue, 11 Nov 2025 19:09:45 +0100
Subject: [PATCH 2/8] improve

---
 tools/server/server.cpp | 137 ++++++++++++++++++++--------------------
 1 file changed, 70 insertions(+), 67 deletions(-)

diff --git a/tools/server/server.cpp b/tools/server/server.cpp
index 681ff766201e4..221720664e54b 100644
--- a/tools/server/server.cpp
+++ b/tools/server/server.cpp
@@ -4324,8 +4324,6 @@ struct server_response_generator {
     std::unordered_set<int> id_tasks;
     server_context & ctx_server;
     size_t received_count = 0;
-    server_task_result_ptr error;
-    std::vector<server_task_result_ptr> results; // used by all()
     bool cancelled = false;
 
     server_response_generator(server_context & ctx_server) : ctx_server(ctx_server) {}
@@ -4338,47 +4336,64 @@ struct server_response_generator {
         id_tasks = server_task::get_list_id(tasks);
         ctx_server.queue_results.add_waiting_tasks(tasks);
         ctx_server.queue_tasks.post(std::move(tasks));
-        results.resize(id_tasks.size());
     }
 
     bool has_next() {
-        return !error && received_count < id_tasks.size();
+        return !cancelled && received_count < id_tasks.size();
     }
 
-    bool has_error() {
-        return error != nullptr;
-    }
-
-    // can return nullptr for periodic check (checking if the connection is still alive)
-    server_task_result_ptr next() {
-        server_task_result_ptr result = ctx_server.queue_results.recv_with_timeout(id_tasks, HTTP_POLLING_SECONDS);
-        if (result != nullptr) {
-            if (result->is_error()) {
-                error = std::move(result);
-                // stop receiving further results on error
-                stop();
-                return nullptr;
-            }
-            if (result->is_stop()) {
-                received_count++;
+    // return nullptr if should_stop() is true before receiving a result
+    // note: if one error is received, it will stop further processing and return error result
+    server_task_result_ptr next(const std::function<bool()> & should_stop) {
+        while (true) {
+            server_task_result_ptr result = ctx_server.queue_results.recv_with_timeout(id_tasks, HTTP_POLLING_SECONDS);
+            if (result == nullptr) {
+                // timeout, check stop condition
+                if (should_stop()) {
+                    SRV_DBG("%s", "stopping wait for next result due to should_stop condition\n");
+                    return nullptr;
+                }
+            } else {
+                if (result->is_error()) {
+                    stop(); // cancel remaining tasks
+                    SRV_DBG("%s", "received error result, stopping further processing\n");
+                    return result;
+                }
+                if (result->is_stop()) {
+                    received_count++;
+                }
+                return result;
             }
         }
-        return result;
+
+        // should not reach here
     }
 
-    // can return FALSE for periodic check (checking if the connection is still alive)
-    bool wait_for_all() {
+    struct batch_response {
+        bool is_terminated = false; // if true, indicates that processing was stopped before all results were received
+        std::vector<server_task_result_ptr> results;
+        server_task_result_ptr error; // nullptr if no error
+    };
+
+    batch_response wait_for_all(const std::function<bool()> & should_stop) {
+        batch_response batch_res;
+        batch_res.results.resize(id_tasks.size());
         while (has_next()) {
-            auto res = next();
+            auto res = next(should_stop);
             if (res == nullptr) {
-                return false; // timeout
+                batch_res.is_terminated = true;
+                return batch_res;
+            }
+            if (res->is_error()) {
+                batch_res.error = std::move(res);
+                return batch_res;
             }
             const size_t idx = res->get_index();
-            GGML_ASSERT(idx < results.size() && "index out of range");
-            GGML_ASSERT(results[idx] == nullptr && "duplicate result received");
-            results[idx] = std::move(res);
+            GGML_ASSERT(idx < batch_res.results.size() && "index out of range");
+            GGML_ASSERT(batch_res.results[idx] == nullptr && "duplicate result received");
+            batch_res.results[idx] = std::move(res);
         }
-        return true;
+        return batch_res;
     }
 
     void stop() {
@@ -5048,21 +5063,15 @@ int main(int argc, char ** argv) {
 
         if (!stream) {
             // non-stream, wait for the results
-            while (!gen->wait_for_all()) {
-                // periodically check for connection closed
-                if (is_connection_closed()) {
-                    gen->stop();
-                    return;
-                }
-            }
-
-            // collect results
-            if (gen->has_error()) {
-                res_error(res, gen->error->to_json());
+            auto all_results = gen->wait_for_all(is_connection_closed);
+            if (all_results.is_terminated) {
+                return; // connection is closed
+            } else if (all_results.error) {
+                res_error(res, all_results.error->to_json());
                 return;
             } else {
                 json arr = json::array();
-                for (auto & res : gen->results) {
+                for (auto & res : all_results.results) {
                     GGML_ASSERT(dynamic_cast<server_task_result_cmpl_final *>(res.get()) != nullptr);
                     arr.push_back(res->to_json());
                 }
@@ -5073,9 +5082,12 @@ int main(int argc, char ** argv) {
         } else {
             // in streaming mode, the first error must be treated as non-stream response
             // this is to match the OAI API behavior
-            server_task_result_ptr first_result = gen->next();
-            if (gen->has_error()) {
-                res_error(res, gen->error->to_json());
+            server_task_result_ptr first_result = gen->next(is_connection_closed);
+            if (first_result == nullptr) {
+                return; // connection is closed
+            }
+            if (first_result->is_error()) {
+                res_error(res, first_result->to_json());
                 return;
             }
 
@@ -5093,10 +5105,9 @@ int main(int argc, char ** argv) {
             }
 
             // receive subsequent results
-            auto result = gen->next();
+            auto result = gen->next([&sink]{ return !sink.is_writable(); });
             if (result == nullptr) {
-                // check for connection state; if false, go to on_complete()
-                return sink.is_writable();
+                return false; // connection is closed, go to on_complete()
             }
 
             // send the results
@@ -5435,20 +5446,16 @@ int main(int argc, char ** argv) {
         }
 
         // wait for the results
-        while (!gen.wait_for_all()) {
-            // periodically check for connection closed
-            if (req.is_connection_closed()) {
-                gen.stop();
-                return;
-            }
-        }
+        auto all_results = gen.wait_for_all(req.is_connection_closed);
 
         // collect results
-        if (gen.has_error()) {
-            res_error(res, gen.error->to_json());
+        if (all_results.is_terminated) {
+            return; // connection is closed
+        } else if (all_results.error) {
+            res_error(res, all_results.error->to_json());
             return;
         } else {
-            for (auto & res : gen.results) {
+            for (auto & res : all_results.results) {
                 GGML_ASSERT(dynamic_cast<server_task_result_embd *>(res.get()) != nullptr);
                 responses.push_back(res->to_json());
             }
@@ -5521,20 +5528,16 @@ int main(int argc, char ** argv) {
         }
 
        // wait for the results
-        while (!gen.wait_for_all()) {
-            // periodically check for connection closed
-            if (req.is_connection_closed()) {
-                gen.stop();
-                return;
-            }
-        }
+        auto all_results = gen.wait_for_all(req.is_connection_closed);
 
         // collect results
-        if (gen.has_error()) {
-            res_error(res, gen.error->to_json());
+        if (all_results.is_terminated) {
+            return; // connection is closed
+        } else if (all_results.error) {
+            res_error(res, all_results.error->to_json());
            return;
        } else {
-            for (auto & res : gen.results) {
+            for (auto & res : all_results.results) {
                 GGML_ASSERT(dynamic_cast<server_task_result_rerank *>(res.get()) != nullptr);
                 responses.push_back(res->to_json());
             }
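
[Editor's note: not part of the patch. After PATCH 2/8 the polling loop lives inside wait_for_all(), which reports its outcome through a batch_response value instead of the error/results members removed above. A minimal sketch of the new calling convention, with make_tasks() again a hypothetical helper:]

    // Sketch only (assumes PATCH 2/8 as applied above):
    server_response_generator gen(ctx_server);
    gen.post_tasks(make_tasks());                           // hypothetical helper
    auto all = gen.wait_for_all(req.is_connection_closed);  // polls should_stop on every recv timeout
    if (all.is_terminated) {
        return;                                             // client disconnected before completion
    }
    if (all.error) {
        res_error(res, all.error->to_json());               // first error cancels the remaining tasks
        return;
    }
    for (auto & r : all.results) {                          // index-ordered; duplicates are asserted against
        responses.push_back(r->to_json());
    }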
From 440ce93178551b313111808c530aa94d2887aeee Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Tue, 11 Nov 2025 19:20:25 +0100
Subject: [PATCH 3/8] moving some code

---
 tools/server/server.cpp | 30 ++++++++++++++++++++++--------
 1 file changed, 22 insertions(+), 8 deletions(-)

diff --git a/tools/server/server.cpp b/tools/server/server.cpp
index 221720664e54b..d8c4e9c54811e 100644
--- a/tools/server/server.cpp
+++ b/tools/server/server.cpp
@@ -5082,26 +5082,29 @@ int main(int argc, char ** argv) {
         } else {
             // in streaming mode, the first error must be treated as non-stream response
             // this is to match the OAI API behavior
+            // ref: https://github.com/ggml-org/llama.cpp/pull/16486#discussion_r2419657309
             server_task_result_ptr first_result = gen->next(is_connection_closed);
             if (first_result == nullptr) {
                 return; // connection is closed
-            }
-            if (first_result->is_error()) {
+            } else if (first_result->is_error()) {
                 res_error(res, first_result->to_json());
                 return;
+            } else {
+                GGML_ASSERT(
+                    dynamic_cast<server_task_result_cmpl_partial *>(first_result.get()) != nullptr
+                    || dynamic_cast<server_task_result_cmpl_final *>(first_result.get()) != nullptr
+                );
             }
 
             // next responses are streamed
             json first_result_json = first_result->to_json();
             const auto chunked_content_provider = [first_result_json, gen, oaicompat](size_t, httplib::DataSink & sink) mutable -> bool {
                 // flush the first result as it's not an error
-                bool success = false;
                 if (!first_result_json.empty()) {
-                    success = server_sent_event(sink, first_result_json);
-                    first_result_json.clear(); // mark as sent
-                    if (!success) {
+                    if (!server_sent_event(sink, first_result_json)) {
                         return false; // sending failed, go to on_complete()
                     }
+                    first_result_json.clear(); // mark as sent
                 }
 
                 // receive subsequent results
@@ -5112,8 +5115,19 @@ int main(int argc, char ** argv) {
 
                 // send the results
                 json res_json = result->to_json();
-                success = server_sent_event(sink, res_json);
-                if (!success) {
+                bool ok = false;
+                if (result->is_error()) {
+                    ok = server_sent_event(sink, json {{ "error", result->to_json() }});
+                    return false; // go to on_complete()
+                } else {
+                    GGML_ASSERT(
+                        dynamic_cast<server_task_result_cmpl_partial *>(result.get()) != nullptr
+                        || dynamic_cast<server_task_result_cmpl_final *>(result.get()) != nullptr
+                    );
+                    ok = server_sent_event(sink, res_json);
+                }
+
+                if (!ok) {
                     return false; // sending failed, go to on_complete()
                 }

From 993440ea3db565b38d9fe9835716e0bf112ef94f Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Tue, 11 Nov 2025 19:26:20 +0100
Subject: [PATCH 4/8] fix "Response ended prematurely"

---
 tools/server/server.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tools/server/server.cpp b/tools/server/server.cpp
index d8c4e9c54811e..22a1ad738ddbc 100644
--- a/tools/server/server.cpp
+++ b/tools/server/server.cpp
@@ -5136,8 +5136,8 @@ int main(int argc, char ** argv) {
                 if (oaicompat != OAICOMPAT_TYPE_NONE) {
                     static const std::string ev_done = "data: [DONE]\n\n";
                     sink.write(ev_done.data(), ev_done.size());
-                    sink.done();
                 }
+                sink.done();
                 return false; // no more data, go to on_complete()
             }
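
[Editor's note: not part of the patch. Patches 3-4 pin down the streaming contract: once the first chunk has been sent, an error is reported in-band as a data: {"error": ...} SSE event rather than an HTTP error status, and sink.done() terminates the chunked body so clients do not see "Response ended prematurely". A condensed sketch of the provider's error path, using the names from the patch; the next patch makes the sink.done()-before-return-false rule systematic:]

    // Sketch only (assumes PATCH 3-5/8):
    auto result = gen->next([&sink] { return !sink.is_writable(); });
    if (result == nullptr) {      // client went away while we were waiting
        sink.done();              // close the chunked response cleanly
        return false;             // on_complete() then cancels the remaining tasks
    }
    if (result->is_error()) {     // mid-stream error: report it in-band
        server_sent_event(sink, json {{ "error", result->to_json() }});
        sink.done();
        return false;
    }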
From cc2e3977cb5ef38a17b95ce8284fe70b69799003 Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Tue, 11 Nov 2025 19:26:49 +0100
Subject: [PATCH 5/8] add sink.done before return false

---
 tools/server/server.cpp | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/tools/server/server.cpp b/tools/server/server.cpp
index 22a1ad738ddbc..f04459b3e0eba 100644
--- a/tools/server/server.cpp
+++ b/tools/server/server.cpp
@@ -5102,6 +5102,7 @@ int main(int argc, char ** argv) {
                 // flush the first result as it's not an error
                 if (!first_result_json.empty()) {
                     if (!server_sent_event(sink, first_result_json)) {
+                        sink.done();
                         return false; // sending failed, go to on_complete()
                     }
                     first_result_json.clear(); // mark as sent
@@ -5110,6 +5111,7 @@ int main(int argc, char ** argv) {
                 // receive subsequent results
                 auto result = gen->next([&sink]{ return !sink.is_writable(); });
                 if (result == nullptr) {
+                    sink.done();
                     return false; // connection is closed, go to on_complete()
                 }
 
@@ -5118,6 +5120,7 @@ int main(int argc, char ** argv) {
                 bool ok = false;
                 if (result->is_error()) {
                     ok = server_sent_event(sink, json {{ "error", result->to_json() }});
+                    sink.done();
                     return false; // go to on_complete()
                 } else {
                     GGML_ASSERT(
                         dynamic_cast<server_task_result_cmpl_partial *>(result.get()) != nullptr
                         || dynamic_cast<server_task_result_cmpl_final *>(result.get()) != nullptr
                     );
                     ok = server_sent_event(sink, res_json);
                 }
 
                 if (!ok) {
+                    sink.done();
                     return false; // sending failed, go to on_complete()
                 }

From 31b8b7001e286982909795adf6bddb36f55ce86c Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Tue, 11 Nov 2025 19:36:35 +0100
Subject: [PATCH 6/8] rm redundant check

---
 tools/server/server.cpp | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/tools/server/server.cpp b/tools/server/server.cpp
index f04459b3e0eba..5539a12157a5b 100644
--- a/tools/server/server.cpp
+++ b/tools/server/server.cpp
@@ -5025,14 +5025,6 @@ int main(int argc, char ** argv) {
         const size_t n_ctx_slot = ctx_server.slots.front().n_ctx;
         tasks.reserve(inputs.size());
         for (size_t i = 0; i < inputs.size(); i++) {
-            auto n_prompt_tokens = inputs[i].size();
-            if (n_prompt_tokens >= n_ctx_slot) {
-                json error_data = format_error_response("the request exceeds the available context size, try increasing it", ERROR_TYPE_EXCEED_CONTEXT_SIZE);
-                error_data["n_prompt_tokens"] = n_prompt_tokens;
-                error_data["n_ctx"] = n_ctx_slot;
-                res_error(res, error_data);
-                return;
-            }
             server_task task = server_task(type);
             task.id = ctx_server.queue_tasks.get_new_id();

From efd73cf18b256e6c5e61d65e527ed3b92b2f3a1e Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Tue, 11 Nov 2025 21:32:55 +0100
Subject: [PATCH 7/8] rm unused var

---
 tools/server/server.cpp | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tools/server/server.cpp b/tools/server/server.cpp
index 5539a12157a5b..3ed827c24ada4 100644
--- a/tools/server/server.cpp
+++ b/tools/server/server.cpp
@@ -5022,7 +5022,6 @@ int main(int argc, char ** argv) {
             // Everything else, including multimodal completions.
             inputs = tokenize_input_prompts(ctx_server.vocab, ctx_server.mctx, prompt, true, true);
         }
-        const size_t n_ctx_slot = ctx_server.slots.front().n_ctx;
         tasks.reserve(inputs.size());
         for (size_t i = 0; i < inputs.size(); i++) {
             server_task task = server_task(type);
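
[Editor's note: not part of the patch. Before the final rename below, it is worth spelling out the ownership model that the "use shared_ptr as it's shared between the chunked_content_provider() and on_complete()" comment relies on. A simplified sketch; the lambda bodies are reduced to the lifetime-relevant parts:]

    // Sketch only: the handler returns long before streaming finishes, so the
    // generator must not live on the handler's stack; both closures copy the
    // shared_ptr, and the destructor (which calls stop(), cancelling any
    // unfinished tasks) runs only once the last copy is dropped.
    const auto gen = std::make_shared<server_response_generator>(ctx_server);
    const auto provider = [gen](size_t, httplib::DataSink & sink) mutable -> bool {
        // ... stream chunks via gen->next(...) ...
        return gen->has_next();
    };
    const auto on_complete = [gen](bool) {
        gen->stop();
    };
    res.set_chunked_content_provider("text/event-stream", provider, on_complete);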
From bfa5a70c91f436b96512770cefcf458822e93a70 Mon Sep 17 00:00:00 2001
From: Xuan Son Nguyen
Date: Wed, 12 Nov 2025 14:22:56 +0100
Subject: [PATCH 8/8] rename generator --> reader

---
 tools/server/server.cpp | 41 ++++++++++++++++++++---------------------
 1 file changed, 20 insertions(+), 21 deletions(-)

diff --git a/tools/server/server.cpp b/tools/server/server.cpp
index f799848f564a6..0b3c77879c2e2 100644
--- a/tools/server/server.cpp
+++ b/tools/server/server.cpp
@@ -4319,16 +4319,15 @@ struct server_context {
     }
 };
 
-// generator-like API for server responses
-struct server_response_generator {
+// generator-like API for server responses, supports polling the connection state and aggregating results
+struct server_response_reader {
     std::unordered_set<int> id_tasks;
     server_context & ctx_server;
     size_t received_count = 0;
     bool cancelled = false;
 
-    server_response_generator(server_context & ctx_server) : ctx_server(ctx_server) {}
-    ~server_response_generator() {
-        SRV_DBG("%s", "deleting server_response_generator\n");
+    server_response_reader(server_context & ctx_server) : ctx_server(ctx_server) {}
+    ~server_response_reader() {
        stop();
     }
 
@@ -5000,9 +4999,9 @@ int main(int argc, char ** argv) {
         GGML_ASSERT(type == SERVER_TASK_TYPE_COMPLETION || type == SERVER_TASK_TYPE_INFILL);
 
         auto completion_id = gen_chatcmplid();
-        // need to store the generator as a pointer, so that it won't be destroyed when the handle returns
+        // need to store the reader as a pointer, so that it won't be destroyed when the handle returns
         // use shared_ptr as it's shared between the chunked_content_provider() and on_complete()
-        const auto gen = std::make_shared<server_response_generator>(ctx_server);
+        const auto rd = std::make_shared<server_response_reader>(ctx_server);
 
         try {
             std::vector<server_task> tasks;
@@ -5043,7 +5042,7 @@ int main(int argc, char ** argv) {
                 tasks.push_back(std::move(task));
             }
 
-            gen->post_tasks(std::move(tasks));
+            rd->post_tasks(std::move(tasks));
         } catch (const std::exception & e) {
             res_error(res, format_error_response(e.what(), ERROR_TYPE_INVALID_REQUEST));
             return;
@@ -5053,7 +5052,7 @@ int main(int argc, char ** argv) {
 
         if (!stream) {
             // non-stream, wait for the results
-            auto all_results = gen->wait_for_all(is_connection_closed);
+            auto all_results = rd->wait_for_all(is_connection_closed);
             if (all_results.is_terminated) {
                 return; // connection is closed
             } else if (all_results.error) {
@@ -5073,7 +5072,7 @@ int main(int argc, char ** argv) {
             // in streaming mode, the first error must be treated as non-stream response
             // this is to match the OAI API behavior
             // ref: https://github.com/ggml-org/llama.cpp/pull/16486#discussion_r2419657309
-            server_task_result_ptr first_result = gen->next(is_connection_closed);
+            server_task_result_ptr first_result = rd->next(is_connection_closed);
             if (first_result == nullptr) {
                 return; // connection is closed
             } else if (first_result->is_error()) {
@@ -5088,7 +5087,7 @@ int main(int argc, char ** argv) {
 
             // next responses are streamed
             json first_result_json = first_result->to_json();
-            const auto chunked_content_provider = [first_result_json, gen, oaicompat](size_t, httplib::DataSink & sink) mutable -> bool {
+            const auto chunked_content_provider = [first_result_json, rd, oaicompat](size_t, httplib::DataSink & sink) mutable -> bool {
                 // flush the first result as it's not an error
                 if (!first_result_json.empty()) {
                     if (!server_sent_event(sink, first_result_json)) {
@@ -5099,7 +5098,7 @@ int main(int argc, char ** argv) {
                 }
 
                 // receive subsequent results
-                auto result = gen->next([&sink]{ return !sink.is_writable(); });
+                auto result = rd->next([&sink]{ return !sink.is_writable(); });
                 if (result == nullptr) {
                     sink.done();
                     return false; // connection is closed, go to on_complete()
@@ -5126,7 +5125,7 @@ int main(int argc, char ** argv) {
                 }
 
                 // check if there is more data
-                if (!gen->has_next()) {
+                if (!rd->has_next()) {
                     if (oaicompat != OAICOMPAT_TYPE_NONE) {
                         static const std::string ev_done = "data: [DONE]\n\n";
                         sink.write(ev_done.data(), ev_done.size());
@@ -5139,8 +5138,8 @@ int main(int argc, char ** argv) {
                 return true;
             };
-            auto on_complete = [gen](bool) {
-                gen->stop();
+            auto on_complete = [rd](bool) {
+                rd->stop();
             };
 
             res.set_chunked_content_provider("text/event-stream", chunked_content_provider, on_complete);
@@ -5434,7 +5433,7 @@ int main(int argc, char ** argv) {
 
         // create and queue the task
         json responses = json::array();
-        server_response_generator gen(ctx_server);
+        server_response_reader rd(ctx_server);
         {
             std::vector<server_task> tasks;
             for (size_t i = 0; i < tokenized_prompts.size(); i++) {
@@ -5450,11 +5449,11 @@ int main(int argc, char ** argv) {
                 tasks.push_back(std::move(task));
             }
 
-            gen.post_tasks(std::move(tasks));
+            rd.post_tasks(std::move(tasks));
         }
 
         // wait for the results
-        auto all_results = gen.wait_for_all(req.is_connection_closed);
+        auto all_results = rd.wait_for_all(req.is_connection_closed);
 
         // collect results
         if (all_results.is_terminated) {
@@ -5520,7 +5519,7 @@ int main(int argc, char ** argv) {
 
         // create and queue the task
         json responses = json::array();
-        server_response_generator gen(ctx_server);
+        server_response_reader rd(ctx_server);
         {
             std::vector<server_task> tasks;
             tasks.reserve(documents.size());
@@ -5532,11 +5531,11 @@ int main(int argc, char ** argv) {
                 task.tokens = std::move(tmp);
                 tasks.push_back(std::move(task));
             }
-            gen.post_tasks(std::move(tasks));
+            rd.post_tasks(std::move(tasks));
         }
 
         // wait for the results
-        auto all_results = gen.wait_for_all(req.is_connection_closed);
+        auto all_results = rd.wait_for_all(req.is_connection_closed);
 
         // collect results
         if (all_results.is_terminated) {