nixos/vector: Add journald+ClickHouse test

This commit is contained in:
Jonathan Davies 2025-05-07 11:46:54 +00:00
parent 64f67b999e
commit 8932afecff
No known key found for this signature in database
2 changed files with 158 additions and 0 deletions

View file

@ -8,6 +8,7 @@
file-sink = import ./file-sink.nix { inherit system pkgs; };
api = import ./api.nix { inherit system pkgs; };
dnstap = import ./dnstap.nix { inherit system pkgs; };
journald-clickhouse = import ./journald-clickhouse.nix { inherit system pkgs; };
nginx-clickhouse = import ./nginx-clickhouse.nix { inherit system pkgs; };
syslog-quickwit = import ./syslog-quickwit.nix { inherit system pkgs; };
}

View file

@ -0,0 +1,157 @@
import ../make-test-python.nix (
{ lib, pkgs, ... }:
let
# Take the original journald message and create a new payload which only
# contains the relevant fields - these must match the database columns.
journalVrlRemapTransform = {
journald_remap = {
inputs = [ "journald" ];
type = "remap";
source = ''
m = {}
m.app = .SYSLOG_IDENTIFIER
m.host = .host
m.severity = to_int(.PRIORITY) ?? 0
m.level = to_syslog_level(m.severity) ?? ""
m.message = strip_ansi_escape_codes!(.message)
m.timestamp = .timestamp
m.uid = to_int(._UID) ?? 0
m.pid = to_int(._PID) ?? 0
. = [m]
'';
};
};
in
{
name = "vector-journald-clickhouse";
meta.maintainers = [ pkgs.lib.maintainers.happysalada ];
nodes = {
clickhouse =
{ config, pkgs, ... }:
{
virtualisation.diskSize = 5 * 1024;
virtualisation.memorySize = 4096;
networking.firewall.allowedTCPPorts = [ 6000 ];
services.vector = {
enable = true;
journaldAccess = true;
settings = {
sources = {
journald = {
type = "journald";
};
vector_source = {
type = "vector";
address = "[::]:6000";
};
};
transforms = journalVrlRemapTransform;
sinks = {
clickhouse = {
type = "clickhouse";
inputs = [
"journald_remap"
"vector_source"
];
endpoint = "http://localhost:8123";
database = "journald";
table = "logs";
date_time_best_effort = true;
};
};
};
};
services.clickhouse = {
enable = true;
};
};
vector =
{ config, pkgs, ... }:
{
services.vector = {
enable = true;
journaldAccess = true;
settings = {
sources = {
journald = {
type = "journald";
};
};
transforms = journalVrlRemapTransform;
sinks = {
vector_sink = {
type = "vector";
inputs = [ "journald_remap" ];
address = "clickhouse:6000";
};
};
};
};
};
};
testScript =
let
# work around quote/substitution complexity by Nix, Perl, bash and SQL.
databaseDDL = pkgs.writeText "database.sql" "CREATE DATABASE IF NOT EXISTS journald";
# https://clickhouse.com/blog/storing-log-data-in-clickhouse-fluent-bit-vector-open-telemetry
tableDDL = pkgs.writeText "table.sql" ''
CREATE TABLE IF NOT EXISTS journald.logs (
timestamp DateTime64(6),
app LowCardinality(String),
host LowCardinality(String),
level LowCardinality(String),
severity UInt8,
message String,
uid UInt16,
pid UInt32,
)
ENGINE = MergeTree()
ORDER BY (host, app, timestamp)
PARTITION BY toYYYYMM(timestamp)
'';
selectQuery = pkgs.writeText "select.sql" ''
SELECT COUNT(host) FROM journald.logs
WHERE message LIKE '%Vector has started%'
'';
in
''
clickhouse.wait_for_unit("clickhouse")
clickhouse.wait_for_open_port(6000)
clickhouse.wait_for_open_port(8123)
clickhouse.succeed(
"cat ${databaseDDL} | clickhouse-client"
)
clickhouse.succeed(
"cat ${tableDDL} | clickhouse-client"
)
for machine in clickhouse, vector:
machine.wait_for_unit("vector")
machine.wait_until_succeeds(
"journalctl -o cat -u vector.service | grep 'Vector has started'"
)
clickhouse.wait_until_succeeds(
"cat ${selectQuery} | clickhouse-client | grep 2"
)
'';
}
)