结合别人的文章,做RocketMQ的一点原理分析,结合源码(尽量)----未完待续


Broker


与Namesrv的关系

1.从namesrv获取配置信息

 1 /**
 2      * BrokerConfig类
 3      *
 4      * broker每隔30秒(此时间无法更改)向所有nameserver发送心跳,心跳包含了自身的topic配置信息。
 5      * 这里的“此时间无法更改”是别人的总结,还没搞懂为啥此时间不可更改,明明原版注释中写的是取值范围在10,000到60,000之间
 6      *
 7      * This configurable item defines interval of topics registration of broker to name server. Allowing values are
 8      * between 10, 000 and 60, 000 milliseconds.
 9      */
10     private int registerNameServerPeriod = 1000 * 30;

2.namesrv检查broker的心跳

 1         /**
 2          * NamesrvController类
 3          * Namesrv定时检查Broker心跳
 4          * 每10秒检查一次,时间戳超过2分钟,则认为Broker失效,从brokerLiveTable中移除
 5          */
 6         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 7 
 8             @Override
 9             public void run() {
10                 NamesrvController.this.routeInfoManager.scanNotActiveBroker();
11             }
12         }, 5, 10, TimeUnit.SECONDS);
13 
14 ...
15 
16     /**
17      * RouteInfoManager类
18      * Namesrv查看不活跃|失效的Broker
19      */
20     public void scanNotActiveBroker() {
21         Iterator> it = this.brokerLiveTable.entrySet().iterator();
22         while (it.hasNext()) {
23             Entry next = it.next();
24             long last = next.getValue().getLastUpdateTimestamp();
25             if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
26                 RemotingUtil.closeChannel(next.getValue().getChannel());
27                 it.remove();
28                 log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
29                 this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
30             }
31         }
32     }

3.Namesrv检查与Broker的长连接

 1 /**
 2      * BrokerHousekeepingService类
 3      * 当NameServer和Broker的长连接断掉以后,onChannelDestroy函数会被调用,把这个Broker的信息清理出去。
 4      */
 5     @Override
 6     public void onChannelClose(String remoteAddr, Channel channel) {
 7         this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
 8     }
 9 
10     /**
11      * BrokerHousekeepingService类
12      * 当NameServer和Broker的长连接断掉以后,onChannelDestroy函数会被调用,把这个Broker的信息清理出去。
13      */
14     @Override
15     public void onChannelException(String remoteAddr, Channel channel) {
16         this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
17     }
18 
19     /**
20      * BrokerHousekeepingService类
21      * 当NameServer和Broker的长连接断掉以后,onChannelDestroy函数会被调用,把这个Broker的信息清理出去。
22      */
23     @Override
24     public void onChannelIdle(String remoteAddr, Channel channel) {
25         this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
26     }

Consumer|Producer


与Namesrv的关系

 1     /**
 2      * ClientConfig类
 3      *
 4      * 默认情况下,消费者每隔30秒从nameserver获取所有topic的最新队列情况,
 5      * 这意味着某个broker如果宕机,客户端最多要30秒才能感知。
 6      *
 7      * 默认情况下,生产者每隔30秒从nameserver获取所有topic的最新队列情况,
 8      * 这意味着某个broker如果宕机,生产者最多要30秒才能感知,在此期间,发往该broker的消息发送失败。
 9      *
10      * Pulling topic information interval from the named server
11      */
12     private int pollNameServerInterval = 1000 * 30;

 

与Broker的关系

1.客户端(生产者|消费者)向Broker发送心跳

1     /**
2      * ClientConfig类
3      *
4      * 默认情况下,消费者|生产者 每隔30秒向所有broker发送心跳
5      *
6      * Heartbeat interval in microseconds with message broker
7      */
8     private int heartbeatBrokerInterval = 1000 * 30;

2.Broker定期检查客户端心跳检查

 1     /**
 2      * ClientHousekeepingService类
 3      *
 4      * Broker定时清理不活动的客户端(生产者|消费者)连接
 5      * 每10秒检查一次,时间戳超过2分钟,则关闭客户端(生产者|消费者)连接
 6      */
 7     public void start() {
 8 
 9         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
10             @Override
11             public void run() {
12                 try {
13                     ClientHousekeepingService.this.scanExceptionChannel();
14                 } catch (Throwable e) {
15                     log.error("Error occurred when scan not active client channels.", e);
16                 }
17             }
18         }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
19     }
20 
21 ...
22 
23     /**
24      * ClientHousekeepingService类
25      *
26      * Broker定时清理不活动的客户端(生产者|消费者)连接
27      * 每10秒检查一次,时间戳超过2分钟,则关闭客户端(生产者|消费者)连接
28      */
29     private void scanExceptionChannel() {
30         this.brokerController.getProducerManager().scanNotActiveChannel();
31         this.brokerController.getConsumerManager().scanNotActiveChannel();
32         this.brokerController.getFilterServerManager().scanNotActiveChannel();
33     }

3.Broker检查与客户端的长连接

 1     /**
 2      * ClientHousekeepingService类
 3      *
 4      * 当客户端(Producer|Consumer)和Broker的长连接断掉以后,doChannelCloseEvent函数会被调用,把对应客户端(Producer|Consumer)断开连接
 5      */
 6     @Override
 7     public void onChannelClose(String remoteAddr, Channel channel) {
 8         this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
 9         this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
10         this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
11     }
12 
13     /**
14      * ClientHousekeepingService类
15      *
16      * 当客户端(Producer|Consumer)和Broker的长连接断掉以后,doChannelCloseEvent函数会被调用,把对应客户端(Producer|Consumer)断开连接
17      */
18     @Override
19     public void onChannelException(String remoteAddr, Channel channel) {
20         this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
21         this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
22         this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
23     }
24 
25     /**
26      * ClientHousekeepingService类
27      *
28      * 当客户端(Producer|Consumer)和Broker的长连接断掉以后,doChannelCloseEvent函数会被调用,把对应客户端(Producer|Consumer)断开连接
29      */
30     @Override
31     public void onChannelIdle(String remoteAddr, Channel channel) {
32         this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
33         this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
34         this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
35     }