From 5d17bb8bb079576a6f601b82d0ff4d5ec82b9318 Mon Sep 17 00:00:00 2001
From: winlin
Date: Thu, 3 Oct 2019 12:45:38 +0800
Subject: [PATCH] Remove KAFKA. 3.0.53

---
 README.md                               |    2 +-
 trunk/auto/options.sh                   |   21 -
 trunk/configure                         |    9 +-
 trunk/src/app/srs_app_config.cpp        |   74 +-
 trunk/src/app/srs_app_config.hpp        |    8 -
 trunk/src/app/srs_app_kafka.cpp         |  659 -----------
 trunk/src/app/srs_app_kafka.hpp         |  190 ---
 trunk/src/app/srs_app_rtmp_conn.cpp     |   14 -
 trunk/src/app/srs_app_rtmp_conn.hpp     |    3 -
 trunk/src/app/srs_app_server.cpp        |   12 -
 trunk/src/app/srs_app_server.hpp        |    3 -
 trunk/src/core/srs_core.hpp             |    2 +-
 trunk/src/kernel/srs_kernel_balance.hpp |    2 +-
 trunk/src/kernel/srs_kernel_consts.hpp  |    8 -
 trunk/src/kernel/srs_kernel_error.hpp   |   10 +-
 trunk/src/main/srs_main_server.cpp      |    1 -
 trunk/src/protocol/srs_kafka_stack.cpp  | 1415 -----------------------
 trunk/src/protocol/srs_kafka_stack.hpp  |  932 ---------------
 18 files changed, 7 insertions(+), 3358 deletions(-)
 delete mode 100644 trunk/src/app/srs_app_kafka.cpp
 delete mode 100644 trunk/src/app/srs_app_kafka.hpp
 delete mode 100644 trunk/src/protocol/srs_kafka_stack.cpp
 delete mode 100644 trunk/src/protocol/srs_kafka_stack.hpp

diff --git a/README.md b/README.md
index 5113eb005c..af053f02b7 100755
--- a/README.md
+++ b/README.md
@@ -164,6 +164,7 @@ Please select according to languages:

 ### V3 changes

+* v3.0, 2019-10-03, Remove KAFKA. 3.0.53
 * v3.0, 2019-05-14, Convert Kernel File reader/writer. 3.0.52
 * v3.0, 2019-04-30, Refine typo in files. 3.0.51
 * v3.0, 2019-04-25, Upgrade http-parser from 2.1 to 2.9.2 and cover it. 3.0.50
@@ -710,7 +711,6 @@ Comparing with other media servers, SRS is much better and stronger, for details
 | Reload          | Stable    | X | X | X | X |
 | Forward         | Stable    | X | X | X | X |
 | ATC             | Stable    | X | X | X | X |
-| KAFKA           | Experiment| X | X | X | X |

 #### Stream Service

diff --git a/trunk/auto/options.sh b/trunk/auto/options.sh
index 6787d89cf2..d938fb52ce 100755
--- a/trunk/auto/options.sh
+++ b/trunk/auto/options.sh
@@ -18,7 +18,6 @@ help=no
 SRS_HDS=NO
 SRS_NGINX=NO
 SRS_FFMPEG_TOOL=NO
-SRS_KAFKA=NO
 SRS_LIBRTMP=NO
 SRS_RESEARCH=YES
 SRS_UTEST=YES
@@ -117,7 +116,6 @@ Options:
   --with-hds                enable hds streaming, mux RTMP to F4M/F4V files.
   --with-nginx              enable delivering HTTP streams with nginx.
   --with-stream-caster      enable stream caster to serve other streams over other protocols.
-  --with-kafka              enable srs kafka producer to report to kafka.
   --with-ffmpeg             enable transcoding tool ffmpeg.
   --with-transcode          enable transcoding features.
   --with-ingest             enable ingest features.
@@ -138,7 +136,6 @@ Options:
   --without-hds             disable hds, the adobe http dynamic streaming.
   --without-nginx           disable delivering HTTP streams with nginx.
   --without-stream-caster   disable stream caster, only listen and serve RTMP/HTTP.
-  --without-kafka           disable the srs kafka producer.
   --without-ffmpeg          disable the ffmpeg transcode tool feature.
   --without-transcode       disable the transcoding feature.
   --without-ingest          disable the ingest feature.
@@ -230,7 +227,6 @@ function parse_user_option() { --with-ingest) SRS_INGEST=YES ;; --with-stat) SRS_STAT=YES ;; --with-stream-caster) SRS_STREAM_CASTER=YES ;; - --with-kafka) SRS_KAFKA=YES ;; --with-librtmp) SRS_LIBRTMP=YES ;; --with-research) SRS_RESEARCH=YES ;; --with-utest) SRS_UTEST=YES ;; @@ -251,7 +247,6 @@ function parse_user_option() { --without-ingest) SRS_INGEST=NO ;; --without-stat) SRS_STAT=NO ;; --without-stream-caster) SRS_STREAM_CASTER=NO ;; - --without-kafka) SRS_KAFKA=NO ;; --without-librtmp) SRS_LIBRTMP=NO ;; --without-research) SRS_RESEARCH=NO ;; --without-utest) SRS_UTEST=NO ;; @@ -387,7 +382,6 @@ function apply_user_presets() { SRS_HDS=NO SRS_NGINX=NO SRS_FFMPEG_TOOL=NO - SRS_KAFKA=NO SRS_LIBRTMP=NO SRS_RESEARCH=NO SRS_UTEST=NO @@ -399,7 +393,6 @@ function apply_user_presets() { SRS_HDS=YES SRS_NGINX=YES SRS_FFMPEG_TOOL=YES - SRS_KAFKA=YES SRS_LIBRTMP=YES SRS_RESEARCH=YES SRS_UTEST=YES @@ -411,7 +404,6 @@ function apply_user_presets() { SRS_HDS=NO SRS_NGINX=NO SRS_FFMPEG_TOOL=NO - SRS_KAFKA=NO SRS_LIBRTMP=NO SRS_RESEARCH=NO SRS_UTEST=NO @@ -423,7 +415,6 @@ function apply_user_presets() { SRS_HDS=NO SRS_NGINX=NO SRS_FFMPEG_TOOL=NO - SRS_KAFKA=NO SRS_LIBRTMP=NO SRS_RESEARCH=NO SRS_UTEST=NO @@ -435,7 +426,6 @@ function apply_user_presets() { SRS_HDS=YES SRS_NGINX=NO SRS_FFMPEG_TOOL=NO - SRS_KAFKA=YES SRS_LIBRTMP=YES SRS_RESEARCH=NO SRS_UTEST=NO @@ -448,7 +438,6 @@ function apply_user_presets() { SRS_HDS=YES SRS_NGINX=NO SRS_FFMPEG_TOOL=NO - SRS_KAFKA=YES SRS_LIBRTMP=YES SRS_RESEARCH=NO SRS_UTEST=NO @@ -460,7 +449,6 @@ function apply_user_presets() { SRS_HDS=YES SRS_NGINX=NO SRS_FFMPEG_TOOL=NO - SRS_KAFKA=YES SRS_LIBRTMP=YES SRS_RESEARCH=NO SRS_UTEST=YES @@ -472,7 +460,6 @@ function apply_user_presets() { SRS_HDS=YES SRS_NGINX=NO SRS_FFMPEG_TOOL=NO - SRS_KAFKA=YES SRS_LIBRTMP=YES SRS_RESEARCH=NO SRS_UTEST=YES @@ -487,7 +474,6 @@ function apply_user_presets() { SRS_HDS=YES SRS_NGINX=NO SRS_FFMPEG_TOOL=YES - SRS_KAFKA=YES SRS_LIBRTMP=YES SRS_RESEARCH=YES SRS_UTEST=YES @@ -499,7 +485,6 @@ function apply_user_presets() { SRS_HDS=YES SRS_NGINX=NO SRS_FFMPEG_TOOL=NO - SRS_KAFKA=YES SRS_LIBRTMP=NO SRS_RESEARCH=NO SRS_UTEST=NO @@ -511,7 +496,6 @@ function apply_user_presets() { SRS_HDS=YES SRS_NGINX=NO SRS_FFMPEG_TOOL=YES - SRS_KAFKA=YES SRS_LIBRTMP=YES SRS_RESEARCH=NO SRS_UTEST=YES @@ -523,7 +507,6 @@ function apply_user_presets() { SRS_HDS=YES SRS_NGINX=NO SRS_FFMPEG_TOOL=NO - SRS_KAFKA=YES SRS_LIBRTMP=YES SRS_RESEARCH=NO SRS_UTEST=NO @@ -535,7 +518,6 @@ function apply_user_presets() { SRS_HDS=YES SRS_NGINX=NO SRS_FFMPEG_TOOL=YES - SRS_KAFKA=YES SRS_LIBRTMP=YES SRS_RESEARCH=NO SRS_UTEST=NO @@ -595,7 +577,6 @@ function apply_user_detail_options() { SRS_INGEST=NO SRS_STAT=NO SRS_STREAM_CASTER=NO - SRS_KAFKA=NO SRS_LIBRTMP=YES SRS_RESEARCH=YES SRS_UTEST=NO @@ -627,7 +608,6 @@ SRS_AUTO_CONFIGURE="--prefix=${SRS_PREFIX}" if [ $SRS_HTTP_CALLBACK = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-http-callback"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-http-callback"; fi if [ $SRS_HTTP_SERVER = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-http-server"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-http-server"; fi if [ $SRS_STREAM_CASTER = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-stream-caster"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-stream-caster"; fi - if [ $SRS_KAFKA = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-kafka"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} 
--without-kafka"; fi if [ $SRS_HTTP_API = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-http-api"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-http-api"; fi if [ $SRS_LIBRTMP = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-librtmp"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-librtmp"; fi if [ $SRS_RESEARCH = YES ]; then SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --with-research"; else SRS_AUTO_CONFIGURE="${SRS_AUTO_CONFIGURE} --without-research"; fi @@ -713,7 +693,6 @@ function check_option_conflicts() { if [ $SRS_SSL = RESERVED ]; then echo "you must specifies the ssl, see: ./configure --help"; __check_ok=NO; fi if [ $SRS_FFMPEG_TOOL = RESERVED ]; then echo "you must specifies the ffmpeg, see: ./configure --help"; __check_ok=NO; fi if [ $SRS_STREAM_CASTER = RESERVED ]; then echo "you must specifies the stream-caster, see: ./configure --help"; __check_ok=NO; fi - if [ $SRS_KAFKA = RESERVED ]; then echo "you must specifies the kafka, see: ./configure --help"; __check_ok=NO; fi if [ $SRS_LIBRTMP = RESERVED ]; then echo "you must specifies the librtmp, see: ./configure --help"; __check_ok=NO; fi if [ $SRS_RESEARCH = RESERVED ]; then echo "you must specifies the research, see: ./configure --help"; __check_ok=NO; fi if [ $SRS_UTEST = RESERVED ]; then echo "you must specifies the utest, see: ./configure --help"; __check_ok=NO; fi diff --git a/trunk/configure b/trunk/configure index a324d69059..301b997f54 100755 --- a/trunk/configure +++ b/trunk/configure @@ -195,7 +195,7 @@ ModuleLibIncs=(${SRS_OBJS_DIR} ${LibSSLRoot}) MODULE_FILES=("srs_protocol_amf0" "srs_protocol_io" "srs_rtmp_stack" "srs_rtmp_handshake" "srs_protocol_utility" "srs_rtmp_msg_array" "srs_protocol_stream" "srs_raw_avc" "srs_rtsp_stack" "srs_http_stack" "srs_protocol_kbps" "srs_protocol_json" - "srs_kafka_stack" "srs_protocol_format") + "srs_protocol_format") PROTOCOL_INCS="src/protocol"; MODULE_DIR=${PROTOCOL_INCS} . auto/modules.sh PROTOCOL_OBJS="${MODULE_OBJS[@]}" # @@ -225,7 +225,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then "srs_app_heartbeat" "srs_app_empty" "srs_app_http_client" "srs_app_http_static" "srs_app_recv_thread" "srs_app_security" "srs_app_statistic" "srs_app_hds" "srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call" - "srs_app_caster_flv" "srs_app_process" "srs_app_ng_exec" "srs_app_kafka" + "srs_app_caster_flv" "srs_app_process" "srs_app_ng_exec" "srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr" "srs_app_coworkers") DEFINES="" @@ -577,11 +577,6 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then else echo -e "${GREEN}Note: StreamCaster is disabled.${BLACK}" fi - if [ $SRS_KAFKA = YES ]; then - echo -e "${GREEN}Kafka is enabled.${BLACK}" - else - echo -e "${YELLOW}Warning: Kafka is disabled.${BLACK}" - fi if [ $SRS_HDS = YES ]; then echo -e "${YELLOW}Experiment: HDS is enabled.${BLACK}" else diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 0210eed1d7..87f5a25de1 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -1555,7 +1555,6 @@ srs_error_t SrsConfig::reload_conf(SrsConfig* conf) } // TODO: FIXME: support reload stream_caster. - // TODO: FIXME: support reload kafka. 
    // merge config: vhost
    if ((err = reload_vhost(old_root)) != srs_success) {
@@ -2139,19 +2138,6 @@ srs_error_t SrsConfig::global_to_json(SrsJsonObject* obj)
             }
         }
         obj->set(dir->name, sobj);
-    } else if (dir->name == "kafka") {
-        SrsJsonObject* sobj = SrsJsonAny::object();
-        for (int j = 0; j < (int)dir->directives.size(); j++) {
-            SrsConfDirective* sdir = dir->directives.at(j);
-            if (sdir->name == "enabled") {
-                sobj->set(sdir->name, sdir->dumps_arg0_to_boolean());
-            } else if (sdir->name == "brokers") {
-                sobj->set(sdir->name, sdir->dumps_args());
-            } else if (sdir->name == "topic") {
-                sobj->set(sdir->name, sdir->dumps_arg0_to_str());
-            }
-        }
-        obj->set(dir->name, sobj);
     } else if (dir->name == "stream_caster") {
         SrsJsonObject* sobj = SrsJsonAny::object();
         for (int j = 0; j < (int)dir->directives.size(); j++) {
@@ -3511,7 +3497,7 @@ srs_error_t SrsConfig::check_normal_config()
             && n != "srs_log_tank" && n != "srs_log_level" && n != "srs_log_file"
             && n != "max_connections" && n != "daemon" && n != "heartbeat"
             && n != "http_api" && n != "stats" && n != "vhost" && n != "pithy_print_ms"
-            && n != "http_server" && n != "stream_caster" && n != "kafka"
+            && n != "http_server" && n != "stream_caster"
             && n != "utc_time" && n != "work_dir" && n != "asprocess"
             ) {
             return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal directive %s", n.c_str());
@@ -3545,15 +3531,6 @@ srs_error_t SrsConfig::check_normal_config()
             }
         }
     }
-    if (true) {
-        SrsConfDirective* conf = root->get("kafka");
-        for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
-            string n = conf->at(i)->name;
-            if (n != "enabled" && n != "brokers" && n != "topic") {
-                return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal kafka.%s", n.c_str());
-            }
-        }
-    }
     if (true) {
         SrsConfDirective* conf = get_heartbeart();
         for (int i = 0; conf && i < (int)conf->directives.size(); i++) {
@@ -4208,55 +4185,6 @@ int SrsConfig::get_stream_caster_rtp_port_max(SrsConfDirective* conf)
     return ::atoi(conf->arg0().c_str());
 }

-bool SrsConfig::get_kafka_enabled()
-{
-    static bool DEFAULT = false;
-
-    SrsConfDirective* conf = root->get("kafka");
-    if (!conf) {
-        return DEFAULT;
-    }
-
-    conf = conf->get("enabled");
-    if (!conf || conf->arg0().empty()) {
-        return DEFAULT;
-    }
-
-    return SRS_CONF_PERFER_FALSE(conf->arg0());
-}
-
-SrsConfDirective* SrsConfig::get_kafka_brokers()
-{
-    SrsConfDirective* conf = root->get("kafka");
-    if (!conf) {
-        return NULL;
-    }
-
-    conf = conf->get("brokers");
-    if (!conf || conf->args.empty()) {
-        return NULL;
-    }
-
-    return conf;
-}
-
-string SrsConfig::get_kafka_topic()
-{
-    static string DEFAULT = "srs";
-
-    SrsConfDirective* conf = root->get("kafka");
-    if (!conf) {
-        return DEFAULT;
-    }
-
-    conf = conf->get("topic");
-    if (!conf || conf->arg0().empty()) {
-        return DEFAULT;
-    }
-
-    return conf->arg0();
-}
-
 SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost)
 {
     srs_assert(root);
diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp
index 9d30cae869..6ff503bb96 100644
--- a/trunk/src/app/srs_app_config.hpp
+++ b/trunk/src/app/srs_app_config.hpp
@@ -455,14 +455,6 @@ class SrsConfig
     virtual int get_stream_caster_rtp_port_min(SrsConfDirective* conf);
     // Get the max udp port for rtp of stream caster rtsp.
     virtual int get_stream_caster_rtp_port_max(SrsConfDirective* conf);
-// kafka section.
-public:
-    // Whether the kafka is enabled.
-    virtual bool get_kafka_enabled();
-    // Get the broker list, each in the format of <ip:port>.
-    virtual SrsConfDirective* get_kafka_brokers();
-    // Get the kafka topic to use for srs.
-    virtual std::string get_kafka_topic();
 // vhost specified section
 public:
     // Get the vhost directive by vhost name.
diff --git a/trunk/src/app/srs_app_kafka.cpp b/trunk/src/app/srs_app_kafka.cpp
deleted file mode 100644
index 6ce886dc98..0000000000
--- a/trunk/src/app/srs_app_kafka.cpp
+++ /dev/null
@@ -1,659 +0,0 @@
-/**
- * The MIT License (MIT)
- *
- * Copyright (c) 2013-2019 Winlin
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy of
- * this software and associated documentation files (the "Software"), to deal in
- * the Software without restriction, including without limitation the rights to
- * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
- * the Software, and to permit persons to whom the Software is furnished to do so,
- * subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
- * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
- * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
- * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
- * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- */
-
-#include
-
-#include
-using namespace std;
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-
-#ifdef SRS_AUTO_KAFKA
-
-#define SRS_KAFKA_PRODUCER_TIMEOUT (30 * SRS_UTIME_MILLISECONDS)
-#define SRS_KAFKA_PRODUCER_AGGREGATE_SIZE 1
-
-std::string srs_kafka_metadata_summary(SrsKafkaTopicMetadataResponse* metadata)
-{
-    vector<string> bs;
-    for (int i = 0; i < metadata->brokers.size(); i++) {
-        SrsKafkaBroker* broker = metadata->brokers.at(i);
-
-        string hostport = srs_int2str(broker->node_id) + "/" + broker->host.to_str();
-        if (broker->port > 0) {
-            hostport += ":" + srs_int2str(broker->port);
-        }
-
-        bs.push_back(hostport);
-    }
-
-    vector<string> ps;
-    for (int i = 0; i < metadata->metadatas.size(); i++) {
-        SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i);
-
-        for (int j = 0; j < topic->metadatas.size(); j++) {
-            string desc = "topic=" + topic->name.to_str();
-
-            SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j);
-
-            desc += "?partition=" + srs_int2str(partition->partition_id);
-            desc += "&leader=" + srs_int2str(partition->leader);
-
-            vector<int32_t> replicas = srs_kafka_array2vector(&partition->replicas);
-            desc += "&replicas=" + srs_join_vector_string(replicas, ",");
-
-            ps.push_back(desc);
-        }
-    }
-
-    std::stringstream ss;
-    ss << "brokers=" << srs_join_vector_string(bs, ",");
-    ss << ", " << srs_join_vector_string(ps, ", ");
-
-    return ss.str();
-}
-
-std::string srs_kafka_summary_partitions(const vector<SrsKafkaPartition*>& partitions)
-{
-    vector<string> ret;
-
-    vector<SrsKafkaPartition*>::const_iterator it;
-    for (it = partitions.begin(); it != partitions.end(); ++it) {
-        SrsKafkaPartition* partition = *it;
-
-        string desc = "tcp://";
-        desc += partition->host + ":" + srs_int2str(partition->port);
-        desc += "?broker=" + srs_int2str(partition->broker);
-        desc += "&partition=" + srs_int2str(partition->id);
-        ret.push_back(desc);
-    }
-
-    return srs_join_vector_string(ret, ", ");
-}
-
-void srs_kafka_metadata2connector(string
topic_name, SrsKafkaTopicMetadataResponse* metadata, vector<SrsKafkaPartition*>& partitions)
-{
-    for (int i = 0; i < metadata->metadatas.size(); i++) {
-        SrsKafkaTopicMetadata* topic = metadata->metadatas.at(i);
-
-        for (int j = 0; j < topic->metadatas.size(); j++) {
-            SrsKafkaPartitionMetadata* partition = topic->metadatas.at(j);
-
-            SrsKafkaPartition* p = new SrsKafkaPartition();
-
-            p->topic = topic_name;
-            p->id = partition->partition_id;
-            p->broker = partition->leader;
-
-            for (int i = 0; i < metadata->brokers.size(); i++) {
-                SrsKafkaBroker* broker = metadata->brokers.at(i);
-                if (broker->node_id == p->broker) {
-                    p->host = broker->host.to_str();
-                    p->port = broker->port;
-                    break;
-                }
-            }
-
-            partitions.push_back(p);
-        }
-    }
-}
-
-SrsKafkaPartition::SrsKafkaPartition()
-{
-    id = broker = 0;
-    port = SRS_CONSTS_KAFKA_DEFAULT_PORT;
-
-    transport = NULL;
-    kafka = NULL;
-}
-
-SrsKafkaPartition::~SrsKafkaPartition()
-{
-    disconnect();
-}
-
-string SrsKafkaPartition::hostport()
-{
-    if (ep.empty()) {
-        ep = host + ":" + srs_int2str(port);
-    }
-
-    return ep;
-}
-
-srs_error_t SrsKafkaPartition::connect()
-{
-    srs_error_t err = srs_success;
-
-    if (transport) {
-        return err;
-    }
-    transport = new SrsTcpClient(host, port, SRS_KAFKA_PRODUCER_TIMEOUT);
-    kafka = new SrsKafkaClient(transport);
-
-    if ((err = transport->connect()) != srs_success) {
-        disconnect();
-        return srs_error_wrap(err, "connect to %s partition=%d failed", hostport().c_str(), id);
-    }
-
-    srs_trace("connect at %s, partition=%d, broker=%d", hostport().c_str(), id, broker);
-
-    return err;
-}
-
-srs_error_t SrsKafkaPartition::flush(SrsKafkaPartitionCache* pc)
-{
-    return kafka->write_messages(topic, id, *pc);
-}
-
-void SrsKafkaPartition::disconnect()
-{
-    srs_freep(kafka);
-    srs_freep(transport);
-}
-
-SrsKafkaMessage::SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j)
-{
-    producer = p;
-    key = k;
-    obj = j;
-}
-
-SrsKafkaMessage::~SrsKafkaMessage()
-{
-    srs_freep(obj);
-}
-
-srs_error_t SrsKafkaMessage::call()
-{
-    srs_error_t err = producer->send(key, obj);
-
-    // the obj is managed by producer now.
-    obj = NULL;
-
-    return srs_error_wrap(err, "kafka send");
-}
-
-string SrsKafkaMessage::to_string()
-{
-    return "kafka";
-}
-
-SrsKafkaCache::SrsKafkaCache()
-{
-    count = 0;
-    nb_partitions = 0;
-}
-
-SrsKafkaCache::~SrsKafkaCache()
-{
-    map<int32_t, SrsKafkaPartitionCache*>::iterator it;
-    for (it = cache.begin(); it != cache.end(); ++it) {
-        SrsKafkaPartitionCache* pc = it->second;
-
-        for (vector<SrsJsonObject*>::iterator it2 = pc->begin(); it2 != pc->end(); ++it2) {
-            SrsJsonObject* obj = *it2;
-            srs_freep(obj);
-        }
-        pc->clear();
-
-        srs_freep(pc);
-    }
-    cache.clear();
-}
-
-void SrsKafkaCache::append(int key, SrsJsonObject* obj)
-{
-    count++;
-
-    int partition = 0;
-    if (nb_partitions > 0) {
-        partition = key % nb_partitions;
-    }
-
-    SrsKafkaPartitionCache* pc = NULL;
-    map<int32_t, SrsKafkaPartitionCache*>::iterator it = cache.find(partition);
-    if (it == cache.end()) {
-        pc = new SrsKafkaPartitionCache();
-        cache[partition] = pc;
-    } else {
-        pc = it->second;
-    }
-
-    pc->push_back(obj);
-}
-
-int SrsKafkaCache::size()
-{
-    return count;
-}
-
-bool SrsKafkaCache::fetch(int* pkey, SrsKafkaPartitionCache** ppc)
-{
-    map<int32_t, SrsKafkaPartitionCache*>::iterator it;
-    for (it = cache.begin(); it != cache.end(); ++it) {
-        int32_t key = it->first;
-        SrsKafkaPartitionCache* pc = it->second;
-
-        if (!pc->empty()) {
-            *pkey = (int)key;
-            *ppc = pc;
-            return true;
-        }
-    }
-
-    return false;
-}
-
-srs_error_t SrsKafkaCache::flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc)
-{
-    srs_error_t err = srs_success;
-
-    // ensure the key exists.
-    srs_assert (cache.find(key) != cache.end());
-
-    // the cache is a vector, which is contiguous storage.
-    // we remember the messages we have written and clear them when completed.
-    int nb_msgs = (int)pc->size();
-    if (pc->empty()) {
-        return err;
-    }
-
-    // connect transport.
-    if ((err = partition->connect()) != srs_success) {
-        return srs_error_wrap(err, "connect partition");
-    }
-
-    // write the json objects.
-    if ((err = partition->flush(pc)) != srs_success) {
-        return srs_error_wrap(err, "flush partition");
-    }
-
-    // free all written messages.
-    for (vector<SrsJsonObject*>::iterator it = pc->begin(); it != pc->end(); ++it) {
-        SrsJsonObject* obj = *it;
-        srs_freep(obj);
-    }
-
-    // remove the messages from cache.
-    if ((int)pc->size() == nb_msgs) {
-        pc->clear();
-    } else {
-        pc->erase(pc->begin(), pc->begin() + nb_msgs);
-    }
-
-    return err;
-}
-
-ISrsKafkaCluster::ISrsKafkaCluster()
-{
-}
-
-ISrsKafkaCluster::~ISrsKafkaCluster()
-{
-}
-
-// @global kafka event producer, user must use srs_initialize_kafka to initialize it.
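// For context, a hedged sketch of how this global is driven by the rest of
// this patch (the calls are real and appear in the srs_app_server.cpp and
// srs_app_rtmp_conn.cpp hunks below; the wrapper function is illustrative):
//
//     srs_error_t kafka_lifecycle_sketch(std::string ip) {
//         srs_error_t err = srs_success;
//         // SrsServer::initialize_st(), at startup:
//         if ((err = srs_initialize_kafka()) != srs_success) {
//             return srs_error_wrap(err, "initialize kafka");
//         }
//         // SrsRtmpConn::do_cycle(), per accepted client:
//         if ((err = _srs_kafka->on_client(srs_id(), SrsListenerRtmpStream, ip)) != srs_success) {
//             return srs_error_wrap(err, "kafka on client");
//         }
//         // SrsRtmpConn::on_disconnect(), per closed client:
//         if ((err = _srs_kafka->on_close(srs_id())) != srs_success) {
//             return srs_error_wrap(err, "kafka on close");
//         }
//         // SrsServer::dispose(), at shutdown:
//         srs_dispose_kafka();
//         return err;
//     }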
-ISrsKafkaCluster* _srs_kafka = NULL;
-
-srs_error_t srs_initialize_kafka()
-{
-    srs_error_t err = srs_success;
-
-    SrsKafkaProducer* kafka = new SrsKafkaProducer();
-    _srs_kafka = kafka;
-
-    if ((err = kafka->initialize()) != srs_success) {
-        return srs_error_wrap(err, "initialize kafka producer");
-    }
-
-    if ((err = kafka->start()) != srs_success) {
-        return srs_error_wrap(err, "start kafka producer");
-    }
-
-    return err;
-}
-
-void srs_dispose_kafka()
-{
-    SrsKafkaProducer* kafka = dynamic_cast<SrsKafkaProducer*>(_srs_kafka);
-    if (!kafka) {
-        return;
-    }
-
-    kafka->stop();
-
-    srs_freep(kafka);
-    _srs_kafka = NULL;
-}
-
-SrsKafkaProducer::SrsKafkaProducer()
-{
-    metadata_ok = false;
-    metadata_expired = srs_cond_new();
-
-    lock = srs_mutex_new();
-    trd = new SrsDummyCoroutine();
-    worker = new SrsAsyncCallWorker();
-    cache = new SrsKafkaCache();
-
-    lb = new SrsLbRoundRobin();
-}
-
-SrsKafkaProducer::~SrsKafkaProducer()
-{
-    clear_metadata();
-
-    srs_freep(lb);
-
-    srs_freep(worker);
-    srs_freep(trd);
-    srs_freep(cache);
-
-    srs_mutex_destroy(lock);
-    srs_cond_destroy(metadata_expired);
-}
-
-srs_error_t SrsKafkaProducer::initialize()
-{
-    enabled = _srs_config->get_kafka_enabled();
-    srs_info("initialize kafka ok, enabled=%d.", enabled);
-    return srs_success;
-}
-
-srs_error_t SrsKafkaProducer::start()
-{
-    srs_error_t err = srs_success;
-
-    if (!enabled) {
-        return err;
-    }
-
-    if ((err = worker->start()) != srs_success) {
-        return srs_error_wrap(err, "async worker");
-    }
-
-    srs_freep(trd);
-    trd = new SrsSTCoroutine("kafka", this, _srs_context->get_id());
-    if ((err = trd->start()) != srs_success) {
-        return srs_error_wrap(err, "coroutine");
-    }
-
-    refresh_metadata();
-
-    return err;
-}
-
-void SrsKafkaProducer::stop()
-{
-    if (!enabled) {
-        return;
-    }
-
-    trd->stop();
-    worker->stop();
-}
-
-srs_error_t SrsKafkaProducer::send(int key, SrsJsonObject* obj)
-{
-    srs_error_t err = srs_success;
-
-    // cache the json object.
-    cache->append(key, obj);
-
-    // too few messages, ignore.
-    if (cache->size() < SRS_KAFKA_PRODUCER_AGGREGATE_SIZE) {
-        return err;
-    }
-
-    // too many messages, warn user.
-    if (cache->size() > SRS_KAFKA_PRODUCER_AGGREGATE_SIZE * 10) {
-        srs_warn("kafka cache too many messages: %d", cache->size());
-    }
-
-    // sync with background metadata worker.
-    SrsLocker(lock);
-
-    // flush message when metadata is ok.
-    if (metadata_ok) {
-        err = flush();
-    }
-
-    return err;
-}
-
-srs_error_t SrsKafkaProducer::on_client(int key, SrsListenerType type, string ip)
-{
-    srs_error_t err = srs_success;
-
-    if (!enabled) {
-        return err;
-    }
-
-    SrsJsonObject* obj = SrsJsonAny::object();
-
-    obj->set("msg", SrsJsonAny::str("accept"));
-    obj->set("type", SrsJsonAny::integer(type));
-    obj->set("ip", SrsJsonAny::str(ip.c_str()));
-
-    return worker->execute(new SrsKafkaMessage(this, key, obj));
-}
-
-srs_error_t SrsKafkaProducer::on_close(int key)
-{
-    srs_error_t err = srs_success;
-
-    if (!enabled) {
-        return err;
-    }
-
-    SrsJsonObject* obj = SrsJsonAny::object();
-
-    obj->set("msg", SrsJsonAny::str("close"));
-
-    return worker->execute(new SrsKafkaMessage(this, key, obj));
-}
-
-#define SRS_KAKFA_CIMS (3 * SRS_UTIME_SECONDS)
-
-srs_error_t SrsKafkaProducer::cycle()
-{
-    srs_error_t err = srs_success;
-
-    // wait for the metadata to expire.
-    // when metadata is ok, wait until it expires.
-    if (metadata_ok) {
-        srs_cond_wait(metadata_expired);
-    }
-
-    // request to lock to acquire the socket.
-    SrsLocker(lock);
-
-    while (true) {
-        if ((err = do_cycle()) != srs_success) {
-            srs_warn("KafkaProducer: Ignore error, %s", srs_error_desc(err).c_str());
-            srs_freep(err);
-        }
-
-        if ((err = trd->pull()) != srs_success) {
-            return srs_error_wrap(err, "kafka cycle");
-        }
-
-        srs_usleep(SRS_KAKFA_CIMS);
-    }
-
-    return err;
-}
-
-void SrsKafkaProducer::clear_metadata()
-{
-    vector<SrsKafkaPartition*>::iterator it;
-
-    for (it = partitions.begin(); it != partitions.end(); ++it) {
-        SrsKafkaPartition* partition = *it;
-        srs_freep(partition);
-    }
-
-    partitions.clear();
-}
-
-srs_error_t SrsKafkaProducer::do_cycle()
-{
-    srs_error_t err = srs_success;
-
-    // ignore when disabled.
-    if (!enabled) {
-        return err;
-    }
-
-    // when kafka is enabled, request metadata at startup.
-    if ((err = request_metadata()) != srs_success) {
-        return srs_error_wrap(err, "request metadata");
-    }
-
-    return err;
-}
-
-srs_error_t SrsKafkaProducer::request_metadata()
-{
-    srs_error_t err = srs_success;
-
-    // ignore when disabled.
-    if (!enabled) {
-        return err;
-    }
-
-    // select one broker to connect to.
-    SrsConfDirective* brokers = _srs_config->get_kafka_brokers();
-    if (!brokers) {
-        srs_warn("ignore for empty brokers.");
-        return err;
-    }
-
-    std::string server;
-    int port = SRS_CONSTS_KAFKA_DEFAULT_PORT;
-    if (true) {
-        srs_assert(!brokers->args.empty());
-        std::string broker = lb->select(brokers->args);
-        srs_parse_endpoint(broker, server, port);
-    }
-
-    std::string topic = _srs_config->get_kafka_topic();
-    if (true) {
-        std::string senabled = srs_bool2switch(enabled);
-        std::string sbrokers = srs_join_vector_string(brokers->args, ",");
-        srs_trace("kafka request enabled:%s, brokers:%s, current:[%d]%s:%d, topic:%s",
-            senabled.c_str(), sbrokers.c_str(), lb->current(), server.c_str(), port, topic.c_str());
-    }
-
-    SrsTcpClient* transport = new SrsTcpClient(server, port, SRS_CONSTS_KAFKA_TIMEOUT);
-    SrsAutoFree(SrsTcpClient, transport);
-
-    SrsKafkaClient* kafka = new SrsKafkaClient(transport);
-    SrsAutoFree(SrsKafkaClient, kafka);
-
-    // reconnect to kafka server.
-    if ((err = transport->connect()) != srs_success) {
-        return srs_error_wrap(err, "connect %s:%d failed", server.c_str(), port);
-    }
-
-    // do fetch metadata from broker.
-    SrsKafkaTopicMetadataResponse* metadata = NULL;
-    if ((err = kafka->fetch_metadata(topic, &metadata)) != srs_success) {
-        return srs_error_wrap(err, "fetch metadata");
-    }
-    SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata);
-
-    // we may need to request multiple times.
-    // for example, the first time to create a non-existent topic, then query metadata.
-    if (!metadata->metadatas.empty()) {
-        SrsKafkaTopicMetadata* topic = metadata->metadatas.at(0);
-        if (topic->metadatas.empty()) {
-            srs_warn("topic %s metadata empty, retry.", topic->name.to_str().c_str());
-            return err;
-        }
-    }
-
-    // show kafka metadata.
-    string summary = srs_kafka_metadata_summary(metadata);
-    srs_trace("kafka metadata: %s", summary.c_str());
-
-    // generate the partition info.
-    srs_kafka_metadata2connector(topic, metadata, partitions);
-    srs_trace("kafka connector: %s", srs_kafka_summary_partitions(partitions).c_str());
-
-    // update the total partition for cache.
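    // To make the partition mapping concrete (illustrative numbers, not from
    // the source): once nb_partitions is set below, SrsKafkaCache::append()
    // buckets each message by key % nb_partitions, so with 3 discovered
    // partitions the keys 100, 101 and 102 land in partitions 1, 2 and 0;
    // SrsKafkaProducer::flush() then picks the matching connection via
    // partitions.at(key % partitions.size()).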
-    cache->nb_partitions = (int)partitions.size();
-
-    metadata_ok = true;
-
-    return err;
-}
-
-void SrsKafkaProducer::refresh_metadata()
-{
-    clear_metadata();
-
-    metadata_ok = false;
-    srs_cond_signal(metadata_expired);
-    srs_trace("kafka async refresh metadata in background");
-}
-
-srs_error_t SrsKafkaProducer::flush()
-{
-    srs_error_t err = srs_success;
-
-    // flush all available partition caches.
-    while (true) {
-        int key = -1;
-        SrsKafkaPartitionCache* pc = NULL;
-
-        // all flushed, or no kafka partition to write to.
-        if (!cache->fetch(&key, &pc) || partitions.empty()) {
-            break;
-        }
-
-        // flush specified partition.
-        srs_assert(key >= 0 && pc);
-        SrsKafkaPartition* partition = partitions.at(key % partitions.size());
-        if ((err = cache->flush(partition, key, pc)) != srs_success) {
-            return srs_error_wrap(err, "flush partition");
-        }
-    }
-
-    return err;
-}
-
-#endif
-
diff --git a/trunk/src/app/srs_app_kafka.hpp b/trunk/src/app/srs_app_kafka.hpp
deleted file mode 100644
index 835a096867..0000000000
--- a/trunk/src/app/srs_app_kafka.hpp
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * The MIT License (MIT)
- *
- * Copyright (c) 2013-2019 Winlin
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy of
- * this software and associated documentation files (the "Software"), to deal in
- * the Software without restriction, including without limitation the rights to
- * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
- * the Software, and to permit persons to whom the Software is furnished to do so,
- * subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
- * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
- * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
- * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
- * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- */
-
-#ifndef SRS_APP_KAFKA_HPP
-#define SRS_APP_KAFKA_HPP
-
-#include
-
-#include
-#include
-
-class SrsLbRoundRobin;
-class SrsAsyncCallWorker;
-class SrsTcpClient;
-class SrsKafkaClient;
-class SrsJsonObject;
-class SrsKafkaProducer;
-
-#include
-#include
-#include
-
-#ifdef SRS_AUTO_KAFKA
-
-// The partition messages cache.
-typedef std::vector<SrsJsonObject*> SrsKafkaPartitionCache;
-
-// The kafka partition info.
-struct SrsKafkaPartition
-{
-private:
-    std::string ep;
-    // Not NULL when connected.
-    SrsTcpClient* transport;
-    SrsKafkaClient* kafka;
-public:
-    int id;
-    std::string topic;
-    // leader.
-    int broker;
-    std::string host;
-    int port;
-public:
-    SrsKafkaPartition();
-    virtual ~SrsKafkaPartition();
-public:
-    virtual std::string hostport();
-    virtual srs_error_t connect();
-    virtual srs_error_t flush(SrsKafkaPartitionCache* pc);
-private:
-    virtual void disconnect();
-};
-
-// The following are all types of kafka messages.
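// A minimal sketch of the async-call pattern SrsKafkaMessage implements
// (ISrsAsyncCallTask and SrsAsyncCallWorker are the real SRS types used by
// the producer; the Demo names are hypothetical):
//
//     class DemoTask : public ISrsAsyncCallTask {
//     public:
//         virtual srs_error_t call() { return srs_success; } // runs on the worker coroutine
//         virtual std::string to_string() { return "demo"; } // used for logging
//     };
//
//     SrsAsyncCallWorker* worker = new SrsAsyncCallWorker();
//     worker->start();
//     worker->execute(new DemoTask()); // ownership passes to the worker,
//                                      // as with SrsKafkaMessage in on_client().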
-class SrsKafkaMessage : public ISrsAsyncCallTask
-{
-private:
-    SrsKafkaProducer* producer;
-    int key;
-    SrsJsonObject* obj;
-public:
-    SrsKafkaMessage(SrsKafkaProducer* p, int k, SrsJsonObject* j);
-    virtual ~SrsKafkaMessage();
-// Interface ISrsAsyncCallTask
-public:
-    virtual srs_error_t call();
-    virtual std::string to_string();
-};
-
-// A message cache for kafka.
-class SrsKafkaCache
-{
-public:
-    // The total partitions,
-    // for the key to map to the partition by key%nb_partitions.
-    int nb_partitions;
-private:
-    // Total messages for all partitions.
-    int count;
-    // The key is the partition id, value is the message set to write to this partition.
-    // @remark, when metadata is refreshed, the number of partitions may increase,
-    // so some messages may be dispatched to a new partition.
-    std::map< int32_t, SrsKafkaPartitionCache*> cache;
-public:
-    SrsKafkaCache();
-    virtual ~SrsKafkaCache();
-public:
-    virtual void append(int key, SrsJsonObject* obj);
-    virtual int size();
-    // Fetch out an available partition cache.
-    // @return true when got a key and pc; otherwise, false.
-    virtual bool fetch(int* pkey, SrsKafkaPartitionCache** ppc);
-    // Flush the specified partition cache.
-    virtual srs_error_t flush(SrsKafkaPartition* partition, int key, SrsKafkaPartitionCache* pc);
-};
-
-// The kafka cluster interface.
-class ISrsKafkaCluster
-{
-public:
-    ISrsKafkaCluster();
-    virtual ~ISrsKafkaCluster();
-public:
-    // When got any client connect to SRS, notify kafka.
-    // @param key the partition map key, the client id or hash(ip).
-    // @param type the type of client.
-    // @param ip the peer ip of client.
-    virtual srs_error_t on_client(int key, SrsListenerType type, std::string ip) = 0;
-    // When client close or disconnect for error.
-    // @param key the partition map key, the client id or hash(ip).
-    virtual srs_error_t on_close(int key) = 0;
-};
-
-// @global kafka event producer.
-extern ISrsKafkaCluster* _srs_kafka;
-// kafka initialize and disposer for global object.
-extern srs_error_t srs_initialize_kafka();
-extern void srs_dispose_kafka();
-
-// The kafka producer used to save log to kafka cluster.
-class SrsKafkaProducer : virtual public ISrsCoroutineHandler, virtual public ISrsKafkaCluster
-{
-private:
-    // TODO: FIXME: support reload.
-    bool enabled;
-    srs_mutex_t lock;
-    SrsCoroutine* trd;
-private:
-    bool metadata_ok;
-    srs_cond_t metadata_expired;
-public:
-    std::vector<SrsKafkaPartition*> partitions;
-    SrsKafkaCache* cache;
-private:
-    SrsLbRoundRobin* lb;
-    SrsAsyncCallWorker* worker;
-public:
-    SrsKafkaProducer();
-    virtual ~SrsKafkaProducer();
-public:
-    virtual srs_error_t initialize();
-    virtual srs_error_t start();
-    virtual void stop();
-// internal: for worker to call task to send object.
-public:
-    // Send json object to kafka cluster.
-    // The producer will aggregate messages and send them in a kafka message set.
-    // @param key the key to map to the partition, user can use cid or hash.
-    // @param obj the json object; user must never free it again.
-    virtual srs_error_t send(int key, SrsJsonObject* obj);
-// Interface ISrsKafkaCluster
-public:
-    virtual srs_error_t on_client(int key, SrsListenerType type, std::string ip);
-    virtual srs_error_t on_close(int key);
-// Interface ISrsCoroutineHandler
-public:
-    virtual srs_error_t cycle();
-private:
-    virtual void clear_metadata();
-    virtual srs_error_t do_cycle();
-    virtual srs_error_t request_metadata();
-    // Set the metadata to invalid and refresh it.
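    // The expire/refresh handshake, as implemented above: refresh_metadata()
    // clears the partitions, flips metadata_ok to false and signals
    // metadata_expired; cycle() wakes from srs_cond_wait(metadata_expired)
    // and re-runs request_metadata() to rebuild the partition list, and
    // send() only flush()es once metadata_ok is true again.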
- virtual void refresh_metadata(); - virtual srs_error_t flush(); -}; - -#endif - -#endif - diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 9fc028ef64..850e4a1ccd 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -55,7 +55,6 @@ using namespace std; #include #include #include -#include // the timeout in srs_utime_t to wait encoder to republish // if timeout, close the connection. @@ -154,13 +153,6 @@ srs_error_t SrsRtmpConn::do_cycle() srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), srs_netfd_fileno(stfd)); - // notify kafka cluster. -#ifdef SRS_AUTO_KAFKA - if ((err = _srs_kafka->on_client(srs_id(), SrsListenerRtmpStream, ip)) != srs_success) { - return srs_error_wrap(err, "kafka on client"); - } -#endif - rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT); rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT); @@ -1194,12 +1186,6 @@ srs_error_t SrsRtmpConn::on_disconnect() http_hooks_on_close(); -#ifdef SRS_AUTO_KAFKA - if ((err = _srs_kafka->on_close(srs_id())) != srs_success) { - return srs_error_wrap(err, "kafka on close"); - } -#endif - // TODO: FIXME: Implements it. return err; diff --git a/trunk/src/app/srs_app_rtmp_conn.hpp b/trunk/src/app/srs_app_rtmp_conn.hpp index b84d6f4a83..100fb00257 100644 --- a/trunk/src/app/srs_app_rtmp_conn.hpp +++ b/trunk/src/app/srs_app_rtmp_conn.hpp @@ -54,9 +54,6 @@ class SrsSecurity; class ISrsWakable; class SrsCommonMessage; class SrsPacket; -#ifdef SRS_AUTO_KAFKA -class ISrsKafkaCluster; -#endif // The simple rtmp client for SRS. class SrsSimpleRtmpClient : public SrsBasicRtmpClient diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 4d2241bc93..1901da4db4 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -49,7 +49,6 @@ using namespace std; #include #include #include -#include #include #include @@ -523,10 +522,6 @@ void SrsServer::dispose() // @remark don't dispose ingesters, for too slow. -#ifdef SRS_AUTO_KAFKA - srs_dispose_kafka(); -#endif - // dispose the source for hls and dvr. SrsSource::dispose_all(); @@ -590,13 +585,6 @@ srs_error_t SrsServer::initialize_st() // set current log id. _srs_context->generate_id(); - // initialize the conponents that depends on st. -#ifdef SRS_AUTO_KAFKA - if ((err = srs_initialize_kafka()) != srs_success) { - return srs_error_wrap(err, "initialize kafka"); - } -#endif - // check asprocess. bool asprocess = _srs_config->get_asprocess(); if (asprocess && ppid == 1) { diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index b8366ea21b..aad92c653d 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -50,9 +50,6 @@ class ISrsUdpHandler; class SrsUdpListener; class SrsTcpListener; class SrsAppCasterFlv; -#ifdef SRS_AUTO_KAFKA -class SrsKafkaProducer; -#endif class SrsCoroutineManager; // The listener type for server to identify the connection, diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp index 5f7e22218a..9ea41f532d 100644 --- a/trunk/src/core/srs_core.hpp +++ b/trunk/src/core/srs_core.hpp @@ -27,7 +27,7 @@ // The version config. #define VERSION_MAJOR 3 #define VERSION_MINOR 0 -#define VERSION_REVISION 52 +#define VERSION_REVISION 53 // The macros generated by configure script. 
#include
diff --git a/trunk/src/kernel/srs_kernel_balance.hpp b/trunk/src/kernel/srs_kernel_balance.hpp
index d6def79b1e..5efc895149 100644
--- a/trunk/src/kernel/srs_kernel_balance.hpp
+++ b/trunk/src/kernel/srs_kernel_balance.hpp
@@ -31,7 +31,7 @@
 /**
  * the round-robin load balance algorithm,
- * used for edge pull, kafka and other multiple server feature.
+ * used for edge pull and other multiple-server features.
  */
 class SrsLbRoundRobin
 {
diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp
index 8f09912ff0..bb6d852921 100644
--- a/trunk/src/kernel/srs_kernel_consts.hpp
+++ b/trunk/src/kernel/srs_kernel_consts.hpp
@@ -390,13 +390,5 @@
 #define SRS_CONSTS_RTSP_RTSPVersionNotSupported_str "RTSP Version Not Supported"
 #define SRS_CONSTS_RTSP_OptionNotSupported_str "Option not support"

-///////////////////////////////////////////////////////////
-// KAFKA consts values
-///////////////////////////////////////////////////////////
-#define SRS_CONSTS_KAFKA_DEFAULT_PORT 9092
-
-// The common io timeout, for both recv and send.
-#define SRS_CONSTS_KAFKA_TIMEOUT (30 * SRS_UTIME_MILLISECONDS)
-
 #endif

diff --git a/trunk/src/kernel/srs_kernel_error.hpp b/trunk/src/kernel/srs_kernel_error.hpp
index 45a7751c7b..88672030fb 100644
--- a/trunk/src/kernel/srs_kernel_error.hpp
+++ b/trunk/src/kernel/srs_kernel_error.hpp
@@ -284,7 +284,7 @@
 #define ERROR_OCLUSTER_REDIRECT 3091

 ///////////////////////////////////////////////////////
-// HTTP/StreamCaster/KAFKA protocol error.
+// HTTP/StreamCaster protocol error.
 ///////////////////////////////////////////////////////
 #define ERROR_HTTP_PATTERN_EMPTY 4000
 #define ERROR_HTTP_PATTERN_DUPLICATED 4001
@@ -316,14 +316,6 @@
 #define ERROR_AVC_NALU_UEV 4027
 #define ERROR_AAC_BYTES_INVALID 4028
 #define ERROR_HTTP_REQUEST_EOF 4029
-#define ERROR_KAFKA_CODEC_STRING 4030
-#define ERROR_KAFKA_CODEC_BYTES 4031
-#define ERROR_KAFKA_CODEC_REQUEST 4032
-#define ERROR_KAFKA_CODEC_RESPONSE 4033
-#define ERROR_KAFKA_CODEC_ARRAY 4034
-#define ERROR_KAFKA_CODEC_METADATA 4035
-#define ERROR_KAFKA_CODEC_MESSAGE 4036
-#define ERROR_KAFKA_CODEC_PRODUCER 4037
 #define ERROR_HTTP_302_INVALID 4038
 #define ERROR_BASE64_DECODE 4039

diff --git a/trunk/src/main/srs_main_server.cpp b/trunk/src/main/srs_main_server.cpp
index 39d518fa06..22932c1604 100644
--- a/trunk/src/main/srs_main_server.cpp
+++ b/trunk/src/main/srs_main_server.cpp
@@ -228,7 +228,6 @@ void show_macro_features()
     ss << ", trans:" << srs_bool2switch(true);
     // inge(ingest)
     ss << ", inge:" << srs_bool2switch(true);
-    ss << ", kafka:" << srs_bool2switch(SRS_AUTO_KAFKA_BOOL);
     ss << ", stat:" << srs_bool2switch(true);
     ss << ", nginx:" << srs_bool2switch(SRS_AUTO_NGINX_BOOL);
     // ff(ffmpeg)
diff --git a/trunk/src/protocol/srs_kafka_stack.cpp b/trunk/src/protocol/srs_kafka_stack.cpp
deleted file mode 100644
index 8c73b5b70d..0000000000
--- a/trunk/src/protocol/srs_kafka_stack.cpp
+++ /dev/null
@@ -1,1415 +0,0 @@
-/**
- * The MIT License (MIT)
- *
- * Copyright (c) 2013-2019 Winlin
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy of
- * this software and associated documentation files (the "Software"), to deal in
- * the Software without restriction, including without limitation the rights to
- * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
- * the Software, and to permit persons to whom the Software is furnished to do so,
- * subject to the following conditions:
- *
- * The above copyright notice and this permission notice
shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS - * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR - * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER - * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - */ - -#include - -#include -using namespace std; - -#include -#include -#include -#include -#include -#include -#include -#include - -#ifdef SRS_AUTO_KAFKA - -#define SRS_KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS 300000 - -SrsKafkaString::SrsKafkaString() -{ - _size = -1; - data = NULL; -} - -SrsKafkaString::SrsKafkaString(string v) -{ - _size = -1; - data = NULL; - - set_value(v); -} - -SrsKafkaString::~SrsKafkaString() -{ - srs_freepa(data); -} - -bool SrsKafkaString::null() -{ - return _size == -1; -} - -bool SrsKafkaString::empty() -{ - return _size <= 0; -} - -string SrsKafkaString::to_str() -{ - string ret; - if (_size > 0) { - ret.append(data, _size); - } - return ret; -} - -void SrsKafkaString::set_value(string v) -{ - // free previous data. - srs_freepa(data); - - // copy new value to data. - _size = (int16_t)v.length(); - - srs_assert(_size > 0); - data = new char[_size]; - memcpy(data, v.data(), _size); -} - -int SrsKafkaString::nb_bytes() -{ - return _size == -1? 2 : 2 + _size; -} - -srs_error_t SrsKafkaString::encode(SrsBuffer* buf) -{ - srs_error_t err = srs_success; - - if (!buf->require(2)) { - return srs_error_new(ERROR_KAFKA_CODEC_STRING, "requires 2 only %d bytes", buf->left()); - } - buf->write_2bytes(_size); - - if (_size <= 0) { - return err; - } - - if (!buf->require(_size)) { - return srs_error_new(ERROR_KAFKA_CODEC_STRING, "requires %d only %d bytes", _size, buf->left()); - } - buf->write_bytes(data, _size); - - return err; -} - -srs_error_t SrsKafkaString::decode(SrsBuffer* buf) -{ - srs_error_t err = srs_success; - - if (!buf->require(2)) { - return srs_error_new(ERROR_KAFKA_CODEC_STRING, "requires 2 only %d bytes", buf->left()); - } - _size = buf->read_2bytes(); - - if (_size != -1 && _size < 0) { - return srs_error_new(ERROR_KAFKA_CODEC_STRING, "invalid size=%d", _size); - } - - if (_size <= 0) { - return err; - } - - if (!buf->require(_size)) { - return srs_error_new(ERROR_KAFKA_CODEC_STRING, "requires %d only %d bytes", _size, buf->left()); - } - - srs_freepa(data); - data = new char[_size]; - - buf->read_bytes(data, _size); - - return err; -} - -SrsKafkaBytes::SrsKafkaBytes() -{ - _size = -1; - _data = NULL; -} - -SrsKafkaBytes::SrsKafkaBytes(const char* v, int nb_v) -{ - _size = -1; - _data = NULL; - - set_value(v, nb_v); -} - -SrsKafkaBytes::~SrsKafkaBytes() -{ - srs_freepa(_data); -} - -char* SrsKafkaBytes::data() -{ - return _data; -} - -int SrsKafkaBytes::size() -{ - return _size; -} - -bool SrsKafkaBytes::null() -{ - return _size == -1; -} - -bool SrsKafkaBytes::empty() -{ - return _size <= 0; -} - -void SrsKafkaBytes::set_value(string v) -{ - set_value(v.data(), (int)v.length()); -} - -void SrsKafkaBytes::set_value(const char* v, int nb_v) -{ - // free previous data. - srs_freepa(_data); - - // copy new value to data. 
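    // Note on the cast just below: nb_v is narrowed to int16_t even though
    // SrsKafkaBytes writes a 4-byte size in nb_bytes()/encode(), so a payload
    // over 32767 bytes would be corrupted; in this codebase the producer only
    // ever feeds it small JSON events.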
- _size = (int16_t)nb_v; - - srs_assert(_size > 0); - _data = new char[_size]; - memcpy(_data, v, _size); -} - -uint32_t SrsKafkaBytes::crc32(uint32_t previous) -{ - char bsize[4]; - SrsBuffer(bsize, 4).write_4bytes(_size); - - if (_size <= 0) { - return srs_crc32_ieee(bsize, 4, previous); - } - - uint32_t crc = srs_crc32_ieee(bsize, 4, previous); - crc = srs_crc32_ieee(_data, _size, crc); - - return crc; -} - -int SrsKafkaBytes::nb_bytes() -{ - return 4 + (_size == -1? 0 : _size); -} - -srs_error_t SrsKafkaBytes::encode(SrsBuffer* buf) -{ - srs_error_t err = srs_success; - - if (!buf->require(4)) { - return srs_error_new(ERROR_KAFKA_CODEC_BYTES, "requires 4 only %d bytes", buf->left()); - } - buf->write_4bytes(_size); - - if (_size <= 0) { - return err; - } - - if (!buf->require(_size)) { - return srs_error_new(ERROR_KAFKA_CODEC_BYTES, "requires %d only %d bytes", _size, buf->left()); - } - buf->write_bytes(_data, _size); - - return err; -} - -srs_error_t SrsKafkaBytes::decode(SrsBuffer* buf) -{ - srs_error_t err = srs_success; - - if (!buf->require(4)) { - return srs_error_new(ERROR_KAFKA_CODEC_BYTES, "requires 4 only %d bytes", buf->left()); - } - _size = buf->read_4bytes(); - - if (_size != -1 && _size < 0) { - return srs_error_new(ERROR_KAFKA_CODEC_BYTES, "invalid size=%d", _size); - } - - if (!buf->require(_size)) { - return srs_error_new(ERROR_KAFKA_CODEC_BYTES, "requires %d only %d bytes", _size, buf->left()); - } - - srs_freepa(_data); - _data = new char[_size]; - buf->read_bytes(_data, _size); - - return err; -} - -SrsKafkaRequestHeader::SrsKafkaRequestHeader() -{ - _size = 0; - _api_key = api_version = 0; - _correlation_id = 0; - client_id = new SrsKafkaString(); -} - -SrsKafkaRequestHeader::~SrsKafkaRequestHeader() -{ - srs_freep(client_id); -} - -int SrsKafkaRequestHeader::header_size() -{ - return 2 + 2 + 4 + client_id->nb_bytes(); -} - -int SrsKafkaRequestHeader::message_size() -{ - return _size - header_size(); -} - -int SrsKafkaRequestHeader::total_size() -{ - return 4 + _size; -} - -void SrsKafkaRequestHeader::set_total_size(int s) -{ - _size = s - 4; -} - -int32_t SrsKafkaRequestHeader::correlation_id() -{ - return _correlation_id; -} - -void SrsKafkaRequestHeader::set_correlation_id(int32_t cid) -{ - _correlation_id = cid; -} - -SrsKafkaApiKey SrsKafkaRequestHeader::api_key() -{ - return (SrsKafkaApiKey)_api_key; -} - -void SrsKafkaRequestHeader::set_api_key(SrsKafkaApiKey key) -{ - _api_key = (int16_t)key; -} - -bool SrsKafkaRequestHeader::is_producer_request() -{ - return _api_key == SrsKafkaApiKeyProduceRequest; -} - -bool SrsKafkaRequestHeader::is_fetch_request() -{ - return _api_key == SrsKafkaApiKeyFetchRequest; -} - -bool SrsKafkaRequestHeader::is_offset_request() -{ - return _api_key == SrsKafkaApiKeyOffsetRequest; -} - -bool SrsKafkaRequestHeader::is_metadata_request() -{ - return _api_key == SrsKafkaApiKeyMetadataRequest; -} - -bool SrsKafkaRequestHeader::is_offset_commit_request() -{ - return _api_key == SrsKafkaApiKeyOffsetCommitRequest; -} - -bool SrsKafkaRequestHeader::is_offset_fetch_request() -{ - return _api_key == SrsKafkaApiKeyOffsetFetchRequest; -} - -bool SrsKafkaRequestHeader::is_consumer_metadata_request() -{ - return _api_key == SrsKafkaApiKeyConsumerMetadataRequest; -} - -int SrsKafkaRequestHeader::nb_bytes() -{ - return 4 + header_size(); -} - -srs_error_t SrsKafkaRequestHeader::encode(SrsBuffer* buf) -{ - srs_error_t err = srs_success; - - if (!buf->require(4 + _size)) { - return srs_error_new(ERROR_KAFKA_CODEC_REQUEST, "requires %d only %d 
bytes", 4 + _size, buf->left()); - } - - buf->write_4bytes(_size); - buf->write_2bytes(_api_key); - buf->write_2bytes(api_version); - buf->write_4bytes(_correlation_id); - - if ((err = client_id->encode(buf)) != srs_success) { - return srs_error_wrap(err, "encode client_id"); - } - - return err; -} - -srs_error_t SrsKafkaRequestHeader::decode(SrsBuffer* buf) -{ - srs_error_t err = srs_success; - - if (!buf->require(4)) { - return srs_error_new(ERROR_KAFKA_CODEC_REQUEST, "requires %d only %d bytes", 4, buf->left()); - } - _size = buf->read_4bytes(); - - if (_size <= 0) { - srs_warn("kafka got empty request"); - return err; - } - - if (!buf->require(_size)) { - return srs_error_new(ERROR_KAFKA_CODEC_REQUEST, "requires %d only %d bytes", _size, buf->left()); - } - _api_key = buf->read_2bytes(); - api_version = buf->read_2bytes(); - _correlation_id = buf->read_4bytes(); - - if ((err = client_id->decode(buf)) != srs_success) { - return srs_error_wrap(err, "decode client_id"); - } - - return err; -} - -SrsKafkaResponseHeader::SrsKafkaResponseHeader() -{ - _size = 0; - _correlation_id = 0; -} - -SrsKafkaResponseHeader::~SrsKafkaResponseHeader() -{ -} - -int SrsKafkaResponseHeader::header_size() -{ - return 4; -} - -int SrsKafkaResponseHeader::message_size() -{ - return _size - header_size(); -} - -int SrsKafkaResponseHeader::total_size() -{ - return 4 + _size; -} - -void SrsKafkaResponseHeader::set_total_size(int s) -{ - _size = s - 4; -} - -int32_t SrsKafkaResponseHeader::correlation_id() -{ - return _correlation_id; -} - -int SrsKafkaResponseHeader::nb_bytes() -{ - return 4 + header_size(); -} - -srs_error_t SrsKafkaResponseHeader::encode(SrsBuffer* buf) -{ - srs_error_t err = srs_success; - - if (!buf->require(4 + _size)) { - return srs_error_new(ERROR_KAFKA_CODEC_RESPONSE, "requires %d only %d bytes", 4 + _size, buf->left()); - } - - buf->write_4bytes(_size); - buf->write_4bytes(_correlation_id); - - return err; -} - -srs_error_t SrsKafkaResponseHeader::decode(SrsBuffer* buf) -{ - srs_error_t err = srs_success; - - if (!buf->require(4)) { - return srs_error_new(ERROR_KAFKA_CODEC_RESPONSE, "requires %d only %d bytes", 4, buf->left()); - } - _size = buf->read_4bytes(); - - if (_size <= 0) { - srs_warn("kafka got empty response"); - return err; - } - - if (!buf->require(_size)) { - return srs_error_new(ERROR_KAFKA_CODEC_RESPONSE, "requires %d only %d bytes", _size, buf->left()); - } - _correlation_id = buf->read_4bytes(); - - return err; -} - -SrsKafkaRawMessage::SrsKafkaRawMessage() -{ - offset = 0; - message_size = 0; - - crc = 0; - magic_byte = attributes = 0; - key = new SrsKafkaBytes(); - value = new SrsKafkaBytes(); -} - -SrsKafkaRawMessage::~SrsKafkaRawMessage() -{ - srs_freep(key); - srs_freep(value); -} - -srs_error_t SrsKafkaRawMessage::create(SrsJsonObject* obj) -{ - srs_error_t err = srs_success; - - // current must be 0. - magic_byte = 0; - - // no compression codec. - attributes = 0; - - // dumps the json to string. - value->set_value(obj->dumps()); - - // crc32 message. 
-    crc = srs_crc32_ieee(&magic_byte, 1);
-    crc = srs_crc32_ieee(&attributes, 1, crc);
-    crc = key->crc32(crc);
-    crc = value->crc32(crc);
-
-    srs_info("crc32 message is %#x", crc);
-
-    message_size = raw_message_size();
-
-    return err;
-}
-
-int SrsKafkaRawMessage::raw_message_size()
-{
-    return 4 + 1 + 1 + key->nb_bytes() + value->nb_bytes();
-}
-
-int SrsKafkaRawMessage::nb_bytes()
-{
-    return 8 + 4 + 4 + 1 + 1 + key->nb_bytes() + value->nb_bytes();
-}
-
-srs_error_t SrsKafkaRawMessage::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-
-    int nb_required = 8 + 4 + 4 + 1 + 1;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_MESSAGE, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    buf->write_8bytes(offset);
-    buf->write_4bytes(message_size);
-    buf->write_4bytes(crc);
-    buf->write_1bytes(magic_byte);
-    buf->write_1bytes(attributes);
-
-    if ((err = key->encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode key");
-    }
-
-    if ((err = value->encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode value");
-    }
-
-    return err;
-}
-
-srs_error_t SrsKafkaRawMessage::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-
-    int nb_required = 8 + 4 + 4 + 1 + 1;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_MESSAGE, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    offset = buf->read_8bytes();
-    message_size = buf->read_4bytes();
-    crc = buf->read_4bytes();
-    magic_byte = buf->read_1bytes();
-    attributes = buf->read_1bytes();
-
-    if ((err = key->decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "decode key");
-    }
-
-    if ((err = value->decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "decode value");
-    }
-
-    return err;
-}
-
-SrsKafkaRawMessageSet::SrsKafkaRawMessageSet()
-{
-}
-
-SrsKafkaRawMessageSet::~SrsKafkaRawMessageSet()
-{
-    vector<SrsKafkaRawMessage*>::iterator it;
-    for (it = messages.begin(); it != messages.end(); ++it) {
-        SrsKafkaRawMessage* message = *it;
-        srs_freep(message);
-    }
-    messages.clear();
-}
-
-void SrsKafkaRawMessageSet::append(SrsKafkaRawMessage* msg)
-{
-    messages.push_back(msg);
-}
-
-int SrsKafkaRawMessageSet::nb_bytes()
-{
-    int s = 0;
-
-    vector<SrsKafkaRawMessage*>::iterator it;
-    for (it = messages.begin(); it != messages.end(); ++it) {
-        SrsKafkaRawMessage* message = *it;
-        s += message->nb_bytes();
-    }
-
-    return s;
-}
-
-srs_error_t SrsKafkaRawMessageSet::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-
-    vector<SrsKafkaRawMessage*>::iterator it;
-    for (it = messages.begin(); it != messages.end(); ++it) {
-        SrsKafkaRawMessage* message = *it;
-        if ((err = message->encode(buf)) != srs_success) {
-            return srs_error_wrap(err, "encode message");
-        }
-    }
-
-    return err;
-}
-
-srs_error_t SrsKafkaRawMessageSet::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-
-    while (!buf->empty()) {
-        SrsKafkaRawMessage* message = new SrsKafkaRawMessage();
-
-        if ((err = message->decode(buf)) != srs_success) {
-            srs_freep(message);
-            return srs_error_wrap(err, "decode message");
-        }
-
-        messages.push_back(message);
-    }
-
-    return err;
-}
-
-SrsKafkaRequest::SrsKafkaRequest()
-{
-    header.set_correlation_id(SrsKafkaCorrelationPool::instance()->generate_correlation_id());
-}
-
-SrsKafkaRequest::~SrsKafkaRequest()
-{
-}
-
-void SrsKafkaRequest::update_header(int s)
-{
-    header.set_total_size(s);
-}
-
-int32_t SrsKafkaRequest::correlation_id()
-{
-    return header.correlation_id();
-}
-
-SrsKafkaApiKey SrsKafkaRequest::api_key()
-{
-    return header.api_key();
-}
-
-int
SrsKafkaRequest::nb_bytes() -{ - return header.nb_bytes(); -} - -srs_error_t SrsKafkaRequest::encode(SrsBuffer* buf) -{ - return header.encode(buf); -} - -srs_error_t SrsKafkaRequest::decode(SrsBuffer* buf) -{ - return header.decode(buf); -} - -SrsKafkaResponse::SrsKafkaResponse() -{ -} - -SrsKafkaResponse::~SrsKafkaResponse() -{ -} - -void SrsKafkaResponse::update_header(int s) -{ - header.set_total_size(s); -} - -int SrsKafkaResponse::nb_bytes() -{ - return header.nb_bytes(); -} - -srs_error_t SrsKafkaResponse::encode(SrsBuffer* buf) -{ - return header.encode(buf); -} - -srs_error_t SrsKafkaResponse::decode(SrsBuffer* buf) -{ - return header.decode(buf); -} - -SrsKafkaTopicMetadataRequest::SrsKafkaTopicMetadataRequest() -{ - header.set_api_key(SrsKafkaApiKeyMetadataRequest); -} - -SrsKafkaTopicMetadataRequest::~SrsKafkaTopicMetadataRequest() -{ -} - -void SrsKafkaTopicMetadataRequest::add_topic(string topic) -{ - topics.append(new SrsKafkaString(topic)); -} - -int SrsKafkaTopicMetadataRequest::nb_bytes() -{ - return SrsKafkaRequest::nb_bytes() + topics.nb_bytes(); -} - -srs_error_t SrsKafkaTopicMetadataRequest::encode(SrsBuffer* buf) -{ - srs_error_t err = srs_success; - - if ((err = SrsKafkaRequest::encode(buf)) != srs_success) { - return srs_error_wrap(err, "encode request"); - } - - if ((err = topics.encode(buf)) != srs_success) { - return srs_error_wrap(err, "encode topics"); - } - - return err; -} - -srs_error_t SrsKafkaTopicMetadataRequest::decode(SrsBuffer* buf) -{ - srs_error_t err = srs_success; - - if ((err = SrsKafkaRequest::decode(buf)) != srs_success) { - return srs_error_wrap(err, "decode request"); - } - - if ((err = topics.decode(buf)) != srs_success) { - return srs_error_wrap(err, "decode topics"); - } - - return err; -} - -SrsKafkaBroker::SrsKafkaBroker() -{ - node_id = port = 0; -} - -SrsKafkaBroker::~SrsKafkaBroker() -{ -} - -int SrsKafkaBroker::nb_bytes() -{ - return 4 + host.nb_bytes() + 4; -} - -srs_error_t SrsKafkaBroker::encode(SrsBuffer* buf) -{ - srs_error_t err = srs_success; - - if (!buf->require(4)) { - return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", 4, buf->left()); - } - buf->write_4bytes(node_id); - - if ((err = host.encode(buf)) != srs_success) { - return srs_error_wrap(err, "host"); - } - - if (!buf->require(4)) { - return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", 4, buf->left()); - } - buf->write_4bytes(port); - - return err; -} - -srs_error_t SrsKafkaBroker::decode(SrsBuffer* buf) -{ - srs_error_t err = srs_success; - - if (!buf->require(4)) { - return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", 4, buf->left()); - } - node_id = buf->read_4bytes(); - - if ((err = host.decode(buf)) != srs_success) { - return srs_error_wrap(err, "host"); - } - - if (!buf->require(4)) { - return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", 4, buf->left()); - } - port = buf->read_4bytes(); - - return err; -} - -SrsKafkaPartitionMetadata::SrsKafkaPartitionMetadata() -{ - error_code = 0; - partition_id = 0; - leader = 0; -} - -SrsKafkaPartitionMetadata::~SrsKafkaPartitionMetadata() -{ -} - -int SrsKafkaPartitionMetadata::nb_bytes() -{ - return 2 + 4 + 4 + replicas.nb_bytes() + isr.nb_bytes(); -} - -srs_error_t SrsKafkaPartitionMetadata::encode(SrsBuffer* buf) -{ - srs_error_t err = srs_success; - - int nb_required = 2 + 4 + 4; - if (!buf->require(nb_required)) { - return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", nb_required, 
-        return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    buf->write_2bytes(error_code);
-    buf->write_4bytes(partition_id);
-    buf->write_4bytes(leader);
-    
-    if ((err = replicas.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "replicas");
-    }
-    if ((err = isr.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "isr");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaPartitionMetadata::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    int nb_required = 2 + 4 + 4;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    error_code = buf->read_2bytes();
-    partition_id = buf->read_4bytes();
-    leader = buf->read_4bytes();
-    
-    if ((err = replicas.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "replicas");
-    }
-    if ((err = isr.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "isr");
-    }
-    
-    return err;
-}
-
-SrsKafkaTopicMetadata::SrsKafkaTopicMetadata()
-{
-    error_code = 0;
-}
-
-SrsKafkaTopicMetadata::~SrsKafkaTopicMetadata()
-{
-}
-
-int SrsKafkaTopicMetadata::nb_bytes()
-{
-    return 2 + name.nb_bytes() + metadatas.nb_bytes();
-}
-
-srs_error_t SrsKafkaTopicMetadata::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if (!buf->require(2)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", 2, buf->left());
-    }
-    buf->write_2bytes(error_code);
-    
-    if ((err = name.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "name");
-    }
-    
-    if ((err = metadatas.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "metadatas");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaTopicMetadata::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if (!buf->require(2)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_METADATA, "requires %d only %d bytes", 2, buf->left());
-    }
-    error_code = buf->read_2bytes();
-    
-    if ((err = name.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "name");
-    }
-    
-    if ((err = metadatas.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "metadatas");
-    }
-    
-    return err;
-}
-
-SrsKafkaTopicMetadataResponse::SrsKafkaTopicMetadataResponse()
-{
-}
-
-SrsKafkaTopicMetadataResponse::~SrsKafkaTopicMetadataResponse()
-{
-}
-
-int SrsKafkaTopicMetadataResponse::nb_bytes()
-{
-    return SrsKafkaResponse::nb_bytes() + brokers.nb_bytes() + metadatas.nb_bytes();
-}
-
-srs_error_t SrsKafkaTopicMetadataResponse::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if ((err = SrsKafkaResponse::encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode response");
-    }
-    
-    if ((err = brokers.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "brokers");
-    }
-    
-    if ((err = metadatas.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "metadatas");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaTopicMetadataResponse::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if ((err = SrsKafkaResponse::decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "decode response");
-    }
-    
-    if ((err = brokers.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "brokers");
-    }
-    
-    if ((err = metadatas.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "metadatas");
-    }
-    
-    return err;
-}
-
-int SrsKafkaProducerPartitionMessages::nb_bytes()
-{
-    return 4 + 4 + messages.nb_bytes();
-}
-
-srs_error_t SrsKafkaProducerPartitionMessages::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    int nb_required = 4 + 4;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_PRODUCER, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    buf->write_4bytes(partition);
-    buf->write_4bytes(message_set_size);
-    
-    if ((err = messages.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "messages");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaProducerPartitionMessages::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    int nb_required = 4 + 4;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_PRODUCER, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    partition = buf->read_4bytes();
-    message_set_size = buf->read_4bytes();
-    
-    // The message set decodes until its buffer is empty, so we must create a
-    // bounded buffer when other objects follow the message set.
-    if (buf->left() != message_set_size) {
-        SrsBuffer* tbuf = new SrsBuffer(buf->data() + buf->pos(), message_set_size);
-        SrsAutoFree(SrsBuffer, tbuf);
-        
-        if ((err = messages.decode(tbuf)) != srs_success) {
-            return srs_error_wrap(err, "messages");
-        }
-    } else {
-        if ((err = messages.decode(buf)) != srs_success) {
-            return srs_error_wrap(err, "messages");
-        }
-    }
-    
-    return err;
-}
-
-int SrsKafkaProducerTopicMessages::nb_bytes()
-{
-    return topic_name.nb_bytes() + partitions.nb_bytes();
-}
-
-srs_error_t SrsKafkaProducerTopicMessages::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if ((err = topic_name.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "topic_name");
-    }
-    
-    if ((err = partitions.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "partitions");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaProducerTopicMessages::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if ((err = topic_name.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "topic_name");
-    }
-    
-    if ((err = partitions.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "partitions");
-    }
-    
-    return err;
-}
-
-SrsKafkaProducerRequest::SrsKafkaProducerRequest()
-{
-    required_acks = 0;
-    timeout = 0;
-}
-
-SrsKafkaProducerRequest::~SrsKafkaProducerRequest()
-{
-}
-
-int SrsKafkaProducerRequest::nb_bytes()
-{
-    return SrsKafkaRequest::nb_bytes() + 2 + 4 + topics.nb_bytes();
-}
-
-srs_error_t SrsKafkaProducerRequest::encode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if ((err = SrsKafkaRequest::encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode request");
-    }
-    
-    int nb_required = 2 + 4;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_PRODUCER, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    buf->write_2bytes(required_acks);
-    buf->write_4bytes(timeout);
-    
-    if ((err = topics.encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode topics");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaProducerRequest::decode(SrsBuffer* buf)
-{
-    srs_error_t err = srs_success;
-    
-    if ((err = SrsKafkaRequest::decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "decode request");
-    }
-    
-    int nb_required = 2 + 4;
-    if (!buf->require(nb_required)) {
-        return srs_error_new(ERROR_KAFKA_CODEC_PRODUCER, "requires %d only %d bytes", nb_required, buf->left());
-    }
-    required_acks = buf->read_2bytes();
-    timeout = buf->read_4bytes();
-    
-    if ((err = topics.decode(buf)) != srs_success) {
-        return srs_error_wrap(err, "decode topics");
-    }
-    
-    return err;
-}
-
-SrsKafkaCorrelationPool* SrsKafkaCorrelationPool::_instance = new SrsKafkaCorrelationPool();
-
-SrsKafkaCorrelationPool* SrsKafkaCorrelationPool::instance()
-{
-    return _instance;
-}
-
-SrsKafkaCorrelationPool::SrsKafkaCorrelationPool()
-{
-}
-
-SrsKafkaCorrelationPool::~SrsKafkaCorrelationPool()
-{
-    correlation_ids.clear();
-}
-
-int32_t SrsKafkaCorrelationPool::generate_correlation_id()
-{
-    static int32_t cid = 1;
-    return cid++;
-}
-
-SrsKafkaApiKey SrsKafkaCorrelationPool::set(int32_t correlation_id, SrsKafkaApiKey request)
-{
-    SrsKafkaApiKey previous = SrsKafkaApiKeyUnknown;
-    
-    std::map<int32_t, SrsKafkaApiKey>::iterator it = correlation_ids.find(correlation_id);
-    if (it != correlation_ids.end()) {
-        previous = it->second;
-    }
-    
-    correlation_ids[correlation_id] = request;
-    
-    return previous;
-}
-
-SrsKafkaApiKey SrsKafkaCorrelationPool::unset(int32_t correlation_id)
-{
-    std::map<int32_t, SrsKafkaApiKey>::iterator it = correlation_ids.find(correlation_id);
-    
-    if (it != correlation_ids.end()) {
-        SrsKafkaApiKey key = it->second;
-        correlation_ids.erase(it);
-        return key;
-    }
-    
-    return SrsKafkaApiKeyUnknown;
-}
-
-SrsKafkaApiKey SrsKafkaCorrelationPool::get(int32_t correlation_id)
-{
-    if (correlation_ids.find(correlation_id) == correlation_ids.end()) {
-        return SrsKafkaApiKeyUnknown;
-    }
-    
-    return correlation_ids[correlation_id];
-}
-
-SrsKafkaProtocol::SrsKafkaProtocol(ISrsProtocolReadWriter* io)
-{
-    skt = io;
-    reader = new SrsFastStream();
-}
-
-SrsKafkaProtocol::~SrsKafkaProtocol()
-{
-    srs_freep(reader);
-}
-
-srs_error_t SrsKafkaProtocol::send_and_free_message(SrsKafkaRequest* msg)
-{
-    srs_error_t err = srs_success;
-    
-    // TODO: FIXME: refine for performance issue.
-    SrsAutoFree(SrsKafkaRequest, msg);
-    
-    int size = msg->nb_bytes();
-    if (size <= 0) {
-        return err;
-    }
-    
-    // update the header of message.
-    msg->update_header(size);
-    
-    // cache the request correlation id, to identify the response message.
-    SrsKafkaCorrelationPool* pool = SrsKafkaCorrelationPool::instance();
-    pool->set(msg->correlation_id(), msg->api_key());
-    
-    // TODO: FIXME: refine for performance issue.
-    char* bytes = new char[size];
-    SrsAutoFreeA(char, bytes);
-    
-    // TODO: FIXME: refine for performance issue.
-    SrsBuffer* buf = new SrsBuffer(bytes, size);
-    SrsAutoFree(SrsBuffer, buf);
-    
-    if ((err = msg->encode(buf)) != srs_success) {
-        return srs_error_wrap(err, "encode msg");
-    }
-    
-    if ((err = skt->write(bytes, size, NULL)) != srs_success) {
-        return srs_error_wrap(err, "write msg");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaProtocol::recv_message(SrsKafkaResponse** pmsg)
-{
-    *pmsg = NULL;
-    
-    srs_error_t err = srs_success;
-    
-    while (true) {
-        SrsKafkaResponseHeader header;
-        
-        // ensure enough bytes for response header.
-        if ((err = reader->grow(skt, header.nb_bytes())) != srs_success) {
-            return srs_error_wrap(err, "grow buffer");
-        }
-        
-        // decode response header.
-        SrsBuffer* buf = new SrsBuffer(reader->bytes(), reader->size());
-        SrsAutoFree(SrsBuffer, buf);
-        
-        if ((err = header.decode(buf)) != srs_success) {
-            return srs_error_wrap(err, "decode header");
-        }
-        
-        // rewind the buffer consumed by the header.
-        buf->skip(-1 * buf->pos());
-        
-        // fetch cached api key.
-        SrsKafkaCorrelationPool* pool = SrsKafkaCorrelationPool::instance();
-        SrsKafkaApiKey key = pool->unset(header.correlation_id());
-        srs_info("kafka got %d bytes response, correlation=%d", header.total_size(), header.correlation_id());
-        
-        // create message by cached api key.
-        SrsKafkaResponse* res = NULL;
-        switch (key) {
-            case SrsKafkaApiKeyMetadataRequest:
-                srs_info("kafka got metadata response");
-                res = new SrsKafkaTopicMetadataResponse();
-                break;
-            case SrsKafkaApiKeyUnknown:
-            default:
-                break;
-        }
-        
-        // ensure enough bytes to decode message.
-        if ((err = reader->grow(skt, header.total_size())) != srs_success) {
-            srs_freep(res);
-            return srs_error_wrap(err, "grow buffer");
-        }
-        
-        // dropped message, fetch next.
-        if (!res) {
-            reader->skip(header.total_size());
-            srs_warn("kafka ignore unknown message, size=%d.", header.total_size());
-            continue;
-        }
-        
-        // parse the whole message.
-        if ((err = res->decode(buf)) != srs_success) {
-            srs_freep(res);
-            return srs_error_wrap(err, "decode response");
-        }
-        
-        *pmsg = res;
-        break;
-    }
-    
-    return err;
-}
-
-SrsKafkaClient::SrsKafkaClient(ISrsProtocolReadWriter* io)
-{
-    protocol = new SrsKafkaProtocol(io);
-}
-
-SrsKafkaClient::~SrsKafkaClient()
-{
-    srs_freep(protocol);
-}
-
-srs_error_t SrsKafkaClient::fetch_metadata(string topic, SrsKafkaTopicMetadataResponse** pmsg)
-{
-    *pmsg = NULL;
-    
-    srs_error_t err = srs_success;
-    
-    SrsKafkaTopicMetadataRequest* req = new SrsKafkaTopicMetadataRequest();
-    
-    req->add_topic(topic);
-    
-    if ((err = protocol->send_and_free_message(req)) != srs_success) {
-        return srs_error_wrap(err, "send request");
-    }
-    
-    if ((err = protocol->expect_message(pmsg)) != srs_success) {
-        return srs_error_wrap(err, "expect message");
-    }
-    
-    return err;
-}
-
-srs_error_t SrsKafkaClient::write_messages(std::string topic, int32_t partition, vector<SrsJsonObject*>& msgs)
-{
-    srs_error_t err = srs_success;
-    
-    SrsKafkaProducerRequest* req = new SrsKafkaProducerRequest();
-    
-    // 0 means the server will not send any response.
-    req->required_acks = 0;
-    // timeout of producer message.
-    req->timeout = SRS_KAFKA_PRODUCER_MESSAGE_TIMEOUT_MS;
-    
-    // create the topic and partition to write messages to.
-    SrsKafkaProducerTopicMessages* topics = new SrsKafkaProducerTopicMessages();
-    SrsKafkaProducerPartitionMessages* partitions = new SrsKafkaProducerPartitionMessages();
-    
-    topics->partitions.append(partitions);
-    req->topics.append(topics);
-    
-    topics->topic_name.set_value(topic);
-    partitions->partition = partition;
-    
-    // convert json objects to kafka raw messages.
-    vector<SrsJsonObject*>::iterator it;
-    for (it = msgs.begin(); it != msgs.end(); ++it) {
-        SrsJsonObject* obj = *it;
-        SrsKafkaRawMessage* msg = new SrsKafkaRawMessage();
-        
-        if ((err = msg->create(obj)) != srs_success) {
-            srs_freep(msg);
-            srs_freep(req);
-            return srs_error_wrap(err, "create message");
-        }
-        
-        partitions->messages.append(msg);
-    }
-    
-    partitions->message_set_size = partitions->messages.nb_bytes();
-    
-    // write to kafka cluster.
-    if ((err = protocol->send_and_free_message(req)) != srs_success) {
-        return srs_error_wrap(err, "send request");
-    }
-    
-    return err;
-}
-
-vector<string> srs_kafka_array2vector(SrsKafkaArray<SrsKafkaString>* arr)
-{
-    vector<string> strs;
-    
-    for (int i = 0; i < arr->size(); i++) {
-        SrsKafkaString* elem = arr->at(i);
-        strs.push_back(elem->to_str());
-    }
-    
-    return strs;
-}
-
-vector<string> srs_kafka_array2vector(SrsKafkaArray<int32_t>* arr)
-{
-    vector<string> strs;
-    
-    for (int i = 0; i < arr->size(); i++) {
-        int32_t elem = arr->at(i);
-        strs.push_back(srs_int2str(elem));
-    }
-    
-    return strs;
-}
-
-#endif
-
diff --git a/trunk/src/protocol/srs_kafka_stack.hpp b/trunk/src/protocol/srs_kafka_stack.hpp
deleted file mode 100644
index 8665d74fa6..0000000000
--- a/trunk/src/protocol/srs_kafka_stack.hpp
+++ /dev/null
@@ -1,932 +0,0 @@
-/**
- * The MIT License (MIT)
- *
- * Copyright (c) 2013-2019 Winlin
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy of
- * this software and associated documentation files (the "Software"), to deal in
- * the Software without restriction, including without limitation the rights to
- * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
- * the Software, and to permit persons to whom the Software is furnished to do so,
- * subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
- * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
- * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
- * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
- * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- */
-
-#ifndef SRS_PROTOCOL_KAFKA_HPP
-#define SRS_PROTOCOL_KAFKA_HPP
-
-#include <srs_core.hpp>
-
-#include <vector>
-#include <string>
-#include <map>
-
-#include <srs_kernel_error.hpp>
-#include <srs_kernel_log.hpp>
-#include <srs_kernel_buffer.hpp>
-
-class SrsFastStream;
-class ISrsProtocolReadWriter;
-class SrsJsonObject;
-
-#ifdef SRS_AUTO_KAFKA
-
-/**
- * the api key used to identify the request type.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ApiKeys
- */
-enum SrsKafkaApiKey
-{
-    SrsKafkaApiKeyUnknown = -1,
-    
-    SrsKafkaApiKeyProduceRequest = 0,
-    SrsKafkaApiKeyFetchRequest = 1,
-    SrsKafkaApiKeyOffsetRequest = 2,
-    SrsKafkaApiKeyMetadataRequest = 3,
-    /* Non-user facing control APIs 4-7 */
-    SrsKafkaApiKeyOffsetCommitRequest = 8,
-    SrsKafkaApiKeyOffsetFetchRequest = 9,
-    SrsKafkaApiKeyConsumerMetadataRequest = 10,
-};
-
-/**
- * These types consist of a signed integer giving a length N followed by N bytes of content.
- * A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProtocolPrimitiveTypes
- */
-class SrsKafkaString : public ISrsCodec
-{
-private:
-    int16_t _size;
-    char* data;
-public:
-    SrsKafkaString();
-    SrsKafkaString(std::string v);
-    virtual ~SrsKafkaString();
-public:
-    virtual bool null();
-    virtual bool empty();
-    virtual std::string to_str();
-    virtual void set_value(std::string v);
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * These types consist of a signed integer giving a length N followed by N bytes of content.
- * A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProtocolPrimitiveTypes
- */
-class SrsKafkaBytes : public ISrsCodec
-{
-private:
-    int32_t _size;
-    char* _data;
-public:
-    SrsKafkaBytes();
-    SrsKafkaBytes(const char* v, int nb_v);
-    virtual ~SrsKafkaBytes();
-public:
-    virtual char* data();
-    virtual int size();
-    virtual bool null();
-    virtual bool empty();
-    virtual void set_value(std::string v);
-    virtual void set_value(const char* v, int nb_v);
-    virtual uint32_t crc32(uint32_t previous);
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * This is a notation for handling repeated structures. These will always be encoded as an
- * int32 size containing the length N followed by N repetitions of the structure which can
- * itself be made up of other primitive types. In the BNF grammars below we will show an
- * array of a structure foo as [foo].
- *
- * Usage:
- *      SrsKafkaArray<SrsKafkaBytes> body;
- *      body.append(new SrsKafkaBytes());
- * @remark array elem is the T*, which must be an ISrsCodec*
- *
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
- */
-template<typename T>
-class SrsKafkaArray : public ISrsCodec
-{
-private:
-    int32_t length;
-    std::vector<T*> elems;
-    typedef typename std::vector<T*>::iterator SrsIterator;
-public:
-    SrsKafkaArray()
-    {
-        length = 0;
-    }
-    virtual ~SrsKafkaArray()
-    {
-        for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
-            T* elem = *it;
-            srs_freep(elem);
-        }
-        elems.clear();
-    }
-public:
-    virtual void append(T* elem)
-    {
-        length++;
-        elems.push_back(elem);
-    }
-    virtual int size()
-    {
-        return length;
-    }
-    virtual bool empty()
-    {
-        return elems.empty();
-    }
-    virtual T* at(int index)
-    {
-        return elems.at(index);
-    }
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes()
-    {
-        int s = 4;
-        
-        for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
-            T* elem = *it;
-            s += elem->nb_bytes();
-        }
-        
-        return s;
-    }
-    virtual srs_error_t encode(SrsBuffer* buf)
-    {
-        srs_error_t err = srs_success;
-        
-        if (!buf->require(4)) {
-            return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires 4 only %d bytes", buf->left());
-        }
-        buf->write_4bytes(length);
-        
-        for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
-            T* elem = *it;
-            if ((err = elem->encode(buf)) != srs_success) {
-                return srs_error_wrap(err, "encode elem");
-            }
-        }
-        
-        return err;
-    }
-    virtual srs_error_t decode(SrsBuffer* buf)
-    {
-        srs_error_t err = srs_success;
-        
-        if (!buf->require(4)) {
-            return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires 4 only %d bytes", buf->left());
-        }
-        length = buf->read_4bytes();
-        
-        for (int i = 0; i < length; i++) {
-            T* elem = new T();
-            if ((err = elem->decode(buf)) != srs_success) {
-                srs_freep(elem);
-                return srs_error_wrap(err, "decode elem");
-            }
-            
-            elems.push_back(elem);
-        }
-        
-        return err;
-    }
-};
-template<>
-class SrsKafkaArray<int32_t> : public ISrsCodec
-{
-private:
-    int32_t length;
-    std::vector<int32_t> elems;
-    typedef std::vector<int32_t>::iterator SrsIterator;
-public:
-    SrsKafkaArray()
-    {
-        length = 0;
-    }
-    virtual ~SrsKafkaArray()
-    {
-        elems.clear();
-    }
-public:
-    virtual void append(int32_t elem)
-    {
-        length++;
-        elems.push_back(elem);
-    }
-    virtual int size()
-    {
-        return length;
-    }
-    virtual bool empty()
-    {
-        return elems.empty();
-    }
-    virtual int32_t at(int index)
-    {
-        return elems.at(index);
-    }
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes()
-    {
-        return 4 + 4 * (int)elems.size();
-    }
-    virtual srs_error_t encode(SrsBuffer* buf)
-    {
-        srs_error_t err = srs_success;
-        
-        int nb_required = 4 + (int)sizeof(int32_t) * (int)elems.size();
-        if (!buf->require(nb_required)) {
-            return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires %d only %d bytes", nb_required, buf->left());
-        }
-        buf->write_4bytes(length);
-        
-        for (SrsIterator it = elems.begin(); it != elems.end(); ++it) {
-            int32_t elem = *it;
-            buf->write_4bytes(elem);
-        }
-        
-        return err;
-    }
-    virtual srs_error_t decode(SrsBuffer* buf)
-    {
-        srs_error_t err = srs_success;
-        
-        if (!buf->require(4)) {
-            return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires 4 only %d bytes", buf->left());
-        }
-        length = buf->read_4bytes();
-        
-        for (int i = 0; i < length; i++) {
-            if (!buf->require((int)sizeof(int32_t))) {
-                return srs_error_new(ERROR_KAFKA_CODEC_ARRAY, "requires %d only %d bytes", (int)sizeof(int32_t), buf->left());
-            }
-            
-            int32_t elem = buf->read_4bytes();
-            elems.push_back(elem);
-        }
-        
-        return err;
-    }
-};
-
-/**
- * the header of request, which includes the size of the request.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
- */
-class SrsKafkaRequestHeader : public ISrsCodec
-{
-private:
-    /**
-     * The MessageSize field gives the size of the subsequent request or response
-     * message in bytes. The client can read requests by first reading this 4 byte
-     * size as an integer N, and then reading and parsing the subsequent N bytes
-     * of the request.
-     */
-    int32_t _size;
-private:
-    /**
-     * This is a numeric id for the API being invoked (i.e. is it
-     * a metadata request, a produce request, a fetch request, etc).
-     * @remark MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
-     */
-    int16_t _api_key;
-    /**
-     * This is a numeric version number for this api. We version each API and
-     * this version number allows the server to properly interpret the request
-     * as the protocol evolves. Responses will always be in the format corresponding
-     * to the request version. Currently the supported version for all APIs is 0.
-     */
-    int16_t api_version;
-    /**
-     * This is a user-supplied integer. It will be passed back in
-     * the response by the server, unmodified. It is useful for matching
-     * request and response between the client and server.
-     */
-    int32_t _correlation_id;
-    /**
-     * This is a user-supplied identifier for the client application.
-     * The user can use any identifier they like and it will be used
-     * when logging errors, monitoring aggregates, etc. For example,
-     * one might want to monitor not just the requests per second overall,
-     * but the number coming from each client application (each of
-     * which could reside on multiple servers). This id acts as a
-     * logical grouping across all requests from a particular client.
-     */
-    SrsKafkaString* client_id;
-public:
-    SrsKafkaRequestHeader();
-    virtual ~SrsKafkaRequestHeader();
-private:
-    /**
-     * the layout of request:
-     *      +-----------+----------------------------------+
-     *      | 4B _size  |          [_size] bytes           |
-     *      +-----------+------------+---------------------+
-     *      | 4B _size  |   header   |       message       |
-     *      +-----------+------------+---------------------+
-     *      |       total size = 4 + header + message      |
-     *      +----------------------------------------------+
-     * where the header specifies this request header, without the leading 4B size.
-     * @remark size = 4 + header + message.
-     */
-    virtual int header_size();
-    /**
-     * the size of message, the bytes left after the header.
-     */
-    virtual int message_size();
-    /**
-     * the total size of the request, including the 4B size.
-     */
-    virtual int total_size();
-public:
-    /**
-     * when we know the whole message size, update the header.
-     * @param s the whole message size, including the 4 bytes of the size field itself.
-     */
-    virtual void set_total_size(int s);
-    /**
-     * get the correlation id of the message.
-     */
-    virtual int32_t correlation_id();
-    /**
-     * set the correlation id of the message.
-     */
-    virtual void set_correlation_id(int32_t cid);
-    /**
-     * get the api key of the message header.
-     */
-    virtual SrsKafkaApiKey api_key();
-    /**
-     * set the api key of the message header.
-     */
-    virtual void set_api_key(SrsKafkaApiKey key);
-public:
-    /**
-     * the api key enumeration.
-     * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ApiKeys
-     */
-    virtual bool is_producer_request();
-    virtual bool is_fetch_request();
-    virtual bool is_offset_request();
-    virtual bool is_metadata_request();
-    virtual bool is_offset_commit_request();
-    virtual bool is_offset_fetch_request();
-    virtual bool is_consumer_metadata_request();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the header of response, which includes the size of the response.
- * The response will always match the paired request (e.g. we will
- * send a MetadataResponse in return to a MetadataRequest).
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Responses
- */
-class SrsKafkaResponseHeader : public ISrsCodec
-{
-private:
-    /**
-     * The MessageSize field gives the size of the subsequent request or response
-     * message in bytes. The client can read requests by first reading this 4 byte
-     * size as an integer N, and then reading and parsing the subsequent N bytes
-     * of the request.
-     */
-    int32_t _size;
-private:
-    /**
-     * This is a user-supplied integer. It will be passed back in
-     * the response by the server, unmodified. It is useful for matching
-     * request and response between the client and server.
-     */
-    int32_t _correlation_id;
-public:
-    SrsKafkaResponseHeader();
-    virtual ~SrsKafkaResponseHeader();
-private:
-    /**
-     * the layout of response:
-     *      +-----------+----------------------------------+
-     *      | 4B _size  |          [_size] bytes           |
-     *      +-----------+------------+---------------------+
-     *      | 4B _size  |  4B header |       message       |
-     *      +-----------+------------+---------------------+
-     *      |         total size = 4 + 4 + message         |
-     *      +----------------------------------------------+
-     * where the header specifies this response header, without the leading 4B size.
-     * @remark size = 4 + 4 + message.
-     */
-    virtual int header_size();
-    /**
-     * the size of message, the bytes left after the header.
-     */
-    virtual int message_size();
-public:
-    /**
-     * the total size of the response, including the 4B size and message body.
-     */
-    virtual int total_size();
-public:
-    /**
-     * when we know the whole message size, update the header.
-     * @param s the whole message size, including the 4 bytes of the size field itself.
-     */
-    virtual void set_total_size(int s);
-    /**
-     * get the correlation id of the response message.
-     */
-    virtual int32_t correlation_id();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the kafka message in message set.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
- */
-struct SrsKafkaRawMessage : public ISrsCodec
-{
-    // metadata.
-public:
-    /**
-     * This is the offset used in kafka as the log sequence number. When the
-     * producer is sending messages it doesn't actually know the offset and
-     * can fill in any value here it likes.
-     */
-    int64_t offset;
-    /**
-     * the size of this message.
-     */
-    int32_t message_size;
-    // message.
-public:
-    /**
-     * The CRC is the CRC32 of the remainder of the message bytes.
-     * This is used to check the integrity of the message on the broker and consumer.
-     */
-    int32_t crc;
-    /**
-     * This is a version id used to allow backwards compatible evolution
-     * of the message binary format.
-     * The current value is 0.
-     */
-    int8_t magic_byte;
-    /**
-     * This byte holds metadata attributes about the message.
-     * The lowest 2 bits contain the compression codec used
-     * for the message. The other bits should be set to 0.
-     */
-    int8_t attributes;
-    /**
-     * The key is an optional message key that was used for
-     * partition assignment. The key can be null.
-     */
-    SrsKafkaBytes* key;
-    /**
-     * The value is the actual message contents as an opaque byte array.
-     * Kafka supports recursive messages in which case this may itself
-     * contain a message set. The message can be null.
-     */
-    SrsKafkaBytes* value;
-public:
-    SrsKafkaRawMessage();
-    virtual ~SrsKafkaRawMessage();
-public:
-    /**
-     * create the message from a json object.
-     */
-    virtual srs_error_t create(SrsJsonObject* obj);
-private:
-    /**
-     * get the size of the raw message, the bytes after the message_size field.
-     */
-    virtual int raw_message_size();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * a set of kafka messages.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
- * @remark because the message set is not preceded by an int32 count, we decode the buffer until it is empty.
- */
-class SrsKafkaRawMessageSet : public ISrsCodec
-{
-private:
-    std::vector<SrsKafkaRawMessage*> messages;
-public:
-    SrsKafkaRawMessageSet();
-    virtual ~SrsKafkaRawMessageSet();
-public:
-    virtual void append(SrsKafkaRawMessage* msg);
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the kafka request message, for the protocol to send.
- */
-class SrsKafkaRequest : public ISrsCodec
-{
-protected:
-    SrsKafkaRequestHeader header;
-public:
-    SrsKafkaRequest();
-    virtual ~SrsKafkaRequest();
-public:
-    /**
-     * update the size in header.
-     * @param s an int value specifies the size of message in header.
-     */
-    virtual void update_header(int s);
-    /**
-     * get the correlation id of header for message.
-     */
-    virtual int32_t correlation_id();
-    /**
-     * get the api key of request.
-     */
-    virtual SrsKafkaApiKey api_key();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the kafka response message, for the protocol to recv.
- */
-class SrsKafkaResponse : public ISrsCodec
-{
-protected:
-    SrsKafkaResponseHeader header;
-public:
-    SrsKafkaResponse();
-    virtual ~SrsKafkaResponse();
-public:
-    /**
-     * update the size in header.
-     * @param s an int value specifies the size of message in header.
-     */
-    virtual void update_header(int s);
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * request the metadata from broker.
- * This API answers the following questions:
- *      What topics exist?
- *      How many partitions does each topic have?
- *      Which broker is currently the leader for each partition?
- *      What is the host and port for each of these brokers?
- * This is the only request that can be addressed to any broker in the cluster.
- *
- * Since there may be many topics the client can give an optional list of topic
- * names in order to only return metadata for a subset of topics.
- *
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
- */
-class SrsKafkaTopicMetadataRequest : public SrsKafkaRequest
-{
-private:
-    SrsKafkaArray<SrsKafkaString> topics;
-public:
-    SrsKafkaTopicMetadataRequest();
-    virtual ~SrsKafkaTopicMetadataRequest();
-public:
-    virtual void add_topic(std::string topic);
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the metadata response data.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse
- */
-struct SrsKafkaBroker : public ISrsCodec
-{
-public:
-    int32_t node_id;
-    SrsKafkaString host;
-    int32_t port;
-public:
-    SrsKafkaBroker();
-    virtual ~SrsKafkaBroker();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-struct SrsKafkaPartitionMetadata : public ISrsCodec
-{
-public:
-    int16_t error_code;
-    int32_t partition_id;
-    int32_t leader;
-    SrsKafkaArray<int32_t> replicas;
-    SrsKafkaArray<int32_t> isr;
-public:
-    SrsKafkaPartitionMetadata();
-    virtual ~SrsKafkaPartitionMetadata();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-struct SrsKafkaTopicMetadata : public ISrsCodec
-{
-public:
-    int16_t error_code;
-    SrsKafkaString name;
-    SrsKafkaArray<SrsKafkaPartitionMetadata> metadatas;
-public:
-    SrsKafkaTopicMetadata();
-    virtual ~SrsKafkaTopicMetadata();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * response for the metadata request from broker.
- * The response contains metadata for each partition,
- * with partitions grouped together by topic. This
- * metadata refers to brokers by their broker id.
- * The brokers each have a host and port.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataResponse
- */
-class SrsKafkaTopicMetadataResponse : public SrsKafkaResponse
-{
-public:
-    SrsKafkaArray<SrsKafkaBroker> brokers;
-    SrsKafkaArray<SrsKafkaTopicMetadata> metadatas;
-public:
-    SrsKafkaTopicMetadataResponse();
-    virtual ~SrsKafkaTopicMetadataResponse();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the messages for producer to send.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest
- */
-struct SrsKafkaProducerPartitionMessages : public ISrsCodec
-{
-public:
-    /**
-     * The partition that data is being published to.
-     */
-    int32_t partition;
-    /**
-     * The size, in bytes, of the message set that follows.
-     */
-    int32_t message_set_size;
-    /**
-     * messages in set.
-     */
-    SrsKafkaRawMessageSet messages;
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-struct SrsKafkaProducerTopicMessages : public ISrsCodec
-{
-public:
-    /**
-     * The topic that data is being published to.
-     */
-    SrsKafkaString topic_name;
-    /**
-     * messages of partitions.
-     */
-    SrsKafkaArray<SrsKafkaProducerPartitionMessages> partitions;
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the request for producer to send message.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest
- */
-class SrsKafkaProducerRequest : public SrsKafkaRequest
-{
-public:
-    /**
-     * This field indicates how many acknowledgements the servers should receive
-     * before responding to the request. If it is 0 the server will not send any
-     * response (this is the only case where the server will not reply to a request).
-     * If it is 1, the server will wait until the data is written to the local log
-     * before sending a response. If it is -1 the server will block until the
-     * message is committed by all in sync replicas before sending a response.
-     * For any number > 1 the server will block waiting for this number of
-     * acknowledgements to occur (but the server will never wait for more
-     * acknowledgements than there are in-sync replicas).
-     */
-    int16_t required_acks;
-    /**
-     * This provides a maximum time in milliseconds the server can await the receipt
-     * of the number of acknowledgements in RequiredAcks. The timeout is not an exact
-     * limit on the request time for a few reasons: (1) it does not include network
-     * latency, (2) the timer begins at the beginning of the processing of this request
-     * so if many requests are queued due to server overload that wait time will not
-     * be included, (3) we will not terminate a local write so if the local write
-     * time exceeds this timeout it will not be respected. To get a hard timeout of
-     * this type the client should use the socket timeout.
-     */
-    int32_t timeout;
-    /**
-     * messages of topics.
-     */
-    SrsKafkaArray<SrsKafkaProducerTopicMessages> topics;
-public:
-    SrsKafkaProducerRequest();
-    virtual ~SrsKafkaProducerRequest();
-// Interface ISrsCodec
-public:
-    virtual int nb_bytes();
-    virtual srs_error_t encode(SrsBuffer* buf);
-    virtual srs_error_t decode(SrsBuffer* buf);
-};
-
-/**
- * the pool of correlation ids, to discover the response message.
- * @param CorrelationId This is a user-supplied integer. It will be passed back
- *          in the response by the server, unmodified. It is useful for matching
- *          request and response between the client and server.
- * @see https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Requests
- */
-class SrsKafkaCorrelationPool
-{
-private:
-    static SrsKafkaCorrelationPool* _instance;
-public:
-    static SrsKafkaCorrelationPool* instance();
-private:
-    std::map<int32_t, SrsKafkaApiKey> correlation_ids;
-private:
-    SrsKafkaCorrelationPool();
-public:
-    virtual ~SrsKafkaCorrelationPool();
-public:
-    /**
-     * generate a global correlation id.
-     */
-    virtual int32_t generate_correlation_id();
-    /**
-     * set the correlation id to specified request key.
-     */
-    virtual SrsKafkaApiKey set(int32_t correlation_id, SrsKafkaApiKey request);
-    /**
-     * unset the correlation id.
-     * @return the previous api key; unknown if not set.
-     */
-    virtual SrsKafkaApiKey unset(int32_t correlation_id);
-    /**
-     * get the key by specified correlation id.
-     * @return the specified api key; unknown if no correlation id.
-     */
-    virtual SrsKafkaApiKey get(int32_t correlation_id);
-};
-
-/**
- * the kafka protocol stack, used to send and recv kafka messages.
- */
-class SrsKafkaProtocol
-{
-private:
-    ISrsProtocolReadWriter* skt;
-    SrsFastStream* reader;
-public:
-    SrsKafkaProtocol(ISrsProtocolReadWriter* io);
-    virtual ~SrsKafkaProtocol();
-public:
-    /**
-     * write the message to kafka server.
-     * @param msg the msg to send. user must not free it again.
-     */
-    virtual srs_error_t send_and_free_message(SrsKafkaRequest* msg);
-    /**
-     * read the message from kafka server.
-     * @param pmsg output the received message. user must free it.
-     */
-    virtual srs_error_t recv_message(SrsKafkaResponse** pmsg);
-public:
-    /**
-     * expect a message of the specified type.
-     */
-    template<typename T>
-    srs_error_t expect_message(T** pmsg)
-    {
-        srs_error_t err = srs_success;
-        
-        while (true) {
-            SrsKafkaResponse* res = NULL;
-            if ((err = recv_message(&res)) != srs_success) {
-                return srs_error_wrap(err, "recv message");
-            }
-            
-            // drop the message if not matched.
-            T* msg = dynamic_cast<T*>(res);
-            if (!msg) {
-                srs_info("kafka drop response.");
-                srs_freep(res);
-                continue;
-            }
-            
-            *pmsg = msg;
-            break;
-        }
-        
-        return err;
-    }
-};
-
-/**
- * the kafka client, for producer or consumer.
- */
-class SrsKafkaClient
-{
-private:
-    SrsKafkaProtocol* protocol;
-public:
-    SrsKafkaClient(ISrsProtocolReadWriter* io);
-    virtual ~SrsKafkaClient();
-public:
-    /**
-     * fetch the metadata from broker for the topic.
-     */
-    virtual srs_error_t fetch_metadata(std::string topic, SrsKafkaTopicMetadataResponse** pmsg);
-    /**
-     * write the messages to a partition of the topic.
-     */
-    virtual srs_error_t write_messages(std::string topic, int32_t partition, std::vector<SrsJsonObject*>& msgs);
-};
-
-// convert kafka array[string] to vector[string]
-extern std::vector<std::string> srs_kafka_array2vector(SrsKafkaArray<SrsKafkaString>* arr);
-extern std::vector<std::string> srs_kafka_array2vector(SrsKafkaArray<int32_t>* arr);
-
-#endif
-
-#endif
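
Note (editorial, not part of the patch): for readers tracking what this removal takes away, the deleted SrsKafkaClient was driven roughly as sketched below. This is a minimal reconstruction under stated assumptions, not code from the SRS tree: it assumes `io` is an already-connected ISrsProtocolReadWriter and `obj` a caller-built SrsJsonObject; the topic name "srs" and the helper name `report_to_kafka` are illustrative only.

    // Sketch of the removed SrsKafkaClient call flow (runs in an ST coroutine).
    srs_error_t report_to_kafka(ISrsProtocolReadWriter* io, SrsJsonObject* obj)
    {
        srs_error_t err = srs_success;
        
        SrsKafkaClient* kafka = new SrsKafkaClient(io);
        SrsAutoFree(SrsKafkaClient, kafka);
        
        // Ask any broker for the topic metadata; the correlation id cached by
        // send_and_free_message() is what matches this response to the request.
        SrsKafkaTopicMetadataResponse* metadata = NULL;
        if ((err = kafka->fetch_metadata("srs", &metadata)) != srs_success) {
            return srs_error_wrap(err, "fetch metadata");
        }
        SrsAutoFree(SrsKafkaTopicMetadataResponse, metadata);
        
        // Produce one JSON message to partition 0. Because the producer used
        // required_acks=0, the broker sends no response and the call returns
        // as soon as the request is written.
        std::vector<SrsJsonObject*> msgs;
        msgs.push_back(obj);
        if ((err = kafka->write_messages("srs", 0, msgs)) != srs_success) {
            return srs_error_wrap(err, "write messages");
        }
        
        return err;
    }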