Suricata 与 Kafka 连接
Suricata 本身是不支持输出到 Kafka 的,尽管曾经有相关的 提议
本文大量代码源自 output: Add kafka output ability for eve · CosmosSun/suricata@d9f400a (github.com) ,但站在巨人的肩膀上修改了部分代码,并且尝试给出了自己的理解
考虑到 Suricata 默认输出到文件,仍然需要一个类似 Logstash 的工具进行读取,不如直接输出到 Kafka 实现分布式读取处理
为了方便后续操作,应该使用信息尽可能全,并且易于处理的格式来进行数据的传输,这里选择 Suricata 的 EveJSON 格式,相关的代码位于 src/output-json-xxx.c
中,所有 Suricata 可处理的协议,都有一个对应的文件,用于设置解析格式。
代码修改
如果不进行更复杂的修改,只是单纯输出到 Kafka,那么是有现成的代码可供参考的。EveJSON 除去写入文件目录外,还可以写出到 Redis 中,因此可以参考对应的代码实现。
首先在项目中搜索所有的 "redis"。考虑到大部分文件都包含单词 "redistribute",所以应该用正则表达式搜索 redis[^t]
。搜索出来 19 个文件中包含,除去一些文档等文件外,主要需要考虑的是
eve-json-output.c
util-log-redis.c
util-plugin.c
、util-error.c
、util-logopenfile.c
实际的实现函数位于 util-log-redis.c
中,其余的文件则是将 Redis 注册到 Suricata 中
只需要仿照 Redis 的操作,对 Suricata 进行同样的配置即可
日志上下文类
util-logopenfile.h
包括 Redis 在内,实际上是声明了一个 LogFileCtx
类型的变量(该类型位于 src/util-logopenfile.h
中),类似集成 LogFileCtx
基类实现了一个单例实例。
这里需要修改 4 个地方,首先是引入 Kafka 相关的头文件
#ifdef HAVE_LIBRDKAFKA #include "util-log-kafka.h" #endif
然后需要在 LogFileType
枚举中,注册 Kafka 类型
enum LogFileType { LOGFILE_TYPE_FILE,
LOGFILE_TYPE_SYSLOG,
LOGFILE_TYPE_UNIX_DGRAM,
LOGFILE_TYPE_UNIX_STREAM,
LOGFILE_TYPE_REDIS,
LOGFILE_TYPE_KAFKA,
LOGFILE_TYPE_PLUGIN };
在 LogFileCtx
中,使用联合(union
)存储了 Kafka 相关的结构,前者对应一个打开的文件,或是已建立的 Kafka 连接;后者则是文件、Kafka 相关的配置信息
typedef struct LogFileCtx_ { union { FILE *fp; PcieFile *pcie_fp; LogThreadedFileCtx *threads; void *plugin_data; #ifdef HAVE_LIBHIREDIS void *redis; #endif #ifdef HAVE_LIBRDKAFKA SCLogKafkaContext* kafka; #endif }; union { SyslogSetup syslog_setup; #ifdef HAVE_LIBHIREDIS RedisSetup redis_setup; #endif #ifdef HAVE_LIBRDKAFKA KafkaSetup kafka_setup; #endif }; int (*Write)(const char *buffer, int buffer_len, struct LogFileCtx_ *fp); void (*Close)(struct LogFileCtx_ *fp); /* ...... */ }
util-logopenfile.c
头文件修改后,实现中也应该进行相应的修改,告诉程序应该如何识别上面新加入的东西。
在 src/util-logopenfile.c
中,再次引入了 Redis 相应的头文件,不过考虑到上面已经引入了 util-logopenfile.h
,并且在这个文件里已经引入过 Redis 头文件,所以个人认为这里实际上没有必要再次引入。
在 int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *log_ctx, const char *default_filename, int rotate)
函数中,存在关于 Redis 的操作。这个函数的任务是“打开文件”(如果是 Redis 或 Kafka,就是建立相应的 Socket)
从 Redis 部分的代码可知,在这里完成的任务是
- 读入
redis
相关配置 - 调用
SCConfLogOpenRedis(ConfNode, void *)
- 配置
log_ctx->type = LOGFILE_TYPE_REDIS
上下文
仿照这部分逻辑实现 Kafka 的代码即可,调用 SCConfLogOpenKafka(ConfNode, void*)
建立 Kafka 连接
在 output: Add kafka output ability for eve · CosmosSun/suricata@d9f400a (github.com) 的代码中,这里还额外设置了 Suricata 日志探测器 log_ctx->sensor_name
为当前主机名 gethostname()
,不过似乎不是必须的
#ifdef HAVE_LIBHIREDIS } else if (strcasecmp(filetype, "redis") == 0) { ConfNode *redis_node = ConfNodeLookupChild(conf, "redis"); if (SCConfLogOpenRedis(redis_node, log_ctx) < 0) { SCLogError(SC_ERR_REDIS, "failed to open redis output"); return -1; } log_ctx->type = LOGFILE_TYPE_REDIS; #endif #ifdef HAVE_LIBRDKAFKA } else if (strcmp(filetype, "kafka") == 0) { ConfNode* kafka_node = ConfNodeLookupChild(conf, "kafka"); if (SCConfLogOpenKafka(kafka_node, log_ctx) < 0) { SCLogError(SC_ERR_KAFKA, "failed to open kafka output"); return -1; } log_ctx->type = LOGFILE_TYPE_KAFKA; #endif
在 int LogFileWrite(LogFileCtx *, MemBuffer *)
中,则需要根据不同的类型,将日志写入对应的 “文件”。按照 Redis 的逻辑,这里只是调用 LogFileWriteRedis(void *, const char *, size_t)
函数,执行写入操作。在写入前后,通过对 &file_ctx->fp_mutex
进行加锁解锁操作,来避免并发问题(“如果在一次调用中不能将日志/警报记录写入文件,它将被锁定”),不过参考的代码中,省去了加锁解锁的代码,个人认为应该带上(大概 Kafka 自带锁?)
#ifdef HAVE_LIBHIREDIS else if (file_ctx->type == LOGFILE_TYPE_REDIS) { SCMutexLock(&file_ctx->fp_mutex); LogFileWriteRedis(file_ctx, (const char *)MEMBUFFER_BUFFER(buffer), MEMBUFFER_OFFSET(buffer)); SCMutexUnlock(&file_ctx->fp_mutex); } #endif #ifdef HAVE_LIBRDKAFKA else if (file_ctx->type == LOGFILE_TYPE_KAFKA) { SCMutexLock(&file_ctx->fp_mutex); LogFileWriteKafka(file_ctx, (const char*)MEMBUFFER_BUFFER(buffer), MEMBUFFER_OFFSET(buffer)); SCMutexUnlock(&file_ctx->fp_mutex); }
输出到 Kafka 实现逻辑
首先从 util-log-kafka
开始着手,这个函数相对于其他几个文件,与 Suricata 联系不是那么紧密。
util-log-kafka.h
首先参考 util-log-redis.h
和 util-log-redis.c
两个文件,将其复制一份,进行修改。
从 Redis 的头文件中,可以看出,实际上这里根据编译时选项,判断了是否要编译该文件
- 使用宏
__UTIL_LOG_REDIS_H__
确保文件制备编译一次 - 在编译时,根据是否拥有
libhiredis
依赖,配置HAVE_LIBHIREDIS
接下来,头文件中配置了 Redis 连接结构体,用于存储 Redis 信息。并且导出了三个函数
void SCLogRedisInit(void);
: 初始化 Redisint SCConfLogOpenRedis(ConfNode *, void *);
: Redis 配置int LogFileWriteRedis(void *, const char *, size_t);
: 将内容写出至 Redis
/** * \file * * \author Paulo Pacheco <fooinha@gmail.com> */ #ifndef __UTIL_LOG_REDIS_H__ #define __UTIL_LOG_REDIS_H__ #ifdef HAVE_LIBHIREDIS #include <hiredis/hiredis.h> #ifdef HAVE_LIBEVENT #include <hiredis/async.h> #endif /* HAVE_LIBEVENT */ #include "conf.h" /* ConfNode */ enum RedisMode { REDIS_LIST, REDIS_CHANNEL }; typedef struct RedisSetup_ { enum RedisMode mode; const char *command; const char *key; const char *server; uint16_t port; int is_async; int batch_size; } RedisSetup; typedef struct SCLogRedisContext_ { redisContext *sync; #if HAVE_LIBEVENT redisAsyncContext *async; struct event_base *ev_base; int connected; #endif /* HAVE_LIBEVENT */ time_t tried; int batch_count; time_t last_push; } SCLogRedisContext; void SCLogRedisInit(void); int SCConfLogOpenRedis(ConfNode *, void *); int LogFileWriteRedis(void *, const char *, size_t); #endif /* HAVE_LIBHIREDIS */ #endif /* __UTIL_LOG_REDIS_H__ */
那么,只需要仿照这个实现 Kafka 即可,与 Redis 需要依赖 libhiridis
类似,Kafka 需要 librdkafka
依赖,其余完全可以仿照 kafka 的生产者 demo 实现
/** * \file * * \author OhYee <oyohyee@oyohyee.com> */ #ifndef __UTIL_LOG_KAFKA_H__ #define __UTIL_LOG_KAFKA_H__ #define HAVE_LIBRDKAFKA #ifdef HAVE_LIBRDKAFKA #include <librdkafka/rdkafka.h> #include "conf.h" /* ConfNode */ // Kafka 存储结构 typedef struct { const char* brokers; // 地址 const char* topic_name; // 主题名 int partitions; // 分区 ID }KafkaSetup; // Kafka 上下文 typedef struct { rd_kafka_t* rk; rd_kafka_topic_t* rkt; long partition; // 分区 }SCLogKafkaContext; // 初始化 Kafka void SCLogKafkaInit(void); // 根据配置文件连接 Kafka int SCConfLogOpenKafka(ConfNode*, void*); // 将内容写出到 Kafka int LogFileWriteKafka(void*, const char*, size_t); #endif #endif
util-log-kafka.c
首先需要参考 kafka 的 demo,看一下生产者要生产一条消息都需要做什么:
- 使用
rd_kafka_conf_new()
创建 kafka 客户端 - 使用
rd_kafka_conf_set()
配置客户端信息(如服务端地址) - 使用
rd_kafka_conf_set_dr_msg_cb()
配置发送回调 - 使用
rd_kafka_new()
创建生产者实例 - 使用
rd_kafka_producev()
发送消息 - 在主循环使用
rd_kafka_poll()
,执行后台操作 - 使用
rd_kafka_destroy()
销毁实例
那么,我们的文件中,只需要在 Suricata 启动时创建 Kafka 生产者实例,并且在每次有数据写入时,向 Kafka 写入消息;当 Suricata 关闭时,销毁对应的实例。
也即,至少需要三个函数来完成相应的工作(启动、发送、销毁)。从上面的步骤中,可以看到 Kafka 存在一个回调函数,因此我们还需要实现一个递交结果的处理回调函数。
下面分别针对这些函数进行讨论
SCConfLogOpenKafka(void)
这里将根据相关配置文件,初始化 Kafka 客户端
#include "signal.h" int SCConfLogOpenKafka(ConfNode* kafka_node, void* lf_ctx) { LogFileCtx* log_ctx = lf_ctx; const char* partitions = NULL; SCLogKafkaContext* kafka_ctx = NULL; if (NULL == kafka_node) { return -1; } // 读入配置项 log_ctx->kafka_setup.brokers = ConfNodeLookupChildValue(kafka_node, "brokers"); log_ctx->kafka_setup.topic_name = ConfNodeLookupChildValue(kafka_node, "topic"); partitions = ConfNodeLookupChildValue(kafka_node, "partitions"); log_ctx->kafka_setup.partitions = atoi(partitions); /* 创建 kafka 客户端 */ rd_kafka_conf_t* conf; rd_kafka_topic_conf_t* topic_conf; char tmp[16]; char errstr[512]; kafka_ctx = (SCLogKafkaContext*)SCCalloc(1, sizeof(SCLogKafkaContext)); if (kafka_ctx == NULL) { SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate kafka context"); exit(EXIT_FAILURE); } conf = rd_kafka_conf_new(); /* 配置 Kafka */ snprintf(tmp, sizeof(tmp), "%i", SIGIO); if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf, "internal.termination.signal", tmp, errstr, sizeof(errstr))) { SCLogError(SC_ERR_KAFKA, "Unable to allocate kafka context"); } if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf, "broker.version.fallback", "0.8.2", errstr, sizeof(errstr))) { SCLogError(SC_ERR_KAFKA, "%s", errstr); } if (RD_KAFKA_CONF_OK != rd_kafka_conf_set(conf, "queue.buffering.max.messages", "500000", errstr, sizeof(errstr))) { SCLogError(SC_ERR_KAFKA, "%s", errstr); } // 配置回调 rd_kafka_conf_set_dr_cb(conf, msg_delivered); /* 创建生产者实例 */ if (!(kafka_ctx->rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)))) { SCLogError(SC_ERR_KAFKA, "%% Failed to create new producer: %s", errstr); exit(EXIT_FAILURE); } if (0 == rd_kafka_brokers_add(kafka_ctx->rk, log_ctx->kafka_setup.brokers)) { SCLogError(SC_ERR_KAFKA, "%% No valid brokers specified"); exit(EXIT_FAILURE); } /* 创建话题实例 */ topic_conf = rd_kafka_topic_conf_new(); kafka_ctx->rkt = rd_kafka_topic_new(kafka_ctx->rk, log_ctx->kafka_setup.topic_name, topic_conf); if (NULL == kafka_ctx->rkt) { SCLogError(SC_ERR_KAFKA, "%% Failed to create kafka topic %s", log_ctx->kafka_setup.topic_name); exit(EXIT_FAILURE); } // 补全结构体的数据 kafka_ctx->partition = 0; log_ctx->kafka = kafka_ctx; log_ctx->Close = SCLogFileCloseKafka; return 0; }
LogFileWriteKafka(void *, const)
这部分需要将传来的数据,发送至 Kafka 中。从逻辑上而言,我们只需要确定要发往哪一个主题(topic)即可,但是为了尽可能提升性能,还应该确定具体发至哪一个分区(partition)。在参考的代码中,额外维护了一个分区状态,每次将其自加 1 确保消息被平均分配至各个分区中。尽管分区使用 int
存储,但是考虑到 Suricata 本身作为一个流量检测工具,长时间启用还是存在溢出的风险,个人认为应该每次自加后都应该对其进行一次取模操作。
当消息发送失败时,需要进行适当的报错,因此需要注册相应的错误码,错误码的注册位于 src/util-error.h
和 src/util-error.c
。仿照 Redis 添加 Kafka 对应的错误码即可
// util-error.h SC_ERR_KAFKA, // util-error.c CASE_CODE (SC_ERR_KAFKA);
生产者对应的代码如下
#include "errno.h" int LogFileWriteKafka(void* lf_ctx, const char* string, size_t string_len) { LogFileCtx* log_ctx = lf_ctx; SCLogKafkaContext* kafka_ctx = log_ctx->kafka; int partition = kafka_ctx->partition % log_ctx->kafka_setup.partitions; kafka_ctx->partition = (partition + 1) % log_ctx->kafka_setup.partitions; if (rd_kafka_produce( /* 话题 */ kafka_ctx->rkt, /* 分区 */ partition, /* 字符串维护方式 */ RD_KAFKA_MSG_F_COPY, /* 字符串和字符串长度 */ (void*)string, string_len, /* key 和 key 长度 */ NULL, 0, /* 消息透明配置 */ NULL ) == -1) { SCLogError( SC_ERR_KAFKA, "%% Failed to produce to topic %s " "partition %i: %s\n", log_ctx->kafka_setup.topic_name, partition, rd_kafka_err2str(rd_kafka_errno2err(errno)) ); /* Poll to handle delivery reports */ rd_kafka_poll(kafka_ctx->rk, 0); } return -1; }
SCConfLogCloseKafka()
Kafka 实例存储在 log_ctx->kafka
中,使用 rd_kafka_destroy(log_ctx->kafka->rk)
即可销毁对应的实例。
通常,需要确保所有信息已经发送完成,使用 rd_kafka_outq_len(log_ctx->kafka->rk) > 0
判断是否所有消息已递交,并且使用 rd_kafka_pool(log_ctx->kafka->rk, 100)
执行 100ms 的等待。
在参考的代码中,同时销毁掉了话题,考虑到实际需要,这里没有销毁对应的话题,这样即使 Suricata 离线后,其他程序仍然可以继续处理数据
static void SCLogFileCloseKafka(LogFileCtx* log_ctx) { SCLogKafkaContext* kafka_ctx = log_ctx->kafka; if (NULL == kafka_ctx) return; if (kafka_ctx->rk) { rd_kafka_poll(kafka_ctx->rk, 0); while (rd_kafka_outq_len(kafka_ctx->rk) > 0) rd_kafka_poll(kafka_ctx->rk, 100); } if (kafka_ctx->rk) rd_kafka_destroy(kafka_ctx->rk); return; }
static void msg_delivered(rd_kafka_t*, void*, size_t, int, void*, void*)
这个函数在 Kafka 生产者递交后回调运行,主要用以判断是否递交成功
报错时直接输出即可,没有理解为什么参考的代码对数据进行了一次重新赋值,故省略掉相关代码
static void msg_delivered(rd_kafka_t* rk, void* payload, size_t len, int error_code, void* opaque, void* msg_opaque) { if (error_code) SCLogError(SC_ERR_KAFKA, "%% Message delivery failed: %s\n", rd_kafka_err2str(error_code)); }
Suricata 启用 Kafka 模块
上面的代码只是按照 Suricata 的要求实现了一个 Kafka 模块,还需要将其添加到 Suricata 的日志输出中,才能在输出数据时被正确调用
在 src/output-json.c
搜索 Redis,可以发现需要在 OutputInitResult OutputJsonInitCtx(ConfNode *)
进行类型的判断,共有两处需要添加
#ifdef HAVE_LIBHIREDIS SCLogRedisInit(); json_ctx->json_out = LOGFILE_TYPE_REDIS; #else FatalError(SC_ERR_FATAL, "redis JSON output option is not compiled"); #endif } else if (strcmp(output_s, "kafka") == 0) { #ifdef HAVE_LIBRDKAFKA json_ctx->json_out = LOGFILE_TYPE_KAFKA; #else FatalError(SC_ERR_FATAL, "kafka JSON output option is not compiled"); #endif // ... #ifdef HAVE_LIBHIREDIS else if (json_ctx->json_out == LOGFILE_TYPE_REDIS) { ConfNode *redis_node = ConfNodeLookupChild(conf, "redis"); if (!json_ctx->file_ctx->sensor_name) { char hostname[1024]; gethostname(hostname, 1023); json_ctx->file_ctx->sensor_name = SCStrdup(hostname); } if (json_ctx->file_ctx->sensor_name == NULL) { LogFileFreeCtx(json_ctx->file_ctx); SCFree(json_ctx); SCFree(output_ctx); return result; } if (SCConfLogOpenRedis(redis_node, json_ctx->file_ctx) < 0) { LogFileFreeCtx(json_ctx->file_ctx); SCFree(json_ctx); SCFree(output_ctx); return result; } } #endif #ifdef HAVE_LIBRDKAFKA else if (json_ctx->json_out == LOGFILE_TYPE_KAFKA) { ConfNode* kafka_node = ConfNodeLookupChild(conf, "kafka"); if (!json_ctx->file_ctx->sensor_name) { char hostname[1024]; gethostname(hostname, 1023); json_ctx->file_ctx->sensor_name = SCStrdup(hostname); } if (json_ctx->file_ctx->sensor_name == NULL) { LogFileFreeCtx(json_ctx->file_ctx); SCFree(json_ctx); SCFree(output_ctx); return result; } if (SCConfLogOpenKafka(kafka_node, json_ctx->file_ctx) < 0) { LogFileFreeCtx(json_ctx->file_ctx); SCFree(json_ctx); SCFree(output_ctx); return result; } } #endif
编译选项
代码写好了,还需要让 Suricata 知道如何编译。在 src/Makefile.am
中,添加我们新加入的文件
util-log-kafka.h util-log-kafka.c \
在 configure.ac
中添加需要检查的第三方依赖 librdkafka
# librdkafka AC_ARG_ENABLE(rdkafka, AS_HELP_STRING([--enable-rdkafka],[Enable Kafka support]), [ enable_rdkafka="$enableval"], [ enable_rdkafka="no"]) if test "$enable_rdkafka" = "yes"; then AC_CHECK_HEADER("librdkafka/rdkafka.h",RDKAFKA="yes",RDKAFKA="no") if test "$RDKAFKA" = "yes"; then AC_CHECK_LIB(rdkafka, rd_kafka_new,, RDKAFKA="no") fi if test "$RDKAFKA" = "no"; then echo echo " ERROR! librdkafka library not found, go get it" echo " from https://github.com/edenhill/librdkafka or your distribution:" echo echo " Ubuntu: apt-get install librdkafka-dev" echo " Fedora: dnf install rdkafka-devel" echo " CentOS/RHEL: yum install librdkafka-devel" echo exit 1 fi if test "$RDKAFKA" = "yes"; then AC_DEFINE([HAVE_LIBRDKAFKA],[1],[librdkafka available]) enable_rdkafka="yes" fi fi
然后,则是修改相应的 Suricata 文档,让其他用户知晓如何配置 Kafka,具体在 doc/userguide/output/eve/eve-json-output.rst
中,照着 Redis 复制即可
最后,由于这里添加了新的可用配置,因此需要在 src/util-plugin.c
中的 bool SCPluginRegisterFileType(SCPluginFileType *)
中修改对应的代码,以避免检查失败
重新编译
要添加 Kafka 相关功能,需要添加 --enable-rdkafka
参数(详情见 configure.ac
的修改)
./autogen.sh
./configure --prefix=/opt/suricata --enable-luajit --enable-rdkafka
sudo make install-full
只需要修改相应的配置项如下即可
outputs: - eve-log: enabled: yes filetype: kafka kafka: brokers: localhost:9092 partitions: 1 topic: test