第四节 Bucket的相关操作

前面我们分析了如何遍历、查找一个Bucket,下面我们来看看如何创建、获取、删除一个Bucket对象。

3.4.1 创建一个Bucket

1. CreateBucketIfNotExists()、CreateBucket()分析

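CreateBucket()根据指定的key来创建一个Bucket:如果指定key的Bucket已经存在,则返回ErrBucketExists;如果该key已经作为普通的key/value存在(不是Bucket),则返回ErrIncompatibleValue。否则的话,会在当前的Bucket中找到合适的位置,然后新建一个Bucket插入进去,最后返回给客户端。CreateBucketIfNotExists()是对CreateBucket()的封装:当Bucket已经存在时不报错,而是直接返回已有的Bucket。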


// CreateBucketIfNotExists returns the bucket stored under key, creating it
// first when it is not there yet.
// It delegates to CreateBucket: ErrBucketExists is not treated as a failure
// (the existing bucket is looked up and returned instead), while every other
// error is passed through together with a nil bucket.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error) {
    created, err := b.CreateBucket(key)
    switch err {
    case ErrBucketExists:
        // Already present: hand back the existing bucket rather than failing.
        return b.Bucket(key), nil
    case nil:
        return created, nil
    default:
        return nil, err
    }
}

// CreateBucket creates a new, empty bucket under key inside b and returns it.
//
// Errors:
//   - ErrTxClosed when the transaction has no database attached,
//   - ErrTxNotWritable when the transaction is read-only,
//   - ErrBucketNameRequired when key is empty,
//   - ErrBucketExists when key already names a bucket,
//   - ErrIncompatibleValue when key already exists but is not a bucket.
//
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
    switch {
    case b.tx.db == nil:
        return nil, ErrTxClosed
    case !b.tx.writable:
        return nil, ErrTxNotWritable
    case len(key) == 0:
        return nil, ErrBucketNameRequired
    }

    // Position a cursor at the place where key lives or would be inserted.
    cur := b.Cursor()
    found, _, foundFlags := cur.seek(key)

    // An exact match means the key is taken, either by a bucket or by a
    // plain key/value entry.
    if bytes.Equal(key, found) {
        if foundFlags&bucketLeafFlag == 0 {
            // The key exists but does not refer to a bucket.
            return nil, ErrIncompatibleValue
        }
        return nil, ErrBucketExists
    }

    // Build an empty bucket with a leaf root node and serialize it; the
    // resulting bytes become the value stored under key.
    empty := Bucket{
        bucket:      &bucket{},
        rootNode:    &node{isLeaf: true},
        FillPercent: DefaultFillPercent,
    }
    encoded := empty.write()

    // Store a private copy of the key in the leaf node the cursor points at,
    // flagged as a bucket entry.
    key = cloneBytes(key)
    cur.node().put(key, key, encoded, 0, bucketLeafFlag)

    // Since subbuckets are not allowed on inline buckets, we need to
    // dereference the inline page, if it exists. This will cause the bucket
    // to be treated as a regular, non-inline bucket for the rest of the tx.
    b.page = nil

    // Re-read the freshly inserted entry as a Bucket.
    return b.Bucket(key), nil
}

// write serializes the bucket into a newly allocated byte slice and returns it.
// The slice starts with the bucket header; from offset bucketHeaderSize on it
// holds the root node written out in page form (this is how the contents of an
// inline bucket are carried inside its value).
func (b *Bucket) write() []byte {
    root := b.rootNode
    buf := make([]byte, bucketHeaderSize+root.size())

    // Header: copy the bucket struct over the front of the buffer.
    *(*bucket)(unsafe.Pointer(&buf[0])) = *b.bucket

    // Body: view the remainder of the buffer as a page and have the root
    // node write itself into it.
    root.write((*page)(unsafe.Pointer(&buf[bucketHeaderSize])))

    return buf
}


// node returns the leaf node the cursor is currently positioned on.
// If the top stack entry already carries a leaf node, that node is returned
// directly. Otherwise the node is reached by starting at the root entry of the
// stack (fetched through the bucket when not yet present) and following the
// recorded child index at every level above the last one.
// It asserts that the stack is non-empty and that the walk ends on a leaf.
func (c *Cursor) node() *node {
    _assert(len(c.stack) > 0, "accessing a node with a zero-length cursor stack")

    // Fast path: the top of the stack is already a leaf node.
    last := len(c.stack) - 1
    if top := &c.stack[last]; top.node != nil && top.isLeaf() {
        return top.node
    }

    // Slow path: begin at the root of the stack.
    cur := c.stack[0].node
    if cur == nil {
        cur = c.bucket.node(c.stack[0].page.id, nil)
    }

    // Every entry except the last one must be a branch; descend through each.
    path := c.stack[:last]
    for i := range path {
        _assert(!cur.isLeaf, "expected branch node")
        cur = cur.childAt(int(path[i].index))
    }
    _assert(cur.isLeaf, "expected leaf node")
    return cur
}

// put inserts an entry into the node, or overwrites the entry whose key equals
// oldKey, and stores newKey, value, pgid and flags in it.
// When putting a plain key/value the pgid does not need to be specified; when
// putting a branch entry the pgid is required and the value is not.
// It panics if pgid is at or above the transaction's high water mark, or if
// either key is empty.
func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) {
    switch highWater := n.bucket.tx.meta.pgid; {
    case pgid >= highWater:
        panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", pgid, highWater))
    case len(oldKey) == 0:
        panic("put: zero-length old key")
    case len(newKey) == 0:
        panic("put: zero-length new key")
    }

    // Binary-search for the first inode whose key is >= oldKey.
    pos := sort.Search(len(n.inodes), func(i int) bool {
        return bytes.Compare(n.inodes[i].key, oldKey) >= 0
    })

    // Without an exact match, grow the slice by one and shift the tail right
    // to open a slot at pos.
    replacing := pos < len(n.inodes) && bytes.Equal(n.inodes[pos].key, oldKey)
    if !replacing {
        n.inodes = append(n.inodes, inode{})
        copy(n.inodes[pos+1:], n.inodes[pos:])
    }

    slot := &n.inodes[pos]
    slot.flags, slot.key, slot.value, slot.pgid = flags, newKey, value, pgid
    _assert(len(slot.key) > 0, "put: zero-length inode key")
}

3.4.2 获取一个Bucket

根据指定的key来获取一个Bucket。如果key不存在,或者该key对应的不是Bucket,则返回nil。

// Bucket looks up the nested bucket called name.
// It returns nil when name is not present in b or when the entry stored under
// name is not a bucket.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) Bucket(name []byte) *Bucket {
    // Serve from the cache when possible; indexing a nil map simply yields nil.
    if cached := b.buckets[string(name)]; cached != nil {
        return cached
    }

    // Seek to the key with a cursor.
    cur := b.Cursor()
    foundKey, raw, foundFlags := cur.seek(name)

    // Require an exact key match that is flagged as a bucket entry.
    isBucket := foundFlags&bucketLeafFlag != 0
    if !(bytes.Equal(name, foundKey) && isBucket) {
        return nil
    }

    // Open the bucket from the stored value and remember it so later lookups
    // of the same name skip the seek. The cache is only used when it exists.
    sub := b.openBucket(raw)
    if b.buckets != nil {
        b.buckets[string(name)] = sub
    }
    return sub
}

// openBucket is a helper that re-interprets value, the raw bytes stored for a
// sub-bucket entry in this (parent) bucket, as a Bucket bound to b's
// transaction.
//
// value must be non-empty: its first byte is addressed unconditionally.
// Depending on the branch taken below, the returned Bucket may alias value's
// memory instead of copying it.
func (b *Bucket) openBucket(value []byte) *Bucket {
    var child = newBucket(b.tx)

    // If unaligned load/stores are broken on this arch and value is
    // unaligned simply clone to an aligned byte array.
    // The &3 mask tests whether the address of value's first byte is a
    // multiple of 4.
    unaligned := brokenUnaligned && uintptr(unsafe.Pointer(&value[0]))&3 != 0

    if unaligned {
        value = cloneBytes(value)
    }

    // If this is a writable transaction then we need to copy the bucket entry.
    // Read-only transactions can point directly at the mmap entry.
    // When value was cloned above it is already a private copy, so that case
    // also takes the pointing branch rather than copying a second time.
    if b.tx.writable && !unaligned {
        child.bucket = &bucket{}
        *child.bucket = *(*bucket)(unsafe.Pointer(&value[0]))
    } else {
        child.bucket = (*bucket)(unsafe.Pointer(&value[0]))
    }

    // Save a reference to the inline page if the bucket is inline.
    // A root of 0 marks an inline bucket; its page data starts
    // bucketHeaderSize bytes into value.
    if child.root == 0 {
        child.page = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
    }

    return &child
}

3.4.3 删除一个Bucket

DeleteBucket()方法用来删除一个指定key的Bucket。其内部实现逻辑是先递归地删除其子桶,然后释放该Bucket占用的page,最后将该key从父Bucket的叶子节点中移除。

// DeleteBucket removes the bucket stored under key, including every bucket
// nested inside it.
//
// Errors:
//   - ErrTxClosed when the transaction has no database attached,
//   - ErrTxNotWritable when b is not writable,
//   - ErrBucketNotFound when key does not exist,
//   - ErrIncompatibleValue when key exists but is not a bucket,
//   - a "delete bucket: ..." error when removing a nested bucket fails.
func (b *Bucket) DeleteBucket(key []byte) error {
    switch {
    case b.tx.db == nil:
        return ErrTxClosed
    case !b.Writable():
        return ErrTxNotWritable
    }

    // Position a cursor on the key and make sure it names a bucket.
    cur := b.Cursor()
    found, _, foundFlags := cur.seek(key)
    switch {
    case !bytes.Equal(key, found):
        return ErrBucketNotFound
    case foundFlags&bucketLeafFlag == 0:
        return ErrIncompatibleValue
    }

    // Walk the target bucket and recursively delete everything it reports
    // with a nil value; those entries are treated as nested buckets.
    target := b.Bucket(key)
    dropNested := func(name, val []byte) error {
        if val != nil {
            return nil
        }
        if err := target.DeleteBucket(name); err != nil {
            return fmt.Errorf("delete bucket: %s", err)
        }
        return nil
    }
    if err := target.ForEach(dropNested); err != nil {
        return err
    }

    // Drop the cached copy held by the parent.
    delete(b.buckets, string(key))

    // Discard the in-memory nodes, then release the bucket's pages.
    target.nodes = nil
    target.rootNode = nil
    target.free()

    // Finally remove the key from the leaf node the cursor points at.
    cur.node().del(key)

    return nil
}

// del removes the inode whose key equals key from the node and marks the node
// as unbalanced. If no inode has that key, the node is left untouched.
func (n *node) del(key []byte) {
    // Binary-search for the first inode whose key is >= key.
    pos := sort.Search(len(n.inodes), func(i int) bool {
        return bytes.Compare(n.inodes[i].key, key) >= 0
    })

    // Only an exact match is removed.
    if pos < len(n.inodes) && bytes.Equal(n.inodes[pos].key, key) {
        n.inodes = append(n.inodes[:pos], n.inodes[pos+1:]...)

        // The node lost an entry, so flag it as needing rebalancing.
        n.unbalanced = true
    }
}

// free releases everything the bucket holds, visiting each page or node via
// forEachPageNode: pages go back to the database freelist under the current
// transaction id, and in-memory nodes are freed through their own free method.
// A bucket whose root is 0 (treated as an inline bucket elsewhere in this
// file) is skipped. Afterwards the root is reset to 0.
func (b *Bucket) free() {
    if b.root == 0 {
        return
    }

    tx := b.tx
    release := func(p *page, n *node, _ int) {
        // No page means the entry exists only as an in-memory node.
        if p == nil {
            n.free()
            return
        }
        tx.db.freelist.free(tx.meta.txid, p)
    }
    b.forEachPageNode(release)

    b.root = 0
}

results matching ""

    No results matching ""