目前这种方案 在高并发的情况下回出现不断创建新的 inode 对象,给 GC 造成压力,但是可以保证 inode 进行原子交互无锁设计,这种把 atomic 拆解更细,可以复用 inode 结构,但是不能保证整个 structure 地址是原子更新,但是这两种我都跑了帮小时的 benchmark 都没有任何并非问题。

其实这两种都有一个逻辑 bug 就是 在 日志没有持久化的时候,mvcc 版本就已经被 cas 加 1 了,其实正确的做法是将这段代码移到外部去,类似于其他编程语言中 do while 的方式,先做在 CAS 更新。不用 我这个设计成 乐观锁 机制,先操作再看版本好对不对的上,对不上就是执行失败。

// UpdateSegmentWithCAS 通过类似于 MVCC 来实现更新操作数据一致性
func (lfs *LogStructuredFS) UpdateSegmentWithCAS(key string, expected uint64, newseg *Segment) error {
    inum := InodeNum(key)
    imap := lfs.indexs[inum%uint64(indexShard)]
    if imap == nil {
        return fmt.Errorf("inode index shard for %d not found", inum)
    }

    // 读取 inode 信息,使用读锁来防止并发写操作
    imap.mu.RLock()
    inode, ok := imap.index[inum]
    imap.mu.RUnlock()
    if !ok {
        return fmt.Errorf("inode index for %d not found", inum)
    }

    // MVCC: version is not modified by another thread
    if atomic.CompareAndSwapUint64(&inode.mvcc, expected, expected+1) {
        bytes, err := serializedSegment(newseg)
        if err != nil {
            return err
        }

        // 更新数据时使用锁
        lfs.mu.Lock()
        err = appendToActiveRegion(lfs.active, bytes)
        lfs.mu.Unlock()
        if err != nil {
            return fmt.Errorf("failed to update data: %w", err)
        }

        // 一次性原子更新 inode 指针
        atomic.StoreUint64(&inode.Position, atomic.LoadUint64(&lfs.offset))
        atomic.StoreUint64(&inode.CreatedAt, newseg.CreatedAt)
        atomic.StoreUint64(&inode.ExpiredAt, newseg.ExpiredAt)
        atomic.StoreUint64(&inode.RegionID, lfs.regionID)
        atomic.StoreUint32(&inode.Length, newseg.Size())

        // 使用原子操作更新 offset
        atomic.AddUint64(&lfs.offset, uint64(newseg.Size()))

        // 检查并创建新的区域
        if atomic.LoadUint64(&lfs.offset) >= uint64(regionThreshold) {
            lfs.mu.Lock()
            err := lfs.createActiveRegion()
            lfs.mu.Unlock()
            if err != nil {
                return err
            }
        }

        return nil
    }

    return errors.New("failed to update data due to version conflict")
}
// UpdateSegmentWithCAS 通过类似于 MVCC 来实现更新操作数据一致性
func (lfs *LogStructuredFS) UpdateSegmentWithCAS(key string, expected uint64, newseg *Segment) error {
    inum := InodeNum(key)
    imap := lfs.indexs[inum%uint64(indexShard)]
    if imap == nil {
        return fmt.Errorf("inode index shard for %d not found", inum)
    }

    // 读取 inode 信息,使用读锁来防止并发写操作
    imap.mu.RLock()
    inode, ok := imap.index[inum]
    imap.mu.RUnlock()
    if !ok {
        return fmt.Errorf("inode index for %d not found", inum)
    }

    // MVCC: version is not modified by another thread
    if atomic.CompareAndSwapUint64(&inode.mvcc, expected, expected+1) {
        bytes, err := serializedSegment(newseg)
        if err != nil {
            return err
        }
        // 更新数据时使用锁
        lfs.mu.Lock()
        err = appendToActiveRegion(lfs.active, bytes)
        lfs.mu.Unlock()
        if err != nil {
            return fmt.Errorf("failed to update data: %w", err)
        }

        newInode := &INode{
            RegionID:  atomic.LoadUint64(&lfs.regionID),
            Position:  atomic.LoadUint64(&lfs.offset),
            CreatedAt: newseg.CreatedAt,
            ExpiredAt: newseg.ExpiredAt,
            Length:    newseg.Size(),
        }

        // 一次性原子更新 inode 指针
        atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&inode)), unsafe.Pointer(newInode))

        // 使用原子操作更新 offset
        atomic.AddUint64(&lfs.offset, uint64(newseg.Size()))

        // 检查并创建新的区域
        if atomic.LoadUint64(&lfs.offset) >= uint64(regionThreshold) {
            lfs.mu.Lock()
            err := lfs.createActiveRegion()
            lfs.mu.Unlock()
            if err != nil {
                return err
            }
        }

        return nil
    }

    return errors.New("failed to update data due to version conflict")
}
便宜 VPS vultr
最后修改:2025 年 03 月 05 日
如果觉得我的文章对你有用,请随意赞赏 🌹 谢谢 !