Suricata 与 Kafka 连接

Suricata 本身是不支持输出到 Kafka 的,尽管曾经有相关的 提议

本文大量代码源自 output: Add kafka output ability for eve · CosmosSun/suricata@d9f400a (github.com) ,但站在巨人的肩膀上修改了部分代码,并且尝试给出了自己的理解


考虑到 Suricata 默认输出到文件,仍然需要一个类似 Logstash 的工具进行读取,不如直接输出到 Kafka 实现分布式读取处理

为了方便后续操作,应该使用信息尽可能全,并且易于处理的格式来进行数据的传输,这里选择 Suricata 的 EveJSON 格式,相关的代码位于 src/output-json-xxx.c 中,所有 Suricata 可处理的协议,都有一个对应的文件,用于设置解析格式。

代码修改

如果不进行更复杂的修改,只是单纯输出到 Kafka,那么是有现成的代码可供参考的。EveJSON 除去写入文件目录外,还可以写出到 Redis 中,因此可以参考对应的代码实现。

首先在项目中搜索所有的 "redis"。考虑到大部分文件都包含单词 "redistribute",所以应该用正则表达式搜索 redis[^t]。搜索出来 19 个文件中包含,除去一些文档等文件外,主要需要考虑的是

  • eve-json-output.c
  • util-log-redis.c
  • util-plugin.cutil-error.cutil-logopenfile.c

实际的实现函数位于 util-log-redis.c 中,其余的文件则是将 Redis 注册到 Suricata 中

只需要仿照 Redis 的操作,对 Suricata 进行同样的配置即可

日志上下文类

util-logopenfile.h

包括 Redis 在内,实际上是声明了一个 LogFileCtx 类型的变量(该类型位于 src/util-logopenfile.h 中),类似集成 LogFileCtx 基类实现了一个单例实例。

这里需要修改 4 个地方,首先是引入 Kafka 相关的头文件

#ifdef HAVE_LIBRDKAFKA
#include "util-log-kafka.h"
#endif

然后需要在 LogFileType 枚举中,注册 Kafka 类型

enum LogFileType { LOGFILE_TYPE_FILE,
                   LOGFILE_TYPE_SYSLOG,
                   LOGFILE_TYPE_UNIX_DGRAM,
                   LOGFILE_TYPE_UNIX_STREAM,
                   LOGFILE_TYPE_REDIS,
                   LOGFILE_TYPE_KAFKA,
                   LOGFILE_TYPE_PLUGIN };

LogFileCtx 中,使用联合(union)存储了 Kafka 相关的结构,前者对应一个打开的文件,或是已建立的 Kafka 连接;后者则是文件、Kafka 相关的配置信息

typedef struct LogFileCtx_ {
    union {
        FILE *fp;
        PcieFile *pcie_fp;
        LogThreadedFileCtx *threads;
        void *plugin_data;
#ifdef HAVE_LIBHIREDIS
        void *redis;
#endif
#ifdef HAVE_LIBRDKAFKA
        SCLogKafkaContext* kafka;
#endif
    };

    union {
        SyslogSetup syslog_setup;
#ifdef HAVE_LIBHIREDIS
        RedisSetup redis_setup;
#endif
#ifdef HAVE_LIBRDKAFKA
        KafkaSetup kafka_setup;
#endif
    };

    int (*Write)(const char *buffer, int buffer_len, struct LogFileCtx_ *fp);
    void (*Close)(struct LogFileCtx_ *fp);
    
    /*
    	......
    */
}

util-logopenfile.c

头文件修改后,实现中也应该进行相应的修改,告诉程序应该如何识别上面新加入的东西。

src/util-logopenfile.c 中,再次引入了 Redis 相应的头文件,不过考虑到上面已经引入了 util-logopenfile.h,并且在这个文件里已经引入过 Redis 头文件,所以个人认为这里实际上没有必要再次引入。


int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *log_ctx, const char *default_filename, int rotate) 函数中,存在关于 Redis 的操作。这个函数的任务是“打开文件”(如果是 Redis 或 Kafka,就是建立相应的 Socket)

