Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions Apps/LogExporterApp/App.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ You should have received a copy of the GNU General Public License

namespace LogExporter
{
public sealed class App : IDnsApplication, IDnsQueryLogger
public sealed class App : IDnsApplication, IDnsQueryLogger, IDisposable
{
#region variables

Expand All @@ -41,7 +41,7 @@ public sealed class App : IDnsApplication, IDnsQueryLogger

bool _enableLogging;

readonly ConcurrentQueue<LogEntry> _queuedLogs = new ConcurrentQueue<LogEntry>();
ConcurrentQueue<LogEntry>? _queuedLogs;
readonly Timer _queueTimer;
const int QUEUE_TIMER_INTERVAL = 10000;
const int BULK_INSERT_COUNT = 1000;
Expand Down Expand Up @@ -75,7 +75,7 @@ private void Dispose(bool disposing)
{
_queueTimer?.Dispose();

ExportLogsAsync().Sync(); //flush any pending logs
ExportLogsAsync().Sync(); // flush any pending logs

_exportManager.Dispose();
}
Expand All @@ -96,6 +96,10 @@ public Task InitializeAsync(IDnsServer dnsServer, string config)
if (_config is null)
throw new DnsClientException("Invalid application configuration.");

// Initialize bounded queue
_queuedLogs = new ConcurrentQueue<LogEntry>();

// File export strategy
if (_config.FileTarget!.Enabled)
{
_exportManager.RemoveStrategy(typeof(FileExportStrategy));
Expand All @@ -106,6 +110,7 @@ public Task InitializeAsync(IDnsServer dnsServer, string config)
_exportManager.RemoveStrategy(typeof(FileExportStrategy));
}

// HTTP export strategy
if (_config.HttpTarget!.Enabled)
{
_exportManager.RemoveStrategy(typeof(HttpExportStrategy));
Expand All @@ -116,10 +121,14 @@ public Task InitializeAsync(IDnsServer dnsServer, string config)
_exportManager.RemoveStrategy(typeof(HttpExportStrategy));
}

// Syslog export strategy
if (_config.SyslogTarget!.Enabled)
{
_exportManager.RemoveStrategy(typeof(SyslogExportStrategy));
_exportManager.AddStrategy(new SyslogExportStrategy(_config.SyslogTarget.Address, _config.SyslogTarget.Port, _config.SyslogTarget.Protocol));
_exportManager.AddStrategy(new SyslogExportStrategy(
_config.SyslogTarget.Address,
_config.SyslogTarget.Port,
_config.SyslogTarget.Protocol));
}
else
{
Expand All @@ -138,10 +147,12 @@ public Task InitializeAsync(IDnsServer dnsServer, string config)

public Task InsertLogAsync(DateTime timestamp, DnsDatagram request, IPEndPoint remoteEP, DnsTransportProtocol protocol, DnsDatagram response)
{
if (_enableLogging)
if (_enableLogging && _queuedLogs is not null && _config is not null)
{
if (_queuedLogs.Count < _config!.MaxQueueSize)
_queuedLogs.Enqueue(new LogEntry(timestamp, remoteEP, protocol, request, response));
// Drop oldest when full
while (_queuedLogs.Count >= _config.MaxQueueSize && _queuedLogs.TryDequeue(out _)) { }

_queuedLogs.Enqueue(new LogEntry(timestamp, remoteEP, protocol, request, response));
}

return Task.CompletedTask;
Expand All @@ -153,24 +164,26 @@ public Task InsertLogAsync(DateTime timestamp, DnsDatagram request, IPEndPoint r

private async Task ExportLogsAsync()
{
if (_queuedLogs is null)
return;

try
{
List<LogEntry> logs = new List<LogEntry>(BULK_INSERT_COUNT);

while (true)
while (_queuedLogs.TryDequeue(out var log))
{
while (logs.Count < BULK_INSERT_COUNT && _queuedLogs.TryDequeue(out LogEntry? log))
logs.Add(log);

if (logs.Count >= BULK_INSERT_COUNT)
{
logs.Add(log);
await _exportManager.ImplementStrategyAsync(logs);
logs.Clear();
}
}

if (logs.Count < 1)
break;

if (logs.Count > 0)
await _exportManager.ImplementStrategyAsync(logs);

logs.Clear();
}
}
catch (Exception ex)
{
Expand All @@ -182,7 +195,6 @@ private async void HandleExportLogCallback(object? state)
{
try
{
// Process logs within the timer interval, then let the timer reschedule
await ExportLogsAsync();
}
catch (Exception ex)
Expand All @@ -206,7 +218,10 @@ private async void HandleExportLogCallback(object? state)

public string Description
{
get { return "Allows exporting query logs to third party sinks. It supports exporting to File, HTTP endpoint, and Syslog (UDP, TCP, TLS, and Local protocols)."; }
get
{
return "Allows exporting query logs to third party sinks. It supports exporting to File, HTTP endpoint, and Syslog (UDP, TCP, TLS, and Local protocols).";
}
}

#endregion
Expand Down