Skip to content

Commit 31be512

Browse files
authored
BREAKING CHANGE: designated and non-designated timestamp feature parity for micros and nanos (#121)
1 parent b3f08fa commit 31be512

File tree

7 files changed

+203
-67
lines changed

7 files changed

+203
-67
lines changed

ci/run_all_tests.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def main():
4343
build_cxx20_dir.glob(f'**/test_line_sender{exe_suffix}')))
4444

4545
system_test_path = pathlib.Path('system_test') / 'test.py'
46-
qdb_v = '9.0.3' # The version of QuestDB we'll test against.
46+
qdb_v = '9.1.0' # The version of QuestDB we'll test against.
4747

4848
run_cmd('cargo', 'test',
4949
'--', '--nocapture', cwd='questdb-rs')

cpp_test/test_line_sender.cpp

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -228,15 +228,16 @@ TEST_CASE("line_sender c api basics")
228228
&err));
229229
CHECK(::line_sender_buffer_at_nanos(buffer, 10000000, &err));
230230
CHECK(server.recv() == 0);
231-
CHECK(::line_sender_buffer_size(buffer) == 382);
231+
CHECK(::line_sender_buffer_size(buffer) == 383);
232232
CHECK(::line_sender_flush(sender, buffer, &err));
233233
::line_sender_buffer_free(buffer);
234234
CHECK(server.recv() == 1);
235235
std::string expect{"test,t1=v1 f1=="};
236236
push_double_to_buffer(expect, 0.5).append(",a1==");
237237
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a2==");
238238
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a3==");
239-
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(" 10000000\n");
239+
push_double_arr_to_buffer(expect, arr_data, 3, shape)
240+
.append(" 10000000n\n");
240241
CHECK(server.msgs(0) == expect);
241242
}
242243

@@ -329,7 +330,7 @@ TEST_CASE("line_sender c++ api basics")
329330
.at(questdb::ingress::timestamp_nanos{10000000});
330331

331332
CHECK(server.recv() == 0);
332-
CHECK(buffer.size() == 610);
333+
CHECK(buffer.size() == 611);
333334
sender.flush(buffer);
334335
CHECK(server.recv() == 1);
335336
std::string expect{"test,t1=v1,t2= f1=="};
@@ -340,7 +341,7 @@ TEST_CASE("line_sender c++ api basics")
340341
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a4==");
341342
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a5==");
342343
push_double_arr_to_buffer(expect, arr_data, 1, shapes_1dim)
343-
.append(" 10000000\n");
344+
.append(" 10000000n\n");
344345
CHECK(server.msgs(0) == expect);
345346
}
346347

@@ -378,12 +379,12 @@ TEST_CASE("line_sender array vector API")
378379

379380
uintptr_t test_shape[] = {12};
380381
CHECK(server.recv() == 0);
381-
CHECK(buffer.size() == 132);
382+
CHECK(buffer.size() == 133);
382383
sender.flush(buffer);
383384
CHECK(server.recv() == 1);
384385
std::string expect{"test,t1=v1,t2= a1=="};
385386
push_double_arr_to_buffer(expect, arr_data, 1, test_shape)
386-
.append(" 10000000\n");
387+
.append(" 10000000n\n");
387388
CHECK(server.msgs(0) == expect);
388389
}
389390

@@ -427,12 +428,12 @@ TEST_CASE("line_sender array span API")
427428

428429
uintptr_t test_shape[] = {8};
429430
CHECK(server.recv() == 0);
430-
CHECK(buffer.size() == 100);
431+
CHECK(buffer.size() == 101);
431432
sender.flush(buffer);
432433
CHECK(server.recv() == 1);
433434
std::string expect{"test,t1=v1,t2= a1=="};
434435
push_double_arr_to_buffer(expect, expect_arr_data, 1, test_shape)
435-
.append(" 10000000\n");
436+
.append(" 10000000n\n");
436437
CHECK(server.msgs(0) == expect);
437438
}
438439
#endif
@@ -468,12 +469,12 @@ TEST_CASE("test multiple lines")
468469
.at_now();
469470

470471
CHECK(server.recv() == 0);
471-
CHECK(buffer.size() == 142);
472+
CHECK(buffer.size() == 143);
472473
sender.flush(buffer);
473474
CHECK(server.recv() == 2);
474475
std::string expect{"metric1,t1=val1,t2=val2 f1=t,f2=12345i,f3=="};
475476
push_double_to_buffer(expect, 10.75)
476-
.append(",f4=\"val3\",f5=\"val4\",f6=\"val5\" 111222233333\n");
477+
.append(",f4=\"val3\",f5=\"val4\",f6=\"val5\" 111222233333n\n");
477478
CHECK(server.msgs(0) == expect);
478479
CHECK(
479480
server.msgs(1) == "metric1,tag3=value\\ 3,tag\\ 4=value:4 field5=f\n");
@@ -917,7 +918,7 @@ TEST_CASE("Opts copy ctor, assignment and move testing.")
917918
}
918919
}
919920

