Section 4: Commit() Implementation

The overall flow inside the Commit() method is as follows (a minimal caller-side sketch appears right after this list):

  1. Decide whether any nodes need to be merged (rebalance) or split (spill).
  2. Check whether the freelist has outgrown the pages that currently hold it; if it has, allocate new pages for it.
  3. Sort the dirty pages touched by the transaction by page id (to keep disk writes as sequential as possible), write them out in a loop, and then sync the file.
  4. Only after the data pages have been written successfully is the meta page written to disk and synced, making the commit durable.
  5. If any of the steps above fails, the transaction is rolled back.
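
Before walking through the implementation, here is a minimal caller-side sketch showing how Commit() is reached from application code. Begin/Commit/Rollback, CreateBucketIfNotExists and Put are the real boltdb API; the import path, file name, bucket name and key/value are illustrative assumptions.

package main

import (
    "log"

    bolt "github.com/boltdb/bolt" // assumed import path (bbolt users would import go.etcd.io/bbolt)
)

func main() {
    // Open (or create) a database file; 0600 is the file mode.
    db, err := bolt.Open("my.db", 0600, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Start a read-write transaction; Commit() is only valid on writable transactions.
    tx, err := db.Begin(true)
    if err != nil {
        log.Fatal(err)
    }

    b, err := tx.CreateBucketIfNotExists([]byte("widgets")) // hypothetical bucket name
    if err != nil {
        _ = tx.Rollback()
        log.Fatal(err)
    }
    if err := b.Put([]byte("foo"), []byte("bar")); err != nil { // hypothetical key/value
        _ = tx.Rollback()
        log.Fatal(err)
    }

    // Commit rebalances/spills nodes, rewrites the freelist, writes the dirty
    // pages and finally the meta page, exactly as the steps above describe.
    if err := tx.Commit(); err != nil {
        log.Fatal(err)
    }
}

With that context, the annotated implementation follows.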
// Commit writes all changes to disk and updates the meta page.
// Returns an error if a disk write error occurs, or if Commit is
// called on a read-only transaction.

// Data pages are written first, and only then the meta page.
// What if the machine crashes after the data pages are written but before the
// meta page is updated? Nothing beyond the uncommitted transaction is lost:
// the on-disk meta still points to the previous, consistent B+ tree (whose
// pages this transaction never overwrites), so on reopen the database comes
// back at the last committed state, and a torn meta write is detected via the
// meta checksum, causing the other of the two meta pages to be used instead.
func (tx *Tx) Commit() error {
    _assert(!tx.managed, "managed tx commit not allowed")
    if tx.db == nil {
        return ErrTxClosed
    } else if !tx.writable {
        return ErrTxNotWritable
    }

    // TODO(benbjohnson): Use vectorized I/O to write out dirty pages.

    // Rebalance first: merge nodes left underfull by deletions.
    // Rebalance nodes which have had deletions.
    var startTime = time.Now()
    tx.root.rebalance()
    if tx.stats.Rebalance > 0 {
        tx.stats.RebalanceTime += time.Since(startTime)
    }

    // Then spill: split nodes that have grown too large and write them onto dirty pages.
    // spill data onto dirty pages.
    startTime = time.Now()
    // spill() adds the resulting pages to the tx.pages dirty-page cache.
    if err := tx.root.spill(); err != nil {
        tx.rollback()
        return err
    }
    tx.stats.SpillTime += time.Since(startTime)

    // Free the old root bucket.
    tx.meta.root.root = tx.root.root

    opgid := tx.meta.pgid

    // Free the freelist and allocate new pages for it. This will overestimate
    // the size of the freelist but not underestimate the size (which would be bad).
    // Allocate new pages for the freelist, then write the freelist into them.
    tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
    // The freelist may have grown, so pages must be reallocated to hold it.
    // When this write transaction started, pages pending from transactions that no
    // reader needs anymore were released back into the freelist, so it can now be
    // larger than the pages that previously stored it and must be checked for overflow.
    p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
    if err != nil {
        tx.rollback()
        return err
    }
    // Write the freelist into the newly allocated contiguous pages.
    if err := tx.db.freelist.write(p); err != nil {
        tx.rollback()
        return err
    }
    // Point the meta at the freelist's new page id.
    tx.meta.freelist = p.id

    // If the high water mark has moved up then attempt to grow the database.
    // tx.allocate() above may have advanced meta.pgid (the high-water mark).
    if tx.meta.pgid > opgid {
        if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
            tx.rollback()
            return err
        }
    }

    // Write dirty pages to disk.
    startTime = time.Now()
    // Write the dirty data pages.
    if err := tx.write(); err != nil {
        tx.rollback()
        return err
    }

    // If strict mode is enabled then perform a consistency check.
    // Only the first consistency error is reported in the panic.
    if tx.db.StrictMode {
        ch := tx.Check()
        var errs []string
        for {
            err, ok := <-ch
            if !ok {
                break
            }
            errs = append(errs, err.Error())
        }
        if len(errs) > 0 {
            panic("check fail: " + strings.Join(errs, "\n"))
        }
    }

    // Write meta to disk.
    // Persist the meta page to disk.
    if err := tx.writeMeta(); err != nil {
        tx.rollback()
        return err
    }
    tx.stats.WriteTime += time.Since(startTime)

    // Finalize the transaction.
    tx.close()

    // Execute commit handlers now that the locks have been removed.
    for _, fn := range tx.commitHandlers {
        fn()
    }

    return nil
}
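
The commitHandlers loop at the end is what backs Tx.OnCommit, which lets callers register callbacks that run only after a successful commit (they execute once tx.close() has released the locks). A small sketch of that usage; the helper name, handler body and bucket name are illustrative:

import (
    "log"

    bolt "github.com/boltdb/bolt" // assumed import path, as in the earlier sketch
)

// registerWithCommitHook is a hypothetical helper showing Tx.OnCommit in use.
func registerWithCommitHook(db *bolt.DB) error {
    return db.Update(func(tx *bolt.Tx) error {
        // The callback is appended to tx.commitHandlers and invoked at the
        // very end of Commit(), after the transaction has been finalized.
        tx.OnCommit(func() {
            log.Println("transaction committed")
        })
        _, err := tx.CreateBucketIfNotExists([]byte("widgets")) // hypothetical bucket name
        return err
    })
}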

// write writes any dirty pages to disk.
func (tx *Tx) write() error {
    // Sort pages by id.
    // so that writes hit the file in ascending, mostly sequential order.
    pages := make(pages, 0, len(tx.pages))
    for _, p := range tx.pages {
        pages = append(pages, p)
    }
    // Clear out page cache early.
    tx.pages = make(map[pgid]*page)
    sort.Sort(pages)

    // Write pages to disk in order.
    for _, p := range pages {
        // Total byte size (base page plus overflow pages) and file offset of this page.
        size := (int(p.overflow) + 1) * tx.db.pageSize
        offset := int64(p.id) * int64(tx.db.pageSize)

        // Write out page in "max allocation" sized chunks.
        ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p))
        // Write this page out, chunk by chunk if it exceeds the maximum allocation size.
        for {
            // Limit our write to our max allocation size.
            sz := size
            // Cap each chunk below maxAllocSize (2^31-1, roughly 2GB, on 64-bit platforms).
            if sz > maxAllocSize-1 {
                sz = maxAllocSize - 1
            }

            // Write chunk to disk.
            buf := ptr[:sz]
            if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
                return err
            }

            // Update statistics.
            tx.stats.Write++

            // Exit inner for loop if we've written all the chunks.
            size -= sz
            if size == 0 {
                break
            }

            // Otherwise move offset forward and move pointer to next chunk.
            // Advance the file offset past the chunk just written,
            offset += int64(sz)
            // and advance the pointer into the page buffer by the same amount.
            ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz]))
        }
    }

    // Ignore file sync if flag is set on DB.
    if !tx.db.NoSync || IgnoreNoSync {
        if err := fdatasync(tx.db); err != nil {
            return err
        }
    }

    // Put small pages back to page pool.
    for _, p := range pages {
        // Ignore page sizes over 1 page.
        // These are allocated using make() instead of the page pool.
        if int(p.overflow) != 0 {
            continue
        }

        buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:tx.db.pageSize]

        // See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
        // Zero the buffer before handing it back to the page pool.
        for i := range buf {
            buf[i] = 0
        }
        tx.db.pagePool.Put(buf)
    }

    return nil
}
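
The page pool referenced above is a sync.Pool of single-page byte slices; buffers spanning more than one page (overflow > 0) are allocated with make() and never pooled. A rough sketch of that pattern, assuming a plain pageSize parameter in place of the real db.pageSize field:

import "sync"

// newPagePool builds a pool of single-page buffers, analogous in spirit to
// db.pagePool (a sketch, not the library's exact initialization code).
func newPagePool(pageSize int) *sync.Pool {
    return &sync.Pool{
        New: func() interface{} { return make([]byte, pageSize) },
    }
}

func demoPagePool() {
    pool := newPagePool(4096) // assume a 4KB page size
    buf := pool.Get().([]byte)

    // ... use buf as a page-sized scratch buffer ...

    // Zero it before returning it, just as tx.write() does above, so a
    // later Get() never observes stale page contents.
    for i := range buf {
        buf[i] = 0
    }
    pool.Put(buf)
}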

// writeMeta writes the meta to the disk.
func (tx *Tx) writeMeta() error {
    // Create a temporary buffer for the meta page.
    buf := make([]byte, tx.db.pageSize)
    p := tx.db.pageInBuffer(buf, 0)
    // Serialize the transaction's meta information into the page buffer.
    tx.meta.write(p)

    // Write the meta page to file.
    if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
        return err
    }
    if !tx.db.NoSync || IgnoreNoSync {
        if err := fdatasync(tx.db); err != nil {
            return err
        }
    }

    // Update statistics.
    tx.stats.Write++

    return nil
}
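
For context on tx.meta.write(p): boltdb keeps two meta pages (page 0 and page 1), and meta.write targets one of them based on txid parity before filling in a checksum. On open, the meta page with the highest txid that passes its checksum wins, which is why a crash at any point before the meta sync completes never corrupts the database. Below is a conceptual sketch of that step; it would live inside the bolt package itself (it uses unexported types), the names mirror the boltdb source, and the body should be read as an approximation rather than the verbatim implementation:

// writeSketch shows, conceptually, what meta.write does with the page buffer.
func (m *meta) writeSketch(p *page) {
    // Alternate between meta page 0 and meta page 1 so the previously
    // committed meta always survives a torn write.
    p.id = pgid(m.txid % 2)
    p.flags |= metaPageFlag

    // The checksum lets Open() detect a partially written meta page and
    // fall back to the other one.
    m.checksum = m.sum64()
    m.copy(p.meta())
}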

// allocate returns a contiguous block of memory starting at a given page.
// It allocates `count` contiguous pages and records them in the transaction's dirty-page cache.
func (tx *Tx) allocate(count int) (*page, error) {
    p, err := tx.db.allocate(count)
    if err != nil {
        return nil, err
    }

    // Save to our page cache.
    tx.pages[p.id] = p

    // Update statistics.
    tx.stats.PageCount++
    tx.stats.PageAlloc += count * tx.db.pageSize

    return p, nil
}
