初识kafka
文章目录kafka的数据结构kafka_2.11-2.1.1.tgz此版本的-前后两个版本的含义数据操作命令创建主题查看所有topic列表查看指定topic信息删除主题查看消费者组查看消费者组的详细信息删除消费者组控制台向topic生产数据控制台消费topic的数据根据 Partition Offset 查询topic数据查看controller修改指定topic的消费者组的消费索引查看kafka版本消息队列的两种模式点对点模式发布订阅模式概述使用消息队列的好处安装config目录下的server.propertiesbin目录下的常用命令配置修改server.propertieslog4J的日志路径修改异常处理kafka的数据结构ProducerProducer即生产者消息的产生者是消息的入口。kafka clusterBrokerBroker是kafka实例每个服务器上有一个或多个kafka的实例我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号如图中的broker-0、broker-1等……Topic消息的主题可以理解为消息的分类kafka的数据就保存在topic。在每个broker上都可以创建多个topic。消费者和生产者都可以创建主题。Partitiontopic的分区每个topic可以有多个分区分区的作用是做负载提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的partition的表现形式就是一个一个的文件夹Replication每一个分区都有多个副本副本的作用是做备胎。当主分区Leader故障的时候会选择一个备胎Follower上位成为Leader。在kafka中默认副本的最大数量是10个且副本的数量不能大于Broker的数量follower和leader绝对是在不同的机器同一机器对同一个分区也只可能存放一个副本包括自己。Message每一条发送的消息主体。Consumer消费者即消息的消费方是消息的出口。Consumer Group我们可以将多个消费者组成一个消费者组在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据这也是为了提高kafka的吞吐量Zookeeperkafka集群依赖zookeeper来保存集群的的元信息来保证系统的可用性。kafka_2.11-2.1.1.tgz此版本的-前后两个版本的含义Scala编译器版本2.11Kafka版本号2.1.1数据操作命令创建主题bin/kafka-topics.sh--zookeeper localhost:2181--create--topic t_cdr--partitions 3--replication-factor 2注partitions指定topic分区数replication-factor指定topic每个分区的副本数partitions分区数:partitions 分区数控制topic将分片成多少个log。可以显示指定如果不指定则会使用broker(server.properties)中的num.partitions配置的数量虽然增加分区数可以提供kafka集群的吞吐量、但是过多的分区数或者或是单台服务器上的分区数过多会增加不可用及延迟的风险。因为多的分区数意味着需要打开更多的文件句柄、增加点到点的延时、增加客户端的内存消耗。分区数也限制了consumer的并行度即限制了并行consumer消息的线程数不能大于分区数分区数也限制了producer发送消息是指定的分区。如创建topic时分区设置为1producer发送消息时通过自定义的分区方法指定分区为2或以上的数都会出错的这种情况可以通过alter –partitions 来增加分区数。1.replication-factor副本replication factor 控制消息保存在几个broker(服务器)上一般情况下等于broker的个数。如果没有在创建时显示指定或通过API向一个不存在的topic生产消息时会使用broker(server.properties)中的default.replication.factor配置的数量查看所有topic列表bin/kafka-topics.sh--zookeeper localhost:2181--list 新的版本使用 bin/kafka-topics.sh--list--bootstrap-server localhost:9092查看指定topic信息bin/kafka-topics.sh--zookeeper localhost:2181--describe--topic 主题名删除主题bin/kafka-topics.sh--delete--topic test--zookeeper localhost:2181 新的版本使用 bin/kafka-topics.sh--delete--topic topic_name--bootstrap-server localhost:9092查看消费者组bin/kafka-consumer-groups.sh--bootstrap-server ip:9092--list查看消费者组的详细信息bin/kafka-consumer-groups.sh--bootstrap-server ip:9092--group消费者组名--describe查询到的信息为GROUPTOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID order_log order_log 0 254 254 0 ClickHouse 18.16.1-7b82022e-6485-4944-87a6-96d5e665eb1a/10.177.4.38 ClickHouse 18.16.1讲解GROUP 对应的消费者组名称TOPIC 队列PARTITION 分区CURRENT-OFFSET 当前消费到的索引 如果和LOG-END-OFFSET相等则全部消费完LOG-END-OFFSET 队列的消息总数量LAG 消费积压情况删除消费者组bin/kafka-consumer-groups.sh--bootstrap-server ip:9092--delete--group消费者组名如果删除不掉需要把正在消费的消费者关闭再删除即可控制台向topic生产数据bin/kafka-console-producer.sh--broker-list localhost:9092--topic test控制台消费topic的数据bin/kafka-console-consumer.sh--bootstrap-server localhost:9092--topic test--from-beginning--from-beginning是从主题的初始位置开始消费即消费主题的所有信息根据 Partition Offset 查询topic数据bin/kafka-console-consumer.sh--bootstrap-server ip:port--topic 队列名称--partition 0--offset Offset索引--max-messages 1查看controller登录zookeeper客户端通过get /controller命令查看修改指定topic的消费者组的消费索引bin/kafka-consumer-groups.sh--bootstrap-server ip:port--groupgroupName--topic topicName--execute--reset-offsets--to-offset 257ip和port都是kafka的group 代表你的消费者分组topic 代表你消费的主题execute 代表支持复位偏移reset-offsets 代表要进行偏移操作to-offset 代表你要偏移到哪个位置 是long类型数值只能比前面查询出来的小如果报错Error: Assignments can only be reset if the group tablename is inactive, but the current state is Stable则需要把消费者的程序关闭如果是clickhouse可以通过DETACH TABLE your_kafka_table;暂停消费ATTACH TABLE your_kafka_table;恢复消费第二种方式第二种方式offset信息保存在zookeeper当中bin/kafka-consumer-groups.sh --zookeeper z1:2181,z2:2181,z3:2181 --group test-consumer-group --topic test --execute --reset-offsets --to-offset 10000–zookeeper 和 --bootstrap-server 只能选一种方式查看kafka版本bin/kafka-server-start.sh --version消息队列的两种模式kafka是消费者主动轮询拉取队列信息消费并且如果是新的消费者和消费者组会消费到队列的所有消息包括创建消费者之前的消息点对点模式一对一消费者主动拉取数据消息收到后消息删除 并且消息只能被一个消费者消费发布订阅模式一对多消费者消费数据后不会清除消息生产者将消息发布到topic中可以被所有订阅了的消费者消费消费者可以通过两种方式获取消息一种是主动拉取一种topic推送消息kafka是基于主动拉取模式的概述kafka是一个分布式的基于发布订阅模式的消息队列主要应用于大数据领域。使用消息队列的好处解耦:允许独立的修改活扩展两边的代码只要确保他们遵守同样的接口约束可恢复性系统的一部分组件失效时不会影响整个系统即一个处理消息的服务挂掉后加入队列中的消息仍然可以在服务恢复后处理缓冲有助于控制和优化数据流经过系统的速度解决生产消息和消费消息的不一致异步通信不需要立即处理消息可以在需要的时候再处理安装kafka依赖于zookeeper所以也必须安装zookeeper下载地址https://kafka.apache.org/downloads把此包解压到服务器即可config目录下的server.properties# broker的全局唯一标识集群中不能重复,broker.id0# returned from java.net.InetAddress.getCanonicalHostName().advertised.listenersPLAINTEXT://192.168.10.1:9092# 监听ip和端口案例 listenersPLAINTEXT://your.host.name:9092ip是外网可以访问的否则通过代码访问会访问不到 listenersPLAINTEXT://192.168.10.1:9092# 设置删除功能可使用,,新版本没有发现这个设置delete.topic.enabletrue# 服务器用于从网络接收请求并向网络发送响应的线程数 num.network.threads3# 服务器用于处理请求的线程数其中可能包括磁盘I/Onum.io.threads8# socket服务器使用的发送缓冲区 socket.send.buffer.bytes102400#socket服务器使用的接收缓冲区(SO_RCVBUF)socket.receive.buffer.bytes102400#套接字服务器将接受的请求的最大大小(针对OOM的保护)socket.request.max.bytes104857600############################# Log Basics ############################# # 用逗号分隔的目录列表在其中存储数据文件 log.dirs/tmp/kafka-logs # 每个主题的默认日志分区数。更多分区允许更大分区 num.partitions1# 每个数据目录用于启动时日志恢复和关闭时刷新的线程数。 num.recovery.threads.per.data.dir1############################# Internal Topic Settings ############################# # 组元数据内部主题“__consumer_offset”和“__transaction_state”的复制因子 # 对于开发测试以外的任何情况建议使用大于1的值以确保可用性例如3。 offsets.topic.replication.factor1transaction.state.log.replication.factor1transaction.state.log.min.isr1############################# Log Retention Policy ############################# # 由于过期而可以被删除的日志文件的最小过期时间 log.retention.hours168# 日志段文件的最大大小。当达到这个大小时将创建一个新的日志段。 log.segment.bytes1073741824# 检查日志段以确定是否可以根据其删除的时间间隔 log.retention.check.interval.ms300000# 集群连接案例127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002.# 连接zookeeper的ip地址 zookeeper.connectlocalhost:2181# 连接zookeeper超时时间 zookeeper.connection.timeout.ms6000#延迟初始消费者重新平衡的时间(以毫秒为单位)group.initial.rebalance.delay.ms0bin目录下的常用命令启动kafka自带的zookeepernohup bin/zookeeper-server-start.sh config/zookeeper.properties 关闭bin/zookeeper-server-stop.sh config/zookeeper.propertieskafka启动和关闭命令 : 后台启动命令nohup bin/kafka-server-start.sh -daemon config/server.properties关闭命令bin/kafka-server-stop.sh config/server.properties,启动或者关闭前要先启动zookeeper生产者和消费者控制台打印信息命令kafka-console-consumer.shkafka-console-producer.sh关于topic的操作增删改查kafka-topics.sh启动kafka自带的zookeeper的客户端bin/zookeeper-shell.sh ip:port配置修改server.properties# kafka集群的唯一标识必须为每个代理设置一个唯一的整数broker.id0log4J的日志路径修改kafka使用的log4j在conf目录下有log4j.properties配置文件里面所有的文件路径都是用的配置${kafka.logs.dir}经百度此参数的配置在bin目录的kafka-run-class.sh配置文件中只要我们在此文件中添加一行LOG_DIR /log/dir即可即可修改文件日志路径异常处理如果kafka启动异常可以查看logs目录下的server.log日志clickhouse消费不到kafka的数据但是通过命令从头消费是可以的可以查看消费者组的详细信息看消费索引消费到哪了如果没有信息显示可能消费者组有问题可以删除消费者组如果不能删除直接把kafka的数据文件全部删除和zookeeper的配置文件中的dataDir配置路径下的全部删掉这就相当于全新的kafka和全新的zookeeper重启即可Magic v1 does not support record headerskafka服务端版本过低导致参考https://www.bilibili.com/read/cv8517619/