第二节 Bucket遍历之Cursor

本节我们先做一节内容的铺垫,暂时不讲如何创建、获取、删除一个Bucket。而是介绍一个boltdb中的新对象Cursor。

答案是:所有的上述操作都是建立在首先定位到一个Bucket所属的位置,然后才能对其进行操作。而定位一个Bucket的功能就是由Cursor来完成的。所以我们先这一节给大家介绍一下boltdb中的Cursor。

我们先看下官方文档对Cursor的描述

Cursor represents an iterator that can traverse over all key/value pairs in a bucket in sorted order.

用大白话讲,既然一个Bucket逻辑上是一颗b+树,那就意味着我们可以对其进行遍历。前面提到的set、get操作,无非是要在Bucket上先找到合适的位置,然后再进行操作。而“找”这个操作就是交由Cursor来完成的。简而言之对Bucket这颗b+树的遍历工作由Cursor来执行。一个Bucket对象关联一个Cursor。下面我们先看看Bucket和Cursor之间的关系。

// Cursor creates a cursor associated with the bucket.
// The cursor is only valid as long as the transaction is open.
// Do not use a cursor after the transaction is closed.
func (b *Bucket) Cursor() *Cursor {
    // Update transaction statistics.
    b.tx.stats.CursorCount++

    // Allocate and return a cursor.
    return &Cursor{
        bucket: b,
        stack:  make([]elemRef, 0),
    }
}

3.2.1 Cursor结构

从上面可以清楚的看到,在获取一个游标Cursor对象时,会将当前的Bucket对象传进去,同时还初始化了一个栈对象,结合数据结构中学习的树的知识。我们也就知道,它的内部就是对树进行遍历。下面我们详细介绍Cursor这个人物。

// Cursor represents an iterator that can traverse over all key/value pairs in a bucket in sorted order.
// Cursors see nested buckets with value == nil.
// Cursors can be obtained from a transaction and are valid as long as the transaction is open.
//
// Keys and values returned from the cursor are only valid for the life of the transaction.
//
// Changing data while traversing with a cursor may cause it to be invalidated
// and return unexpected keys and/or values. You must reposition your cursor
// after mutating data.
type Cursor struct {
    bucket *Bucket
    // 保存遍历搜索的路径
    stack []elemRef
}

// elemRef represents a reference to an element on a given page/node.
type elemRef struct {
    page  *page
    node  *node
    index int
}

// isLeaf returns whether the ref is pointing at a leaf page/node.
func (r *elemRef) isLeaf() bool {
    if r.node != nil {
        return r.node.isLeaf
    }
    return (r.page.flags & leafPageFlag) != 0
}

// count returns the number of inodes or page elements.
func (r *elemRef) count() int {
    if r.node != nil {
        return len(r.node.inodes)
    }
    return int(r.page.count)
}

3.2.2 Cursor对外接口

下面我们看一下Cursor对外暴露的接口有哪些。看之前也可以心里先想一下。针对一棵树我们需要哪些遍历接口呢?

主体也就是三类:定位到某一个元素的位置、在当前位置从前往后找、在当前位置从后往前找

// First moves the cursor to the first item in the bucket and returns its key and value.
// If the bucket is empty then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) First() (key []byte, value []byte)

// Last moves the cursor to the last item in the bucket and returns its key and value.
// If the bucket is empty then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Last() (key []byte, value []byte)

// Next moves the cursor to the next item in the bucket and returns its key and value.
// If the cursor is at the end of the bucket then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Next() (key []byte, value []byte)

// Prev moves the cursor to the previous item in the bucket and returns its key and value.
// If the cursor is at the beginning of the bucket then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Prev() (key []byte, value []byte)

// Delete removes the current key/value under the cursor from the bucket.
// Delete fails if current key/value is a bucket or if the transaction is not writable.
func (c *Cursor) Delete() error

// Seek moves the cursor to a given key and returns it.
// If the key does not exist then the next key is used. If no keys
// follow, a nil key is returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Seek(seek []byte) (key []byte, value []byte)

下面我们详细分析一下Seek()、First()、Last()、Next()、Prev()、Delete()这三个方法的内部实现。其余的方法我们代码就不贴出来了。大致思路可以梳理一下。

3.2.3 Seek(key)实现分析

Seek()方法内部主要调用了seek()私有方法,我们重点关注seek()这个方法的实现,该方法有三个返回值,前两个为key、value、第三个为叶子节点的类型。前面提到在boltdb中,叶子节点元素有两种类型:一种是嵌套的子桶、一种是普通的key/value。而这二者就是通过flags来区分的。如果叶子节点元素为嵌套的子桶时,返回的flags为1,也就是bucketLeafFlag取值。

