Kafka 部署和基础使用
基本术语
-
Topic: 每一个消息类别被称为一个话题(topic)
消息可以用于区分不同的业务系统,每个话题都是多订阅者模式,一个话题可以包含一个或多个消费者
-
Partition: 每一个话题内,包含多个分区,每个分区可能包含多个副本,其中有一个副本作为 leader,用于对外提供服务
分区是物理层面上存储的形式,每个分区都会被存储在一个 Broker 中,只有分区内可以确保绝对的顺序
写入到一个话题的消息,会按照下面的方式进行分配
- 轮询策略: 按顺序存储到分区中
- 随机策略: 随机存储到一个分区中
- 键哈希策略: 根据 key 求哈希值,存储到哈希对应的分区中
-
Producer: 发布消息的对象称为生产者(Kafka topic producer)
-
Consumer: 订阅消息并处理消息的对象称为消费者
-
Broker: 已发布的消息需要保存在服务器中,称为 Kafka 集群,集群中的每一个服务器都是一个代理(Broker)
-
系统模式
- 队列模式: 以队列形式管理消息,队列允许消息有多个消费者,并且消息在消费后将从队列移除(每个消息只会被消费一次)
- 发布订阅模式: 当有新消息被添加后,将会通知所有订阅的消费者,所有的消费者维护自己的队列
Docker 镜像
优先在 Docker 中测试。鉴于镜像较大,可以使用国内镜像源加速拉取(如 http://hub-mirror.c.163.com/ 或 https://registry.cn-hangzhou.aliyuncs.com )
docker pull wurstmeister/zookeeper docker pull wurstmeister/kafka
可以选择使用 docker-compose 启动 kafka
version: '2' services: zoo: image: wurstmeister/zookeeper:latest restart: unless-stopped hostname: zoo ports: - "2181:2181" container_name: zookeeper kafka: image: wurstmeister/kafka:latest ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: localhost KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 KAFKA_ZOOKEEPER_CONNECT: "zoo:2181" KAFKA_BROKER_ID: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_CREATE_TOPICS: "stream-in:1:1,stream-out:1:1" depends_on: - zoo container_name: kafka
镜像启动后,会挂载一个 volume 存储 kafka 所需要的文件,也即简单地关闭容器后,数据会被保存在 volume 中,重启可以恢复
这里,KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
需要配置为服务器的地址(如 host.docker.internal:9092
)。当客户端和 Kafka 运行在不同的网络中,这个参数将会默认为 localhost
,导致客户端连接失败
C 语言 DEMO
Kafka 本身通过一个基于 TCP 的私有协议通信,因此要快速地在代码里操作 Kafka,需要使用第三方模块。
C/C++ 下,有 librdkafka 可以用于操作 kafka
- 在 Ubuntu 下使用
apt install -y librdkafka-dev
安装 - 在 Arch Linux 下,使用
pacman -Sy librdkafka --noconfirm
安装
安装完成后,即可使用官方样例的代码进行测试
生产者
#include <stdio.h> #include <signal.h> #include <string.h> #include <librdkafka/rdkafka.h> static volatile sig_atomic_t run = 1; /** * @brief 程序终止的信号 */ static void stop(int sig) { run = 0; fclose(stdin); /* abort fgets() */ } /** * @brief 消息传递报告回调。 * * 这个回调在每条信息中被精确地调用一次 * * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) 表示传递成功 * (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) 表示递交失败 * */ static void dr_msg_cb( rd_kafka_t* rk, const rd_kafka_message_t* rkmessage, void* opaque ) { if (rkmessage->err) fprintf(stderr, "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err)); else fprintf(stderr, "%% Message delivered (%zd bytes, " "partition %" PRId32 ")\n", rkmessage->len, rkmessage->partition); /* rkmessage 的销毁由 librdkafka 维护 */ } int main(int argc, char** argv) { rd_kafka_t* rk; /* 生产者实例 */ rd_kafka_conf_t* conf; /* 临时配置对象 */ char errstr[512]; /* librdkafka 接口错误缓冲区 */ char buf[512]; /* 临时消息缓冲区 */ const char* brokers; /* 参数: broker 列表 */ const char* topic; /* 参数: 使用的话题 */ /* * 参数检查 * 使用 程序名 <broker> <topic> */ if (argc != 3) { fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]); return 1; } brokers = argv[1]; topic = argv[2]; printf("%s\n%s\n", brokers, topic); /* * 创建 Kafka 客户端 */ conf = rd_kafka_conf_new(); /* * 配置 Kafka 客户端 * 使用逗号分割的 broker 列表 */ if (rd_kafka_conf_set( conf, "bootstrap.servers", brokers, errstr, sizeof(errstr) ) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); return 1; } /* * 配置交付回调 */ rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); /* * 创建生产者示例 * * rd_kafka_new() 取得 conf 对象所有权,后续程序不能再次引用 conf * */ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) { fprintf(stderr, "%% Failed to create new producer: %s\n", errstr); return 1; } /* 监听 CTRL + C */ signal(SIGINT, stop); fprintf(stderr, "%% 键入文本,并使用回车递交消息\n" "%% 直接回车,只展示递交报告\n" "%% 使用 Ctrl + C 或 Ctrl + D 结束\n"); // 只要程序未接收到关闭信号,且未 EOF,则无限循环 while (run && fgets(buf, sizeof(buf), stdin)) { size_t len = strlen(buf); rd_kafka_resp_err_t err; /* 清除换行符 
*/ if (buf[len - 1] == '\n') buf[--len] = '\0'; /* 空行,仅用于交付报告 */ if (len == 0) { rd_kafka_poll(rk, 0/*non-blocking */); continue; } /* * 发送(生产)消息 * * 异步调用,通过回调函数接收结果 * */ retry: err = rd_kafka_producev( /* 生产者处理 */ rk, /* 话题名称 */ RD_KAFKA_V_TOPIC(topic), /* 消息拷贝 */ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), /* 消息长度 */ RD_KAFKA_V_VALUE(buf, len), /* 消息非透明 */ RD_KAFKA_V_OPAQUE(NULL), /* 结束哨兵 */ RD_KAFKA_V_END ); if (err) { /* * 生产失败 */ fprintf(stderr, "%% Failed to produce to topic %s: %s\n", topic, rd_kafka_err2str(err)); if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) { /* * 如果失败的原因是因为队列已满,则等待重试 */ rd_kafka_poll(rk, 1000 /* 等待 1000ms */); goto retry; } } else { fprintf(stderr, "%% Enqueued message (%zd bytes) " "for topic %s\n", len, topic); } /* * 生产者需要不断使用 rd_kafka_poll 生产消息(未生产消息也需要调用) * 1. 在主循环中不断调用 * 2. 或者在专用线程中 */ rd_kafka_poll(rk, 0/*non-blocking*/); } /* * 等待最后的消息递交或失败 * rd_kafka_flush 会等待所有消息被送达 */ fprintf(stderr, "%% Flushing final messages..\n"); rd_kafka_flush(rk, 10 * 1000 /* wait for max 10 seconds */); /* 如果输出队列非空,则存在问题 */ if (rd_kafka_outq_len(rk) > 0) fprintf(stderr, "%% %d message(s) were not delivered\n", rd_kafka_outq_len(rk)); /* 销毁生产者实例 */ rd_kafka_destroy(rk); return 0; }
消费者
#include <stdio.h> #include <signal.h> #include <string.h> #include <ctype.h> #include <librdkafka/rdkafka.h> static volatile sig_atomic_t run = 1; /** * @brief 程序终止的信号 */ static void stop(int sig) { run = 0; } /** * 判断是否所有内容都是可打印的 * @returns 返回 1 表示都可打印 */ static int is_printable(const char* buf, size_t size) { size_t i; for (i = 0; i < size; i++) if (!isprint((int)buf[i])) return 0; return 1; } int main(int argc, char** argv) { rd_kafka_t* rk; /* 消费者 */ rd_kafka_conf_t* conf; /* 临时配置对象 */ rd_kafka_resp_err_t err; /* Kafka 接口错误码 */ char errstr[512]; /* librdkafka 接口错误缓冲区 */ const char* brokers; /* 参数: broker 列表 */ const char* groupid; /* 参数:消费者组 ID */ char** topics; /* 参数:订阅的话题 */ int topic_cnt; /* 订阅的话题数 */ rd_kafka_topic_partition_list_t* subscription; /* 订阅的话题列表 */ int i; /* * 参数检查 */ if (argc < 4) { fprintf(stderr, "%% Usage: " "%s <broker> <group.id> <topic1> <topic2>..\n", argv[0]); return 1; } brokers = argv[1]; groupid = argv[2]; topics = &argv[3]; topic_cnt = argc - 3; /* * Create Kafka client configuration place-holder */ conf = rd_kafka_conf_new(); /* Set bootstrap broker(s) as a comma-separated list of * host or host:port (default port 9092). * librdkafka will use the bootstrap brokers to acquire the full * set of brokers from the cluster. */ if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); rd_kafka_conf_destroy(conf); return 1; } /* Set the consumer group id. * All consumers sharing the same group id will join the same * group, and the subscribed topic' partitions will be assigned * according to the partition.assignment.strategy * (consumer config property) to the consumers in the group. 
*/ if (rd_kafka_conf_set(conf, "group.id", groupid, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); rd_kafka_conf_destroy(conf); return 1; } /* If there is no previously committed offset for a partition * the auto.offset.reset strategy will be used to decide where * in the partition to start fetching messages. * By setting this to earliest the consumer will read all messages * in the partition if there was no previously committed offset. */ if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { fprintf(stderr, "%s\n", errstr); rd_kafka_conf_destroy(conf); return 1; } /* * Create consumer instance. * * NOTE: rd_kafka_new() takes ownership of the conf object * and the application must not reference it again after * this call. */ rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr)); if (!rk) { fprintf(stderr, "%% Failed to create new consumer: %s\n", errstr); return 1; } conf = NULL; /* Configuration object is now owned, and freed, * by the rd_kafka_t instance. */ /* Redirect all messages from per-partition queues to * the main queue so that messages can be consumed with one * call from all assigned partitions. * * The alternative is to poll the main queue (for events) * and each partition queue separately, which requires setting * up a rebalance callback and keeping track of the assignment: * but that is more complex and typically not recommended. 
*/ rd_kafka_poll_set_consumer(rk); /* Convert the list of topics to a format suitable for librdkafka */ subscription = rd_kafka_topic_partition_list_new(topic_cnt); for (i = 0; i < topic_cnt; i++) rd_kafka_topic_partition_list_add(subscription, topics[i], /* the partition is ignored * by subscribe() */ RD_KAFKA_PARTITION_UA); /* Subscribe to the list of topics */ err = rd_kafka_subscribe(rk, subscription); if (err) { fprintf(stderr, "%% Failed to subscribe to %d topics: %s\n", subscription->cnt, rd_kafka_err2str(err)); rd_kafka_topic_partition_list_destroy(subscription); rd_kafka_destroy(rk); return 1; } fprintf(stderr, "%% Subscribed to %d topic(s), " "waiting for rebalance and messages...\n", subscription->cnt); rd_kafka_topic_partition_list_destroy(subscription); /* Signal handler for clean shutdown */ signal(SIGINT, stop); /* Subscribing to topics will trigger a group rebalance * which may take some time to finish, but there is no need * for the application to handle this idle period in a special way * since a rebalance may happen at any time. * Start polling for messages. */ while (run) { rd_kafka_message_t* rkm; rkm = rd_kafka_consumer_poll(rk, 100); if (!rkm) continue; /* Timeout: no message within 100ms, * try again. This short timeout allows * checking for `run` at frequent intervals. */ /* consumer_poll() will return either a proper message * or a consumer error (rkm->err is set). */ if (rkm->err) { /* Consumer errors are generally to be considered * informational as the consumer will automatically * try to recover from all types of errors. */ fprintf(stderr, "%% Consumer error: %s\n", rd_kafka_message_errstr(rkm)); rd_kafka_message_destroy(rkm); continue; } /* Proper message. */ printf("Message on %s [%"PRId32"] at offset %"PRId64":\n", rd_kafka_topic_name(rkm->rkt), rkm->partition, rkm->offset); /* Print the message key. 
*/ if (rkm->key && is_printable(rkm->key, rkm->key_len)) printf(" Key: %.*s\n", (int)rkm->key_len, (const char*)rkm->key); else if (rkm->key) printf(" Key: (%d bytes)\n", (int)rkm->key_len); /* Print the message value/payload. */ if (rkm->payload && is_printable(rkm->payload, rkm->len)) printf(" Value: %.*s\n", (int)rkm->len, (const char*)rkm->payload); else if (rkm->payload) printf(" Value: (%d bytes)\n", (int)rkm->len); rd_kafka_message_destroy(rkm); } /* Close the consumer: commit final offsets and leave the group. */ fprintf(stderr, "%% Closing consumer\n"); rd_kafka_consumer_close(rk); /* Destroy the consumer */ rd_kafka_destroy(rk); return 0; }
测试
由于需要链接 librdkafka 动态链接库,因此编译时,需要添加编译参数 -lrdkafka
(C 语言)或-lrdkafka++
(C++)
也即,完整的编译指令为
gcc src/producer.c -o producer -lrdkafka gcc src/consumer.c -o consumer -lrdkafka
编译后,即可启动两个程序,进行测试
./producer 127.0.0.1:9092 test
./consumer 127.0.0.1:9092 0 test
两个程序的话题(topic)需要一致,消费者需要额外传入一个 group ID,用于区分成组的消费者(每个消息对于一个组内只会被处理一次)
可以先启动生产者,并预发送几条消息,而后启动消费者,查看是否能接收到该消息。接下来则是在两者都启动的情况下,传输消息