目前这种方案 在高并发的情况下回出现不断创建新的 inode 对象,给 GC 造成压力,但是可以保证 inode 进行原子交互无锁设计,这种把 atomic 拆解更细,可以复用 inode 结构,但是不能保证整个 structure 地址是原子更新,但是这两种我都跑了帮小时的 benchmark 都没有任何并非问题。
其实这两种都有一个逻辑 bug 就是 在 日志没有持久化的时候,mvcc 版本就已经被 cas 加 1 了,其实正确的做法是将这段代码移到外部去,类似于其他编程语言中 do while 的方式,先做在 CAS 更新。不用 我这个设计成 乐观锁 机制,先操作再看版本好对不对的上,对不上就是执行失败。
// UpdateSegmentWithCAS 通过类似于 MVCC 来实现更新操作数据一致性
func (lfs *LogStructuredFS) UpdateSegmentWithCAS(key string, expected uint64, newseg *Segment) error {
inum := InodeNum(key)
imap := lfs.indexs[inum%uint64(indexShard)]
if imap == nil {
return fmt.Errorf("inode index shard for %d not found", inum)
}
// 读取 inode 信息,使用读锁来防止并发写操作
imap.mu.RLock()
inode, ok := imap.index[inum]
imap.mu.RUnlock()
if !ok {
return fmt.Errorf("inode index for %d not found", inum)
}
// MVCC: version is not modified by another thread
if atomic.CompareAndSwapUint64(&inode.mvcc, expected, expected+1) {
bytes, err := serializedSegment(newseg)
if err != nil {
return err
}
// 更新数据时使用锁
lfs.mu.Lock()
err = appendToActiveRegion(lfs.active, bytes)
lfs.mu.Unlock()
if err != nil {
return fmt.Errorf("failed to update data: %w", err)
}
// 一次性原子更新 inode 指针
atomic.StoreUint64(&inode.Position, atomic.LoadUint64(&lfs.offset))
atomic.StoreUint64(&inode.CreatedAt, newseg.CreatedAt)
atomic.StoreUint64(&inode.ExpiredAt, newseg.ExpiredAt)
atomic.StoreUint64(&inode.RegionID, lfs.regionID)
atomic.StoreUint32(&inode.Length, newseg.Size())
// 使用原子操作更新 offset
atomic.AddUint64(&lfs.offset, uint64(newseg.Size()))
// 检查并创建新的区域
if atomic.LoadUint64(&lfs.offset) >= uint64(regionThreshold) {
lfs.mu.Lock()
err := lfs.createActiveRegion()
lfs.mu.Unlock()
if err != nil {
return err
}
}
return nil
}
return errors.New("failed to update data due to version conflict")
}
// UpdateSegmentWithCAS 通过类似于 MVCC 来实现更新操作数据一致性
func (lfs *LogStructuredFS) UpdateSegmentWithCAS(key string, expected uint64, newseg *Segment) error {
inum := InodeNum(key)
imap := lfs.indexs[inum%uint64(indexShard)]
if imap == nil {
return fmt.Errorf("inode index shard for %d not found", inum)
}
// 读取 inode 信息,使用读锁来防止并发写操作
imap.mu.RLock()
inode, ok := imap.index[inum]
imap.mu.RUnlock()
if !ok {
return fmt.Errorf("inode index for %d not found", inum)
}
// MVCC: version is not modified by another thread
if atomic.CompareAndSwapUint64(&inode.mvcc, expected, expected+1) {
bytes, err := serializedSegment(newseg)
if err != nil {
return err
}
// 更新数据时使用锁
lfs.mu.Lock()
err = appendToActiveRegion(lfs.active, bytes)
lfs.mu.Unlock()
if err != nil {
return fmt.Errorf("failed to update data: %w", err)
}
newInode := &INode{
RegionID: atomic.LoadUint64(&lfs.regionID),
Position: atomic.LoadUint64(&lfs.offset),
CreatedAt: newseg.CreatedAt,
ExpiredAt: newseg.ExpiredAt,
Length: newseg.Size(),
}
// 一次性原子更新 inode 指针
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&inode)), unsafe.Pointer(newInode))
// 使用原子操作更新 offset
atomic.AddUint64(&lfs.offset, uint64(newseg.Size()))
// 检查并创建新的区域
if atomic.LoadUint64(&lfs.offset) >= uint64(regionThreshold) {
lfs.mu.Lock()
err := lfs.createActiveRegion()
lfs.mu.Unlock()
if err != nil {
return err
}
}
return nil
}
return errors.New("failed to update data due to version conflict")
}