从 Redis 部分的代码可知,在这里完成的任务是

  • 读入 redis 相关配置
  • 调用 SCConfLogOpenRedis(ConfNode, void *)
  • 配置 log_ctx->type = LOGFILE_TYPE_REDIS 上下文

仿照这部分逻辑实现 Kafka 的代码即可,调用 SCConfLogOpenKafka(ConfNode, void*) 建立 Kafka 连接

output: Add kafka output ability for eve · CosmosSun/suricata@d9f400a (github.com) 的代码中,这里还额外设置了 Suricata 日志探测器 log_ctx->sensor_name 为当前主机名 gethostname(),不过似乎不是必须的

#ifdef HAVE_LIBHIREDIS
    } else if (strcasecmp(filetype, "redis") == 0) {
        ConfNode *redis_node = ConfNodeLookupChild(conf, "redis");
        if (SCConfLogOpenRedis(redis_node, log_ctx) < 0) {
            SCLogError(SC_ERR_REDIS, "failed to open redis output");
            return -1;
        }
        log_ctx->type = LOGFILE_TYPE_REDIS;
#endif
#ifdef HAVE_LIBRDKAFKA
    }
    else if (strcmp(filetype, "kafka") == 0) {
        ConfNode* kafka_node = ConfNodeLookupChild(conf, "kafka");
        if (SCConfLogOpenKafka(kafka_node, log_ctx) < 0) {
            SCLogError(SC_ERR_KAFKA, "failed to open kafka output");
            return -1;
        }
        log_ctx->type = LOGFILE_TYPE_KAFKA;
#endif

int LogFileWrite(LogFileCtx *, MemBuffer *) 中,则需要根据不同的类型,将日志写入对应的 “文件”。按照 Redis 的逻辑,这里只是调用 LogFileWriteRedis(void *, const char *, size_t) 函数,执行写入操作。在写入前后,通过对 &file_ctx->fp_mutex 进行加锁解锁操作,来避免并发问题(“如果在一次调用中不能将日志/警报记录写入文件,它将被锁定”),不过参考的代码中,省去了加锁解锁的代码,个人认为应该带上(大概 Kafka 自带锁?)

#ifdef HAVE_LIBHIREDIS
    else if (file_ctx->type == LOGFILE_TYPE_REDIS) {
        SCMutexLock(&file_ctx->fp_mutex);
        LogFileWriteRedis(file_ctx, (const char *)MEMBUFFER_BUFFER(buffer),
                MEMBUFFER_OFFSET(buffer));
        SCMutexUnlock(&file_ctx->fp_mutex);
    }
#endif
#ifdef HAVE_LIBRDKAFKA
    else if (file_ctx->type == LOGFILE_TYPE_KAFKA) {
        SCMutexLock(&file_ctx->fp_mutex);
        LogFileWriteKafka(file_ctx, (const char*)MEMBUFFER_BUFFER(buffer),
                MEMBUFFER_OFFSET(buffer));
        SCMutexUnlock(&file_ctx->fp_mutex);
}

输出到 Kafka 实现逻辑

首先从 util-log-kafka 开始着手,这个函数相对于其他几个文件,与 Suricata 联系不是那么紧密。

util-log-kafka.h

首先参考 util-log-redis.hutil-log-redis.c 两个文件,将其复制一份,进行修改。

从 Redis 的头文件中,可以看出,实际上这里根据编译时选项,判断了是否要编译该文件

  • 使用宏 __UTIL_LOG_REDIS_H__ 确保文件制备编译一次
  • 在编译时,根据是否拥有 libhiredis 依赖,配置 HAVE_LIBHIREDIS

接下来,头文件中配置了 Redis 连接结构体,用于存储 Redis 信息。并且导出了三个函数

  • void SCLogRedisInit(void);: 初始化 Redis
  • int SCConfLogOpenRedis(ConfNode *, void *);: Redis 配置
  • int LogFileWriteRedis(void *, const char *, size_t);: 将内容写出至 Redis
/**
 * \file
 *
 * \author Paulo Pacheco <fooinha@gmail.com>
 */

#ifndef __UTIL_LOG_REDIS_H__
#define __UTIL_LOG_REDIS_H__

#ifdef HAVE_LIBHIREDIS
#include <hiredis/hiredis.h>


#ifdef HAVE_LIBEVENT
#include <hiredis/async.h>
#endif /* HAVE_LIBEVENT */

#include "conf.h"            /* ConfNode   */

enum RedisMode { REDIS_LIST, REDIS_CHANNEL };

typedef struct RedisSetup_ {
    enum RedisMode mode;
    const char *command;
    const char *key;
    const char *server;
    uint16_t  port;
    int is_async;
    int  batch_size;
} RedisSetup;

typedef struct SCLogRedisContext_ {
    redisContext *sync;
#if HAVE_LIBEVENT
    redisAsyncContext *async;
    struct event_base *ev_base;
    int connected;
#endif /* HAVE_LIBEVENT */
    time_t tried;
    int  batch_count;
    time_t last_push;
} SCLogRedisContext;

void SCLogRedisInit(void);
int SCConfLogOpenRedis(ConfNode *, void *);
int LogFileWriteRedis(void *, const char *, size_t);

#endif /* HAVE_LIBHIREDIS */
#endif /* __UTIL_LOG_REDIS_H__ */

那么,只需要仿照这个实现 Kafka 即可,与 Redis 需要依赖 libhiridis 类似,Kafka 需要 librdkafka 依赖,其余完全可以仿照 kafka 的生产者 demo 实现

/**
 * \file
 *
 * \author OhYee <oyohyee@oyohyee.com>
 */

#ifndef __UTIL_LOG_KAFKA_H__
#define __UTIL_LOG_KAFKA_H__

#define HAVE_LIBRDKAFKA

#ifdef HAVE_LIBRDKAFKA
#include <librdkafka/rdkafka.h>

#include "conf.h"            /* ConfNode   */

 // Kafka 存储结构
typedef struct {
    const char* brokers;    // 地址
    const char* topic_name; // 主题名
    int partitions;         // 分区 ID
}KafkaSetup;

// Kafka 上下文
typedef struct {
    rd_kafka_t* rk;
    rd_kafka_topic_t* rkt;
    long partition;     // 分区
}SCLogKafkaContext;

// 初始化 Kafka
void SCLogKafkaInit(void);
// 根据配置文件连接 Kafka
int SCConfLogOpenKafka(ConfNode*, void*);
// 将内容写出到 Kafka
int LogFileWriteKafka(void*, const char*, size_t);

#endif

#endif

util-log-kafka.c

首先需要参考 kafka 的 demo,看一下生产者要生产一条消息都需要做什么:

  • 使用 rd_kafka_conf_new() 创建 kafka 客户端
  • 使用 rd_kafka_conf_set() 配置客户端信息(如服务端地址)
  • 使用 rd_kafka_conf_set_dr_msg_cb() 配置发送回调
  • 使用 rd_kafka_new() 创建生产者实例
  • 使用 rd_kafka_producev() 发送消息
  • 在主循环使用 rd_kafka_poll(),执行后台操作
  • 使用 rd_kafka_destroy() 销毁实例

那么,我们的文件中,只需要在 Suricata 启动时创建 Kafka 生产者实例,并且在每次有数据写入时,向 Kafka 写入消息;当 Suricata 关闭时,销毁对应的实例。

也即,至少需要三个函数来完成相应的工作(启动、发送、销毁)。从上面的步骤中,可以看到 Kafka 存在一个回调函数,因此我们还需要实现一个递交结果的处理回调函数。

下面分别针对这些函数进行讨论

SCConfLogOpenKafka(void)

这里将根据相关配置文件,初始化 Kafka 客户端

#include "signal.h"

int SCConfLogOpenKafka(ConfNode* kafka_node, void* lf_ctx) {
    LogFileCtx* log_ctx = lf_ctx;
    const char* partitions = NULL;
    SCLogKafkaContext* kafka_ctx = NULL;

    if (NULL == kafka_node) {
        return -1;
    }

    // 读入配置项
    log_ctx->kafka_setup.brokers = ConfNodeLookupChildValue(kafka_node, "brokers");
    log_ctx->kafka_setup.topic_name = ConfNodeLookupChildValue(kafka_node, "topic");
    partitions = ConfNodeLookupChildValue(kafka_node, "partitions");
    log_ctx->kafka_setup.partitions = atoi(partitions);

    /* 创建 kafka 客户端 */
    rd_kafka_conf_t* conf;
    rd_kafka_topic_conf_t* topic_conf;
    char tmp[16];
    char errstr[512];
    kafka_ctx = (SCLogKafkaContext*)SCCalloc(1, sizeof(SCLogKafkaContext));
    if (kafka_ctx == NULL) {
        SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate kafka context");
        exit(EXIT_FAILURE);
    }

    conf = rd_kafka_conf_new();

    /* 配置 Kafka */
    snprintf(tmp, sizeof(tmp), "%i", SIGIO);
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "internal.termination.signal",
        tmp,
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "Unable to allocate kafka context");
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "broker.version.fallback",
        "0.8.2",
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
    }
    if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf,
        "queue.buffering.max.messages",
        "500000",
        errstr,
        sizeof(errstr))) {
        SCLogError(SC_ERR_KAFKA, "%s", errstr);
    }

    // 配置回调
    rd_kafka_conf_set_dr_cb(conf, msg_delivered);

    /* 创建生产者实例 */
    if (!(kafka_ctx->rk = rd_kafka_new(RD_KAFKA_PRODUCER,
        conf,
        errstr,
        sizeof(errstr)))) {
        SCLogError(SC_ERR_KAFKA, "%% Failed to create new producer: %s", errstr);
        exit(EXIT_FAILURE);
    }
    if (0 == rd_kafka_brokers_add(kafka_ctx->rk,
        log_ctx->kafka_setup.brokers)) {
        SCLogError(SC_ERR_KAFKA, "%% No valid brokers specified");
        exit(EXIT_FAILURE);
    }

    /* 创建话题实例 */
    topic_conf = rd_kafka_topic_conf_new();
    kafka_ctx->rkt = rd_kafka_topic_new(kafka_ctx->rk,
        log_ctx->kafka_setup.topic_name, topic_conf);
    if (NULL == kafka_ctx->rkt) {
        SCLogError(SC_ERR_KAFKA, "%% Failed to create kafka topic %s",
            log_ctx->kafka_setup.topic_name);
        exit(EXIT_FAILURE);
    }

    // 补全结构体的数据
    kafka_ctx->partition = 0;
    log_ctx->kafka = kafka_ctx;
    log_ctx->Close = SCLogFileCloseKafka;

    return 0;
}

