针对kafka_2.13版本测试过程中的一些坑
2020-05-23 16:04:27来源:博客园 阅读 ()
针对kafka_2.13版本测试过程中的一些坑
声明:这是在windows10上进行kafka_2.13demo搭建时的过程记录,提供给同学们参考。
1.jdk先要装一下。
2.先安装zookeeper,这里不赘述,贴一个链接 https://blog.csdn.net/ring300/article/details/80446918。记得测试一下zookeeper是否正确安装。
3.下载安装kafka_2.13。在这里下载https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/,并解压到你觉得OK的目录下。
自己安装的kafka最好检查一下配置文件中的参数(server.properties)。1.zookeeper.connect=localhost:2181 2.log.dirs=D:\\kafka_2.13-2.5.0\\kafka-logs (后面的地址就是放置日志的地方,可以自己先在目录下新建,可以看上~上一张图)。
4.开始启动服务。
这里需要说明一下,不想cmd到文件目录下的话,请在需要打开运行窗口的地方按住 shift 然后右键 在弹出的窗口上选择 在此处打开powershell 。
4.1先启动 zookeeper,在安装目录下的bin里直接点击zkserver.cmd 启动比较省事
4.2启动kafka服务。
在kafka的安装目录下直接通过(shift 然后右键 在弹出的窗口上选择 在此处打开powershell )打开powershell。然后输入
bin\windows\kafka-server-start.bat config\server.properties
回车 就可以启动服务。
4.3.创建一个topic 命名test(随意点就行)(shift 然后右键 在弹出的窗口上选择 在此处打开powershell )打开powershell,输入下面的命令 回车。
创建topic: bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
创建好之后,查看现有的topic: bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
4.4生产消息和消费消息。
打开shell 后 输入 bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test 生产消息
打开shell后输入 bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning 消费消息
(稍微等个几秒钟,有点慢)
到此就已经结束了整个test的 测试工作,接下来我们用java代码调一下这里的服务。
pom : "<dependencies></dependencies>"已经有了的话就去掉。
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
</dependencies>
1.生产
package com.test.kfserver;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class Producer {
public static String topic = "duanjt_test";//定义主题
public static void main(String[] args) throws InterruptedException {
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");//kafka地址,多个地址用逗号分割
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
try {
while (true) {
String msg = "Hello," + new Random().nextInt(100);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
kafkaProducer.send(record);
System.out.println("消息发送成功:" + msg);
Thread.sleep(500);
}
} finally {
kafkaProducer.close();
}
}
}
2. 消费
package com.test.kfserver;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class Consumer {
public static void main(String[] args) {
Properties p = new Properties();
p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
p.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));// 订阅消息
while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
record.topic(), record.offset(), record.value()));
}
}
}
}
到这里我们的测试就算高一段落了,其实最新版的kafka里已经自带了zk 但是,如果用自己的zk ,只要是新版的kafka 就会报错
zookeeper is not a recognized option
意思就是没有zookeeper
这个参数
参考:https://www.cnblogs.com/duanjt/p/10132116.html
原文链接:https://www.cnblogs.com/volatile0509/p/12944163.html
如有疑问请与原作者联系
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
上一篇:redis的缓存穿透、雪崩、击穿
- 数据源管理 | Kafka集群环境搭建,消息存储机制详解 2020-06-11
- kafka 2020-06-09
- 高吞吐量的分布式发布订阅消息系统Kafka之Producer源码分析 2020-05-30
- Spring拦截器WebMvcConfigurer针对Swagger的拦截问题 2020-05-21
- SpringBoot 和 Kafka集群案例详解,面试必学 2020-05-13
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash