#include "../utils/TestUtility.h" #include "kafka/AdminClient.h" #include "kafka/KafkaConsumer.h" #include "kafka/KafkaProducer.h" #include "gtest/gtest.h" #include TEST(KafkaProducer, SendMessagesWithAcks1) { // Prepare messages to test const std::vector> messages = { {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, }; const kafka::Topic topic = kafka::utility::getRandomString(); const kafka::Partition partition = 0; KafkaTestUtility::CreateKafkaTopic(topic, 5, 3); // Properties for the producer const auto props = KafkaTestUtility::GetKafkaClientCommonConfig().put(kafka::clients::producer::Config::ACKS, "1"); // Sync-send producer kafka::clients::KafkaProducer producer(props); // Send messages for (const auto& msg: messages) { auto record = kafka::clients::producer::ProducerRecord(topic, partition, kafka::Key(msg.first.c_str(), msg.first.size()), kafka::Value(msg.second.c_str(), msg.second.size())); std::cout << "[" <(records[i].key().data()), records[i].key().size())); EXPECT_EQ(messages[i].second, std::string(static_cast(records[i].value().data()), records[i].value().size())); } } TEST(KafkaProducer, SendMessagesWithAcksAll) { // Prepare messages to test const std::vector> messages = { {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, }; const kafka::Topic topic = kafka::utility::getRandomString(); const kafka::Partition partition = 0; KafkaTestUtility::CreateKafkaTopic(topic, 5, 3); // Properties for the producer const auto props = KafkaTestUtility::GetKafkaClientCommonConfig().put(kafka::clients::producer::Config::ACKS, "all"); // Async-send producer kafka::clients::KafkaProducer producer(props); // Send messages for (const auto& msg: messages) { auto record = kafka::clients::producer::ProducerRecord(topic, partition, kafka::Key(msg.first.c_str(), msg.first.size()), kafka::Value(msg.second.c_str(), msg.second.size())); std::cout << "[" <(records[i].key().data()), records[i].key().size())); EXPECT_EQ(messages[i].second, std::string(static_cast(records[i].value().data()), records[i].value().size())); } } TEST(KafkaProducer, FailToSendMessagesWithAcksAll) { // Prepare messages to test const std::vector> messages = { {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, }; // Create a topic (replication factor is 1) with AdminClient const kafka::Topic topic = kafka::utility::getRandomString(); const int numPartitions = 5; const int replicationFactor = 1; KafkaTestUtility::CreateKafkaTopic(topic, numPartitions, replicationFactor); // Properties for the producer const auto props = KafkaTestUtility::GetKafkaClientCommonConfig() .put(kafka::clients::producer::Config::ACKS, "all") .put(kafka::clients::producer::Config::MESSAGE_TIMEOUT_MS, "5000"); // To shorten the test // Async-send producer kafka::clients::KafkaProducer producer(props); if (auto brokerMetadata = producer.fetchBrokerMetadata(topic)) { std::cout << brokerMetadata->toString() << std::endl; } // Send messages for (const auto& msg: messages) { auto record = kafka::clients::producer::ProducerRecord(topic, kafka::Key(msg.first.c_str(), msg.first.size()), kafka::Value(msg.second.c_str(), msg.second.size())); std::cout << "[" < partitionCounts; static constexpr int MSG_NUM = 20; for (int i = 0; i < MSG_NUM; ++i) { std::string key = "k" + std::to_string(i); std::string value = "v" + std::to_string(i); auto record = kafka::clients::producer::ProducerRecord(topic, kafka::Key(key.c_str(), key.size()), kafka::Value(value.c_str(), value.size())); auto metadata = producer.syncSend(record); partitionCounts[metadata.partition()]++; } // Not all be sent to the same paritition for (const auto& count: partitionCounts) EXPECT_NE(MSG_NUM, count.second); } TEST(KafkaProducer, TryOtherPartitioners) { const kafka::Topic topic = kafka::utility::getRandomString(); KafkaTestUtility::CreateKafkaTopic(topic, 5, 3); // Try another "partitioner" instead of the default one { auto props = KafkaTestUtility::GetKafkaClientCommonConfig(); // Partitioner "murmur2": if with no available key, all these records would be partitioned to the same partition props.put(kafka::clients::producer::Config::PARTITIONER, "murmur2"); kafka::clients::KafkaProducer producer(props); std::map partitionCounts; static constexpr int MSG_NUM = 20; for (int i = 0; i < MSG_NUM; ++i) { std::string key; std::string value = "v" + std::to_string(i); auto record = kafka::clients::producer::ProducerRecord(topic, kafka::Key(key.c_str(), key.size()), kafka::Value(value.c_str(), value.size())); auto metadata = producer.syncSend(record); std::cout << metadata.toString() << std::endl; partitionCounts[metadata.partition()]++; } // All were hashed to the same paritition EXPECT_EQ(1, partitionCounts.size()); EXPECT_TRUE(std::all_of(partitionCounts.cbegin(), partitionCounts.cend(), [](const auto& count) {return count.second == MSG_NUM; })); } { auto props = KafkaTestUtility::GetKafkaClientCommonConfig(); // An invalid partitioner props.put(kafka::clients::producer::Config::PARTITIONER, "invalid"); // An exception would be thrown for invalid "partitioner" setting EXPECT_KAFKA_THROW(kafka::clients::KafkaProducer producer(props), RD_KAFKA_RESP_ERR__INVALID_ARG); } } TEST(KafkaProducer, RecordWithEmptyOrNullFields) { auto sendMessages = [](const kafka::clients::producer::ProducerRecord& record, std::size_t repeat, const std::string& partitioner) { kafka::clients::KafkaProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig() .put(kafka::clients::producer::Config::PARTITIONER, partitioner)); producer.setLogLevel(kafka::Log::Level::Crit); for (std::size_t i = 0; i < repeat; ++i) { producer.syncSend(record); } }; enum class FieldType { Empty, Null }; auto runTest = [sendMessages](FieldType fieldType, const std::string& partitioner, bool expectRandomlyPartitioned) { KafkaTestUtility::PrintDividingLine("Run test for partitioner[" + partitioner + "], with " + (fieldType == FieldType::Empty ? "EmptyField" : "NullField")); const kafka::Topic topic = kafka::utility::getRandomString(); KafkaTestUtility::CreateKafkaTopic(topic, 5, 3); const std::string emptyStr{}; const auto emptyField = kafka::ConstBuffer(emptyStr.c_str(), emptyStr.size()); auto producerRecord = (fieldType == FieldType::Empty ? kafka::clients::producer::ProducerRecord(topic, emptyField, emptyField) : kafka::clients::producer::ProducerRecord(topic, kafka::NullKey, kafka::NullValue)); sendMessages(producerRecord, 10, partitioner); // The auto-commit consumer kafka::clients::KafkaConsumer consumer(KafkaTestUtility::GetKafkaClientCommonConfig() .put(kafka::clients::consumer::Config::AUTO_OFFSET_RESET, "earliest")); // Subscribe topics consumer.subscribe({topic}); // Poll all messages auto records = KafkaTestUtility::ConsumeMessagesUntilTimeout(consumer); // Check the key/value (empty or null) std::map counts; for (const auto& record: records) { if (fieldType == FieldType::Empty) { EXPECT_TRUE(record.key().size() == 0); EXPECT_TRUE(record.value().size() == 0); EXPECT_TRUE(record.key().data() != nullptr); EXPECT_TRUE(record.value().data() != nullptr); } else { EXPECT_TRUE(record.key().size() == 0); EXPECT_TRUE(record.value().size() == 0); EXPECT_TRUE(record.key().data() == nullptr); EXPECT_TRUE(record.value().data() == nullptr); } counts[record.partition()] += 1; } // Should be hashed to the same partition? if (expectRandomlyPartitioned) { EXPECT_TRUE(counts.size() > 1); } else { EXPECT_TRUE(counts.size() == 1); } }; runTest(FieldType::Null, "consistent_random", true); runTest(FieldType::Empty, "consistent_random", true); runTest(FieldType::Null, "murmur2_random", true); runTest(FieldType::Empty, "murmur2_random", false); // empty keys are mapped to a single partition runTest(FieldType::Null, "fnv1a_random", true); runTest(FieldType::Empty, "fnv1a_random", false); // empty keys are mapped to a single partition runTest(FieldType::Null, "consistent", false); runTest(FieldType::Empty, "consistent", false); } TEST(KafkaProducer, ThreadCount) { { kafka::clients::KafkaProducer producer(KafkaTestUtility::GetKafkaClientCommonConfig()); std::cout << "[" <> messages = { {"key1", "value1", 1}, {"key2", "value2", 2}, {"key3", "value3", 3}, }; const kafka::Topic topic = kafka::utility::getRandomString(); const kafka::Partition partition = 0; KafkaTestUtility::CreateKafkaTopic(topic, 5, 3); // Properties for the producer std::set msgIdsSent; // Delivery callback kafka::clients::producer::Callback drCallback = [&msgIdsSent, topic, partition](const kafka::clients::producer::RecordMetadata& metadata, const kafka::Error& error) { std::cout << "[" <(msg).c_str(), std::get<0>(msg).size()), kafka::Value(std::get<1>(msg).c_str(), std::get<1>(msg).size()), std::get<2>(msg)); std::cout << "[" <(msg); EXPECT_NE(msgIdsSent.end(), msgIdsSent.find(id)); } } TEST(KafkaProducer, DeliveryCallback_ManuallyPollEvents) { // Prepare messages to test const std::vector> messages = { {"key1", "value1", 1}, {"key2", "value2", 2}, {"key3", "value3", 3}, }; const kafka::Topic topic = kafka::utility::getRandomString(); const kafka::Partition partition = 0; KafkaTestUtility::CreateKafkaTopic(topic, 5, 3); // Properties for the producer std::set msgIdsSent; // Delivery callback kafka::clients::producer::Callback drCallback = [&msgIdsSent, topic, partition, appThreadId = std::this_thread::get_id()] (const kafka::clients::producer::RecordMetadata& metadata, const kafka::Error& error) { std::cout << "[" <(msg).c_str(), std::get<0>(msg).size()), kafka::Value(std::get<1>(msg).c_str(), std::get<1>(msg).size()), std::get<2>(msg)); std::cout << "[" <(msg); EXPECT_NE(msgIdsSent.end(), msgIdsSent.find(id)); } } TEST(KafkaProducer, NoBlockSendingWhileQueueIsFull_ManuallyPollEvents) { const kafka::Topic topic = kafka::utility::getRandomString(); KafkaTestUtility::CreateKafkaTopic(topic, 5, 3); const auto appThreadId = std::this_thread::get_id(); int msgSentCnt = 0; kafka::clients::producer::Callback drCallback = [&msgSentCnt, appThreadId](const kafka::clients::producer::RecordMetadata& metadata, const kafka::Error& error) { EXPECT_EQ(std::this_thread::get_id(), appThreadId); // It should be polled by the same thread EXPECT_FALSE(error); std::cout << "[" <> messages = { {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, }; auto record = kafka::clients::producer::ProducerRecord(topic, kafka::NullKey, kafka::NullValue); // To send the 1st message, should succeed record.setKey(kafka::Key(messages[0].first.c_str(), messages[0].first.size())); record.setValue(kafka::Value(messages[0].second.c_str(), messages[0].second.size())); std::cout << "[" <