// Seek moves the cursor to a given key and returns it.
// If the key does not exist then the next key is used. If no keys
// follow, a nil key is returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Seek(seek []byte) (key []byte, value []byte) {
    k, v, flags := c.seek(seek)

    // If we ended up after the last element of a page then move to the next one.
    // 下面这一段逻辑是必须的,因为在seek()方法中,如果ref.index>ref.count()的话,就直接返回nil,nil,0了
    // 这里需要返回下一个
    if ref := &c.stack[len(c.stack)-1]; ref.index >= ref.count() {
        k, v, flags = c.next()
    }

    if k == nil {
        return nil, nil
        //     子桶的话
    } else if (flags & uint32(bucketLeafFlag)) != 0 {
        return k, nil
    }
    return k, v
}

// seek moves the cursor to a given key and returns it.
// If the key does not exist then the next key is used.
func (c *Cursor) seek(seek []byte) (key []byte, value []byte, flags uint32) {
    _assert(c.bucket.tx.db != nil, "tx closed")

    // Start from root page/node and traverse to correct page.
    c.stack = c.stack[:0]
    // 开始根据seek的key值搜索root
    c.search(seek, c.bucket.root)
    // 执行完搜索后,stack中保存了所遍历的路径
    ref := &c.stack[len(c.stack)-1]

    // If the cursor is pointing to the end of page/node then return nil.
    if ref.index >= ref.count() {
        return nil, nil, 0
    }
    //获取值
    // If this is a bucket then return a nil value.
    return c.keyValue()
}



// keyValue returns the key and value of the current leaf element.
func (c *Cursor) keyValue() ([]byte, []byte, uint32) {
  //最后一个节点为叶子节点
    ref := &c.stack[len(c.stack)-1]
    if ref.count() == 0 || ref.index >= ref.count() {
        return nil, nil, 0
    }

    // Retrieve value from node.
    // 先从内存中取
    if ref.node != nil {
        inode := &ref.node.inodes[ref.index]
        return inode.key, inode.value, inode.flags
    }

    // 其次再从文件page中取
    // Or retrieve value from page.
    elem := ref.page.leafPageElement(uint16(ref.index))
    return elem.key(), elem.value(), elem.flags
}

seek()中最核心的方法就是调用search()了,search()方法中,传入的就是要搜索的key值和该桶的root节点。search()方法中,其内部是通过递归的层层往下搜索,下面我们详细了解一下search()内部的实现机制。

// 从根节点开始遍历
// search recursively performs a binary search against a given page/node until it finds a given key.
func (c *Cursor) search(key []byte, pgid pgid) {
    // root,3
    // 层层找page,bucket->tx->db->dataref
    p, n := c.bucket.pageNode(pgid)
    if p != nil && (p.flags&(branchPageFlag|leafPageFlag)) == 0 {
        panic(fmt.Sprintf("invalid page type: %d: %x", p.id, p.flags))
    }
    e := elemRef{page: p, node: n}
    //记录遍历过的路径
    c.stack = append(c.stack, e)

    // If we're on a leaf page/node then find the specific node.
    // 如果是叶子节点,找具体的值node
    if e.isLeaf() {
        c.nsearch(key)
        return
    }

    if n != nil {
        // 先搜索node,因为node是加载到内存中的
        c.searchNode(key, n)
        return
    }
    // 其次再在page中搜索
    c.searchPage(key, p)
}

// pageNode returns the in-memory node, if it exists.
// Otherwise returns the underlying page.
func (b *Bucket) pageNode(id pgid) (*page, *node) {
    // Inline buckets have a fake page embedded in their value so treat them
    // differently. We'll return the rootNode (if available) or the fake page.
    // 内联页的话,就直接返回其page了
    if b.root == 0 {
        if id != 0 {
            panic(fmt.Sprintf("inline bucket non-zero page access(2): %d != 0", id))
        }
        if b.rootNode != nil {
            return nil, b.rootNode
        }
        return b.page, nil
    }

    // Check the node cache for non-inline buckets.
    if b.nodes != nil {
        if n := b.nodes[id]; n != nil {
            return nil, n
        }
    }

    // Finally lookup the page from the transaction if no node is materialized.
    return b.tx.page(id), nil
}


//node中搜索
func (c *Cursor) searchNode(key []byte, n *node) {
    var exact bool
    //二分搜索
    index := sort.Search(len(n.inodes), func(i int) bool {
        // TODO(benbjohnson): Optimize this range search. It's a bit hacky right now.
        // sort.Search() finds the lowest index where f() != -1 but we need the highest index.
        ret := bytes.Compare(n.inodes[i].key, key)
        if ret == 0 {
            exact = true
        }
        return ret != -1
    })
    if !exact && index > 0 {
        index--
    }
    c.stack[len(c.stack)-1].index = index

    // Recursively search to the next page.
    c.search(key, n.inodes[index].pgid)
}

