diff --git a/CHANGES.md b/CHANGES.md index 934de1e..e1e0d21 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -56,6 +56,15 @@ To be released. [#14]: https://github.com/fedify-dev/botkit/pull/14 [#18]: https://github.com/fedify-dev/botkit/issues/18 +### @fedify/botkit-postgres + + - Added a new PostgreSQL repository package, *`@fedify/botkit-postgres`*, + which provides `PostgresRepository`, `PostgresRepositoryOptions`, and + `initializePostgresRepositorySchema()`. [[#11], [#19]] + +[#11]: https://github.com/fedify-dev/botkit/issues/11 +[#19]: https://github.com/fedify-dev/botkit/pull/19 + Version 0.3.1 ------------- diff --git a/deno.json b/deno.json index e8090e9..798d154 100644 --- a/deno.json +++ b/deno.json @@ -35,7 +35,7 @@ "check-versions": "deno run --allow-read --allow-write scripts/check_versions.ts", "fmt": "deno fmt && deno task hongdown --write", "install": "deno cache packages/*/src/*.ts && pnpm install", - "test": "deno test --allow-read --allow-write --allow-env --allow-net=hollo.social --parallel", + "test": "deno test --allow-read --allow-write --allow-env --allow-net=hollo.social,localhost,127.0.0.1 --parallel", "test:node": "pnpm install && pnpm run -r test", "test-all": { "dependencies": ["check", "test", "test:node"] diff --git a/deno.lock b/deno.lock index e7e79be..8b31f3e 100644 --- a/deno.lock +++ b/deno.lock @@ -7,6 +7,7 @@ "jsr:@fedify/markdown-it-hashtag@0.3": "0.3.0", "jsr:@fedify/markdown-it-mention@0.3": "0.3.0", "jsr:@fedify/vocab-runtime@^2.1.2": "2.1.2", + "jsr:@fedify/vocab@2.1.2": "2.1.2", "jsr:@fedify/vocab@^2.1.2": "2.1.2", "jsr:@fedify/webfinger@^2.1.2": "2.1.2", "jsr:@hongminhee/x-forwarded-fetch@0.2": "0.2.0", @@ -18,16 +19,20 @@ "jsr:@std/internal@^1.0.12": "1.0.12", "jsr:@std/path@^1.1.1": "1.1.4", "jsr:@std/path@^1.1.4": "1.1.4", + "npm:@fedify/fedify@^2.1.2": "2.1.2", "npm:@fedify/markdown-it-hashtag@0.3": "0.3.0", "npm:@fedify/markdown-it-mention@0.3": "0.3.0", + "npm:@fedify/vocab@^2.1.2": "2.1.2", + "npm:@js-temporal/polyfill@~0.5.1": "0.5.1", + "npm:@logtape/logtape@^1.3.5": "1.3.7", "npm:@multiformats/base-x@^4.0.1": "4.0.1", "npm:@opentelemetry/api@^1.9.0": "1.9.1", "npm:@opentelemetry/semantic-conventions@^1.39.0": "1.40.0", "npm:@types/markdown-it@^14.1.1": "14.1.2", "npm:asn1js@^3.0.6": "3.0.7", "npm:byte-encodings@^1.0.11": "1.0.11", - "npm:es-toolkit@^1.30.0": "1.45.1", - "npm:es-toolkit@^1.43.0": "1.45.1", + "npm:es-toolkit@^1.30.0": "1.43.0", + "npm:es-toolkit@^1.43.0": "1.43.0", "npm:hongdown@~0.1.0-dev.28": "0.1.0-dev.37", "npm:hongdown@~0.1.0-dev.29": "0.1.0-dev.37", "npm:hongdown@~0.1.0-dev.30": "0.1.0-dev.37", @@ -45,6 +50,7 @@ "npm:markdown-it@^14.1.0": "14.1.0", "npm:mime-db@^1.54.0": "1.54.0", "npm:pkijs@^3.2.5": "3.4.0", + "npm:postgres@^3.4.8": "3.4.8", "npm:structured-field-values@^2.0.4": "2.0.4", "npm:tsdown@~0.12.8": "0.12.9_rolldown@1.0.0-beta.55", "npm:uri-template-router@1": "1.0.0", @@ -64,8 +70,8 @@ "integrity": "27ee6b24286b7cf60a696a0ba115b2c17590d9ccee9ccbff23be8d1b22c399f1", "dependencies": [ "jsr:@fedify/fedify@^2.1.2", - "jsr:@fedify/vocab", "jsr:@fedify/vocab-runtime", + "jsr:@fedify/vocab@^2.1.2", "jsr:@fedify/webfinger", "jsr:@logtape/logtape@^2.0.5", "npm:@opentelemetry/api", @@ -195,6 +201,9 @@ "@babel/helper-validator-identifier" ] }, + "@cfworker/json-schema@4.1.1": { + "integrity": "sha512-gAmrUZSGtKc3AiBL71iNWxDsyUC5uMaKKGdvzYsBoTW/xi42JQHl7eKV2OYzCUqvc+D2RCcf7EXY2iCyFIk6og==" + }, "@digitalbazaar/http-client@4.3.0": { "integrity": "sha512-6lMpxpt9BOmqHKGs9Xm6DP4LlZTBFer/ZjHvP3FcW3IaUWYIWC7dw5RFZnvw4fP57kAVcm1dp3IF+Y50qhBvAw==", "dependencies": [ @@ -221,6 +230,28 @@ "tslib" ] }, + "@fedify/fedify@2.1.2": { + "integrity": "sha512-9i2dFdf/Fch6E/mlBCqlooAujujd2/Vf+7ZYlSqBkBeGtBYxicYswr842mU6lSO4DYjBAFQM83gLOxmLhqdOtg==", + "dependencies": [ + "@fedify/vocab", + "@fedify/vocab-runtime", + "@fedify/webfinger", + "@js-temporal/polyfill", + "@logtape/logtape@2.0.5", + "@opentelemetry/api", + "@opentelemetry/core", + "@opentelemetry/sdk-trace-base", + "@opentelemetry/semantic-conventions", + "byte-encodings", + "es-toolkit", + "json-canon", + "jsonld", + "structured-field-values", + "uri-template-router", + "url-template", + "urlpattern-polyfill" + ] + }, "@fedify/markdown-it-hashtag@0.3.0": { "integrity": "sha512-DCtfQ1OlFstob382d2iwEi77ezNqDQkUjS664q2r7ZOQQkBExvgvKDkL851tf1XRYp4A425/HO2yGRLZ1u1pSg==", "dependencies": [ @@ -233,6 +264,52 @@ "markdown-it" ] }, + "@fedify/vocab-runtime@2.1.2": { + "integrity": "sha512-Yn7XUAWclBz+eHuGnaHtz8CxynLYJ66qnEV1QjraveLSz+tB5Z4T0jQjpY3PMkUx3EhEiGJFu8Nb5hB7a89Jcw==", + "dependencies": [ + "@logtape/logtape@2.0.5", + "@multiformats/base-x", + "@opentelemetry/api", + "asn1js", + "byte-encodings", + "jsonld", + "pkijs" + ] + }, + "@fedify/vocab-tools@2.1.2": { + "integrity": "sha512-AeMA04m8pRnRut5mFKifKyAYrAEvEw0o0RVPCxzypZM8iEwneIQBRhZA435NHMdhIb5Zqx3aEp6J0+P9gevfWQ==", + "dependencies": [ + "@cfworker/json-schema", + "byte-encodings", + "es-toolkit", + "yaml" + ] + }, + "@fedify/vocab@2.1.2": { + "integrity": "sha512-gyBRp8BdXnRGNX98+5z//30xElWUP8d4kOid41wQJ9t4sqSYcyPpNr5re7r6jPQxc3ut7fpGBDmr9DJItJ7YYw==", + "dependencies": [ + "@fedify/vocab-runtime", + "@fedify/vocab-tools", + "@fedify/webfinger", + "@js-temporal/polyfill", + "@logtape/logtape@2.0.5", + "@multiformats/base-x", + "@opentelemetry/api", + "asn1js", + "es-toolkit", + "jsonld", + "pkijs" + ] + }, + "@fedify/webfinger@2.1.2": { + "integrity": "sha512-7wzWrYAZaTd+NG/CqbNrgt10+EXHRGbk1B640GjP51tT0oOEALG9/WGh8PYTSg/8G7htR8GLgWQEwskWO2cmjg==", + "dependencies": [ + "@fedify/vocab-runtime", + "@logtape/logtape@2.0.5", + "@opentelemetry/api", + "es-toolkit" + ] + }, "@hongdown/darwin-arm64@0.1.0-dev.37": { "integrity": "sha512-RHnlo6LF5l+e4gZMkcSWBVr0hMx69w6fmqERhsge0w1UFfBzbc8I5PwezchWFzJlYUWaiYQktsnHtcyPP3sa9w==", "os": ["darwin"], @@ -313,6 +390,18 @@ "@jridgewell/sourcemap-codec" ] }, + "@js-temporal/polyfill@0.5.1": { + "integrity": "sha512-hloP58zRVCRSpgDxmqCWJNlizAlUgJFqG2ypq79DCvyv9tHjRYMDOcPFjzfl/A1/YxDvRCZz8wvZvmapQnKwFQ==", + "dependencies": [ + "jsbi" + ] + }, + "@logtape/logtape@1.3.7": { + "integrity": "sha512-YgF+q9op97oLLPwc7TcTNIllTArVtTwkwyKky6XVzAXQcBrvFXXtMuwJSryONAyOUSItrx994O/HABOrszZyFg==" + }, + "@logtape/logtape@2.0.5": { + "integrity": "sha512-UizDkh20ZPJVOddRxG1F77WhHdlNl/sbQgoO8T534R7XvUBMAJ9En9f35u+meW2tRsNLvjz6R87Zanwf53tspQ==" + }, "@multiformats/base-x@4.0.1": { "integrity": "sha512-eMk0b9ReBbV23xXU693TAIrLyeO5iTgBZGSJfpqriG8UkYvr/hC9u9pyMlAakDNHWmbhMZCDs6KQO0jzKD8OTw==" }, @@ -330,6 +419,30 @@ "@opentelemetry/api@1.9.1": { "integrity": "sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q==" }, + "@opentelemetry/core@2.6.1_@opentelemetry+api@1.9.1": { + "integrity": "sha512-8xHSGWpJP9wBxgBpnqGL0R3PbdWQndL1Qp50qrg71+B28zK5OQmUgcDKLJgzyAAV38t4tOyLMGDD60LneR5W8g==", + "dependencies": [ + "@opentelemetry/api", + "@opentelemetry/semantic-conventions" + ] + }, + "@opentelemetry/resources@2.6.1_@opentelemetry+api@1.9.1": { + "integrity": "sha512-lID/vxSuKWXM55XhAKNoYXu9Cutoq5hFdkbTdI/zDKQktXzcWBVhNsOkiZFTMU9UtEWuGRNe0HUgmsFldIdxVA==", + "dependencies": [ + "@opentelemetry/api", + "@opentelemetry/core", + "@opentelemetry/semantic-conventions" + ] + }, + "@opentelemetry/sdk-trace-base@2.6.1_@opentelemetry+api@1.9.1": { + "integrity": "sha512-r86ut4T1e8vNwB35CqCcKd45yzqH6/6Wzvpk2/cZB8PsPLlZFTvrh8yfOS3CYZYcUmAx4hHTZJ8AO8Dj8nrdhw==", + "dependencies": [ + "@opentelemetry/api", + "@opentelemetry/core", + "@opentelemetry/resources", + "@opentelemetry/semantic-conventions" + ] + }, "@opentelemetry/semantic-conventions@1.40.0": { "integrity": "sha512-cifvXDhcqMwwTlTK04GBNeIe7yyo28Mfby85QXFe1Yk8nmi36Ab/5UQwptOx84SsoGNRg+EVSjwzfSZMy6pmlw==" }, @@ -501,8 +614,8 @@ "entities@4.5.0": { "integrity": "sha512-V0hjH4dGPh9Ao5p0MoRY6BVqtwCjhz6vI5LT8AJ55H+4g9/4vbHx1I54fS0XuclLhDHArPQCiMjDxjaL8fPxhw==" }, - "es-toolkit@1.45.1": { - "integrity": "sha512-/jhoOj/Fx+A+IIyDNOvO3TItGmlMKhtX8ISAHKE90c4b/k1tqaqEZ+uUqfpU8DMnW5cgNJv606zS55jGvza0Xw==" + "es-toolkit@1.43.0": { + "integrity": "sha512-SKCT8AsWvYzBBuUqMk4NPwFlSdqLpJwmy6AP322ERn8W2YLIB6JBXnwMI2Qsh2gfphT3q7EKAxKb23cvFHFwKA==" }, "fdir@6.5.0_picomatch@4.0.3": { "integrity": "sha512-tIbYtZbucOs0BRGqPJkshJUYdL+SDH7dVM8gjy+ERp3WAUjLEFJE+02kanyHtwjWOnwrKYBiwAmM0p4kLJAnXg==", @@ -553,6 +666,9 @@ "integrity": "sha512-ekilCSN1jwRvIbgeg/57YFh8qQDNbwDb9xT/qu2DAHbFFZUicIl4ygVaAvzveMhMVr3LnpSKTNnwt8PoOfmKhQ==", "bin": true }, + "jsbi@4.3.2": { + "integrity": "sha512-9fqMSQbhJykSeii05nxKl4m6Eqn2P6rOlYiS+C5Dr/HPIU/7yZxu5qzbs40tgaFORiw2Amd0mirjxatXYMkIew==" + }, "jsesc@3.1.0": { "integrity": "sha512-/sM3dO2FOzXjKQhJuo0Q173wf2KOo8t4I8vHy6lF9poUp7bKT0/NHE8fPX23PwfhnykfqnC2xRxOnVw5XuGIaA==", "bin": true @@ -622,6 +738,9 @@ "tslib" ] }, + "postgres@3.4.8": { + "integrity": "sha512-d+JFcLM17njZaOLkv6SCev7uoLaBtfK86vMUXhW1Z4glPWh4jozno9APvW/XKFJ3CCxVoC7OL38BqRydtu5nGg==" + }, "punycode.js@2.3.1": { "integrity": "sha512-uxFIHU0YlHYhDQtV4R9J6a52SLx28BCjT+4ieh7IGbgwVJWO+km431c4yRlREUAsAmt/uMjQUyQHNEPf0M39CA==" }, @@ -757,6 +876,9 @@ "url-template@3.1.1": { "integrity": "sha512-4oszoaEKE/mQOtAmdMWqIRHmkxWkUZMnXFnjQ5i01CuRSK3uluxcH1MRVVVWmhlnzT1SCDfKxxficm2G37qzCA==" }, + "urlpattern-polyfill@10.1.0": { + "integrity": "sha512-IGjKp/o0NL3Bso1PymYURCJxMPNAf/ILOpendP9f5B6e1rTJgdgiOvgfoT8VxCAdY+Wisb9uhGaJJf3yZ2V9nw==" + }, "uuid@11.1.0": { "integrity": "sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==", "bin": true @@ -771,6 +893,10 @@ }, "yallist@4.0.0": { "integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A==" + }, + "yaml@2.8.3": { + "integrity": "sha512-AvbaCLOO2Otw/lW5bmh9d/WEdcDFdQp2Z2ZUH3pX9U2ihyUY0nvLv7J6TrWowklRGPYbB/IuIMfYgxaCPg5Bpg==", + "bin": true } }, "workspace": { @@ -814,6 +940,22 @@ ] } }, + "packages/botkit-postgres": { + "dependencies": [ + "jsr:@fedify/vocab@^2.1.2", + "npm:postgres@^3.4.8" + ], + "packageJson": { + "dependencies": [ + "npm:@fedify/fedify@^2.1.2", + "npm:@fedify/vocab@^2.1.2", + "npm:@js-temporal/polyfill@~0.5.1", + "npm:@logtape/logtape@^1.3.5", + "npm:postgres@^3.4.8", + "npm:tsdown@~0.12.8" + ] + } + }, "packages/botkit-sqlite": { "dependencies": [ "jsr:@fedify/vocab@^2.1.2" diff --git a/docs/concepts/bot.md b/docs/concepts/bot.md index 561d756..9ff4a57 100644 --- a/docs/concepts/bot.md +++ b/docs/concepts/bot.md @@ -97,6 +97,12 @@ the key–value store specified in the [`kv`](#createbotoptions-kv) option. For more information, see the [*Repository* section](./repository.md). +If you want to use a SQL-backed repository directly instead of +[`KvRepository`](./repository.md#kvrepository), BotKit also provides concrete +repository packages such as +[`SqliteRepository`](./repository.md#sqliterepository) and +[`PostgresRepository`](./repository.md#postgresrepository). + ### `~CreateBotOptions.identifier` The internal identifier of the bot actor. It is used for the URI of the bot diff --git a/docs/concepts/repository.md b/docs/concepts/repository.md index 415dcb9..8c387df 100644 --- a/docs/concepts/repository.md +++ b/docs/concepts/repository.md @@ -135,6 +135,88 @@ properties: [`node:sqlite`]: https://nodejs.org/api/sqlite.html +`PostgresRepository` +-------------------- + +*This API is available since BotKit 0.4.0.* + +The `PostgresRepository` is a repository that stores data in PostgreSQL using +the [Postgres.js] driver. It is suited for deployments where multiple bot +processes need to share the same persistent repository state, or where you +already operate PostgreSQL for other infrastructure. + +Unlike [`KvRepository`](#kvrepository), `PostgresRepository` stores BotKit data +in ordinary PostgreSQL tables rather than a key-value abstraction. It creates +tables inside a dedicated PostgreSQL schema, uses transactions for multi-step +updates, and supports either an internally owned connection pool or an injected +Postgres.js client. + +In order to use `PostgresRepository`, you need to install the +*@fedify/botkit-postgres* package: + +::: code-group + +~~~~ sh [Deno] +deno add jsr:@fedify/botkit-postgres +~~~~ + +~~~~ sh [npm] +npm add @fedify/botkit-postgres +~~~~ + +~~~~ sh [pnpm] +pnpm add @fedify/botkit-postgres +~~~~ + +~~~~ sh [Yarn] +yarn add @fedify/botkit-postgres +~~~~ + +::: + +The `PostgresRepository` constructor accepts an options object with the +following properties: + +`sql` +: An existing [Postgres.js] client. When this is provided, the repository + does not own the client and calling `close()` will not shut it down. + +`url` +: A PostgreSQL connection string used to create an internal connection pool. + Exactly one of `sql` and `url` must be provided. + +`schema` (optional) +: The PostgreSQL schema used for BotKit tables. Defaults to `"botkit"`. + +`maxConnections` (optional) +: The maximum number of connections for the internally created pool. This + option is only valid when `url` is used. + +`prepare` (optional) +: Whether to use prepared statements for repository queries. Defaults to + `true`. + +These options are mutually exclusive: use either `sql` or `url`. The +`maxConnections` option is only meaningful together with `url`. + +The repository initializes its tables and indexes automatically. If you want +to provision them before creating the repository, use the exported +`initializePostgresRepositorySchema()` helper: + +~~~~ typescript +import postgres from "postgres"; +import { initializePostgresRepositorySchema } from "@fedify/botkit-postgres"; + +const sql = postgres("postgresql://localhost/botkit"); +await initializePostgresRepositorySchema(sql, "botkit"); +~~~~ + +If you disable prepared statements for PgBouncer-style deployments, pass +`false` as the third argument so schema initialization uses the same setting. + +[Postgres.js]: https://github.com/porsager/postgres + + `MemoryRepository` ------------------ diff --git a/docs/package.json b/docs/package.json index 39b2d67..ec3cfe3 100644 --- a/docs/package.json +++ b/docs/package.json @@ -1,6 +1,7 @@ { "devDependencies": { "@fedify/botkit": "workspace:", + "@fedify/botkit-postgres": "workspace:", "@fedify/botkit-sqlite": "workspace:", "@fedify/denokv": "jsr:@fedify/denokv@^2.1.2", "@fedify/fedify": "catalog:", diff --git a/packages/botkit-postgres/README.md b/packages/botkit-postgres/README.md new file mode 100644 index 0000000..b2cad79 --- /dev/null +++ b/packages/botkit-postgres/README.md @@ -0,0 +1,130 @@ +@fedify/botkit-postgres +======================= + +[![JSR][JSR badge]][JSR] +[![npm][npm badge]][npm] +[![GitHub Actions][GitHub Actions badge]][GitHub Actions] + +This package is a [PostgreSQL]-based repository implementation for [BotKit]. +It provides persistent shared storage for bots running on either [Deno] or +[Node.js], supports connection pooling through [Postgres.js], and stores BotKit +repository data in ordinary PostgreSQL tables under a dedicated schema. + +[JSR badge]: https://jsr.io/badges/@fedify/botkit-postgres +[JSR]: https://jsr.io/@fedify/botkit-postgres +[npm badge]: https://img.shields.io/npm/v/@fedify/botkit-postgres?logo=npm +[npm]: https://www.npmjs.com/package/@fedify/botkit-postgres +[GitHub Actions badge]: https://github.com/fedify-dev/botkit/actions/workflows/main.yaml/badge.svg +[GitHub Actions]: https://github.com/fedify-dev/botkit/actions/workflows/main.yaml +[PostgreSQL]: https://www.postgresql.org/ +[BotKit]: https://botkit.fedify.dev/ +[Deno]: https://deno.land/ +[Node.js]: https://nodejs.org/ +[Postgres.js]: https://github.com/porsager/postgres + + +Installation +------------ + +~~~~ sh +deno add jsr:@fedify/botkit-postgres +npm add @fedify/botkit-postgres +pnpm add @fedify/botkit-postgres +yarn add @fedify/botkit-postgres +~~~~ + + +Usage +----- + +The `PostgresRepository` can be used as a drop-in repository implementation for +BotKit: + +~~~~ typescript +import { createBot } from "@fedify/botkit"; +import { PostgresRepository } from "@fedify/botkit-postgres"; + +const bot = createBot({ + username: "mybot", + repository: new PostgresRepository({ + url: "postgresql://localhost/botkit", + schema: "botkit", + maxConnections: 10, + }), +}); +~~~~ + +You can also inject an existing [Postgres.js] client. In that case the +repository does not own the client and `close()` will not shut it down: + +~~~~ typescript +import postgres from "postgres"; +import { PostgresRepository } from "@fedify/botkit-postgres"; + +const sql = postgres("postgresql://localhost/botkit"); +const repository = new PostgresRepository({ + sql, + schema: "botkit", +}); +~~~~ + + +Options +------- + +The `PostgresRepository` constructor accepts the following options: + + - **`sql`**: An existing [Postgres.js] client to use. + + - **`url`**: A PostgreSQL connection string for an internally managed + connection pool. + + - **`schema`** (optional): The PostgreSQL schema name used for BotKit tables. + Defaults to `"botkit"`. + + - **`maxConnections`** (optional): Maximum number of connections for an + internally managed pool created from `url`. + + - **`prepare`** (optional): Whether to use prepared statements for queries. + Defaults to `true`. + +The options are mutually exclusive: use either `sql` or `url`. The +`maxConnections` option is only valid together with `url`. + + +Schema setup +------------ + +The repository creates its schema and tables automatically on first use. +If you want to provision them explicitly ahead of time, use the exported +`initializePostgresRepositorySchema()` helper: + +~~~~ typescript +import postgres from "postgres"; +import { initializePostgresRepositorySchema } from "@fedify/botkit-postgres"; + +const sql = postgres("postgresql://localhost/botkit"); +await initializePostgresRepositorySchema(sql, "botkit"); +~~~~ + +If you disable prepared statements, pass `false` as the third argument so +schema initialization uses the same setting. + + +Features +-------- + + - **Cross-runtime**: Works with both Deno and Node.js using [Postgres.js] + + - **Shared persistent storage**: Suitable for multi-process and + multi-instance deployments backed by PostgreSQL + + - **Schema namespacing**: Keeps BotKit tables grouped under a dedicated + PostgreSQL schema + + - **Full `Repository` API**: Implements BotKit repository storage for key + pairs, messages, followers, follows, followees, and poll votes + + - **Explicit resource ownership**: Repositories created from a URL own their + pool, while repositories created from an injected client leave lifecycle + control to the caller diff --git a/packages/botkit-postgres/deno.json b/packages/botkit-postgres/deno.json new file mode 100644 index 0000000..f649dd0 --- /dev/null +++ b/packages/botkit-postgres/deno.json @@ -0,0 +1,36 @@ +{ + "name": "@fedify/botkit-postgres", + "version": "0.4.0", + "license": "AGPL-3.0-only", + "exports": { + ".": "./src/mod.ts" + }, + "imports": { + "@fedify/vocab": "jsr:@fedify/vocab@^2.1.2", + "postgres": "npm:postgres@^3.4.8" + }, + "exclude": [ + "dist", + "junit.xml", + "package.json" + ], + "fmt": { + "exclude": [ + "*.md", + "*.yaml", + "*.yml" + ] + }, + "tasks": { + "test": "deno test --allow-env --allow-net=localhost,127.0.0.1 --parallel", + "test:node": "pnpm install && pnpm test", + "test-all": { + "dependencies": [ + "check", + "test", + "test:node" + ] + }, + "coverage": "deno task test --coverage --clean && deno coverage --html" + } +} diff --git a/packages/botkit-postgres/package.json b/packages/botkit-postgres/package.json new file mode 100644 index 0000000..02dce30 --- /dev/null +++ b/packages/botkit-postgres/package.json @@ -0,0 +1,64 @@ +{ + "name": "@fedify/botkit-postgres", + "version": "0.4.0", + "description": "PostgreSQL-based repository for BotKit", + "license": "AGPL-3.0-only", + "author": { + "name": "Hong Minhee", + "email": "hong@minhee.org", + "url": "https://hongminhee.org/" + }, + "homepage": "https://botkit.fedify.dev/", + "repository": { + "type": "git", + "url": "git+https://github.com/fedify-dev/botkit.git", + "directory": "packages/botkit-postgres" + }, + "bugs": { + "url": "https://github.com/fedify-dev/botkit/issues" + }, + "funding": [ + "https://opencollective.com/fedify", + "https://github.com/sponsors/dahlia" + ], + "engines": { + "deno": ">=2.0.0", + "node": ">=22.0.0" + }, + "type": "module", + "module": "./dist/mod.js", + "types": "./dist/mod.d.ts", + "exports": { + ".": { + "types": "./dist/mod.d.ts", + "import": "./dist/mod.js" + }, + "./package.json": "./package.json" + }, + "sideEffects": true, + "files": [ + "dist", + "LICENSE", + "package.json", + "README.md" + ], + "peerDependencies": { + "@fedify/botkit": "^0.4.0" + }, + "dependencies": { + "@fedify/fedify": "^2.1.2", + "@fedify/vocab": "^2.1.2", + "@js-temporal/polyfill": "^0.5.1", + "@logtape/logtape": "^1.3.5", + "postgres": "^3.4.8" + }, + "devDependencies": { + "tsdown": "^0.12.8" + }, + "scripts": { + "build": "tsdown", + "prepack": "tsdown", + "prepublish": "deno task check && tsdown", + "test": "tsdown && cd src/ && node --test --experimental-transform-types" + } +} diff --git a/packages/botkit-postgres/src/mod.test.ts b/packages/botkit-postgres/src/mod.test.ts new file mode 100644 index 0000000..3922974 --- /dev/null +++ b/packages/botkit-postgres/src/mod.test.ts @@ -0,0 +1,979 @@ +// BotKit by Fedify: A framework for creating ActivityPub bots +// Copyright (C) 2026 Hong Minhee +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . +import { + initializePostgresRepositorySchema, + PostgresRepository, +} from "@fedify/botkit-postgres"; +import { importJwk } from "@fedify/fedify/sig"; +import { Create, Follow, Note, Person, PUBLIC_COLLECTION } from "@fedify/vocab"; +import assert from "node:assert/strict"; +import { describe, test } from "node:test"; +import postgres from "postgres"; + +if (!("Temporal" in globalThis)) { + globalThis.Temporal = (await import("@js-temporal" + "/polyfill")).Temporal; +} + +function getPostgresUrl(): string | undefined { + if ("process" in globalThis) return globalThis.process.env.POSTGRES_URL; + if ("Deno" in globalThis) return globalThis.Deno.env.get("POSTGRES_URL"); + return undefined; +} + +function createSchemaName(): string { + return `botkit_test_${crypto.randomUUID().replaceAll("-", "_")}`; +} + +const postgresUrl = getPostgresUrl(); + +const keyPairs: CryptoKeyPair[] = [ + { + publicKey: await importJwk({ + kty: "RSA", + alg: "RS256", + // cSpell: disable + n: "1NZblYSc2beQqDmDUF_VDMeS7bUXShvIMK6NHd9OB-7ivBwad8vUcmqKwWj_ivqZva6EgD-n0549t0Pzn5xTArqEJ-c1DTyhC7TNtof0KIbU75qziHwHOqcyYCHusQgDm_TT7frDuxLqHJQ1UrdADMyCVDPFfcstPHhHp3NYStGeNcBo5B05DB_wkgqX2QF2MamQwkdRRMdZkVees38AsC6GTGoOFRI2lvJuUODtndpyjGAKOkLfkr9XzAcggRYx9ddsHBd5wylffwKhtUtWHkdVBdVAiEX8sZ38LhqNYm161PE83nfEvut6_lCCQ7DlPJ8Tp6SY-f2JTXA-C9sN0uJF8_YGhaCPgolv5Pk2UerQmvhMhql9MLDen1AvZrw0u1CWic0GQeIDA6Op9Exd5azhhdm4iKeYzAekUHFDi6WZRRZRCYgHaEEzXyFt9W3N3paolMYVOh1008d-aIgbYnZMToiwH897uQsNGkd1FVIutycXdeuhAbqB7AtLrzuD78wkKLO8k3DFcix2qaHRqiBKC3lUlDCD_I5yzinY_SOcagdpRxczvi6JN1ahUg39ZKYRtJIxUOp1H3iRrebbaOoxM19-axKH1om0sYtyX4JqYfN9QrSf3cO1I6CGnJY8hIkQ6CDH5Tmk_4VRRKdzphq4jZiiOYfR94WODPKDjTM", + e: "AQAB", + // cSpell: enable + key_ops: ["verify"], + ext: true, + }, "public"), + privateKey: await importJwk({ + kty: "RSA", + alg: "RS256", + // cSpell: disable + n: "1NZblYSc2beQqDmDUF_VDMeS7bUXShvIMK6NHd9OB-7ivBwad8vUcmqKwWj_ivqZva6EgD-n0549t0Pzn5xTArqEJ-c1DTyhC7TNtof0KIbU75qziHwHOqcyYCHusQgDm_TT7frDuxLqHJQ1UrdADMyCVDPFfcstPHhHp3NYStGeNcBo5B05DB_wkgqX2QF2MamQwkdRRMdZkVees38AsC6GTGoOFRI2lvJuUODtndpyjGAKOkLfkr9XzAcggRYx9ddsHBd5wylffwKhtUtWHkdVBdVAiEX8sZ38LhqNYm161PE83nfEvut6_lCCQ7DlPJ8Tp6SY-f2JTXA-C9sN0uJF8_YGhaCPgolv5Pk2UerQmvhMhql9MLDen1AvZrw0u1CWic0GQeIDA6Op9Exd5azhhdm4iKeYzAekUHFDi6WZRRZRCYgHaEEzXyFt9W3N3paolMYVOh1008d-aIgbYnZMToiwH897uQsNGkd1FVIutycXdeuhAbqB7AtLrzuD78wkKLO8k3DFcix2qaHRqiBKC3lUlDCD_I5yzinY_SOcagdpRxczvi6JN1ahUg39ZKYRtJIxUOp1H3iRrebbaOoxM19-axKH1om0sYtyX4JqYfN9QrSf3cO1I6CGnJY8hIkQ6CDH5Tmk_4VRRKdzphq4jZiiOYfR94WODPKDjTM", + e: "AQAB", + d: "Yl3DrCHDIDhfifAyyWXRIHvoYyZL4jte1WkG3WSEOtRkRA41CWLSCCNHh8YQPNo_TdQnduJ0nTBIU7f7E6x7DQrI42xPL5Py1mc0oATLiiNurGJyUUUJTklR1e440-bhTCXmANnhtkcyngy9bEI3PvMR1PqsbswFVyo76586kjG5DhykHbGH2Ru14rk0nt23E5LLzY6Kd-AufCbjuQ-ccNC_zvdBFOn7At5-r7CVAVyhjlEgyPZ5P-hhGnG8ywxIANgUJhOPeexYL2o29IQiBBJxsCV0EsdN14UttN0etPvmRh5MRIFUE-zfRkRNQB20hMT8n4FKFlfgKkMS2gXep91h9VVyfYPHAt9jGJgUbIcbx_igeLK3nQlaUXaePf2bAuVRM1kW3P2UR0FOoUKDI5FZmi9XBoEtt0taQYySdKbPSXKaJWO2vKQ4SPyVXzzz-obfVe2zIe1msQ3Tco5RFoHfnufbvvnLC_WUAC9LSfp4jrPvr5lY3uoCFmPma56R-E3mVd2q87Ybk6mqvSh4yWHjid7sfzQ8Ovh9OhZlq_7Mfa3q3M92vNL98iHs8xYkJbE0DJs691UdgX45iNi4DVD-hJ7EbKQQgePsYNovWA611kM-cartevQWk7TBBggy9VYqmdWN0QuVQX9bsHFeYjjKSXg24bV5vYQW3EPkqZk", + p: "9DeEDfMVdV605MbHCtHnw5xEbzTHd7vK-qAQNIjz5i4EmFC0tK7dvUiSn0WeyMNYJkuxVxTMHoDbWXzXq45tzbTEYuzEo5wsxyoVvldfFnnJIwMu6Hb7PWjyWfpBcbwLISr8fAJaGPzgcFsJE__KxrvLA66m1q_4k1y1L9CvXWfHDvFqb7VLGzKWXXp2wlbsACZuqx2Ff3THcWoOWb-wSww6AGsYAc3zC_DiYvAaTn9MxszZ0UYuMeJIHjLA1dmjL-Nksvq5GukjFxSSTpUS87zJ08fHoB0FzTKIIjJGpMRf6ebReLqbYCdo2Kr7eC7lbcTfwQTPI6gnHSKgPIYF5Q", + q: "3xtArH_4MQjwRpl7JVivzQUZgDTARkynMpX-4Gvyny6Gxx0QLhHH0lQMRhtFWlI6qLZxCCLC9zhXPmGlqW-QWya8-xE80mX45JTrQlwBHISpTWTV3sI2Lp5dg7CW8Sc40CE4kB4Q2rHhf7V-Aimgmqhnl1uguzH2DXfr3RaCor0ge44k6gi1LXEJN_aFQIIFYL8HQOM0ctdY147Kr2rVHLchRnh8Q4GzBAJvpOcfvEDk9HF09NVxeaivLMXChpuSUHqbEGg_lVkotLnCMb-fUWk8QmO8EFFVU0pyOFDqHKIgrHOLSHjgUvV8moBwnMGQxMgu7rpY3g-9cXfsCoKVNw", + dp: + "bL1vajqrelhSGW-83r95_-pLumx4yIJwrcmpjYrRdtNUrnF5FN6r0wVGa-629dOtI1gevZSAErDzelQRP80qbSapLxcXs3XtpjzB87-5kitl-NYJA-8-jSh2iMPacgb1ua4HQDxX27p1QPH4B9SkeHrTuW8B0KQH_a2Q65pzCxcTVj7-UoEZ0SFkPHkz-fJ0INj7--soLwlTaNd9Tk8A81mdVeRZiywlpVJ7quwX-o3KJNa_weQK26FS1Udp_45pkAAjLWJgG3BldHhvcNgF2UtdXpQc-dkSZTyzyu4x8FmUD3T8HlKQrm69y4POdsQC2i6IJsy6YrkTuXBagrh2VQ", + dq: + "j0CQZjJEyjdTEAG8cF5hguKjXQ6B5qGROYnV_YNSZaMaJv8iRHJmO0Z8GwenoDbsMyfxq6emR9aFLijEleZsahqVfR-0TePry9lStWkdzZHgozD7oexRnd1Rbh0UzgLBF-I8z0x-xe0xPS7rmbfgx20aFrVentOViVBWwb6SYqvND4hVa2_r5SGPKb_AD4tsqJH_tkosgxCCmuW0fq256JYtZ3I1V6MPrqNhzCAa4GVKnSm8Tvg9xD_rOnRAUu3RJJuUtRQ6v0pgOKqNZiQDx-IqLvaa6l9OygwjCsXpjDkNga0u4Xm7j4jQWOPfasdejPt8Jwy_wtWYbiLyDE2MQQ", + qi: + "Th3TS6fHquqNljwZU2Vg7ndI0SmJidIwSTS2LlhM-Y2bxaAUF-orpS504xDVk1xjRYBrdxiTOmohbtoKtiWhLveOUAWVoNilMqgEU7lwnhaE3yfiUoE1x8df_wLP_YiAccFKeMZwsQp29aKLxuYQtO2dRSSQkN0IuxMGchnJtGOGNTbyA44O25IwggV1nlJN7OTX-nsJCSCe1XMojnGezhnD4xXGeSuR3S07oDDiWpvAO7qtRphEavVTtXdJWIr27tBvnUytbpb4uq6A3J4-TZ6X9uzlOw6jBSQhbL7fc83Z9E_wjPTnxfHufiC_AtXow6sK7lCy10aJGHp3jnGVdQ", + // cSpell: enable + key_ops: ["sign"], + ext: true, + }, "private"), + }, +]; + +function createSql(url: string) { + return postgres(url, { max: 1, onnotice: () => {} }); +} + +function waitForMacrotask(): Promise { + return new Promise((resolve) => setTimeout(resolve, 0)); +} + +function createHarness() { + if (postgresUrl == null) throw new Error("POSTGRES_URL is not set."); + const schema = createSchemaName(); + const adminSql = createSql(postgresUrl); + const repository = new PostgresRepository({ + url: postgresUrl, + schema, + maxConnections: 1, + }); + return { + adminSql, + repository, + schema, + async cleanup() { + await repository.close(); + await adminSql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`); + await adminSql.end(); + }, + }; +} + +if (postgresUrl == null) { + test("PostgresRepository integration tests", { skip: true }, () => {}); +} else { + describe("PostgresRepository", () => { + test("initializes schema explicitly", async () => { + const sql = createSql(postgresUrl); + const schema = createSchemaName(); + try { + await initializePostgresRepositorySchema(sql, schema); + const tables = await sql.unsafe<{ table_name: string }[]>( + `SELECT table_name + FROM information_schema.tables + WHERE table_schema = $1 + ORDER BY table_name`, + [schema], + { prepare: true }, + ); + assert.deepStrictEqual( + tables.map((row) => row.table_name), + [ + "follow_requests", + "followees", + "followers", + "key_pairs", + "messages", + "poll_votes", + "sent_follows", + ], + ); + } finally { + await sql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`); + await sql.end(); + } + }); + + test("honors prepare=false during schema initialization", async () => { + const prepares: boolean[] = []; + const sql = createSql(postgresUrl); + const helperSchema = createSchemaName(); + const repositorySchema = createSchemaName(); + const wrappedSql = new Proxy(sql, { + get(target, property, receiver) { + if (property === "unsafe") { + return ( + query: string, + parameters?: postgres.ParameterOrJSON[], + options?: postgres.UnsafeQueryOptions, + ) => { + prepares.push(options?.prepare ?? true); + return target.unsafe(query, parameters, options); + }; + } + return Reflect.get(target, property, receiver); + }, + }); + try { + await initializePostgresRepositorySchema( + wrappedSql, + helperSchema, + false, + ); + + const repo = new PostgresRepository({ + sql: wrappedSql, + schema: repositorySchema, + prepare: false, + }); + await repo.countMessages(); + + assert.ok(prepares.length > 0); + assert.ok(prepares.every((prepare) => !prepare)); + } finally { + await sql.unsafe(`DROP SCHEMA IF EXISTS "${helperSchema}" CASCADE`); + await sql.unsafe(`DROP SCHEMA IF EXISTS "${repositorySchema}" CASCADE`); + await sql.end(); + } + }); + + test("rejects invalid constructor option combinations", async () => { + const sql = createSql(postgresUrl); + try { + await assert.doesNotReject( + async () => + await Reflect.construct(PostgresRepository, [{ + sql, + url: undefined, + maxConnections: undefined, + }]).countMessages(), + ); + await assert.rejects( + async () => + await Reflect.construct(PostgresRepository, [{ + sql, + url: postgresUrl, + maxConnections: 1, + }]).countMessages(), + new TypeError( + "PostgresRepositoryOptions.sql cannot be combined with PostgresRepositoryOptions.url or PostgresRepositoryOptions.maxConnections.", + ), + ); + } finally { + await sql.end(); + } + }); + + test("does not emit unhandled rejections for schema initialization", async () => { + const error = new Error("Schema initialization failed."); + const sql = { + // deno-lint-ignore require-await + unsafe: async () => { + throw error; + }, + }; + let unhandledReason: unknown; + let detach: (() => void) | undefined; + if ("process" in globalThis) { + const handler = (reason: unknown) => { + unhandledReason = reason; + }; + globalThis.process.once("unhandledRejection", handler); + detach = () => { + globalThis.process.off("unhandledRejection", handler); + }; + } else { + const handler = (event: PromiseRejectionEvent) => { + unhandledReason = event.reason; + event.preventDefault(); + }; + addEventListener("unhandledrejection", handler); + detach = () => { + removeEventListener("unhandledrejection", handler); + }; + } + try { + const repo = Reflect.construct(PostgresRepository, [{ + sql, + schema: createSchemaName(), + }]) as PostgresRepository; + await waitForMacrotask(); + await assert.rejects( + () => repo.countMessages(), + error, + ); + await waitForMacrotask(); + assert.deepStrictEqual(unhandledReason, undefined); + } finally { + detach?.(); + } + }); + + test("does not leak stale followers during concurrent assignment", async () => { + const schema = createSchemaName(); + const adminSql = createSql(postgresUrl); + const sqlA = createSql(postgresUrl); + const sqlB = createSql(postgresUrl); + const followId = new URL( + "https://example.com/ap/follow/eac8d54f-f843-49f4-94c1-485a22e07907", + ); + const followerA = new Person({ + id: new URL("https://example.com/ap/actor/concurrent-alice"), + preferredUsername: "concurrent-alice", + }); + const followerB = new Person({ + id: new URL("https://example.com/ap/actor/concurrent-bob"), + preferredUsername: "concurrent-bob", + }); + let arrivals = 0; + let releaseBarrier!: () => void; + const barrier = new Promise((resolve) => { + releaseBarrier = resolve; + }); + const barrierTimeout = 50; + const wrapSql = (sql: postgres.Sql): postgres.Sql => + new Proxy(sql, { + get(target, property, receiver) { + if (property === "begin") { + return async ( + callback: (transactionSql: postgres.TransactionSql) => unknown, + ) => + await target.begin(async (transactionSql) => + await callback( + new Proxy(transactionSql, { + get(txTarget, txProperty, txReceiver) { + if (txProperty === "unsafe") { + return async ( + query: string, + parameters?: postgres.ParameterOrJSON[], + options?: postgres.UnsafeQueryOptions, + ): Promise => { + const result = await txTarget.unsafe( + query, + parameters, + options, + ); + if ( + arrivals < 2 && + query.includes("SELECT follower_id") && + query.includes("FOR UPDATE") && + parameters?.[0] === followId.href + ) { + arrivals++; + if (arrivals === 2) releaseBarrier(); + await Promise.race([ + barrier, + new Promise((resolve) => + setTimeout(resolve, barrierTimeout) + ), + ]); + } + return result; + }; + } + return Reflect.get(txTarget, txProperty, txReceiver); + }, + }), + ) + ); + } + return Reflect.get(target, property, receiver); + }, + }); + try { + await initializePostgresRepositorySchema(adminSql, schema); + const repoA = new PostgresRepository({ sql: wrapSql(sqlA), schema }); + const repoB = new PostgresRepository({ sql: wrapSql(sqlB), schema }); + + await Promise.all([ + repoA.addFollower(followId, followerA), + repoB.addFollower(followId, followerB), + ]); + + const followers = await Array.fromAsync(repoA.getFollowers()); + assert.deepStrictEqual(await repoA.countFollowers(), 1); + assert.deepStrictEqual(followers.length, 1); + const remainingFollowerId = followers[0]?.id?.href; + assert.ok( + remainingFollowerId === followerA.id!.href || + remainingFollowerId === followerB.id!.href, + ); + assert.deepStrictEqual( + await repoA.hasFollower(followerA.id!), + remainingFollowerId === followerA.id!.href, + ); + assert.deepStrictEqual( + await repoA.hasFollower(followerB.id!), + remainingFollowerId === followerB.id!.href, + ); + } finally { + await adminSql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`); + await Promise.all([sqlA.end(), sqlB.end(), adminSql.end()]); + } + }); + + test("serializes follower removal with concurrent adds", async () => { + const schema = createSchemaName(); + const adminSql = createSql(postgresUrl); + const sqlA = createSql(postgresUrl); + const sqlB = createSql(postgresUrl); + const followId = new URL( + "https://example.com/ap/follow/55db42c7-8a99-4c40-98c1-4684d7d0c758", + ); + const follower = new Person({ + id: new URL("https://example.com/ap/actor/concurrent-charlie"), + preferredUsername: "concurrent-charlie", + }); + let releaseBarrier!: () => void; + let resolveLockAcquired!: () => void; + const barrier = new Promise((resolve) => { + releaseBarrier = resolve; + }); + const lockAcquired = new Promise((resolve) => { + resolveLockAcquired = resolve; + }); + const wrappedSql = new Proxy(sqlA, { + get(target, property, receiver) { + if (property === "begin") { + return async ( + callback: (transactionSql: postgres.TransactionSql) => unknown, + ) => + await target.begin(async (transactionSql) => + await callback( + new Proxy(transactionSql, { + get(txTarget, txProperty, txReceiver) { + if (txProperty === "unsafe") { + return async ( + query: string, + parameters?: postgres.ParameterOrJSON[], + options?: postgres.UnsafeQueryOptions, + ): Promise => { + const result = await txTarget.unsafe( + query, + parameters, + options, + ); + if ( + query.includes("pg_advisory_xact_lock") && + parameters?.[1] === `${schema}:${followId.href}` + ) { + resolveLockAcquired(); + await barrier; + } + return result; + }; + } + return Reflect.get(txTarget, txProperty, txReceiver); + }, + }), + ) + ); + } + return Reflect.get(target, property, receiver); + }, + }); + try { + await initializePostgresRepositorySchema(adminSql, schema); + const repoA = new PostgresRepository({ sql: wrappedSql, schema }); + const repoB = new PostgresRepository({ sql: sqlB, schema }); + + const addPromise = repoA.addFollower(followId, follower); + await lockAcquired; + const removePromise = repoB.removeFollower(followId, follower.id!); + await waitForMacrotask(); + releaseBarrier(); + + const [, removedFollower] = await Promise.all([ + addPromise, + removePromise, + ]); + assert.deepStrictEqual( + await removedFollower?.toJsonLd(), + await follower.toJsonLd(), + ); + assert.deepStrictEqual(await repoA.countFollowers(), 0); + assert.deepStrictEqual(await repoA.hasFollower(follower.id!), false); + } finally { + await adminSql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`); + await Promise.all([sqlA.end(), sqlB.end(), adminSql.end()]); + } + }); + + test("does not leak stale followers during concurrent reassignment cleanup", async () => { + const schema = createSchemaName(); + const adminSql = createSql(postgresUrl); + const sqlA = createSql(postgresUrl); + const sqlB = createSql(postgresUrl); + const oldFollower = new Person({ + id: new URL("https://example.com/ap/actor/concurrent-dana"), + preferredUsername: "concurrent-dana", + }); + const newFollowerA = new Person({ + id: new URL("https://example.com/ap/actor/concurrent-emma"), + preferredUsername: "concurrent-emma", + }); + const newFollowerB = new Person({ + id: new URL("https://example.com/ap/actor/concurrent-frank"), + preferredUsername: "concurrent-frank", + }); + const followA = new URL( + "https://example.com/ap/follow/ee4ca254-e614-4433-941b-bbe95b4b92a3", + ); + const followB = new URL( + "https://example.com/ap/follow/c7bacad2-3078-4cd1-9a26-4b13e58b8d67", + ); + let arrivals = 0; + let releaseBarrier!: () => void; + const barrier = new Promise((resolve) => { + releaseBarrier = resolve; + }); + const barrierTimeout = 50; + const wrapSql = (sql: postgres.Sql): postgres.Sql => + new Proxy(sql, { + get(target, property, receiver) { + if (property === "begin") { + return async ( + callback: (transactionSql: postgres.TransactionSql) => unknown, + ) => + await target.begin(async (transactionSql) => + await callback( + new Proxy(transactionSql, { + get(txTarget, txProperty, txReceiver) { + if (txProperty === "unsafe") { + return async ( + query: string, + parameters?: postgres.ParameterOrJSON[], + options?: postgres.UnsafeQueryOptions, + ): Promise => { + if ( + arrivals < 2 && + query.includes( + `DELETE FROM "${schema}"."followers"`, + ) && + parameters?.[0] === oldFollower.id!.href + ) { + arrivals++; + if (arrivals === 2) releaseBarrier(); + await Promise.race([ + barrier, + new Promise((resolve) => + setTimeout(resolve, barrierTimeout) + ), + ]); + } + return await txTarget.unsafe( + query, + parameters, + options, + ); + }; + } + return Reflect.get(txTarget, txProperty, txReceiver); + }, + }), + ) + ); + } + return Reflect.get(target, property, receiver); + }, + }); + try { + await initializePostgresRepositorySchema(adminSql, schema); + const setupRepo = new PostgresRepository({ sql: adminSql, schema }); + await setupRepo.addFollower(followA, oldFollower); + await setupRepo.addFollower(followB, oldFollower); + + const repoA = new PostgresRepository({ sql: wrapSql(sqlA), schema }); + const repoB = new PostgresRepository({ sql: wrapSql(sqlB), schema }); + await Promise.all([ + repoA.addFollower(followA, newFollowerA), + repoB.addFollower(followB, newFollowerB), + ]); + + const followers = await Promise.all( + (await Array.fromAsync(repoA.getFollowers())).map((follower) => + follower.toJsonLd() + ), + ); + assert.deepStrictEqual(await repoA.countFollowers(), 2); + assert.deepStrictEqual(await repoA.hasFollower(oldFollower.id!), false); + assert.deepStrictEqual(followers, [ + await newFollowerA.toJsonLd(), + await newFollowerB.toJsonLd(), + ]); + } finally { + await adminSql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`); + await Promise.all([sqlA.end(), sqlB.end(), adminSql.end()]); + } + }); + + test("does not drop valid followers during concurrent cleanup and add", async () => { + const schema = createSchemaName(); + const adminSql = createSql(postgresUrl); + const sqlA = createSql(postgresUrl); + const sqlB = createSql(postgresUrl); + const oldFollower = new Person({ + id: new URL("https://example.com/ap/actor/concurrent-grace"), + preferredUsername: "concurrent-grace", + }); + const reassignedFollower = new Person({ + id: new URL("https://example.com/ap/actor/concurrent-harper"), + preferredUsername: "concurrent-harper", + }); + const followA = new URL( + "https://example.com/ap/follow/ea47ae4b-8e5d-4f5f-b1c8-9d66d5d11d83", + ); + const followB = new URL( + "https://example.com/ap/follow/8f65dab0-2a7e-4c6a-b53e-99f3e2521c0e", + ); + let resolveCleanupReady!: () => void; + let resolveInsertedNewRequest!: () => void; + const cleanupReady = new Promise((resolve) => { + resolveCleanupReady = resolve; + }); + const insertedNewRequest = new Promise((resolve) => { + resolveInsertedNewRequest = resolve; + }); + const barrierTimeout = 50; + const wrapCleanupSql = (sql: postgres.Sql): postgres.Sql => + new Proxy(sql, { + get(target, property, receiver) { + if (property === "begin") { + return async ( + callback: (transactionSql: postgres.TransactionSql) => unknown, + ) => + await target.begin(async (transactionSql) => + await callback( + new Proxy(transactionSql, { + get(txTarget, txProperty, txReceiver) { + if (txProperty === "unsafe") { + return async ( + query: string, + parameters?: postgres.ParameterOrJSON[], + options?: postgres.UnsafeQueryOptions, + ): Promise => { + if ( + query.includes( + `DELETE FROM "${schema}"."followers"`, + ) && + parameters?.[0] === oldFollower.id!.href + ) { + resolveCleanupReady(); + await Promise.race([ + insertedNewRequest, + new Promise((resolve) => + setTimeout(resolve, barrierTimeout) + ), + ]); + } + return await txTarget.unsafe( + query, + parameters, + options, + ); + }; + } + return Reflect.get(txTarget, txProperty, txReceiver); + }, + }), + ) + ); + } + return Reflect.get(target, property, receiver); + }, + }); + const wrapAddSql = (sql: postgres.Sql): postgres.Sql => + new Proxy(sql, { + get(target, property, receiver) { + if (property === "begin") { + return async ( + callback: (transactionSql: postgres.TransactionSql) => unknown, + ) => + await target.begin(async (transactionSql) => + await callback( + new Proxy(transactionSql, { + get(txTarget, txProperty, txReceiver) { + if (txProperty === "unsafe") { + return async ( + query: string, + parameters?: postgres.ParameterOrJSON[], + options?: postgres.UnsafeQueryOptions, + ): Promise => { + const result = await txTarget.unsafe( + query, + parameters, + options, + ); + if ( + query.includes( + `INSERT INTO "${schema}"."follow_requests"`, + ) && + parameters?.[0] === followB.href && + parameters?.[1] === oldFollower.id!.href + ) { + resolveInsertedNewRequest(); + await new Promise((resolve) => + setTimeout(resolve, barrierTimeout) + ); + } + return result; + }; + } + return Reflect.get(txTarget, txProperty, txReceiver); + }, + }), + ) + ); + } + return Reflect.get(target, property, receiver); + }, + }); + try { + await initializePostgresRepositorySchema(adminSql, schema); + const setupRepo = new PostgresRepository({ sql: adminSql, schema }); + await setupRepo.addFollower(followA, oldFollower); + + const repoA = new PostgresRepository({ + sql: wrapCleanupSql(sqlA), + schema, + }); + const repoB = new PostgresRepository({ sql: wrapAddSql(sqlB), schema }); + + const reassignPromise = repoA.addFollower(followA, reassignedFollower); + await cleanupReady; + const addPromise = repoB.addFollower(followB, oldFollower); + await Promise.all([reassignPromise, addPromise]); + + const followers = await Promise.all( + (await Array.fromAsync(repoA.getFollowers())).map((follower) => + follower.toJsonLd() + ), + ); + assert.deepStrictEqual(await repoA.countFollowers(), 2); + assert.ok(await repoA.hasFollower(oldFollower.id!)); + assert.ok(await repoA.hasFollower(reassignedFollower.id!)); + assert.deepStrictEqual(followers, [ + await oldFollower.toJsonLd(), + await reassignedFollower.toJsonLd(), + ]); + assert.deepStrictEqual( + await (await repoA.removeFollower(followB, oldFollower.id!)) + ?.toJsonLd(), + await oldFollower.toJsonLd(), + ); + assert.deepStrictEqual(await repoA.countFollowers(), 1); + assert.deepStrictEqual(await repoA.hasFollower(oldFollower.id!), false); + } finally { + await adminSql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`); + await Promise.all([sqlA.end(), sqlB.end(), adminSql.end()]); + } + }); + + test("repository operations and persistence", async () => { + const harness = createHarness(); + try { + const repo = harness.repository; + + assert.deepStrictEqual(await repo.getKeyPairs(), undefined); + await repo.setKeyPairs(keyPairs); + assert.deepStrictEqual(await repo.getKeyPairs(), keyPairs); + + const messageA = new Create({ + id: new URL( + "https://example.com/ap/create/01941f29-7c00-7fe8-ab0a-7b593990a3c0", + ), + actor: new URL("https://example.com/ap/actor/bot"), + to: new URL("https://example.com/ap/actor/bot/followers"), + cc: PUBLIC_COLLECTION, + object: new Note({ + id: new URL( + "https://example.com/ap/note/01941f29-7c00-7fe8-ab0a-7b593990a3c0", + ), + attribution: new URL("https://example.com/ap/actor/bot"), + to: new URL("https://example.com/ap/actor/bot/followers"), + cc: PUBLIC_COLLECTION, + content: "Hello, world!", + published: Temporal.Instant.from("2025-01-01T00:00:00Z"), + }), + published: Temporal.Instant.from("2025-01-01T00:00:00Z"), + }); + const messageB = new Create({ + id: new URL( + "https://example.com/ap/create/0194244f-d800-7873-8993-ef71ccd47306", + ), + actor: new URL("https://example.com/ap/actor/bot"), + to: new URL("https://example.com/ap/actor/bot/followers"), + cc: PUBLIC_COLLECTION, + object: new Note({ + id: new URL( + "https://example.com/ap/note/0194244f-d800-7873-8993-ef71ccd47306", + ), + attribution: new URL("https://example.com/ap/actor/bot"), + to: new URL("https://example.com/ap/actor/bot/followers"), + cc: PUBLIC_COLLECTION, + content: "Second message", + published: Temporal.Instant.from("2025-01-02T00:00:00Z"), + }), + published: Temporal.Instant.from("2025-01-02T00:00:00Z"), + }); + const messageB2 = new Create({ + id: new URL( + "https://example.com/ap/create/0194244f-d800-7873-8993-ef71ccd47306", + ), + actor: new URL("https://example.com/ap/actor/bot"), + to: new URL("https://example.com/ap/actor/bot/followers"), + cc: PUBLIC_COLLECTION, + object: new Note({ + id: new URL( + "https://example.com/ap/note/0194244f-d800-7873-8993-ef71ccd47306", + ), + attribution: new URL("https://example.com/ap/actor/bot"), + to: new URL("https://example.com/ap/actor/bot/followers"), + cc: PUBLIC_COLLECTION, + content: "Updated message", + published: Temporal.Instant.from("2025-01-02T00:00:00Z"), + updated: Temporal.Instant.from("2025-01-02T12:00:00Z"), + }), + published: Temporal.Instant.from("2025-01-02T00:00:00Z"), + updated: Temporal.Instant.from("2025-01-02T12:00:00Z"), + }); + + assert.deepStrictEqual(await repo.countMessages(), 0); + await repo.addMessage("01941f29-7c00-7fe8-ab0a-7b593990a3c0", messageA); + await repo.addMessage("0194244f-d800-7873-8993-ef71ccd47306", messageB); + assert.deepStrictEqual(await repo.countMessages(), 2); + assert.deepStrictEqual( + await Promise.all( + (await Array.fromAsync(repo.getMessages({ order: "oldest" }))).map(( + message, + ) => message.toJsonLd()), + ), + [await messageA.toJsonLd(), await messageB.toJsonLd()], + ); + assert.deepStrictEqual( + await Promise.all( + (await Array.fromAsync( + repo.getMessages({ + since: Temporal.Instant.from("2025-01-02T00:00:00Z"), + }), + )).map((message) => message.toJsonLd()), + ), + [await messageB.toJsonLd()], + ); + assert.ok( + await repo.updateMessage( + "0194244f-d800-7873-8993-ef71ccd47306", + async (message) => + message.clone({ + object: await messageB2.getObject(), + updated: messageB2.updated, + }), + ), + ); + assert.deepStrictEqual( + await (await repo.getMessage("0194244f-d800-7873-8993-ef71ccd47306")) + ?.toJsonLd(), + await messageB2.toJsonLd(), + ); + assert.deepStrictEqual( + await (await repo.removeMessage( + "01941f29-7c00-7fe8-ab0a-7b593990a3c0", + )) + ?.toJsonLd(), + await messageA.toJsonLd(), + ); + assert.deepStrictEqual(await repo.countMessages(), 1); + + const followerA = new Person({ + id: new URL("https://example.com/ap/actor/alice"), + preferredUsername: "alice", + }); + const followerB = new Person({ + id: new URL("https://example.com/ap/actor/bob"), + preferredUsername: "bob", + }); + const followA = new URL( + "https://example.com/ap/follow/f2fb7255-d3ad-4fef-8f9a-1d0f2c2ec0b4", + ); + const followB = new URL( + "https://example.com/ap/follow/a3d4cc4f-af93-4a9f-a7b3-0b7c0fe4901d", + ); + + await repo.addFollower(followA, followerA); + await repo.addFollower(followB, followerB); + assert.ok(await repo.hasFollower(followerA.id!)); + assert.deepStrictEqual(await repo.countFollowers(), 2); + assert.deepStrictEqual( + await Promise.all( + (await Array.fromAsync(repo.getFollowers({ offset: 1 }))).map(( + follower, + ) => follower.toJsonLd()), + ), + [await followerB.toJsonLd()], + ); + assert.deepStrictEqual( + await repo.removeFollower(followA, followerB.id!), + undefined, + ); + assert.deepStrictEqual( + await (await repo.removeFollower(followA, followerA.id!))?.toJsonLd(), + await followerA.toJsonLd(), + ); + assert.deepStrictEqual(await repo.countFollowers(), 1); + + const followA2 = new URL( + "https://example.com/ap/follow/6eedf12f-32aa-4f1d-b6ca-d5bf34c4d149", + ); + await repo.addFollower(followA, followerA); + await repo.addFollower(followA2, followerA); + assert.deepStrictEqual(await repo.countFollowers(), 2); + assert.ok(await repo.hasFollower(followerA.id!)); + assert.deepStrictEqual( + await (await repo.removeFollower(followA, followerA.id!))?.toJsonLd(), + await followerA.toJsonLd(), + ); + assert.ok(await repo.hasFollower(followerA.id!)); + assert.deepStrictEqual(await repo.countFollowers(), 2); + assert.deepStrictEqual( + await (await repo.removeFollower(followA2, followerA.id!)) + ?.toJsonLd(), + await followerA.toJsonLd(), + ); + assert.deepStrictEqual(await repo.countFollowers(), 1); + assert.deepStrictEqual(await repo.hasFollower(followerA.id!), false); + + await repo.addFollower(followA, followerA); + assert.deepStrictEqual(await repo.countFollowers(), 2); + await repo.addFollower(followA, followerB); + assert.deepStrictEqual(await repo.countFollowers(), 1); + assert.deepStrictEqual(await repo.hasFollower(followerA.id!), false); + assert.ok(await repo.hasFollower(followerB.id!)); + assert.deepStrictEqual( + await Promise.all( + (await Array.fromAsync(repo.getFollowers())).map((follower) => + follower.toJsonLd() + ), + ), + [await followerB.toJsonLd()], + ); + + const sentFollow = new Follow({ + id: new URL( + "https://example.com/ap/follow/03a395a2-353a-4894-afdb-2cab31a7b004", + ), + actor: new URL("https://example.com/ap/actor/bot"), + object: new URL("https://example.com/ap/actor/john"), + }); + await repo.addSentFollow( + "03a395a2-353a-4894-afdb-2cab31a7b004", + sentFollow, + ); + assert.deepStrictEqual( + await (await repo.getSentFollow( + "03a395a2-353a-4894-afdb-2cab31a7b004", + )) + ?.toJsonLd(), + await sentFollow.toJsonLd(), + ); + await repo.removeSentFollow("03a395a2-353a-4894-afdb-2cab31a7b004"); + assert.deepStrictEqual( + await repo.getSentFollow("03a395a2-353a-4894-afdb-2cab31a7b004"), + undefined, + ); + + const followeeId = new URL("https://example.com/ap/actor/john"); + await repo.addFollowee(followeeId, sentFollow); + assert.deepStrictEqual( + await (await repo.getFollowee(followeeId))?.toJsonLd(), + await sentFollow.toJsonLd(), + ); + await repo.removeFollowee(followeeId); + assert.deepStrictEqual(await repo.getFollowee(followeeId), undefined); + + const messageId = "01945678-1234-7890-abcd-ef0123456789"; + const voter1 = new URL("https://example.com/ap/actor/alice"); + const voter2 = new URL("https://example.com/ap/actor/bob"); + await repo.vote(messageId, voter1, "option1"); + await repo.vote(messageId, voter1, "option1"); + await repo.vote(messageId, voter1, "option2"); + await repo.vote(messageId, voter2, "option1"); + assert.deepStrictEqual(await repo.countVoters(messageId), 2); + assert.deepStrictEqual(await repo.countVotes(messageId), { + "option1": 2, + "option2": 1, + }); + + await repo.close(); + const repo2 = new PostgresRepository({ + url: postgresUrl, + schema: harness.schema, + maxConnections: 1, + }); + assert.deepStrictEqual(await repo2.getKeyPairs(), keyPairs); + assert.deepStrictEqual(await repo2.countMessages(), 1); + await repo2.close(); + } finally { + await harness.cleanup(); + } + }); + + test("does not close injected clients", async () => { + const schema = createSchemaName(); + const sql = createSql(postgresUrl); + try { + const repo = new PostgresRepository({ sql, schema }); + await repo.countMessages(); + await repo.close(); + + const result = await sql`SELECT 1 AS value`; + assert.deepStrictEqual(result[0]?.value, 1); + } finally { + await sql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`); + await sql.end(); + } + }); + }); +} diff --git a/packages/botkit-postgres/src/mod.ts b/packages/botkit-postgres/src/mod.ts new file mode 100644 index 0000000..7006ecc --- /dev/null +++ b/packages/botkit-postgres/src/mod.ts @@ -0,0 +1,868 @@ +// BotKit by Fedify: A framework for creating ActivityPub bots +// Copyright (C) 2026 Hong Minhee +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . +import type { + Repository, + RepositoryGetFollowersOptions, + RepositoryGetMessagesOptions, + Uuid, +} from "@fedify/botkit/repository"; +import { exportJwk, importJwk } from "@fedify/fedify/sig"; +import { Temporal, toTemporalInstant } from "@js-temporal/polyfill"; +import { + Activity, + type Actor, + Announce, + Create, + Follow, + isActor, + Object, +} from "@fedify/vocab"; +import { getLogger } from "@logtape/logtape"; +import postgres from "postgres"; + +if (!("Temporal" in globalThis)) { + Reflect.set(globalThis, "Temporal", Temporal); +} +if (Date.prototype.toTemporalInstant == null) { + Reflect.set(Date.prototype, "toTemporalInstant", toTemporalInstant); +} + +const logger = getLogger(["botkit", "postgres"]); +const schemaNamePattern = /^[A-Za-z_][A-Za-z0-9_]*$/; +const followRequestAdvisoryLockNamespace = 0x4254; +const followerAdvisoryLockNamespace = 0x4246; + +type Queryable = Pick; +type QueryParameter = postgres.SerializableParameter; + +/** + * Common options for creating a PostgreSQL repository. + * @since 0.4.0 + */ +interface PostgresRepositoryOptionsBase { + /** + * The PostgreSQL schema name to use. + * @default `"botkit"` + */ + readonly schema?: string; + + /** + * Whether to use prepared statements for queries. + * @default true + */ + readonly prepare?: boolean; +} + +/** + * Options for creating a PostgreSQL repository from an injected client. + * @since 0.4.0 + */ +interface PostgresRepositoryOptionsWithClient + extends PostgresRepositoryOptionsBase { + /** + * A pre-configured PostgreSQL client to use. + */ + readonly sql: postgres.Sql; + + /** + * Disallowed when `sql` is provided. + */ + readonly url?: never; + + /** + * Disallowed when `sql` is provided. + */ + readonly maxConnections?: never; +} + +/** + * Options for creating a PostgreSQL repository from a connection string. + * @since 0.4.0 + */ +interface PostgresRepositoryOptionsWithUrl + extends PostgresRepositoryOptionsBase { + /** + * A PostgreSQL connection string to connect with. + */ + readonly url: string | URL; + + /** + * Disallowed when `url` is provided. + */ + readonly sql?: never; + + /** + * The maximum number of connections for an owned pool. + */ + readonly maxConnections?: number; +} + +/** + * Options for creating a PostgreSQL repository. + * @since 0.4.0 + */ +export type PostgresRepositoryOptions = + | PostgresRepositoryOptionsWithClient + | PostgresRepositoryOptionsWithUrl; + +/** + * Initializes the PostgreSQL schema used by BotKit repositories. + * @param sql The PostgreSQL client to initialize the schema with. + * @param schema The PostgreSQL schema name to initialize. + * @param prepare Whether to use prepared statements for schema queries. + * @since 0.4.0 + */ +export async function initializePostgresRepositorySchema( + sql: Queryable, + schema = "botkit", + prepare = true, +): Promise { + const validatedSchema = validateSchemaName(schema); + await execute( + sql, + `CREATE SCHEMA IF NOT EXISTS "${validatedSchema}"`, + [], + prepare, + ); + await execute( + sql, + `CREATE TABLE IF NOT EXISTS "${validatedSchema}"."key_pairs" ( + position INTEGER PRIMARY KEY, + private_key_jwk JSONB NOT NULL, + public_key_jwk JSONB NOT NULL + )`, + [], + prepare, + ); + await execute( + sql, + `CREATE TABLE IF NOT EXISTS "${validatedSchema}"."messages" ( + id TEXT PRIMARY KEY, + activity_json JSONB NOT NULL, + published BIGINT + )`, + [], + prepare, + ); + await execute( + sql, + `CREATE INDEX IF NOT EXISTS "idx_messages_published" + ON "${validatedSchema}"."messages" (published, id)`, + [], + prepare, + ); + await execute( + sql, + `CREATE TABLE IF NOT EXISTS "${validatedSchema}"."followers" ( + follower_id TEXT PRIMARY KEY, + actor_json JSONB NOT NULL + )`, + [], + prepare, + ); + await execute( + sql, + `CREATE TABLE IF NOT EXISTS "${validatedSchema}"."follow_requests" ( + follow_request_id TEXT PRIMARY KEY, + follower_id TEXT NOT NULL + REFERENCES "${validatedSchema}"."followers" (follower_id) + ON DELETE CASCADE + )`, + [], + prepare, + ); + await execute( + sql, + `CREATE INDEX IF NOT EXISTS "idx_follow_requests_follower" + ON "${validatedSchema}"."follow_requests" (follower_id)`, + [], + prepare, + ); + await execute( + sql, + `CREATE TABLE IF NOT EXISTS "${validatedSchema}"."sent_follows" ( + id TEXT PRIMARY KEY, + follow_json JSONB NOT NULL + )`, + [], + prepare, + ); + await execute( + sql, + `CREATE TABLE IF NOT EXISTS "${validatedSchema}"."followees" ( + followee_id TEXT PRIMARY KEY, + follow_json JSONB NOT NULL + )`, + [], + prepare, + ); + await execute( + sql, + `CREATE TABLE IF NOT EXISTS "${validatedSchema}"."poll_votes" ( + message_id TEXT NOT NULL, + voter_id TEXT NOT NULL, + option TEXT NOT NULL, + PRIMARY KEY (message_id, voter_id, option) + )`, + [], + prepare, + ); + await execute( + sql, + `CREATE INDEX IF NOT EXISTS "idx_poll_votes_message_option" + ON "${validatedSchema}"."poll_votes" (message_id, option)`, + [], + prepare, + ); +} + +/** + * A repository for storing bot data using PostgreSQL. + * @since 0.4.0 + */ +export class PostgresRepository implements Repository, AsyncDisposable { + readonly sql: postgres.Sql; + readonly schema: string; + readonly prepare: boolean; + private readonly ownsSql: boolean; + private readonly ready: Promise; + + constructor(options: PostgresRepositoryOptions) { + this.schema = validateSchemaName(options.schema ?? "botkit"); + this.prepare = options.prepare ?? true; + if ("sql" in options) { + if (options.url != null || options.maxConnections != null) { + throw new TypeError( + "PostgresRepositoryOptions.sql cannot be combined with PostgresRepositoryOptions.url or PostgresRepositoryOptions.maxConnections.", + ); + } + this.ownsSql = false; + this.sql = options.sql; + } else { + if (options.url == null) { + throw new TypeError( + "PostgresRepositoryOptions.url must be provided when PostgresRepositoryOptions.sql is absent.", + ); + } + this.ownsSql = true; + const url = typeof options.url === "string" + ? options.url + : options.url.href; + this.sql = postgres(url, { + max: options.maxConnections, + onnotice: () => {}, + prepare: this.prepare, + }); + } + const ready = initializePostgresRepositorySchema( + this.sql, + this.schema, + this.prepare, + ); + // Avoid unhandled rejection warnings before a repository method awaits it. + ready.catch(() => {}); + this.ready = ready; + } + + async [Symbol.asyncDispose](): Promise { + await this.close(); + } + + /** + * Closes the underlying PostgreSQL connection pool if owned by the + * repository. + */ + async close(): Promise { + try { + await this.ready; + } finally { + if (this.ownsSql) { + await this.sql.end({ timeout: 5 }); + } + } + } + + async setKeyPairs(keyPairs: CryptoKeyPair[]): Promise { + await this.ensureReady(); + await this.sql.begin(async (sql) => { + await this.query(sql, `DELETE FROM ${this.table("key_pairs")}`); + for (const [position, keyPair] of keyPairs.entries()) { + const privateJwk = await exportJwk(keyPair.privateKey); + const publicJwk = await exportJwk(keyPair.publicKey); + await this.query( + sql, + `INSERT INTO ${this.table("key_pairs")} + (position, private_key_jwk, public_key_jwk) + VALUES ($1, $2::jsonb, $3::jsonb)`, + [ + position, + serializeJson(privateJwk), + serializeJson(publicJwk), + ], + ); + } + }); + } + + async getKeyPairs(): Promise { + await this.ensureReady(); + const rows = await this.query<{ + readonly private_key_jwk: unknown; + readonly public_key_jwk: unknown; + }>( + this.sql, + `SELECT private_key_jwk, public_key_jwk + FROM ${this.table("key_pairs")} + ORDER BY position ASC`, + ); + if (rows.length < 1) return undefined; + const keyPairs: CryptoKeyPair[] = []; + for (const row of rows) { + const privateJwk = normalizeJsonObject(row.private_key_jwk); + const publicJwk = normalizeJsonObject(row.public_key_jwk); + if (privateJwk == null || publicJwk == null) { + throw new TypeError("A stored key pair is malformed."); + } + keyPairs.push({ + privateKey: await importJwk(privateJwk, "private"), + publicKey: await importJwk(publicJwk, "public"), + }); + } + return keyPairs; + } + + async addMessage(id: Uuid, activity: Create | Announce): Promise { + await this.ensureReady(); + await this.query( + this.sql, + `INSERT INTO ${this.table("messages")} (id, activity_json, published) + VALUES ($1, $2::jsonb, $3)`, + [ + id, + serializeJson(await activity.toJsonLd({ format: "compact" })), + activity.published?.epochMilliseconds ?? null, + ], + ); + } + + async updateMessage( + id: Uuid, + updater: ( + existing: Create | Announce, + ) => Create | Announce | undefined | Promise, + ): Promise { + await this.ensureReady(); + return await this.sql.begin(async (sql) => { + const rows = await this.query<{ readonly activity_json: unknown }>( + sql, + `SELECT activity_json + FROM ${this.table("messages")} + WHERE id = $1 + FOR UPDATE`, + [id], + ); + const row = rows[0]; + if (row == null) return false; + const activity = await parseActivity(row.activity_json); + if (activity == null) return false; + const updated = await updater(activity); + if (updated == null) return false; + await this.query( + sql, + `UPDATE ${this.table("messages")} + SET activity_json = $1::jsonb, + published = $2 + WHERE id = $3`, + [ + serializeJson(await updated.toJsonLd({ format: "compact" })), + updated.published?.epochMilliseconds ?? null, + id, + ], + ); + return true; + }); + } + + async removeMessage(id: Uuid): Promise { + await this.ensureReady(); + const rows = await this.query<{ readonly activity_json: unknown }>( + this.sql, + `DELETE FROM ${this.table("messages")} + WHERE id = $1 + RETURNING activity_json`, + [id], + ); + return await parseActivity(rows[0]?.activity_json); + } + + async *getMessages( + options: RepositoryGetMessagesOptions = {}, + ): AsyncIterable { + await this.ensureReady(); + const { order = "newest", since, until, limit } = options; + const parameters: QueryParameter[] = []; + let query = `SELECT activity_json + FROM ${this.table("messages")} + WHERE TRUE`; + if (since != null) { + parameters.push(since.epochMilliseconds); + query += ` AND published >= $${parameters.length}`; + } + if (until != null) { + parameters.push(until.epochMilliseconds); + query += ` AND published <= $${parameters.length}`; + } + query += order === "oldest" + ? " ORDER BY published ASC NULLS LAST, id ASC" + : " ORDER BY published DESC NULLS LAST, id DESC"; + if (limit != null) { + parameters.push(limit); + query += ` LIMIT $${parameters.length}`; + } + const rows = await this.query<{ readonly activity_json: unknown }>( + this.sql, + query, + parameters, + ); + for (const row of rows) { + const activity = await parseActivity(row.activity_json); + if (activity != null) yield activity; + } + } + + async getMessage(id: Uuid): Promise { + await this.ensureReady(); + const rows = await this.query<{ readonly activity_json: unknown }>( + this.sql, + `SELECT activity_json + FROM ${this.table("messages")} + WHERE id = $1`, + [id], + ); + return await parseActivity(rows[0]?.activity_json); + } + + async countMessages(): Promise { + await this.ensureReady(); + const rows = await this.query<{ readonly count: number }>( + this.sql, + `SELECT COUNT(*)::integer AS count + FROM ${this.table("messages")}`, + ); + return rows[0]?.count ?? 0; + } + + async addFollower(followId: URL, follower: Actor): Promise { + await this.ensureReady(); + if (follower.id == null) { + throw new TypeError("The follower ID is missing."); + } + const followerId = follower.id; + const followerJson = await follower.toJsonLd({ format: "compact" }); + await this.sql.begin(async (sql) => { + await this.lockFollowRequest(sql, followId); + const rows = await this.query<{ readonly follower_id: string }>( + sql, + `SELECT follower_id + FROM ${this.table("follow_requests")} + WHERE follow_request_id = $1 + FOR UPDATE`, + [followId.href], + ); + const previousFollowerId = rows[0]?.follower_id; + await this.lockFollowers(sql, [ + followerId.href, + ...(previousFollowerId == null ? [] : [previousFollowerId]), + ]); + await this.query( + sql, + `INSERT INTO ${this.table("followers")} (follower_id, actor_json) + VALUES ($1, $2::jsonb) + ON CONFLICT (follower_id) + DO UPDATE SET actor_json = EXCLUDED.actor_json`, + [followerId.href, serializeJson(followerJson)], + ); + await this.query( + sql, + `INSERT INTO ${ + this.table("follow_requests") + } (follow_request_id, follower_id) + VALUES ($1, $2) + ON CONFLICT (follow_request_id) + DO UPDATE SET follower_id = EXCLUDED.follower_id`, + [followId.href, followerId.href], + ); + if ( + previousFollowerId != null && previousFollowerId !== followerId.href + ) { + await this.cleanupFollower(sql, previousFollowerId); + } + }); + } + + async removeFollower( + followId: URL, + followerId: URL, + ): Promise { + await this.ensureReady(); + return await this.sql.begin(async (sql) => { + await this.lockFollowRequest(sql, followId); + const rows = await this.query<{ readonly actor_json: unknown }>( + sql, + `SELECT f.actor_json + FROM ${this.table("follow_requests")} AS fr + JOIN ${this.table("followers")} AS f + ON f.follower_id = fr.follower_id + WHERE fr.follow_request_id = $1 + AND fr.follower_id = $2 + FOR UPDATE`, + [followId.href, followerId.href], + ); + const row = rows[0]; + if (row == null) return undefined; + await this.query( + sql, + `DELETE FROM ${this.table("follow_requests")} + WHERE follow_request_id = $1`, + [followId.href], + ); + await this.cleanupFollower(sql, followerId.href); + return await parseActor(row.actor_json); + }); + } + + async hasFollower(followerId: URL): Promise { + await this.ensureReady(); + const rows = await this.query<{ readonly exists: number }>( + this.sql, + `SELECT 1 AS exists + FROM ${this.table("followers")} + WHERE follower_id = $1`, + [followerId.href], + ); + return rows.length > 0; + } + + async *getFollowers( + options: RepositoryGetFollowersOptions = {}, + ): AsyncIterable { + await this.ensureReady(); + const { offset = 0, limit } = options; + const parameters: QueryParameter[] = []; + let query = `SELECT actor_json + FROM ${this.table("followers")} + ORDER BY follower_id ASC`; + if (limit != null) { + parameters.push(limit, offset); + query += ` LIMIT $${parameters.length - 1} OFFSET $${parameters.length}`; + } else if (offset > 0) { + parameters.push(offset); + query += ` OFFSET $${parameters.length}`; + } + const rows = await this.query<{ readonly actor_json: unknown }>( + this.sql, + query, + parameters, + ); + for (const row of rows) { + const actor = await parseActor(row.actor_json); + if (actor != null) yield actor; + } + } + + async countFollowers(): Promise { + await this.ensureReady(); + const rows = await this.query<{ readonly count: number }>( + this.sql, + `SELECT COUNT(*)::integer AS count + FROM ${this.table("followers")}`, + ); + return rows[0]?.count ?? 0; + } + + async addSentFollow(id: Uuid, follow: Follow): Promise { + await this.ensureReady(); + await this.query( + this.sql, + `INSERT INTO ${this.table("sent_follows")} (id, follow_json) + VALUES ($1, $2::jsonb) + ON CONFLICT (id) + DO UPDATE SET follow_json = EXCLUDED.follow_json`, + [id, serializeJson(await follow.toJsonLd({ format: "compact" }))], + ); + } + + async removeSentFollow(id: Uuid): Promise { + await this.ensureReady(); + const rows = await this.query<{ readonly follow_json: unknown }>( + this.sql, + `DELETE FROM ${this.table("sent_follows")} + WHERE id = $1 + RETURNING follow_json`, + [id], + ); + return await parseFollow(rows[0]?.follow_json); + } + + async getSentFollow(id: Uuid): Promise { + await this.ensureReady(); + const rows = await this.query<{ readonly follow_json: unknown }>( + this.sql, + `SELECT follow_json + FROM ${this.table("sent_follows")} + WHERE id = $1`, + [id], + ); + return await parseFollow(rows[0]?.follow_json); + } + + async addFollowee(followeeId: URL, follow: Follow): Promise { + await this.ensureReady(); + await this.query( + this.sql, + `INSERT INTO ${this.table("followees")} (followee_id, follow_json) + VALUES ($1, $2::jsonb) + ON CONFLICT (followee_id) + DO UPDATE SET follow_json = EXCLUDED.follow_json`, + [ + followeeId.href, + serializeJson(await follow.toJsonLd({ format: "compact" })), + ], + ); + } + + async removeFollowee(followeeId: URL): Promise { + await this.ensureReady(); + const rows = await this.query<{ readonly follow_json: unknown }>( + this.sql, + `DELETE FROM ${this.table("followees")} + WHERE followee_id = $1 + RETURNING follow_json`, + [followeeId.href], + ); + return await parseFollow(rows[0]?.follow_json); + } + + async getFollowee(followeeId: URL): Promise { + await this.ensureReady(); + const rows = await this.query<{ readonly follow_json: unknown }>( + this.sql, + `SELECT follow_json + FROM ${this.table("followees")} + WHERE followee_id = $1`, + [followeeId.href], + ); + return await parseFollow(rows[0]?.follow_json); + } + + async vote(messageId: Uuid, voterId: URL, option: string): Promise { + await this.ensureReady(); + await this.query( + this.sql, + `INSERT INTO ${this.table("poll_votes")} (message_id, voter_id, option) + VALUES ($1, $2, $3) + ON CONFLICT (message_id, voter_id, option) + DO NOTHING`, + [messageId, voterId.href, option], + ); + } + + async countVoters(messageId: Uuid): Promise { + await this.ensureReady(); + const rows = await this.query<{ readonly count: number }>( + this.sql, + `SELECT COUNT(DISTINCT voter_id)::integer AS count + FROM ${this.table("poll_votes")} + WHERE message_id = $1`, + [messageId], + ); + return rows[0]?.count ?? 0; + } + + async countVotes(messageId: Uuid): Promise>> { + await this.ensureReady(); + const rows = await this.query<{ + readonly option: string; + readonly count: number; + }>( + this.sql, + `SELECT option, COUNT(*)::integer AS count + FROM ${this.table("poll_votes")} + WHERE message_id = $1 + GROUP BY option + ORDER BY option ASC`, + [messageId], + ); + const result: Record = {}; + for (const row of rows) { + result[row.option] = row.count; + } + return result; + } + + private table(name: string): string { + return `"${this.schema}"."${name}"`; + } + + private async lockFollowRequest( + sql: Queryable, + followId: URL, + ): Promise { + await this.query( + sql, + `SELECT pg_catalog.pg_advisory_xact_lock($1, pg_catalog.hashtext($2))`, + [ + followRequestAdvisoryLockNamespace, + `${this.schema}:${followId.href}`, + ], + ); + } + + private async lockFollower( + sql: Queryable, + followerId: string, + ): Promise { + await this.query( + sql, + `SELECT pg_catalog.pg_advisory_xact_lock($1, pg_catalog.hashtext($2))`, + [ + followerAdvisoryLockNamespace, + `${this.schema}:${followerId}`, + ], + ); + } + + private async lockFollowers( + sql: Queryable, + followerIds: readonly string[], + ): Promise { + const uniqueFollowerIds = [...new Set(followerIds)].sort(); + for (const followerId of uniqueFollowerIds) { + await this.lockFollower(sql, followerId); + } + } + + private async cleanupFollower( + sql: Queryable, + followerId: string, + ): Promise { + await this.lockFollower(sql, followerId); + await this.query( + sql, + `DELETE FROM ${this.table("followers")} + WHERE follower_id = $1 + AND NOT EXISTS ( + SELECT 1 + FROM ${this.table("follow_requests")} + WHERE follower_id = $1 + )`, + [followerId], + ); + } + + private async ensureReady(): Promise { + await this.ready; + } + + private async query( + sql: Queryable, + query: string, + parameters: readonly QueryParameter[] = [], + ): Promise { + return await execute(sql, query, parameters, this.prepare); + } +} + +function validateSchemaName(schema: string): string { + if (!schemaNamePattern.test(schema)) { + throw new TypeError("The PostgreSQL schema name is invalid."); + } + return schema; +} + +async function execute( + sql: Queryable, + query: string, + parameters: readonly QueryParameter[] = [], + prepare = true, +): Promise { + return await sql.unsafe( + query, + [...parameters], + { prepare }, + ); +} + +function serializeJson(value: unknown): string { + return JSON.stringify(value); +} + +function isJsonObject(value: unknown): value is Record { + return typeof value === "object" && value != null; +} + +async function parseActivity( + json: unknown, +): Promise { + const normalized = normalizeJsonObject(json); + if (normalized == null) return undefined; + try { + const activity = await Activity.fromJsonLd(normalized); + if (activity instanceof Create || activity instanceof Announce) { + return activity; + } + } catch (error) { + logger.warn("Failed to parse message activity.", { error }); + } + return undefined; +} + +async function parseActor(json: unknown): Promise { + const normalized = normalizeJsonObject(json); + if (normalized == null) return undefined; + try { + const actor = await Object.fromJsonLd(normalized); + if (isActor(actor)) return actor; + } catch (error) { + logger.warn("Failed to parse follower actor.", { error }); + } + return undefined; +} + +async function parseFollow(json: unknown): Promise { + const normalized = normalizeJsonObject(json); + if (normalized == null) return undefined; + try { + return await Follow.fromJsonLd(normalized); + } catch (error) { + logger.warn("Failed to parse follow activity.", { error }); + } + return undefined; +} + +function normalizeJsonObject( + value: unknown, +): Record | undefined { + if (isJsonObject(value)) return value; + if (typeof value !== "string") return undefined; + try { + const parsed: unknown = JSON.parse(value); + if (isJsonObject(parsed)) return parsed; + } catch { + return undefined; + } + return undefined; +} diff --git a/packages/botkit-postgres/tsdown.config.ts b/packages/botkit-postgres/tsdown.config.ts new file mode 100644 index 0000000..6a44d92 --- /dev/null +++ b/packages/botkit-postgres/tsdown.config.ts @@ -0,0 +1,10 @@ +import { defineConfig } from "tsdown"; + +export default defineConfig({ + entry: "src/mod.ts", + dts: { + sourcemap: true, + }, + format: "esm", + platform: "node", +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2517bd7..6056781 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -38,6 +38,9 @@ importers: '@fedify/botkit': specifier: 'workspace:' version: link:../packages/botkit + '@fedify/botkit-postgres': + specifier: 'workspace:' + version: link:../packages/botkit-postgres '@fedify/botkit-sqlite': specifier: 'workspace:' version: link:../packages/botkit-sqlite @@ -157,6 +160,31 @@ importers: specifier: 'catalog:' version: 0.12.8(typescript@5.8.3) + packages/botkit-postgres: + dependencies: + '@fedify/botkit': + specifier: ^0.4.0 + version: link:../botkit + '@fedify/fedify': + specifier: ^2.1.2 + version: 2.1.2 + '@fedify/vocab': + specifier: ^2.1.2 + version: 2.1.2 + '@js-temporal/polyfill': + specifier: ^0.5.1 + version: 0.5.1 + '@logtape/logtape': + specifier: ^1.3.5 + version: 1.3.5 + postgres: + specifier: ^3.4.8 + version: 3.4.8 + devDependencies: + tsdown: + specifier: ^0.12.8 + version: 0.12.8(typescript@5.8.3) + packages/botkit-sqlite: dependencies: '@fedify/botkit': @@ -1689,6 +1717,10 @@ packages: resolution: {integrity: sha512-Jtc2612XINuBjIl/QTWsV5UvE8UHuNblcO3vVADSrKsrc6RqGX6lOW1cEo3CM2v0XG4Nat8nI+YM7/f26VxXLw==} engines: {node: '>=12'} + postgres@3.4.8: + resolution: {integrity: sha512-d+JFcLM17njZaOLkv6SCev7uoLaBtfK86vMUXhW1Z4glPWh4jozno9APvW/XKFJ3CCxVoC7OL38BqRydtu5nGg==} + engines: {node: '>=12'} + property-information@7.1.0: resolution: {integrity: sha512-TwEZ+X+yCJmYfL7TPUOcvBZ4QfoT5YenQiJuX//0th53DE6w0xxLEtfK3iyryQFddXuvkIk51EEgrJQ0WJkOmQ==} @@ -3714,6 +3746,8 @@ snapshots: postgres@3.4.7: {} + postgres@3.4.8: {} + property-information@7.1.0: {} punycode.js@2.3.1: {} diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index dee5c84..4cf2776 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -2,6 +2,8 @@ packages: - packages/* - docs +linkWorkspacePackages: true + catalog: "@fedify/denokv": 2.1.2 "@fedify/fedify": 2.1.2