第六节 db.Batch()实现分析

现在对Batch()方法稍作分析。在DB定义的那一节中我们可以看到,一个DB对象拥有一个batch对象,该对象是全局的。当我们使用Batch()方法时,内部会将传递进去的fn缓存在calls中。

其内部也是调用了Update,只不过是在Update内部遍历之前缓存的calls。

有两种情况会触发调用Update。

  1. 第一种情况是到达了MaxBatchDelay时间,就会触发Update
  2. 第二种情况是len(db.batch.calls) >= db.MaxBatchSize,即缓存的calls个数大于等于MaxBatchSize时,也会触发Update。

Batch的本质是: 将每次写、每次刷盘的操作转变成了多次写、一次刷盘,从而提升性能。

// Batch runs fn as part of a batch of update transactions. It behaves
// much like Update, with two differences:
//
// 1. Concurrent Batch calls may be merged into a single Bolt
// transaction.
//
// 2. fn may be invoked more than once, whether or not it returns an
// error.
//
// Consequently the side effects of fn must be idempotent, and should be
// treated as permanent only once the caller has seen Batch return
// successfully.
//
// The maximum batch size and the maximum delay before a batch starts are
// controlled by DB.MaxBatchSize and DB.MaxBatchDelay, respectively.
//
// Batch only pays off when several goroutines call it concurrently.
func (db *DB) Batch(fn func(*Tx) error) error {
    // Capacity 1: the batch runner sends exactly one result per call, so
    // its send never has to wait for this goroutine to start receiving.
    result := make(chan error, 1)

    db.batchMu.Lock()
    if db.batch == nil || len(db.batch.calls) >= db.MaxBatchSize {
        // Nothing is pending, or the pending batch is already full:
        // open a fresh batch and arm its delay timer.
        fresh := &batch{db: db}
        db.batch = fresh
        fresh.timer = time.AfterFunc(db.MaxBatchDelay, fresh.trigger)
    }
    pending := db.batch
    pending.calls = append(pending.calls, call{fn: fn, err: result})
    if len(pending.calls) >= db.MaxBatchSize {
        // The batch just filled up; start it without waiting for the timer.
        go pending.trigger()
    }
    db.batchMu.Unlock()

    // Block until the batch runner reports how fn fared.
    if err := <-result; err != trySolo {
        return err
    }
    // fn failed inside the batch; run it again in a transaction of its own.
    return db.Update(fn)
}

// call is one pending Batch invocation: the user function together with
// the channel on which its result is delivered.
type call struct {
    // fn is the function that was passed to DB.Batch.
    fn  func(*Tx) error
    // err is the send-only end of the channel DB.Batch waits on; the batch
    // runner sends either the transaction result or trySolo on it.
    err chan<- error
}

// batch collects the calls queued by DB.Batch so that they can be executed
// together by run.
type batch struct {
    // db is the database this batch belongs to.
    db    *DB
    // timer invokes trigger once DB.MaxBatchDelay has elapsed; run stops it.
    timer *time.Timer
    // start guards run so that it executes at most once per batch.
    start sync.Once
    // calls holds the queued invocations; DB.Batch appends to it while
    // holding db.batchMu.
    calls []call
}

// trigger starts the batch unless it has been started before. It may be
// called several times (by the delay timer and by DB.Batch when the batch
// fills up); sync.Once ensures run executes only on the first call.
func (b *batch) trigger() {
    b.start.Do(func() {
        b.run()
    })
}

// run performs the transactions in the batch and communicates results
// back to DB.Batch.
func (b *batch) run() {
    b.db.batchMu.Lock()
    b.timer.Stop()
    // Make sure no new work is added to this batch, but don't break
    // other batches.
    if b.db.batch == b {
        b.db.batch = nil
    }
    b.db.batchMu.Unlock()

retry:
    // 内部多次调用Update,最后一次Commit刷盘,提升性能
    for len(b.calls) > 0 {
        var failIdx = -1
        err := b.db.Update(func(tx *Tx) error {
            遍历calls中的函数c,多次调用,最后一次提交刷盘
            for i, c := range b.calls {
                // safelyCall里面捕获了panic
                if err := safelyCall(c.fn, tx); err != nil {
                    failIdx = i
                    //只要又失败,事务就不提交
                    return err
                }
            }
            return nil
        })

        if failIdx >= 0 {
            // take the failing transaction out of the batch. it's
            // safe to shorten b.calls here because db.batch no longer
            // points to us, and we hold the mutex anyway.
            c := b.calls[failIdx]
            //这儿只是把失败的事务给踢出去了,然后其他的事务会重新执行
            b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
            // tell the submitter re-run it solo, continue with the rest of the batch
            c.err <- trySolo
            continue retry
        }

        // pass success, or bolt internal errors, to all callers
        for _, c := range b.calls {
            c.err <- err
        }
        break retry
    }
}

// trySolo is a sentinel error used internally to signal that a batched
// function failed and should be re-run in a transaction of its own. The
// batch runner sends it to the failing call's channel, and DB.Batch
// reacts by calling db.Update with the same function. It should never be
// seen by callers of DB.Batch.
var trySolo = errors.New("batch function returned an error and should be re-run solo")

// panicked is the error produced when a batched function panics; it wraps
// the value that was recovered from the panic.
type panicked struct {
    reason interface{}
}

// Error implements the error interface. When the recovered value is itself
// an error its message is returned unchanged; any other value is rendered
// as "panic: <value>".
func (p panicked) Error() string {
    switch cause := p.reason.(type) {
    case error:
        return cause.Error()
    default:
        return fmt.Sprintf("panic: %v", cause)
    }
}

// safelyCall invokes fn with tx and returns its error. If fn panics, the
// panic is recovered and returned as a panicked error instead of
// propagating to the caller.
func safelyCall(fn func(*Tx) error, tx *Tx) (err error) {
    defer func() {
        // recover must be called directly in the deferred function.
        recovered := recover()
        if recovered == nil {
            return
        }
        // Overwrite the named result so the caller sees the panic as an error.
        err = panicked{recovered}
    }()
    err = fn(tx)
    return err
}

results matching ""

    No results matching ""