LogFileWriteKafka(void *, const)

这部分需要将传来的数据,发送至 Kafka 中。从逻辑上而言,我们只需要确定要发往哪一个主题(topic)即可,但是为了尽可能提升性能,还应该确定具体发至哪一个分区(partition)。在参考的代码中,额外维护了一个分区状态,每次将其自加 1 确保消息被平均分配至各个分区中。尽管分区使用 int 存储,但是考虑到 Suricata 本身作为一个流量检测工具,长时间启用还是存在溢出的风险,个人认为应该每次自加后都应该对其进行一次取模操作。

当消息发送失败时,需要进行适当的报错,因此需要注册相应的错误码,错误码的注册位于 src/util-error.hsrc/util-error.c。仿照 Redis 添加 Kafka 对应的错误码即可

// util-error.h
SC_ERR_KAFKA,

// util-error.c
CASE_CODE (SC_ERR_KAFKA);

生产者对应的代码如下

#include "errno.h"

int LogFileWriteKafka(void* lf_ctx, const char* string, size_t string_len) {
    LogFileCtx* log_ctx = lf_ctx;
    SCLogKafkaContext* kafka_ctx = log_ctx->kafka;
    int partition = kafka_ctx->partition % log_ctx->kafka_setup.partitions;
    kafka_ctx->partition = (partition + 1) % log_ctx->kafka_setup.partitions;

    if (rd_kafka_produce(
        /* 话题 */
        kafka_ctx->rkt,
        /* 分区 */
        partition,
        /* 字符串维护方式 */
        RD_KAFKA_MSG_F_COPY,
        /* 字符串和字符串长度 */
        (void*)string,
        string_len,
        /* key 和 key 长度 */
        NULL,
        0,
        /* 消息透明配置 */
        NULL
    ) == -1) {
        SCLogError(
            SC_ERR_KAFKA,
            "%% Failed to produce to topic %s "
            "partition %i: %s\n",
            log_ctx->kafka_setup.topic_name, partition,
            rd_kafka_err2str(rd_kafka_errno2err(errno))
        );
        /* Poll to handle delivery reports */
        rd_kafka_poll(kafka_ctx->rk, 0);
    }

    return -1;
}

