diff --git a/CHANGES.md b/CHANGES.md
index 934de1e..e1e0d21 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -56,6 +56,15 @@ To be released.
[#14]: https://github.com/fedify-dev/botkit/pull/14
[#18]: https://github.com/fedify-dev/botkit/issues/18
+### @fedify/botkit-postgres
+
+ - Added a new PostgreSQL repository package, *`@fedify/botkit-postgres`*,
+ which provides `PostgresRepository`, `PostgresRepositoryOptions`, and
+ `initializePostgresRepositorySchema()`. [[#11], [#19]]
+
+[#11]: https://github.com/fedify-dev/botkit/issues/11
+[#19]: https://github.com/fedify-dev/botkit/pull/19
+
Version 0.3.1
-------------
diff --git a/deno.json b/deno.json
index e8090e9..798d154 100644
--- a/deno.json
+++ b/deno.json
@@ -35,7 +35,7 @@
"check-versions": "deno run --allow-read --allow-write scripts/check_versions.ts",
"fmt": "deno fmt && deno task hongdown --write",
"install": "deno cache packages/*/src/*.ts && pnpm install",
- "test": "deno test --allow-read --allow-write --allow-env --allow-net=hollo.social --parallel",
+ "test": "deno test --allow-read --allow-write --allow-env --allow-net=hollo.social,localhost,127.0.0.1 --parallel",
"test:node": "pnpm install && pnpm run -r test",
"test-all": {
"dependencies": ["check", "test", "test:node"]
diff --git a/deno.lock b/deno.lock
index e7e79be..8b31f3e 100644
--- a/deno.lock
+++ b/deno.lock
@@ -7,6 +7,7 @@
"jsr:@fedify/markdown-it-hashtag@0.3": "0.3.0",
"jsr:@fedify/markdown-it-mention@0.3": "0.3.0",
"jsr:@fedify/vocab-runtime@^2.1.2": "2.1.2",
+ "jsr:@fedify/vocab@2.1.2": "2.1.2",
"jsr:@fedify/vocab@^2.1.2": "2.1.2",
"jsr:@fedify/webfinger@^2.1.2": "2.1.2",
"jsr:@hongminhee/x-forwarded-fetch@0.2": "0.2.0",
@@ -18,16 +19,20 @@
"jsr:@std/internal@^1.0.12": "1.0.12",
"jsr:@std/path@^1.1.1": "1.1.4",
"jsr:@std/path@^1.1.4": "1.1.4",
+ "npm:@fedify/fedify@^2.1.2": "2.1.2",
"npm:@fedify/markdown-it-hashtag@0.3": "0.3.0",
"npm:@fedify/markdown-it-mention@0.3": "0.3.0",
+ "npm:@fedify/vocab@^2.1.2": "2.1.2",
+ "npm:@js-temporal/polyfill@~0.5.1": "0.5.1",
+ "npm:@logtape/logtape@^1.3.5": "1.3.7",
"npm:@multiformats/base-x@^4.0.1": "4.0.1",
"npm:@opentelemetry/api@^1.9.0": "1.9.1",
"npm:@opentelemetry/semantic-conventions@^1.39.0": "1.40.0",
"npm:@types/markdown-it@^14.1.1": "14.1.2",
"npm:asn1js@^3.0.6": "3.0.7",
"npm:byte-encodings@^1.0.11": "1.0.11",
- "npm:es-toolkit@^1.30.0": "1.45.1",
- "npm:es-toolkit@^1.43.0": "1.45.1",
+ "npm:es-toolkit@^1.30.0": "1.43.0",
+ "npm:es-toolkit@^1.43.0": "1.43.0",
"npm:hongdown@~0.1.0-dev.28": "0.1.0-dev.37",
"npm:hongdown@~0.1.0-dev.29": "0.1.0-dev.37",
"npm:hongdown@~0.1.0-dev.30": "0.1.0-dev.37",
@@ -45,6 +50,7 @@
"npm:markdown-it@^14.1.0": "14.1.0",
"npm:mime-db@^1.54.0": "1.54.0",
"npm:pkijs@^3.2.5": "3.4.0",
+ "npm:postgres@^3.4.8": "3.4.8",
"npm:structured-field-values@^2.0.4": "2.0.4",
"npm:tsdown@~0.12.8": "0.12.9_rolldown@1.0.0-beta.55",
"npm:uri-template-router@1": "1.0.0",
@@ -64,8 +70,8 @@
"integrity": "27ee6b24286b7cf60a696a0ba115b2c17590d9ccee9ccbff23be8d1b22c399f1",
"dependencies": [
"jsr:@fedify/fedify@^2.1.2",
- "jsr:@fedify/vocab",
"jsr:@fedify/vocab-runtime",
+ "jsr:@fedify/vocab@^2.1.2",
"jsr:@fedify/webfinger",
"jsr:@logtape/logtape@^2.0.5",
"npm:@opentelemetry/api",
@@ -195,6 +201,9 @@
"@babel/helper-validator-identifier"
]
},
+ "@cfworker/json-schema@4.1.1": {
+ "integrity": "sha512-gAmrUZSGtKc3AiBL71iNWxDsyUC5uMaKKGdvzYsBoTW/xi42JQHl7eKV2OYzCUqvc+D2RCcf7EXY2iCyFIk6og=="
+ },
"@digitalbazaar/http-client@4.3.0": {
"integrity": "sha512-6lMpxpt9BOmqHKGs9Xm6DP4LlZTBFer/ZjHvP3FcW3IaUWYIWC7dw5RFZnvw4fP57kAVcm1dp3IF+Y50qhBvAw==",
"dependencies": [
@@ -221,6 +230,28 @@
"tslib"
]
},
+ "@fedify/fedify@2.1.2": {
+ "integrity": "sha512-9i2dFdf/Fch6E/mlBCqlooAujujd2/Vf+7ZYlSqBkBeGtBYxicYswr842mU6lSO4DYjBAFQM83gLOxmLhqdOtg==",
+ "dependencies": [
+ "@fedify/vocab",
+ "@fedify/vocab-runtime",
+ "@fedify/webfinger",
+ "@js-temporal/polyfill",
+ "@logtape/logtape@2.0.5",
+ "@opentelemetry/api",
+ "@opentelemetry/core",
+ "@opentelemetry/sdk-trace-base",
+ "@opentelemetry/semantic-conventions",
+ "byte-encodings",
+ "es-toolkit",
+ "json-canon",
+ "jsonld",
+ "structured-field-values",
+ "uri-template-router",
+ "url-template",
+ "urlpattern-polyfill"
+ ]
+ },
"@fedify/markdown-it-hashtag@0.3.0": {
"integrity": "sha512-DCtfQ1OlFstob382d2iwEi77ezNqDQkUjS664q2r7ZOQQkBExvgvKDkL851tf1XRYp4A425/HO2yGRLZ1u1pSg==",
"dependencies": [
@@ -233,6 +264,52 @@
"markdown-it"
]
},
+ "@fedify/vocab-runtime@2.1.2": {
+ "integrity": "sha512-Yn7XUAWclBz+eHuGnaHtz8CxynLYJ66qnEV1QjraveLSz+tB5Z4T0jQjpY3PMkUx3EhEiGJFu8Nb5hB7a89Jcw==",
+ "dependencies": [
+ "@logtape/logtape@2.0.5",
+ "@multiformats/base-x",
+ "@opentelemetry/api",
+ "asn1js",
+ "byte-encodings",
+ "jsonld",
+ "pkijs"
+ ]
+ },
+ "@fedify/vocab-tools@2.1.2": {
+ "integrity": "sha512-AeMA04m8pRnRut5mFKifKyAYrAEvEw0o0RVPCxzypZM8iEwneIQBRhZA435NHMdhIb5Zqx3aEp6J0+P9gevfWQ==",
+ "dependencies": [
+ "@cfworker/json-schema",
+ "byte-encodings",
+ "es-toolkit",
+ "yaml"
+ ]
+ },
+ "@fedify/vocab@2.1.2": {
+ "integrity": "sha512-gyBRp8BdXnRGNX98+5z//30xElWUP8d4kOid41wQJ9t4sqSYcyPpNr5re7r6jPQxc3ut7fpGBDmr9DJItJ7YYw==",
+ "dependencies": [
+ "@fedify/vocab-runtime",
+ "@fedify/vocab-tools",
+ "@fedify/webfinger",
+ "@js-temporal/polyfill",
+ "@logtape/logtape@2.0.5",
+ "@multiformats/base-x",
+ "@opentelemetry/api",
+ "asn1js",
+ "es-toolkit",
+ "jsonld",
+ "pkijs"
+ ]
+ },
+ "@fedify/webfinger@2.1.2": {
+ "integrity": "sha512-7wzWrYAZaTd+NG/CqbNrgt10+EXHRGbk1B640GjP51tT0oOEALG9/WGh8PYTSg/8G7htR8GLgWQEwskWO2cmjg==",
+ "dependencies": [
+ "@fedify/vocab-runtime",
+ "@logtape/logtape@2.0.5",
+ "@opentelemetry/api",
+ "es-toolkit"
+ ]
+ },
"@hongdown/darwin-arm64@0.1.0-dev.37": {
"integrity": "sha512-RHnlo6LF5l+e4gZMkcSWBVr0hMx69w6fmqERhsge0w1UFfBzbc8I5PwezchWFzJlYUWaiYQktsnHtcyPP3sa9w==",
"os": ["darwin"],
@@ -313,6 +390,18 @@
"@jridgewell/sourcemap-codec"
]
},
+ "@js-temporal/polyfill@0.5.1": {
+ "integrity": "sha512-hloP58zRVCRSpgDxmqCWJNlizAlUgJFqG2ypq79DCvyv9tHjRYMDOcPFjzfl/A1/YxDvRCZz8wvZvmapQnKwFQ==",
+ "dependencies": [
+ "jsbi"
+ ]
+ },
+ "@logtape/logtape@1.3.7": {
+ "integrity": "sha512-YgF+q9op97oLLPwc7TcTNIllTArVtTwkwyKky6XVzAXQcBrvFXXtMuwJSryONAyOUSItrx994O/HABOrszZyFg=="
+ },
+ "@logtape/logtape@2.0.5": {
+ "integrity": "sha512-UizDkh20ZPJVOddRxG1F77WhHdlNl/sbQgoO8T534R7XvUBMAJ9En9f35u+meW2tRsNLvjz6R87Zanwf53tspQ=="
+ },
"@multiformats/base-x@4.0.1": {
"integrity": "sha512-eMk0b9ReBbV23xXU693TAIrLyeO5iTgBZGSJfpqriG8UkYvr/hC9u9pyMlAakDNHWmbhMZCDs6KQO0jzKD8OTw=="
},
@@ -330,6 +419,30 @@
"@opentelemetry/api@1.9.1": {
"integrity": "sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q=="
},
+ "@opentelemetry/core@2.6.1_@opentelemetry+api@1.9.1": {
+ "integrity": "sha512-8xHSGWpJP9wBxgBpnqGL0R3PbdWQndL1Qp50qrg71+B28zK5OQmUgcDKLJgzyAAV38t4tOyLMGDD60LneR5W8g==",
+ "dependencies": [
+ "@opentelemetry/api",
+ "@opentelemetry/semantic-conventions"
+ ]
+ },
+ "@opentelemetry/resources@2.6.1_@opentelemetry+api@1.9.1": {
+ "integrity": "sha512-lID/vxSuKWXM55XhAKNoYXu9Cutoq5hFdkbTdI/zDKQktXzcWBVhNsOkiZFTMU9UtEWuGRNe0HUgmsFldIdxVA==",
+ "dependencies": [
+ "@opentelemetry/api",
+ "@opentelemetry/core",
+ "@opentelemetry/semantic-conventions"
+ ]
+ },
+ "@opentelemetry/sdk-trace-base@2.6.1_@opentelemetry+api@1.9.1": {
+ "integrity": "sha512-r86ut4T1e8vNwB35CqCcKd45yzqH6/6Wzvpk2/cZB8PsPLlZFTvrh8yfOS3CYZYcUmAx4hHTZJ8AO8Dj8nrdhw==",
+ "dependencies": [
+ "@opentelemetry/api",
+ "@opentelemetry/core",
+ "@opentelemetry/resources",
+ "@opentelemetry/semantic-conventions"
+ ]
+ },
"@opentelemetry/semantic-conventions@1.40.0": {
"integrity": "sha512-cifvXDhcqMwwTlTK04GBNeIe7yyo28Mfby85QXFe1Yk8nmi36Ab/5UQwptOx84SsoGNRg+EVSjwzfSZMy6pmlw=="
},
@@ -501,8 +614,8 @@
"entities@4.5.0": {
"integrity": "sha512-V0hjH4dGPh9Ao5p0MoRY6BVqtwCjhz6vI5LT8AJ55H+4g9/4vbHx1I54fS0XuclLhDHArPQCiMjDxjaL8fPxhw=="
},
- "es-toolkit@1.45.1": {
- "integrity": "sha512-/jhoOj/Fx+A+IIyDNOvO3TItGmlMKhtX8ISAHKE90c4b/k1tqaqEZ+uUqfpU8DMnW5cgNJv606zS55jGvza0Xw=="
+ "es-toolkit@1.43.0": {
+ "integrity": "sha512-SKCT8AsWvYzBBuUqMk4NPwFlSdqLpJwmy6AP322ERn8W2YLIB6JBXnwMI2Qsh2gfphT3q7EKAxKb23cvFHFwKA=="
},
"fdir@6.5.0_picomatch@4.0.3": {
"integrity": "sha512-tIbYtZbucOs0BRGqPJkshJUYdL+SDH7dVM8gjy+ERp3WAUjLEFJE+02kanyHtwjWOnwrKYBiwAmM0p4kLJAnXg==",
@@ -553,6 +666,9 @@
"integrity": "sha512-ekilCSN1jwRvIbgeg/57YFh8qQDNbwDb9xT/qu2DAHbFFZUicIl4ygVaAvzveMhMVr3LnpSKTNnwt8PoOfmKhQ==",
"bin": true
},
+ "jsbi@4.3.2": {
+ "integrity": "sha512-9fqMSQbhJykSeii05nxKl4m6Eqn2P6rOlYiS+C5Dr/HPIU/7yZxu5qzbs40tgaFORiw2Amd0mirjxatXYMkIew=="
+ },
"jsesc@3.1.0": {
"integrity": "sha512-/sM3dO2FOzXjKQhJuo0Q173wf2KOo8t4I8vHy6lF9poUp7bKT0/NHE8fPX23PwfhnykfqnC2xRxOnVw5XuGIaA==",
"bin": true
@@ -622,6 +738,9 @@
"tslib"
]
},
+ "postgres@3.4.8": {
+ "integrity": "sha512-d+JFcLM17njZaOLkv6SCev7uoLaBtfK86vMUXhW1Z4glPWh4jozno9APvW/XKFJ3CCxVoC7OL38BqRydtu5nGg=="
+ },
"punycode.js@2.3.1": {
"integrity": "sha512-uxFIHU0YlHYhDQtV4R9J6a52SLx28BCjT+4ieh7IGbgwVJWO+km431c4yRlREUAsAmt/uMjQUyQHNEPf0M39CA=="
},
@@ -757,6 +876,9 @@
"url-template@3.1.1": {
"integrity": "sha512-4oszoaEKE/mQOtAmdMWqIRHmkxWkUZMnXFnjQ5i01CuRSK3uluxcH1MRVVVWmhlnzT1SCDfKxxficm2G37qzCA=="
},
+ "urlpattern-polyfill@10.1.0": {
+ "integrity": "sha512-IGjKp/o0NL3Bso1PymYURCJxMPNAf/ILOpendP9f5B6e1rTJgdgiOvgfoT8VxCAdY+Wisb9uhGaJJf3yZ2V9nw=="
+ },
"uuid@11.1.0": {
"integrity": "sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==",
"bin": true
@@ -771,6 +893,10 @@
},
"yallist@4.0.0": {
"integrity": "sha512-3wdGidZyq5PB084XLES5TpOSRA3wjXAlIWMhum2kRcv/41Sn2emQ0dycQW4uZXLejwKvg6EsvbdlVL+FYEct7A=="
+ },
+ "yaml@2.8.3": {
+ "integrity": "sha512-AvbaCLOO2Otw/lW5bmh9d/WEdcDFdQp2Z2ZUH3pX9U2ihyUY0nvLv7J6TrWowklRGPYbB/IuIMfYgxaCPg5Bpg==",
+ "bin": true
}
},
"workspace": {
@@ -814,6 +940,22 @@
]
}
},
+ "packages/botkit-postgres": {
+ "dependencies": [
+ "jsr:@fedify/vocab@^2.1.2",
+ "npm:postgres@^3.4.8"
+ ],
+ "packageJson": {
+ "dependencies": [
+ "npm:@fedify/fedify@^2.1.2",
+ "npm:@fedify/vocab@^2.1.2",
+ "npm:@js-temporal/polyfill@~0.5.1",
+ "npm:@logtape/logtape@^1.3.5",
+ "npm:postgres@^3.4.8",
+ "npm:tsdown@~0.12.8"
+ ]
+ }
+ },
"packages/botkit-sqlite": {
"dependencies": [
"jsr:@fedify/vocab@^2.1.2"
diff --git a/docs/concepts/bot.md b/docs/concepts/bot.md
index 561d756..9ff4a57 100644
--- a/docs/concepts/bot.md
+++ b/docs/concepts/bot.md
@@ -97,6 +97,12 @@ the key–value store specified in the [`kv`](#createbotoptions-kv) option.
For more information, see the [*Repository* section](./repository.md).
+If you want to use a SQL-backed repository directly instead of
+[`KvRepository`](./repository.md#kvrepository), BotKit also provides concrete
+repository packages such as
+[`SqliteRepository`](./repository.md#sqliterepository) and
+[`PostgresRepository`](./repository.md#postgresrepository).
+
### `~CreateBotOptions.identifier`
The internal identifier of the bot actor. It is used for the URI of the bot
diff --git a/docs/concepts/repository.md b/docs/concepts/repository.md
index 415dcb9..8c387df 100644
--- a/docs/concepts/repository.md
+++ b/docs/concepts/repository.md
@@ -135,6 +135,88 @@ properties:
[`node:sqlite`]: https://nodejs.org/api/sqlite.html
+`PostgresRepository`
+--------------------
+
+*This API is available since BotKit 0.4.0.*
+
+The `PostgresRepository` is a repository that stores data in PostgreSQL using
+the [Postgres.js] driver. It is suited for deployments where multiple bot
+processes need to share the same persistent repository state, or where you
+already operate PostgreSQL for other infrastructure.
+
+Unlike [`KvRepository`](#kvrepository), `PostgresRepository` stores BotKit data
+in ordinary PostgreSQL tables rather than a key–value abstraction. It creates
+tables inside a dedicated PostgreSQL schema, uses transactions for multi-step
+updates, and supports either an internally owned connection pool or an injected
+Postgres.js client.
+
+In order to use `PostgresRepository`, you need to install the
+*@fedify/botkit-postgres* package:
+
+::: code-group
+
+~~~~ sh [Deno]
+deno add jsr:@fedify/botkit-postgres
+~~~~
+
+~~~~ sh [npm]
+npm add @fedify/botkit-postgres
+~~~~
+
+~~~~ sh [pnpm]
+pnpm add @fedify/botkit-postgres
+~~~~
+
+~~~~ sh [Yarn]
+yarn add @fedify/botkit-postgres
+~~~~
+
+:::
+
+The `PostgresRepository` constructor accepts an options object with the
+following properties:
+
+`sql`
+: An existing [Postgres.js] client. When this is provided, the repository
+ does not own the client and calling `close()` will not shut it down.
+
+`url`
+: A PostgreSQL connection string used to create an internal connection pool.
+ Exactly one of `sql` and `url` must be provided.
+
+`schema` (optional)
+: The PostgreSQL schema used for BotKit tables. Defaults to `"botkit"`.
+
+`maxConnections` (optional)
+: The maximum number of connections for the internally created pool. This
+ option is only valid when `url` is used.
+
+`prepare` (optional)
+: Whether to use prepared statements for repository queries. Defaults to
+ `true`.
+
+These options are mutually exclusive: use either `sql` or `url`. The
+`maxConnections` option is only meaningful together with `url`.
+
+The repository initializes its tables and indexes automatically. If you want
+to provision them before creating the repository, use the exported
+`initializePostgresRepositorySchema()` helper:
+
+~~~~ typescript
+import postgres from "postgres";
+import { initializePostgresRepositorySchema } from "@fedify/botkit-postgres";
+
+const sql = postgres("postgresql://localhost/botkit");
+await initializePostgresRepositorySchema(sql, "botkit");
+~~~~
+
+If you disable prepared statements for PgBouncer-style deployments, pass
+`false` as the third argument so schema initialization uses the same setting.
+
+[Postgres.js]: https://github.com/porsager/postgres
+
+
`MemoryRepository`
------------------
diff --git a/docs/package.json b/docs/package.json
index 39b2d67..ec3cfe3 100644
--- a/docs/package.json
+++ b/docs/package.json
@@ -1,6 +1,7 @@
{
"devDependencies": {
"@fedify/botkit": "workspace:",
+ "@fedify/botkit-postgres": "workspace:",
"@fedify/botkit-sqlite": "workspace:",
"@fedify/denokv": "jsr:@fedify/denokv@^2.1.2",
"@fedify/fedify": "catalog:",
diff --git a/packages/botkit-postgres/README.md b/packages/botkit-postgres/README.md
new file mode 100644
index 0000000..b2cad79
--- /dev/null
+++ b/packages/botkit-postgres/README.md
@@ -0,0 +1,130 @@
+@fedify/botkit-postgres
+=======================
+
+[![JSR][JSR badge]][JSR]
+[![npm][npm badge]][npm]
+[![GitHub Actions][GitHub Actions badge]][GitHub Actions]
+
+This package is a [PostgreSQL]-based repository implementation for [BotKit].
+It provides persistent shared storage for bots running on either [Deno] or
+[Node.js], supports connection pooling through [Postgres.js], and stores BotKit
+repository data in ordinary PostgreSQL tables under a dedicated schema.
+
+[JSR badge]: https://jsr.io/badges/@fedify/botkit-postgres
+[JSR]: https://jsr.io/@fedify/botkit-postgres
+[npm badge]: https://img.shields.io/npm/v/@fedify/botkit-postgres?logo=npm
+[npm]: https://www.npmjs.com/package/@fedify/botkit-postgres
+[GitHub Actions badge]: https://github.com/fedify-dev/botkit/actions/workflows/main.yaml/badge.svg
+[GitHub Actions]: https://github.com/fedify-dev/botkit/actions/workflows/main.yaml
+[PostgreSQL]: https://www.postgresql.org/
+[BotKit]: https://botkit.fedify.dev/
+[Deno]: https://deno.land/
+[Node.js]: https://nodejs.org/
+[Postgres.js]: https://github.com/porsager/postgres
+
+
+Installation
+------------
+
+~~~~ sh
+deno add jsr:@fedify/botkit-postgres
+npm add @fedify/botkit-postgres
+pnpm add @fedify/botkit-postgres
+yarn add @fedify/botkit-postgres
+~~~~
+
+
+Usage
+-----
+
+The `PostgresRepository` can be used as a drop-in repository implementation for
+BotKit:
+
+~~~~ typescript
+import { createBot } from "@fedify/botkit";
+import { PostgresRepository } from "@fedify/botkit-postgres";
+
+const bot = createBot({
+ username: "mybot",
+ repository: new PostgresRepository({
+ url: "postgresql://localhost/botkit",
+ schema: "botkit",
+ maxConnections: 10,
+ }),
+});
+~~~~
+
+You can also inject an existing [Postgres.js] client. In that case the
+repository does not own the client and `close()` will not shut it down:
+
+~~~~ typescript
+import postgres from "postgres";
+import { PostgresRepository } from "@fedify/botkit-postgres";
+
+const sql = postgres("postgresql://localhost/botkit");
+const repository = new PostgresRepository({
+ sql,
+ schema: "botkit",
+});
+~~~~
+
+
+Options
+-------
+
+The `PostgresRepository` constructor accepts the following options:
+
+ - **`sql`**: An existing [Postgres.js] client to use.
+
+ - **`url`**: A PostgreSQL connection string for an internally managed
+ connection pool.
+
+ - **`schema`** (optional): The PostgreSQL schema name used for BotKit tables.
+ Defaults to `"botkit"`.
+
+ - **`maxConnections`** (optional): Maximum number of connections for an
+ internally managed pool created from `url`.
+
+ - **`prepare`** (optional): Whether to use prepared statements for queries.
+ Defaults to `true`.
+
+The options are mutually exclusive: use either `sql` or `url`. The
+`maxConnections` option is only valid together with `url`.
+
+
+Schema setup
+------------
+
+The repository creates its schema and tables automatically on first use.
+If you want to provision them explicitly ahead of time, use the exported
+`initializePostgresRepositorySchema()` helper:
+
+~~~~ typescript
+import postgres from "postgres";
+import { initializePostgresRepositorySchema } from "@fedify/botkit-postgres";
+
+const sql = postgres("postgresql://localhost/botkit");
+await initializePostgresRepositorySchema(sql, "botkit");
+~~~~
+
+If you disable prepared statements, pass `false` as the third argument so
+schema initialization uses the same setting.
+
+
+Features
+--------
+
+ - **Cross-runtime**: Works with both Deno and Node.js using [Postgres.js]
+
+ - **Shared persistent storage**: Suitable for multi-process and
+ multi-instance deployments backed by PostgreSQL
+
+ - **Schema namespacing**: Keeps BotKit tables grouped under a dedicated
+ PostgreSQL schema
+
+ - **Full `Repository` API**: Implements BotKit repository storage for key
+ pairs, messages, followers, follows, followees, and poll votes
+
+ - **Explicit resource ownership**: Repositories created from a URL own their
+ pool, while repositories created from an injected client leave lifecycle
+ control to the caller
diff --git a/packages/botkit-postgres/deno.json b/packages/botkit-postgres/deno.json
new file mode 100644
index 0000000..f649dd0
--- /dev/null
+++ b/packages/botkit-postgres/deno.json
@@ -0,0 +1,36 @@
+{
+ "name": "@fedify/botkit-postgres",
+ "version": "0.4.0",
+ "license": "AGPL-3.0-only",
+ "exports": {
+ ".": "./src/mod.ts"
+ },
+ "imports": {
+ "@fedify/vocab": "jsr:@fedify/vocab@^2.1.2",
+ "postgres": "npm:postgres@^3.4.8"
+ },
+ "exclude": [
+ "dist",
+ "junit.xml",
+ "package.json"
+ ],
+ "fmt": {
+ "exclude": [
+ "*.md",
+ "*.yaml",
+ "*.yml"
+ ]
+ },
+ "tasks": {
+ "test": "deno test --allow-env --allow-net=localhost,127.0.0.1 --parallel",
+ "test:node": "pnpm install && pnpm test",
+ "test-all": {
+ "dependencies": [
+ "check",
+ "test",
+ "test:node"
+ ]
+ },
+ "coverage": "deno task test --coverage --clean && deno coverage --html"
+ }
+}
diff --git a/packages/botkit-postgres/package.json b/packages/botkit-postgres/package.json
new file mode 100644
index 0000000..02dce30
--- /dev/null
+++ b/packages/botkit-postgres/package.json
@@ -0,0 +1,64 @@
+{
+ "name": "@fedify/botkit-postgres",
+ "version": "0.4.0",
+ "description": "PostgreSQL-based repository for BotKit",
+ "license": "AGPL-3.0-only",
+ "author": {
+ "name": "Hong Minhee",
+ "email": "hong@minhee.org",
+ "url": "https://hongminhee.org/"
+ },
+ "homepage": "https://botkit.fedify.dev/",
+ "repository": {
+ "type": "git",
+ "url": "git+https://github.com/fedify-dev/botkit.git",
+ "directory": "packages/botkit-postgres"
+ },
+ "bugs": {
+ "url": "https://github.com/fedify-dev/botkit/issues"
+ },
+ "funding": [
+ "https://opencollective.com/fedify",
+ "https://github.com/sponsors/dahlia"
+ ],
+ "engines": {
+ "deno": ">=2.0.0",
+ "node": ">=22.0.0"
+ },
+ "type": "module",
+ "module": "./dist/mod.js",
+ "types": "./dist/mod.d.ts",
+ "exports": {
+ ".": {
+ "types": "./dist/mod.d.ts",
+ "import": "./dist/mod.js"
+ },
+ "./package.json": "./package.json"
+ },
+ "sideEffects": true,
+ "files": [
+ "dist",
+ "LICENSE",
+ "package.json",
+ "README.md"
+ ],
+ "peerDependencies": {
+ "@fedify/botkit": "^0.4.0"
+ },
+ "dependencies": {
+ "@fedify/fedify": "^2.1.2",
+ "@fedify/vocab": "^2.1.2",
+ "@js-temporal/polyfill": "^0.5.1",
+ "@logtape/logtape": "^1.3.5",
+ "postgres": "^3.4.8"
+ },
+ "devDependencies": {
+ "tsdown": "^0.12.8"
+ },
+ "scripts": {
+ "build": "tsdown",
+ "prepack": "tsdown",
+ "prepublish": "deno task check && tsdown",
+ "test": "tsdown && cd src/ && node --test --experimental-transform-types"
+ }
+}
diff --git a/packages/botkit-postgres/src/mod.test.ts b/packages/botkit-postgres/src/mod.test.ts
new file mode 100644
index 0000000..3922974
--- /dev/null
+++ b/packages/botkit-postgres/src/mod.test.ts
@@ -0,0 +1,979 @@
+// BotKit by Fedify: A framework for creating ActivityPub bots
+// Copyright (C) 2026 Hong Minhee
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+import {
+ initializePostgresRepositorySchema,
+ PostgresRepository,
+} from "@fedify/botkit-postgres";
+import { importJwk } from "@fedify/fedify/sig";
+import { Create, Follow, Note, Person, PUBLIC_COLLECTION } from "@fedify/vocab";
+import assert from "node:assert/strict";
+import { describe, test } from "node:test";
+import postgres from "postgres";
+
+if (!("Temporal" in globalThis)) {
+ globalThis.Temporal = (await import("@js-temporal" + "/polyfill")).Temporal;
+}
+
+function getPostgresUrl(): string | undefined {
+ if ("process" in globalThis) return globalThis.process.env.POSTGRES_URL;
+ if ("Deno" in globalThis) return globalThis.Deno.env.get("POSTGRES_URL");
+ return undefined;
+}
+
+function createSchemaName(): string {
+ return `botkit_test_${crypto.randomUUID().replaceAll("-", "_")}`;
+}
+
+const postgresUrl = getPostgresUrl();
+
+const keyPairs: CryptoKeyPair[] = [
+ {
+ publicKey: await importJwk({
+ kty: "RSA",
+ alg: "RS256",
+ // cSpell: disable
+ n: "1NZblYSc2beQqDmDUF_VDMeS7bUXShvIMK6NHd9OB-7ivBwad8vUcmqKwWj_ivqZva6EgD-n0549t0Pzn5xTArqEJ-c1DTyhC7TNtof0KIbU75qziHwHOqcyYCHusQgDm_TT7frDuxLqHJQ1UrdADMyCVDPFfcstPHhHp3NYStGeNcBo5B05DB_wkgqX2QF2MamQwkdRRMdZkVees38AsC6GTGoOFRI2lvJuUODtndpyjGAKOkLfkr9XzAcggRYx9ddsHBd5wylffwKhtUtWHkdVBdVAiEX8sZ38LhqNYm161PE83nfEvut6_lCCQ7DlPJ8Tp6SY-f2JTXA-C9sN0uJF8_YGhaCPgolv5Pk2UerQmvhMhql9MLDen1AvZrw0u1CWic0GQeIDA6Op9Exd5azhhdm4iKeYzAekUHFDi6WZRRZRCYgHaEEzXyFt9W3N3paolMYVOh1008d-aIgbYnZMToiwH897uQsNGkd1FVIutycXdeuhAbqB7AtLrzuD78wkKLO8k3DFcix2qaHRqiBKC3lUlDCD_I5yzinY_SOcagdpRxczvi6JN1ahUg39ZKYRtJIxUOp1H3iRrebbaOoxM19-axKH1om0sYtyX4JqYfN9QrSf3cO1I6CGnJY8hIkQ6CDH5Tmk_4VRRKdzphq4jZiiOYfR94WODPKDjTM",
+ e: "AQAB",
+ // cSpell: enable
+ key_ops: ["verify"],
+ ext: true,
+ }, "public"),
+ privateKey: await importJwk({
+ kty: "RSA",
+ alg: "RS256",
+ // cSpell: disable
+ n: "1NZblYSc2beQqDmDUF_VDMeS7bUXShvIMK6NHd9OB-7ivBwad8vUcmqKwWj_ivqZva6EgD-n0549t0Pzn5xTArqEJ-c1DTyhC7TNtof0KIbU75qziHwHOqcyYCHusQgDm_TT7frDuxLqHJQ1UrdADMyCVDPFfcstPHhHp3NYStGeNcBo5B05DB_wkgqX2QF2MamQwkdRRMdZkVees38AsC6GTGoOFRI2lvJuUODtndpyjGAKOkLfkr9XzAcggRYx9ddsHBd5wylffwKhtUtWHkdVBdVAiEX8sZ38LhqNYm161PE83nfEvut6_lCCQ7DlPJ8Tp6SY-f2JTXA-C9sN0uJF8_YGhaCPgolv5Pk2UerQmvhMhql9MLDen1AvZrw0u1CWic0GQeIDA6Op9Exd5azhhdm4iKeYzAekUHFDi6WZRRZRCYgHaEEzXyFt9W3N3paolMYVOh1008d-aIgbYnZMToiwH897uQsNGkd1FVIutycXdeuhAbqB7AtLrzuD78wkKLO8k3DFcix2qaHRqiBKC3lUlDCD_I5yzinY_SOcagdpRxczvi6JN1ahUg39ZKYRtJIxUOp1H3iRrebbaOoxM19-axKH1om0sYtyX4JqYfN9QrSf3cO1I6CGnJY8hIkQ6CDH5Tmk_4VRRKdzphq4jZiiOYfR94WODPKDjTM",
+ e: "AQAB",
+ d: "Yl3DrCHDIDhfifAyyWXRIHvoYyZL4jte1WkG3WSEOtRkRA41CWLSCCNHh8YQPNo_TdQnduJ0nTBIU7f7E6x7DQrI42xPL5Py1mc0oATLiiNurGJyUUUJTklR1e440-bhTCXmANnhtkcyngy9bEI3PvMR1PqsbswFVyo76586kjG5DhykHbGH2Ru14rk0nt23E5LLzY6Kd-AufCbjuQ-ccNC_zvdBFOn7At5-r7CVAVyhjlEgyPZ5P-hhGnG8ywxIANgUJhOPeexYL2o29IQiBBJxsCV0EsdN14UttN0etPvmRh5MRIFUE-zfRkRNQB20hMT8n4FKFlfgKkMS2gXep91h9VVyfYPHAt9jGJgUbIcbx_igeLK3nQlaUXaePf2bAuVRM1kW3P2UR0FOoUKDI5FZmi9XBoEtt0taQYySdKbPSXKaJWO2vKQ4SPyVXzzz-obfVe2zIe1msQ3Tco5RFoHfnufbvvnLC_WUAC9LSfp4jrPvr5lY3uoCFmPma56R-E3mVd2q87Ybk6mqvSh4yWHjid7sfzQ8Ovh9OhZlq_7Mfa3q3M92vNL98iHs8xYkJbE0DJs691UdgX45iNi4DVD-hJ7EbKQQgePsYNovWA611kM-cartevQWk7TBBggy9VYqmdWN0QuVQX9bsHFeYjjKSXg24bV5vYQW3EPkqZk",
+ p: "9DeEDfMVdV605MbHCtHnw5xEbzTHd7vK-qAQNIjz5i4EmFC0tK7dvUiSn0WeyMNYJkuxVxTMHoDbWXzXq45tzbTEYuzEo5wsxyoVvldfFnnJIwMu6Hb7PWjyWfpBcbwLISr8fAJaGPzgcFsJE__KxrvLA66m1q_4k1y1L9CvXWfHDvFqb7VLGzKWXXp2wlbsACZuqx2Ff3THcWoOWb-wSww6AGsYAc3zC_DiYvAaTn9MxszZ0UYuMeJIHjLA1dmjL-Nksvq5GukjFxSSTpUS87zJ08fHoB0FzTKIIjJGpMRf6ebReLqbYCdo2Kr7eC7lbcTfwQTPI6gnHSKgPIYF5Q",
+ q: "3xtArH_4MQjwRpl7JVivzQUZgDTARkynMpX-4Gvyny6Gxx0QLhHH0lQMRhtFWlI6qLZxCCLC9zhXPmGlqW-QWya8-xE80mX45JTrQlwBHISpTWTV3sI2Lp5dg7CW8Sc40CE4kB4Q2rHhf7V-Aimgmqhnl1uguzH2DXfr3RaCor0ge44k6gi1LXEJN_aFQIIFYL8HQOM0ctdY147Kr2rVHLchRnh8Q4GzBAJvpOcfvEDk9HF09NVxeaivLMXChpuSUHqbEGg_lVkotLnCMb-fUWk8QmO8EFFVU0pyOFDqHKIgrHOLSHjgUvV8moBwnMGQxMgu7rpY3g-9cXfsCoKVNw",
+ dp:
+ "bL1vajqrelhSGW-83r95_-pLumx4yIJwrcmpjYrRdtNUrnF5FN6r0wVGa-629dOtI1gevZSAErDzelQRP80qbSapLxcXs3XtpjzB87-5kitl-NYJA-8-jSh2iMPacgb1ua4HQDxX27p1QPH4B9SkeHrTuW8B0KQH_a2Q65pzCxcTVj7-UoEZ0SFkPHkz-fJ0INj7--soLwlTaNd9Tk8A81mdVeRZiywlpVJ7quwX-o3KJNa_weQK26FS1Udp_45pkAAjLWJgG3BldHhvcNgF2UtdXpQc-dkSZTyzyu4x8FmUD3T8HlKQrm69y4POdsQC2i6IJsy6YrkTuXBagrh2VQ",
+ dq:
+ "j0CQZjJEyjdTEAG8cF5hguKjXQ6B5qGROYnV_YNSZaMaJv8iRHJmO0Z8GwenoDbsMyfxq6emR9aFLijEleZsahqVfR-0TePry9lStWkdzZHgozD7oexRnd1Rbh0UzgLBF-I8z0x-xe0xPS7rmbfgx20aFrVentOViVBWwb6SYqvND4hVa2_r5SGPKb_AD4tsqJH_tkosgxCCmuW0fq256JYtZ3I1V6MPrqNhzCAa4GVKnSm8Tvg9xD_rOnRAUu3RJJuUtRQ6v0pgOKqNZiQDx-IqLvaa6l9OygwjCsXpjDkNga0u4Xm7j4jQWOPfasdejPt8Jwy_wtWYbiLyDE2MQQ",
+ qi:
+ "Th3TS6fHquqNljwZU2Vg7ndI0SmJidIwSTS2LlhM-Y2bxaAUF-orpS504xDVk1xjRYBrdxiTOmohbtoKtiWhLveOUAWVoNilMqgEU7lwnhaE3yfiUoE1x8df_wLP_YiAccFKeMZwsQp29aKLxuYQtO2dRSSQkN0IuxMGchnJtGOGNTbyA44O25IwggV1nlJN7OTX-nsJCSCe1XMojnGezhnD4xXGeSuR3S07oDDiWpvAO7qtRphEavVTtXdJWIr27tBvnUytbpb4uq6A3J4-TZ6X9uzlOw6jBSQhbL7fc83Z9E_wjPTnxfHufiC_AtXow6sK7lCy10aJGHp3jnGVdQ",
+ // cSpell: enable
+ key_ops: ["sign"],
+ ext: true,
+ }, "private"),
+ },
+];
+
+function createSql(url: string) {
+ return postgres(url, { max: 1, onnotice: () => {} });
+}
+
+function waitForMacrotask(): Promise<void> {
+ return new Promise((resolve) => setTimeout(resolve, 0));
+}
+
+function createHarness() {
+ if (postgresUrl == null) throw new Error("POSTGRES_URL is not set.");
+ const schema = createSchemaName();
+ const adminSql = createSql(postgresUrl);
+ const repository = new PostgresRepository({
+ url: postgresUrl,
+ schema,
+ maxConnections: 1,
+ });
+ return {
+ adminSql,
+ repository,
+ schema,
+ async cleanup() {
+ await repository.close();
+ await adminSql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`);
+ await adminSql.end();
+ },
+ };
+}
+
+if (postgresUrl == null) {
+ test("PostgresRepository integration tests", { skip: true }, () => {});
+} else {
+ describe("PostgresRepository", () => {
+ test("initializes schema explicitly", async () => {
+ const sql = createSql(postgresUrl);
+ const schema = createSchemaName();
+ try {
+ await initializePostgresRepositorySchema(sql, schema);
+ const tables = await sql.unsafe<{ table_name: string }[]>(
+ `SELECT table_name
+ FROM information_schema.tables
+ WHERE table_schema = $1
+ ORDER BY table_name`,
+ [schema],
+ { prepare: true },
+ );
+ assert.deepStrictEqual(
+ tables.map((row) => row.table_name),
+ [
+ "follow_requests",
+ "followees",
+ "followers",
+ "key_pairs",
+ "messages",
+ "poll_votes",
+ "sent_follows",
+ ],
+ );
+ } finally {
+ await sql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`);
+ await sql.end();
+ }
+ });
+
+ test("honors prepare=false during schema initialization", async () => {
+ const prepares: boolean[] = [];
+ const sql = createSql(postgresUrl);
+ const helperSchema = createSchemaName();
+ const repositorySchema = createSchemaName();
+ const wrappedSql = new Proxy(sql, {
+ get(target, property, receiver) {
+ if (property === "unsafe") {
+ return (
+ query: string,
+            parameters?: postgres.ParameterOrJSON<never>[],
+ options?: postgres.UnsafeQueryOptions,
+ ) => {
+ prepares.push(options?.prepare ?? true);
+ return target.unsafe(query, parameters, options);
+ };
+ }
+ return Reflect.get(target, property, receiver);
+ },
+ });
+ try {
+ await initializePostgresRepositorySchema(
+ wrappedSql,
+ helperSchema,
+ false,
+ );
+
+ const repo = new PostgresRepository({
+ sql: wrappedSql,
+ schema: repositorySchema,
+ prepare: false,
+ });
+ await repo.countMessages();
+
+ assert.ok(prepares.length > 0);
+ assert.ok(prepares.every((prepare) => !prepare));
+ } finally {
+ await sql.unsafe(`DROP SCHEMA IF EXISTS "${helperSchema}" CASCADE`);
+ await sql.unsafe(`DROP SCHEMA IF EXISTS "${repositorySchema}" CASCADE`);
+ await sql.end();
+ }
+ });
+
+ test("rejects invalid constructor option combinations", async () => {
+ const sql = createSql(postgresUrl);
+ try {
+ await assert.doesNotReject(
+ async () =>
+ await Reflect.construct(PostgresRepository, [{
+ sql,
+ url: undefined,
+ maxConnections: undefined,
+ }]).countMessages(),
+ );
+ await assert.rejects(
+ async () =>
+ await Reflect.construct(PostgresRepository, [{
+ sql,
+ url: postgresUrl,
+ maxConnections: 1,
+ }]).countMessages(),
+ new TypeError(
+ "PostgresRepositoryOptions.sql cannot be combined with PostgresRepositoryOptions.url or PostgresRepositoryOptions.maxConnections.",
+ ),
+ );
+ } finally {
+ await sql.end();
+ }
+ });
+
+ test("does not emit unhandled rejections for schema initialization", async () => {
+ const error = new Error("Schema initialization failed.");
+ const sql = {
+ // deno-lint-ignore require-await
+ unsafe: async () => {
+ throw error;
+ },
+ };
+ let unhandledReason: unknown;
+ let detach: (() => void) | undefined;
+ if ("process" in globalThis) {
+ const handler = (reason: unknown) => {
+ unhandledReason = reason;
+ };
+ globalThis.process.once("unhandledRejection", handler);
+ detach = () => {
+ globalThis.process.off("unhandledRejection", handler);
+ };
+ } else {
+ const handler = (event: PromiseRejectionEvent) => {
+ unhandledReason = event.reason;
+ event.preventDefault();
+ };
+ addEventListener("unhandledrejection", handler);
+ detach = () => {
+ removeEventListener("unhandledrejection", handler);
+ };
+ }
+ try {
+ const repo = Reflect.construct(PostgresRepository, [{
+ sql,
+ schema: createSchemaName(),
+ }]) as PostgresRepository;
+ await waitForMacrotask();
+ await assert.rejects(
+ () => repo.countMessages(),
+ error,
+ );
+ await waitForMacrotask();
+ assert.deepStrictEqual(unhandledReason, undefined);
+ } finally {
+ detach?.();
+ }
+ });
+
+ test("does not leak stale followers during concurrent assignment", async () => {
+ const schema = createSchemaName();
+ const adminSql = createSql(postgresUrl);
+ const sqlA = createSql(postgresUrl);
+ const sqlB = createSql(postgresUrl);
+ const followId = new URL(
+ "https://example.com/ap/follow/eac8d54f-f843-49f4-94c1-485a22e07907",
+ );
+ const followerA = new Person({
+ id: new URL("https://example.com/ap/actor/concurrent-alice"),
+ preferredUsername: "concurrent-alice",
+ });
+ const followerB = new Person({
+ id: new URL("https://example.com/ap/actor/concurrent-bob"),
+ preferredUsername: "concurrent-bob",
+ });
+ let arrivals = 0;
+ let releaseBarrier!: () => void;
+    const barrier = new Promise<void>((resolve) => {
+      releaseBarrier = resolve;
+    });
+ const barrierTimeout = 50;
+ const wrapSql = (sql: postgres.Sql): postgres.Sql =>
+ new Proxy(sql, {
+ get(target, property, receiver) {
+ if (property === "begin") {
+ return async (
+ callback: (transactionSql: postgres.TransactionSql) => unknown,
+ ) =>
+ await target.begin(async (transactionSql) =>
+ await callback(
+ new Proxy(transactionSql, {
+ get(txTarget, txProperty, txReceiver) {
+ if (txProperty === "unsafe") {
+ return async (
+ query: string,
+                              parameters?: postgres.ParameterOrJSON<never>[],
+                              options?: postgres.UnsafeQueryOptions,
+                            ): Promise<postgres.RowList<postgres.Row[]>> => {
+ const result = await txTarget.unsafe(
+ query,
+ parameters,
+ options,
+ );
+ if (
+ arrivals < 2 &&
+ query.includes("SELECT follower_id") &&
+ query.includes("FOR UPDATE") &&
+ parameters?.[0] === followId.href
+ ) {
+ arrivals++;
+ if (arrivals === 2) releaseBarrier();
+ await Promise.race([
+ barrier,
+ new Promise((resolve) =>
+ setTimeout(resolve, barrierTimeout)
+ ),
+ ]);
+ }
+ return result;
+ };
+ }
+ return Reflect.get(txTarget, txProperty, txReceiver);
+ },
+ }),
+ )
+ );
+ }
+ return Reflect.get(target, property, receiver);
+ },
+ });
+ try {
+ await initializePostgresRepositorySchema(adminSql, schema);
+ const repoA = new PostgresRepository({ sql: wrapSql(sqlA), schema });
+ const repoB = new PostgresRepository({ sql: wrapSql(sqlB), schema });
+
+ await Promise.all([
+ repoA.addFollower(followId, followerA),
+ repoB.addFollower(followId, followerB),
+ ]);
+
+ const followers = await Array.fromAsync(repoA.getFollowers());
+ assert.deepStrictEqual(await repoA.countFollowers(), 1);
+ assert.deepStrictEqual(followers.length, 1);
+ const remainingFollowerId = followers[0]?.id?.href;
+ assert.ok(
+ remainingFollowerId === followerA.id!.href ||
+ remainingFollowerId === followerB.id!.href,
+ );
+ assert.deepStrictEqual(
+ await repoA.hasFollower(followerA.id!),
+ remainingFollowerId === followerA.id!.href,
+ );
+ assert.deepStrictEqual(
+ await repoA.hasFollower(followerB.id!),
+ remainingFollowerId === followerB.id!.href,
+ );
+ } finally {
+ await adminSql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`);
+ await Promise.all([sqlA.end(), sqlB.end(), adminSql.end()]);
+ }
+ });
+
+ test("serializes follower removal with concurrent adds", async () => {
+ const schema = createSchemaName();
+ const adminSql = createSql(postgresUrl);
+ const sqlA = createSql(postgresUrl);
+ const sqlB = createSql(postgresUrl);
+ const followId = new URL(
+ "https://example.com/ap/follow/55db42c7-8a99-4c40-98c1-4684d7d0c758",
+ );
+ const follower = new Person({
+ id: new URL("https://example.com/ap/actor/concurrent-charlie"),
+ preferredUsername: "concurrent-charlie",
+ });
+ let releaseBarrier!: () => void;
+ let resolveLockAcquired!: () => void;
+    const barrier = new Promise<void>((resolve) => {
+      releaseBarrier = resolve;
+    });
+    const lockAcquired = new Promise<void>((resolve) => {
+      resolveLockAcquired = resolve;
+    });
+ const wrappedSql = new Proxy(sqlA, {
+ get(target, property, receiver) {
+ if (property === "begin") {
+ return async (
+ callback: (transactionSql: postgres.TransactionSql) => unknown,
+ ) =>
+ await target.begin(async (transactionSql) =>
+ await callback(
+ new Proxy(transactionSql, {
+ get(txTarget, txProperty, txReceiver) {
+ if (txProperty === "unsafe") {
+ return async (
+ query: string,
+                              parameters?: postgres.ParameterOrJSON<never>[],
+                              options?: postgres.UnsafeQueryOptions,
+                            ): Promise<postgres.RowList<postgres.Row[]>> => {
+ const result = await txTarget.unsafe(
+ query,
+ parameters,
+ options,
+ );
+ if (
+ query.includes("pg_advisory_xact_lock") &&
+ parameters?.[1] === `${schema}:${followId.href}`
+ ) {
+ resolveLockAcquired();
+ await barrier;
+ }
+ return result;
+ };
+ }
+ return Reflect.get(txTarget, txProperty, txReceiver);
+ },
+ }),
+ )
+ );
+ }
+ return Reflect.get(target, property, receiver);
+ },
+ });
+ try {
+ await initializePostgresRepositorySchema(adminSql, schema);
+ const repoA = new PostgresRepository({ sql: wrappedSql, schema });
+ const repoB = new PostgresRepository({ sql: sqlB, schema });
+
+ const addPromise = repoA.addFollower(followId, follower);
+ await lockAcquired;
+ const removePromise = repoB.removeFollower(followId, follower.id!);
+ await waitForMacrotask();
+ releaseBarrier();
+
+ const [, removedFollower] = await Promise.all([
+ addPromise,
+ removePromise,
+ ]);
+ assert.deepStrictEqual(
+ await removedFollower?.toJsonLd(),
+ await follower.toJsonLd(),
+ );
+ assert.deepStrictEqual(await repoA.countFollowers(), 0);
+ assert.deepStrictEqual(await repoA.hasFollower(follower.id!), false);
+ } finally {
+ await adminSql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`);
+ await Promise.all([sqlA.end(), sqlB.end(), adminSql.end()]);
+ }
+ });
+
+ test("does not leak stale followers during concurrent reassignment cleanup", async () => {
+ const schema = createSchemaName();
+ const adminSql = createSql(postgresUrl);
+ const sqlA = createSql(postgresUrl);
+ const sqlB = createSql(postgresUrl);
+ const oldFollower = new Person({
+ id: new URL("https://example.com/ap/actor/concurrent-dana"),
+ preferredUsername: "concurrent-dana",
+ });
+ const newFollowerA = new Person({
+ id: new URL("https://example.com/ap/actor/concurrent-emma"),
+ preferredUsername: "concurrent-emma",
+ });
+ const newFollowerB = new Person({
+ id: new URL("https://example.com/ap/actor/concurrent-frank"),
+ preferredUsername: "concurrent-frank",
+ });
+ const followA = new URL(
+ "https://example.com/ap/follow/ee4ca254-e614-4433-941b-bbe95b4b92a3",
+ );
+ const followB = new URL(
+ "https://example.com/ap/follow/c7bacad2-3078-4cd1-9a26-4b13e58b8d67",
+ );
+ let arrivals = 0;
+ let releaseBarrier!: () => void;
+    const barrier = new Promise<void>((resolve) => {
+      releaseBarrier = resolve;
+    });
+ const barrierTimeout = 50;
+ const wrapSql = (sql: postgres.Sql): postgres.Sql =>
+ new Proxy(sql, {
+ get(target, property, receiver) {
+ if (property === "begin") {
+ return async (
+ callback: (transactionSql: postgres.TransactionSql) => unknown,
+ ) =>
+ await target.begin(async (transactionSql) =>
+ await callback(
+ new Proxy(transactionSql, {
+ get(txTarget, txProperty, txReceiver) {
+ if (txProperty === "unsafe") {
+ return async (
+ query: string,
+                              parameters?: postgres.ParameterOrJSON<never>[],
+                              options?: postgres.UnsafeQueryOptions,
+                            ): Promise<postgres.RowList<postgres.Row[]>> => {
+ if (
+ arrivals < 2 &&
+ query.includes(
+ `DELETE FROM "${schema}"."followers"`,
+ ) &&
+ parameters?.[0] === oldFollower.id!.href
+ ) {
+ arrivals++;
+ if (arrivals === 2) releaseBarrier();
+ await Promise.race([
+ barrier,
+ new Promise((resolve) =>
+ setTimeout(resolve, barrierTimeout)
+ ),
+ ]);
+ }
+ return await txTarget.unsafe(
+ query,
+ parameters,
+ options,
+ );
+ };
+ }
+ return Reflect.get(txTarget, txProperty, txReceiver);
+ },
+ }),
+ )
+ );
+ }
+ return Reflect.get(target, property, receiver);
+ },
+ });
+ try {
+ await initializePostgresRepositorySchema(adminSql, schema);
+ const setupRepo = new PostgresRepository({ sql: adminSql, schema });
+ await setupRepo.addFollower(followA, oldFollower);
+ await setupRepo.addFollower(followB, oldFollower);
+
+ const repoA = new PostgresRepository({ sql: wrapSql(sqlA), schema });
+ const repoB = new PostgresRepository({ sql: wrapSql(sqlB), schema });
+ await Promise.all([
+ repoA.addFollower(followA, newFollowerA),
+ repoB.addFollower(followB, newFollowerB),
+ ]);
+
+ const followers = await Promise.all(
+ (await Array.fromAsync(repoA.getFollowers())).map((follower) =>
+ follower.toJsonLd()
+ ),
+ );
+ assert.deepStrictEqual(await repoA.countFollowers(), 2);
+ assert.deepStrictEqual(await repoA.hasFollower(oldFollower.id!), false);
+ assert.deepStrictEqual(followers, [
+ await newFollowerA.toJsonLd(),
+ await newFollowerB.toJsonLd(),
+ ]);
+ } finally {
+ await adminSql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`);
+ await Promise.all([sqlA.end(), sqlB.end(), adminSql.end()]);
+ }
+ });
+
+ test("does not drop valid followers during concurrent cleanup and add", async () => {
+ const schema = createSchemaName();
+ const adminSql = createSql(postgresUrl);
+ const sqlA = createSql(postgresUrl);
+ const sqlB = createSql(postgresUrl);
+ const oldFollower = new Person({
+ id: new URL("https://example.com/ap/actor/concurrent-grace"),
+ preferredUsername: "concurrent-grace",
+ });
+ const reassignedFollower = new Person({
+ id: new URL("https://example.com/ap/actor/concurrent-harper"),
+ preferredUsername: "concurrent-harper",
+ });
+ const followA = new URL(
+ "https://example.com/ap/follow/ea47ae4b-8e5d-4f5f-b1c8-9d66d5d11d83",
+ );
+ const followB = new URL(
+ "https://example.com/ap/follow/8f65dab0-2a7e-4c6a-b53e-99f3e2521c0e",
+ );
+ let resolveCleanupReady!: () => void;
+ let resolveInsertedNewRequest!: () => void;
+    const cleanupReady = new Promise<void>((resolve) => {
+      resolveCleanupReady = resolve;
+    });
+    const insertedNewRequest = new Promise<void>((resolve) => {
+      resolveInsertedNewRequest = resolve;
+    });
+ const barrierTimeout = 50;
+ const wrapCleanupSql = (sql: postgres.Sql): postgres.Sql =>
+ new Proxy(sql, {
+ get(target, property, receiver) {
+ if (property === "begin") {
+ return async (
+ callback: (transactionSql: postgres.TransactionSql) => unknown,
+ ) =>
+ await target.begin(async (transactionSql) =>
+ await callback(
+ new Proxy(transactionSql, {
+ get(txTarget, txProperty, txReceiver) {
+ if (txProperty === "unsafe") {
+ return async (
+ query: string,
+                              parameters?: postgres.ParameterOrJSON<never>[],
+                              options?: postgres.UnsafeQueryOptions,
+                            ): Promise<postgres.RowList<postgres.Row[]>> => {
+ if (
+ query.includes(
+ `DELETE FROM "${schema}"."followers"`,
+ ) &&
+ parameters?.[0] === oldFollower.id!.href
+ ) {
+ resolveCleanupReady();
+ await Promise.race([
+ insertedNewRequest,
+ new Promise((resolve) =>
+ setTimeout(resolve, barrierTimeout)
+ ),
+ ]);
+ }
+ return await txTarget.unsafe(
+ query,
+ parameters,
+ options,
+ );
+ };
+ }
+ return Reflect.get(txTarget, txProperty, txReceiver);
+ },
+ }),
+ )
+ );
+ }
+ return Reflect.get(target, property, receiver);
+ },
+ });
+ const wrapAddSql = (sql: postgres.Sql): postgres.Sql =>
+ new Proxy(sql, {
+ get(target, property, receiver) {
+ if (property === "begin") {
+ return async (
+ callback: (transactionSql: postgres.TransactionSql) => unknown,
+ ) =>
+ await target.begin(async (transactionSql) =>
+ await callback(
+ new Proxy(transactionSql, {
+ get(txTarget, txProperty, txReceiver) {
+ if (txProperty === "unsafe") {
+ return async (
+ query: string,
+                              parameters?: postgres.ParameterOrJSON<never>[],
+                              options?: postgres.UnsafeQueryOptions,
+                            ): Promise<postgres.RowList<postgres.Row[]>> => {
+ const result = await txTarget.unsafe(
+ query,
+ parameters,
+ options,
+ );
+ if (
+ query.includes(
+ `INSERT INTO "${schema}"."follow_requests"`,
+ ) &&
+ parameters?.[0] === followB.href &&
+ parameters?.[1] === oldFollower.id!.href
+ ) {
+ resolveInsertedNewRequest();
+ await new Promise((resolve) =>
+ setTimeout(resolve, barrierTimeout)
+ );
+ }
+ return result;
+ };
+ }
+ return Reflect.get(txTarget, txProperty, txReceiver);
+ },
+ }),
+ )
+ );
+ }
+ return Reflect.get(target, property, receiver);
+ },
+ });
+ try {
+ await initializePostgresRepositorySchema(adminSql, schema);
+ const setupRepo = new PostgresRepository({ sql: adminSql, schema });
+ await setupRepo.addFollower(followA, oldFollower);
+
+ const repoA = new PostgresRepository({
+ sql: wrapCleanupSql(sqlA),
+ schema,
+ });
+ const repoB = new PostgresRepository({ sql: wrapAddSql(sqlB), schema });
+
+ const reassignPromise = repoA.addFollower(followA, reassignedFollower);
+ await cleanupReady;
+ const addPromise = repoB.addFollower(followB, oldFollower);
+ await Promise.all([reassignPromise, addPromise]);
+
+ const followers = await Promise.all(
+ (await Array.fromAsync(repoA.getFollowers())).map((follower) =>
+ follower.toJsonLd()
+ ),
+ );
+ assert.deepStrictEqual(await repoA.countFollowers(), 2);
+ assert.ok(await repoA.hasFollower(oldFollower.id!));
+ assert.ok(await repoA.hasFollower(reassignedFollower.id!));
+ assert.deepStrictEqual(followers, [
+ await oldFollower.toJsonLd(),
+ await reassignedFollower.toJsonLd(),
+ ]);
+ assert.deepStrictEqual(
+ await (await repoA.removeFollower(followB, oldFollower.id!))
+ ?.toJsonLd(),
+ await oldFollower.toJsonLd(),
+ );
+ assert.deepStrictEqual(await repoA.countFollowers(), 1);
+ assert.deepStrictEqual(await repoA.hasFollower(oldFollower.id!), false);
+ } finally {
+ await adminSql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`);
+ await Promise.all([sqlA.end(), sqlB.end(), adminSql.end()]);
+ }
+ });
+
+ test("repository operations and persistence", async () => {
+ const harness = createHarness();
+ try {
+ const repo = harness.repository;
+
+ assert.deepStrictEqual(await repo.getKeyPairs(), undefined);
+ await repo.setKeyPairs(keyPairs);
+ assert.deepStrictEqual(await repo.getKeyPairs(), keyPairs);
+
+ const messageA = new Create({
+ id: new URL(
+ "https://example.com/ap/create/01941f29-7c00-7fe8-ab0a-7b593990a3c0",
+ ),
+ actor: new URL("https://example.com/ap/actor/bot"),
+ to: new URL("https://example.com/ap/actor/bot/followers"),
+ cc: PUBLIC_COLLECTION,
+ object: new Note({
+ id: new URL(
+ "https://example.com/ap/note/01941f29-7c00-7fe8-ab0a-7b593990a3c0",
+ ),
+ attribution: new URL("https://example.com/ap/actor/bot"),
+ to: new URL("https://example.com/ap/actor/bot/followers"),
+ cc: PUBLIC_COLLECTION,
+ content: "Hello, world!",
+ published: Temporal.Instant.from("2025-01-01T00:00:00Z"),
+ }),
+ published: Temporal.Instant.from("2025-01-01T00:00:00Z"),
+ });
+ const messageB = new Create({
+ id: new URL(
+ "https://example.com/ap/create/0194244f-d800-7873-8993-ef71ccd47306",
+ ),
+ actor: new URL("https://example.com/ap/actor/bot"),
+ to: new URL("https://example.com/ap/actor/bot/followers"),
+ cc: PUBLIC_COLLECTION,
+ object: new Note({
+ id: new URL(
+ "https://example.com/ap/note/0194244f-d800-7873-8993-ef71ccd47306",
+ ),
+ attribution: new URL("https://example.com/ap/actor/bot"),
+ to: new URL("https://example.com/ap/actor/bot/followers"),
+ cc: PUBLIC_COLLECTION,
+ content: "Second message",
+ published: Temporal.Instant.from("2025-01-02T00:00:00Z"),
+ }),
+ published: Temporal.Instant.from("2025-01-02T00:00:00Z"),
+ });
+ const messageB2 = new Create({
+ id: new URL(
+ "https://example.com/ap/create/0194244f-d800-7873-8993-ef71ccd47306",
+ ),
+ actor: new URL("https://example.com/ap/actor/bot"),
+ to: new URL("https://example.com/ap/actor/bot/followers"),
+ cc: PUBLIC_COLLECTION,
+ object: new Note({
+ id: new URL(
+ "https://example.com/ap/note/0194244f-d800-7873-8993-ef71ccd47306",
+ ),
+ attribution: new URL("https://example.com/ap/actor/bot"),
+ to: new URL("https://example.com/ap/actor/bot/followers"),
+ cc: PUBLIC_COLLECTION,
+ content: "Updated message",
+ published: Temporal.Instant.from("2025-01-02T00:00:00Z"),
+ updated: Temporal.Instant.from("2025-01-02T12:00:00Z"),
+ }),
+ published: Temporal.Instant.from("2025-01-02T00:00:00Z"),
+ updated: Temporal.Instant.from("2025-01-02T12:00:00Z"),
+ });
+
+ assert.deepStrictEqual(await repo.countMessages(), 0);
+ await repo.addMessage("01941f29-7c00-7fe8-ab0a-7b593990a3c0", messageA);
+ await repo.addMessage("0194244f-d800-7873-8993-ef71ccd47306", messageB);
+ assert.deepStrictEqual(await repo.countMessages(), 2);
+ assert.deepStrictEqual(
+ await Promise.all(
+ (await Array.fromAsync(repo.getMessages({ order: "oldest" }))).map((
+ message,
+ ) => message.toJsonLd()),
+ ),
+ [await messageA.toJsonLd(), await messageB.toJsonLd()],
+ );
+ assert.deepStrictEqual(
+ await Promise.all(
+ (await Array.fromAsync(
+ repo.getMessages({
+ since: Temporal.Instant.from("2025-01-02T00:00:00Z"),
+ }),
+ )).map((message) => message.toJsonLd()),
+ ),
+ [await messageB.toJsonLd()],
+ );
+ assert.ok(
+ await repo.updateMessage(
+ "0194244f-d800-7873-8993-ef71ccd47306",
+ async (message) =>
+ message.clone({
+ object: await messageB2.getObject(),
+ updated: messageB2.updated,
+ }),
+ ),
+ );
+ assert.deepStrictEqual(
+ await (await repo.getMessage("0194244f-d800-7873-8993-ef71ccd47306"))
+ ?.toJsonLd(),
+ await messageB2.toJsonLd(),
+ );
+ assert.deepStrictEqual(
+ await (await repo.removeMessage(
+ "01941f29-7c00-7fe8-ab0a-7b593990a3c0",
+ ))
+ ?.toJsonLd(),
+ await messageA.toJsonLd(),
+ );
+ assert.deepStrictEqual(await repo.countMessages(), 1);
+
+ const followerA = new Person({
+ id: new URL("https://example.com/ap/actor/alice"),
+ preferredUsername: "alice",
+ });
+ const followerB = new Person({
+ id: new URL("https://example.com/ap/actor/bob"),
+ preferredUsername: "bob",
+ });
+ const followA = new URL(
+ "https://example.com/ap/follow/f2fb7255-d3ad-4fef-8f9a-1d0f2c2ec0b4",
+ );
+ const followB = new URL(
+ "https://example.com/ap/follow/a3d4cc4f-af93-4a9f-a7b3-0b7c0fe4901d",
+ );
+
+ await repo.addFollower(followA, followerA);
+ await repo.addFollower(followB, followerB);
+ assert.ok(await repo.hasFollower(followerA.id!));
+ assert.deepStrictEqual(await repo.countFollowers(), 2);
+ assert.deepStrictEqual(
+ await Promise.all(
+ (await Array.fromAsync(repo.getFollowers({ offset: 1 }))).map((
+ follower,
+ ) => follower.toJsonLd()),
+ ),
+ [await followerB.toJsonLd()],
+ );
+ assert.deepStrictEqual(
+ await repo.removeFollower(followA, followerB.id!),
+ undefined,
+ );
+ assert.deepStrictEqual(
+ await (await repo.removeFollower(followA, followerA.id!))?.toJsonLd(),
+ await followerA.toJsonLd(),
+ );
+ assert.deepStrictEqual(await repo.countFollowers(), 1);
+
+ const followA2 = new URL(
+ "https://example.com/ap/follow/6eedf12f-32aa-4f1d-b6ca-d5bf34c4d149",
+ );
+ await repo.addFollower(followA, followerA);
+ await repo.addFollower(followA2, followerA);
+ assert.deepStrictEqual(await repo.countFollowers(), 2);
+ assert.ok(await repo.hasFollower(followerA.id!));
+ assert.deepStrictEqual(
+ await (await repo.removeFollower(followA, followerA.id!))?.toJsonLd(),
+ await followerA.toJsonLd(),
+ );
+ assert.ok(await repo.hasFollower(followerA.id!));
+ assert.deepStrictEqual(await repo.countFollowers(), 2);
+ assert.deepStrictEqual(
+ await (await repo.removeFollower(followA2, followerA.id!))
+ ?.toJsonLd(),
+ await followerA.toJsonLd(),
+ );
+ assert.deepStrictEqual(await repo.countFollowers(), 1);
+ assert.deepStrictEqual(await repo.hasFollower(followerA.id!), false);
+
+ await repo.addFollower(followA, followerA);
+ assert.deepStrictEqual(await repo.countFollowers(), 2);
+ await repo.addFollower(followA, followerB);
+ assert.deepStrictEqual(await repo.countFollowers(), 1);
+ assert.deepStrictEqual(await repo.hasFollower(followerA.id!), false);
+ assert.ok(await repo.hasFollower(followerB.id!));
+ assert.deepStrictEqual(
+ await Promise.all(
+ (await Array.fromAsync(repo.getFollowers())).map((follower) =>
+ follower.toJsonLd()
+ ),
+ ),
+ [await followerB.toJsonLd()],
+ );
+
+ const sentFollow = new Follow({
+ id: new URL(
+ "https://example.com/ap/follow/03a395a2-353a-4894-afdb-2cab31a7b004",
+ ),
+ actor: new URL("https://example.com/ap/actor/bot"),
+ object: new URL("https://example.com/ap/actor/john"),
+ });
+ await repo.addSentFollow(
+ "03a395a2-353a-4894-afdb-2cab31a7b004",
+ sentFollow,
+ );
+ assert.deepStrictEqual(
+ await (await repo.getSentFollow(
+ "03a395a2-353a-4894-afdb-2cab31a7b004",
+ ))
+ ?.toJsonLd(),
+ await sentFollow.toJsonLd(),
+ );
+ await repo.removeSentFollow("03a395a2-353a-4894-afdb-2cab31a7b004");
+ assert.deepStrictEqual(
+ await repo.getSentFollow("03a395a2-353a-4894-afdb-2cab31a7b004"),
+ undefined,
+ );
+
+ const followeeId = new URL("https://example.com/ap/actor/john");
+ await repo.addFollowee(followeeId, sentFollow);
+ assert.deepStrictEqual(
+ await (await repo.getFollowee(followeeId))?.toJsonLd(),
+ await sentFollow.toJsonLd(),
+ );
+ await repo.removeFollowee(followeeId);
+ assert.deepStrictEqual(await repo.getFollowee(followeeId), undefined);
+
+ const messageId = "01945678-1234-7890-abcd-ef0123456789";
+ const voter1 = new URL("https://example.com/ap/actor/alice");
+ const voter2 = new URL("https://example.com/ap/actor/bob");
+ await repo.vote(messageId, voter1, "option1");
+ await repo.vote(messageId, voter1, "option1");
+ await repo.vote(messageId, voter1, "option2");
+ await repo.vote(messageId, voter2, "option1");
+ assert.deepStrictEqual(await repo.countVoters(messageId), 2);
+ assert.deepStrictEqual(await repo.countVotes(messageId), {
+ "option1": 2,
+ "option2": 1,
+ });
+
+ await repo.close();
+ const repo2 = new PostgresRepository({
+ url: postgresUrl,
+ schema: harness.schema,
+ maxConnections: 1,
+ });
+ assert.deepStrictEqual(await repo2.getKeyPairs(), keyPairs);
+ assert.deepStrictEqual(await repo2.countMessages(), 1);
+ await repo2.close();
+ } finally {
+ await harness.cleanup();
+ }
+ });
+
+ test("does not close injected clients", async () => {
+ const schema = createSchemaName();
+ const sql = createSql(postgresUrl);
+ try {
+ const repo = new PostgresRepository({ sql, schema });
+ await repo.countMessages();
+ await repo.close();
+
+ const result = await sql`SELECT 1 AS value`;
+ assert.deepStrictEqual(result[0]?.value, 1);
+ } finally {
+ await sql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`);
+ await sql.end();
+ }
+ });
+ });
+}
diff --git a/packages/botkit-postgres/src/mod.ts b/packages/botkit-postgres/src/mod.ts
new file mode 100644
index 0000000..7006ecc
--- /dev/null
+++ b/packages/botkit-postgres/src/mod.ts
@@ -0,0 +1,868 @@
+// BotKit by Fedify: A framework for creating ActivityPub bots
+// Copyright (C) 2026 Hong Minhee
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+import type {
+ Repository,
+ RepositoryGetFollowersOptions,
+ RepositoryGetMessagesOptions,
+ Uuid,
+} from "@fedify/botkit/repository";
+import { exportJwk, importJwk } from "@fedify/fedify/sig";
+import { Temporal, toTemporalInstant } from "@js-temporal/polyfill";
+import {
+ Activity,
+ type Actor,
+ Announce,
+ Create,
+ Follow,
+ isActor,
+ Object,
+} from "@fedify/vocab";
+import { getLogger } from "@logtape/logtape";
+import postgres from "postgres";
+
+if (!("Temporal" in globalThis)) {
+ Reflect.set(globalThis, "Temporal", Temporal);
+}
+if (Date.prototype.toTemporalInstant == null) {
+ Reflect.set(Date.prototype, "toTemporalInstant", toTemporalInstant);
+}
+
+const logger = getLogger(["botkit", "postgres"]);
+const schemaNamePattern = /^[A-Za-z_][A-Za-z0-9_]*$/;
+const followRequestAdvisoryLockNamespace = 0x4254;
+const followerAdvisoryLockNamespace = 0x4246;
+
+type Queryable = Pick<postgres.Sql, "unsafe">;
+type QueryParameter = postgres.SerializableParameter;
+
+/**
+ * Common options for creating a PostgreSQL repository.
+ * @since 0.4.0
+ */
+interface PostgresRepositoryOptionsBase {
+ /**
+ * The PostgreSQL schema name to use.
+ * @default `"botkit"`
+ */
+ readonly schema?: string;
+
+ /**
+ * Whether to use prepared statements for queries.
+ * @default true
+ */
+ readonly prepare?: boolean;
+}
+
+/**
+ * Options for creating a PostgreSQL repository from an injected client.
+ * @since 0.4.0
+ */
+interface PostgresRepositoryOptionsWithClient
+ extends PostgresRepositoryOptionsBase {
+ /**
+ * A pre-configured PostgreSQL client to use.
+ */
+ readonly sql: postgres.Sql;
+
+ /**
+ * Disallowed when `sql` is provided.
+ */
+ readonly url?: never;
+
+ /**
+ * Disallowed when `sql` is provided.
+ */
+ readonly maxConnections?: never;
+}
+
+/**
+ * Options for creating a PostgreSQL repository from a connection string.
+ * @since 0.4.0
+ */
+interface PostgresRepositoryOptionsWithUrl
+ extends PostgresRepositoryOptionsBase {
+ /**
+ * A PostgreSQL connection string to connect with.
+ */
+ readonly url: string | URL;
+
+ /**
+ * Disallowed when `url` is provided.
+ */
+ readonly sql?: never;
+
+ /**
+ * The maximum number of connections for an owned pool.
+ */
+ readonly maxConnections?: number;
+}
+
+/**
+ * Options for creating a PostgreSQL repository.
+ * @since 0.4.0
+ */
+export type PostgresRepositoryOptions =
+ | PostgresRepositoryOptionsWithClient
+ | PostgresRepositoryOptionsWithUrl;
+
+/**
+ * Initializes the PostgreSQL schema used by BotKit repositories.
+ * @param sql The PostgreSQL client to initialize the schema with.
+ * @param schema The PostgreSQL schema name to initialize.
+ * @param prepare Whether to use prepared statements for schema queries.
+ * @since 0.4.0
+ */
+export async function initializePostgresRepositorySchema(
+ sql: Queryable,
+ schema = "botkit",
+ prepare = true,
+): Promise<void> {
+ const validatedSchema = validateSchemaName(schema);
+ await execute(
+ sql,
+ `CREATE SCHEMA IF NOT EXISTS "${validatedSchema}"`,
+ [],
+ prepare,
+ );
+ await execute(
+ sql,
+ `CREATE TABLE IF NOT EXISTS "${validatedSchema}"."key_pairs" (
+ position INTEGER PRIMARY KEY,
+ private_key_jwk JSONB NOT NULL,
+ public_key_jwk JSONB NOT NULL
+ )`,
+ [],
+ prepare,
+ );
+ await execute(
+ sql,
+ `CREATE TABLE IF NOT EXISTS "${validatedSchema}"."messages" (
+ id TEXT PRIMARY KEY,
+ activity_json JSONB NOT NULL,
+ published BIGINT
+ )`,
+ [],
+ prepare,
+ );
+ await execute(
+ sql,
+ `CREATE INDEX IF NOT EXISTS "idx_messages_published"
+ ON "${validatedSchema}"."messages" (published, id)`,
+ [],
+ prepare,
+ );
+ await execute(
+ sql,
+ `CREATE TABLE IF NOT EXISTS "${validatedSchema}"."followers" (
+ follower_id TEXT PRIMARY KEY,
+ actor_json JSONB NOT NULL
+ )`,
+ [],
+ prepare,
+ );
+ await execute(
+ sql,
+ `CREATE TABLE IF NOT EXISTS "${validatedSchema}"."follow_requests" (
+ follow_request_id TEXT PRIMARY KEY,
+ follower_id TEXT NOT NULL
+ REFERENCES "${validatedSchema}"."followers" (follower_id)
+ ON DELETE CASCADE
+ )`,
+ [],
+ prepare,
+ );
+ await execute(
+ sql,
+ `CREATE INDEX IF NOT EXISTS "idx_follow_requests_follower"
+ ON "${validatedSchema}"."follow_requests" (follower_id)`,
+ [],
+ prepare,
+ );
+ await execute(
+ sql,
+ `CREATE TABLE IF NOT EXISTS "${validatedSchema}"."sent_follows" (
+ id TEXT PRIMARY KEY,
+ follow_json JSONB NOT NULL
+ )`,
+ [],
+ prepare,
+ );
+ await execute(
+ sql,
+ `CREATE TABLE IF NOT EXISTS "${validatedSchema}"."followees" (
+ followee_id TEXT PRIMARY KEY,
+ follow_json JSONB NOT NULL
+ )`,
+ [],
+ prepare,
+ );
+ await execute(
+ sql,
+ `CREATE TABLE IF NOT EXISTS "${validatedSchema}"."poll_votes" (
+ message_id TEXT NOT NULL,
+ voter_id TEXT NOT NULL,
+ option TEXT NOT NULL,
+ PRIMARY KEY (message_id, voter_id, option)
+ )`,
+ [],
+ prepare,
+ );
+ await execute(
+ sql,
+ `CREATE INDEX IF NOT EXISTS "idx_poll_votes_message_option"
+ ON "${validatedSchema}"."poll_votes" (message_id, option)`,
+ [],
+ prepare,
+ );
+}
+
+/**
+ * A repository for storing bot data using PostgreSQL.
+ * @since 0.4.0
+ */
+export class PostgresRepository implements Repository, AsyncDisposable {
+ readonly sql: postgres.Sql;
+ readonly schema: string;
+ readonly prepare: boolean;
+ private readonly ownsSql: boolean;
+  private readonly ready: Promise<void>;
+
+ constructor(options: PostgresRepositoryOptions) {
+ this.schema = validateSchemaName(options.schema ?? "botkit");
+ this.prepare = options.prepare ?? true;
+ if ("sql" in options) {
+ if (options.url != null || options.maxConnections != null) {
+ throw new TypeError(
+ "PostgresRepositoryOptions.sql cannot be combined with PostgresRepositoryOptions.url or PostgresRepositoryOptions.maxConnections.",
+ );
+ }
+ this.ownsSql = false;
+ this.sql = options.sql;
+ } else {
+ if (options.url == null) {
+ throw new TypeError(
+ "PostgresRepositoryOptions.url must be provided when PostgresRepositoryOptions.sql is absent.",
+ );
+ }
+ this.ownsSql = true;
+ const url = typeof options.url === "string"
+ ? options.url
+ : options.url.href;
+ this.sql = postgres(url, {
+ max: options.maxConnections,
+ onnotice: () => {},
+ prepare: this.prepare,
+ });
+ }
+ const ready = initializePostgresRepositorySchema(
+ this.sql,
+ this.schema,
+ this.prepare,
+ );
+ // Avoid unhandled rejection warnings before a repository method awaits it.
+ ready.catch(() => {});
+ this.ready = ready;
+ }
+
+  async [Symbol.asyncDispose](): Promise<void> {
+ await this.close();
+ }
+
+ /**
+ * Closes the underlying PostgreSQL connection pool if owned by the
+ * repository.
+ */
+  async close(): Promise<void> {
+ try {
+ await this.ready;
+ } finally {
+ if (this.ownsSql) {
+ await this.sql.end({ timeout: 5 });
+ }
+ }
+ }
+
+  async setKeyPairs(keyPairs: CryptoKeyPair[]): Promise<void> {
+ await this.ensureReady();
+ await this.sql.begin(async (sql) => {
+ await this.query(sql, `DELETE FROM ${this.table("key_pairs")}`);
+ for (const [position, keyPair] of keyPairs.entries()) {
+ const privateJwk = await exportJwk(keyPair.privateKey);
+ const publicJwk = await exportJwk(keyPair.publicKey);
+ await this.query(
+ sql,
+ `INSERT INTO ${this.table("key_pairs")}
+ (position, private_key_jwk, public_key_jwk)
+ VALUES ($1, $2::jsonb, $3::jsonb)`,
+ [
+ position,
+ serializeJson(privateJwk),
+ serializeJson(publicJwk),
+ ],
+ );
+ }
+ });
+ }
+
+  async getKeyPairs(): Promise<CryptoKeyPair[] | undefined> {
+ await this.ensureReady();
+ const rows = await this.query<{
+ readonly private_key_jwk: unknown;
+ readonly public_key_jwk: unknown;
+ }>(
+ this.sql,
+ `SELECT private_key_jwk, public_key_jwk
+ FROM ${this.table("key_pairs")}
+ ORDER BY position ASC`,
+ );
+ if (rows.length < 1) return undefined;
+ const keyPairs: CryptoKeyPair[] = [];
+ for (const row of rows) {
+ const privateJwk = normalizeJsonObject(row.private_key_jwk);
+ const publicJwk = normalizeJsonObject(row.public_key_jwk);
+ if (privateJwk == null || publicJwk == null) {
+ throw new TypeError("A stored key pair is malformed.");
+ }
+ keyPairs.push({
+ privateKey: await importJwk(privateJwk, "private"),
+ publicKey: await importJwk(publicJwk, "public"),
+ });
+ }
+ return keyPairs;
+ }
+
+  async addMessage(id: Uuid, activity: Create | Announce): Promise<void> {
+ await this.ensureReady();
+ await this.query(
+ this.sql,
+ `INSERT INTO ${this.table("messages")} (id, activity_json, published)
+ VALUES ($1, $2::jsonb, $3)`,
+ [
+ id,
+ serializeJson(await activity.toJsonLd({ format: "compact" })),
+ activity.published?.epochMilliseconds ?? null,
+ ],
+ );
+ }
+
+ async updateMessage(
+ id: Uuid,
+ updater: (
+ existing: Create | Announce,
+    ) => Create | Announce | undefined | Promise<Create | Announce | undefined>,
+  ): Promise<boolean> {
+ await this.ensureReady();
+ return await this.sql.begin(async (sql) => {
+ const rows = await this.query<{ readonly activity_json: unknown }>(
+ sql,
+ `SELECT activity_json
+ FROM ${this.table("messages")}
+ WHERE id = $1
+ FOR UPDATE`,
+ [id],
+ );
+ const row = rows[0];
+ if (row == null) return false;
+ const activity = await parseActivity(row.activity_json);
+ if (activity == null) return false;
+ const updated = await updater(activity);
+ if (updated == null) return false;
+ await this.query(
+ sql,
+ `UPDATE ${this.table("messages")}
+ SET activity_json = $1::jsonb,
+ published = $2
+ WHERE id = $3`,
+ [
+ serializeJson(await updated.toJsonLd({ format: "compact" })),
+ updated.published?.epochMilliseconds ?? null,
+ id,
+ ],
+ );
+ return true;
+ });
+ }
+
+  async removeMessage(id: Uuid): Promise<Create | Announce | undefined> {
+ await this.ensureReady();
+ const rows = await this.query<{ readonly activity_json: unknown }>(
+ this.sql,
+ `DELETE FROM ${this.table("messages")}
+ WHERE id = $1
+ RETURNING activity_json`,
+ [id],
+ );
+ return await parseActivity(rows[0]?.activity_json);
+ }
+
+ async *getMessages(
+ options: RepositoryGetMessagesOptions = {},
+ ): AsyncIterable<Create | Announce> {
+ await this.ensureReady();
+ const { order = "newest", since, until, limit } = options;
+ const parameters: QueryParameter[] = [];
+ let query = `SELECT activity_json
+ FROM ${this.table("messages")}
+ WHERE TRUE`;
+ if (since != null) {
+ parameters.push(since.epochMilliseconds);
+ query += ` AND published >= $${parameters.length}`;
+ }
+ if (until != null) {
+ parameters.push(until.epochMilliseconds);
+ query += ` AND published <= $${parameters.length}`;
+ }
+ query += order === "oldest"
+ ? " ORDER BY published ASC NULLS LAST, id ASC"
+ : " ORDER BY published DESC NULLS LAST, id DESC";
+ if (limit != null) {
+ parameters.push(limit);
+ query += ` LIMIT $${parameters.length}`;
+ }
+ const rows = await this.query<{ readonly activity_json: unknown }>(
+ this.sql,
+ query,
+ parameters,
+ );
+ for (const row of rows) {
+ const activity = await parseActivity(row.activity_json);
+ if (activity != null) yield activity;
+ }
+ }
+
+ async getMessage(id: Uuid): Promise<Create | Announce | undefined> {
+ await this.ensureReady();
+ const rows = await this.query<{ readonly activity_json: unknown }>(
+ this.sql,
+ `SELECT activity_json
+ FROM ${this.table("messages")}
+ WHERE id = $1`,
+ [id],
+ );
+ return await parseActivity(rows[0]?.activity_json);
+ }
+
+ async countMessages(): Promise<number> {
+ await this.ensureReady();
+ const rows = await this.query<{ readonly count: number }>(
+ this.sql,
+ `SELECT COUNT(*)::integer AS count
+ FROM ${this.table("messages")}`,
+ );
+ return rows[0]?.count ?? 0;
+ }
+
+ async addFollower(followId: URL, follower: Actor): Promise<void> {
+ await this.ensureReady();
+ if (follower.id == null) {
+ throw new TypeError("The follower ID is missing.");
+ }
+ const followerId = follower.id;
+ const followerJson = await follower.toJsonLd({ format: "compact" });
+ await this.sql.begin(async (sql) => {
+ await this.lockFollowRequest(sql, followId);
+ const rows = await this.query<{ readonly follower_id: string }>(
+ sql,
+ `SELECT follower_id
+ FROM ${this.table("follow_requests")}
+ WHERE follow_request_id = $1
+ FOR UPDATE`,
+ [followId.href],
+ );
+ const previousFollowerId = rows[0]?.follower_id;
+ await this.lockFollowers(sql, [
+ followerId.href,
+ ...(previousFollowerId == null ? [] : [previousFollowerId]),
+ ]);
+ await this.query(
+ sql,
+ `INSERT INTO ${this.table("followers")} (follower_id, actor_json)
+ VALUES ($1, $2::jsonb)
+ ON CONFLICT (follower_id)
+ DO UPDATE SET actor_json = EXCLUDED.actor_json`,
+ [followerId.href, serializeJson(followerJson)],
+ );
+ await this.query(
+ sql,
+ `INSERT INTO ${
+ this.table("follow_requests")
+ } (follow_request_id, follower_id)
+ VALUES ($1, $2)
+ ON CONFLICT (follow_request_id)
+ DO UPDATE SET follower_id = EXCLUDED.follower_id`,
+ [followId.href, followerId.href],
+ );
+ if (
+ previousFollowerId != null && previousFollowerId !== followerId.href
+ ) {
+ await this.cleanupFollower(sql, previousFollowerId);
+ }
+ });
+ }
+
+ async removeFollower(
+ followId: URL,
+ followerId: URL,
+ ): Promise<Actor | undefined> {
+ await this.ensureReady();
+ return await this.sql.begin(async (sql) => {
+ await this.lockFollowRequest(sql, followId);
+ const rows = await this.query<{ readonly actor_json: unknown }>(
+ sql,
+ `SELECT f.actor_json
+ FROM ${this.table("follow_requests")} AS fr
+ JOIN ${this.table("followers")} AS f
+ ON f.follower_id = fr.follower_id
+ WHERE fr.follow_request_id = $1
+ AND fr.follower_id = $2
+ FOR UPDATE`,
+ [followId.href, followerId.href],
+ );
+ const row = rows[0];
+ if (row == null) return undefined;
+ await this.query(
+ sql,
+ `DELETE FROM ${this.table("follow_requests")}
+ WHERE follow_request_id = $1`,
+ [followId.href],
+ );
+ await this.cleanupFollower(sql, followerId.href);
+ return await parseActor(row.actor_json);
+ });
+ }
+
+ async hasFollower(followerId: URL): Promise<boolean> {
+ await this.ensureReady();
+ const rows = await this.query<{ readonly exists: number }>(
+ this.sql,
+ `SELECT 1 AS exists
+ FROM ${this.table("followers")}
+ WHERE follower_id = $1`,
+ [followerId.href],
+ );
+ return rows.length > 0;
+ }
+
+ async *getFollowers(
+ options: RepositoryGetFollowersOptions = {},
+ ): AsyncIterable<Actor> {
+ await this.ensureReady();
+ const { offset = 0, limit } = options;
+ const parameters: QueryParameter[] = [];
+ let query = `SELECT actor_json
+ FROM ${this.table("followers")}
+ ORDER BY follower_id ASC`;
+ if (limit != null) {
+ parameters.push(limit, offset);
+ query += ` LIMIT $${parameters.length - 1} OFFSET $${parameters.length}`;
+ } else if (offset > 0) {
+ parameters.push(offset);
+ query += ` OFFSET $${parameters.length}`;
+ }
+ const rows = await this.query<{ readonly actor_json: unknown }>(
+ this.sql,
+ query,
+ parameters,
+ );
+ for (const row of rows) {
+ const actor = await parseActor(row.actor_json);
+ if (actor != null) yield actor;
+ }
+ }
+
+ async countFollowers(): Promise<number> {
+ await this.ensureReady();
+ const rows = await this.query<{ readonly count: number }>(
+ this.sql,
+ `SELECT COUNT(*)::integer AS count
+ FROM ${this.table("followers")}`,
+ );
+ return rows[0]?.count ?? 0;
+ }
+
+ async addSentFollow(id: Uuid, follow: Follow): Promise<void> {
+ await this.ensureReady();
+ await this.query(
+ this.sql,
+ `INSERT INTO ${this.table("sent_follows")} (id, follow_json)
+ VALUES ($1, $2::jsonb)
+ ON CONFLICT (id)
+ DO UPDATE SET follow_json = EXCLUDED.follow_json`,
+ [id, serializeJson(await follow.toJsonLd({ format: "compact" }))],
+ );
+ }
+
+ async removeSentFollow(id: Uuid): Promise<Follow | undefined> {
+ await this.ensureReady();
+ const rows = await this.query<{ readonly follow_json: unknown }>(
+ this.sql,
+ `DELETE FROM ${this.table("sent_follows")}
+ WHERE id = $1
+ RETURNING follow_json`,
+ [id],
+ );
+ return await parseFollow(rows[0]?.follow_json);
+ }
+
+ async getSentFollow(id: Uuid): Promise<Follow | undefined> {
+ await this.ensureReady();
+ const rows = await this.query<{ readonly follow_json: unknown }>(
+ this.sql,
+ `SELECT follow_json
+ FROM ${this.table("sent_follows")}
+ WHERE id = $1`,
+ [id],
+ );
+ return await parseFollow(rows[0]?.follow_json);
+ }
+
+ async addFollowee(followeeId: URL, follow: Follow): Promise<void> {
+ await this.ensureReady();
+ await this.query(
+ this.sql,
+ `INSERT INTO ${this.table("followees")} (followee_id, follow_json)
+ VALUES ($1, $2::jsonb)
+ ON CONFLICT (followee_id)
+ DO UPDATE SET follow_json = EXCLUDED.follow_json`,
+ [
+ followeeId.href,
+ serializeJson(await follow.toJsonLd({ format: "compact" })),
+ ],
+ );
+ }
+
+ async removeFollowee(followeeId: URL): Promise<Follow | undefined> {
+ await this.ensureReady();
+ const rows = await this.query<{ readonly follow_json: unknown }>(
+ this.sql,
+ `DELETE FROM ${this.table("followees")}
+ WHERE followee_id = $1
+ RETURNING follow_json`,
+ [followeeId.href],
+ );
+ return await parseFollow(rows[0]?.follow_json);
+ }
+
+ async getFollowee(followeeId: URL): Promise<Follow | undefined> {
+ await this.ensureReady();
+ const rows = await this.query<{ readonly follow_json: unknown }>(
+ this.sql,
+ `SELECT follow_json
+ FROM ${this.table("followees")}
+ WHERE followee_id = $1`,
+ [followeeId.href],
+ );
+ return await parseFollow(rows[0]?.follow_json);
+ }
+
+ async vote(messageId: Uuid, voterId: URL, option: string): Promise<void> {
+ await this.ensureReady();
+ await this.query(
+ this.sql,
+ `INSERT INTO ${this.table("poll_votes")} (message_id, voter_id, option)
+ VALUES ($1, $2, $3)
+ ON CONFLICT (message_id, voter_id, option)
+ DO NOTHING`,
+ [messageId, voterId.href, option],
+ );
+ }
+
+ async countVoters(messageId: Uuid): Promise<number> {
+ await this.ensureReady();
+ const rows = await this.query<{ readonly count: number }>(
+ this.sql,
+ `SELECT COUNT(DISTINCT voter_id)::integer AS count
+ FROM ${this.table("poll_votes")}
+ WHERE message_id = $1`,
+ [messageId],
+ );
+ return rows[0]?.count ?? 0;
+ }
+
+ async countVotes(messageId: Uuid): Promise<Record<string, number>> {
+ await this.ensureReady();
+ const rows = await this.query<{
+ readonly option: string;
+ readonly count: number;
+ }>(
+ this.sql,
+ `SELECT option, COUNT(*)::integer AS count
+ FROM ${this.table("poll_votes")}
+ WHERE message_id = $1
+ GROUP BY option
+ ORDER BY option ASC`,
+ [messageId],
+ );
+ const result: Record<string, number> = {};
+ for (const row of rows) {
+ result[row.option] = row.count;
+ }
+ return result;
+ }
+
+ private table(name: string): string {
+ return `"${this.schema}"."${name}"`;
+ }
+
+ private async lockFollowRequest(
+ sql: Queryable,
+ followId: URL,
+ ): Promise<void> {
+ await this.query(
+ sql,
+ `SELECT pg_catalog.pg_advisory_xact_lock($1, pg_catalog.hashtext($2))`,
+ [
+ followRequestAdvisoryLockNamespace,
+ `${this.schema}:${followId.href}`,
+ ],
+ );
+ }
+
+ private async lockFollower(
+ sql: Queryable,
+ followerId: string,
+ ): Promise<void> {
+ await this.query(
+ sql,
+ `SELECT pg_catalog.pg_advisory_xact_lock($1, pg_catalog.hashtext($2))`,
+ [
+ followerAdvisoryLockNamespace,
+ `${this.schema}:${followerId}`,
+ ],
+ );
+ }
+
+ private async lockFollowers(
+ sql: Queryable,
+ followerIds: readonly string[],
+ ): Promise<void> {
+ const uniqueFollowerIds = [...new Set(followerIds)].sort();
+ for (const followerId of uniqueFollowerIds) {
+ await this.lockFollower(sql, followerId);
+ }
+ }
+
+ private async cleanupFollower(
+ sql: Queryable,
+ followerId: string,
+ ): Promise<void> {
+ await this.lockFollower(sql, followerId);
+ await this.query(
+ sql,
+ `DELETE FROM ${this.table("followers")}
+ WHERE follower_id = $1
+ AND NOT EXISTS (
+ SELECT 1
+ FROM ${this.table("follow_requests")}
+ WHERE follower_id = $1
+ )`,
+ [followerId],
+ );
+ }
+
+ private async ensureReady(): Promise<void> {
+ await this.ready;
+ }
+
+ private async query<T = Record<string, unknown>>(
+ sql: Queryable,
+ query: string,
+ parameters: readonly QueryParameter[] = [],
+ ): Promise<readonly T[]> {
+ return await execute(sql, query, parameters, this.prepare);
+ }
+}
+
+function validateSchemaName(schema: string): string {
+ if (!schemaNamePattern.test(schema)) {
+ throw new TypeError("The PostgreSQL schema name is invalid.");
+ }
+ return schema;
+}
+
+async function execute<T>(
+ sql: Queryable,
+ query: string,
+ parameters: readonly QueryParameter[] = [],
+ prepare = true,
+): Promise<readonly T[]> {
+ return await sql.unsafe(
+ query,
+ [...parameters],
+ { prepare },
+ );
+}
+
+function serializeJson(value: unknown): string {
+ return JSON.stringify(value);
+}
+
+function isJsonObject(value: unknown): value is Record<string, unknown> {
+ return typeof value === "object" && value != null;
+}
+
+async function parseActivity(
+ json: unknown,
+): Promise<Create | Announce | undefined> {
+ const normalized = normalizeJsonObject(json);
+ if (normalized == null) return undefined;
+ try {
+ const activity = await Activity.fromJsonLd(normalized);
+ if (activity instanceof Create || activity instanceof Announce) {
+ return activity;
+ }
+ } catch (error) {
+ logger.warn("Failed to parse message activity.", { error });
+ }
+ return undefined;
+}
+
+async function parseActor(json: unknown): Promise<Actor | undefined> {
+ const normalized = normalizeJsonObject(json);
+ if (normalized == null) return undefined;
+ try {
+ const actor = await Object.fromJsonLd(normalized);
+ if (isActor(actor)) return actor;
+ } catch (error) {
+ logger.warn("Failed to parse follower actor.", { error });
+ }
+ return undefined;
+}
+
+async function parseFollow(json: unknown): Promise<Follow | undefined> {
+ const normalized = normalizeJsonObject(json);
+ if (normalized == null) return undefined;
+ try {
+ return await Follow.fromJsonLd(normalized);
+ } catch (error) {
+ logger.warn("Failed to parse follow activity.", { error });
+ }
+ return undefined;
+}
+
+function normalizeJsonObject(
+ value: unknown,
+): Record<string, unknown> | undefined {
+ if (isJsonObject(value)) return value;
+ if (typeof value !== "string") return undefined;
+ try {
+ const parsed: unknown = JSON.parse(value);
+ if (isJsonObject(parsed)) return parsed;
+ } catch {
+ return undefined;
+ }
+ return undefined;
+}
diff --git a/packages/botkit-postgres/tsdown.config.ts b/packages/botkit-postgres/tsdown.config.ts
new file mode 100644
index 0000000..6a44d92
--- /dev/null
+++ b/packages/botkit-postgres/tsdown.config.ts
@@ -0,0 +1,10 @@
+import { defineConfig } from "tsdown";
+
+export default defineConfig({
+ entry: "src/mod.ts",
+ dts: {
+ sourcemap: true,
+ },
+ format: "esm",
+ platform: "node",
+});
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 2517bd7..6056781 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -38,6 +38,9 @@ importers:
'@fedify/botkit':
specifier: 'workspace:'
version: link:../packages/botkit
+ '@fedify/botkit-postgres':
+ specifier: 'workspace:'
+ version: link:../packages/botkit-postgres
'@fedify/botkit-sqlite':
specifier: 'workspace:'
version: link:../packages/botkit-sqlite
@@ -157,6 +160,31 @@ importers:
specifier: 'catalog:'
version: 0.12.8(typescript@5.8.3)
+ packages/botkit-postgres:
+ dependencies:
+ '@fedify/botkit':
+ specifier: ^0.4.0
+ version: link:../botkit
+ '@fedify/fedify':
+ specifier: ^2.1.2
+ version: 2.1.2
+ '@fedify/vocab':
+ specifier: ^2.1.2
+ version: 2.1.2
+ '@js-temporal/polyfill':
+ specifier: ^0.5.1
+ version: 0.5.1
+ '@logtape/logtape':
+ specifier: ^1.3.5
+ version: 1.3.5
+ postgres:
+ specifier: ^3.4.8
+ version: 3.4.8
+ devDependencies:
+ tsdown:
+ specifier: ^0.12.8
+ version: 0.12.8(typescript@5.8.3)
+
packages/botkit-sqlite:
dependencies:
'@fedify/botkit':
@@ -1689,6 +1717,10 @@ packages:
resolution: {integrity: sha512-Jtc2612XINuBjIl/QTWsV5UvE8UHuNblcO3vVADSrKsrc6RqGX6lOW1cEo3CM2v0XG4Nat8nI+YM7/f26VxXLw==}
engines: {node: '>=12'}
+ postgres@3.4.8:
+ resolution: {integrity: sha512-d+JFcLM17njZaOLkv6SCev7uoLaBtfK86vMUXhW1Z4glPWh4jozno9APvW/XKFJ3CCxVoC7OL38BqRydtu5nGg==}
+ engines: {node: '>=12'}
+
property-information@7.1.0:
resolution: {integrity: sha512-TwEZ+X+yCJmYfL7TPUOcvBZ4QfoT5YenQiJuX//0th53DE6w0xxLEtfK3iyryQFddXuvkIk51EEgrJQ0WJkOmQ==}
@@ -3714,6 +3746,8 @@ snapshots:
postgres@3.4.7: {}
+ postgres@3.4.8: {}
+
property-information@7.1.0: {}
punycode.js@2.3.1: {}
diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml
index dee5c84..4cf2776 100644
--- a/pnpm-workspace.yaml
+++ b/pnpm-workspace.yaml
@@ -2,6 +2,8 @@ packages:
- packages/*
- docs
+linkWorkspacePackages: true
+
catalog:
"@fedify/denokv": 2.1.2
"@fedify/fedify": 2.1.2