Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ type Options struct {
AutoSync bool
// At what segment size it will roll over to a new segment. Defaults to 1MB.
Rollover int64
// Check the head segment for integrity, before opening it for reading/writing.
Check bool
// CheckAndRecover checks the head segment for integrity. If the check fails in write mode, it will recover the data.
CheckAndRecover bool
// Upgrade specifies how to upgrade the versions
Version VersionOptions
}
Expand Down
70 changes: 39 additions & 31 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ import (
"github.com/klev-dev/klevdb/pkg/segment"
)

var errNoKeyIndex = fmt.Errorf("%w by key", ErrNoIndex)
var errKeyNotFound = fmt.Errorf("key %w", message.ErrNotFound)
var errNoTimeIndex = fmt.Errorf("%w by time", ErrNoIndex)
var errTimeNotFound = fmt.Errorf("time %w", message.ErrNotFound)
var errDeleteRelative = fmt.Errorf("%w: delete relative offsets", message.ErrInvalidOffset)
var (
errNoKeyIndex = fmt.Errorf("%w by key", ErrNoIndex)
errKeyNotFound = fmt.Errorf("key %w", message.ErrNotFound)
errNoTimeIndex = fmt.Errorf("%w by time", ErrNoIndex)
errTimeNotFound = fmt.Errorf("time %w", message.ErrNotFound)
errDeleteRelative = fmt.Errorf("%w: delete relative offsets", message.ErrInvalidOffset)
)

