2020
2121#include <fluent-bit.h>
2222#include "flb_tests_runtime.h"
23+ #include <unistd.h>
24+ #include <sys/stat.h>
25+ #include <ftw.h>
26+ #include <limits.h>
27+ #include <errno.h>
2328
2429/* Test data */
30+
31+ static int flb_kusto_unlink_cb (const char * fpath , const struct stat * sb , int typeflag , struct FTW * ftwbuf )
32+ {
33+ return remove (fpath );
34+ }
35+
36+ static void flb_kusto_rm_rf (const char * path )
37+ {
38+ struct stat st ;
39+
40+ if (stat (path , & st ) != 0 ) {
41+ return ;
42+ }
43+
44+ nftw (path , flb_kusto_unlink_cb , 64 , FTW_DEPTH | FTW_PHYS );
45+ }
46+
2547#include "data/common/json_invalid.h" /* JSON_INVALID */
2648
2749/* Test functions */
@@ -30,6 +52,7 @@ void flb_test_azure_kusto_managed_identity_system(void);
3052void flb_test_azure_kusto_managed_identity_user (void );
3153void flb_test_azure_kusto_service_principal (void );
3254void flb_test_azure_kusto_workload_identity (void );
55+ void flb_test_azure_kusto_buffering_backlog (void );
3356
3457/* Test list */
3558TEST_LIST = {
@@ -38,6 +61,7 @@ TEST_LIST = {
3861 {"managed_identity_user" , flb_test_azure_kusto_managed_identity_user },
3962 {"service_principal" , flb_test_azure_kusto_service_principal },
4063 {"workload_identity" , flb_test_azure_kusto_workload_identity },
64+ {"buffering_backlog" , flb_test_azure_kusto_buffering_backlog },
4165 {NULL , NULL }
4266};
4367
@@ -210,4 +234,88 @@ void flb_test_azure_kusto_workload_identity(void)
210234
211235 flb_stop (ctx );
212236 flb_destroy (ctx );
237+ }
238+
239+ /* Regression: exercise buffering-enabled backlog processing on restart to validate nested mk_list_foreach_safe fix */
240+ void flb_test_azure_kusto_buffering_backlog (void )
241+ {
242+ int i ;
243+ int ret ;
244+ int bytes ;
245+ char sample [] = "{\"k\":\"v\"}" ;
246+ size_t sample_size = sizeof (sample ) - 1 ;
247+ char buffer_dir [PATH_MAX ];
248+ pid_t pid ;
249+ flb_ctx_t * ctx ;
250+ int in_ffd ;
251+ int out_ffd ;
252+
253+ pid = getpid ();
254+ snprintf (buffer_dir , sizeof (buffer_dir ), "/tmp/flb-kusto-test-%d" , (int ) pid );
255+
256+ /* Ensure a clean buffer directory before starting */
257+ flb_kusto_rm_rf (buffer_dir );
258+ mkdir (buffer_dir , 0700 );
259+
260+ /* First run: enable buffering and write data to disk */
261+ ctx = flb_create ();
262+ flb_service_set (ctx , "Flush" , "1" , "Grace" , "1" , "Log_Level" , "error" , NULL );
263+
264+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
265+ TEST_CHECK (in_ffd >= 0 );
266+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
267+
268+ out_ffd = flb_output (ctx , (char * ) "azure_kusto" , NULL );
269+ TEST_CHECK (out_ffd >= 0 );
270+ flb_output_set (ctx , out_ffd , "match" , "test" , NULL );
271+ flb_output_set (ctx , out_ffd , "auth_type" , "managed_identity" , NULL );
272+ flb_output_set (ctx , out_ffd , "client_id" , "system" , NULL );
273+ flb_output_set (ctx , out_ffd , "ingestion_endpoint" , "https://ingest-CLUSTER.kusto.windows.net" , NULL );
274+ flb_output_set (ctx , out_ffd , "database_name" , "telemetrydb" , NULL );
275+ flb_output_set (ctx , out_ffd , "table_name" , "logs" , NULL );
276+ flb_output_set (ctx , out_ffd , "buffering_enabled" , "true" , NULL );
277+ flb_output_set (ctx , out_ffd , "buffer_dir" , buffer_dir , NULL );
278+
279+ ret = flb_start (ctx );
280+ TEST_CHECK (ret == 0 );
281+
282+ for (i = 0 ; i < 5 ; i ++ ) {
283+ bytes = flb_lib_push (ctx , in_ffd , sample , sample_size );
284+ TEST_CHECK (bytes == (int ) sample_size );
285+ }
286+
287+ sleep (1 ); /* allow flush to write buffered chunks */
288+
289+ flb_stop (ctx );
290+ flb_destroy (ctx );
291+
292+ /* Second run: restart to process backlog from buffer_dir */
293+ ctx = flb_create ();
294+ flb_service_set (ctx , "Flush" , "1" , "Grace" , "1" , "Log_Level" , "error" , NULL );
295+
296+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
297+ TEST_CHECK (in_ffd >= 0 );
298+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
299+
300+ out_ffd = flb_output (ctx , (char * ) "azure_kusto" , NULL );
301+ TEST_CHECK (out_ffd >= 0 );
302+ flb_output_set (ctx , out_ffd , "match" , "test" , NULL );
303+ flb_output_set (ctx , out_ffd , "auth_type" , "managed_identity" , NULL );
304+ flb_output_set (ctx , out_ffd , "client_id" , "system" , NULL );
305+ flb_output_set (ctx , out_ffd , "ingestion_endpoint" , "https://ingest-CLUSTER.kusto.windows.net" , NULL );
306+ flb_output_set (ctx , out_ffd , "database_name" , "telemetrydb" , NULL );
307+ flb_output_set (ctx , out_ffd , "table_name" , "logs" , NULL );
308+ flb_output_set (ctx , out_ffd , "buffering_enabled" , "true" , NULL );
309+ flb_output_set (ctx , out_ffd , "buffer_dir" , buffer_dir , NULL );
310+
311+ ret = flb_start (ctx );
312+ TEST_CHECK (ret == 0 );
313+
314+ sleep (1 ); /* ingest_all_chunks runs on startup for buffered backlog */
315+
316+ flb_stop (ctx );
317+ flb_destroy (ctx );
318+
319+ /* Cleanup buffer directory after test */
320+ flb_kusto_rm_rf (buffer_dir );
213321}
0 commit comments