From 8cbd41d42f0e94cd8fed08c5596188fdfe6fc4a4 Mon Sep 17 00:00:00 2001 From: Nikolay Petrov Date: Tue, 21 Apr 2026 23:21:05 -0400 Subject: [PATCH] recover head segment on open log --- api.go | 4 +-- log.go | 70 ++++++++++++++++++++++------------------ log_test.go | 8 ++--- pkg/index/format.go | 32 ++++++++++++------ pkg/index/format_test.go | 6 ++-- pkg/segment/segment.go | 24 +++++++------- 6 files changed, 83 insertions(+), 61 deletions(-) diff --git a/api.go b/api.go index 330a2f1..9617297 100644 --- a/api.go +++ b/api.go @@ -55,8 +55,8 @@ type Options struct { AutoSync bool // At what segment size it will rollover to a new segment. Defaults to 1MB. Rollover int64 - // Check the head segment for integrity, before opening it for reading/writing. - Check bool + // CheckAndRecover checks the head segment for integrity on open. Read-only logs return an error if the check fails; writable logs recover the data. + CheckAndRecover bool // Upgrade specifies how to upgrade the versions Version VersionOptions } diff --git a/log.go b/log.go index 345c26d..fca3d61 100644 --- a/log.go +++ b/log.go @@ -15,11 +15,13 @@ import ( "github.com/klev-dev/klevdb/pkg/segment" ) -var errNoKeyIndex = fmt.Errorf("%w by key", ErrNoIndex) -var errKeyNotFound = fmt.Errorf("key %w", message.ErrNotFound) -var errNoTimeIndex = fmt.Errorf("%w by time", ErrNoIndex) -var errTimeNotFound = fmt.Errorf("time %w", message.ErrNotFound) -var errDeleteRelative = fmt.Errorf("%w: delete relative offsets", message.ErrInvalidOffset) +var ( + errNoKeyIndex = fmt.Errorf("%w by key", ErrNoIndex) + errKeyNotFound = fmt.Errorf("key %w", message.ErrNotFound) + errNoTimeIndex = fmt.Errorf("%w by time", ErrNoIndex) + errTimeNotFound = fmt.Errorf("time %w", message.ErrNotFound) + errDeleteRelative = fmt.Errorf("%w: delete relative offsets", message.ErrInvalidOffset) +) // Open opens or creates a [Log] based on a dir and set of options func Open(dir string, opts Options) (result Log, err error) { @@ -71,27 +73,38 @@ func Open(dir string, opts Options) (result 
Log, err error) { segments, err := segment.Find(dir, opts.AutoSync) if err != nil { - return nil, err + return nil, fmt.Errorf("open find segments: %w", err) } - if len(segments) == 0 { - if opts.Readonly { - ix := newReaderIndex(nil, params.Keys, 0, true) - rdr := reopenReader(segment.New(dir, 0, opts.AutoSync), params, opts.Version.NewSegmentsVersion, ix) - l.readers = []*reader{rdr} - } else { - w, err := openWriter(segment.New(dir, 0, opts.AutoSync), params, opts.Version.NewSegmentsVersion, 0) - if err != nil { - return nil, err + switch { + case opts.Readonly && len(segments) == 0: + ix := newReaderIndex(nil, params.Keys, 0, true) + rdr := reopenReader(segment.New(dir, 0, opts.AutoSync), params, opts.Version.NewSegmentsVersion, ix) + l.readers = []*reader{rdr} + case opts.Readonly: + if opts.CheckAndRecover { + head := segments[len(segments)-1] + if err := head.Check(params); err != nil { + return nil, fmt.Errorf("open check: %w", err) } - l.writer = w - l.readers = []*reader{w.reader} } - } else { - if opts.Check { + + for i, seg := range segments { + rdr := openReader(seg, params, opts.Version.NewSegmentsVersion, i == len(segments)-1) + l.readers = append(l.readers, rdr) + } + case len(segments) == 0: + w, err := openWriter(segment.New(dir, 0, opts.AutoSync), params, opts.Version.NewSegmentsVersion, 0) + if err != nil { + return nil, fmt.Errorf("open new writer: %w", err) + } + l.writer = w + l.readers = []*reader{w.reader} + default: + if opts.CheckAndRecover { head := segments[len(segments)-1] - if err := head.Check(params); err != nil { - return nil, err + if err := head.Recover(params); err != nil { + return nil, fmt.Errorf("open recover: %w", err) } } @@ -109,17 +122,12 @@ func Open(dir string, opts Options) (result Log, err error) { l.readers = append(l.readers, rdr) } - if opts.Readonly { - rdr := openReader(head, params, opts.Version.NewSegmentsVersion, true) - l.readers = append(l.readers, rdr) - } else { - wrt, err := openWriter(head, params, 
opts.Version.NewSegmentsVersion, 0) - if err != nil { - return nil, err - } - l.writer = wrt - l.readers = append(l.readers, wrt.reader) + wrt, err := openWriter(head, params, opts.Version.NewSegmentsVersion, 0) + if err != nil { + return nil, fmt.Errorf("open writer: %w", err) } + l.writer = wrt + l.readers = append(l.readers, wrt.reader) } return l, nil diff --git a/log_test.go b/log_test.go index 10afbe6..54172e0 100644 --- a/log_test.go +++ b/log_test.go @@ -1524,10 +1524,10 @@ func TestConcurrent(t *testing.T) { func testConcurrentPubsubRecent(t *testing.T) { defer os.RemoveAll("test_pubsub") s, err := Open("test_pubsub", Options{ - CreateDirs: true, - AutoSync: true, - Check: true, - Rollover: 1024 * 64, + CreateDirs: true, + AutoSync: true, + CheckAndRecover: true, + Rollover: 1024 * 64, }) require.NoError(t, err) defer s.Close() diff --git a/pkg/index/format.go b/pkg/index/format.go index bac6552..92e4540 100644 --- a/pkg/index/format.go +++ b/pkg/index/format.go @@ -366,38 +366,52 @@ func Read(path string, offset int64, opts Params) ([]Item, error) { return items, nil } -func Stat(path string, offset int64, opts Params) (int64, int, error) { +func detect(path string, offset int64, opts Params) (Version, int64, error) { f, err := os.Open(path) if err != nil { - return -1, -1, fmt.Errorf("read index open: %w", err) + return VUnknown, -1, fmt.Errorf("read index open: %w", err) } defer func() { _ = f.Close() }() stat, err := os.Stat(path) if err != nil { - return -1, -1, fmt.Errorf("read index stat: %w", err) + return VUnknown, -1, fmt.Errorf("read index stat: %w", err) } dataSize := stat.Size() if dataSize == 0 { - return dataSize, 0, nil + return V1, 0, nil } var h [HeaderSize]byte if _, err := io.ReadFull(f, h[:]); err != nil { - return -1, -1, fmt.Errorf("%w: reading header: %w", ErrCorrupted, err) + return VUnknown, -1, fmt.Errorf("%w: reading header: %w", ErrCorrupted, err) } v, err := headerParse(h[:], offset, opts) if err != nil { - return -1, -1, 
fmt.Errorf("parse index header: %w", err) + return VUnknown, -1, fmt.Errorf("parse index header: %w", err) } - switch v { + return v, dataSize, nil +} + +func GetVersion(path string, offset int64, opts Params) (Version, error) { + version, _, err := detect(path, offset, opts) + return version, err +} + +func Stat(path string, offset int64, opts Params) (int64, int, error) { + version, dataSize, err := detect(path, offset, opts) + if err != nil { + return -1, -1, fmt.Errorf("index stat: %w", err) + } + + switch version { case V1: return dataSize, int(dataSize / opts.Size()), nil case V2: - return dataSize, int((dataSize - int64(len(h))) / opts.Size()), nil + return dataSize, int((dataSize - HeaderSize) / opts.Size()), nil default: - return -1, -1, fmt.Errorf("%w: unknown version %d", ErrCorrupted, v.marker) + return -1, -1, fmt.Errorf("%w: unknown version %d", ErrCorrupted, version) } } diff --git a/pkg/index/format_test.go b/pkg/index/format_test.go index c32dca9..8cb0b20 100644 --- a/pkg/index/format_test.go +++ b/pkg/index/format_test.go @@ -67,8 +67,9 @@ func TestInvalidHeader(t *testing.T) { require.NoError(t, err) require.NoError(t, f.Close()) - _, err = Read(path, 0, iopts) + got, err := Read(path, 0, iopts) require.ErrorIs(t, err, ErrCorrupted) + require.Empty(t, got) } func TestPartialIndexHeader(t *testing.T) { @@ -80,8 +81,9 @@ func TestPartialIndexHeader(t *testing.T) { require.NoError(t, err) require.NoError(t, f.Close()) - _, err = Read(path, 0, iopts) + got, err := Read(path, 0, iopts) require.ErrorIs(t, err, ErrCorrupted) + require.Empty(t, got) } func TestStat(t *testing.T) { diff --git a/pkg/segment/segment.go b/pkg/segment/segment.go index eace60f..b5e0908 100644 --- a/pkg/segment/segment.go +++ b/pkg/segment/segment.go @@ -67,9 +67,6 @@ func (s Segment) Stat(params index.Params) (Stats, error) { }, nil } -var errIndexSize = fmt.Errorf("%w: incorrect size", index.ErrCorrupted) -var errIndexItem = fmt.Errorf("%w: incorrect item", 
index.ErrCorrupted) - func (s Segment) Check(params index.Params) error { log, err := message.OpenReader(s.Log, s.Offset) if err != nil { @@ -79,7 +76,7 @@ func (s Segment) Check(params index.Params) error { var position = log.InitialPosition() var indexTime int64 - var logIndex []index.Item + var checkIndex []index.Item for { msg, nextPosition, err := log.Read(position) if errors.Is(err, io.EOF) { @@ -89,7 +86,7 @@ func (s Segment) Check(params index.Params) error { } item := params.NewItem(msg, position, indexTime) - logIndex = append(logIndex, item) + checkIndex = append(checkIndex, item) position = nextPosition indexTime = item.Timestamp @@ -100,14 +97,8 @@ func (s Segment) Check(params index.Params) error { return nil case err != nil: return err - case len(logIndex) != len(items): - return errIndexSize - default: - for i, item := range logIndex { - if item != items[i] { - return errIndexItem - } - } + case !slices.Equal(checkIndex, items): + return index.ErrCorrupted } return nil @@ -170,6 +161,7 @@ func (s Segment) Recover(params index.Params) error { } var corruptedIndex = false + var indexVersion = index.VUnknown switch items, err := index.Read(s.Index, s.Offset, params); { case errors.Is(err, os.ErrNotExist): return nil @@ -178,6 +170,7 @@ func (s Segment) Recover(params index.Params) error { case err != nil: return err case !slices.Equal(items, restoreIndex): + indexVersion, _ = index.GetVersion(s.Index, s.Offset, params) corruptedIndex = true } @@ -185,6 +178,11 @@ func (s Segment) Recover(params index.Params) error { if err := os.Remove(s.Index); err != nil { return fmt.Errorf("restore index delete: %w", err) } + if indexVersion != index.VUnknown { + if err := index.Write(s.Index, s.Offset, indexVersion, params, restoreIndex); err != nil { + return fmt.Errorf("restore index write: %w", err) + } + } } if err := s.syncDir(); err != nil {