SCConfLogCloseKafka()

Kafka 实例存储在 log_ctx->kafka 中,使用 rd_kafka_destroy(log_ctx->kafka->rk) 即可销毁对应的实例。

通常,需要确保所有信息已经发送完成,使用 rd_kafka_outq_len(log_ctx->kafka->rk) > 0 判断是否所有消息已递交,并且使用 rd_kafka_pool(log_ctx->kafka->rk, 100) 执行 100ms 的等待。

在参考的代码中,同时销毁掉了话题,考虑到实际需要,这里没有销毁对应的话题,这样即使 Suricata 离线后,其他程序仍然可以继续处理数据

static void SCLogFileCloseKafka(LogFileCtx* log_ctx) {
    SCLogKafkaContext* kafka_ctx = log_ctx->kafka;
    if (NULL == kafka_ctx) return;
    if (kafka_ctx->rk) {
        rd_kafka_poll(kafka_ctx->rk, 0);
        while (rd_kafka_outq_len(kafka_ctx->rk) > 0)
            rd_kafka_poll(kafka_ctx->rk, 100);
    }
    if (kafka_ctx->rk) rd_kafka_destroy(kafka_ctx->rk);
    return;
}

static void msg_delivered(rd_kafka_t*, void*, size_t, int, void*, void*)

