CF-1870 : Manage SQL Catalog databases through CLI#3287
CF-1870 : Manage SQL Catalog databases through CLI#3287Paras Negi (paras-negi-flink) wants to merge 15 commits intomainfrom
Conversation
|
❌ Error getting contributor login(s). |
There was a problem hiding this comment.
Pull request overview
Adds Confluent Platform (CMF/on-prem) CLI support for managing Flink SQL catalog databases, along with integration test coverage and fixtures.
Changes:
- Introduces
confluent flink catalog databasecommand group withcreateandlistsubcommands. - Extends CMF REST client with
CreateDatabaseandListDatabases. - Updates on-prem test server routes/handlers and adds integration tests + golden/input fixtures for the new commands.
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
internal/flink/command_catalog.go |
Registers the new database subcommand under flink catalog. |
internal/flink/command_catalog_database.go |
Adds the database command group and SDK→local conversion for serialized output. |
internal/flink/command_catalog_database_create.go |
Implements flink catalog database create from a JSON/YAML resource file. |
internal/flink/command_catalog_database_list.go |
Implements flink catalog database list with human + serialized output. |
internal/flink/local_types.go |
Adds local (serialized) types for KafkaDatabase output. |
pkg/flink/cmf_rest_client.go |
Adds CMF client methods to create/list Kafka databases (paginated). |
test/test-server/flink_onprem_router.go |
Adds the CMF test-server route for catalog databases. |
test/test-server/flink_onprem_handler.go |
Adds test-server handler + helpers for listing/creating databases. |
test/flink_onprem_test.go |
Adds integration tests for on-prem database create/list. |
test/fixtures/input/flink/catalog/database/create-successful.json |
Input fixture for successful database creation. |
test/fixtures/input/flink/catalog/database/create-invalid-failure.json |
Input fixture for validation failure on create. |
test/fixtures/output/flink/catalog/help-onprem.golden |
Updates catalog help output to include database. |
test/fixtures/output/flink/catalog/database/help-onprem.golden |
Adds help fixture for flink catalog database. |
test/fixtures/output/flink/catalog/database/create-help-onprem.golden |
Adds help fixture for flink catalog database create. |
test/fixtures/output/flink/catalog/database/list-help-onprem.golden |
Adds help fixture for flink catalog database list. |
test/fixtures/output/flink/catalog/database/create-success.golden |
Human output golden for create success. |
test/fixtures/output/flink/catalog/database/create-success-json.golden |
JSON output golden for create success. |
test/fixtures/output/flink/catalog/database/create-success-yaml.golden |
YAML output golden for create success. |
test/fixtures/output/flink/catalog/database/create-invalid-failure.golden |
Error output golden for create failure. |
test/fixtures/output/flink/catalog/database/list-success.golden |
Human output golden for list success. |
test/fixtures/output/flink/catalog/database/list-success-json.golden |
JSON output golden for list success. |
test/fixtures/output/flink/catalog/database/list-success-yaml.golden |
YAML output golden for list success. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 42 out of 42 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 42 out of 42 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Here are the E2E validations done for the new Flink catalog database commands in this PR, with concrete commands and outputs.
parasnegi@C6V9RN9V2Y confluent_darwin_arm64_v8.0 % ./confluent --url http://localhost:8080 flink catalog database create my-database.json --catalog kcat
+---------------+--------------------------+
| Creation Time | 2026-04-03T02:51:11.634Z |
| Name | my-database |
| Catalog | kcat |
+---------------+--------------------------+
parasnegi@C6V9RN9V2Y confluent_darwin_arm64_v8.0 % ./confluent --url http://localhost:8080 flink catalog database create my-database.json --catalog kcat --output json
{
"apiVersion": "cmf.confluent.io/v1",
"kind": "KafkaDatabase",
"metadata": {
"name": "my-database",
"creationTimestamp": "2026-04-03T02:53:18.202Z",
"updateTimestamp": "2026-04-03T02:53:18.202Z",
"uid": "e01b7728-e578-4630-80fe-7c01e8b4e478",
"labels": {
"env": "dev",
"team": "flink"
},
"annotations": {}
},
"spec": {
"kafkaCluster": {
"connectionConfig": {
"bootstrap.servers": "localhost:9093"
}
}
}
}
parasnegi@C6V9RN9V2Y confluent_darwin_arm64_v8.0 % ./confluent --url http://localhost:8080 flink catalog database list --catalog kcat
Creation Time | Name | Catalog
---------------------------+---------------+----------
2026-03-25T14:08:11.397Z | free-kafka | kcat
2026-03-26T06:16:54.494Z | test-database | kcat
2026-04-03T02:53:18.202Z | my-database | kcat
parasnegi@C6V9RN9V2Y confluent_darwin_arm64_v8.0 % ./confluent --url http://localhost:8080 flink catalog database describe my-database --catalog kcat
+---------------+--------------------------+
| Creation Time | 2026-04-03T02:53:18.202Z |
| Name | my-database |
| Catalog | kcat |
+---------------+--------------------------+
parasnegi@C6V9RN9V2Y confluent_darwin_arm64_v8.0 % ./confluent --url http://localhost:8080 flink catalog database describe my-database --catalog kcat --output yaml
apiVersion: cmf.confluent.io/v1
kind: KafkaDatabase
metadata:
name: my-database
creationTimestamp: "2026-04-03T02:53:18.202Z"
updateTimestamp: "2026-04-03T02:53:18.202Z"
uid: e01b7728-e578-4630-80fe-7c01e8b4e478
labels:
env: dev
team: flink
annotations: {}
spec:
kafkaCluster:
connectionConfig:
bootstrap.servers: localhost:9093
parasnegi@C6V9RN9V2Y confluent_darwin_arm64_v8.0 % ./confluent --url http://localhost:8080 flink catalog database delete my-database --catalog kcat
Are you sure you want to delete Flink database "my-database"? (y/n): y
Deleted Flink database "my-database".
parasnegi@C6V9RN9V2Y confluent_darwin_arm64_v8.0 % ./confluent --url http://localhost:8080 flink catalog database list --catalog kcat
Creation Time | Name | Catalog
---------------------------+---------------+----------
2026-03-25T14:08:11.397Z | free-kafka | kcat
2026-03-26T06:16:54.494Z | test-database | kcat |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 46 out of 46 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
| "github.com/spf13/cobra" | ||
| "gopkg.in/yaml.v3" | ||
|
|
||
| cmfsdk "github.com/confluentinc/cmf-sdk-go/v1" |
There was a problem hiding this comment.
Don't we expect a CMF SDK upgrade to support this feature? I don't see any go.mod changes.
Or a Docker image change is good enough?
There was a problem hiding this comment.
There is an ongoing effort in parallel for CMF SDK upgrade in CLI repo. Will be updating this PR once that change is merged.
| table.Add(&databaseOut{ | ||
| CreationTime: creationTime, | ||
| Name: sdkDatabase.GetMetadata().Name, | ||
| Catalog: catalogName, | ||
| }) |
There was a problem hiding this comment.
Will it be a concern if human mode has such a difference compared to json or yml mode through convertSdkDatabaseToLocalDatabase()?
| if err := deletion.ValidateAndConfirm(cmd, args, existenceFunc, resource.FlinkDatabase); err != nil { | ||
| // We are validating only the existence of the resources (there is no prefix validation). | ||
| // Thus, we can add some extra context for the error. | ||
| suggestions := "List available Flink databases with `confluent flink catalog database list`." |
There was a problem hiding this comment.
Since this is user facing suggestion, if --catalog is required flag and not something we save into CLI config, we should add the flag to the suggestion.
|
|
||
| databaseName := sdkDatabase.Metadata.Name | ||
|
|
||
| if err := client.UpdateDatabase(c.createContext(), catalogName, databaseName, sdkDatabase); err != nil { |
There was a problem hiding this comment.
In order to print out the database after update, we need an additional call to the describe command, is that possible to return the updated database right there from the update call to make this an atomic operation?
It could be the intentional design from CMF side but worth noting in the PR review here.
| func (c *command) newCatalogDatabaseUpdateCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "update <resourceFilePath>", | ||
| Short: "Update a Flink database.", |
There was a problem hiding this comment.
To make it consistent.
| Short: "Update a Flink database.", | |
| Short: "Update a Flink database in Confluent Platform.", |
| func (c *command) newCatalogDatabaseCreateCommand() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "create <resourceFilePath>", | ||
| Short: "Create a Flink database.", |
There was a problem hiding this comment.
To make it consistent.
| Short: "Create a Flink database.", | |
| Short: "Create a Flink database in Confluent Platform.", |
| {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog", fixture: "flink/catalog/database/create-success.golden"}, | ||
| {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output json", fixture: "flink/catalog/database/create-success-json.golden"}, | ||
| {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-successful.json --catalog test-catalog --output yaml", fixture: "flink/catalog/database/create-success-yaml.golden"}, | ||
| // failure scenarios with JSON files | ||
| {args: "flink catalog database create test/fixtures/input/flink/catalog/database/create-invalid-failure.json --catalog test-catalog", fixture: "flink/catalog/database/create-invalid-failure.golden", exitCode: 1}, |
There was a problem hiding this comment.
TestFlinkCatalogDatabaseCreateWithYAML() but it seems to be JSON.
| runIntegrationTestsWithMultipleAuth(s, tests) | ||
| } | ||
|
|
||
| func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateWithYAML() { |
There was a problem hiding this comment.
| func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateWithYAML() { | |
| func (s *CLITestSuite) TestFlinkCatalogDatabaseCreateOnPremWithYAML() { |
| runIntegrationTestsWithMultipleAuth(s, tests) | ||
| } | ||
|
|
||
| func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateWithYAML() { |
There was a problem hiding this comment.
| func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateWithYAML() { | |
| func (s *CLITestSuite) TestFlinkCatalogDatabaseUpdateOnPremWithYAML() { |
|
Can we also add the update manual verification case for the database? I want to understand what kind of fields are updatable. |
| } | ||
|
|
||
| func createKafkaDatabase(dbName string) cmfsdk.KafkaDatabase { | ||
| timeStamp := time.Date(2025, time.August, 5, 12, 0, 0, 0, time.UTC).String() |
There was a problem hiding this comment.
nit: maybe consider matching the prod response following the ISO 8601 input, so we are validating against the real prod response, e.g:
timeStamp := "2025-08-05T12:00:00.000Z"




Release Notes
Breaking Changes
New Features
confluent flink catalog database create|list|describe|update|deletecommands to manage Kafka databases (Kafka cluster connections) inside a Flink SQL catalog on Confluent Platform.Bug Fixes
Checklist
Whatsection below whether this PR applies to Confluent Cloud, Confluent Platform, or both.Test & Reviewsection below.Blast Radiussection below.What
This PR implements CF-1870 — Manage SQL Catalog databases for the Confluent CLI, targeting Confluent Platform / CP Flink (CMF on-prem):
confluent flink catalog:confluent flink catalog database create <resourceFilePath> --catalog <catName>confluent flink catalog database list --catalog <catName>confluent flink catalog database describe <dbName> --catalog <catName>confluent flink catalog database update <resourceFilePath> --catalog <catName>confluent flink catalog database delete <dbName> --catalog <catName>POST/GET/PUT/DELETE /cmf/api/v1/catalogs/kafka/{catName}/databases[/ {dbName}].CmfRestClientwrapper forKafkaDatabaseoperations and corresponding local types/output formatting, following existing patterns used for catalogs and compute pools.Blast Radius
confluent flinkandconfluent kafkacommands) is unchanged.References
KafkaDatabaseand the Flyway migration (e.g.,[CF-1772] Add flyway migration for KafkaCatalog -> KafkaDatabase).Test & Review
Environment
confluentinc/cliCF-18702.3-SNAPSHOT(image:confluentinc/cp-cmf:c505ee8b) - Kubernetes: local cluster with CMF deployed (cmf-serviceexposed viakubectl port-forward svc/cmf-service 8080:80 -n e2e)Manual CLI validation
Attached in the comment below