面试漫谈(一)- 队列篇

Posted by YiBo on December 4, 2020

前记

这个系列的文章是我在面试前所做的一项最终总结,文章内的字词话语尽量是针对面试现场的话术,也就是和面试官的正常对话。

面试kafka不一定问的很细 大部分是问raft 很多人自己kafka都没搞清楚

其实,不止重复消费。更重要是保证partition的有序性

zookeeper主要是作为分布式协同服务:共享存储–保证数据的一致性

下面说说zookeeper的应用场景

  • 用zookeeper来存储一些常用且多变的配置数据,保证一致性。在zookeeper上选取一个节点用来存储数据,然后及集群中的每台机器在启动初始化阶段会读取上面存储的信息,还要注册一个watcher监听

  • 利用临时节点,来监控主机状态

  • 分布式日志收集系统

    1. 注册收集器机器

      /log/collection/hostname1 每个收集器在启动的时候,都会在收集器节点创建自己的节点,这里是永久类型的,通过state节点来判断

    2. 在收集器节点上注册日志源机器列表

      将日志源机器分别写到收集器机器节点(如 /log/collection.hostname1)上去

    3. 状态汇报

      每隔收集器创建完成后,需要再子节点上创建一个状态子节点(/logs/collector/host1/status)每隔一点时间写入自己的状态信息以及负载(心跳检测机制)日志系统根据该状态来判断收集器是否存活

    4. 如果收集器机器挂掉或者扩容

      日志系统时钟关注这 /logs/collector/节点的变更,一旦检测到变化,就将之前分配给收集器的所有任务进行转移—-根据汇报状态的负载进行转移

  • 共享锁 临时顺序节点

    /shared_lock/host1-R/W-0001

    读请求 – 获取所有子节点 – 向比自己小的最后一个写请求注册监听

    写请求– 获取所有子节点–向比自己序号小的最后一个节点注册

zookeeper 的选举机制

一定要奇数台节点

5台节点选举, myId分别为1-5

第一台myId为1启动,选举自己,

第二台myId:2启动,选举自己,2>1所以候选人是2

第三台myId:3启动,选举自己,3>1所以候选人是3,且超过半数,所以第三台当选leader

说说队列

主要用来解耦,削峰

比如一个订单系统,创建订单后,库存系统,支付系统,通知系统都会通过队列来解耦调用,防止因为一个子系统故障造成整个下单系统的瘫痪

系统遇到流量的猛增,会将系统压垮,详细队列将大量请求缓存起来

RocketMQ 大概是怎样的

由 Name Server , Broker , Producer , Custmer 构成

怎么保障消息可靠性

同步双写,异步复制到slaver

RocketMQ本质都是采用消费端主动拉取的方式

push方式:consumer把长轮询封装了,然后注册了一个监听器,收到消息后,唤醒长轮询来消费

提升写入性能

采用oneway方式,不用等待消费者返回确认

增加producer的并发量

提升消费性能

提高消费并行度(总的consumer数量不超过read queue)

以批量的方式消费

这里写写,突然想到的场景

我们当时消息在消费端出现了很大的堆积,通过修改producer consumer的并行度也不是很有效,然后就查了一下堆积的任务,大都是一些update这样的任务,想到了类似于kafka中的批量消费

首先要在producer端设置这类update任务的tag

通过tag的过滤,再设置consumeMessageBatchMaxSize 来批量update任务。

消息是存储在哪里的

目前主流的队列,都是采用消息刷盘至所部署机器的文件系统

rocketMQ消息用顺序写,600MB/s

存储的结构:

​ 每个topic下的每message queue都对应一个索引 consumeQueue ,来指向e CommitLog

​ consumeQueue 记录了指定topic下的队列消息在commitLog中的起始物理偏移量,消息大小,消息tag的hashcode值

过滤消息

RocketMQ是在consumer端进行消息过滤的

  1. consumer端构建一个SubscriptionData发送一个pull消息给Broker端
  2. broker端从RocketMQ的文件存储层-store读取数据前,会构建一个MessageFilter,然后传给Store
  3. store从ConsumeQueue读取到一条记录后,会用他的消息记录tag Hash过滤
  4. 服务端根据hashcode进行判断,无法精确对tag原始字符串进行过滤,在consumer拉取到消息后,还需要对消息的原始tag进行对比

零拷贝原理

如何保证消息顺序

保证部分有效即可

a. 发送端把同一个业务的ID消息发送到同一个message Queue(hash取模)

消费过程中从同一个message Queue读取,使用MessageListenerOrderly类,为每个message Queue加锁,消费每个消息前,需要先获得这个消息对应的锁,保证同一个时间,同一个consumer Queue对的消费不背并发消费,但不同的consumer Queue的消息可以并发处理

事务消息

RocketMQ采用俩阶段提交的方式实现事务消息

https://developer.aliyun.com/article/771097

用rocketMQ实现 数据库+发消息 事务

实现一个 TransactionListener 接口,包括俩个方法:executeLocalTransaction, checkLocalTransaction

具体的就是利用俩阶段提交,在事务中,先执行数据库的操作,然后进入prepared状态,执行executeLocalTransaction方法,返回三个参数

  • COMMIT_MESSAGE

进入俩阶段的第二阶段commited状态,可以消费这个消息

  • ROLLBACK_MESSAGE

回滚,消息被删除,消费者不能消费这个消息

  • UNKNOW

每隔一段时间调用一次checkLocalTransacrtion , 在BrokerConfig中可以配置,默认是1分钟,检查一次,最多5次


以上在操作过程中是有问题的,如果在service报错了,那么就会出现数据库回滚但是消息发出了,是因为数据库的隔离机制,在事务提交前是可读的,所以消息提交了。所以在 executeLocalTansaction 中固定返回 UNKNOW ,checkLocalTransaction 在 sendTransactionMQ 后执行,而且和 sendTransactionMQ 不在同一事务中

三个主流的消息队列的区别

https://machenxing.github.io/2019/05/20/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E7%9A%84%E6%9C%AC%E8%B4%A8%E5%8C%BA%E5%88%AB/

从broker来说

  • 暴力路由

    kafaka为了提升性能,简化了MQ功能模型,仅仅提供了一些基础的MQ相关功能,但是大幅度优化和提升了吞吐量。一套服务集群,每台机器上都有一个kafka broker进程,负责接收请求,存储数据,发送数据

    数据流模型,叫做topic,可以往这个topic写数据,然后让别人从这里来消费,可以划分为多个partition每个partition放一台机器上,存储一部分数据。在写消息到topic的时候, 会自动把你这个消息给分发到某一个partition上去。在消费消息的时候,部署在多个机器上的consumer可以组成一个group,一个partition只能给一个consumer消费,一个consumer可以消费多个partition,这是最核心的点。

    通过这个模型,保证一个topic里的每条数据,只会交给consumer group里的一个consumer来消费,形成了一个queue

    image-20210104110146281

    image-20210104110340066

    除此之外,kafka就没有任何其他消费功能了,属于一种比较暴力直接的流派

  • 有broker的复杂路由

    以RabbitMQ为代表,提供非常强大,复杂而且完善的消息路由功能。不是简单的topic-partition消费模型了,引入了 exchange , 根据复杂的业务规则把消息路由到内部的不同queue里去

    image-20210104110720109

    image-20210104110755706

    还可以做其他复杂的事情:

    如果想某个consumer只能消费到某一类型的数据,那么exchange可以把消息里比如带的XXX前缀的消息路由到某个queue。或者可以限定某个consumer只能消费某一个部分数据

    因为这些复杂的消费模型,所以吞吐量比Kafka低一个数量级