这个函数在 Kafka 生产者递交后回调运行,主要用以判断是否递交成功

报错时直接输出即可,没有理解为什么参考的代码对数据进行了一次重新赋值,故省略掉相关代码

static void msg_delivered(rd_kafka_t* rk,
    void* payload, size_t len,
    int error_code,
    void* opaque, void* msg_opaque) {
    if (error_code)
        SCLogError(SC_ERR_KAFKA, "%% Message delivery failed: %s\n",
            rd_kafka_err2str(error_code));
}

Suricata 启用 Kafka 模块

上面的代码只是按照 Suricata 的要求实现了一个 Kafka 模块,还需要将其添加到 Suricata 的日志输出中,才能在输出数据时被正确调用

src/output-json.c 搜索 Redis,可以发现需要在 OutputInitResult OutputJsonInitCtx(ConfNode *) 进行类型的判断,共有两处需要添加

#ifdef HAVE_LIBHIREDIS
                SCLogRedisInit();
                json_ctx->json_out = LOGFILE_TYPE_REDIS;
#else
                           FatalError(SC_ERR_FATAL,
                                      "redis JSON output option is not compiled");
#endif
            } else if (strcmp(output_s, "kafka") == 0) {
#ifdef HAVE_LIBRDKAFKA
                json_ctx->json_out = LOGFILE_TYPE_KAFKA;
#else
                           FatalError(SC_ERR_FATAL,
                               "kafka JSON output option is not compiled");
#endif
// ...
#ifdef HAVE_LIBHIREDIS
        else if (json_ctx->json_out == LOGFILE_TYPE_REDIS) {
            ConfNode *redis_node = ConfNodeLookupChild(conf, "redis");
            if (!json_ctx->file_ctx->sensor_name) {
                char hostname[1024];
                gethostname(hostname, 1023);
                json_ctx->file_ctx->sensor_name = SCStrdup(hostname);
            }
            if (json_ctx->file_ctx->sensor_name  == NULL) {
                LogFileFreeCtx(json_ctx->file_ctx);
                SCFree(json_ctx);
                SCFree(output_ctx);
                return result;
            }

            if (SCConfLogOpenRedis(redis_node, json_ctx->file_ctx) < 0) {
                LogFileFreeCtx(json_ctx->file_ctx);
                SCFree(json_ctx);
                SCFree(output_ctx);
                return result;
            }
        }
#endif
#ifdef HAVE_LIBRDKAFKA
        else if (json_ctx->json_out == LOGFILE_TYPE_KAFKA) {
            ConfNode* kafka_node = ConfNodeLookupChild(conf, "kafka");
            if (!json_ctx->file_ctx->sensor_name) {
                char hostname[1024];
                gethostname(hostname, 1023);
                json_ctx->file_ctx->sensor_name = SCStrdup(hostname);
            }
            if (json_ctx->file_ctx->sensor_name == NULL) {
                LogFileFreeCtx(json_ctx->file_ctx);
                SCFree(json_ctx);
                SCFree(output_ctx);
                return result;
            }

            if (SCConfLogOpenKafka(kafka_node, json_ctx->file_ctx) < 0) {
                LogFileFreeCtx(json_ctx->file_ctx);
                SCFree(json_ctx);
                SCFree(output_ctx);
                return result;
            }
        }
#endif

编译选项

代码写好了,还需要让 Suricata 知道如何编译。在 src/Makefile.am 中,添加我们新加入的文件

util-log-kafka.h util-log-kafka.c \

configure.ac 中添加需要检查的第三方依赖 librdkafka

# librdkafka
    AC_ARG_ENABLE(rdkafka,
	        AS_HELP_STRING([--enable-rdkafka],[Enable Kafka support]),
	        [ enable_rdkafka="$enableval"],
	        [ enable_rdkafka="no"])

    if test "$enable_rdkafka" = "yes"; then
        AC_CHECK_HEADER("librdkafka/rdkafka.h",RDKAFKA="yes",RDKAFKA="no")
        if test "$RDKAFKA" = "yes"; then
            AC_CHECK_LIB(rdkafka, rd_kafka_new,, RDKAFKA="no")
        fi
        if test "$RDKAFKA" = "no"; then
            echo
            echo "   ERROR!  librdkafka library not found, go get it"
            echo "   from https://github.com/edenhill/librdkafka or your distribution:"
			echo
            echo "   Ubuntu: apt-get install librdkafka-dev"
            echo "   Fedora: dnf install rdkafka-devel"
            echo "   CentOS/RHEL: yum install librdkafka-devel"
            echo
            exit 1
        fi
        if test "$RDKAFKA" = "yes"; then
            AC_DEFINE([HAVE_LIBRDKAFKA],[1],[librdkafka available])
			enable_rdkafka="yes"
		fi
	fi

然后,则是修改相应的 Suricata 文档,让其他用户知晓如何配置 Kafka,具体在 doc/userguide/output/eve/eve-json-output.rst 中,照着 Redis 复制即可

最后,由于这里添加了新的可用配置,因此需要在 src/util-plugin.c 中的 bool SCPluginRegisterFileType(SCPluginFileType *) 中修改对应的代码,以避免检查失败

重新编译

要添加 Kafka 相关功能,需要添加 --enable-rdkafka 参数(详情见 configure.ac 的修改)

./autogen.sh
./configure --prefix=/opt/suricata --enable-luajit --enable-rdkafka
sudo make install-full

只需要修改相应的配置项如下即可

outputs:
  - eve-log:
      enabled: yes
      filetype: kafka
      kafka:
        brokers: localhost:9092
        partitions: 1
        topic: test

参考资料