Skip to content
This repository was archived by the owner on Jul 4, 2025. It is now read-only.

Commit 229b092

Browse files
committed
update
1 parent f916697 commit 229b092

File tree

11 files changed

+267
-85
lines changed

11 files changed

+267
-85
lines changed

engine/common/download_event.h

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#pragma once
2+
3+
#include "common/download_task.h"
4+
#include "common/event.h"
5+
6+
namespace cortex::event {
7+
8+
enum class DownloadEventType {
9+
DownloadValidateStarted,
10+
DownloadValidateFailed,
11+
DownloadStarted,
12+
DownloadPaused,
13+
DownloadUpdated,
14+
DownloadSuccess,
15+
DownloadError,
16+
};
17+
18+
namespace {
19+
std::string DownloadEventTypeToString(DownloadEventType type) {
20+
switch (type) {
21+
case DownloadEventType::DownloadValidateStarted:
22+
return "DownloadValidateStarted";
23+
case DownloadEventType::DownloadValidateFailed:
24+
return "DownloadValidateFailed";
25+
case DownloadEventType::DownloadStarted:
26+
return "DownloadStarted";
27+
case DownloadEventType::DownloadPaused:
28+
return "DownloadPaused";
29+
case DownloadEventType::DownloadUpdated:
30+
return "DownloadUpdated";
31+
case DownloadEventType::DownloadSuccess:
32+
return "DownloadSuccess";
33+
case DownloadEventType::DownloadError:
34+
return "DownloadError";
35+
default:
36+
return "Unknown";
37+
}
38+
}
39+
} // namespace
40+
41+
class DownloadEvent : public cortex::event::Event {
42+
public:
43+
explicit DownloadEvent(DownloadEventType type, const DownloadTask task)
44+
: type_{type}, download_task_{task} {}
45+
46+
std::string ToString() const {
47+
return "DownloadEvent: " + DownloadEventTypeToString(type_) +
48+
" task: " + download_task_.ToString();
49+
}
50+
51+
private:
52+
DownloadEventType type_;
53+
DownloadTask download_task_;
54+
};
55+
}; // namespace cortex::event

engine/common/download_task.h

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#pragma once
2+
3+
#include <filesystem>
4+
#include <sstream>
5+
#include <string>
6+
7+
enum class DownloadType { Model, Engine, Miscellaneous, CudaToolkit, Cortex };
8+
9+
struct DownloadItem {
10+
std::string id;
11+
12+
std::string downloadUrl;
13+
14+
/**
15+
* An absolute path to where the file is located (locally).
16+
*/
17+
std::filesystem::path localPath;
18+
19+
std::optional<std::string> checksum;
20+
21+
std::optional<uint64_t> bytes;
22+
23+
std::string ToString() const {
24+
std::ostringstream output;
25+
output << "DownloadItem{id: " << id << ", downloadUrl: " << downloadUrl
26+
<< ", localContainerPath: " << localPath
27+
<< ", checksum: " << checksum.value_or("N/A") << "}";
28+
return output.str();
29+
}
30+
};
31+
32+
struct DownloadTask {
33+
std::string id;
34+
35+
DownloadType type;
36+
37+
std::vector<DownloadItem> items;
38+
39+
std::string ToString() const {
40+
std::ostringstream output;
41+
output << "DownloadTask{id: " << id << ", type: " << static_cast<int>(type)
42+
<< ", items: [";
43+
for (const auto& item : items) {
44+
output << item.ToString() << ", ";
45+
}
46+
output << "]}";
47+
return output.str();
48+
}
49+
50+
// TODO: namh implement ToJson()
51+
};

engine/common/event.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
#pragma once
22

