Skip to content

Commit 7118177

Browse files
authored
Add consumer start detail (#116)
* feat: add Consumer start detail
1 parent bf1fcb5 commit 7118177

File tree

2 files changed

+298
-0
lines changed

2 files changed

+298
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@
281281
- [RocketMQ ConsumeQueue详解](docs/rocketmq/rocketmq-consumequeue.md)
282282
- [RocketMQ CommitLog详解](docs/rocketmq/rocketmq-commitlog.md)
283283
- [RocketMQ IndexFile详解](docs/rocketmq/rocketmq-indexfile.md)
284+
- [RocketMQ 消费者启动流程](docs/rocketmq/rocketmq-consumer-start.md)
284285

285286
## 番外篇(JDK 1.8)
286287

Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,297 @@
1+
# RocketMQ 消费者启动流程
2+
3+
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
4+
5+
`1、检查配置信息`
6+
7+
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#checkConfig
8+
9+
校验消费组的长度不能大于 255
10+
11+
`public static final int CHARACTER_MAX_LENGTH = 255;`
12+
13+
```java
14+
if (group.length() >CHARACTER_MAX_LENGTH) {
15+
throw new MQClientException("the specified group is longer than group max length 255.", null);
16+
}
17+
```
18+
19+
消费组名称只能包含数字、字母、%、-、_、|
20+
21+
```java
22+
// regex: ^[%|a-zA-Z0-9_-]+$
23+
// %
24+
VALID_CHAR_BIT_MAP['%'] = true;
25+
// -
26+
VALID_CHAR_BIT_MAP['-'] = true;
27+
// _
28+
VALID_CHAR_BIT_MAP['_'] = true;
29+
// |
30+
VALID_CHAR_BIT_MAP['|'] = true;
31+
for (int i = 0; i <VALID_CHAR_BIT_MAP.length; i++) {
32+
if (i >= '0' && i <= '9') {
33+
// 0-9
34+
VALID_CHAR_BIT_MAP[i] = true;
35+
} else if (i >= 'A' && i <= 'Z') {
36+
// A-Z
37+
VALID_CHAR_BIT_MAP[i] = true;
38+
} else if (i >= 'a' && i <= 'z') {
39+
// a-z
40+
VALID_CHAR_BIT_MAP[i] = true;
41+
}
42+
}
43+
```
44+
45+
```java
46+
public static boolean isTopicOrGroupIllegal(String str) {
47+
int strLen = str.length();
48+
int len =VALID_CHAR_BIT_MAP.length;
49+
boolean[] bitMap =VALID_CHAR_BIT_MAP;
50+
for (int i = 0; i < strLen; i++) {
51+
char ch = str.charAt(i);
52+
if (ch >= len || !bitMap[ch]) {
53+
return true;
54+
}
55+
}
56+
return false;
57+
}
58+
```
59+
60+
消费组名称不能是`DEFAULT_CONSUMER`
61+
62+
`public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";`
63+
64+
```java
65+
if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
66+
throw new MQClientException("consumerGroup can not equal " + MixAll.DEFAULT_CONSUMER_GROUP
67+
+ ", please specify another one." + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
68+
}
69+
```
70+
71+
消费者最小线程数需要在 1-1000 之间
72+
73+
```java
74+
if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
75+
|| this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
76+
throw new MQClientException("consumeThreadMin Out of range [1, 1000]"
77+
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
78+
}
79+
```
80+
81+
消费者最大线程数需要在 1-1000 之间
82+
83+
```java
84+
if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
85+
throw new MQClientException("consumeThreadMax Out of range [1, 1000]"
86+
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
87+
}
88+
```
89+
90+
`2、设置订阅信息`
91+
92+
构造主题订阅消息`SubscriptionData`并将其加入`RebalanceImpl`,如果是消费模式是集群,订阅默认的重试主题并且构造`SubscriptionData`加入`RebalanceImpl`
93+
94+
```java
95+
private void copySubscription() throws MQClientException {
96+
try {
97+
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
98+
if (sub != null) {
99+
for (final Map.Entry<String, String> entry : sub.entrySet()) {
100+
final String topic = entry.getKey();
101+
final String subString = entry.getValue();
102+
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
103+
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
104+
}
105+
}
106+
107+
if (null == this.messageListenerInner) {
108+
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
109+
}
110+
111+
switch (this.defaultMQPushConsumer.getMessageModel()) {
112+
caseBROADCASTING:
113+
break;
114+
caseCLUSTERING:
115+
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
116+
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
117+
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
118+
break;
119+
default:
120+
break;
121+
}
122+
} catch (Exception e) {
123+
throw new MQClientException("subscription exception", e);
124+
}
125+
}
126+
```
127+
128+
`3、初始化MqClientInstance、RebalanceImpl、PullApiWrapper`
129+
130+
创建`MqClientInstance`, 无论在生产者端还是消费者端都是一个很重要的类, 封装了Topic信息、broker信息,当然还有生产者和消费者的信息。
131+
132+
```java
133+
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
134+
String clientId = clientConfig.buildMQClientId();
135+
MQClientInstance instance = this.factoryTable.get(clientId);
136+
if (null == instance) {
137+
instance = new MQClientInstance(clientConfig.cloneClientConfig(),
138+
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
139+
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
140+
if (prev != null) {
141+
instance = prev;
142+
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
143+
} else {
144+
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
145+
}
146+
}
147+
148+
return instance;
149+
}
150+
```
151+
152+
构造`RebalanceImpl` 用来负载消费者与队列的消费关系
153+
154+
```java
155+
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
156+
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
157+
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
158+
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
159+
```
160+
161+
构造`PullApiWrapper` 消费者拉取消息类
162+
163+
```java
164+
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
165+
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
166+
```
167+
168+
`4、设置消息偏移量`
169+
170+
如果是广播模式消费,消息消费进度存储在消费端,如果是集群模式消费,消息消费进度存储在 broker 端
171+
172+
```java
173+
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
174+
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
175+
} else {
176+
switch (this.defaultMQPushConsumer.getMessageModel()) {
177+
caseBROADCASTING:
178+
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
179+
break;
180+
caseCLUSTERING:
181+
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
182+
break;
183+
default:
184+
break;
185+
}
186+
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
187+
}
188+
this.offsetStore.load();
189+
```
190+
191+
`5、是否是顺序消费`
192+
193+
根据是否是顺序消费构造不同的`ConsumeMessageService`
194+
195+
```java
196+
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
197+
this.consumeOrderly = true;
198+
this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
199+
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
200+
this.consumeOrderly = false;
201+
this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
202+
}
203+
```
204+
205+
区别在于启动的线程任务不同:
206+
207+
顺序消费线程:
208+
209+
```java
210+
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
211+
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
212+
@Override
213+
public void run() {
214+
try {
215+
ConsumeMessageOrderlyService.this.lockMQPeriodically();
216+
} catch (Throwable e) {
217+
log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
218+
}
219+
}
220+
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
221+
}
222+
```
223+
224+
正常消费线程:
225+
226+
```java
227+
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
228+
229+
@Override
230+
public void run() {
231+
try {
232+
cleanExpireMsg();
233+
} catch (Throwable e) {
234+
log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
235+
}
236+
}
237+
238+
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
239+
```
240+
241+
6`、启动MQClientInstance`
242+
243+
消费者与生产者共用 MQClientInstance
244+
245+
大部分流程已经在生产者启动流程中讲解,这里主要讲解与生产者不同的部分
246+
247+
启动保证消费者偏移量最终一致性的任务
248+
249+
```java
250+
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
251+
252+
@Override
253+
public void run() {
254+
try {
255+
MQClientInstance.this.persistAllConsumerOffset();
256+
} catch (Exception e) {
257+
log.error("ScheduledTask persistAllConsumerOffset exception", e);
258+
}
259+
}
260+
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
261+
```
262+
263+
启动调整线程池大小任务:
264+
265+
```java
266+
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
267+
268+
@Override
269+
public void run() {
270+
try {
271+
MQClientInstance.this.adjustThreadPool();
272+
} catch (Exception e) {
273+
log.error("ScheduledTask adjustThreadPool exception", e);
274+
}
275+
}
276+
}, 1, 1, TimeUnit.MINUTES);
277+
```
278+
279+
启动重平衡服务:
280+
281+
`this.rebalanceService.start();`
282+
283+
7`、更新订阅主题信息`
284+
285+
更新主题订阅信息:
286+
287+
```java
288+
private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
289+
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
290+
if (subTable != null) {
291+
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
292+
final String topic = entry.getKey();
293+
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
294+
}
295+
}
296+
}
297+
```

0 commit comments

Comments
 (0)