Skip to content

Commit bf1fcb5

Browse files
authored
feat: add IndexFile detail (#115)
* add IndexFile detail
1 parent b897d6d commit bf1fcb5

File tree

2 files changed

+167
-0
lines changed

2 files changed

+167
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@
280280
- [RocketMQ MappedFile内存映射文件详解](docs/rocketmq/rocketmq-mappedfile-detail.md)
281281
- [RocketMQ ConsumeQueue详解](docs/rocketmq/rocketmq-consumequeue.md)
282282
- [RocketMQ CommitLog详解](docs/rocketmq/rocketmq-commitlog.md)
283+
- [RocketMQ IndexFile详解](docs/rocketmq/rocketmq-indexfile.md)
283284

284285
## 番外篇(JDK 1.8)
285286

docs/rocketmq/rocketmq-indexfile.md

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
# RocketMQ IndexFile 详解
2+
3+
首先明确一下 IndexFile 的文件结构
4+
5+
Index header + 哈希槽,每个槽下面挂载 index 索引,类似哈希表的结构
6+
7+
一个 Index 文件默认包含 500 万个哈希槽,一个哈希槽最多存储 4 个 index,也就是一个 IndexFile 默认最多包含 2000 万个 index
8+
9+
Index header:
10+
11+
40byte Index header = 8byte 的 beginTimestamp(IndexFile 对应第一条消息的存储时间) + 8byte 的 endTimestamp (IndexFile 对应最后一条消息的存储时间) + 8byte 的 beginPhyoffset(IndexFile 对应第一条消息在 CommitLog 的物理偏移量) + 8byte 的 endPhyoffset(IndexFile 对应最后一条消息在 CommitLog 的物理偏移量)+ 4byte 的 hashSlotCount(已有 index 的槽个数)+ 4byte 的 indexCount(索引个数)
12+
13+
哈希槽:
14+
15+
每个哈希槽占用 4 字节,存储当前槽下面最新的 index 的序号
16+
17+
Index:
18+
19+
20byte 的 index = 4byte 的 keyHash(key 的哈希码) + 8byte 的 phyOffset(消息在文件中的物理偏移量)+ 4byte 的 timeDiff(该索引对应消息的存储时间与当前索引文件第一条消息的存储时间的差值)+ 4byte 的 preIndexNo(该条目的前一个 Index 的索引值)
20+
21+
1、将消息索引键与消息偏移量的映射关系写入 indexFile
22+
23+
org.apache.rocketmq.store.index.IndexFile#putKey
24+
25+
当前已使用的 Index 大于等于允许的最大个数时,返回 false,表示当前 Index 文件已满。
26+
27+
如果当前 Index 文件未满,则根据 key 计算出哈希码,然后对槽数量取余定位到某一个哈希槽位置,
28+
29+
哈希槽的物理偏移量 = IndexHeader 的大小(默认 40Byte) + 哈希槽位置 * 每个哈希槽的大小(4 字节)
30+
31+
```java
32+
int keyHash = indexKeyHashMethod(key);
33+
int slotPos = keyHash % this.hashSlotNum;
34+
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE+ slotPos * hashSlotSize;
35+
```
36+
37+
读取哈希槽中的数据,如果哈希槽中的数据小于 0 或者大于 index 的个数,则为无效索引,将 slotValue 置为 0
38+
39+
```java
40+
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
41+
if (slotValue <=invalidIndex|| slotValue > this.indexHeader.getIndexCount()) {
42+
slotValue =invalidIndex;
43+
}
44+
```
45+
46+
计算本次存储消息的时间戳与 indexFile 第一条消息存储时间戳的差值并转换为秒
47+
48+
```java
49+
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
50+
51+
timeDiff = timeDiff / 1000;
52+
53+
if (this.indexHeader.getBeginTimestamp() <= 0) {
54+
timeDiff = 0;
55+
} else if (timeDiff > Integer.MAX_VALUE) {
56+
timeDiff = Integer.MAX_VALUE;
57+
} else if (timeDiff < 0) {
58+
timeDiff = 0;
59+
}
60+
```
61+
62+
新添加的消息 index 的物理偏移量 = IndexHeader 大小(40Byte) + Index 文件哈希槽的数量 * 哈希槽的大小(4Byte ) + Index 文件索引数量 * 索引大小(20Byte)
63+
64+
将消息哈希码、消息物理偏移量、消息存储时间戳与 Index 文件第一条消息的时间戳的差值、当前哈希槽的值、当前 Indexfile 的索引个数存入 mappedByteBuffer
65+
66+
```java
67+
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE+ this.hashSlotNum *hashSlotSize
68+
+ this.indexHeader.getIndexCount() *indexSize;
69+
70+
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
71+
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
72+
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
73+
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
74+
75+
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
76+
```
77+
78+
更新 IndexHeader 信息:
79+
80+
如果该 IndexFile 哈希槽中消息的数量小于等于 1,更新 IndexHeader 的 beginPhyOffset 和 beginTimesttamp
81+
82+
每次添加消息之后更新 IndexCount、endPhyOffset、endTimestamp
83+
84+
```java
85+
if (this.indexHeader.getIndexCount() <= 1) {
86+
this.indexHeader.setBeginPhyOffset(phyOffset);
87+
this.indexHeader.setBeginTimestamp(storeTimestamp);
88+
}
89+
90+
if (invalidIndex== slotValue) {
91+
this.indexHeader.incHashSlotCount();
92+
}
93+
this.indexHeader.incIndexCount();
94+
this.indexHeader.setEndPhyOffset(phyOffset);
95+
this.indexHeader.setEndTimestamp(storeTimestamp);
96+
```
97+
98+
2、根据 key 查找消息
99+
100+
org.apache.rocketmq.store.index.IndexFile#selectPhyOffset
101+
102+
参数如下:
103+
104+
`List<Long> phyOffsets`: 查询到的物理偏移量
105+
106+
`String key: 索引key`
107+
108+
`int maxNum`:本次查找的最大消息条数
109+
110+
`long begin`:开始时间戳
111+
112+
long end: 结束时间戳
113+
114+
根据 key 计算哈希码,哈希码与哈希槽的数量取余得到哈希槽的索引
115+
116+
哈希槽的物理地址 = IndexHeader(40byte) + 哈希槽索引 * 每个哈希槽的大小(4byte)
117+
118+
```java
119+
int keyHash = indexKeyHashMethod(key);
120+
int slotPos = keyHash % this.hashSlotNum;
121+
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE+ slotPos * hashSlotSize;
122+
```
123+
124+
`从mappedByteBuffer`获取哈希槽的值,如果值小于等于 0 或者值大于 IndexCount
125+
126+
或者 IndexCount 的 值小于等于 1 则表示没有有效的结果数据
127+
128+
如果查询返回的结果数量大于等于要查询的最大消息条数,终止循环
129+
130+
```java
131+
if (slotValue <=invalidIndex|| slotValue > this.indexHeader.getIndexCount()
132+
|| this.indexHeader.getIndexCount() <= 1) {
133+
} else {
134+
for (int nextIndexToRead = slotValue; ; ) {
135+
if (phyOffsets.size() >= maxNum) {
136+
break;
137+
}
138+
```
139+
140+
如果存储的时间戳小于 0,结束查找,如果哈希码匹配并且存储时间在要查找的开始时间戳和结束时间戳之间,将结果偏移量加入返回结果中
141+
142+
```java
143+
if (timeDiff < 0) {
144+
break;
145+
}
146+
147+
timeDiff *= 1000L;
148+
149+
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
150+
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
151+
152+
if (keyHash == keyHashRead && timeMatched) {
153+
phyOffsets.add(phyOffsetRead);
154+
}
155+
```
156+
157+
校验该 index 的上一个 index,如果上一个 index 的索引大于 0 并且小于等于 indexCount,时间戳大于等于要查找的开始时间戳,则继续查找
158+
159+
```java
160+
if (prevIndexRead <=invalidIndex || prevIndexRead > this.indexHeader.getIndexCount()
161+
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
162+
break;
163+
}
164+
165+
nextIndexToRead = prevIndexRead;
166+
```

0 commit comments

Comments
 (0)