//页中搜索
func (c *Cursor) searchPage(key []byte, p *page) {
    // Binary search for the correct range.
    inodes := p.branchPageElements()

    var exact bool
    index := sort.Search(int(p.count), func(i int) bool {
        // TODO(benbjohnson): Optimize this range search. It's a bit hacky right now.
        // sort.Search() finds the lowest index where f() != -1 but we need the highest index.
        ret := bytes.Compare(inodes[i].key(), key)
        if ret == 0 {
            exact = true
        }
        return ret != -1
    })
    if !exact && index > 0 {
        index--
    }
    c.stack[len(c.stack)-1].index = index

    // Recursively search to the next page.
    c.search(key, inodes[index].pgid)
}


// nsearch searches the leaf node on the top of the stack for a key.
// 搜索叶子页
func (c *Cursor) nsearch(key []byte) {
    e := &c.stack[len(c.stack)-1]
    p, n := e.page, e.node

    // If we have a node then search its inodes.
    // 先搜索node
    if n != nil {
        //又是二分搜索
        index := sort.Search(len(n.inodes), func(i int) bool {
            return bytes.Compare(n.inodes[i].key, key) != -1
        })
        e.index = index
        return
    }

    // If we have a page then search its leaf elements.
    // 再搜索page
    inodes := p.leafPageElements()
    index := sort.Search(int(p.count), func(i int) bool {
        return bytes.Compare(inodes[i].key(), key) != -1
    })
    e.index = index
}

到这儿我们就已经看完所有的seek()查找一个key的过程了,其内部也很简单,就是从根节点开始,通过不断递归遍历每层节点,采用二分法来定位到具体的叶子节点。到达叶子节点时,其叶子节点内部存储的数据也是有序的,因此继续按照二分查找来找到最终的下标。

值得需要注意点:

在遍历时,我们都知道,有可能遍历到的当前分支节点数据并没有在内存中,此时就需要从page中加载数据遍历。所以在遍历过程中,优先在node中找,如果node为空的时候才会采用page来查找。

3.2.4 First()、Last()实现分析

前面看了定位到具体某个key的一个过程,现在我们看一下,在定位到第一个元素时,我们知道它一定是位于最左侧的第一个叶子节点的第一个元素。同理,在定位到最后一个元素时,它一定是位于最右侧的第一个叶子节点的最后一个元素。下面是其内部的实现逻辑:

First()实现

// First moves the cursor to the first item in the bucket and returns its key and value.
// If the bucket is empty then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) First() (key []byte, value []byte) {
    _assert(c.bucket.tx.db != nil, "tx closed")
    // 清空stack
    c.stack = c.stack[:0]
    p, n := c.bucket.pageNode(c.bucket.root)

    // 一直找到第一个叶子节点,此处在天添加stack时,一直让index设置为0即可
    ref := elemRef{page: p, node: n, index: 0}
    c.stack = append(c.stack, ref)


    c.first()

    // If we land on an empty page then move to the next value.
    // https://github.com/boltdb/bolt/issues/450
    // 当前页时空的话,找下一个
    if c.stack[len(c.stack)-1].count() == 0 {
        c.next()
    }

    k, v, flags := c.keyValue()
    // 是桶
    if (flags & uint32(bucketLeafFlag)) != 0 {
        return k, nil
    }
    return k, v

}

// first moves the cursor to the first leaf element under the last page in the stack.
// 找到最后一个非叶子节点的第一个叶子节点。index=0的节点
func (c *Cursor) first() {
    for {
        // Exit when we hit a leaf page.
        var ref = &c.stack[len(c.stack)-1]
        if ref.isLeaf() {
            break
        }

        // Keep adding pages pointing to the first element to the stack.
        var pgid pgid
        if ref.node != nil {
            pgid = ref.node.inodes[ref.index].pgid
        } else {
            pgid = ref.page.branchPageElement(uint16(ref.index)).pgid
        }
        p, n := c.bucket.pageNode(pgid)
        c.stack = append(c.stack, elemRef{page: p, node: n, index: 0})
    }
}

Last()实现

// Last moves the cursor to the last item in the bucket and returns its key and value.
// If the bucket is empty then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Last() (key []byte, value []byte) {
    _assert(c.bucket.tx.db != nil, "tx closed")

    c.stack = c.stack[:0]
    p, n := c.bucket.pageNode(c.bucket.root)

    ref := elemRef{page: p, node: n}
    // 设置其index为当前页元素的最后一个
    ref.index = ref.count() - 1
    c.stack = append(c.stack, ref)

    c.last()

    k, v, flags := c.keyValue()
    if (flags & uint32(bucketLeafFlag)) != 0 {
        return k, nil
    }
    return k, v
}

