#include "../utils/TestUtility.h" #include "kafka/KafkaConsumer.h" #include "kafka/addons/KafkaRecoverableProducer.h" #include "gtest/gtest.h" TEST(KafkaRecoverableProducer, SendMessages) { // Prepare messages to test const std::vector> messages = { {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, }; const kafka::Topic topic = kafka::utility::getRandomString(); const kafka::Partition partition = 0; KafkaTestUtility::CreateKafkaTopic(topic, 5, 3); { // Properties for the producer const auto props = KafkaTestUtility::GetKafkaClientCommonConfig().put(kafka::clients::producer::Config::ACKS, "all"); // Recoverable producer kafka::clients::KafkaRecoverableProducer producer(props); // Send messages kafka::clients::producer::ProducerRecord::Id id = 0; for (const auto& msg: messages) { auto record = kafka::clients::producer::ProducerRecord(topic, partition, kafka::Key(msg.first.c_str(), msg.first.size()), kafka::Value(msg.second.c_str(), msg.second.size()), id++); std::cout << "[" <(records[i].key().data()), records[i].key().size())); EXPECT_EQ(messages[i/2].second, std::string(static_cast(records[i].value().data()), records[i].value().size())); } } } #ifdef KAFKA_API_ENABLE_UNIT_TEST_STUBS TEST(KafkaRecoverableProducer, MockFatalError) { const kafka::Topic topic = kafka::utility::getRandomString(); const kafka::Partition partition = 0; KafkaTestUtility::CreateKafkaTopic(topic, 5, 3); // Properties for the producer const auto props = KafkaTestUtility::GetKafkaClientCommonConfig(); // Recoverable producer kafka::clients::KafkaRecoverableProducer producer(props); // Prepare messages to send static constexpr int MSG_NUM = 50; std::mutex messagesMutex; std::list messagesToSend; for (std::size_t i = 0; i < MSG_NUM; ++i) messagesToSend.push_back(i); int sendCount = 0; int deliveryCount = 0; while (deliveryCount < sendCount || !messagesToSend.empty()) { if (!messagesToSend.empty()) { auto toSend = messagesToSend.front(); { std::lock_guard lock(messagesMutex); messagesToSend.pop_front(); } std::shared_ptr payload = std::make_shared(std::to_string(toSend)); auto record = kafka::clients::producer::ProducerRecord(topic, partition, kafka::NullKey, kafka::Value(payload->c_str(), payload->size()), toSend); std::cout << "[" < lock(messagesMutex); messagesToSend.push_front(static_cast(std::stoi(*payload))); } ++deliveryCount; // Mock fatal error if (deliveryCount % 11 == 0) { std::cout << "[" < countMap; for (const auto& record: records) { std::string payload(static_cast(record.value().data()), record.value().size()); ++countMap[static_cast(std::stoi(payload))]; } for (std::size_t i = 0; i < MSG_NUM; ++i) { auto id = static_cast(i); auto count = countMap[id]; EXPECT_TRUE(count > 0); if (count > 1) std::cout << "Message " << id << " was duplicated! count[" << count << "]" << std::endl; } } #endif