diff --git a/Apps/LogExporterApp/App.cs b/Apps/LogExporterApp/App.cs index b4d70e46..cf0a55fa 100644 --- a/Apps/LogExporterApp/App.cs +++ b/Apps/LogExporterApp/App.cs @@ -30,7 +30,7 @@ You should have received a copy of the GNU General Public License namespace LogExporter { - public sealed class App : IDnsApplication, IDnsQueryLogger + public sealed class App : IDnsApplication, IDnsQueryLogger, IDisposable { #region variables @@ -41,7 +41,7 @@ public sealed class App : IDnsApplication, IDnsQueryLogger bool _enableLogging; - readonly ConcurrentQueue _queuedLogs = new ConcurrentQueue(); + ConcurrentQueue? _queuedLogs; readonly Timer _queueTimer; const int QUEUE_TIMER_INTERVAL = 10000; const int BULK_INSERT_COUNT = 1000; @@ -75,7 +75,7 @@ private void Dispose(bool disposing) { _queueTimer?.Dispose(); - ExportLogsAsync().Sync(); //flush any pending logs + ExportLogsAsync().Sync(); // flush any pending logs _exportManager.Dispose(); } @@ -96,6 +96,10 @@ public Task InitializeAsync(IDnsServer dnsServer, string config) if (_config is null) throw new DnsClientException("Invalid application configuration."); + // Initialize bounded queue + _queuedLogs = new ConcurrentQueue(); + + // File export strategy if (_config.FileTarget!.Enabled) { _exportManager.RemoveStrategy(typeof(FileExportStrategy)); @@ -106,6 +110,7 @@ public Task InitializeAsync(IDnsServer dnsServer, string config) _exportManager.RemoveStrategy(typeof(FileExportStrategy)); } + // HTTP export strategy if (_config.HttpTarget!.Enabled) { _exportManager.RemoveStrategy(typeof(HttpExportStrategy)); @@ -116,10 +121,14 @@ public Task InitializeAsync(IDnsServer dnsServer, string config) _exportManager.RemoveStrategy(typeof(HttpExportStrategy)); } + // Syslog export strategy if (_config.SyslogTarget!.Enabled) { _exportManager.RemoveStrategy(typeof(SyslogExportStrategy)); - _exportManager.AddStrategy(new SyslogExportStrategy(_config.SyslogTarget.Address, _config.SyslogTarget.Port, _config.SyslogTarget.Protocol)); + _exportManager.AddStrategy(new SyslogExportStrategy( + _config.SyslogTarget.Address, + _config.SyslogTarget.Port, + _config.SyslogTarget.Protocol)); } else { @@ -138,10 +147,12 @@ public Task InitializeAsync(IDnsServer dnsServer, string config) public Task InsertLogAsync(DateTime timestamp, DnsDatagram request, IPEndPoint remoteEP, DnsTransportProtocol protocol, DnsDatagram response) { - if (_enableLogging) + if (_enableLogging && _queuedLogs is not null && _config is not null) { - if (_queuedLogs.Count < _config!.MaxQueueSize) - _queuedLogs.Enqueue(new LogEntry(timestamp, remoteEP, protocol, request, response)); + // Drop oldest when full + while (_queuedLogs.Count >= _config.MaxQueueSize && _queuedLogs.TryDequeue(out _)) { } + + _queuedLogs.Enqueue(new LogEntry(timestamp, remoteEP, protocol, request, response)); } return Task.CompletedTask; @@ -153,24 +164,26 @@ public Task InsertLogAsync(DateTime timestamp, DnsDatagram request, IPEndPoint r private async Task ExportLogsAsync() { + if (_queuedLogs is null) + return; + try { List logs = new List(BULK_INSERT_COUNT); - while (true) + while (_queuedLogs.TryDequeue(out var log)) { - while (logs.Count < BULK_INSERT_COUNT && _queuedLogs.TryDequeue(out LogEntry? log)) + logs.Add(log); + + if (logs.Count >= BULK_INSERT_COUNT) { - logs.Add(log); + await _exportManager.ImplementStrategyAsync(logs); + logs.Clear(); } + } - if (logs.Count < 1) - break; - + if (logs.Count > 0) await _exportManager.ImplementStrategyAsync(logs); - - logs.Clear(); - } } catch (Exception ex) { @@ -182,7 +195,6 @@ private async void HandleExportLogCallback(object? state) { try { - // Process logs within the timer interval, then let the timer reschedule await ExportLogsAsync(); } catch (Exception ex) @@ -206,7 +218,10 @@ private async void HandleExportLogCallback(object? state) public string Description { - get { return "Allows exporting query logs to third party sinks. It supports exporting to File, HTTP endpoint, and Syslog (UDP, TCP, TLS, and Local protocols)."; } + get + { + return "Allows exporting query logs to third party sinks. It supports exporting to File, HTTP endpoint, and Syslog (UDP, TCP, TLS, and Local protocols)."; + } } #endregion