diff --git a/examples/python-notebook/example.py b/examples/python-notebook/example.py index 02b8227..f0457e2 100644 --- a/examples/python-notebook/example.py +++ b/examples/python-notebook/example.py @@ -7,7 +7,7 @@ # "xarray==2025.7.1", # ] # [tool.uv.sources] -# dp-sdk = { url = "https://github.com/openclimatefix/data-platform/releases/download/v0.25.0/dp_sdk-0.25.0-py3-none-any.whl" } +# dp-sdk = { url = "https://github.com/openclimatefix/data-platform/releases/download/v0.27.0/dp_sdk-0.27.0-py3-none-any.whl" } # /// """Example script for pulling data from the data platform. @@ -45,6 +45,7 @@ async def main() -> None: location_type_filter=dp.LocationType.NATION, ) glresp = await dpc.list_locations(glreq) + print(f":: -> {len(glresp)} available locations") uk_location = next(l for l in glresp.locations if l.location_name == "uk") print(f"\t{uk_location.effective_capacity_watts=}") diff --git a/internal/server/postgres/.sqlc.yaml b/internal/server/postgres/.sqlc.yaml index ba4ac12..0f29781 100644 --- a/internal/server/postgres/.sqlc.yaml +++ b/internal/server/postgres/.sqlc.yaml @@ -14,6 +14,7 @@ sql: emit_methods_with_db_argument: false emit_empty_slices: true emit_interface: true + emit_exported_queries: true overrides: - db_type: "uuid" go_type: diff --git a/internal/server/postgres/dataserverimpl.go b/internal/server/postgres/dataserverimpl.go index e04c5cd..9a1324b 100644 --- a/internal/server/postgres/dataserverimpl.go +++ b/internal/server/postgres/dataserverimpl.go @@ -475,7 +475,7 @@ func (s *DataPlatformDataServiceServerImpl) StreamForecastData( ) error { l := zerolog.Ctx(stream.Context()) - querier := db.New(ix.GetTxFromContext(stream.Context())) + tx := ix.GetTxFromContext(stream.Context()) locationUuid, err := uuid.Parse(req.LocationUuid) if err != nil { @@ -483,111 +483,102 @@ func (s *DataPlatformDataServiceServerImpl) StreamForecastData( return status.Errorf(codes.InvalidArgument, "Invalid location UUID: %v", err) } - // Get the source as it was at the initial time of the time window - srcprms := db.GetSourceAtTimestampParams{ - GeometryUuid: locationUuid, - SourceTypeID: int16(req.EnergySource.Number()), - AtTimestampUtc: pgtype.Timestamp{ + var ( + fNames []string + fVersions []string + ) + + for _, fc := range req.Forecasters { + fNames = append(fNames, fc.ForecasterName) + fVersions = append(fVersions, fc.ForecasterVersion) + } + + // Query with the transaction directly so the returned data can be streamed. + // This is to avoid very large data requests choking the memory of the API. + // Normally I woudn't want to bypass SQLC's type safety - but this RPC is specifically for + // ML debugging and isn't on any hot path, so I don't mind here. + rows, err := tx.Query( + stream.Context(), + db.ListPredictionsForForecasts, + int16(req.EnergySource.Number()), + fNames, + fVersions, + locationUuid, + pgtype.Timestamp{ Time: req.TimeWindow.StartTimestampUtc.AsTime(), Valid: true, }, - } - - dbSource, err := querier.GetSourceAtTimestamp(stream.Context(), srcprms) + pgtype.Timestamp{ + Time: req.TimeWindow.EndTimestampUtc.AsTime(), + Valid: true, + }, + ) if err != nil { - l.Err(err).Msgf("querier.GetSourceAtTimestamp(%+v)", srcprms) - - return status.Errorf( - codes.NotFound, "No location found for uuid %s with source type '%s'.", - req.LocationUuid, req.EnergySource, + l.Err(err).Msg("tx.Query(ListPredictionsForForecasts) failed") + return status.Errorf(codes.Internal, "Failed to stream predictions") + } + defer rows.Close() + + for rows.Next() { + var row db.ListPredictionsForForecastsRow + + err := rows.Scan( + &row.InitTimeUtc, + &row.ForecasterName, + &row.ForecasterVersion, + &row.CreatedAtUtc, + &row.HorizonMins, + &row.P50Sip, + &row.OtherStatsFractions, + &row.CapacityWatts, + &row.Metadata, ) - } - - forecasts := make([]db.ListForecastsRow, 0) - for _, forecaster := range req.Forecasters { - fcprms := db.ListForecastsParams{ - GeometryUuid: dbSource.GeometryUuid, - SourceTypeID: dbSource.SourceTypeID, - ForecasterName: forecaster.ForecasterName, - ForecasterVersion: forecaster.ForecasterVersion, - StartTimestamp: pgtype.Timestamp{ - Time: req.TimeWindow.StartTimestampUtc.AsTime(), - Valid: true, - }, - EndTimestamp: pgtype.Timestamp{ - Time: req.TimeWindow.EndTimestampUtc.AsTime(), - Valid: true, - }, - } - - dbForecasts, err := querier.ListForecasts(stream.Context(), fcprms) - if err != nil { - l.Err(err).Msgf("querier.ListForecasts(%+v)", fcprms) - - return status.Errorf( - codes.NotFound, - "No forecasts found for location '%s' and forecaster %s:%s between %s and %s.", - req.LocationUuid, - forecaster.ForecasterName, - forecaster.ForecasterVersion, - req.TimeWindow.StartTimestampUtc.AsTime(), - req.TimeWindow.EndTimestampUtc.AsTime(), - ) - } - - forecasts = append(forecasts, dbForecasts...) - } - - for _, forecast := range forecasts { - psprms := db.ListPredictionsForForecastParams{ForecastUuid: forecast.ForecastUuid} - - dbPreds, err := querier.ListPredictionsForForecast(stream.Context(), psprms) if err != nil { - l.Err(err).Msgf("querier.ListPredictionsForForecast(%+v)", psprms) - - return status.Errorf( - codes.NotFound, - "No predicted generation values found for forecast with init time %s", - forecast.InitTimeUtc.Time, - ) + l.Err(err).Msg("rows.Scan failed") + return status.Errorf(codes.Internal, "Error reading prediction stream") } - for _, pred := range dbPreds { - otherStatistics := make(map[string]float32) - if pred.OtherStatsFractions != nil { - for k, v := range pred.OtherStatsFractions.AsMap() { - otherStatistics[k] = float32(v.(float64)) - } + otherStatistics := make(map[string]float32) + if row.OtherStatsFractions != nil { + for k, v := range row.OtherStatsFractions.AsMap() { + otherStatistics[k] = float32(v.(float64)) } + } - metadata := make(map[string]string) - if req.IncludeMetadata && pred.Metadata != nil { - for k, v := range pred.Metadata.AsMap() { - metadata[k] = v.(string) - } + metadata := make(map[string]string) + if req.IncludeMetadata && row.Metadata != nil { + for k, v := range row.Metadata.AsMap() { + metadata[k] = v.(string) } + } - err = stream.Send(&pb.StreamForecastDataResponse{ - InitTimestamp: timestamppb.New(forecast.InitTimeUtc.Time), - LocationUuid: forecast.GeometryUuid.String(), - ForecasterFullname: fmt.Sprintf( - "%s:%s", - forecast.ForecasterName, - forecast.ForecasterVersion, - ), - HorizonMins: uint32(pred.HorizonMins), - P50Fraction: float32(pred.P50Sip) / 30000.0, - OtherStatisticsFractions: otherStatistics, - CreatedTimestampUtc: timestamppb.New(forecast.CreatedAtUtc.Time), - EffectiveCapacityWatts: uint64(pred.CapacityWatts), - Metadata: metadata, - }) - if err != nil { - return err - } + err = stream.Send(&pb.StreamForecastDataResponse{ + InitTimestamp: timestamppb.New(row.InitTimeUtc.Time), + LocationUuid: req.LocationUuid, + ForecasterFullname: fmt.Sprintf( + "%s:%s", + row.ForecasterName, + row.ForecasterVersion, + ), + HorizonMins: uint32(row.HorizonMins), + P50Fraction: float32(row.P50Sip) / 30000.0, + OtherStatisticsFractions: otherStatistics, + CreatedTimestampUtc: timestamppb.New(row.CreatedAtUtc.Time), + EffectiveCapacityWatts: uint64(row.CapacityWatts), + Metadata: metadata, + }) + // If the client disconnects or network fails, break the loop + if err != nil { + l.Warn().Err(err).Msg("Client stream closed prematurely") + return err } } + if err := rows.Err(); err != nil { + l.Err(err).Msg("rows.Err() reported a streaming failure") + return status.Errorf(codes.Internal, "Stream interrupted") + } + return nil } @@ -1447,7 +1438,6 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAsTimeseries( querier := db.New(ix.GetTxFromContext(ctx)) - // Get the location and source gsprms := db.GetSourceAtTimestampParams{ GeometryUuid: uuid.MustParse(req.LocationUuid), SourceTypeID: int16(req.EnergySource.Number()), @@ -1467,6 +1457,55 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAsTimeseries( ) } + // If in init time has been requested, only return the values for that single forecast. + if req.InitializationTimestampUtc != nil { + llprms := db.ListPredictionsForForecastsParams{ + GeometryUuid: uuid.MustParse(req.LocationUuid), + SourceTypeID: int16(req.EnergySource.Number()), + ForecasterNames: []string{req.Forecaster.ForecasterName}, + ForecasterVersions: []string{req.Forecaster.ForecasterVersion}, + StartTimestamp: pgtype.Timestamp{ + Time: req.InitializationTimestampUtc.AsTime(), + Valid: true, + }, + EndTimestamp: pgtype.Timestamp{ + Time: req.InitializationTimestampUtc.AsTime(), + Valid: true, + }, + } + + dbPreds, err := querier.ListPredictionsForForecasts(ctx, llprms) + if err != nil { + l.Err(err).Msgf("querier.ListPredictionsForForecasts(%+v)", llprms) + + return nil, status.Error( + codes.InvalidArgument, + "No forecasts found for the given parameters.", + ) + } + + out := make([]*pb.GetForecastAsTimeseriesResponse_Value, len(dbPreds)) + for i, pred := range dbPreds { + out[i] = &pb.GetForecastAsTimeseriesResponse_Value{ + TargetTimestampUtc: timestamppb.New( + pred.InitTimeUtc.Time.Add(time.Duration(pred.HorizonMins) * time.Minute), + ), + P50ValueFraction: float32(pred.P50Sip) / 30000.0, + EffectiveCapacityWatts: uint64(pred.CapacityWatts), + InitializationTimestampUtc: timestamppb.New(pred.InitTimeUtc.Time), + CreatedTimestampUtc: timestamppb.New(pred.CreatedAtUtc.Time), + Metadata: pred.Metadata, + } + } + + return &pb.GetForecastAsTimeseriesResponse{ + LocationUuid: req.LocationUuid, + LocationName: dbSource.GeometryName, + Values: out, + }, nil + } + + // Otherwise, return the collapsed timeseries. // Get the relevant forecaster gpprms := db.GetForecasterElseLatestParams{ ForecasterName: req.Forecaster.ForecasterName, diff --git a/internal/server/postgres/dataserverimpl_test.go b/internal/server/postgres/dataserverimpl_test.go index ddcd22c..f6f56f7 100644 --- a/internal/server/postgres/dataserverimpl_test.go +++ b/internal/server/postgres/dataserverimpl_test.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "io" "maps" "math/rand/v2" "strings" @@ -786,6 +787,7 @@ func TestGetLocationsAsGeoJSON(t *testing.T) { func TestGetForecastAsTimeseries(t *testing.T) { pivotTime := time.Date(2025, 2, 5, 12, 0, 0, 0, time.UTC) + // Create a site to attach the forecasts to metadata, err := structpb.NewStruct(map[string]any{"source": "test"}) require.NoError(t, err) @@ -799,6 +801,7 @@ func TestGetForecastAsTimeseries(t *testing.T) { ValidFromUtc: timestamppb.New(pivotTime.Add(-time.Hour * 49)), }) require.NoError(t, err) + // Update the capacity of the site to check it is reflected in the values _, err = dc.UpdateLocation(t.Context(), &pb.UpdateLocationRequest{ LocationUuid: siteResp.LocationUuid, @@ -843,16 +846,27 @@ func TestGetForecastAsTimeseries(t *testing.T) { require.NoError(t, err) } - // For each horizon, get the predicted timeseries + // Standardise the time window reused across most requests + defaultTimeWindow := &pb.TimeWindow{ + StartTimestampUtc: timestamppb.New(pivotTime.Add(-time.Hour * 48)), + EndTimestampUtc: timestamppb.New(pivotTime.Add(time.Hour * 36)), + } + testcases := []struct { name string - horizonMins int32 - pivotTime time.Time + req *pb.GetForecastAsTimeseriesRequest expectedValues []float32 + expectErr bool }{ { - name: "Should return expected values for horizon 0 mins", - horizonMins: 0, + name: "Should return expected values for horizon 0 mins", + req: &pb.GetForecastAsTimeseriesRequest{ + LocationUuid: siteResp.LocationUuid, + Forecaster: forecasterResp.Forecaster, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + HorizonMins: 0, + TimeWindow: defaultTimeWindow, + }, // For horizon 0, we should get all the values from the latest forecast, // plus the values from the previous forecasts that have the lowest horizon // for each target time. @@ -870,10 +884,17 @@ func TestGetForecastAsTimeseries(t *testing.T) { 0.00, 0.08, 0.16, 0.24, 0.32, 0.40, 0.00, 0.08, 0.16, 0.24, 0.32, 0.40, 0.48, 0.56, 0.64, 0.72, 0.80, 0.88, }, + expectErr: false, }, { - name: "Should return expected values for horizon 14 mins", - horizonMins: 14, + name: "Should return expected values for horizon 14 mins", + req: &pb.GetForecastAsTimeseriesRequest{ + LocationUuid: siteResp.LocationUuid, + Forecaster: forecasterResp.Forecaster, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + HorizonMins: 14, + TimeWindow: defaultTimeWindow, + }, // For horizon of 14 minutes, anything with a lesser horizon should not be included. // So the value for 0, 5, and 10 minutes should not be included. expectedValues: []float32{ @@ -882,79 +903,83 @@ func TestGetForecastAsTimeseries(t *testing.T) { 0.24, 0.32, 0.40, 0.48, 0.56, 0.64, 0.24, 0.32, 0.40, 0.48, 0.56, 0.64, 0.72, 0.80, 0.88, }, + expectErr: false, }, { - name: "Should return expected values for horizon 30 mins", - horizonMins: 30, + name: "Should return expected values for horizon 30 mins", + req: &pb.GetForecastAsTimeseriesRequest{ + LocationUuid: siteResp.LocationUuid, + Forecaster: forecasterResp.Forecaster, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + HorizonMins: 30, + TimeWindow: defaultTimeWindow, + }, expectedValues: []float32{ 0.48, 0.56, 0.64, 0.72, 0.80, 0.88, 0.48, 0.56, 0.64, 0.72, 0.80, 0.88, 0.48, 0.56, 0.64, 0.72, 0.80, 0.88, 0.48, 0.56, 0.64, 0.72, 0.80, 0.88, }, + expectErr: false, + }, + { + name: "Shouldn't return successfully for horizon 60 mins", + req: &pb.GetForecastAsTimeseriesRequest{ + LocationUuid: siteResp.LocationUuid, + Forecaster: forecasterResp.Forecaster, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + HorizonMins: 60, + TimeWindow: defaultTimeWindow, + }, + expectErr: true, }, { - name: "Shouldn't return successfully for horizon 60 mins", - horizonMins: 60, - }, - // NOTE: Need to think about how to spoof CreatedTime to make this work. - // { - // name: "Should return expected values for horizon 14 minutes with pivot time", - // horizonMins: 14, - // pivotTime: pivotTime.Add(-15 * time.Minute), - // // For horizon of 14 minutes and a pivot time of 15 minutes before the latest, - // // we should expect the same as for the 14 minute horizon no pivot time case, - // // only this time the latest forecast should not be included at all. - // // Hence we only see data for three forecasts. - // expectedValues: []float32{ - // 0.24, 0.32, 0.40, 0.48, 0.56, 0.64, - // 0.24, 0.32, 0.40, 0.48, 0.56, 0.64, - // 0.24, 0.32, 0.40, 0.48, 0.56, 0.64, 0.72, 0.80, 0.88, - // }, - // }, + name: "Should return all predictions for a specific initialization time", + req: &pb.GetForecastAsTimeseriesRequest{ + LocationUuid: siteResp.LocationUuid, + Forecaster: forecasterResp.Forecaster, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + HorizonMins: 0, + TimeWindow: defaultTimeWindow, + InitializationTimestampUtc: timestamppb.New(pivotTime.Add(-30 * time.Minute)), + }, + expectedValues: []float32{ + 0.00, 0.08, 0.16, 0.24, 0.32, 0.40, 0.48, 0.56, 0.64, 0.72, 0.80, 0.88, + }, + expectErr: false, + }, } for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - if tc.pivotTime.Equal((time.Time{})) { - tc.pivotTime = pivotTime - } + resp, err := dc.GetForecastAsTimeseries(t.Context(), tc.req) - resp, err := dc.GetForecastAsTimeseries(t.Context(), &pb.GetForecastAsTimeseriesRequest{ - LocationUuid: siteResp.LocationUuid, - HorizonMins: uint32(tc.horizonMins), - Forecaster: forecasterResp.Forecaster, - EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, - TimeWindow: &pb.TimeWindow{ - StartTimestampUtc: timestamppb.New(pivotTime.Add(-time.Hour * 48)), - EndTimestampUtc: timestamppb.New(pivotTime.Add(time.Hour * 36)), - }, - }) - if strings.Contains(tc.name, "Shouldn't") { + if tc.expectErr { require.Error(t, err) - } else { - require.NoError(t, err) - require.NotNil(t, resp) + return + } - targetTimes := make([]int64, len(resp.Values)) + require.NoError(t, err) + require.NotNil(t, resp) - actualValues := make([]float32, len(resp.Values)) - for i, v := range resp.Values { - targetTimes[i] = v.TargetTimestampUtc.AsTime().Unix() - actualValues[i] = v.P50ValueFraction + targetTimes := make([]int64, len(resp.Values)) + actualValues := make([]float32, len(resp.Values)) - // Assert that the capacity change has been picked up - if v.TargetTimestampUtc.AsTime(). - After(pivotTime.Add(-1 * time.Hour).Add(-1 * time.Second)) { - require.Equal(t, 1500000, int(v.EffectiveCapacityWatts)) - } else { - require.Equal(t, 1000000, int(v.EffectiveCapacityWatts)) - } - } + for i, v := range resp.Values { + targetTimes[i] = v.TargetTimestampUtc.AsTime().Unix() + actualValues[i] = v.P50ValueFraction - require.IsIncreasing(t, targetTimes) - require.Equal(t, tc.expectedValues, actualValues) + // Assert that the capacity change has been picked up + if v.TargetTimestampUtc.AsTime(). + After(pivotTime.Add(-1 * time.Hour).Add(-1 * time.Second)) { + require.Equal(t, 1500000, int(v.EffectiveCapacityWatts)) + } else { + require.Equal(t, 1000000, int(v.EffectiveCapacityWatts)) + } } + + require.IsIncreasing(t, targetTimes) + require.Equal(t, tc.expectedValues, actualValues) }) } } @@ -1935,3 +1960,195 @@ func TestGetLatestForecasts(t *testing.T) { }) } } + +func TestStreamForecastData(t *testing.T) { + pivotTime := time.Date(2026, 4, 24, 12, 0, 0, 0, time.UTC) + + // Create a site to attach the forecasts to + metadata, err := structpb.NewStruct(map[string]any{"stream_test": "true"}) + require.NoError(t, err) + siteResp, err := dc.CreateLocation(t.Context(), &pb.CreateLocationRequest{ + LocationName: "test_stream_forecast_data_site", + GeometryWkt: "POINT(-60.25 57.5)", + EffectiveCapacityWatts: 1000000, + Metadata: metadata, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + LocationType: pb.LocationType_LOCATION_TYPE_SITE, + ValidFromUtc: timestamppb.New(pivotTime.Add(-time.Hour * 48)), + }) + require.NoError(t, err) + + // Create two forecasters + fc1Resp, err := dc.CreateForecaster(t.Context(), &pb.CreateForecasterRequest{ + Name: "stream_forecaster_alpha", + Version: "v1", + }) + require.NoError(t, err) + + fc2Resp, err := dc.CreateForecaster(t.Context(), &pb.CreateForecasterRequest{ + Name: "stream_forecaster_beta", + Version: "v2", + }) + require.NoError(t, err) + + // Generate 3 forecasts for each forecaster, each with 4 horizon values (0, 5, 10, 15) + yields := make([]*pb.CreateForecastRequest_ForecastValue, 4) + for i := range yields { + yields[i] = &pb.CreateForecastRequest_ForecastValue{ + HorizonMins: uint32(i * 5), + P50Fraction: float32(i) * 0.25, + OtherStatisticsFractions: map[string]float32{ + "p90": float32(i)*0.25 + 0.1, + }, + Metadata: metadata, + } + } + + // Seed forecasts for both forecasters spanning backwards from pivot time + for i := 2; i >= 0; i-- { + initTime := timestamppb.New(pivotTime.Add(time.Duration(-i*60) * time.Minute)) + + _, err = dc.CreateForecast(t.Context(), &pb.CreateForecastRequest{ + LocationUuid: siteResp.LocationUuid, + Forecaster: fc1Resp.Forecaster, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + InitTimeUtc: initTime, + Values: yields, + }) + require.NoError(t, err) + + _, err = dc.CreateForecast(t.Context(), &pb.CreateForecastRequest{ + LocationUuid: siteResp.LocationUuid, + Forecaster: fc2Resp.Forecaster, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + InitTimeUtc: initTime, + Values: yields, + }) + require.NoError(t, err) + } + + defaultTimeWindow := &pb.StreamForecastDataRequest_TimeWindow{ + StartTimestampUtc: timestamppb.New(pivotTime.Add(-time.Hour * 10)), + EndTimestampUtc: timestamppb.New(pivotTime.Add(time.Hour * 10)), + } + + testcases := []struct { + name string + req *pb.StreamForecastDataRequest + expectedRowsCount int + expectErr bool + checkMetadata bool + }{ + { + name: "Should successfully stream forecasts for a single forecaster", + req: &pb.StreamForecastDataRequest{ + LocationUuid: siteResp.LocationUuid, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + Forecasters: []*pb.Forecaster{fc1Resp.Forecaster}, + TimeWindow: defaultTimeWindow, + IncludeMetadata: false, + }, + // 3 forecasts * 4 values = 12 rows + expectedRowsCount: 12, + expectErr: false, + }, + { + name: "Should successfully stream forecasts for multiple forecasters", + req: &pb.StreamForecastDataRequest{ + LocationUuid: siteResp.LocationUuid, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + Forecasters: []*pb.Forecaster{fc1Resp.Forecaster, fc2Resp.Forecaster}, + TimeWindow: defaultTimeWindow, + IncludeMetadata: false, + }, + // (3 forecasts * 4 values) * 2 forecasters = 24 rows + expectedRowsCount: 24, + expectErr: false, + }, + { + name: "Should only return forecasts whos init times respect the time window boundaries", + req: &pb.StreamForecastDataRequest{ + LocationUuid: siteResp.LocationUuid, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + Forecasters: []*pb.Forecaster{fc1Resp.Forecaster}, + TimeWindow: &pb.StreamForecastDataRequest_TimeWindow{ + // Constrain the window to only capture the most recent forecast + StartTimestampUtc: timestamppb.New(pivotTime.Add(-time.Minute * 10)), + EndTimestampUtc: timestamppb.New(pivotTime.Add(time.Minute * 10)), + }, + IncludeMetadata: false, + }, + expectedRowsCount: 4, + expectErr: false, + }, + { + name: "Should include metadata when asked", + req: &pb.StreamForecastDataRequest{ + LocationUuid: siteResp.LocationUuid, + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + Forecasters: []*pb.Forecaster{fc1Resp.Forecaster}, + TimeWindow: defaultTimeWindow, + IncludeMetadata: true, + }, + expectedRowsCount: 12, + expectErr: false, + checkMetadata: true, + }, + { + name: "Should fail gracefully with an invalid UUID", + req: &pb.StreamForecastDataRequest{ + LocationUuid: "not-a-valid-uuid", + EnergySource: pb.EnergySource_ENERGY_SOURCE_SOLAR, + Forecasters: []*pb.Forecaster{fc1Resp.Forecaster}, + TimeWindow: defaultTimeWindow, + }, + expectErr: true, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + stream, err := dc.StreamForecastData(t.Context(), tc.req) + if err != nil { + if tc.expectErr { + return + } + + require.NoError(t, err) + } + + var actualRowsCount int + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + + if tc.expectErr { + require.Error(t, err) + return + } + + require.NoError(t, err) + require.NotNil(t, resp) + + actualRowsCount++ + + if tc.checkMetadata { + require.NotNil(t, resp.Metadata) + require.Equal(t, "true", resp.Metadata["stream_test"]) + } else { + require.Empty(t, resp.Metadata) + } + + require.NotZero(t, resp.EffectiveCapacityWatts) + require.NotNil(t, resp.OtherStatisticsFractions) + require.Contains(t, resp.OtherStatisticsFractions, "p90") + } + + if !tc.expectErr { + require.Equal(t, tc.expectedRowsCount, actualRowsCount) + } + }) + } +} diff --git a/internal/server/postgres/sql/queries/predictions.sql b/internal/server/postgres/sql/queries/predictions.sql index 68f330d..e258f0f 100644 --- a/internal/server/postgres/sql/queries/predictions.sql +++ b/internal/server/postgres/sql/queries/predictions.sql @@ -128,6 +128,59 @@ INSERT INTO pred.predicted_generation_values ( $1, $2, $3, $4, $5, $6 ); +-- name: ListPredictionsForForecasts :many +/* ListPredictionsForForecasts retrieves all predicted generation values for a given location, + * source type, and dynamic list of forecasters within a time window. + */ +WITH requested_forecasters AS ( + SELECT + UNNEST(sqlc.arg(forecaster_names)::TEXT []) AS fname, + UNNEST(sqlc.arg(forecaster_versions)::TEXT []) AS fversion +), +matched_forecasters AS ( + SELECT + f.forecaster_id, + f.forecaster_name, + f.forecaster_version + FROM pred.forecasters AS f + INNER JOIN requested_forecasters AS rf + ON f.forecaster_name = LOWER(rf.fname) + AND f.forecaster_version = LOWER(rf.fversion) +), +target_forecasts AS ( + SELECT + f.forecast_uuid, + f.created_at_utc, + f.geometry_uuid, + f.metadata AS forecast_metadata, + mf.forecaster_name, + mf.forecaster_version, + UUIDV7_EXTRACT_TIMESTAMP(f.forecast_uuid)::TIMESTAMP AS init_time_utc + FROM pred.forecasts AS f + INNER JOIN matched_forecasters AS mf USING (forecaster_id) + WHERE f.geometry_uuid = sqlc.arg(geometry_uuid)::UUID + AND f.source_type_id = sqlc.arg(source_type_id)::SMALLINT + AND f.forecast_uuid >= UUIDV7_BOUNDARY(sqlc.arg(start_timestamp)::TIMESTAMP) + AND f.forecast_uuid < UUIDV7_BOUNDARY(sqlc.arg(end_timestamp)::TIMESTAMP + INTERVAL '1 millisecond') +) +SELECT + tf.init_time_utc, + tf.forecaster_name, + tf.forecaster_version, + tf.created_at_utc, + pg.horizon_mins, + pg.p50_sip, + pg.other_stats_fractions, + sv.capacity_watts, + COALESCE(pg.metadata || tf.forecast_metadata, pg.metadata, tf.forecast_metadata) AS metadata +FROM target_forecasts AS tf + INNER JOIN pred.predicted_generation_values AS pg USING (forecast_uuid) + INNER JOIN loc.sources_mv AS sv + ON tf.geometry_uuid = sv.geometry_uuid + AND sv.source_type_id = sqlc.arg(source_type_id)::SMALLINT +WHERE sv.sys_period @> pg.target_time_utc +ORDER BY tf.init_time_utc ASC, pg.horizon_mins ASC; + -- name: GetLatestForecastsAtHorizonSincePivot :many /* GetLatestForecastAtHorizonSincePivot retrieves the latest forecasts for a given location * and source type made by each individual forecaster name. @@ -169,54 +222,6 @@ FROM pred.forecasters AS fr ) AS f ORDER BY fr.forecaster_name ASC, f.init_time_utc DESC; --- name: ListForecasts :many -/* ListForecasts retrieves all the forecasts for a given location, source type, and forecaster - * between the input times. It does not return forecast values. - */ -WITH desired_forecaster AS ( - SELECT - forecaster_id, - forecaster_name, - forecaster_version - FROM pred.forecasters - WHERE forecaster_name = LOWER(sqlc.arg(forecaster_name)::TEXT) - AND forecaster_version = LOWER(sqlc.arg(forecaster_version)::TEXT) -) -SELECT - forecasts.forecast_uuid, - forecasts.init_time_utc, - forecasts.geometry_uuid, - forecasts.metadata, - desired_forecaster.forecaster_name, - desired_forecaster.forecaster_version, - UUIDV7_EXTRACT_TIMESTAMP(forecasts.forecast_uuid) AS created_at_utc -FROM pred.forecasts AS forecasts - INNER JOIN desired_forecaster USING (forecaster_id) -WHERE forecasts.geometry_uuid = $1 - AND forecasts.source_type_id = $2 - AND forecasts.forecast_uuid >= UUIDV7_BOUNDARY(sqlc.arg(start_timestamp)::TIMESTAMP) - AND forecasts.forecast_uuid < UUIDV7_BOUNDARY( - sqlc.arg(end_timestamp)::TIMESTAMP + INTERVAL '1 millisecond' - ); - --- name: ListPredictionsForForecast :many -/* ListPredictionsForForecast retrieves predicted generation values - * for a given forecast as smallint percentages (sip) of capacity; - * with 0 representing 0% and 30000 representing 100% of capacity. - */ -SELECT - pg.horizon_mins, - pg.p50_sip, - pg.target_time_utc, - pg.other_stats_fractions, - sv.capacity_watts, - COALESCE(pg.metadata || f.metadata, pg.metadata, f.metadata) AS metadata -FROM pred.forecasts AS f - INNER JOIN pred.predicted_generation_values AS pg USING (forecast_uuid) - INNER JOIN loc.sources_mv AS sv USING (geometry_uuid, source_type_id) -WHERE f.forecast_uuid = $1 - AND sv.sys_period @> pg.target_time_utc; - -- name: ListPredictionsForLocation :many /* ListPredictionsForLocation retrieves predicted generation values as a timeseries. * Multiple overlapping forecasts can make up the timeseries, so predictions with the same target time diff --git a/proto/ocf/dp/dp-data.messages.proto b/proto/ocf/dp/dp-data.messages.proto index d5149c6..60b04cf 100644 --- a/proto/ocf/dp/dp-data.messages.proto +++ b/proto/ocf/dp/dp-data.messages.proto @@ -76,10 +76,26 @@ message GetForecastAsTimeseriesRequest { ]; /* The time to search backwards from to find forecasts. * If not specified, the current time will be used. + * Forecasts created after this time are not included. */ optional google.protobuf.Timestamp pivot_timestamp_utc = 6 [ (buf.validate.field).timestamp = { gt: { seconds: 112000000}, lt_now: true } ]; + /* An individual init time to filter forecasts by. + * If specified, only forecasts with this init time will be returned. + * This enables fetching data from a single, specific forecast run. + */ + optional google.protobuf.Timestamp initialization_timestamp_utc = 7 [ + (buf.validate.field).timestamp = { gt: { seconds: 112000000}, lt_now: true } + ]; + + option (buf.validate.message).cel = { + id: "pivot_timestamp_after_initialization" + message: "pivot_timestamp_utc must be after initialization_timestamp_utc" + expression: + "!(has(this.pivot_timestamp_utc) && has(this.initialization_timestamp_utc)) " + "|| this.pivot_timestamp_utc > this.initialization_timestamp_utc" + }; } message GetForecastAsTimeseriesResponse { @@ -787,5 +803,6 @@ message StreamForecastDataResponse { google.protobuf.Timestamp created_timestamp_utc = 7; uint64 effective_capacity_watts = 8; map metadata = 9; + google.protobuf.Timestamp target_timestamp_utc = 10; } diff --git a/proto/ocf/dp/dp-data.service.proto b/proto/ocf/dp/dp-data.service.proto index f8349e6..05cd931 100644 --- a/proto/ocf/dp/dp-data.service.proto +++ b/proto/ocf/dp/dp-data.service.proto @@ -11,7 +11,7 @@ service DataPlatformDataService { /* GetForecastTimeseries fetches a horizontal slice of predicted data. * (i.e. many points in time for a single location). Since individually created forecasts can * have overlapping time ranges, identically-timestamped values are discarded, excepting the one - * with the lowest allowable lead time (horizon). + * with the lowest allowable lead time (horizon), where required. */ rpc GetForecastAsTimeseries(GetForecastAsTimeseriesRequest) returns (GetForecastAsTimeseriesResponse) {} /* GetForecastAtTimestamp fetches a vertical slice of predicted data.