Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,30 @@ type HTTPClient struct {

// DefaultClient builds an HTTPClient whose Url, Username and Password fields
// are set from the arguments and whose underlying http.Client uses a
// 60 second request timeout.
func DefaultClient(url url.URL, username string, password string) HTTPClient {
return HTTPClient{
client: http.Client{Timeout: 60 * time.Second},
client: http.Client{
Timeout: 60 * time.Second,
Transport: &http.Transport{},
},
Url: url,
Username: username,
Password: password,
}
}

// NewClient returns an HTTPClient that targets the same Url and carries the
// same Username and Password as base, but owns a private connection pool.
//
// Use this in parallel tests to avoid connection sharing issues.
//
// The transport is cloned from http.DefaultTransport instead of being a
// zero-value http.Transport, so the client keeps the standard proxy, dial,
// TLS-handshake and idle-connection settings. Clone copies only the exported
// configuration, so no connections are shared with any other client.
func NewClient(base HTTPClient) HTTPClient {
	// Fall back to an empty transport only when the process has replaced
	// http.DefaultTransport with some other RoundTripper implementation.
	transport := &http.Transport{}
	if def, ok := http.DefaultTransport.(*http.Transport); ok {
		transport = def.Clone()
	}

	return HTTPClient{
		client: http.Client{
			Timeout:   60 * time.Second,
			Transport: transport,
		},
		Url:      base.Url,
		Username: base.Username,
		Password: base.Password,
	}
}

func (client *HTTPClient) baseAPIURL(path string) (x string) {
x, _ = url.JoinPath(client.Url.String(), "api/v1/", path)
return
Expand Down
49 changes: 33 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,29 +1,46 @@
module quest

go 1.21.1
go 1.24.9

require (
github.com/apache/arrow-go/v18 v18.5.2
github.com/minio/minio-go v6.0.14+incompatible
github.com/stretchr/testify v1.8.4
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20230919034749-0b16411e6349
github.com/stretchr/testify v1.11.1
)

require (
github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
github.com/apache/thrift v0.14.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/andybalholm/brotli v1.2.0 // indirect
github.com/apache/thrift v0.22.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-ini/ini v1.25.4 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/goccy/go-json v0.10.5 // indirect
github.com/golang/snappy v1.0.0 // indirect
github.com/google/flatbuffers v25.12.19+incompatible // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.18.4 // indirect
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pierrec/lz4/v4 v4.1.25 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/smartystreets/goconvey v1.8.1 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
github.com/zeebo/xxh3 v1.1.0 // indirect
golang.org/x/crypto v0.48.0 // indirect
golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect
golang.org/x/mod v0.33.0 // indirect
golang.org/x/net v0.50.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/telemetry v0.0.0-20260209163413-e7419c687ee4 // indirect
golang.org/x/text v0.34.0 // indirect
golang.org/x/tools v0.42.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect
google.golang.org/grpc v1.79.1 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
1,109 changes: 92 additions & 1,017 deletions go.sum

Large diffs are not rendered by default.

146 changes: 81 additions & 65 deletions integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
Expand All @@ -32,51 +33,24 @@ import (
"testing"
"time"

"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet/file"
"github.com/apache/arrow-go/v18/parquet/pqarrow"
"github.com/minio/minio-go"
"github.com/stretchr/testify/require"

"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/reader"
)

type Flog struct {
Host string `json:"host"`
UserId string `json:"user-identifier"`
Timestamp string `json:"datetime"`
Method string `json:"method"`
Request string `json:"request"`
Protocol string `json:"protocol"`
Status uint16 `json:"status"`
ByteCount uint64 `json:"bytes"`
Referer string `json:"referer"`
}

// Same as `Flog`, but all fields are pointers, because `parquet-go` only
// works when fields are pointers.
type ParquetFlog struct {
Host *string `parquet:"name=host, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
UserId *string `parquet:"name=user-identifier, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Timestamp *string `parquet:"name=datetime, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Method *string `parquet:"name=method, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Request *string `parquet:"name=request, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Protocol *string `parquet:"name=protocol, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
Status *uint16 `parquet:"name=status, type=INT32, encoding=PLAIN"`
ByteCount *uint64 `parquet:"name=bytes, type=INT32, encoding=PLAIN"`
Referer *string `parquet:"name=referer, type=BYTE_ARRAY, convertedtype=UTF8, encoding=PLAIN_DICTIONARY"`
}

func (flog *ParquetFlog) Deref() Flog {
return Flog{
Host: *flog.Host,
UserId: *flog.UserId,
Timestamp: *flog.Timestamp,
Method: *flog.Method,
Request: *flog.Request,
Protocol: *flog.Protocol,
Status: *flog.Status,
ByteCount: *flog.ByteCount,
Referer: *flog.Referer,
}
Host string `json:"host"`
UserId string `json:"user-identifier"`
Timestamp string `json:"datetime"`
Method string `json:"method"`
Request string `json:"request"`
Protocol string `json:"protocol"`
Status float64 `json:"status"`
ByteCount float64 `json:"bytes"`
Referer string `json:"referer"`
}

// - Send logs to Parseable
Expand Down Expand Up @@ -130,14 +104,12 @@ func TestIntegrity(t *testing.T) {
parquetFiles := downloadParquetFiles(NewGlob.Stream, NewGlob.MinIoConfig)
actualFlogs := loadFlogsFromParquetFiles(parquetFiles)

rowCount := len(actualFlogs)
require.Equal(t, len(flogs), len(actualFlogs), "row count mismatch")

// Parseable now writes rows in ascending order, so we compare directly.
for i, expectedFlog := range flogs {
// The rows in parquet written by Parseable will be latest first, so we
// compare the first of ours with the last of what we got from Parseable's
// store.
actualFlog := actualFlogs[rowCount-i-1].Deref()
require.Equal(t, actualFlog, expectedFlog)
actualFlog := actualFlogs[i]
require.Equal(t, expectedFlog, actualFlog)
}

DeleteStream(t, NewGlob.QueryClient, NewGlob.Stream)
Expand Down Expand Up @@ -212,46 +184,90 @@ func downloadParquetFiles(stream string, config MinIoConfig) []string {
f.Close()
}

// Reverse the filenames, because we want latest files first (only if there are multiple files)
if len(downloadedFileNames) > 1 {
for i, j := 0, len(downloadedFileNames)-1; i < j; i, j = i+1, j-1 {
downloadedFileNames[i], downloadedFileNames[j] = downloadedFileNames[j], downloadedFileNames[i]
}
}

slog.Info("downloaded files", "paths", downloadedFileNames)

return downloadedFileNames
}

func loadFlogsFromParquetFile(path string) []ParquetFlog {
fr, err := local.NewLocalFileReader(path)
func loadFlogsFromParquetFile(path string) []Flog {
slog.Info("reading parquet file", "path", path)

rdr, err := file.OpenParquetFile(path, false)
if err != nil {
slog.Error("can't create local file reader", "error", err)
slog.Error("can't open parquet file", "error", err)
return nil
}
defer rdr.Close()

defer fr.Close()
arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
if err != nil {
slog.Error("can't create arrow reader", "error", err)
return nil
}

pr, err := reader.NewParquetReader(fr, new(ParquetFlog), 4)
tbl, err := arrowRdr.ReadTable(context.Background())
if err != nil {
slog.Error("can't create parquet reader", "error", err)
slog.Error("can't read table", "error", err)
return nil
}
defer tbl.Release()

numRows := int(tbl.NumRows())
slog.Info("read parquet", "rows", numRows, "path", path)

// Build column index for lookup by name
colIndex := make(map[string]int)
for i := 0; i < int(tbl.NumCols()); i++ {
colIndex[tbl.Column(i).Name()] = i
}

defer pr.ReadStop()
getStringCol := func(name string) *array.String {
idx, ok := colIndex[name]
if !ok {
return nil
}
return tbl.Column(idx).Data().Chunk(0).(*array.String)
}

flogs := make([]ParquetFlog, pr.GetNumRows())
getFloat64Col := func(name string) *array.Float64 {
idx, ok := colIndex[name]
if !ok {
return nil
}
return tbl.Column(idx).Data().Chunk(0).(*array.Float64)
}

if err = pr.Read(&flogs); err != nil {
slog.Error("can't read parquet file", "error", err)
hostCol := getStringCol("host")
userIdCol := getStringCol("user-identifier")
datetimeCol := getStringCol("datetime")
methodCol := getStringCol("method")
requestCol := getStringCol("request")
protocolCol := getStringCol("protocol")
statusCol := getFloat64Col("status")
bytesCol := getFloat64Col("bytes")
refererCol := getStringCol("referer")

flogs := make([]Flog, numRows)
for i := 0; i < numRows; i++ {
flogs[i] = Flog{
Host: hostCol.Value(i),
UserId: userIdCol.Value(i),
Timestamp: datetimeCol.Value(i),
Method: methodCol.Value(i),
Request: requestCol.Value(i),
Protocol: protocolCol.Value(i),
Status: statusCol.Value(i),
ByteCount: bytesCol.Value(i),
Referer: refererCol.Value(i),
}
}

return flogs
}

func loadFlogsFromParquetFiles(parquetFiles []string) []ParquetFlog {
func loadFlogsFromParquetFiles(parquetFiles []string) []Flog {
slog.Info("loading flogs from parquet files", "paths", parquetFiles, "count", len(parquetFiles))
flogs := make([]ParquetFlog, 0, len(parquetFiles)*10)
flogs := make([]Flog, 0, len(parquetFiles)*10)

for _, parquetFile := range parquetFiles {
flogs = append(flogs, loadFlogsFromParquetFile(parquetFile)...)
Expand Down
28 changes: 15 additions & 13 deletions model.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,10 +455,13 @@ const RetentionBody string = `[
]`

const (
TestUser string = "alice"
dummyRole string = `{"actions":[{"privilege": "editor"},{"privilege": "writer", "resource": {"stream": "app"}}], "roleType":"user"}`
TestUser string = "alice"
)

// dummyRoleBody returns the JSON definition of a "user" role that grants the
// "editor" privilege plus the "writer" privilege scoped to the given stream.
//
// The stream name is JSON-encoded before being inserted, so names containing
// quotes, backslashes or control characters still yield a valid document.
// For ordinary names the output is unchanged.
func dummyRoleBody(stream string) string {
	// Marshalling a plain string cannot fail, so the error is ignored.
	quoted, _ := json.Marshal(stream)
	return fmt.Sprintf(`{"actions":[{"privilege": "editor"},{"privilege": "writer", "resource": {"stream": %s}}], "roleType":"user"}`, quoted)
}

// RoleEditor is the JSON role definition with a single "editor" privilege
// action and role type "user".
const RoleEditor string = `{"actions":[{"privilege": "editor"}],"roleType":"user"}`

func RoleWriter(stream string) string {
Expand All @@ -473,15 +476,14 @@ func Roleingestor(stream string) string {
return fmt.Sprintf(`{"actions":[{"privilege": "ingestor", "resource": {"stream": "%s"}}],"roleType":"user"}`, stream)
}

func getTargetBody() string {
return ` {
"name":"targetName",
func getTargetBody(name string) string {
return fmt.Sprintf(`{
"name":"%s",
"type": "webhook",
"endpoint": "https://webhook.site/ec627445-d52b-44e9-948d-56671df3581e",
"headers": {},
"skipTlsCheck": true
}
`
}`, name)
}

func getIdFromTargetResponse(body io.Reader) string {
Expand All @@ -498,11 +500,11 @@ func getIdFromTargetResponse(body io.Reader) string {
return target.Id
}

func getAlertBody(stream string, targetId string) string {
func getAlertBody(stream string, targetId string, alertTitle string) string {
return fmt.Sprintf(`
{
"severity": "medium",
"title": "AlertTitle",
"title": "%s",
"query": "select count(level) from %s where level = 'info'",
"alertType": "threshold",
"thresholdConfig": {
Expand Down Expand Up @@ -530,7 +532,7 @@ func getAlertBody(stream string, targetId string) string {
"%s"
],
"tags": ["quest-test"]
}`, stream, targetId)
}`, alertTitle, stream, targetId)
}

func getMetadataFromAlertResponse(body io.Reader) (string, string, string, []string) {
Expand All @@ -554,12 +556,12 @@ func getMetadataFromAlertResponse(body io.Reader) (string, string, string, []str
return alert.Id, alert.State, alert.Created, alert.Datasets
}

func createAlertResponse(id string, state string, created string, datasets []string) string {
func createAlertResponse(alertTitle string, id string, state string, created string, datasets []string) string {
datasetsJSON, _ := json.Marshal(datasets)
return fmt.Sprintf(`
[
{
"title": "AlertTitle",
"title": "%s",
"created": "%s",
"alertType": "threshold",
"id": "%s",
Expand All @@ -571,5 +573,5 @@ func createAlertResponse(id string, state string, created string, datasets []str
"datasets": %s,
"notificationState": "notify"
}
]`, created, id, state, string(datasetsJSON))
]`, alertTitle, created, id, state, string(datasetsJSON))
}
Binary file added quest
Binary file not shown.
Loading
Loading