3-
#include <string>
4-
53
namespace cortex::event {
64

7-
struct Event {
8-
std::string message;
5+
class Event {
6+
public:
7+
virtual ~Event() {}
98
};
109
} // namespace cortex::event

engine/common/hardward_event.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#pragma once
2+
3+
#include "common/event.h"
4+
5+
namespace cortex::event {
6+
class HardwareEvent : public cortex::event::Event {
7+
public:
8+
explicit HardwareEvent() {}
9+
};
10+
}; // namespace cortex::event

engine/common/model_event.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#pragma once
2+
3+
#include "common/event.h"
4+
5+
namespace cortex::event {
6+
class ModelEvent : public cortex::event::Event {
7+
public:
8+
explicit ModelEvent() {}
9+
};
10+
}; // namespace cortex::event

engine/controllers/events.cc

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@ void Events::broadcast(const std::string& message) {
88

99
void Events::handleNewMessage(const WebSocketConnectionPtr& wsConnPtr,
1010
std::string&& message,
11-
const WebSocketMessageType& type) {}
11+
const WebSocketMessageType& type) {
12+
// ignore message sent from client
13+
}
1214

1315
void Events::handleNewConnection(const HttpRequestPtr& req,
14-
const WebSocketConnectionPtr& wsConnPtr) {
15-
LOG_DEBUG << "New connection";
16-
connections_.insert(wsConnPtr);
16+
const WebSocketConnectionPtr& ws_conn_ptr) {
17+
connections_.insert(ws_conn_ptr);
1718
}
1819

19-
void Events::handleConnectionClosed(const WebSocketConnectionPtr& wsConnPtr) {
20-
LOG_DEBUG << "Connection closed";
21-
connections_.erase(wsConnPtr);
20+
void Events::handleConnectionClosed(const WebSocketConnectionPtr& ws_conn_ptr) {
21+
connections_.erase(ws_conn_ptr);
2222
}

engine/controllers/events.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
#include <drogon/PubSubService.h>
44
#include <drogon/WebSocketController.h>
55
#include <eventpp/eventqueue.h>
6+
#include "common/download_event.h"
67
#include "common/event.h"
78

89
using namespace drogon;
910

1011
using Event = cortex::event::Event;
11-
using EventQueue = eventpp::EventQueue<std::string, void(const Event&)>;
12+
using DownloadEvent = cortex::event::DownloadEvent;
13+
using EventQueue = eventpp::EventQueue<std::string, void(DownloadEvent)>;
1214

1315
class Events : public drogon::WebSocketController<Events, false> {
1416

@@ -19,8 +21,9 @@ class Events : public drogon::WebSocketController<Events, false> {
1921

2022
explicit Events(std::shared_ptr<EventQueue> event_queue)
2123
: event_queue_{event_queue} {
22-
event_queue_->appendListener("download-started", [this](const Event& e) {
23-
this->broadcast(e.message);
24+
// TODO: namh make a list of event
25+
event_queue_->appendListener("download-update", [this](DownloadEvent e) {
26+
this->broadcast(e.ToString());
2427
});
2528
};
2629

engine/main.cc

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include "utils/archive_utils.h"
1111
#include "utils/cortex_utils.h"
1212
#include "utils/dylib.h"
13+
#include "utils/event_processor.h"
1314
#include "utils/file_logger.h"
1415
#include "utils/file_manager_utils.h"
1516
#include "utils/logging_utils.h"
@@ -73,21 +74,22 @@ void RunServer() {
7374
#endif
7475

7576
using Event = cortex::event::Event;
76-
using EventQueue = eventpp::EventQueue<std::string, void(const Event&)>;
77+
using EventQueue = eventpp::EventQueue<std::string, void(DownloadEvent)>;
7778

7879
auto event_queue_ptr = std::make_shared<EventQueue>();
80+
cortex::event::EventProcessor event_processor(event_queue_ptr);
81+
7982
auto download_service = std::make_shared<DownloadService>(event_queue_ptr);
83+
auto engine_service = std::make_shared<EngineService>(download_service);
84+
auto model_service = std::make_shared<ModelService>(download_service);
8085

8186
// initialize custom controllers
82-
auto engine_service = std::make_shared<EngineService>(download_service);
8387
auto engine_ctl = std::make_shared<Engines>(engine_service);
84-
drogon::app().registerController(engine_ctl);
85-
86-
auto model_service = std::make_shared<ModelService>(download_service);
8788
auto model_ctl = std::make_shared<Models>(model_service);
88-
drogon::app().registerController(model_ctl);
89-
9089
auto event_ctl = std::make_shared<Events>(event_queue_ptr);
90+
91+
drogon::app().registerController(engine_ctl);
92+
drogon::app().registerController(model_ctl);
9193
drogon::app().registerController(event_ctl);
9294

9395
LOG_INFO << "Server started, listening at: " << config.apiServerHost << ":"
@@ -176,4 +178,4 @@ int main(int argc, char* argv[]) {
176178
CommandLineParser clp;
177179
clp.SetupCommand(argc, argv);
178180
return 0;
179-
}
181+
}

engine/services/download_service.cc

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -115,39 +115,48 @@ cpp::result<uint64_t, std::string> DownloadService::GetFileSize(
115115
cpp::result<bool, std::string> DownloadService::AddAsyncDownloadTask(
116116
DownloadTask& task,
117117
std::optional<OnDownloadTaskSuccessfully> callback) noexcept {
118-
auto verifying_result = VerifyDownloadTask(task);
119-
if (verifying_result.has_error()) {
120-
return cpp::fail(verifying_result.error());
118+
119+
if (std::find(download_task_list_.begin(), download_task_list_.end(),
120+
task.id) != download_task_list_.end()) {
121+
return cpp::fail("Download task already exists: " + task.id);
122+
}
123+
124+
download_task_list_.push_back(task.id);
125+
download_task_map_.insert({task.id, task});
126+
127+
{
128+
// verify download task
129+
auto result = VerifyDownloadTask(task);
130+
if (result.has_error()) {
131+
CleanUp(task.id);
132+
return cpp::fail(result.error());
133+
}
121134
}
122135

123136
auto execute_download_async = [&, task, callback]() {
124-
std::optional<std::string> dl_err_msg = std::nullopt;
137+
active_download_task_id_ = task.id;
138+
std::optional<std::string> err_msg = std::nullopt;
125139
for (const auto& item : task.items) {
140+
active_download_item_id_ = item.id;
126141
CTL_INF("Start downloading: " + item.localPath.filename().string());
127-
128-
if (event_queue_.has_value()) {
129-
event_queue_.value()->enqueue("download-started",
130-
Event{.message = task.ToString()});
131-
event_queue_.value()->process();
132-
CLI_LOG("Enqueued download-started event: " << task.ToString());
133-
}
134-
135142
auto result = Download(task.id, item, false);
136143
if (result.has_error()) {
137-
dl_err_msg = result.error();
144+
err_msg = result.error();
138145
break;
139146
}
140147
}
141148

142-
if (dl_err_msg.has_value()) {
143-
CTL_ERR(dl_err_msg.value());
149+
if (err_msg.has_value()) {
150+
CTL_ERR(err_msg.value());
151+
CleanUp(task.id);
144152
return;
145153
}
146154

147155
if (callback.has_value()) {
148156
CTL_INF("Download success, executing post download lambda!");
149157
callback.value()(task);
150158
}
159+
CleanUp(task.id);
151160
};
152161

153162
std::thread t(execute_download_async);
@@ -223,6 +232,9 @@ cpp::result<bool, std::string> DownloadService::Download(
223232
curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L);
224233
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
225234

235+
curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, progressCallback);
236+
curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, this);
237+
226238
if (mode == "ab") {
227239
auto local_file_size = GetLocalFileSize(download_item.localPath);
228240
if (local_file_size != -1) {
@@ -260,3 +272,17 @@ curl_off_t DownloadService::GetLocalFileSize(
260272
fclose(file);
261273
return file_size;
262274
}
275+
276+
void DownloadService::CleanUp(const std::string& task_id) {
277+
CTL_INF("Cleaning up download task: " << task_id);
278+
// TODO: might need to be wrap with mutex
279+
// remove from task list
280+
download_task_list_.erase(std::remove(download_task_list_.begin(),
281+
download_task_list_.end(), task_id),
282+
download_task_list_.end());
283+
// remove from task map
284+
download_task_map_.erase(task_id);
285+
286+
active_download_task_id_ = std::nullopt;
287+
active_download_item_id_ = std::nullopt;
288+
}

0 commit comments

Comments
 (0)