|
- // Copyright 2019 Lunny Xiao. All rights reserved.
- // Use of this source code is governed by a MIT-style
- // license that can be found in the LICENSE file.
-
- package levelqueue
-
- import (
- "bytes"
- "encoding/binary"
- "sync"
-
- "github.com/syndtr/goleveldb/leveldb"
- "github.com/syndtr/goleveldb/leveldb/errors"
- )
-
// Names of the two bookkeeping records that persist the queue boundaries.
// NewQueue combines each with the queue prefix to form the actual database key.
const (
	// lowKeyStr names the record storing the ID of the leftmost element.
	lowKeyStr = "low"
	// highKeyStr names the record storing the ID of the rightmost element.
	highKeyStr = "high"
)
-
- // Queue defines a queue struct
- type Queue struct {
- db *leveldb.DB
- highLock sync.Mutex
- lowLock sync.Mutex
- low int64
- high int64
- lowKey []byte
- highKey []byte
- prefix []byte
- closeUnderlyingDB bool
- }
-
- // Open opens a queue from the db path or creates a
- // queue if it doesn't exist.
- // The keys will not be prefixed by default
- func Open(dataDir string) (*Queue, error) {
- db, err := leveldb.OpenFile(dataDir, nil)
- if err != nil {
- if !errors.IsCorrupted(err) {
- return nil, err
- }
- db, err = leveldb.RecoverFile(dataDir, nil)
- if err != nil {
- return nil, err
- }
- }
- return NewQueue(db, []byte{}, true)
- }
-
- // NewQueue creates a queue from a db. The keys will be prefixed with prefix
- // and at close the db will be closed as per closeUnderlyingDB
- func NewQueue(db *leveldb.DB, prefix []byte, closeUnderlyingDB bool) (*Queue, error) {
- var err error
-
- var queue = &Queue{
- db: db,
- closeUnderlyingDB: closeUnderlyingDB,
- }
-
- queue.prefix = make([]byte, len(prefix))
- copy(queue.prefix, prefix)
- queue.lowKey = withPrefix(prefix, []byte(lowKeyStr))
- queue.highKey = withPrefix(prefix, []byte(highKeyStr))
-
- queue.low, err = queue.readID(queue.lowKey)
- if err == leveldb.ErrNotFound {
- queue.low = 1
- err = db.Put(queue.lowKey, id2bytes(1), nil)
- }
- if err != nil {
- return nil, err
- }
-
- queue.high, err = queue.readID(queue.highKey)
- if err == leveldb.ErrNotFound {
- err = db.Put(queue.highKey, id2bytes(0), nil)
- }
- if err != nil {
- return nil, err
- }
-
- return queue, nil
- }
-
- func (queue *Queue) readID(key []byte) (int64, error) {
- bs, err := queue.db.Get(key, nil)
- if err != nil {
- return 0, err
- }
- return bytes2id(bs)
- }
-
- func (queue *Queue) highincrement() (int64, error) {
- id := queue.high + 1
- queue.high = id
- err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil)
- if err != nil {
- queue.high = queue.high - 1
- return 0, err
- }
- return id, nil
- }
-
- func (queue *Queue) highdecrement() (int64, error) {
- queue.high = queue.high - 1
- err := queue.db.Put(queue.highKey, id2bytes(queue.high), nil)
- if err != nil {
- queue.high = queue.high + 1
- return 0, err
- }
- return queue.high, nil
- }
-
- func (queue *Queue) lowincrement() (int64, error) {
- queue.low = queue.low + 1
- err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil)
- if err != nil {
- queue.low = queue.low - 1
- return 0, err
- }
- return queue.low, nil
- }
-
- func (queue *Queue) lowdecrement() (int64, error) {
- queue.low = queue.low - 1
- err := queue.db.Put(queue.lowKey, id2bytes(queue.low), nil)
- if err != nil {
- queue.low = queue.low + 1
- return 0, err
- }
- return queue.low, nil
- }
-
- // Len returns the length of the queue
- func (queue *Queue) Len() int64 {
- queue.lowLock.Lock()
- queue.highLock.Lock()
- l := queue.high - queue.low + 1
- queue.highLock.Unlock()
- queue.lowLock.Unlock()
- return l
- }
-
// id2bytes encodes id as a signed varint for use as a database key or as a
// stored counter value.
//
// The result is zero-padded to at least eight bytes, which is the layout
// existing databases were written with. IDs whose varint needs nine or ten
// bytes are returned at their full encoded length rather than overflowing
// the buffer.
func id2bytes(id int64) []byte {
	// Historical fixed width of an encoded ID.
	const minLen = 8

	// MaxVarintLen64 bytes are enough for any int64; make zero-fills the
	// buffer, which provides the padding for short encodings.
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutVarint(buf, id)
	if n < minLen {
		n = minLen
	}
	return buf[:n]
}
-
// bytes2id decodes the signed varint at the start of b. Bytes following the
// varint are ignored; an error is returned when b does not start with a
// complete varint.
func bytes2id(b []byte) (int64, error) {
	reader := bytes.NewReader(b)
	return binary.ReadVarint(reader)
}
-
// withPrefix returns prefix + "-" + value as a newly allocated slice. When
// prefix is empty, value itself is returned without copying.
func withPrefix(prefix []byte, value []byte) []byte {
	if len(prefix) == 0 {
		return value
	}

	out := make([]byte, 0, len(prefix)+1+len(value))
	out = append(out, prefix...)
	out = append(out, '-')
	return append(out, value...)
}
-
- // RPush pushes a data from right of queue
- func (queue *Queue) RPush(data []byte) error {
- queue.highLock.Lock()
- id, err := queue.highincrement()
- if err != nil {
- queue.highLock.Unlock()
- return err
- }
- err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil)
- queue.highLock.Unlock()
- return err
- }
-
- // LPush pushes a data from left of queue
- func (queue *Queue) LPush(data []byte) error {
- queue.lowLock.Lock()
- id, err := queue.lowdecrement()
- if err != nil {
- queue.lowLock.Unlock()
- return err
- }
- err = queue.db.Put(withPrefix(queue.prefix, id2bytes(id)), data, nil)
- queue.lowLock.Unlock()
- return err
- }
-
- // RPop pop a data from right of queue
- func (queue *Queue) RPop() ([]byte, error) {
- queue.highLock.Lock()
- defer queue.highLock.Unlock()
- currentID := queue.high
-
- res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
- if err != nil {
- if err == leveldb.ErrNotFound {
- return nil, ErrNotFound
- }
- return nil, err
- }
-
- _, err = queue.highdecrement()
- if err != nil {
- return nil, err
- }
-
- err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
- if err != nil {
- return nil, err
- }
- return res, nil
- }
-
- // RHandle receives a user callback function to handle the right element of the queue, if function return nil, then delete the element, otherwise keep the element.
- func (queue *Queue) RHandle(h func([]byte) error) error {
- queue.highLock.Lock()
- defer queue.highLock.Unlock()
- currentID := queue.high
-
- res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
- if err != nil {
- if err == leveldb.ErrNotFound {
- return ErrNotFound
- }
- return err
- }
-
- if err = h(res); err != nil {
- return err
- }
-
- _, err = queue.highdecrement()
- if err != nil {
- return err
- }
-
- return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
- }
-
- // LPop pop a data from left of queue
- func (queue *Queue) LPop() ([]byte, error) {
- queue.lowLock.Lock()
- defer queue.lowLock.Unlock()
- currentID := queue.low
-
- res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
- if err != nil {
- if err == leveldb.ErrNotFound {
- return nil, ErrNotFound
- }
- return nil, err
- }
-
- _, err = queue.lowincrement()
- if err != nil {
- return nil, err
- }
-
- err = queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
- if err != nil {
- return nil, err
- }
- return res, nil
- }
-
- // LHandle receives a user callback function to handle the left element of the queue, if function return nil, then delete the element, otherwise keep the element.
- func (queue *Queue) LHandle(h func([]byte) error) error {
- queue.lowLock.Lock()
- defer queue.lowLock.Unlock()
- currentID := queue.low
-
- res, err := queue.db.Get(withPrefix(queue.prefix, id2bytes(currentID)), nil)
- if err != nil {
- if err == leveldb.ErrNotFound {
- return ErrNotFound
- }
- return err
- }
-
- if err = h(res); err != nil {
- return err
- }
-
- _, err = queue.lowincrement()
- if err != nil {
- return err
- }
-
- return queue.db.Delete(withPrefix(queue.prefix, id2bytes(currentID)), nil)
- }
-
- // Close closes the queue (and the underlying db is set to closeUnderlyingDB)
- func (queue *Queue) Close() error {
- if !queue.closeUnderlyingDB {
- queue.db = nil
- return nil
- }
- err := queue.db.Close()
- queue.db = nil
- return err
- }
|