920-
TEST_CASE("Test timestamp column.")
921+
TEST_CASE("Test timestamp column V1.")
921922
{
922923
questdb::ingress::test::mock_server server;
923924
questdb::ingress::line_sender sender{questdb::ingress::opts{
@@ -943,8 +944,8 @@ TEST_CASE("Test timestamp column.")
943944
.at(now_nanos_ts);
944945

945946
std::stringstream ss;
946-
ss << "test ts1=12345t,ts2=" << now_micros << "t,ts3=" << now_nanos << "n "
947-
<< now_nanos << "\n";
947+
ss << "test ts1=12345t,ts2=" << now_micros << "t,ts3=" << (now_nanos / 1000)
948+
<< "t " << now_nanos << "\n";
948949
const auto exp = ss.str();
949950
CHECK(buffer.peek() == exp);
950951

@@ -1193,11 +1194,11 @@ TEST_CASE("line sender protocol version v2")
11931194
.at(questdb::ingress::timestamp_nanos{10000000});
11941195

11951196
CHECK(server.recv() == 0);
1196-
CHECK(buffer.size() == 38);
1197+
CHECK(buffer.size() == 39);
11971198
sender.flush(buffer);
11981199
CHECK(server.recv() == 1);
11991200
std::string expect{"test,t1=v1,t2= f1=="};
1200-
push_double_to_buffer(expect, 0.5).append(" 10000000\n");
1201+
push_double_to_buffer(expect, 0.5).append(" 10000000n\n");
12011202
CHECK(server.msgs(0) == expect);
12021203
}
12031204

questdb-rs/src/ingress/buffer.rs

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@
2323
******************************************************************************/
2424
use crate::ingress::ndarr::{check_and_get_array_bytes_size, ArrayElementSealed};
2525
use crate::ingress::{
26-
ndarr, ArrayElement, DebugBytes, NdArrayView, ProtocolVersion, Timestamp, TimestampNanos,
27-
ARRAY_BINARY_FORMAT_TYPE, DOUBLE_BINARY_FORMAT_TYPE, MAX_ARRAY_DIMS, MAX_NAME_LEN_DEFAULT,
26+
ndarr, ArrayElement, DebugBytes, NdArrayView, ProtocolVersion, Timestamp, TimestampMicros,
27+
TimestampNanos, ARRAY_BINARY_FORMAT_TYPE, DOUBLE_BINARY_FORMAT_TYPE, MAX_ARRAY_DIMS,
28+
MAX_NAME_LEN_DEFAULT,
2829
};
2930
use crate::{error, Error};
3031
use std::fmt::{Debug, Formatter};
@@ -1147,19 +1148,19 @@ impl Buffer {
11471148
{
11481149
self.write_column_key(name)?;
11491150
let timestamp: Timestamp = value.try_into()?;
1150-
let mut buf = itoa::Buffer::new();
1151-
match timestamp {
1152-
Timestamp::Micros(ts) => {
1153-
let printed = buf.format(ts.as_i64());
1154-
self.output.extend_from_slice(printed.as_bytes());
1155-
self.output.push(b't');
1156-
}
1157-
Timestamp::Nanos(ts) => {
1158-
let printed = buf.format(ts.as_i64());
1159-
self.output.extend_from_slice(printed.as_bytes());
1160-
self.output.push(b'n');
1151+
let (number, suffix) = match (self.protocol_version, timestamp) {
1152+
(ProtocolVersion::V1, _) => {
1153+
let timestamp: TimestampMicros = timestamp.try_into()?;
1154+
(timestamp.as_i64(), b't')
11611155
}
1162-
}
1156+
(_, Timestamp::Micros(ts)) => (ts.as_i64(), b't'),
1157+
(_, Timestamp::Nanos(ts)) => (ts.as_i64(), b'n'),
1158+
};
1159+
1160+
let mut buf = itoa::Buffer::new();
1161+
let printed = buf.format(number);
1162+
self.output.extend_from_slice(printed.as_bytes());
1163+
self.output.push(suffix);
11631164
Ok(self)
11641165
}
11651166

@@ -1209,23 +1210,28 @@ impl Buffer {
12091210
self.check_op(Op::At)?;
12101211
let timestamp: Timestamp = timestamp.try_into()?;
12111212

1212-
// https://github.com/rust-lang/rust/issues/115880
1213-
let timestamp: crate::Result<TimestampNanos> = timestamp.try_into();
1214-
let timestamp: TimestampNanos = timestamp?;
1213+
let (number, termination) = match (self.protocol_version, timestamp) {
1214+
(ProtocolVersion::V1, _) => {
1215+
let timestamp: crate::Result<TimestampNanos> = timestamp.try_into();
1216+
(timestamp?.as_i64(), "\n")
1217+
}
1218+
(_, Timestamp::Micros(micros)) => (micros.as_i64(), "t\n"),
1219+
(_, Timestamp::Nanos(nanos)) => (nanos.as_i64(), "n\n"),
1220+
};
12151221

1216-
let epoch_nanos = timestamp.as_i64();
1217-
if epoch_nanos < 0 {
1222+
if number < 0 {
12181223
return Err(error::fmt!(
12191224
InvalidTimestamp,
12201225
"Timestamp {} is negative. It must be >= 0.",
1221-
epoch_nanos
1226+
number
12221227
));
12231228
}
1229+
12241230
let mut buf = itoa::Buffer::new();
1225-
let printed = buf.format(epoch_nanos);
1231+
let printed = buf.format(number);
12261232
self.output.push(b' ');
12271233
self.output.extend_from_slice(printed.as_bytes());
1228-
self.output.push(b'\n');
1234+
self.output.extend_from_slice(termination.as_bytes());
12291235
self.state.op_case = OpCase::MayFlushOrTable;
12301236
self.state.row_count += 1;
12311237
Ok(())

questdb-rs/src/tests/http.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -723,10 +723,16 @@ fn _test_sender_auto_detect_protocol_version(
723723
)?,
724724
Some(_) => server.send_settings_response()?,
725725
}
726+
727+
let designated_ts = if expect_version == ProtocolVersion::V1 {
728+
" 10000000\n"
729+
} else {
730+
" 10000000n\n"
731+
};
726732
let exp = &[
727733
b"test,t1=v1 ",
728734
crate::tests::sender::f64_to_bytes("f1", 0.5, expect_version).as_slice(),
729-
b" 10000000\n",
735+
designated_ts.as_bytes(),
730736
]
731737
.concat();
732738
let req = server.recv_http_q()?;

questdb-rs/src/tests/sender.rs

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,19 @@ fn test_basics(
8686
.at(ts_nanos)?;
8787

8888
assert_eq!(server.recv_q()?, 0);
89+
let (ts3_num, ts3_suffix, dts_suffix) = if version == ProtocolVersion::V1 {
90+
(ts_nanos_num / 1000, "t", "")
91+
} else {
92+
(ts_nanos_num, "n", "n")
93+
};
8994
let exp = &[
9095
"test,t1=v1 ".as_bytes(),
9196
f64_to_bytes("f1", 0.5, version).as_slice(),
9297
format!(
93-
",ts1=12345t,ts2={}t,ts3={ts_nanos_num}n {ts_nanos_num}\n",
94-
ts_micros_num
98+
",ts1=12345t,\
99+
ts2={ts_micros_num}t,\
100+
ts3={ts3_num}{ts3_suffix} \
101+
{ts_nanos_num}{dts_suffix}\n"
95102
)
96103
.as_bytes(),
97104
]
@@ -139,7 +146,7 @@ fn test_array_f64_basic() -> TestResult {
139146
&1.0f64.to_le_bytes(),
140147
&2.0f64.to_le_bytes(),
141148
&3.0f64.to_le_bytes(),
142-
format!(" {}\n", ts.as_i64()).as_bytes(),
149+
format!(" {}n\n", ts.as_i64()).as_bytes(),
143150
]
144151
.concat();
145152

@@ -217,7 +224,7 @@ fn test_array_f64_for_ndarray() -> TestResult {
217224
",arr3d=".as_bytes(),
218225
array_header3d,
219226
array_data3d.as_slice(),
220-
format!(" {}\n", ts.as_i64()).as_bytes(),
227+
format!(" {}n\n", ts.as_i64()).as_bytes(),
221228
]
222229
.concat();
223230

@@ -506,12 +513,12 @@ fn test_bad_key(
506513
}
507514

508515
#[test]
509-
fn test_timestamp_overloads() -> TestResult {
516+
fn test_timestamp_overloads_v1() -> TestResult {
510517
use std::time::SystemTime;
511518

512519
let tbl_name = TableName::new("tbl_name")?;
513520

514-
let mut buffer = Buffer::new(ProtocolVersion::V2);
521+
let mut buffer = Buffer::new(ProtocolVersion::V1);
515522
buffer
516523
.table(tbl_name)?
517524
.column_ts("a", TimestampMicros::new(12345))?
@@ -538,7 +545,7 @@ fn test_timestamp_overloads() -> TestResult {
538545
)?)?;
539546

540547
let exp = concat!(
541-
"tbl_name a=12345t,b=-100000000t,c=12345678n,d=-12345678n,e=-1t,f=-10000n 1000\n",
548+
"tbl_name a=12345t,b=-100000000t,c=12345t,d=-12345t,e=-1t,f=-10t 1000\n",
542549
"tbl_name a=1000000t 5000000000\n"
543550
)
544551
.as_bytes();
@@ -547,6 +554,48 @@ fn test_timestamp_overloads() -> TestResult {
547554
Ok(())
548555
}
549556

557+
#[test]
558+
fn test_timestamp_overloads_v2() -> TestResult {
559+
use std::time::SystemTime;
560+
561+
let tbl_name = TableName::new("tbl_name")?;
562+
563+
let mut buffer = Buffer::new(ProtocolVersion::V2);
564+
buffer
565+
.table(tbl_name)?
566+
.column_ts("a", TimestampMicros::new(12345))?
567+
.column_ts("b", TimestampMicros::new(-100000000))?
568+
.column_ts("c", TimestampNanos::new(12345678))?
569+
.column_ts("d", TimestampNanos::new(-12345678))?
570+
.column_ts("e", Timestamp::Micros(TimestampMicros::new(-1)))?
571+
.column_ts("f", Timestamp::Nanos(TimestampNanos::new(-10000)))?
572+
.at(TimestampMicros::new(1))?;
573+
buffer
574+
.table(tbl_name)?
575+
.column_ts(
576+
"a",
577+
TimestampMicros::from_systemtime(
578+
SystemTime::UNIX_EPOCH
579+
.checked_add(Duration::from_secs(1))
580+
.unwrap(),
581+
)?,
582+
)?
583+
.at(TimestampNanos::from_systemtime(
584+
SystemTime::UNIX_EPOCH
585+
.checked_add(Duration::from_secs(5))
586+
.unwrap(),
587+
)?)?;
588+
589+
let exp = concat!(
590+
"tbl_name a=12345t,b=-100000000t,c=12345678n,d=-12345678n,e=-1t,f=-10000n 1t\n",
591+
"tbl_name a=1000000t 5000000000n\n"
592+
)
593+
.as_bytes();
594+
assert_eq!(buffer.as_bytes(), exp);
595+
596+
Ok(())
597+
}
598+
550599
#[cfg(feature = "chrono_timestamp")]
551600
#[test]
552601
fn test_chrono_timestamp() -> TestResult {
@@ -559,7 +608,7 @@ fn test_chrono_timestamp() -> TestResult {
559608
let mut buffer = Buffer::new(ProtocolVersion::V2);
560609
buffer.table(tbl_name)?.column_ts("a", ts)?.at(ts)?;
561610

562-
let exp = b"tbl_name a=1000000000n 1000000000\n";
611+
let exp = b"tbl_name a=1000000000n 1000000000n\n";
563612
assert_eq!(buffer.as_bytes(), exp);
564613

565614
Ok(())
@@ -633,11 +682,17 @@ fn test_tls_with_file_ca(
633682
.column_f64("f1", 0.5)?
634683
.at(TimestampNanos::new(10000000))?;
635684

685+
let designated_ts = if version == ProtocolVersion::V1 {
686+
" 10000000\n"
687+
} else {
688+
" 10000000n\n"
689+
};
690+
636691
assert_eq!(server.recv_q()?, 0);
637692
let exp = &[
638693
"test,t1=v1 ".as_bytes(),
639694
f64_to_bytes("f1", 0.5, version).as_slice(),
640-
" 10000000\n".as_bytes(),
695+
designated_ts.as_bytes(),
641696
]
642697
.concat();
643698
assert_eq!(buffer.as_bytes(), exp);
@@ -740,10 +795,15 @@ fn test_tls_insecure_skip_verify(
740795
.at(TimestampNanos::new(10000000))?;
741796

742797
assert_eq!(server.recv_q()?, 0);
798+
let designated_ts = if version == ProtocolVersion::V1 {
799+
" 10000000\n"
800+
} else {
801+
" 10000000n\n"
802+
};
743803
let exp = &[
744804
"test,t1=v1 ".as_bytes(),
745805
f64_to_bytes("f1", 0.5, version).as_slice(),
746-
" 10000000\n".as_bytes(),
806+
designated_ts.as_bytes(),
747807
]
748808
.concat();
749809
assert_eq!(buffer.as_bytes(), exp);

0 commit comments

Comments
 (0)