// last moves the cursor to the last leaf element under the last page in the stack.
// 移动到栈中最后一个节点的最后一个叶子节点
func (c *Cursor) last() {
    for {
        // Exit when we hit a leaf page.
        ref := &c.stack[len(c.stack)-1]
        if ref.isLeaf() {
            break
        }

        // Keep adding pages pointing to the last element in the stack.
        var pgid pgid
        if ref.node != nil {
            pgid = ref.node.inodes[ref.index].pgid
        } else {
            pgid = ref.page.branchPageElement(uint16(ref.index)).pgid
        }
        p, n := c.bucket.pageNode(pgid)

        var nextRef = elemRef{page: p, node: n}
        nextRef.index = nextRef.count() - 1
        c.stack = append(c.stack, nextRef)
    }
}

3.2.5 Next()、Prev()实现分析

再此我们从当前位置查找前一个或者下一个时,需要注意一个问题,如果当前节点中元素已经完了,那么此时需要回退到遍历路径的上一个节点。然后再继续查找,下面进行代码分析。

Next()分析

// Next moves the cursor to the next item in the bucket and returns its key and value.
// If the cursor is at the end of the bucket then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Next() (key []byte, value []byte) {
    _assert(c.bucket.tx.db != nil, "tx closed")
    k, v, flags := c.next()
    if (flags & uint32(bucketLeafFlag)) != 0 {
        return k, nil
    }
    return k, v
}

// next moves to the next leaf element and returns the key and value.
// If the cursor is at the last leaf element then it stays there and returns nil.
func (c *Cursor) next() (key []byte, value []byte, flags uint32) {
    for {
        // Attempt to move over one element until we're successful.
        // Move up the stack as we hit the end of each page in our stack.
        var i int
        for i = len(c.stack) - 1; i >= 0; i-- {
            elem := &c.stack[i]
            if elem.index < elem.count()-1 {
                // 元素还有时,往后移动一个
                elem.index++
                break
            }
        }
        // 最后的结果elem.index++

        // If we've hit the root page then stop and return. This will leave the
        // cursor on the last element of the last page.
        // 所有页都遍历完了
        if i == -1 {
            return nil, nil, 0
        }

        // Otherwise start from where we left off in the stack and find the
        // first element of the first leaf page.
        // 剩余的节点里面找,跳过原先遍历过的节点
        c.stack = c.stack[:i+1]
        // 如果是叶子节点,first()啥都不做,直接退出。返回elem.index+1的数据
        // 非叶子节点的话,需要移动到stack中最后一个路径的第一个元素
        c.first()

        // If this is an empty page then restart and move back up the stack.
        // https://github.com/boltdb/bolt/issues/450
        if c.stack[len(c.stack)-1].count() == 0 {
            continue
        }

        return c.keyValue()
    }
}

Prev()实现

// Prev moves the cursor to the previous item in the bucket and returns its key and value.
// If the cursor is at the beginning of the bucket then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Prev() (key []byte, value []byte) {
    _assert(c.bucket.tx.db != nil, "tx closed")

    // Attempt to move back one element until we're successful.
    // Move up the stack as we hit the beginning of each page in our stack.
    for i := len(c.stack) - 1; i >= 0; i-- {
        elem := &c.stack[i]
        if elem.index > 0 {
            // 往前移动一格
            elem.index--
            break
        }
        c.stack = c.stack[:i]
    }

    // If we've hit the end then return nil.
    if len(c.stack) == 0 {
        return nil, nil
    }

    // Move down the stack to find the last element of the last leaf under this branch.
    // 如果当前节点是叶子节点的话,则直接退出了,啥都不做。否则的话移动到新页的最后一个节点
    c.last()
    k, v, flags := c.keyValue()
    if (flags & uint32(bucketLeafFlag)) != 0 {
        return k, nil
    }
    return k, v
}

3.2.6 Delete()方法分析

Delete()方法中,移动当前位置的元素

// Delete removes the current key/value under the cursor from the bucket.
// Delete fails if current key/value is a bucket or if the transaction is not writable.
func (c *Cursor) Delete() error {
    if c.bucket.tx.db == nil {
        return ErrTxClosed
    } else if !c.bucket.Writable() {
        return ErrTxNotWritable
    }

    key, _, flags := c.keyValue()
    // Return an error if current value is a bucket.
    if (flags & bucketLeafFlag) != 0 {
        return ErrIncompatibleValue
    }
    // 从node中移除,本质上将inode数组进行移动
    c.node().del(key)

    return nil
}

// del removes a key from the node.
func (n *node) del(key []byte) {
    // Find index of key.
    index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, key) != -1 })

    // Exit if the key isn't found.
    if index >= len(n.inodes) || !bytes.Equal(n.inodes[index].key, key) {
        return
    }

    // Delete inode from the node.
    n.inodes = append(n.inodes[:index], n.inodes[index+1:]...)

    // Mark the node as needing rebalancing.
    n.unbalanced = true
}

results matching ""

    No results matching ""