// Open opens or creates a [Log] based on a dir and set of options
func Open(dir string, opts Options) (result Log, err error) {
Expand Down Expand Up @@ -71,27 +73,38 @@ func Open(dir string, opts Options) (result Log, err error) {

segments, err := segment.Find(dir, opts.AutoSync)
if err != nil {
return nil, err
return nil, fmt.Errorf("open find segments: %w", err)
}

if len(segments) == 0 {
if opts.Readonly {
ix := newReaderIndex(nil, params.Keys, 0, true)
rdr := reopenReader(segment.New(dir, 0, opts.AutoSync), params, opts.Version.NewSegmentsVersion, ix)
l.readers = []*reader{rdr}
} else {
w, err := openWriter(segment.New(dir, 0, opts.AutoSync), params, opts.Version.NewSegmentsVersion, 0)
if err != nil {
return nil, err
switch {
case opts.Readonly && len(segments) == 0:
ix := newReaderIndex(nil, params.Keys, 0, true)
rdr := reopenReader(segment.New(dir, 0, opts.AutoSync), params, opts.Version.NewSegmentsVersion, ix)
l.readers = []*reader{rdr}
case opts.Readonly:
if opts.CheckAndRecover {
head := segments[len(segments)-1]
if err := head.Check(params); err != nil {
return nil, fmt.Errorf("open check: %w", err)
}
l.writer = w
l.readers = []*reader{w.reader}
}
} else {
if opts.Check {

for i, seg := range segments {
rdr := openReader(seg, params, opts.Version.NewSegmentsVersion, i == len(segments)-1)
l.readers = append(l.readers, rdr)
}
case len(segments) == 0:
w, err := openWriter(segment.New(dir, 0, opts.AutoSync), params, opts.Version.NewSegmentsVersion, 0)
if err != nil {
return nil, fmt.Errorf("open new writer: %w", err)
}
l.writer = w
l.readers = []*reader{w.reader}
default:
if opts.CheckAndRecover {
head := segments[len(segments)-1]
if err := head.Check(params); err != nil {
return nil, err
if err := head.Recover(params); err != nil {
return nil, fmt.Errorf("open recover: %w", err)
}
}

Expand All @@ -109,17 +122,12 @@ func Open(dir string, opts Options) (result Log, err error) {
l.readers = append(l.readers, rdr)
}

if opts.Readonly {
rdr := openReader(head, params, opts.Version.NewSegmentsVersion, true)
l.readers = append(l.readers, rdr)
} else {
wrt, err := openWriter(head, params, opts.Version.NewSegmentsVersion, 0)
if err != nil {
return nil, err
}
l.writer = wrt
l.readers = append(l.readers, wrt.reader)
wrt, err := openWriter(head, params, opts.Version.NewSegmentsVersion, 0)
if err != nil {
return nil, fmt.Errorf("open writer: %w", err)
}
l.writer = wrt
l.readers = append(l.readers, wrt.reader)
}

return l, nil
Expand Down
8 changes: 4 additions & 4 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1524,10 +1524,10 @@ func TestConcurrent(t *testing.T) {
func testConcurrentPubsubRecent(t *testing.T) {
defer os.RemoveAll("test_pubsub")
s, err := Open("test_pubsub", Options{
CreateDirs: true,
AutoSync: true,
Check: true,
Rollover: 1024 * 64,
CreateDirs: true,
AutoSync: true,
CheckAndRecover: true,
Rollover: 1024 * 64,
})
require.NoError(t, err)
defer s.Close()
Expand Down
32 changes: 23 additions & 9 deletions pkg/index/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,38 +366,52 @@ func Read(path string, offset int64, opts Params) ([]Item, error) {
return items, nil
}

func Stat(path string, offset int64, opts Params) (int64, int, error) {
func detect(path string, offset int64, opts Params) (Version, int64, error) {
f, err := os.Open(path)
if err != nil {
return -1, -1, fmt.Errorf("read index open: %w", err)
return VUnknown, -1, fmt.Errorf("read index open: %w", err)
}
defer func() { _ = f.Close() }()

stat, err := os.Stat(path)
if err != nil {
return -1, -1, fmt.Errorf("read index stat: %w", err)
return VUnknown, -1, fmt.Errorf("read index stat: %w", err)
}
dataSize := stat.Size()

if dataSize == 0 {
return dataSize, 0, nil
return V1, 0, nil
}

var h [HeaderSize]byte
if _, err := io.ReadFull(f, h[:]); err != nil {
return -1, -1, fmt.Errorf("%w: reading header: %w", ErrCorrupted, err)
return VUnknown, -1, fmt.Errorf("%w: reading header: %w", ErrCorrupted, err)
}
v, err := headerParse(h[:], offset, opts)
if err != nil {
return -1, -1, fmt.Errorf("parse index header: %w", err)
return VUnknown, -1, fmt.Errorf("parse index header: %w", err)
}

switch v {
return v, dataSize, nil
}

func GetVersion(path string, offset int64, opts Params) (Version, error) {
version, _, err := detect(path, offset, opts)
return version, err
}

func Stat(path string, offset int64, opts Params) (int64, int, error) {
version, dataSize, err := detect(path, offset, opts)
if err != nil {
return -1, -1, fmt.Errorf("index stat: %w", err)
}

switch version {
case V1:
return dataSize, int(dataSize / opts.Size()), nil
case V2:
return dataSize, int((dataSize - int64(len(h))) / opts.Size()), nil
return dataSize, int((dataSize - HeaderSize) / opts.Size()), nil
default:
return -1, -1, fmt.Errorf("%w: unknown version %d", ErrCorrupted, v.marker)
return -1, -1, fmt.Errorf("%w: unknown version %d", ErrCorrupted, version)
}
}
6 changes: 4 additions & 2 deletions pkg/index/format_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ func TestInvalidHeader(t *testing.T) {
require.NoError(t, err)
require.NoError(t, f.Close())

_, err = Read(path, 0, iopts)
got, err := Read(path, 0, iopts)
require.ErrorIs(t, err, ErrCorrupted)
require.Empty(t, got)
}

func TestPartialIndexHeader(t *testing.T) {
Expand All @@ -80,8 +81,9 @@ func TestPartialIndexHeader(t *testing.T) {
require.NoError(t, err)
require.NoError(t, f.Close())

_, err = Read(path, 0, iopts)
got, err := Read(path, 0, iopts)
require.ErrorIs(t, err, ErrCorrupted)
require.Empty(t, got)
}

func TestStat(t *testing.T) {
Expand Down
24 changes: 11 additions & 13 deletions pkg/segment/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ func (s Segment) Stat(params index.Params) (Stats, error) {
}, nil
}

var errIndexSize = fmt.Errorf("%w: incorrect size", index.ErrCorrupted)
var errIndexItem = fmt.Errorf("%w: incorrect item", index.ErrCorrupted)

func (s Segment) Check(params index.Params) error {
log, err := message.OpenReader(s.Log, s.Offset)
if err != nil {
Expand All @@ -79,7 +76,7 @@ func (s Segment) Check(params index.Params) error {

var position = log.InitialPosition()
var indexTime int64
var logIndex []index.Item
var checkIndex []index.Item
for {
msg, nextPosition, err := log.Read(position)
if errors.Is(err, io.EOF) {
Expand All @@ -89,7 +86,7 @@ func (s Segment) Check(params index.Params) error {
}

item := params.NewItem(msg, position, indexTime)
logIndex = append(logIndex, item)
checkIndex = append(checkIndex, item)

position = nextPosition
indexTime = item.Timestamp
Expand All @@ -100,14 +97,8 @@ func (s Segment) Check(params index.Params) error {
return nil
case err != nil:
return err
case len(logIndex) != len(items):
return errIndexSize
default:
for i, item := range logIndex {
if item != items[i] {
return errIndexItem
}
}
case !slices.Equal(checkIndex, items):
return index.ErrCorrupted
}

return nil
Expand Down Expand Up @@ -170,6 +161,7 @@ func (s Segment) Recover(params index.Params) error {
}

var corruptedIndex = false
var indexVersion = index.VUnknown
switch items, err := index.Read(s.Index, s.Offset, params); {
case errors.Is(err, os.ErrNotExist):
return nil
Expand All @@ -178,13 +170,19 @@ func (s Segment) Recover(params index.Params) error {
case err != nil:
return err
case !slices.Equal(items, restoreIndex):
indexVersion, _ = index.GetVersion(s.Index, s.Offset, params)
corruptedIndex = true
}

if corruptedIndex {
if err := os.Remove(s.Index); err != nil {
return fmt.Errorf("restore index delete: %w", err)
}
if indexVersion != index.VUnknown {
if err := index.Write(s.Index, s.Offset, indexVersion, params, restoreIndex); err != nil {
return fmt.Errorf("restore index write: %w", err)
}
}
}

if err := s.syncDir(); err != nil {
Expand Down
Loading