Skip to content

Commit 2e3ac65

Browse files
committed
feat: switch to tardy v0.2.0
1 parent 0bf3cbf commit 2e3ac65

File tree

6 files changed

+145
-75
lines changed

6 files changed

+145
-75
lines changed

build.zig.zon

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
.minimum_zig_version = "0.13.0",
55
.dependencies = .{
66
.tardy = .{
7-
.url = "git+https://github.com/mookums/tardy?ref=v0.1.0#ae0970d6b3fa5b03625b14e142c664efe1fd7789",
8-
.hash = "12207f5afee3b8933c1c32737e8feedc80a2e4feebe058739509094c812e4a8d2cc8",
7+
.url = "git+https://github.com/mookums/tardy?ref=v0.2.0#543e6b01cba3caa691960a4a5a54d3419969f2d8",
8+
.hash = "12202bc544928f0bb67ab4a30d3ff6d54c9d62643296c8d04303762b477b71fd002d",
99
},
1010
.bearssl = .{
1111
.url = "git+https://github.com/mookums/bearssl-zig#37a96eee56fe2543579bbc6da148ca886f3dd32b",

flake.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

flake.nix

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"a framework for writing performant and reliable networked services";
44

55
inputs = {
6-
nixpkgs.url = "github:nixos/nixpkgs/release-24.05";
6+
nixpkgs.url = "github:nixos/nixpkgs/release-24.11";
77
iguana.url = "github:mookums/iguana";
88
flake-utils.url = "github:numtide/flake-utils";
99
};

src/http/router/fs_dir.zig

Lines changed: 61 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ const Response = @import("../response.zig").Response;
77
const Mime = @import("../mime.zig").Mime;
88
const _Context = @import("../context.zig").Context;
99

10+
const OpenResult = @import("tardy").OpenResult;
11+
const ReadResult = @import("tardy").ReadResult;
12+
const SendResult = @import("tardy").SendResult;
13+
const StatResult = @import("tardy").StatResult;
14+
1015
const Runtime = @import("tardy").Runtime;
1116
const Stat = @import("tardy").Stat;
1217
const Cross = @import("tardy").Cross;
@@ -27,33 +32,44 @@ pub fn FsDir(Server: type, AppState: type) type {
2732
buffer: []u8,
2833
};
2934

30-
fn open_file_task(rt: *Runtime, fd: std.posix.fd_t, provision: *FileProvision) !void {
35+
fn open_file_task(rt: *Runtime, result: OpenResult, provision: *FileProvision) !void {
3136
errdefer provision.context.respond(.{
3237
.status = .@"Internal Server Error",
3338
.mime = Mime.HTML,
3439
.body = "",
3540
}) catch unreachable;
3641

37-
if (!Cross.fd.is_valid(fd)) {
42+
const fd = result.unwrap() catch |e| {
43+
log.warn("file not found | {}", .{e});
3844
try provision.context.respond(.{
3945
.status = .@"Not Found",
4046
.mime = Mime.HTML,
4147
.body = "File Not Found",
4248
});
4349
return;
44-
}
50+
};
4551
provision.fd = fd;
4652

4753
try rt.fs.stat(provision, stat_file_task, fd);
4854
}
4955

50-
fn stat_file_task(rt: *Runtime, stat: Stat, provision: *FileProvision) !void {
56+
fn stat_file_task(rt: *Runtime, result: StatResult, provision: *FileProvision) !void {
5157
errdefer provision.context.respond(.{
5258
.status = .@"Internal Server Error",
5359
.mime = Mime.HTML,
5460
.body = "",
5561
}) catch unreachable;
5662

63+
const stat = result.unwrap() catch |e| {
64+
log.warn("stat on fd={d} failed | {}", .{ provision.fd, e });
65+
try provision.context.respond(.{
66+
.status = .@"Not Found",
67+
.mime = Mime.HTML,
68+
.body = "File Not Found",
69+
});
70+
return;
71+
};
72+
5773
// Set file size.
5874
provision.file_size = stat.size;
5975
log.debug("file size: {d}", .{provision.file_size});
@@ -109,50 +125,56 @@ pub fn FsDir(Server: type, AppState: type) type {
109125
);
110126
}
111127

112-
fn read_file_task(rt: *Runtime, result: i32, provision: *FileProvision) !void {
128+
fn read_file_task(rt: *Runtime, result: ReadResult, provision: *FileProvision) !void {
113129
errdefer {
114130
std.posix.close(provision.fd);
115131
provision.context.close() catch unreachable;
116132
}
117133

118-
if (result <= -1) {
119-
log.warn("read file task failed", .{});
120-
std.posix.close(provision.fd);
121-
try provision.context.close();
122-
return;
123-
}
134+
const length = result.unwrap() catch |e| {
135+
switch (e) {
136+
error.EndOfFile => {
137+
log.debug("done streaming file | rd off: {d} | f size: {d} ", .{
138+
provision.rd_offset,
139+
provision.file_size,
140+
});
141+
142+
std.posix.close(provision.fd);
143+
try provision.context.send_then_recv(
144+
provision.buffer[0..provision.current_length],
145+
);
146+
return;
147+
},
148+
else => {
149+
log.warn("reading on fd={d} failed | {}", .{ provision.fd, e });
150+
std.posix.close(provision.fd);
151+
try provision.context.close();
152+
return;
153+
},
154+
}
155+
};
124156

125-
const length: usize = @intCast(result);
126-
provision.rd_offset += length;
127-
provision.current_length += length;
157+
const length_as_usize: usize = @intCast(length);
158+
provision.rd_offset += length_as_usize;
159+
provision.current_length += length_as_usize;
128160
log.debug("current offset: {d} | fd: {}", .{ provision.rd_offset, provision.fd });
129161

130-
if (provision.rd_offset >= provision.file_size or result == 0) {
131-
log.debug("done streaming file | rd off: {d} | f size: {d} | result: {d}", .{
132-
provision.rd_offset,
133-
provision.file_size,
134-
result,
135-
});
136-
137-
std.posix.close(provision.fd);
138-
try provision.context.send_then_recv(provision.buffer[0..provision.current_length]);
162+
assert(provision.rd_offset <= length_as_usize);
163+
assert(provision.current_length <= provision.buffer.len);
164+
if (provision.current_length == provision.buffer.len) {
165+
try provision.context.send_then(
166+
provision.buffer[0..provision.current_length],
167+
provision,
168+
send_file_task,
169+
);
139170
} else {
140-
assert(provision.current_length <= provision.buffer.len);
141-
if (provision.current_length == provision.buffer.len) {
142-
try provision.context.send_then(
143-
provision.buffer[0..provision.current_length],
144-
provision,
145-
send_file_task,
146-
);
147-
} else {
148-
try rt.fs.read(
149-
provision,
150-
read_file_task,
151-
provision.fd,
152-
provision.buffer[provision.current_length..],
153-
provision.rd_offset,
154-
);
155-
}
171+
try rt.fs.read(
172+
provision,
173+
read_file_task,
174+
provision.fd,
175+
provision.buffer[provision.current_length..],
176+
provision.rd_offset,
177+
);
156178
}
157179
}
158180

src/http/router/routing_trie.zig

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,12 +146,16 @@ pub fn RoutingTrie(comptime Server: type, comptime AppState: type) type {
146146
}
147147

148148
/// Initialize new trie node for the next token.
149-
fn _with_route(comptime node: *const Node, comptime iterator: *std.mem.TokenIterator(u8, .scalar), comptime route: Route) Node {
149+
fn with_route_helper(
150+
comptime node: *const Node,
151+
comptime iterator: *std.mem.TokenIterator(u8, .scalar),
152+
comptime route: Route,
153+
) Node {
150154
if (iterator.next()) |chunk| {
151155
// Parse the current chunk.
152156
const token: Token = Token.parse_chunk(chunk);
153157
// Alter the child of the current node.
154-
return node.with_child(token, &(_with_route(
158+
return node.with_child(token, &(with_route_helper(
155159
node.children.get_optional(token) orelse &(Node.init(token, null)),
156160
iterator,
157161
route,
@@ -168,13 +172,13 @@ pub fn RoutingTrie(comptime Server: type, comptime AppState: type) type {
168172

169173
/// Copy the current routing trie to add the provided route.
170174
pub fn with_route(comptime self: *const Self, comptime route: Route) Self {
171-
@setEvalBranchQuota(10000);
175+
@setEvalBranchQuota(1_000_000);
172176

173177
// This is where we will parse out the path.
174178
comptime var iterator = std.mem.tokenizeScalar(u8, route.path, '/');
175179

176180
return Self{
177-
.root = _with_route(&(self.root), &iterator, route),
181+
.root = with_route_helper(&(self.root), &iterator, route),
178182
};
179183
}
180184

src/http/server.zig

Lines changed: 69 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ pub const AsyncIOType = @import("tardy").AsyncIOType;
3434
const TardyCreator = @import("tardy").Tardy;
3535
const Cross = @import("tardy").Cross;
3636

37+
const AcceptResult = @import("tardy").AcceptResult;
38+
const RecvResult = @import("tardy").RecvResult;
39+
const SendResult = @import("tardy").SendResult;
40+
3741
pub const RecvStatus = union(enum) {
3842
kill,
3943
recv,
@@ -278,21 +282,24 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
278282
}
279283
}
280284

281-
fn accept_task(rt: *Runtime, child_socket: std.posix.socket_t, socket: std.posix.socket_t) !void {
282-
const pool = rt.storage.get_ptr("__zzz_provision_pool", Pool(Provision));
285+
fn accept_task(rt: *Runtime, result: AcceptResult, socket: std.posix.socket_t) !void {
283286
const accept_queued = rt.storage.get_ptr("__zzz_accept_queued", bool);
287+
288+
const child_socket = result.unwrap() catch |e| {
289+
log.err("socket accept failed | {}", .{e});
290+
accept_queued.* = true;
291+
try rt.net.accept(socket, accept_task, socket);
292+
return;
293+
};
294+
295+
const pool = rt.storage.get_ptr("__zzz_provision_pool", Pool(Provision));
284296
accept_queued.* = false;
285297

286298
if (rt.scheduler.tasks.clean() >= 2) {
287299
accept_queued.* = true;
288300
try rt.net.accept(socket, accept_task, socket);
289301
}
290302

291-
if (!Cross.socket.is_valid(child_socket)) {
292-
log.err("socket accept failed", .{});
293-
return error.AcceptFailed;
294-
}
295-
296303
// This should never fail. It means that we have a dangling item.
297304
assert(pool.clean() > 0);
298305
const borrowed = pool.borrow() catch unreachable;
@@ -339,7 +346,7 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
339346
};
340347

341348
provision.job = .{ .handshake = .{ .state = .recv, .count = 0 } };
342-
try rt.net.recv(borrowed.item, handshake_task, child_socket, recv_buf);
349+
try rt.net.recv(borrowed.item, handshake_recv_task, child_socket, recv_buf);
343350
},
344351
.plain => {
345352
provision.job = .{ .recv = .{ .count = 0 } };
@@ -348,8 +355,18 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
348355
}
349356
}
350357

351-
fn recv_task(rt: *Runtime, length: i32, provision: *Provision) !void {
358+
fn recv_task(rt: *Runtime, result: RecvResult, provision: *Provision) !void {
352359
assert(provision.job == .recv);
360+
361+
const length = result.unwrap() catch |e| {
362+
if (e != error.Closed) {
363+
log.warn("socket recv failed | {}", .{e});
364+
}
365+
provision.job = .close;
366+
try rt.net.close(provision, close_task, provision.socket);
367+
return;
368+
};
369+
353370
const config = rt.storage.get_const_ptr("__zzz_config", ServerConfig);
354371
const router = rt.storage.get_const_ptr("__zzz_router", Router);
355372

@@ -424,7 +441,37 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
424441
}
425442
}
426443

427-
fn handshake_task(rt: *Runtime, length: i32, provision: *Provision) !void {
444+
fn handshake_recv_task(rt: *Runtime, result: RecvResult, provision: *Provision) !void {
445+
assert(security == .tls);
446+
447+
const length = result.unwrap() catch |e| {
448+
if (e != error.Closed) {
449+
log.warn("socket recv failed | {}", .{e});
450+
}
451+
provision.job = .close;
452+
try rt.net.close(provision, close_task, provision.socket);
453+
return error.TLSHandshakeClosed;
454+
};
455+
456+
try handshake_inner_task(rt, length, provision);
457+
}
458+
459+
fn handshake_send_task(rt: *Runtime, result: SendResult, provision: *Provision) !void {
460+
assert(security == .tls);
461+
462+
const length = result.unwrap() catch |e| {
463+
if (e != error.ConnectionReset) {
464+
log.warn("socket send failed | {}", .{e});
465+
}
466+
provision.job = .close;
467+
try rt.net.close(provision, close_task, provision.socket);
468+
return error.TLSHandshakeClosed;
469+
};
470+
471+
try handshake_inner_task(rt, length, provision);
472+
}
473+
474+
fn handshake_inner_task(rt: *Runtime, length: i32, provision: *Provision) !void {
428475
assert(security == .tls);
429476
if (comptime security == .tls) {
430477
const tls_slice = rt.storage.get("__zzz_tls_slice", []TLSType);
@@ -437,13 +484,6 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
437484
log.debug("processing handshake", .{});
438485
handshake_job.count += 1;
439486

440-
if (length <= 0) {
441-
log.debug("handshake connection closed", .{});
442-
provision.job = .close;
443-
try rt.net.close(provision, close_task, provision.socket);
444-
return error.TLSHandshakeClosed;
445-
}
446-
447487
if (handshake_job.count >= 50) {
448488
log.debug("handshake taken too many cycles", .{});
449489
provision.job = .close;
@@ -467,12 +507,12 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
467507
.recv => |buf| {
468508
log.debug("queueing recv in handshake", .{});
469509
handshake_job.state = .recv;
470-
try rt.net.recv(provision, handshake_task, provision.socket, buf);
510+
try rt.net.recv(provision, handshake_recv_task, provision.socket, buf);
471511
},
472512
.send => |buf| {
473513
log.debug("queueing send in handshake", .{});
474514
handshake_job.state = .send;
475-
try rt.net.send(provision, handshake_task, provision.socket, buf);
515+
try rt.net.send(provision, handshake_send_task, provision.socket, buf);
476516
},
477517
.complete => {
478518
log.debug("handshake complete", .{});
@@ -575,17 +615,21 @@ pub fn Server(comptime security: Security, comptime AppState: type) type {
575615
}
576616
}.inner);
577617

578-
pub fn send_then(comptime func: TaskFn(bool, *Provision)) TaskFn(i32, *Provision) {
618+
pub fn send_then(comptime func: TaskFn(bool, *Provision)) TaskFn(SendResult, *Provision) {
579619
return struct {
580-
fn send_then_inner(rt: *Runtime, length: i32, provision: *Provision) !void {
620+
fn send_then_inner(rt: *Runtime, result: SendResult, provision: *Provision) !void {
581621
assert(provision.job == .send);
582622
const config = rt.storage.get_const_ptr("__zzz_config", ServerConfig);
583623

584-
// If the socket is closed.
585-
if (length <= 0) {
586-
try @call(.always_inline, func, .{ rt, false, provision });
624+
const length = result.unwrap() catch |e| {
625+
// If the socket is closed.
626+
if (e != error.ConnectionReset) {
627+
log.warn("socket send failed: {}", .{e});
628+
}
629+
630+
try @call(.auto, func, .{ rt, false, provision });
587631
return;
588-
}
632+
};
589633

590634
const send_job = &provision.job.send;
591635

0 commit comments

Comments
 (0)