Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/python-notebook/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# "xarray==2025.7.1",
# ]
# [tool.uv.sources]
# dp-sdk = { url = "https://github.com/openclimatefix/data-platform/releases/download/v0.25.0/dp_sdk-0.25.0-py3-none-any.whl" }
# dp-sdk = { url = "https://github.com/openclimatefix/data-platform/releases/download/v0.27.0/dp_sdk-0.27.0-py3-none-any.whl" }
# ///
"""Example script for pulling data from the data platform.

Expand Down Expand Up @@ -45,6 +45,7 @@ async def main() -> None:
location_type_filter=dp.LocationType.NATION,
)
glresp = await dpc.list_locations(glreq)
print(f":: -> {len(glresp)} available locations")
uk_location = next(l for l in glresp.locations if l.location_name == "uk")
print(f"\t{uk_location.effective_capacity_watts=}")

Expand Down
1 change: 1 addition & 0 deletions internal/server/postgres/.sqlc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ sql:
emit_methods_with_db_argument: false
emit_empty_slices: true
emit_interface: true
emit_exported_queries: true
overrides:
- db_type: "uuid"
go_type:
Expand Down
223 changes: 131 additions & 92 deletions internal/server/postgres/dataserverimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,119 +475,110 @@ func (s *DataPlatformDataServiceServerImpl) StreamForecastData(
) error {
l := zerolog.Ctx(stream.Context())

querier := db.New(ix.GetTxFromContext(stream.Context()))
tx := ix.GetTxFromContext(stream.Context())

locationUuid, err := uuid.Parse(req.LocationUuid)
if err != nil {
l.Err(err).Msgf("uuid.Parse(%s)", req.LocationUuid)
return status.Errorf(codes.InvalidArgument, "Invalid location UUID: %v", err)
}

// Get the source as it was at the initial time of the time window
srcprms := db.GetSourceAtTimestampParams{
GeometryUuid: locationUuid,
SourceTypeID: int16(req.EnergySource.Number()),
AtTimestampUtc: pgtype.Timestamp{
var (
fNames []string
fVersions []string
)

for _, fc := range req.Forecasters {
fNames = append(fNames, fc.ForecasterName)
fVersions = append(fVersions, fc.ForecasterVersion)
}

// Query with the transaction directly so the returned data can be streamed.
// This is to avoid very large data requests choking the memory of the API.
// Normally I woudn't want to bypass SQLC's type safety - but this RPC is specifically for
// ML debugging and isn't on any hot path, so I don't mind here.
rows, err := tx.Query(
stream.Context(),
db.ListPredictionsForForecasts,
int16(req.EnergySource.Number()),
fNames,
fVersions,
locationUuid,
pgtype.Timestamp{
Time: req.TimeWindow.StartTimestampUtc.AsTime(),
Valid: true,
},
}

dbSource, err := querier.GetSourceAtTimestamp(stream.Context(), srcprms)
pgtype.Timestamp{
Time: req.TimeWindow.EndTimestampUtc.AsTime(),
Valid: true,
},
)
if err != nil {
l.Err(err).Msgf("querier.GetSourceAtTimestamp(%+v)", srcprms)

return status.Errorf(
codes.NotFound, "No location found for uuid %s with source type '%s'.",
req.LocationUuid, req.EnergySource,
l.Err(err).Msg("tx.Query(ListPredictionsForForecasts) failed")
return status.Errorf(codes.Internal, "Failed to stream predictions")
}
defer rows.Close()

for rows.Next() {
var row db.ListPredictionsForForecastsRow

err := rows.Scan(
&row.InitTimeUtc,
&row.ForecasterName,
&row.ForecasterVersion,
&row.CreatedAtUtc,
&row.HorizonMins,
&row.P50Sip,
&row.OtherStatsFractions,
&row.CapacityWatts,
&row.Metadata,
)
}

forecasts := make([]db.ListForecastsRow, 0)
for _, forecaster := range req.Forecasters {
fcprms := db.ListForecastsParams{
GeometryUuid: dbSource.GeometryUuid,
SourceTypeID: dbSource.SourceTypeID,
ForecasterName: forecaster.ForecasterName,
ForecasterVersion: forecaster.ForecasterVersion,
StartTimestamp: pgtype.Timestamp{
Time: req.TimeWindow.StartTimestampUtc.AsTime(),
Valid: true,
},
EndTimestamp: pgtype.Timestamp{
Time: req.TimeWindow.EndTimestampUtc.AsTime(),
Valid: true,
},
}

dbForecasts, err := querier.ListForecasts(stream.Context(), fcprms)
if err != nil {
l.Err(err).Msgf("querier.ListForecasts(%+v)", fcprms)

return status.Errorf(
codes.NotFound,
"No forecasts found for location '%s' and forecaster %s:%s between %s and %s.",
req.LocationUuid,
forecaster.ForecasterName,
forecaster.ForecasterVersion,
req.TimeWindow.StartTimestampUtc.AsTime(),
req.TimeWindow.EndTimestampUtc.AsTime(),
)
}

forecasts = append(forecasts, dbForecasts...)
}

for _, forecast := range forecasts {
psprms := db.ListPredictionsForForecastParams{ForecastUuid: forecast.ForecastUuid}

dbPreds, err := querier.ListPredictionsForForecast(stream.Context(), psprms)
if err != nil {
l.Err(err).Msgf("querier.ListPredictionsForForecast(%+v)", psprms)

return status.Errorf(
codes.NotFound,
"No predicted generation values found for forecast with init time %s",
forecast.InitTimeUtc.Time,
)
l.Err(err).Msg("rows.Scan failed")
return status.Errorf(codes.Internal, "Error reading prediction stream")
}

for _, pred := range dbPreds {
otherStatistics := make(map[string]float32)
if pred.OtherStatsFractions != nil {
for k, v := range pred.OtherStatsFractions.AsMap() {
otherStatistics[k] = float32(v.(float64))
}
otherStatistics := make(map[string]float32)
if row.OtherStatsFractions != nil {
for k, v := range row.OtherStatsFractions.AsMap() {
otherStatistics[k] = float32(v.(float64))
}
}

metadata := make(map[string]string)
if req.IncludeMetadata && pred.Metadata != nil {
for k, v := range pred.Metadata.AsMap() {
metadata[k] = v.(string)
}
metadata := make(map[string]string)
if req.IncludeMetadata && row.Metadata != nil {
for k, v := range row.Metadata.AsMap() {
metadata[k] = v.(string)
}
}

err = stream.Send(&pb.StreamForecastDataResponse{
InitTimestamp: timestamppb.New(forecast.InitTimeUtc.Time),
LocationUuid: forecast.GeometryUuid.String(),
ForecasterFullname: fmt.Sprintf(
"%s:%s",
forecast.ForecasterName,
forecast.ForecasterVersion,
),
HorizonMins: uint32(pred.HorizonMins),
P50Fraction: float32(pred.P50Sip) / 30000.0,
OtherStatisticsFractions: otherStatistics,
CreatedTimestampUtc: timestamppb.New(forecast.CreatedAtUtc.Time),
EffectiveCapacityWatts: uint64(pred.CapacityWatts),
Metadata: metadata,
})
if err != nil {
return err
}
err = stream.Send(&pb.StreamForecastDataResponse{
InitTimestamp: timestamppb.New(row.InitTimeUtc.Time),
LocationUuid: req.LocationUuid,
ForecasterFullname: fmt.Sprintf(
"%s:%s",
row.ForecasterName,
row.ForecasterVersion,
),
HorizonMins: uint32(row.HorizonMins),
P50Fraction: float32(row.P50Sip) / 30000.0,
OtherStatisticsFractions: otherStatistics,
CreatedTimestampUtc: timestamppb.New(row.CreatedAtUtc.Time),
EffectiveCapacityWatts: uint64(row.CapacityWatts),
Metadata: metadata,
})
// If the client disconnects or network fails, break the loop
if err != nil {
l.Warn().Err(err).Msg("Client stream closed prematurely")
return err
}
}

if err := rows.Err(); err != nil {
l.Err(err).Msg("rows.Err() reported a streaming failure")
return status.Errorf(codes.Internal, "Stream interrupted")
}

return nil
}

Expand Down Expand Up @@ -1447,7 +1438,6 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAsTimeseries(

querier := db.New(ix.GetTxFromContext(ctx))

// Get the location and source
gsprms := db.GetSourceAtTimestampParams{
GeometryUuid: uuid.MustParse(req.LocationUuid),
SourceTypeID: int16(req.EnergySource.Number()),
Expand All @@ -1467,6 +1457,55 @@ func (s *DataPlatformDataServiceServerImpl) GetForecastAsTimeseries(
)
}

// If in init time has been requested, only return the values for that single forecast.
if req.InitializationTimestampUtc != nil {
llprms := db.ListPredictionsForForecastsParams{
GeometryUuid: uuid.MustParse(req.LocationUuid),
SourceTypeID: int16(req.EnergySource.Number()),
ForecasterNames: []string{req.Forecaster.ForecasterName},
ForecasterVersions: []string{req.Forecaster.ForecasterVersion},
StartTimestamp: pgtype.Timestamp{
Time: req.InitializationTimestampUtc.AsTime(),
Valid: true,
},
EndTimestamp: pgtype.Timestamp{
Time: req.InitializationTimestampUtc.AsTime(),
Valid: true,
},
}

dbPreds, err := querier.ListPredictionsForForecasts(ctx, llprms)
if err != nil {
l.Err(err).Msgf("querier.ListPredictionsForForecasts(%+v)", llprms)

return nil, status.Error(
codes.InvalidArgument,
"No forecasts found for the given parameters.",
)
}

out := make([]*pb.GetForecastAsTimeseriesResponse_Value, len(dbPreds))
for i, pred := range dbPreds {
out[i] = &pb.GetForecastAsTimeseriesResponse_Value{
TargetTimestampUtc: timestamppb.New(
pred.InitTimeUtc.Time.Add(time.Duration(pred.HorizonMins) * time.Minute),
),
P50ValueFraction: float32(pred.P50Sip) / 30000.0,
EffectiveCapacityWatts: uint64(pred.CapacityWatts),
InitializationTimestampUtc: timestamppb.New(pred.InitTimeUtc.Time),
CreatedTimestampUtc: timestamppb.New(pred.CreatedAtUtc.Time),
Metadata: pred.Metadata,
}
}

return &pb.GetForecastAsTimeseriesResponse{
LocationUuid: req.LocationUuid,
LocationName: dbSource.GeometryName,
Values: out,
}, nil
}

// Otherwise, return the collapsed timeseries.
// Get the relevant forecaster
gpprms := db.GetForecasterElseLatestParams{
ForecasterName: req.Forecaster.ForecasterName,
Expand Down
Loading
Loading