diff --git a/.github/workflows/README-build-and-publish.md b/.github/workflows/README-build-and-publish.md
new file mode 100644
index 00000000000..4440763b116
--- /dev/null
+++ b/.github/workflows/README-build-and-publish.md
@@ -0,0 +1,233 @@
+# Build and Publish Workflow
+
+## Overview
+
+This GitHub Actions workflow (`build-and-publish.yml`) builds the Netty library across multiple platforms and publishes the artifacts to GitHub Packages.
+
+## Workflow Architecture
+
+The workflow consists of 4 stages that run in sequence:
+
+```
+┌─────────────────────────────────────────────────────────────┐
+│ Stage 1: Linux x86_64 Full Build │
+│ - Builds all Netty modules │
+│ - Uses Docker with CentOS 6 for compatibility │
+│ - Produces complete JAR artifacts │
+└─────────────────────────────────────────────────────────────┘
+ │
+ ▼
+┌─────────────────────────────────────────────────────────────┐
+│ Stage 2: macOS Intel x86_64 Native Libraries │
+│ - Builds native modules only: │
+│ • resolver-dns-native-macos │
+│ • transport-native-unix-common │
+│ • transport-native-kqueue │
+│ - Runs on GitHub-hosted Intel Mac │
+└─────────────────────────────────────────────────────────────┘
+ │
+ ▼
+┌─────────────────────────────────────────────────────────────┐
+│ Stage 3: macOS ARM aarch64 Native Libraries │
+│ - Builds same native modules as Stage 2 │
+│ - Runs on GitHub-hosted Apple Silicon Mac │
+└─────────────────────────────────────────────────────────────┘
+ │
+ ▼
+┌─────────────────────────────────────────────────────────────┐
+│ Stage 4: Merge and Publish │
+│ - Downloads all artifacts from previous stages │
+│ - Merges staging repositories │
+│ - Generates netty-all module │
+│ - Publishes to GitHub Packages │
+└─────────────────────────────────────────────────────────────┘
+```
+
+## Triggers
+
+The workflow can be triggered in two ways:
+
+1. **Manual Trigger**: Via the GitHub Actions UI (workflow_dispatch)
+2. **Tag Push**: Automatically when pushing version tags:
+ - Tags starting with `v*` (e.g., `v4.1.100`)
+ - Tags starting with `netty-*` (e.g., `netty-4.1.100.Final`)
+
+## Prerequisites
+
+### Repository Configuration
+
+1. **GitHub Packages**: Ensure GitHub Packages is enabled for your repository
+2. **Permissions**: The workflow requires the following permissions:
+ - `contents: read` - To checkout the repository
+ - `packages: write` - To publish to GitHub Packages
+
+### Required Files
+
+The workflow depends on these existing files:
+- `docker/Dockerfile.centos6` - Docker image for Linux builds
+- `.github/scripts/local_staging_install_release.sh` - Script to merge staging artifacts
+- `.github/actions/thread-dump-jvms/action.yml` - Action for debugging cancelled jobs
+- `Brewfile` - macOS dependencies (optional, continues on error)
+
+### Secrets
+
+The workflow uses the built-in `GITHUB_TOKEN` secret, which is automatically provided by GitHub Actions. No additional secrets need to be configured.
+
+## Usage
+
+### Manual Trigger
+
+1. Go to the **Actions** tab in your GitHub repository
+2. Select **Build and Publish to GitHub Packages** workflow
+3. Click **Run workflow**
+4. Select the branch to run from
+5. Click **Run workflow** button
+
+### Tag-based Trigger
+
+```bash
+# Create and push a version tag
+git tag v4.1.100.Final
+git push origin v4.1.100.Final
+```
+
+The workflow will automatically start building and publishing.
+
+## Artifacts
+
+### Intermediate Artifacts
+
+Each build stage uploads its artifacts to GitHub Actions:
+- `linux-x86_64-local-staging` - Linux build artifacts
+- `macos-x86_64-local-staging` - Intel Mac native libraries
+- `macos-aarch64-local-staging` - ARM Mac native libraries
+- `merged-local-staging` - Final merged artifacts (for debugging)
+
+These artifacts are retained for 90 days (GitHub default) and can be downloaded from the workflow run page.
+
+### Published Artifacts
+
+Final artifacts are published to GitHub Packages Maven registry at:
+```
+https://maven.pkg.github.com/OWNER/REPOSITORY
+```
+
+## Consuming Published Artifacts
+
+To use the published artifacts in your Maven project:
+
+### 1. Configure Maven Settings
+
+Add to your `~/.m2/settings.xml`:
+
+```xml
+
+
+
+ github
+ YOUR_GITHUB_USERNAME
+ YOUR_GITHUB_TOKEN
+
+
+
+```
+
+### 2. Add Repository to pom.xml
+
+```xml
+
+
+ github
+ https://maven.pkg.github.com/OWNER/REPOSITORY
+
+
+```
+
+### 3. Add Netty Dependencies
+
+```xml
+
+ io.netty
+ netty-all
+ YOUR_VERSION
+
+```
+
+## Build Times
+
+Approximate build times (may vary):
+- **Stage 1 (Linux)**: 15-25 minutes
+- **Stage 2 (macOS Intel)**: 10-15 minutes
+- **Stage 3 (macOS ARM)**: 10-15 minutes
+- **Stage 4 (Merge & Publish)**: 5-10 minutes
+- **Total**: ~40-65 minutes
+
+## Troubleshooting
+
+### Build Failures
+
+1. **Check the logs**: Click on the failed job to see detailed logs
+2. **Download artifacts**: Failed builds may still produce partial artifacts for debugging
+3. **Thread dumps**: If a job is cancelled, thread dumps are automatically captured
+
+### Common Issues
+
+**Docker build fails on Linux**:
+- Check if `docker/Dockerfile.centos6` exists and is valid
+- Verify Docker daemon is accessible
+
+**macOS native build fails**:
+- Check if Brewfile dependencies are correct
+- Verify JDK 8 is properly installed
+- Check native compilation toolchain (Xcode Command Line Tools)
+
+**Publishing fails**:
+- Verify `GITHUB_TOKEN` has `packages:write` permission
+- Check if GitHub Packages is enabled for the repository
+- Ensure the repository URL in the workflow matches your repository
+
+### Re-running Failed Jobs
+
+You can re-run individual failed jobs without re-running the entire workflow:
+1. Go to the workflow run page
+2. Click on the failed job
+3. Click **Re-run jobs** → **Re-run failed jobs**
+
+## Caching
+
+The workflow uses Maven repository caching to speed up builds:
+- Linux builds cache: `~/.m2/repository`
+- macOS Intel builds cache: `~/.m2/repository` (separate cache key)
+- macOS ARM builds cache: `~/.m2/repository` (separate cache key)
+
+Caches are automatically invalidated when `pom.xml` files change.
+
+## Maintenance
+
+### Updating Dependencies
+
+- **Java Version**: Modify the `java-version` in the `setup-java` steps
+- **Docker Image**: Update `docker/Dockerfile.centos6`
+- **macOS Dependencies**: Update `Brewfile`
+
+### Adding New Platforms
+
+To add support for additional platforms:
+1. Add a new job in the workflow (e.g., `build-windows-x64`)
+2. Configure the appropriate runner (e.g., `runs-on: windows-latest`)
+3. Add the job to the `needs` array in `publish-to-github-packages`
+4. Update the artifact download and merge steps
+
+## Security Considerations
+
+- The workflow uses minimal permissions (read contents, write packages)
+- Secrets are not exposed in logs
+- Docker containers run with volume mounts but no privileged access
+- All dependencies are cached and verified via checksums
+
+## Support
+
+For issues with this workflow:
+1. Check the [GitHub Actions documentation](https://docs.github.com/en/actions)
+2. Review the [Netty build documentation](BUILD-DATASTAX.md)
+3. Open an issue in the repository with workflow run logs
\ No newline at end of file
diff --git a/.github/workflows/autoport-41.yml b/.github/workflows/autoport-41.yml
new file mode 100644
index 00000000000..682ac53c370
--- /dev/null
+++ b/.github/workflows/autoport-41.yml
@@ -0,0 +1,130 @@
+name: Auto-port to 4.1
+on:
+ pull_request_target:
+ types:
+ - closed
+ - labeled
+ branches:
+ - '4.2'
+ - '5.0'
+
+jobs:
+ autoport:
+ name: "Auto-porting to 4.1"
+ concurrency:
+ group: port-41-${{ github.event.pull_request.number }}
+ cancel-in-progress: true
+ if: github.event.pull_request.merged && contains(github.event.pull_request.labels.*.name, 'needs-cherry-pick-4.1')
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v6
+ with:
+ ssh-key: ${{ secrets.SSH_PRIVATE_KEY_PEM }}
+ ssh-known-hosts: ${{ secrets.SSH_KNOWN_HOSTS }}
+ fetch-depth: '0' # Cherry-pick needs full history
+
+ - name: Setup git configuration
+ run: |
+ git config --global user.email "netty-project-bot@users.noreply.github.com"
+ git config --global user.name "Netty Project Bot"
+
+ - name: Create auto-port PR branch and cherry-pick
+ id: cherry-pick
+ run: |
+ MERGE_COMMIT="${{ github.event.pull_request.merge_commit_sha }}"
+ echo "Auto-porting commit: $MERGE_COMMIT"
+
+ PORT_BRANCH="auto-port-pr-${{ github.event.pull_request.number }}-to-4.1"
+ if [[ $(git branch --show-current) != '4.1' ]]; then
+ git fetch origin 4.1:4.1
+ fi
+ git checkout -b "$PORT_BRANCH" 4.1
+
+ if git cherry-pick -x "$MERGE_COMMIT"; then
+ echo "Cherry-pick successful"
+ else
+ echo "Cherry-pick failed - conflicts detected"
+ git cherry-pick --abort
+ exit 1
+ fi
+ echo "branch=$PORT_BRANCH" >> "$GITHUB_OUTPUT"
+
+ - name: Push auto-port branch
+ id: push
+ if: steps.cherry-pick.outcome == 'success'
+ run: |
+ if ! git push origin "${{ steps.cherry-pick.outputs.branch }}"; then
+ echo "Auto-port branch push failed"
+ exit 1
+ fi
+
+ - name: Create pull request
+ id: create-pr
+ if: steps.cherry-pick.outcome == 'success'
+ uses: actions/github-script@v8
+ with:
+ github-token: '${{ secrets.PAT_TOKEN_READ_WRITE_PR }}'
+ script: |
+ const { data: pr } = await github.rest.pulls.create({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ title: `Auto-port 4.1: ${context.payload.pull_request.title}`,
+ head: '${{ steps.cherry-pick.outputs.branch }}',
+ base: '4.1',
+ body: `Auto-port of #${context.payload.pull_request.number} to 4.1\n` +
+ `Cherry-picked commit: ${context.payload.pull_request.merge_commit_sha}\n\n---\n` +
+ `${context.payload.pull_request.body || ''}`
+ });
+ console.log(`Created auto-port PR: ${pr.html_url}`);
+ await github.rest.issues.createComment({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: context.payload.pull_request.number,
+ body: `Auto-port PR for 4.1: #${pr.number}`
+ });
+
+ # Important: This script MUST run with the default GITHUB_TOKEN to avoid triggering other actions.
+ - name: Remove triggering label
+ if: steps.create-pr.outcome == 'success'
+ uses: actions/github-script@v8
+ with:
+ script: |
+ await github.rest.issues.removeLabel({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: context.payload.pull_request.number,
+ name: 'needs-cherry-pick-4.1'
+ });
+
+ - name: Report cherry-pick conflicts
+ if: failure() && steps.cherry-pick.outcome == 'failure'
+ uses: actions/github-script@v8
+ with:
+ github-token: '${{ secrets.PAT_TOKEN_READ_WRITE_PR }}'
+ script: |
+ await github.rest.issues.createComment({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: context.payload.pull_request.number,
+ body: `Could not create auto-port PR.\nGot conflicts when cherry-picking onto 4.1.`
+ });
+
+ - name: Report auto-port branch push failure
+ if: failure() && steps.push.outcome == 'failure'
+ uses: actions/github-script@v8
+ with:
+ github-token: '${{ secrets.PAT_TOKEN_READ_WRITE_PR }}'
+ script: |
+ await github.rest.issues.createComment({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: context.payload.pull_request.number,
+ body: `Could not create auto-port PR.\n`+
+ `I could cherry-pick onto 4.1 just fine, but pushing the new branch failed.`
+ });
+
+ - name: Remove branch on PR create failure
+ if: failure() && steps.cherry-pick.outputs.branch
+ run: |
+ git push -d origin "${{ steps.cherry-pick.outputs.branch }}"
diff --git a/.github/workflows/autoport-42.yml b/.github/workflows/autoport-42.yml
new file mode 100644
index 00000000000..15b27eafe67
--- /dev/null
+++ b/.github/workflows/autoport-42.yml
@@ -0,0 +1,130 @@
+name: Auto-port to 4.2
+on:
+ pull_request_target:
+ types:
+ - closed
+ - labeled
+ branches:
+ - '4.1'
+ - '5.0'
+
+jobs:
+ autoport:
+ name: "Auto-porting to 4.2"
+ concurrency:
+ group: port-42-${{ github.event.pull_request.number }}
+ cancel-in-progress: true
+ if: github.event.pull_request.merged && contains(github.event.pull_request.labels.*.name, 'needs-cherry-pick-4.2')
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v6
+ with:
+ ssh-key: ${{ secrets.SSH_PRIVATE_KEY_PEM }}
+ ssh-known-hosts: ${{ secrets.SSH_KNOWN_HOSTS }}
+ fetch-depth: '0' # Cherry-pick needs full history
+
+ - name: Setup git configuration
+ run: |
+ git config --global user.email "netty-project-bot@users.noreply.github.com"
+ git config --global user.name "Netty Project Bot"
+
+ - name: Create auto-port PR branch and cherry-pick
+ id: cherry-pick
+ run: |
+ MERGE_COMMIT="${{ github.event.pull_request.merge_commit_sha }}"
+ echo "Auto-porting commit: $MERGE_COMMIT"
+
+ PORT_BRANCH="auto-port-pr-${{ github.event.pull_request.number }}-to-4.2"
+ if [[ $(git branch --show-current) != '4.2' ]]; then
+ git fetch origin 4.2:4.2
+ fi
+ git checkout -b "$PORT_BRANCH" 4.2
+
+ if git cherry-pick -x "$MERGE_COMMIT"; then
+ echo "Cherry-pick successful"
+ else
+ echo "Cherry-pick failed - conflicts detected"
+ git cherry-pick --abort
+ exit 1
+ fi
+ echo "branch=$PORT_BRANCH" >> "$GITHUB_OUTPUT"
+
+ - name: Push auto-port branch
+ id: push
+ if: steps.cherry-pick.outcome == 'success'
+ run: |
+ if ! git push origin "${{ steps.cherry-pick.outputs.branch }}"; then
+ echo "Auto-port branch push failed"
+ exit 1
+ fi
+
+ - name: Create pull request
+ id: create-pr
+ if: steps.cherry-pick.outcome == 'success'
+ uses: actions/github-script@v8
+ with:
+ github-token: '${{ secrets.PAT_TOKEN_READ_WRITE_PR }}'
+ script: |
+ const { data: pr } = await github.rest.pulls.create({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ title: `Auto-port 4.2: ${context.payload.pull_request.title}`,
+ head: '${{ steps.cherry-pick.outputs.branch }}',
+ base: '4.2',
+ body: `Auto-port of #${context.payload.pull_request.number} to 4.2\n` +
+ `Cherry-picked commit: ${context.payload.pull_request.merge_commit_sha}\n\n---\n` +
+ `${context.payload.pull_request.body || ''}`
+ });
+ console.log(`Created auto-port PR: ${pr.html_url}`);
+ await github.rest.issues.createComment({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: context.payload.pull_request.number,
+ body: `Auto-port PR for 4.2: #${pr.number}`
+ });
+
+ # Important: This script MUST run with the default GITHUB_TOKEN to avoid triggering other actions.
+ - name: Remove triggering label
+ if: steps.create-pr.outcome == 'success'
+ uses: actions/github-script@v8
+ with:
+ script: |
+ await github.rest.issues.removeLabel({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: context.payload.pull_request.number,
+ name: 'needs-cherry-pick-4.2'
+ });
+
+ - name: Report cherry-pick conflicts
+ if: failure() && steps.cherry-pick.outcome == 'failure'
+ uses: actions/github-script@v8
+ with:
+ github-token: '${{ secrets.PAT_TOKEN_READ_WRITE_PR }}'
+ script: |
+ await github.rest.issues.createComment({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: context.payload.pull_request.number,
+ body: `Could not create auto-port PR.\nGot conflicts when cherry-picking onto 4.2.`
+ });
+
+ - name: Report auto-port branch push failure
+ if: failure() && steps.push.outcome == 'failure'
+ uses: actions/github-script@v8
+ with:
+ github-token: '${{ secrets.PAT_TOKEN_READ_WRITE_PR }}'
+ script: |
+ await github.rest.issues.createComment({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: context.payload.pull_request.number,
+ body: `Could not create auto-port PR.\n`+
+ `I could cherry-pick onto 4.2 just fine, but pushing the new branch failed.`
+ });
+
+ - name: Remove branch on PR create failure
+ if: failure() && steps.cherry-pick.outputs.branch
+ run: |
+ git push -d origin "${{ steps.cherry-pick.outputs.branch }}"
diff --git a/.github/workflows/autoport-50.yml b/.github/workflows/autoport-50.yml
new file mode 100644
index 00000000000..2899d56e209
--- /dev/null
+++ b/.github/workflows/autoport-50.yml
@@ -0,0 +1,130 @@
+name: Auto-port to 5.0
+on:
+ pull_request_target:
+ types:
+ - closed
+ - labeled
+ branches:
+ - '4.1'
+ - '4.2'
+
+jobs:
+ autoport:
+ name: "Auto-porting to 5.0"
+ concurrency:
+ group: port-50-${{ github.event.pull_request.number }}
+ cancel-in-progress: true
+ if: github.event.pull_request.merged && contains(github.event.pull_request.labels.*.name, 'needs-cherry-pick-5.0')
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v6
+ with:
+ ssh-key: ${{ secrets.SSH_PRIVATE_KEY_PEM }}
+ ssh-known-hosts: ${{ secrets.SSH_KNOWN_HOSTS }}
+ fetch-depth: '0' # Cherry-pick needs full history
+
+ - name: Setup git configuration
+ run: |
+ git config --global user.email "netty-project-bot@users.noreply.github.com"
+ git config --global user.name "Netty Project Bot"
+
+ - name: Create auto-port PR branch and cherry-pick
+ id: cherry-pick
+ run: |
+ MERGE_COMMIT="${{ github.event.pull_request.merge_commit_sha }}"
+ echo "Auto-porting commit: $MERGE_COMMIT"
+
+ PORT_BRANCH="auto-port-pr-${{ github.event.pull_request.number }}-to-5.0"
+ if [[ $(git branch --show-current) != '5.0' ]]; then
+ git fetch origin 5.0:5.0
+ fi
+ git checkout -b "$PORT_BRANCH" 5.0
+
+ if git cherry-pick -x "$MERGE_COMMIT"; then
+ echo "Cherry-pick successful"
+ else
+ echo "Cherry-pick failed - conflicts detected"
+ git cherry-pick --abort
+ exit 1
+ fi
+ echo "branch=$PORT_BRANCH" >> "$GITHUB_OUTPUT"
+
+ - name: Push auto-port branch
+ id: push
+ if: steps.cherry-pick.outcome == 'success'
+ run: |
+ if ! git push origin "${{ steps.cherry-pick.outputs.branch }}"; then
+ echo "Auto-port branch push failed"
+ exit 1
+ fi
+
+ - name: Create pull request
+ id: create-pr
+ if: steps.cherry-pick.outcome == 'success'
+ uses: actions/github-script@v8
+ with:
+ github-token: '${{ secrets.PAT_TOKEN_READ_WRITE_PR }}'
+ script: |
+ const { data: pr } = await github.rest.pulls.create({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ title: `Auto-port 5.0: ${context.payload.pull_request.title}`,
+ head: '${{ steps.cherry-pick.outputs.branch }}',
+ base: '5.0',
+ body: `Auto-port of #${context.payload.pull_request.number} to 5.0\n` +
+ `Cherry-picked commit: ${context.payload.pull_request.merge_commit_sha}\n\n---\n` +
+ `${context.payload.pull_request.body || ''}`
+ });
+ console.log(`Created auto-port PR: ${pr.html_url}`);
+ await github.rest.issues.createComment({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: context.payload.pull_request.number,
+ body: `Auto-port PR for 5.0: #${pr.number}`
+ });
+
+ # Important: This script MUST run with the default GITHUB_TOKEN to avoid triggering other actions.
+ - name: Remove triggering label
+ if: steps.create-pr.outcome == 'success'
+ uses: actions/github-script@v8
+ with:
+ script: |
+ await github.rest.issues.removeLabel({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: context.payload.pull_request.number,
+ name: 'needs-cherry-pick-5.0'
+ });
+
+ - name: Report cherry-pick conflicts
+ if: failure() && steps.cherry-pick.outcome == 'failure'
+ uses: actions/github-script@v8
+ with:
+ github-token: '${{ secrets.PAT_TOKEN_READ_WRITE_PR }}'
+ script: |
+ await github.rest.issues.createComment({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: context.payload.pull_request.number,
+ body: `Could not create auto-port PR.\nGot conflicts when cherry-picking onto 5.0.`
+ });
+
+ - name: Report auto-port branch push failure
+ if: failure() && steps.push.outcome == 'failure'
+ uses: actions/github-script@v8
+ with:
+ github-token: '${{ secrets.PAT_TOKEN_READ_WRITE_PR }}'
+ script: |
+ await github.rest.issues.createComment({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: context.payload.pull_request.number,
+ body: `Could not create auto-port PR.\n`+
+ `I could cherry-pick onto 5.0 just fine, but pushing the new branch failed.`
+ });
+
+ - name: Remove branch on PR create failure
+ if: failure() && steps.cherry-pick.outputs.branch
+ run: |
+ git push -d origin "${{ steps.cherry-pick.outputs.branch }}"
diff --git a/.github/workflows/build-and-publish.yml b/.github/workflows/build-and-publish.yml
new file mode 100644
index 00000000000..a93a356d716
--- /dev/null
+++ b/.github/workflows/build-and-publish.yml
@@ -0,0 +1,306 @@
+name: Build and Publish DataStax Netty to GitHub Packages
+
+on:
+ # Allows manual trigger from the Actions tab
+ workflow_dispatch:
+
+ # Trigger on version tags
+ push:
+ branches:
+ - dse-netty-4.1.132
+ tags:
+ - '*.dse'
+ - 'dse-netty-*'
+
+permissions:
+ contents: read
+ packages: write
+
+env:
+ MAVEN_OPTS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.http.retryhandler.count=5 -Dmaven.wagon.httpconnectionManager.ttlSeconds=240
+
+# Cancel running jobs when a new push happens to the same branch/tag
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}
+ cancel-in-progress: true
+
+jobs:
+ # Stage 1: Build full Netty library on Linux x64
+ build-linux-x64:
+ runs-on: ubuntu-latest
+ name: Build Linux x86_64 (Full)
+
+ steps:
+ - uses: actions/checkout@v4
+
+ # Cache .m2/repository
+ - name: Cache local Maven repository
+ uses: actions/cache@v4
+ continue-on-error: true
+ with:
+ path: ~/.m2/repository
+ key: cache-maven-${{ hashFiles('**/pom.xml') }}
+ restore-keys: |
+ cache-maven-${{ hashFiles('**/pom.xml') }}
+ cache-maven-
+
+ - name: Configure Maven settings for Docker
+ run: |
+ mkdir -p ~/.m2
+ cat > ~/.m2/settings.xml << 'EOF'
+
+
+
+ github
+ ${env.GITHUB_ACTOR}
+ ${env.GITHUB_TOKEN}
+
+
+
+ EOF
+ env:
+ GITHUB_ACTOR: ${{ github.actor }}
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+
+ - name: Create local staging directory
+ run: mkdir -p ~/local-staging
+
+ - name: Build docker image
+ run: docker build -f docker/Dockerfile-netty-centos6 -t netty-centos6 .
+
+ - name: Build and stage artifacts
+ run: |
+ docker run -t \
+ -v ~/.m2:/root/.m2:Z \
+ -v ~/local-staging:/root/local-staging:Z \
+ -v $(pwd):/code:Z \
+ -w /code \
+ --entrypoint="" \
+ netty-centos6 \
+ bash -ic "./mvnw -B clean install -DskipTests=true ; ./mvnw -B deploy -DaltDeploymentRepository=local-staging::default::file:///root/local-staging -DskipTests=true"
+
+ - name: Upload local staging directory
+ uses: actions/upload-artifact@v4
+ with:
+ name: linux-x86_64-local-staging
+ path: ~/local-staging
+ if-no-files-found: error
+ include-hidden-files: true
+
+ # Stage 2: Build macOS Intel x86_64 native libraries
+ build-macos-intel:
+ runs-on: macos-15-intel
+ name: Build macOS x86_64 (Native Libraries)
+ needs: [build-linux-x64]
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up JDK 8
+ uses: actions/setup-java@v4
+ with:
+ distribution: 'zulu'
+ java-version: '8'
+
+ # Cache .m2/repository
+ - name: Cache local Maven repository
+ uses: actions/cache@v4
+ continue-on-error: true
+ with:
+ path: ~/.m2/repository
+ key: cache-maven-macos-intel-${{ hashFiles('**/pom.xml') }}
+ restore-keys: |
+ cache-maven-macos-intel-${{ hashFiles('**/pom.xml') }}
+ cache-maven-
+
+ - name: Install tools via brew
+ run: brew bundle
+ continue-on-error: true
+
+ - name: Create local staging directory
+ run: mkdir -p ~/local-staging
+
+ - name: Build and stage native libraries
+ run: |
+ echo "$(pwd)"
+ ./mvnw -B -U \
+ -pl resolver-dns-native-macos,transport-native-unix-common,transport-native-kqueue \
+ deploy \
+ -DskipTests \
+ -DaltDeploymentRepository=local-staging::default::file:///$(pwd)/local-staging
+ find $(pwd)/local-staging
+ find ~/local-staging
+
+ - name: Upload local staging directory
+ uses: actions/upload-artifact@v4
+ with:
+ name: macos-x86_64-local-staging
+ path: ${{ github.workspace }}/local-staging
+ if-no-files-found: error
+ include-hidden-files: true
+
+ # Stage 3: Build macOS ARM aarch64 native libraries
+ build-macos-arm:
+ runs-on: macos-15
+ name: Build macOS aarch64 (Native Libraries)
+ needs: [build-linux-x64]
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up JDK 8
+ uses: actions/setup-java@v4
+ with:
+ distribution: 'zulu'
+ java-version: '8'
+
+ # Cache .m2/repository
+ - name: Cache local Maven repository
+ uses: actions/cache@v4
+ continue-on-error: true
+ with:
+ path: ~/.m2/repository
+ key: cache-maven-macos-arm-${{ hashFiles('**/pom.xml') }}
+ restore-keys: |
+ cache-maven-macos-arm-${{ hashFiles('**/pom.xml') }}
+ cache-maven-
+
+ - name: Install tools via brew
+ run: brew bundle
+ continue-on-error: true
+
+ - name: Create local staging directory
+ run: mkdir -p ~/local-staging
+
+ - name: Build and stage native libraries
+ run: |
+ echo "$(pwd)"
+ ./mvnw -B -Pmac-m1-cross-compile deploy \
+ -pl resolver-dns-native-macos,transport-native-unix-common,transport-native-kqueue \
+ -DskipTests \
+ -DaltDeploymentRepository=local-staging::default::file:///$(pwd)/local-staging
+ find $(pwd)/local-staging
+ find ~/local-staging
+
+ - name: Upload local staging directory
+ uses: actions/upload-artifact@v4
+ with:
+ name: macos-aarch64-local-staging
+ path: ${{ github.workspace }}/local-staging
+ if-no-files-found: error
+ include-hidden-files: true
+
+ # Stage 4: Merge artifacts and publish to GitHub Packages
+ publish-to-github-packages:
+ runs-on: ubuntu-latest
+ name: Merge and Publish to GitHub Packages
+ needs: [build-linux-x64, build-macos-intel, build-macos-arm]
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up JDK 8
+ uses: actions/setup-java@v4
+ with:
+ distribution: 'zulu'
+ java-version: '8'
+
+ # Cache .m2/repository
+ - name: Cache local Maven repository
+ uses: actions/cache@v4
+ continue-on-error: true
+ with:
+ path: ~/.m2/repository
+ key: cache-maven-${{ hashFiles('**/pom.xml') }}
+ restore-keys: |
+ cache-maven-${{ hashFiles('**/pom.xml') }}
+ cache-maven-
+
+ # Configure Maven settings for GitHub Packages
+ - name: Configure Maven settings
+ uses: s4u/maven-settings-action@v3.0.0
+ with:
+ servers: |
+ [{
+ "id": "github",
+ "username": "${{ github.actor }}",
+ "password": "${{ secrets.GITHUB_TOKEN }}"
+ },
+ {
+ "id": "central-portal-snapshots",
+ "username": "${{ github.actor }}",
+ "password": "${{ secrets.GITHUB_TOKEN }}"
+ }]
+
+ # Setup environment variables
+ - name: Prepare environment variables
+ run: |
+ echo "LOCAL_STAGING_DIR=$HOME/local-staging" >> $GITHUB_ENV
+
+ # Download all staging artifacts
+ - name: Download Linux x86_64 staging directory
+ uses: actions/download-artifact@v4
+ with:
+ name: linux-x86_64-local-staging
+ path: ~/linux-x86_64-local-staging
+
+ - name: Download macOS x86_64 staging directory
+ uses: actions/download-artifact@v4
+ with:
+ name: macos-x86_64-local-staging
+ path: ~/macos-x86_64-local-staging
+
+ - name: Download macOS aarch64 staging directory
+ uses: actions/download-artifact@v4
+ with:
+ name: macos-aarch64-local-staging
+ path: ~/macos-aarch64-local-staging
+
+ # Install artifacts to local Maven repository
+ - name: Copy build artifacts to local maven repository
+ run: |
+ bash ./.github/scripts/local_staging_install_release.sh \
+ ~/.m2/repository \
+ ~/linux-x86_64-local-staging \
+ ~/macos-x86_64-local-staging \
+ ~/macos-aarch64-local-staging
+
+ # Generate netty-all and install to local Maven repository
+ - name: Generate netty-all
+ run: |
+ ./mvnw -B --file pom.xml -pl all \
+ clean install \
+ -DskipTests=true
+
+ # Merge all staging repositories
+ - name: Merge staging repositories
+ run: |
+ bash ./.github/scripts/local_staging_install_release.sh \
+ ~/local-staging \
+ ~/linux-x86_64-local-staging \
+ ~/macos-x86_64-local-staging \
+ ~/macos-aarch64-local-staging
+
+ # Copy netty-all from local Maven repository
+ if [ -d "$HOME/.m2/repository/io/netty/netty-all" ]; then
+ cp -r $HOME/.m2/repository/io/netty/netty-all $HOME/local-staging/io/netty/
+ fi
+
+ # Deploy to GitHub Packages
+ - name: Deploy to GitHub Packages
+ run: |
+ ./mvnw -B --file pom.xml \
+ org.sonatype.plugins:nexus-staging-maven-plugin:deploy-staged \
+ -DaltStagingDirectory=$HOME/local-staging \
+ -DserverId=github \
+ -DnexusUrl=https://maven.pkg.github.com/${{ github.repository }} \
+ -DrepositoryId=github
+
+ - name: Upload merged staging directory (for debugging)
+ uses: actions/upload-artifact@v4
+ if: always()
+ with:
+ name: merged-local-staging
+ path: ~/local-staging
+ if-no-files-found: warn
+ include-hidden-files: true
diff --git a/.github/workflows/ci-deploy.yml b/.github/workflows/ci-deploy.yml
index 3f41bd26508..7a8e2abf467 100644
--- a/.github/workflows/ci-deploy.yml
+++ b/.github/workflows/ci-deploy.yml
@@ -90,7 +90,7 @@ jobs:
matrix:
include:
- setup: macos-x86_64-java8
- os: macos-13
+ os: macos-15-intel
- setup: macos-aarch64-java8
os: macos-15
diff --git a/.github/workflows/ci-pr.yml b/.github/workflows/ci-pr.yml
index 506ac787e17..f67ed7ed0fd 100644
--- a/.github/workflows/ci-pr.yml
+++ b/.github/workflows/ci-pr.yml
@@ -201,16 +201,35 @@ jobs:
- setup: linux-x86_64-java11-adaptive
docker-compose-build: "-f docker/docker-compose.yaml -f docker/docker-compose.centos-6.111.yaml build"
docker-compose-run: "-f docker/docker-compose.yaml -f docker/docker-compose.centos-6.111.yaml run build-leak-adaptive"
+ - setup: linux-x86_64-java11-awslc
+ docker-compose-build: "-f docker/docker-compose.yaml -f docker/docker-compose.al2023.yaml build"
+ docker-compose-install-tcnative: "-f docker/docker-compose.yaml -f docker/docker-compose.al2023.yaml run install-tcnative"
+ docker-compose-update-tcnative-version: "-f docker/docker-compose.yaml -f docker/docker-compose.al2023.yaml run update-tcnative-version"
+ docker-compose-run: "-f docker/docker-compose.yaml -f docker/docker-compose.al2023.yaml run build"
name: ${{ matrix.setup }} build
needs: verify-pr
+ defaults:
+ run:
+ working-directory: netty
steps:
- uses: actions/checkout@v4
+ with:
+ path: netty
+
+ - uses: actions/checkout@v4
+ if: ${{ endsWith(matrix.setup, '-awslc') }}
+ with:
+ repository: netty/netty-tcnative
+ ref: main
+ path: netty-tcnative
+ fetch-depth: 0
# Cache .m2/repository
- name: Cache local Maven repository
uses: actions/cache@v4
continue-on-error: true
+ if: ${{ !endsWith(matrix.setup, '-awslc') }}
with:
path: ~/.m2/repository
key: cache-maven-${{ hashFiles('**/pom.xml') }}
@@ -218,9 +237,28 @@ jobs:
cache-maven-${{ hashFiles('**/pom.xml') }}
cache-maven-
+ - name: Cache local Maven repository
+ uses: actions/cache@v4
+ continue-on-error: true
+ if: ${{ endsWith(matrix.setup, '-awslc') }}
+ with:
+ path: ~/.m2-al2023/repository
+ key: cache-maven-al2023-${{ hashFiles('**/pom.xml') }}
+ restore-keys: |
+ cache-maven-al2023-${{ hashFiles('**/pom.xml') }}
+ cache-maven-al2023-
+
- name: Build docker image
run: docker compose ${{ matrix.docker-compose-build }}
+ - name: Install custom netty-tcnative
+ if: ${{ endsWith(matrix.setup, '-awslc') }}
+ run: docker compose ${{ matrix.docker-compose-install-tcnative }}
+
+ - name: Update netty-tcnative version
+ if: ${{ endsWith(matrix.setup, '-awslc') }}
+ run: docker compose ${{ matrix.docker-compose-update-tcnative-version }}
+
- name: Build project with leak detection
run: docker compose ${{ matrix.docker-compose-run }} | tee build-leak.output
@@ -231,7 +269,7 @@ jobs:
run: ./.github/scripts/check_leak.sh build-leak.output
- name: print JVM thread dumps when cancelled
- uses: ./.github/actions/thread-dump-jvms
+ uses: ./netty/.github/actions/thread-dump-jvms
if: ${{ cancelled() }}
- name: Upload Test Results
@@ -239,17 +277,17 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: test-results-${{ matrix.setup }}
- path: '**/target/surefire-reports/TEST-*.xml'
+ path: 'netty/**/target/surefire-reports/TEST-*.xml'
- uses: actions/upload-artifact@v4
if: ${{ failure() || cancelled() }}
with:
name: build-${{ matrix.setup }}-target
path: |
- **/target/surefire-reports/
- **/target/autobahntestsuite-reports/
- **/hs_err*.log
- **/core.*
+ netty/**/target/surefire-reports/
+ netty/**/target/autobahntestsuite-reports/
+ netty/**/hs_err*.log
+ netty/**/core.*
build-pr-macos:
strategy:
@@ -257,7 +295,7 @@ jobs:
matrix:
include:
- setup: macos-x86_64-java8-boringssl
- os: macos-13
+ os: macos-15-intel
- setup: macos-aarch64-java8-boringssl
os: macos-15
diff --git a/.github/workflows/ci-release-4.2.yml b/.github/workflows/ci-release-4.2.yml
index 619fb0ac819..51709846987 100644
--- a/.github/workflows/ci-release-4.2.yml
+++ b/.github/workflows/ci-release-4.2.yml
@@ -185,7 +185,7 @@ jobs:
matrix:
include:
- setup: macos-x86_64-java11
- os: macos-13
+ os: macos-15-intel
- setup: macos-aarch64-java11
os: macos-15
diff --git a/.github/workflows/ci-release.yml b/.github/workflows/ci-release.yml
index 9f60f3b80e2..c797a64f127 100644
--- a/.github/workflows/ci-release.yml
+++ b/.github/workflows/ci-release.yml
@@ -185,7 +185,7 @@ jobs:
matrix:
include:
- setup: macos-x86_64-java8
- os: macos-13
+ os: macos-15-intel
- setup: macos-aarch64-java8
os: macos-15
runs-on: ${{ matrix.os }}
diff --git a/all/pom.xml b/all/pom.xml
index d0a186dc417..2eb01ae4196 100644
--- a/all/pom.xml
+++ b/all/pom.xml
@@ -20,7 +20,7 @@
io.nettynetty-parent
- 4.1.128.1.dse
+ 4.1.132.1.dsenetty-all
diff --git a/bom/pom.xml b/bom/pom.xml
index f74949ba8aa..120a9e64113 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -25,7 +25,7 @@
io.nettynetty-bom
- 4.1.128.1.dse
+ 4.1.132.1.dsepomNetty/BOM
@@ -49,7 +49,7 @@
https://github.com/netty/nettyscm:git:git://github.com/netty/netty.gitscm:git:ssh://git@github.com/netty/netty.git
- netty-4.1.128.Final
+ netty-4.1.132.Final
@@ -73,7 +73,7 @@
- 2.0.74.Final
+ 2.0.75.Final
diff --git a/buffer/pom.xml b/buffer/pom.xml
index 21dd16f77d9..bea8c59032e 100644
--- a/buffer/pom.xml
+++ b/buffer/pom.xml
@@ -20,7 +20,7 @@
io.nettynetty-parent
- 4.1.128.1.dse
+ 4.1.132.1.dsenetty-buffer
diff --git a/buffer/src/main/java/io/netty/buffer/AdaptivePoolingAllocator.java b/buffer/src/main/java/io/netty/buffer/AdaptivePoolingAllocator.java
index d4fba097831..de90de6f784 100644
--- a/buffer/src/main/java/io/netty/buffer/AdaptivePoolingAllocator.java
+++ b/buffer/src/main/java/io/netty/buffer/AdaptivePoolingAllocator.java
@@ -18,22 +18,24 @@
import io.netty.util.ByteProcessor;
import io.netty.util.CharsetUtil;
import io.netty.util.IllegalReferenceCountException;
-import io.netty.util.IntSupplier;
+import io.netty.util.IntConsumer;
import io.netty.util.NettyRuntime;
+import io.netty.util.Recycler;
import io.netty.util.Recycler.EnhancedHandle;
import io.netty.util.ReferenceCounted;
+import io.netty.util.concurrent.ConcurrentSkipListIntObjMultimap;
+import io.netty.util.concurrent.ConcurrentSkipListIntObjMultimap.IntEntry;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread;
import io.netty.util.concurrent.MpscAtomicIntegerArrayQueue;
import io.netty.util.concurrent.MpscIntQueue;
-import io.netty.util.internal.ObjectPool;
+import io.netty.util.internal.MathUtil;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ReferenceCountUpdater;
import io.netty.util.internal.SuppressJava6Requirement;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.ThreadExecutorMap;
-import io.netty.util.internal.ThreadLocalRandom;
import io.netty.util.internal.UnstableApi;
import java.io.IOException;
@@ -47,8 +49,10 @@
import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
@@ -83,6 +87,16 @@
@SuppressJava6Requirement(reason = "Guarded by version check")
@UnstableApi
final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.AdaptiveAllocatorApi {
+ private static final int LOW_MEM_THRESHOLD = 512 * 1024 * 1024;
+ private static final boolean IS_LOW_MEM = Runtime.getRuntime().maxMemory() <= LOW_MEM_THRESHOLD;
+
+ /**
+ * Whether the IS_LOW_MEM setting should disable thread-local magazines.
+ * This can have fairly high performance overhead.
+ */
+ private static final boolean DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM = SystemPropertyUtil.getBoolean(
+ "io.netty.allocator.disableThreadLocalMagazinesOnLowMemory", true);
+
/**
* The 128 KiB minimum chunk size is chosen to encourage the system allocator to delegate to mmap for chunk
* allocations. For instance, glibc will do this.
@@ -90,11 +104,11 @@ final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.Adaptiv
* which is a much, much larger space. Chunks are also allocated in whole multiples of the minimum
* chunk size, which itself is a whole multiple of popular page sizes like 4 KiB, 16 KiB, and 64 KiB.
*/
- private static final int MIN_CHUNK_SIZE = 128 * 1024;
+ static final int MIN_CHUNK_SIZE = 128 * 1024;
private static final int EXPANSION_ATTEMPTS = 3;
private static final int INITIAL_MAGAZINES = 1;
private static final int RETIRE_CAPACITY = 256;
- private static final int MAX_STRIPES = NettyRuntime.availableProcessors() * 2;
+ private static final int MAX_STRIPES = IS_LOW_MEM ? 1 : NettyRuntime.availableProcessors() * 2;
private static final int BUFS_PER_CHUNK = 8; // For large buffers, aim to have about this many buffers per chunk.
/**
@@ -102,7 +116,9 @@ final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.Adaptiv
*
* This number is 8 MiB, and is derived from the limitations of internal histograms.
*/
- private static final int MAX_CHUNK_SIZE = 8 * 1024 * 1024; // 8 MiB.
+ private static final int MAX_CHUNK_SIZE = IS_LOW_MEM ?
+ 2 * 1024 * 1024 : // 2 MiB for systems with small heaps.
+ 8 * 1024 * 1024; // 8 MiB.
private static final int MAX_POOLED_BUF_SIZE = MAX_CHUNK_SIZE / BUFS_PER_CHUNK;
/**
@@ -150,21 +166,9 @@ final class AdaptivePoolingAllocator implements AdaptiveByteBufAllocator.Adaptiv
16384,
16896, // 16384 + 512
};
- private static final ChunkReleasePredicate CHUNK_RELEASE_ALWAYS = new ChunkReleasePredicate() {
- @Override
- public boolean shouldReleaseChunk(int chunkSize) {
- return true;
- }
- };
- private static final ChunkReleasePredicate CHUNK_RELEASE_NEVER = new ChunkReleasePredicate() {
- @Override
- public boolean shouldReleaseChunk(int chunkSize) {
- return false;
- }
- };
private static final int SIZE_CLASSES_COUNT = SIZE_CLASSES.length;
- private static final byte[] SIZE_INDEXES = new byte[(SIZE_CLASSES[SIZE_CLASSES_COUNT - 1] / 32) + 1];
+ private static final byte[] SIZE_INDEXES = new byte[SIZE_CLASSES[SIZE_CLASSES_COUNT - 1] / 32 + 1];
static {
if (MAGAZINE_BUFFER_QUEUE_CAPACITY < 2) {
@@ -175,7 +179,7 @@ public boolean shouldReleaseChunk(int chunkSize) {
for (int i = 0; i < SIZE_CLASSES_COUNT; i++) {
int sizeClass = SIZE_CLASSES[i];
//noinspection ConstantValue
- assert (sizeClass & 5) == 0 : "Size class must be a multiple of 32";
+ assert (sizeClass & 31) == 0 : "Size class must be a multiple of 32";
int sizeIndex = sizeIndexOf(sizeClass);
Arrays.fill(SIZE_INDEXES, lastIndex + 1, sizeIndex + 1, (byte) i);
lastIndex = sizeIndex;
@@ -193,8 +197,10 @@ public boolean shouldReleaseChunk(int chunkSize) {
chunkRegistry = new ChunkRegistry();
sizeClassedMagazineGroups = createMagazineGroupSizeClasses(this, false);
largeBufferMagazineGroup = new MagazineGroup(
- this, chunkAllocator, new HistogramChunkControllerFactory(true), false);
- threadLocalGroup = new FastThreadLocal() {
+ this, chunkAllocator, new BuddyChunkManagementStrategy(), false);
+
+ boolean disableThreadLocalGroups = IS_LOW_MEM && DISABLE_THREAD_LOCAL_MAGAZINES_ON_LOW_MEM;
+ threadLocalGroup = disableThreadLocalGroups ? null : new FastThreadLocal() {
@Override
protected MagazineGroup[] initialValue() {
if (useCacheForNonEventLoopThreads || ThreadExecutorMap.currentExecutor() != null) {
@@ -220,7 +226,7 @@ private static MagazineGroup[] createMagazineGroupSizeClasses(
for (int i = 0; i < SIZE_CLASSES.length; i++) {
int segmentSize = SIZE_CLASSES[i];
groups[i] = new MagazineGroup(allocator, allocator.chunkAllocator,
- new SizeClassChunkControllerFactory(segmentSize), isThreadLocal);
+ new SizeClassChunkManagementStrategy(segmentSize), isThreadLocal);
}
return groups;
}
@@ -245,7 +251,7 @@ private static MagazineGroup[] createMagazineGroupSizeClasses(
*
* @return A new multi-producer, multi-consumer queue.
*/
- private static Queue createSharedChunkQueue() {
+ private static Queue createSharedChunkQueue() {
return PlatformDependent.newFixedMpmcQueue(CHUNK_REUSE_QUEUE);
}
@@ -259,13 +265,14 @@ private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread
if (size <= MAX_POOLED_BUF_SIZE) {
final int index = sizeClassIndexOf(size);
MagazineGroup[] magazineGroups;
- if (!FastThreadLocalThread.willCleanupFastThreadLocals(currentThread) ||
+ if (!FastThreadLocalThread.willCleanupFastThreadLocals(Thread.currentThread()) ||
+ IS_LOW_MEM ||
(magazineGroups = threadLocalGroup.get()) == null) {
magazineGroups = sizeClassedMagazineGroups;
}
if (index < magazineGroups.length) {
allocated = magazineGroups[index].allocate(size, maxCapacity, currentThread, buf);
- } else {
+ } else if (!IS_LOW_MEM) {
allocated = largeBufferMagazineGroup.allocate(size, maxCapacity, currentThread, buf);
}
}
@@ -292,8 +299,7 @@ static int[] getSizeClasses() {
return SIZE_CLASSES.clone();
}
- private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread,
- AdaptiveByteBuf buf) {
+ private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread currentThread, AdaptiveByteBuf buf) {
// If we don't already have a buffer, obtain one from the most conveniently available magazine.
Magazine magazine;
if (buf != null) {
@@ -307,10 +313,11 @@ private AdaptiveByteBuf allocateFallback(int size, int maxCapacity, Thread curre
}
// Create a one-off chunk for this allocation.
AbstractByteBuf innerChunk = chunkAllocator.allocate(size, maxCapacity);
- Chunk chunk = new Chunk(innerChunk, magazine, false, CHUNK_RELEASE_ALWAYS);
+ Chunk chunk = new Chunk(innerChunk, magazine);
chunkRegistry.add(chunk);
try {
- chunk.readInitInto(buf, size, size, maxCapacity);
+ boolean success = chunk.readInitInto(buf, size, size, maxCapacity);
+ assert success: "Failed to initialize ByteBuf with dedicated chunk";
} finally {
// As the chunk is an one-off we need to always call release explicitly as readInitInto(...)
// will take care of retain once when successful. Once The AdaptiveByteBuf is released it will
@@ -355,38 +362,37 @@ private void free() {
largeBufferMagazineGroup.free();
}
- static int sizeToBucket(int size) {
- return HistogramChunkController.sizeToBucket(size);
- }
-
@SuppressJava6Requirement(reason = "Guarded by version check")
private static final class MagazineGroup {
private final AdaptivePoolingAllocator allocator;
private final ChunkAllocator chunkAllocator;
- private final ChunkControllerFactory chunkControllerFactory;
- private final Queue chunkReuseQueue;
+ private final ChunkManagementStrategy chunkManagementStrategy;
+ private final ChunkCache chunkCache;
private final StampedLock magazineExpandLock;
private final Magazine threadLocalMagazine;
+ private Thread ownerThread;
private volatile Magazine[] magazines;
private volatile boolean freed;
MagazineGroup(AdaptivePoolingAllocator allocator,
ChunkAllocator chunkAllocator,
- ChunkControllerFactory chunkControllerFactory,
+ ChunkManagementStrategy chunkManagementStrategy,
boolean isThreadLocal) {
this.allocator = allocator;
this.chunkAllocator = chunkAllocator;
- this.chunkControllerFactory = chunkControllerFactory;
- chunkReuseQueue = createSharedChunkQueue();
+ this.chunkManagementStrategy = chunkManagementStrategy;
+ chunkCache = chunkManagementStrategy.createChunkCache(isThreadLocal);
if (isThreadLocal) {
+ ownerThread = Thread.currentThread();
magazineExpandLock = null;
- threadLocalMagazine = new Magazine(this, false, chunkReuseQueue, chunkControllerFactory.create(this));
+ threadLocalMagazine = new Magazine(this, false, chunkManagementStrategy.createController(this));
} else {
+ ownerThread = null;
magazineExpandLock = new StampedLock();
threadLocalMagazine = null;
Magazine[] mags = new Magazine[INITIAL_MAGAZINES];
for (int i = 0; i < mags.length; i++) {
- mags[i] = new Magazine(this, true, chunkReuseQueue, chunkControllerFactory.create(this));
+ mags[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
}
magazines = mags;
}
@@ -446,12 +452,9 @@ private boolean tryExpandMagazines(int currentLength) {
if (mags.length >= MAX_STRIPES || mags.length > currentLength || freed) {
return true;
}
- Magazine firstMagazine = mags[0];
Magazine[] expanded = new Magazine[mags.length * 2];
for (int i = 0, l = expanded.length; i < l; i++) {
- Magazine m = new Magazine(this, true, chunkReuseQueue, chunkControllerFactory.create(this));
- firstMagazine.initializeSharedStateIn(m);
- expanded[i] = m;
+ expanded[i] = new Magazine(this, true, chunkManagementStrategy.createController(this));
}
magazines = expanded;
} finally {
@@ -464,22 +467,32 @@ private boolean tryExpandMagazines(int currentLength) {
return true;
}
- boolean offerToQueue(Chunk buffer) {
+ Chunk pollChunk(int size) {
+ return chunkCache.pollChunk(size);
+ }
+
+ boolean offerChunk(Chunk chunk) {
if (freed) {
return false;
}
- boolean isAdded = chunkReuseQueue.offer(buffer);
+ if (chunk.hasUnprocessedFreelistEntries()) {
+ chunk.processFreelistEntries();
+ }
+ boolean isAdded = chunkCache.offerChunk(chunk);
+
if (freed && isAdded) {
// Help to free the reuse queue.
- freeChunkReuseQueue();
+ freeChunkReuseQueue(ownerThread);
}
return isAdded;
}
private void free() {
freed = true;
+ Thread ownerThread = this.ownerThread;
if (threadLocalMagazine != null) {
+ this.ownerThread = null;
threadLocalMagazine.free();
} else {
long stamp = magazineExpandLock.writeLock();
@@ -492,22 +505,153 @@ private void free() {
magazineExpandLock.unlockWrite(stamp);
}
}
- freeChunkReuseQueue();
+ freeChunkReuseQueue(ownerThread);
}
- private void freeChunkReuseQueue() {
- for (;;) {
- Chunk chunk = chunkReuseQueue.poll();
+ private void freeChunkReuseQueue(Thread ownerThread) {
+ Chunk chunk;
+ while ((chunk = chunkCache.pollChunk(0)) != null) {
+ if (ownerThread != null && chunk instanceof SizeClassedChunk) {
+ SizeClassedChunk threadLocalChunk = (SizeClassedChunk) chunk;
+ assert ownerThread == threadLocalChunk.ownerThread;
+ // no release segment can ever happen from the owner Thread since it's not running anymore
+ // This is required to let the ownerThread to be GC'ed despite there are AdaptiveByteBuf
+ // that reference some thread local chunk
+ threadLocalChunk.ownerThread = null;
+ }
+ chunk.markToDeallocate();
+ }
+ }
+ }
+
+ private interface ChunkCache {
+ Chunk pollChunk(int size);
+ boolean offerChunk(Chunk chunk);
+ }
+
+ private static final class ConcurrentQueueChunkCache implements ChunkCache {
+ private final Queue queue;
+
+ private ConcurrentQueueChunkCache() {
+ queue = createSharedChunkQueue();
+ }
+
+ @Override
+ public SizeClassedChunk pollChunk(int size) {
+ // we really don't care about size here since the sized class chunk q
+ // just care about segments of fixed size!
+ Queue queue = this.queue;
+ for (int i = 0; i < CHUNK_REUSE_QUEUE; i++) {
+ SizeClassedChunk chunk = queue.poll();
if (chunk == null) {
+ return null;
+ }
+ if (chunk.hasRemainingCapacity()) {
+ return chunk;
+ }
+ queue.offer(chunk);
+ }
+ return null;
+ }
+
+ @Override
+ public boolean offerChunk(Chunk chunk) {
+ return queue.offer((SizeClassedChunk) chunk);
+ }
+ }
+
+ private static final class ConcurrentSkipListChunkCache implements ChunkCache {
+ private final ConcurrentSkipListIntObjMultimap chunks;
+
+ private ConcurrentSkipListChunkCache() {
+ chunks = new ConcurrentSkipListIntObjMultimap(-1);
+ }
+
+ @Override
+ public Chunk pollChunk(int size) {
+ if (chunks.isEmpty()) {
+ return null;
+ }
+ IntEntry entry = chunks.pollCeilingEntry(size);
+ if (entry != null) {
+ Chunk chunk = entry.getValue();
+ if (chunk.hasUnprocessedFreelistEntries()) {
+ chunk.processFreelistEntries();
+ }
+ return chunk;
+ }
+
+ Chunk bestChunk = null;
+ int bestRemainingCapacity = 0;
+ Iterator> itr = chunks.iterator();
+ while (itr.hasNext()) {
+ entry = itr.next();
+ final Chunk chunk;
+ if (entry != null && (chunk = entry.getValue()).hasUnprocessedFreelistEntries()) {
+ if (!chunks.remove(entry.getKey(), entry.getValue())) {
+ continue;
+ }
+ chunk.processFreelistEntries();
+ int remainingCapacity = chunk.remainingCapacity();
+ if (remainingCapacity >= size &&
+ (bestChunk == null || remainingCapacity > bestRemainingCapacity)) {
+ if (bestChunk != null) {
+ chunks.put(bestRemainingCapacity, bestChunk);
+ }
+ bestChunk = chunk;
+ bestRemainingCapacity = remainingCapacity;
+ } else {
+ chunks.put(remainingCapacity, chunk);
+ }
+ }
+ }
+
+ return bestChunk;
+ }
+
+ @Override
+ public boolean offerChunk(Chunk chunk) {
+ chunks.put(chunk.remainingCapacity(), chunk);
+
+ int size = chunks.size();
+ while (size > CHUNK_REUSE_QUEUE) {
+ // Deallocate the chunk with the fewest incoming references.
+ int key = -1;
+ Chunk toDeallocate = null;
+ for (IntEntry entry : chunks) {
+ Chunk candidate = entry.getValue();
+ if (candidate != null) {
+ if (toDeallocate == null) {
+ toDeallocate = candidate;
+ key = entry.getKey();
+ } else {
+ int candidateRefCnt = candidate.refCnt();
+ int toDeallocateRefCnt = toDeallocate.refCnt();
+ if (candidateRefCnt < toDeallocateRefCnt ||
+ candidateRefCnt == toDeallocateRefCnt &&
+ candidate.capacity() < toDeallocate.capacity()) {
+ toDeallocate = candidate;
+ key = entry.getKey();
+ }
+ }
+ }
+ }
+ if (toDeallocate == null) {
break;
}
- chunk.release();
+ if (chunks.remove(key, toDeallocate)) {
+ toDeallocate.markToDeallocate();
+ }
+ size = chunks.size();
}
+ return true;
}
}
- private interface ChunkControllerFactory {
- ChunkController create(MagazineGroup group);
+ private interface ChunkManagementStrategy {
+ ChunkController createController(MagazineGroup group);
+
+ ChunkCache createChunkCache(boolean isThreadLocal);
}
private interface ChunkController {
@@ -516,66 +660,75 @@ private interface ChunkController {
*/
int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation);
- /**
- * Initialize the given chunk factory with shared statistics state (if any) from this factory.
- */
- void initializeSharedStateIn(ChunkController chunkController);
-
/**
* Allocate a new {@link Chunk} for the given {@link Magazine}.
*/
Chunk newChunkAllocation(int promptingSize, Magazine magazine);
}
- private interface ChunkReleasePredicate {
- boolean shouldReleaseChunk(int chunkSize);
- }
-
- private static final class SizeClassChunkControllerFactory implements ChunkControllerFactory {
+ private static final class SizeClassChunkManagementStrategy implements ChunkManagementStrategy {
// To amortize activation/deactivation of chunks, we should have a minimum number of segments per chunk.
// We choose 32 because it seems neither too small nor too big.
// For segments of 16 KiB, the chunks will be half a megabyte.
private static final int MIN_SEGMENTS_PER_CHUNK = 32;
private final int segmentSize;
private final int chunkSize;
- private final int[] segmentOffsets;
- private SizeClassChunkControllerFactory(int segmentSize) {
+ private SizeClassChunkManagementStrategy(int segmentSize) {
this.segmentSize = ObjectUtil.checkPositive(segmentSize, "segmentSize");
chunkSize = Math.max(MIN_CHUNK_SIZE, segmentSize * MIN_SEGMENTS_PER_CHUNK);
- int segmentsCount = chunkSize / segmentSize;
- segmentOffsets = new int[segmentsCount];
- for (int i = 0; i < segmentsCount; i++) {
- segmentOffsets[i] = i * segmentSize;
- }
}
@Override
- public ChunkController create(MagazineGroup group) {
- return new SizeClassChunkController(group, segmentSize, chunkSize, segmentOffsets);
+ public ChunkController createController(MagazineGroup group) {
+ return new SizeClassChunkController(group, segmentSize, chunkSize);
+ }
+
+ @Override
+ public ChunkCache createChunkCache(boolean isThreadLocal) {
+ return new ConcurrentQueueChunkCache();
}
}
private static final class SizeClassChunkController implements ChunkController {
- private static final ChunkReleasePredicate FALSE_PREDICATE = new ChunkReleasePredicate() {
- @Override
- public boolean shouldReleaseChunk(int chunkSize) {
- return false;
- }
- };
private final ChunkAllocator chunkAllocator;
private final int segmentSize;
private final int chunkSize;
private final ChunkRegistry chunkRegistry;
- private final int[] segmentOffsets;
- private SizeClassChunkController(MagazineGroup group, int segmentSize, int chunkSize, int[] segmentOffsets) {
+ private SizeClassChunkController(MagazineGroup group, int segmentSize, int chunkSize) {
chunkAllocator = group.chunkAllocator;
this.segmentSize = segmentSize;
this.chunkSize = chunkSize;
chunkRegistry = group.allocator.chunkRegistry;
- this.segmentOffsets = segmentOffsets;
+ }
+
+ private MpscIntQueue createEmptyFreeList() {
+ return new MpscAtomicIntegerArrayQueue(chunkSize / segmentSize, SizeClassedChunk.FREE_LIST_EMPTY);
+ }
+
+ private MpscIntQueue createFreeList() {
+ final int segmentsCount = chunkSize / segmentSize;
+ final MpscIntQueue freeList = new MpscAtomicIntegerArrayQueue(
+ segmentsCount, SizeClassedChunk.FREE_LIST_EMPTY);
+ int segmentOffset = 0;
+ for (int i = 0; i < segmentsCount; i++) {
+ freeList.offer(segmentOffset);
+ segmentOffset += segmentSize;
+ }
+ return freeList;
+ }
+
+ private IntStack createLocalFreeList() {
+ final int segmentsCount = chunkSize / segmentSize;
+ int segmentOffset = chunkSize;
+ int[] offsets = new int[segmentsCount];
+ for (int i = 0; i < segmentsCount; i++) {
+ segmentOffset -= segmentSize;
+ offsets[i] = segmentOffset;
+ }
+ return new IntStack(offsets);
}
@Override
@@ -584,235 +737,59 @@ public int computeBufferCapacity(
return Math.min(segmentSize, maxCapacity);
}
- @Override
- public void initializeSharedStateIn(ChunkController chunkController) {
- // NOOP
- }
-
@Override
public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
AbstractByteBuf chunkBuffer = chunkAllocator.allocate(chunkSize, chunkSize);
assert chunkBuffer.capacity() == chunkSize;
- SizeClassedChunk chunk = new SizeClassedChunk(chunkBuffer, magazine, true,
- segmentSize, segmentOffsets, FALSE_PREDICATE);
+ SizeClassedChunk chunk = new SizeClassedChunk(chunkBuffer, magazine, this);
chunkRegistry.add(chunk);
return chunk;
}
}
- private static final class HistogramChunkControllerFactory implements ChunkControllerFactory {
- private final boolean shareable;
+ private static final class BuddyChunkManagementStrategy implements ChunkManagementStrategy {
+ private final AtomicInteger maxChunkSize = new AtomicInteger();
- private HistogramChunkControllerFactory(boolean shareable) {
- this.shareable = shareable;
+ @Override
+ public ChunkController createController(MagazineGroup group) {
+ return new BuddyChunkController(group, maxChunkSize);
}
@Override
- public ChunkController create(MagazineGroup group) {
- return new HistogramChunkController(group, shareable);
+ public ChunkCache createChunkCache(boolean isThreadLocal) {
+ return new ConcurrentSkipListChunkCache();
}
}
- private static final class HistogramChunkController implements ChunkController, ChunkReleasePredicate {
- private static final int MIN_DATUM_TARGET = 1024;
- private static final int MAX_DATUM_TARGET = 65534;
- private static final int INIT_DATUM_TARGET = 9;
- private static final int HISTO_BUCKET_COUNT = 16;
- private static final int[] HISTO_BUCKETS = {
- 16 * 1024,
- 24 * 1024,
- 32 * 1024,
- 48 * 1024,
- 64 * 1024,
- 96 * 1024,
- 128 * 1024,
- 192 * 1024,
- 256 * 1024,
- 384 * 1024,
- 512 * 1024,
- 768 * 1024,
- 1024 * 1024,
- 1792 * 1024,
- 2048 * 1024,
- 3072 * 1024
- };
-
- private final MagazineGroup group;
- private final boolean shareable;
- private final short[][] histos = {
- new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
- new short[HISTO_BUCKET_COUNT], new short[HISTO_BUCKET_COUNT],
- };
+ private static final class BuddyChunkController implements ChunkController {
+ private final ChunkAllocator chunkAllocator;
private final ChunkRegistry chunkRegistry;
- private short[] histo = histos[0];
- private final int[] sums = new int[HISTO_BUCKET_COUNT];
-
- private int histoIndex;
- private int datumCount;
- private int datumTarget = INIT_DATUM_TARGET;
- private boolean hasHadRotation;
- private volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE;
- private volatile int localPrefChunkSize = MIN_CHUNK_SIZE;
- private volatile int localUpperBufSize;
-
- private HistogramChunkController(MagazineGroup group, boolean shareable) {
- this.group = group;
- this.shareable = shareable;
- chunkRegistry = group.allocator.chunkRegistry;
- }
-
- @Override
- public int computeBufferCapacity(
- int requestedSize, int maxCapacity, boolean isReallocation) {
- if (!isReallocation) {
- // Only record allocation size if it's not caused by a reallocation that was triggered by capacity
- // change of the buffer.
- recordAllocationSize(requestedSize);
- }
+ private final AtomicInteger maxChunkSize;
- // Predict starting capacity from localUpperBufSize, but place limits on the max starting capacity
- // based on the requested size, because localUpperBufSize can potentially be quite large.
- int startCapLimits;
- if (requestedSize <= 32768) { // Less than or equal to 32 KiB.
- startCapLimits = 65536; // Use at most 64 KiB, which is also the AdaptiveRecvByteBufAllocator max.
- } else {
- startCapLimits = requestedSize * 2; // Otherwise use at most twice the requested memory.
- }
- int startingCapacity = Math.min(startCapLimits, localUpperBufSize);
- startingCapacity = Math.max(requestedSize, Math.min(maxCapacity, startingCapacity));
- return startingCapacity;
- }
-
- private void recordAllocationSize(int bufferSizeToRecord) {
- // Use the preserved size from the reused AdaptiveByteBuf, if available.
- // Otherwise, use the requested buffer size.
- // This way, we better take into account
- if (bufferSizeToRecord == 0) {
- return;
- }
- int bucket = sizeToBucket(bufferSizeToRecord);
- histo[bucket]++;
- if (datumCount++ == datumTarget) {
- rotateHistograms();
- }
- }
-
- static int sizeToBucket(int size) {
- int index = binarySearchInsertionPoint(Arrays.binarySearch(HISTO_BUCKETS, size));
- return index >= HISTO_BUCKETS.length ? HISTO_BUCKETS.length - 1 : index;
- }
-
- private static int binarySearchInsertionPoint(int index) {
- if (index < 0) {
- index = -(index + 1);
- }
- return index;
- }
-
- static int bucketToSize(int sizeBucket) {
- return HISTO_BUCKETS[sizeBucket];
- }
-
- private void rotateHistograms() {
- short[][] hs = histos;
- for (int i = 0; i < HISTO_BUCKET_COUNT; i++) {
- sums[i] = (hs[0][i] & 0xFFFF) + (hs[1][i] & 0xFFFF) + (hs[2][i] & 0xFFFF) + (hs[3][i] & 0xFFFF);
- }
- int sum = 0;
- for (int count : sums) {
- sum += count;
- }
- int targetPercentile = (int) (sum * 0.99);
- int sizeBucket = 0;
- for (; sizeBucket < sums.length; sizeBucket++) {
- if (sums[sizeBucket] > targetPercentile) {
- break;
- }
- targetPercentile -= sums[sizeBucket];
- }
- hasHadRotation = true;
- int percentileSize = bucketToSize(sizeBucket);
- int prefChunkSize = Math.max(percentileSize * BUFS_PER_CHUNK, MIN_CHUNK_SIZE);
- localUpperBufSize = percentileSize;
- localPrefChunkSize = prefChunkSize;
- if (shareable) {
- for (Magazine mag : group.magazines) {
- HistogramChunkController statistics = (HistogramChunkController) mag.chunkController;
- prefChunkSize = Math.max(prefChunkSize, statistics.localPrefChunkSize);
- }
- }
- if (sharedPrefChunkSize != prefChunkSize) {
- // Preferred chunk size changed. Increase check frequency.
- datumTarget = Math.max(datumTarget >> 1, MIN_DATUM_TARGET);
- sharedPrefChunkSize = prefChunkSize;
- } else {
- // Preferred chunk size did not change. Check less often.
- datumTarget = Math.min(datumTarget << 1, MAX_DATUM_TARGET);
- }
-
- histoIndex = histoIndex + 1 & 3;
- histo = histos[histoIndex];
- datumCount = 0;
- Arrays.fill(histo, (short) 0);
- }
-
- /**
- * Get the preferred chunk size, based on statistics from the {@linkplain #recordAllocationSize(int) recorded}
- * allocation sizes.
- *
- * This method must be thread-safe.
- *
- * @return The currently preferred chunk allocation size.
- */
- int preferredChunkSize() {
- return sharedPrefChunkSize;
+ BuddyChunkController(MagazineGroup group, AtomicInteger maxChunkSize) {
+ chunkAllocator = group.chunkAllocator;
+ chunkRegistry = group.allocator.chunkRegistry;
+ this.maxChunkSize = maxChunkSize;
}
@Override
- public void initializeSharedStateIn(ChunkController chunkController) {
- HistogramChunkController statistics = (HistogramChunkController) chunkController;
- int sharedPrefChunkSize = this.sharedPrefChunkSize;
- statistics.localPrefChunkSize = sharedPrefChunkSize;
- statistics.sharedPrefChunkSize = sharedPrefChunkSize;
+ public int computeBufferCapacity(int requestedSize, int maxCapacity, boolean isReallocation) {
+ return MathUtil.safeFindNextPositivePowerOfTwo(requestedSize);
}
@Override
public Chunk newChunkAllocation(int promptingSize, Magazine magazine) {
- int size = Math.max(promptingSize * BUFS_PER_CHUNK, preferredChunkSize());
- int minChunks = size / MIN_CHUNK_SIZE;
- if (MIN_CHUNK_SIZE * minChunks < size) {
- // Round up to nearest whole MIN_CHUNK_SIZE unit. The MIN_CHUNK_SIZE is an even multiple of many
- // popular small page sizes, like 4k, 16k, and 64k, which makes it easier for the system allocator
- // to manage the memory in terms of whole pages. This reduces memory fragmentation,
- // but without the potentially high overhead that power-of-2 chunk sizes would bring.
- size = MIN_CHUNK_SIZE * (1 + minChunks);
- }
-
- // Limit chunks to the max size, even if the histogram suggests to go above it.
- size = Math.min(size, MAX_CHUNK_SIZE);
-
- // If we haven't rotated the histogram yet, optimisticly record this chunk size as our preferred.
- if (!hasHadRotation && sharedPrefChunkSize == MIN_CHUNK_SIZE) {
- sharedPrefChunkSize = size;
- }
-
- ChunkAllocator chunkAllocator = group.chunkAllocator;
- Chunk chunk = new Chunk(chunkAllocator.allocate(size, size), magazine, true, this);
+ int maxChunkSize = this.maxChunkSize.get();
+ int proposedChunkSize = MathUtil.safeFindNextPositivePowerOfTwo(BUFS_PER_CHUNK * promptingSize);
+ int chunkSize = Math.min(MAX_CHUNK_SIZE, Math.max(maxChunkSize, proposedChunkSize));
+ if (chunkSize > maxChunkSize) {
+ // Update our stored max chunk size. It's fine that this is racy.
+ this.maxChunkSize.set(chunkSize);
+ }
+ BuddyChunk chunk = new BuddyChunk(chunkAllocator.allocate(chunkSize, chunkSize), magazine);
chunkRegistry.add(chunk);
return chunk;
}
-
- @Override
- public boolean shouldReleaseChunk(int chunkSize) {
- int preferredSize = preferredChunkSize();
- int givenChunks = chunkSize / MIN_CHUNK_SIZE;
- int preferredChunks = preferredSize / MIN_CHUNK_SIZE;
- int deviation = Math.abs(givenChunks - preferredChunks);
-
- // Retire chunks with a 5% probability per unit of MIN_CHUNK_SIZE deviation from preference.
- return deviation != 0 &&
- ThreadLocalRandom.current().nextDouble() * 20.0 < deviation;
- }
}
@SuppressJava6Requirement(reason = "Guarded by version check")
@@ -823,13 +800,31 @@ private static final class Magazine {
}
private static final Chunk MAGAZINE_FREED = new Chunk();
- private static final ObjectPool EVENT_LOOP_LOCAL_BUFFER_POOL = ObjectPool.newPool(
- new ObjectPool.ObjectCreator() {
- @Override
- public AdaptiveByteBuf newObject(ObjectPool.Handle handle) {
- return new AdaptiveByteBuf(handle);
- }
- });
+ private static final class AdaptiveRecycler extends Recycler {
+
+ private AdaptiveRecycler() {
+ }
+
+ private AdaptiveRecycler(int maxCapacity) {
+ // doesn't use fast thread local, shared
+ super(maxCapacity);
+ }
+
+ @Override
+ protected AdaptiveByteBuf newObject(final Handle handle) {
+ return new AdaptiveByteBuf((EnhancedHandle) handle);
+ }
+
+ public static AdaptiveRecycler threadLocal() {
+ return new AdaptiveRecycler();
+ }
+
+ public static AdaptiveRecycler sharedWith(int maxCapacity) {
+ return new AdaptiveRecycler(maxCapacity);
+ }
+ }
+
+ private static final AdaptiveRecycler EVENT_LOOP_LOCAL_BUFFER_POOL = AdaptiveRecycler.threadLocal();
private Chunk current;
@SuppressWarnings("unused") // updated via NEXT_IN_LINE
@@ -837,31 +832,20 @@ public AdaptiveByteBuf newObject(ObjectPool.Handle handle) {
private final MagazineGroup group;
private final ChunkController chunkController;
private final StampedLock allocationLock;
- private final Queue bufferQueue;
- private final ObjectPool.Handle handle;
- private final Queue sharedChunkQueue;
+ private final AdaptiveRecycler recycler;
- Magazine(MagazineGroup group, boolean shareable, Queue sharedChunkQueue,
- ChunkController chunkController) {
+ Magazine(MagazineGroup group, boolean shareable, ChunkController chunkController) {
this.group = group;
this.chunkController = chunkController;
if (shareable) {
// We only need the StampedLock if this Magazine will be shared across threads.
allocationLock = new StampedLock();
- bufferQueue = PlatformDependent.newFixedMpmcQueue(MAGAZINE_BUFFER_QUEUE_CAPACITY);
- handle = new ObjectPool.Handle() {
- @Override
- public void recycle(AdaptiveByteBuf self) {
- bufferQueue.offer(self);
- }
- };
+ recycler = AdaptiveRecycler.sharedWith(MAGAZINE_BUFFER_QUEUE_CAPACITY);
} else {
allocationLock = null;
- bufferQueue = null;
- handle = null;
+ recycler = null;
}
- this.sharedChunkQueue = sharedChunkQueue;
}
public boolean tryAllocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean reallocate) {
@@ -890,7 +874,7 @@ private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf b
return false;
}
if (curr == null) {
- curr = sharedChunkQueue.poll();
+ curr = group.pollChunk(size);
if (curr == null) {
return false;
}
@@ -900,9 +884,10 @@ private boolean allocateWithoutLock(int size, int maxCapacity, AdaptiveByteBuf b
int remainingCapacity = curr.remainingCapacity();
int startingCapacity = chunkController.computeBufferCapacity(
size, maxCapacity, true /* never update stats as we don't hold the magazine lock */);
- if (remainingCapacity >= size) {
- curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity);
+ if (remainingCapacity >= size &&
+ curr.readInitInto(buf, size, Math.min(remainingCapacity, startingCapacity), maxCapacity)) {
allocated = true;
+ remainingCapacity = curr.remainingCapacity();
}
try {
if (remainingCapacity >= RETIRE_CAPACITY) {
@@ -921,33 +906,17 @@ private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean
int startingCapacity = chunkController.computeBufferCapacity(size, maxCapacity, reallocate);
Chunk curr = current;
if (curr != null) {
- // We have a Chunk that has some space left.
+ boolean success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
int remainingCapacity = curr.remainingCapacity();
- if (remainingCapacity > startingCapacity) {
- curr.readInitInto(buf, size, startingCapacity, maxCapacity);
- // We still have some bytes left that we can use for the next allocation, just early return.
- return true;
- }
-
- // At this point we know that this will be the last time current will be used, so directly set it to
- // null and release it once we are done.
- current = null;
- if (remainingCapacity >= size) {
- try {
- curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
- return true;
- } finally {
- curr.releaseFromMagazine();
- }
- }
-
- // Check if we either retain the chunk in the nextInLine cache or releasing it.
- if (remainingCapacity < RETIRE_CAPACITY) {
- curr.releaseFromMagazine();
- } else {
- // See if it makes sense to transfer the Chunk to the nextInLine cache for later usage.
- // This method will release curr if this is not the case
+ if (!success && remainingCapacity > 0) {
+ current = null;
transferToNextInLineOrRelease(curr);
+ } else if (remainingCapacity == 0) {
+ current = null;
+ curr.releaseFromMagazine();
+ }
+ if (success) {
+ return true;
}
}
@@ -969,32 +938,28 @@ private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean
}
int remainingCapacity = curr.remainingCapacity();
- if (remainingCapacity > startingCapacity) {
+ if (remainingCapacity > startingCapacity &&
+ curr.readInitInto(buf, size, startingCapacity, maxCapacity)) {
// We have a Chunk that has some space left.
- curr.readInitInto(buf, size, startingCapacity, maxCapacity);
current = curr;
return true;
}
- if (remainingCapacity >= size) {
- // At this point we know that this will be the last time curr will be used, so directly set it to
- // null and release it once we are done.
- try {
- curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
- return true;
- } finally {
- // Release in a finally block so even if readInitInto(...) would throw we would still correctly
- // release the current chunk before null it out.
- curr.releaseFromMagazine();
+ try {
+ if (remainingCapacity >= size) {
+ // At this point we know that this will be the last time curr will be used, so directly set it
+ // to null and release it once we are done.
+ return curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
}
- } else {
- // Release it as it's too small.
+ } finally {
+ // Release in a finally block so even if readInitInto(...) would throw we would still correctly
+ // release the current chunk before null it out.
curr.releaseFromMagazine();
}
}
// Now try to poll from the central queue first
- curr = sharedChunkQueue.poll();
+ curr = group.pollChunk(size);
if (curr == null) {
curr = chunkController.newChunkAllocation(size, this);
} else {
@@ -1015,14 +980,15 @@ private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean
}
current = curr;
+ boolean success;
try {
int remainingCapacity = curr.remainingCapacity();
assert remainingCapacity >= size;
if (remainingCapacity > startingCapacity) {
- curr.readInitInto(buf, size, startingCapacity, maxCapacity);
+ success = curr.readInitInto(buf, size, startingCapacity, maxCapacity);
curr = null;
} else {
- curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
+ success = curr.readInitInto(buf, size, remainingCapacity, maxCapacity);
}
} finally {
if (curr != null) {
@@ -1032,7 +998,7 @@ private boolean allocate(int size, int maxCapacity, AdaptiveByteBuf buf, boolean
current = null;
}
}
- return true;
+ return success;
}
private void restoreMagazineFreed() {
@@ -1063,10 +1029,6 @@ private void transferToNextInLineOrRelease(Chunk chunk) {
chunk.releaseFromMagazine();
}
- boolean trySetNextInLine(Chunk chunk) {
- return NEXT_IN_LINE.compareAndSet(this, null, chunk);
- }
-
void free() {
// Release the current Chunk and the next that was stored for later usage.
restoreMagazineFreed();
@@ -1084,26 +1046,15 @@ void free() {
}
public AdaptiveByteBuf newBuffer() {
- AdaptiveByteBuf buf;
- if (handle == null) {
- buf = EVENT_LOOP_LOCAL_BUFFER_POOL.get();
- } else {
- buf = bufferQueue.poll();
- if (buf == null) {
- buf = new AdaptiveByteBuf(handle);
- }
- }
+ AdaptiveRecycler recycler = this.recycler;
+ AdaptiveByteBuf buf = recycler == null? EVENT_LOOP_LOCAL_BUFFER_POOL.get() : recycler.get();
buf.resetRefCnt();
buf.discardMarks();
return buf;
}
boolean offerToQueue(Chunk chunk) {
- return group.offerToQueue(chunk);
- }
-
- public void initializeSharedStateIn(Magazine other) {
- chunkController.initializeSharedStateIn(other.chunkController);
+ return group.offerChunk(chunk);
}
}
@@ -1133,9 +1084,7 @@ private static class Chunk implements ReferenceCounted {
protected final AbstractByteBuf delegate;
protected Magazine magazine;
private final AdaptivePoolingAllocator allocator;
- private final ChunkReleasePredicate chunkReleasePredicate;
private final int capacity;
- private final boolean pooled;
protected int allocatedBytes;
private static final ReferenceCountUpdater updater =
@@ -1161,23 +1110,17 @@ protected long unsafeOffset() {
delegate = null;
magazine = null;
allocator = null;
- chunkReleasePredicate = null;
capacity = 0;
- pooled = false;
}
- Chunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled,
- ChunkReleasePredicate chunkReleasePredicate) {
+ Chunk(AbstractByteBuf delegate, Magazine magazine) {
this.delegate = delegate;
- this.pooled = pooled;
capacity = delegate.capacity();
updater.setInitialValue(this);
attachToMagazine(magazine);
// We need the top-level allocator so ByteBuf.capacity(int) can call reallocate()
allocator = magazine.group.allocator;
-
- this.chunkReleasePredicate = chunkReleasePredicate;
}
Magazine currentMagazine() {
@@ -1241,46 +1184,33 @@ public boolean release(int decrement) {
/**
* Called when a magazine is done using this chunk, probably because it was emptied.
*/
- boolean releaseFromMagazine() {
- return release();
+ void releaseFromMagazine() {
+ // Chunks can be reused before they become empty.
+ // We can therefor put them in the shared queue as soon as the magazine is done with this chunk.
+ Magazine mag = magazine;
+ detachFromMagazine();
+ if (!mag.offerToQueue(this)) {
+ markToDeallocate();
+ }
}
/**
* Called when a ByteBuf is done using its allocation in this chunk.
*/
- boolean releaseSegment(int ignoredSegmentId) {
- return release();
+ void releaseSegment(int ignoredSegmentId, int size) {
+ release();
}
- private void deallocate() {
- Magazine mag = magazine;
- int chunkSize = delegate.capacity();
- if (!pooled || chunkReleasePredicate.shouldReleaseChunk(chunkSize) || mag == null) {
- // Drop the chunk if the parent allocator is closed,
- // or if the chunk deviates too much from the preferred chunk size.
- detachFromMagazine();
- allocator.chunkRegistry.remove(this);
- delegate.release();
- } else {
- updater.resetRefCnt(this);
- delegate.setIndex(0, 0);
- allocatedBytes = 0;
- if (!mag.trySetNextInLine(this)) {
- // As this Chunk does not belong to the mag anymore we need to decrease the used memory .
- detachFromMagazine();
- if (!mag.offerToQueue(this)) {
- // The central queue is full. Ensure we release again as we previously did use resetRefCnt()
- // which did increase the reference count by 1.
- boolean released = updater.release(this);
- allocator.chunkRegistry.remove(this);
- delegate.release();
- assert released;
- }
- }
- }
+ void markToDeallocate() {
+ release();
}
- public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
+ protected void deallocate() {
+ allocator.chunkRegistry.remove(this);
+ delegate.release();
+ }
+
+ public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
int startIndex = allocatedBytes;
allocatedBytes = startIndex + startingCapacity;
Chunk chunk = this;
@@ -1297,101 +1227,408 @@ public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, in
chunk.release();
}
}
+ return true;
}
public int remainingCapacity() {
return capacity - allocatedBytes;
}
+ public boolean hasUnprocessedFreelistEntries() {
+ return false;
+ }
+
+ public void processFreelistEntries() {
+ }
+
public int capacity() {
return capacity;
}
}
+ private static final class IntStack {
+
+ private final int[] stack;
+ private int top;
+
+ IntStack(int[] initialValues) {
+ stack = initialValues;
+ top = initialValues.length - 1;
+ }
+
+ public boolean isEmpty() {
+ return top == -1;
+ }
+
+ public int pop() {
+ final int last = stack[top];
+ top--;
+ return last;
+ }
+
+ public void push(int value) {
+ stack[top + 1] = value;
+ top++;
+ }
+
+ public int size() {
+ return top + 1;
+ }
+ }
+
+ /**
+ * Removes per-allocation retain()/release() atomic ops from the hot path by replacing ref counting
+ * with a segment-count state machine. Atomics are only needed on the cold deallocation path
+ * ({@link #markToDeallocate()}), which is rare for long-lived chunks that cycle segments many times.
+ * The tradeoff is a {@link MpscIntQueue#size()} call (volatile reads, no RMW) per remaining segment
+ * return after mark — acceptable since it avoids atomic RMWs entirely.
+ *
+ * State transitions:
+ *
+ *
{@link #AVAILABLE} (-1): chunk is in use, no deallocation tracking needed
+ *
0..N: local free list size at the time {@link #markToDeallocate()} was called;
+ * used to track when all segments have been returned
+ *
{@link #DEALLOCATED} (Integer.MIN_VALUE): all segments returned, chunk deallocated
+ *
+ *
+ * Ordering: external {@link #releaseSegment} pushes to the MPSC queue (which has an implicit
+ * StoreLoad barrier via its {@code offer()}), then reads {@code state} — this guarantees
+ * visibility of any preceding {@link #markToDeallocate()} write.
+ */
private static final class SizeClassedChunk extends Chunk {
private static final int FREE_LIST_EMPTY = -1;
+ private static final int AVAILABLE = -1;
+ // Integer.MIN_VALUE so that `DEALLOCATED + externalFreeList.size()` can never equal `segments`,
+ // making late-arriving releaseSegment calls on external threads arithmetically harmless.
+ private static final int DEALLOCATED = Integer.MIN_VALUE;
+ private static final AtomicIntegerFieldUpdater STATE =
+ AtomicIntegerFieldUpdater.newUpdater(SizeClassedChunk.class, "state");
+ private volatile int state;
+ private final int segments;
private final int segmentSize;
- private final MpscIntQueue freeList;
-
- SizeClassedChunk(AbstractByteBuf delegate, Magazine magazine, boolean pooled, int segmentSize,
- final int[] segmentOffsets, ChunkReleasePredicate shouldReleaseChunk) {
- super(delegate, magazine, pooled, shouldReleaseChunk);
- this.segmentSize = segmentSize;
- int segmentCount = segmentOffsets.length;
- assert delegate.capacity() / segmentSize == segmentCount;
- assert segmentCount > 0: "Chunk must have a positive number of segments";
- freeList = new MpscAtomicIntegerArrayQueue(segmentCount, FREE_LIST_EMPTY);
- freeList.fill(segmentCount, new IntSupplier() {
- int counter;
- @Override
- public int get() {
- return segmentOffsets[counter++];
- }
- });
+ private final MpscIntQueue externalFreeList;
+ private final IntStack localFreeList;
+ private Thread ownerThread;
+
+ SizeClassedChunk(AbstractByteBuf delegate, Magazine magazine,
+ SizeClassChunkController controller) {
+ super(delegate, magazine);
+ segmentSize = controller.segmentSize;
+ segments = controller.chunkSize / segmentSize;
+ STATE.lazySet(this, AVAILABLE);
+ ownerThread = magazine.group.ownerThread;
+ if (ownerThread == null) {
+ externalFreeList = controller.createFreeList();
+ localFreeList = null;
+ } else {
+ externalFreeList = controller.createEmptyFreeList();
+ localFreeList = controller.createLocalFreeList();
+ }
}
@Override
- public void readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
- int startIndex = freeList.poll();
+ public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
+ assert state == AVAILABLE;
+ final int startIndex = nextAvailableSegmentOffset();
if (startIndex == FREE_LIST_EMPTY) {
- throw new IllegalStateException("Free list is empty");
+ return false;
}
allocatedBytes += segmentSize;
+ try {
+ buf.init(delegate, this, 0, 0, startIndex, size, startingCapacity, maxCapacity);
+ } catch (Throwable t) {
+ allocatedBytes -= segmentSize;
+ releaseSegmentOffsetIntoFreeList(startIndex);
+ PlatformDependent.throwException(t);
+ }
+ return true;
+ }
+
+ private int nextAvailableSegmentOffset() {
+ final int startIndex;
+ IntStack localFreeList = this.localFreeList;
+ if (localFreeList != null) {
+ assert Thread.currentThread() == ownerThread;
+ if (localFreeList.isEmpty()) {
+ startIndex = externalFreeList.poll();
+ } else {
+ startIndex = localFreeList.pop();
+ }
+ } else {
+ startIndex = externalFreeList.poll();
+ }
+ return startIndex;
+ }
+
+ // this can be used by the ConcurrentQueueChunkCache to find the first buffer to use:
+ // it doesn't update the remaining capacity and it's not consider a single segmentSize
+ // case as not suitable to be reused
+ public boolean hasRemainingCapacity() {
+ int remaining = super.remainingCapacity();
+ if (remaining > 0) {
+ return true;
+ }
+ if (localFreeList != null) {
+ return !localFreeList.isEmpty();
+ }
+ return !externalFreeList.isEmpty();
+ }
+
+ @Override
+ public int remainingCapacity() {
+ int remaining = super.remainingCapacity();
+ return remaining > segmentSize ? remaining : updateRemainingCapacity(remaining);
+ }
+
+ private int updateRemainingCapacity(int snapshotted) {
+ int freeSegments = externalFreeList.size();
+ IntStack localFreeList = this.localFreeList;
+ if (localFreeList != null) {
+ freeSegments += localFreeList.size();
+ }
+ int updated = freeSegments * segmentSize;
+ if (updated != snapshotted) {
+ allocatedBytes = capacity() - updated;
+ }
+ return updated;
+ }
+
+ private void releaseSegmentOffsetIntoFreeList(int startIndex) {
+ IntStack localFreeList = this.localFreeList;
+ if (localFreeList != null && Thread.currentThread() == ownerThread) {
+ localFreeList.push(startIndex);
+ } else {
+ boolean segmentReturned = externalFreeList.offer(startIndex);
+ assert segmentReturned : "Unable to return segment " + startIndex + " to free list";
+ }
+ }
+
+ @Override
+ void releaseSegment(int startIndex, int size) {
+ IntStack localFreeList = this.localFreeList;
+ if (localFreeList != null && Thread.currentThread() == ownerThread) {
+ localFreeList.push(startIndex);
+ int state = this.state;
+ if (state != AVAILABLE) {
+ updateStateOnLocalReleaseSegment(state, localFreeList);
+ }
+ } else {
+ boolean segmentReturned = externalFreeList.offer(startIndex);
+ assert segmentReturned;
+ // implicit StoreLoad barrier from MPSC offer()
+ int state = this.state;
+ if (state != AVAILABLE) {
+ deallocateIfNeeded(state);
+ }
+ }
+ }
+
+ private void updateStateOnLocalReleaseSegment(int previousLocalSize, IntStack localFreeList) {
+ int newLocalSize = localFreeList.size();
+ boolean alwaysTrue = STATE.compareAndSet(this, previousLocalSize, newLocalSize);
+ assert alwaysTrue : "this shouldn't happen unless double release in the local free list";
+ deallocateIfNeeded(newLocalSize);
+ }
+
+ private void deallocateIfNeeded(int localSize) {
+ // Check if all segments have been returned.
+ int totalFreeSegments = localSize + externalFreeList.size();
+ if (totalFreeSegments == segments && STATE.compareAndSet(this, localSize, DEALLOCATED)) {
+ deallocate();
+ }
+ }
+
+ @Override
+ void markToDeallocate() {
+ IntStack localFreeList = this.localFreeList;
+ int localSize = localFreeList != null ? localFreeList.size() : 0;
+ STATE.set(this, localSize);
+ deallocateIfNeeded(localSize);
+ }
+ }
+
+ private static final class BuddyChunk extends Chunk implements IntConsumer {
+ private static final int MIN_BUDDY_SIZE = 32768;
+ private static final byte IS_CLAIMED = (byte) (1 << 7);
+ private static final byte HAS_CLAIMED_CHILDREN = 1 << 6;
+ private static final byte SHIFT_MASK = ~(IS_CLAIMED | HAS_CLAIMED_CHILDREN);
+ private static final int PACK_OFFSET_MASK = 0xFFFF;
+ private static final int PACK_SIZE_SHIFT = Integer.SIZE - Integer.numberOfLeadingZeros(PACK_OFFSET_MASK);
+
+ private final MpscIntQueue freeList;
+ // The bits of each buddy: [1: is claimed][1: has claimed children][30: MIN_BUDDY_SIZE shift to get size]
+ private final byte[] buddies;
+ private final int freeListCapacity;
+
+ BuddyChunk(AbstractByteBuf delegate, Magazine magazine) {
+ super(delegate, magazine);
+ freeListCapacity = delegate.capacity() / MIN_BUDDY_SIZE;
+ int maxShift = Integer.numberOfTrailingZeros(freeListCapacity);
+ assert maxShift <= 30; // The top 2 bits are used for marking.
+ // At most half of tree (all leaf nodes) can be freed.
+ freeList = new MpscAtomicIntegerArrayQueue(freeListCapacity, -1);
+ buddies = new byte[freeListCapacity << 1];
+
+ // Generate the buddies entries.
+ int index = 1;
+ int runLength = 1;
+ int currentRun = 0;
+ while (maxShift > 0) {
+ buddies[index++] = (byte) maxShift;
+ if (++currentRun == runLength) {
+ currentRun = 0;
+ runLength <<= 1;
+ maxShift--;
+ }
+ }
+ }
+
+ @Override
+ public boolean readInitInto(AdaptiveByteBuf buf, int size, int startingCapacity, int maxCapacity) {
+ if (!freeList.isEmpty()) {
+ freeList.drain(freeListCapacity, this);
+ }
+ int startIndex = chooseFirstFreeBuddy(1, startingCapacity, 0);
+ if (startIndex == -1) {
+ return false;
+ }
Chunk chunk = this;
chunk.retain();
try {
- buf.init(delegate, chunk, 0, 0, startIndex, size, startingCapacity, maxCapacity);
+ buf.init(delegate, this, 0, 0, startIndex, size, startingCapacity, maxCapacity);
+ allocatedBytes += startingCapacity;
chunk = null;
} finally {
if (chunk != null) {
+ unreserveMatchingBuddy(1, startingCapacity, startIndex, 0);
// If chunk is not null we know that buf.init(...) failed and so we need to manually release
- // the chunk again as we retained it before calling buf.init(...). Beside this we also need to
- // restore the old allocatedBytes value.
- allocatedBytes -= segmentSize;
- chunk.releaseSegment(startIndex);
+ // the chunk again as we retained it before calling buf.init(...).
+ chunk.release();
}
}
+ return true;
+ }
+
+ @Override
+ public void accept(int packed) {
+ // Called by allocating thread when draining freeList.
+ int size = MIN_BUDDY_SIZE << (packed >> PACK_SIZE_SHIFT);
+ int offset = (packed & PACK_OFFSET_MASK) * MIN_BUDDY_SIZE;
+ unreserveMatchingBuddy(1, size, offset, 0);
+ allocatedBytes -= size;
+ }
+
+ @Override
+ void releaseSegment(int startingIndex, int size) {
+ int packedOffset = startingIndex / MIN_BUDDY_SIZE;
+ int packedSize = Integer.numberOfTrailingZeros(size / MIN_BUDDY_SIZE) << PACK_SIZE_SHIFT;
+ int packed = packedOffset | packedSize;
+ freeList.offer(packed);
+ release();
}
@Override
public int remainingCapacity() {
- int remainingCapacity = super.remainingCapacity();
- if (remainingCapacity > segmentSize) {
- return remainingCapacity;
+ if (!freeList.isEmpty()) {
+ freeList.drain(freeListCapacity, this);
}
- int updatedRemainingCapacity = freeList.size() * segmentSize;
- if (updatedRemainingCapacity == remainingCapacity) {
- return remainingCapacity;
- }
- // update allocatedBytes based on what's available in the free list
- allocatedBytes = capacity() - updatedRemainingCapacity;
- return updatedRemainingCapacity;
+ return super.remainingCapacity();
}
@Override
- boolean releaseFromMagazine() {
- // Size-classed chunks can be reused before they become empty.
- // We can therefor put them in the shared queue as soon as the magazine is done with this chunk.
- Magazine mag = magazine;
- detachFromMagazine();
- if (!mag.offerToQueue(this)) {
- return super.releaseFromMagazine();
+ public boolean hasUnprocessedFreelistEntries() {
+ return !freeList.isEmpty();
+ }
+
+ @Override
+ public void processFreelistEntries() {
+ freeList.drain(freeListCapacity, this);
+ }
+
+ /**
+ * Claim a suitable buddy and return its start offset into the delegate chunk, or return -1 if nothing claimed.
+ */
+ private int chooseFirstFreeBuddy(int index, int size, int currOffset) {
+ byte[] buddies = this.buddies;
+ while (index < buddies.length) {
+ byte buddy = buddies[index];
+ int currValue = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
+ if (currValue < size || (buddy & IS_CLAIMED) == IS_CLAIMED) {
+ return -1;
+ }
+ if (currValue == size && (buddy & HAS_CLAIMED_CHILDREN) == 0) {
+ buddies[index] |= IS_CLAIMED;
+ return currOffset;
+ }
+ int found = chooseFirstFreeBuddy(index << 1, size, currOffset);
+ if (found != -1) {
+ buddies[index] |= HAS_CLAIMED_CHILDREN;
+ return found;
+ }
+ index = (index << 1) + 1;
+ currOffset += currValue >> 1; // Bump offset to skip first half of this layer.
}
- return false;
+ return -1;
+ }
+
+ /**
+ * Un-reserve the matching buddy and return whether there are any other child or sibling reservations.
+ */
+ private boolean unreserveMatchingBuddy(int index, int size, int offset, int currOffset) {
+ byte[] buddies = this.buddies;
+ if (buddies.length <= index) {
+ return false;
+ }
+ byte buddy = buddies[index];
+ int currSize = MIN_BUDDY_SIZE << (buddy & SHIFT_MASK);
+
+ if (currSize == size) {
+ // We're at the right size level.
+ if (currOffset == offset) {
+ buddies[index] &= SHIFT_MASK;
+ return false;
+ }
+ throw new IllegalStateException("The intended segment was not found at index " +
+ index + ", for size " + size + " and offset " + offset);
+ }
+
+ // We're at a parent size level. Use the target offset to guide our drill-down path.
+ boolean claims;
+ int siblingIndex;
+ if (offset < currOffset + (currSize >> 1)) {
+ // Must be down the left path.
+ claims = unreserveMatchingBuddy(index << 1, size, offset, currOffset);
+ siblingIndex = (index << 1) + 1;
+ } else {
+ // Must be down the rigth path.
+ claims = unreserveMatchingBuddy((index << 1) + 1, size, offset, currOffset + (currSize >> 1));
+ siblingIndex = index << 1;
+ }
+ if (!claims) {
+ // No other claims down the path we took. Check if the sibling has claims.
+ byte sibling = buddies[siblingIndex];
+ if ((sibling & SHIFT_MASK) == sibling) {
+ // No claims in the sibling. We can clear this level as well.
+ buddies[index] &= SHIFT_MASK;
+ return false;
+ }
+ }
+ return true;
}
@Override
- boolean releaseSegment(int startIndex) {
- boolean released = release();
- boolean segmentReturned = freeList.offer(startIndex);
- assert segmentReturned: "Unable to return segment " + startIndex + " to free list";
- return released;
+ public String toString() {
+ int capacity = delegate.capacity();
+ int remaining = capacity - allocatedBytes;
+ return "BuddyChunk[capacity: " + capacity +
+ ", remaining: " + remaining +
+ ", free list: " + freeList.size() + ']';
}
}
static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
- private final ObjectPool.Handle handle;
+ private final EnhancedHandle handle;
// this both act as adjustment and the start index for a free list segment allocation
private int startIndex;
@@ -1403,7 +1640,7 @@ static final class AdaptiveByteBuf extends AbstractReferenceCountedByteBuf {
private boolean hasArray;
private boolean hasMemoryAddress;
- AdaptiveByteBuf(ObjectPool.Handle recyclerHandle) {
+ AdaptiveByteBuf(EnhancedHandle recyclerHandle) {
super(0);
handle = ObjectUtil.checkNotNull(recyclerHandle, "recyclerHandle");
}
@@ -1442,12 +1679,11 @@ public int maxFastWritableBytes() {
@Override
public ByteBuf capacity(int newCapacity) {
+ checkNewCapacity(newCapacity);
if (length <= newCapacity && newCapacity <= maxFastCapacity) {
- ensureAccessible();
length = newCapacity;
return this;
}
- checkNewCapacity(newCapacity);
if (newCapacity < capacity()) {
length = newCapacity;
trimIndicesToCapacity(newCapacity);
@@ -1460,11 +1696,14 @@ public ByteBuf capacity(int newCapacity) {
int readerIndex = this.readerIndex;
int writerIndex = this.writerIndex;
int baseOldRootIndex = startIndex;
- int oldCapacity = length;
+ int oldLength = length;
+ int oldCapacity = maxFastCapacity;
AbstractByteBuf oldRoot = rootParent();
allocator.reallocate(newCapacity, maxCapacity(), this);
- oldRoot.getBytes(baseOldRootIndex, this, 0, oldCapacity);
- chunk.releaseSegment(baseOldRootIndex);
+ oldRoot.getBytes(baseOldRootIndex, this, 0, oldLength);
+ chunk.releaseSegment(baseOldRootIndex, oldCapacity);
+ assert oldCapacity < maxFastCapacity && newCapacity <= maxFastCapacity:
+ "Capacity increase failed";
this.readerIndex = readerIndex;
this.writerIndex = writerIndex;
return this;
@@ -1475,6 +1714,7 @@ public ByteBufAllocator alloc() {
return rootParent().alloc();
}
+ @SuppressWarnings("deprecation")
@Override
public ByteOrder order() {
return rootParent().order();
@@ -1841,17 +2081,12 @@ private int idx(int index) {
@Override
protected void deallocate() {
if (chunk != null) {
- chunk.releaseSegment(startIndex);
+ chunk.releaseSegment(startIndex, maxFastCapacity);
}
tmpNioBuf = null;
chunk = null;
rootParent = null;
- if (handle instanceof EnhancedHandle) {
- EnhancedHandle enhancedHandle = (EnhancedHandle) handle;
- enhancedHandle.unguardedRecycle(this);
- } else {
- handle.recycle(this);
- }
+ handle.unguardedRecycle(this);
}
}
diff --git a/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java
index 4ad86136888..4786724dc0b 100644
--- a/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java
+++ b/buffer/src/main/java/io/netty/buffer/CompositeByteBuf.java
@@ -2360,4 +2360,17 @@ private void shiftComps(int i, int count) {
}
componentCount = newSize;
}
+
+ /**
+ * Decreases the reference count by the specified {@code decrement} and deallocates this object if the reference
+ * count reaches at {@code 0}. At this point it will also decrement the reference count of each internal
+ * component by {@code 1}.
+ *
+ * @param decrement the number by which the reference count should be decreased
+ * @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
+ */
+ @Override
+ public boolean release(final int decrement) {
+ return super.release(decrement);
+ }
}
diff --git a/buffer/src/main/java/io/netty/buffer/SizeClasses.java b/buffer/src/main/java/io/netty/buffer/SizeClasses.java
index b42d455d5e6..d1fa1389855 100644
--- a/buffer/src/main/java/io/netty/buffer/SizeClasses.java
+++ b/buffer/src/main/java/io/netty/buffer/SizeClasses.java
@@ -107,7 +107,7 @@ final class SizeClasses implements SizeClassesMetric {
private final int[] pageIdx2sizeTab;
- // lookup table for sizeIdx <= smallMaxSizeIdx
+ // lookup table for sizeIdx < nSizes
private final int[] sizeIdx2sizeTab;
// lookup table used for size <= lookupMaxClass
diff --git a/buffer/src/test/java/io/netty/buffer/AbstractByteBufAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/AbstractByteBufAllocatorTest.java
index c32183fa707..a5f3675ba66 100644
--- a/buffer/src/test/java/io/netty/buffer/AbstractByteBufAllocatorTest.java
+++ b/buffer/src/test/java/io/netty/buffer/AbstractByteBufAllocatorTest.java
@@ -17,6 +17,7 @@
import io.netty.util.internal.PlatformDependent;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
@@ -26,6 +27,7 @@
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assumptions.abort;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
@@ -196,6 +198,27 @@ public void shouldReuseChunks() throws Exception {
.isLessThan(8 * 1024 * 1024);
}
+ @Test
+ public void testCapacityNotGreaterThanMaxCapacity() {
+ testCapacityNotGreaterThanMaxCapacity(true);
+ testCapacityNotGreaterThanMaxCapacity(false);
+ }
+
+ private void testCapacityNotGreaterThanMaxCapacity(boolean preferDirect) {
+ final int maxSize = 100000;
+ final ByteBuf buf = newAllocator(preferDirect).newDirectBuffer(maxSize, maxSize);
+ try {
+ assertThrows(IllegalArgumentException.class, new Executable() {
+ @Override
+ public void execute() throws Throwable {
+ buf.capacity(maxSize + 1);
+ }
+ });
+ } finally {
+ buf.release();
+ }
+ }
+
protected long expectedUsedMemory(T allocator, int capacity) {
return capacity;
}
diff --git a/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java b/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java
index 58a4ae82e75..d8ff780f517 100644
--- a/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java
+++ b/buffer/src/test/java/io/netty/buffer/AbstractByteBufTest.java
@@ -57,6 +57,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -74,7 +75,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotSame;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -2290,7 +2290,7 @@ public void testToString() {
}
@Test
- @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
+ @Timeout(30)
public void testToStringMultipleThreads() throws Throwable {
buffer.clear();
buffer.writeBytes("Hello, World!".getBytes(CharsetUtil.ISO_8859_1));
@@ -2300,7 +2300,7 @@ public void testToStringMultipleThreads() throws Throwable {
static void testToStringMultipleThreads0(final ByteBuf buffer) throws Throwable {
final String expected = buffer.toString(CharsetUtil.ISO_8859_1);
- final AtomicInteger counter = new AtomicInteger(30000);
+ final CyclicBarrier startBarrier = new CyclicBarrier(10);
final AtomicReference errorRef = new AtomicReference();
List threads = new ArrayList();
for (int i = 0; i < 10; i++) {
@@ -2308,11 +2308,15 @@ static void testToStringMultipleThreads0(final ByteBuf buffer) throws Throwable
@Override
public void run() {
try {
- while (errorRef.get() == null && counter.decrementAndGet() > 0) {
+ startBarrier.await(10, TimeUnit.SECONDS);
+ int counter = 3000;
+ while (errorRef.get() == null && counter-- > 0) {
assertEquals(expected, buffer.toString(CharsetUtil.ISO_8859_1));
}
} catch (Throwable cause) {
- errorRef.compareAndSet(null, cause);
+ if (!errorRef.compareAndSet(null, cause)) {
+ ThrowableUtil.addSuppressed(errorRef.get(), cause);
+ }
}
}
});
@@ -2322,13 +2326,27 @@ public void run() {
thread.start();
}
- for (Thread thread : threads) {
- thread.join();
- }
+ joinAllAndReportErrors(threads, errorRef);
+ }
- Throwable error = errorRef.get();
- if (error != null) {
- throw error;
+ private static void joinAllAndReportErrors(List threads, AtomicReference errorRef)
+ throws Throwable {
+ try {
+ for (Thread thread : threads) {
+ thread.join();
+ }
+
+ Throwable error = errorRef.get();
+ if (error != null) {
+ throw error;
+ }
+ } catch (Throwable e) {
+ for (Thread thread : threads) {
+ if (thread.isAlive()) {
+ ThrowableUtil.interruptAndAttachAsyncStackTrace(thread, e);
+ }
+ }
+ throw e;
}
}
@@ -2345,7 +2363,7 @@ public void testCopyMultipleThreads0() throws Throwable {
static void testCopyMultipleThreads0(final ByteBuf buffer) throws Throwable {
final ByteBuf expected = buffer.copy();
try {
- final AtomicInteger counter = new AtomicInteger(30000);
+ final CyclicBarrier startBarrier = new CyclicBarrier(10);
final AtomicReference errorRef = new AtomicReference();
List threads = new ArrayList();
for (int i = 0; i < 10; i++) {
@@ -2353,7 +2371,9 @@ static void testCopyMultipleThreads0(final ByteBuf buffer) throws Throwable {
@Override
public void run() {
try {
- while (errorRef.get() == null && counter.decrementAndGet() > 0) {
+ startBarrier.await(10, TimeUnit.SECONDS);
+ int counter = 3000;
+ while (errorRef.get() == null && counter-- > 0) {
ByteBuf copy = buffer.copy();
try {
assertEquals(expected, copy);
@@ -2372,14 +2392,7 @@ public void run() {
thread.start();
}
- for (Thread thread : threads) {
- thread.join();
- }
-
- Throwable error = errorRef.get();
- if (error != null) {
- throw error;
- }
+ joinAllAndReportErrors(threads, errorRef);
} finally {
expected.release();
}
@@ -2879,43 +2892,54 @@ public void testSliceBytesInArrayMultipleThreads() throws Exception {
static void testBytesInArrayMultipleThreads(
final ByteBuf buffer, final byte[] expectedBytes, final boolean slice) throws Exception {
- final AtomicReference cause = new AtomicReference();
- final CountDownLatch latch = new CountDownLatch(60000);
- final CyclicBarrier barrier = new CyclicBarrier(11);
- for (int i = 0; i < 10; i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- while (cause.get() == null && latch.getCount() > 0) {
- ByteBuf buf;
- if (slice) {
- buf = buffer.slice();
- } else {
- buf = buffer.duplicate();
- }
-
- byte[] array = new byte[8];
- buf.readBytes(array);
+ final CyclicBarrier startBarrier = new CyclicBarrier(10);
+ final CyclicBarrier endBarrier = new CyclicBarrier(11);
+ Callable callable = new Callable() {
+ @Override
+ public Void call() throws Exception {
+ startBarrier.await();
+ for (int i = 0; i < 6000; i++) {
+ ByteBuf buf;
+ if (slice) {
+ buf = buffer.slice();
+ } else {
+ buf = buffer.duplicate();
+ }
- assertArrayEquals(expectedBytes, array);
+ byte[] array = new byte[8];
+ buf.readBytes(array);
- Arrays.fill(array, (byte) 0);
- buf.getBytes(0, array);
- assertArrayEquals(expectedBytes, array);
+ assertArrayEquals(expectedBytes, array);
- latch.countDown();
- }
- try {
- barrier.await();
- } catch (Exception e) {
- // ignore
- }
+ Arrays.fill(array, (byte) 0);
+ buf.getBytes(0, array);
+ assertArrayEquals(expectedBytes, array);
}
- }).start();
+ endBarrier.await();
+ return null;
+ }
+ };
+ List> tasks = new ArrayList>();
+ for (int i = 0; i < 10; i++) {
+ FutureTask task = new FutureTask(callable);
+ new Thread(task).start();
+ tasks.add(task);
+ }
+ try {
+ endBarrier.await(30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ for (FutureTask task : tasks) {
+ try {
+ task.get(100, TimeUnit.MILLISECONDS);
+ } catch (Exception ex) {
+ e.addSuppressed(ex);
+ }
+ }
+ throw e;
+ }
+ for (FutureTask task : tasks) {
+ task.get(1, TimeUnit.SECONDS);
}
- latch.await(10, TimeUnit.SECONDS);
- barrier.await(5, TimeUnit.SECONDS);
- assertNull(cause.get());
}
public static Object[][] setCharSequenceCombinations() {
diff --git a/buffer/src/test/java/io/netty/buffer/AdaptiveByteBufAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/AdaptiveByteBufAllocatorTest.java
index 448930a3189..4c212410d88 100644
--- a/buffer/src/test/java/io/netty/buffer/AdaptiveByteBufAllocatorTest.java
+++ b/buffer/src/test/java/io/netty/buffer/AdaptiveByteBufAllocatorTest.java
@@ -16,10 +16,17 @@
package io.netty.buffer;
import io.netty.util.NettyRuntime;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.RepetitionInfo;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+
+import java.lang.reflect.Array;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.SplittableRandom;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
@@ -111,24 +118,29 @@ public void testUsedHeapMemory() {
@Test
void adaptiveChunkMustDeallocateOrReuseWthBufferRelease() throws Exception {
AdaptiveByteBufAllocator allocator = newAllocator(false);
- ByteBuf a = allocator.heapBuffer(28 * 1024);
- assertEquals(262144, allocator.usedHeapMemory());
- ByteBuf b = allocator.heapBuffer(100 * 1024);
- assertEquals(262144, allocator.usedHeapMemory());
- b.release();
- a.release();
- assertEquals(262144, allocator.usedHeapMemory());
- a = allocator.heapBuffer(28 * 1024);
- assertEquals(262144, allocator.usedHeapMemory());
- b = allocator.heapBuffer(100 * 1024);
- assertEquals(262144, allocator.usedHeapMemory());
- a.release();
- ByteBuf c = allocator.heapBuffer(28 * 1024);
- assertEquals(2 * 262144, allocator.usedHeapMemory());
- c.release();
- assertEquals(2 * 262144, allocator.usedHeapMemory());
- b.release();
- assertEquals(2 * 262144, allocator.usedHeapMemory());
+ Deque bufs = new ArrayDeque();
+ assertEquals(0, allocator.usedHeapMemory());
+ assertEquals(0, allocator.usedHeapMemory());
+ bufs.add(allocator.heapBuffer(256));
+ long usedHeapMemory = allocator.usedHeapMemory();
+ int buffersPerChunk = Math.toIntExact(usedHeapMemory / 256);
+ for (int i = 0; i < buffersPerChunk; i++) {
+ bufs.add(allocator.heapBuffer(256));
+ }
+ assertEquals(2 * usedHeapMemory, allocator.usedHeapMemory());
+ bufs.pop().release();
+ assertEquals(2 * usedHeapMemory, allocator.usedHeapMemory());
+ while (!bufs.isEmpty()) {
+ bufs.pop().release();
+ }
+ assertEquals(2 * usedHeapMemory, allocator.usedHeapMemory());
+ for (int i = 0; i < 2 * buffersPerChunk; i++) {
+ bufs.add(allocator.heapBuffer(256));
+ }
+ assertEquals(2 * usedHeapMemory, allocator.usedHeapMemory());
+ while (!bufs.isEmpty()) {
+ bufs.pop().release();
+ }
}
@ParameterizedTest
@@ -198,4 +210,71 @@ public void run() {
fail("Expected no exception, but got", throwable);
}
}
+
+ @RepeatedTest(100)
+ void buddyAllocationConsistency(RepetitionInfo info) {
+ SplittableRandom rng = new SplittableRandom(info.getCurrentRepetition());
+ AdaptiveByteBufAllocator allocator = newAllocator(true);
+ int small = 32768;
+ int large = 2 * small;
+ int xlarge = 2 * large;
+
+ int[] allocationSizes = {
+ small, small, small, small, small, small, small, small,
+ large, large, large, large,
+ xlarge, xlarge,
+ };
+
+ shuffle(rng, allocationSizes);
+
+ ByteBuf[] bufs = new ByteBuf[allocationSizes.length];
+ for (int i = 0; i < bufs.length; i++) {
+ bufs[i] = allocator.buffer(allocationSizes[i], allocationSizes[i]);
+ }
+
+ shuffle(rng, bufs);
+
+ int[] reallocations = new int[bufs.length / 2];
+ for (int i = 0; i < reallocations.length; i++) {
+ reallocations[i] = bufs[i].capacity();
+ bufs[i].release();
+ bufs[i] = null;
+ }
+ for (int i = 0; i < reallocations.length; i++) {
+ assertNull(bufs[i]);
+ bufs[i] = allocator.buffer(reallocations[i], reallocations[i]);
+ }
+
+ for (int i = 0; i < bufs.length; i++) {
+ while (bufs[i].isWritable()) {
+ bufs[i].writeByte(i + 1);
+ }
+ }
+ try {
+ for (int i = 0; i < bufs.length; i++) {
+ while (bufs[i].isReadable()) {
+ int b = Byte.toUnsignedInt(bufs[i].readByte());
+ if (b != i + 1) {
+ fail("Expected byte " + (i + 1) +
+ " at index " + (bufs[i].readerIndex() - 1) +
+ " but got " + b);
+ }
+ }
+ }
+ } finally {
+ for (ByteBuf buf : bufs) {
+ buf.release();
+ }
+ }
+ }
+
+ private static void shuffle(SplittableRandom rng, Object array) {
+ int len = Array.getLength(array);
+ for (int i = 0; i < len; i++) {
+ int n = rng.nextInt(i, len);
+ Object value = Array.get(array, i);
+ Array.set(array, i, Array.get(array, n));
+ Array.set(array, n, value);
+ }
+ }
}
diff --git a/buffer/src/test/java/io/netty/buffer/AdaptivePoolingAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/AdaptivePoolingAllocatorTest.java
index ab47050c641..4a4c28deebf 100644
--- a/buffer/src/test/java/io/netty/buffer/AdaptivePoolingAllocatorTest.java
+++ b/buffer/src/test/java/io/netty/buffer/AdaptivePoolingAllocatorTest.java
@@ -15,52 +15,11 @@
*/
package io.netty.buffer;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.function.Supplier;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
-class AdaptivePoolingAllocatorTest implements Supplier {
- private int i;
-
- @BeforeEach
- void setUp() {
- i = 0;
- }
-
- @Override
- public String get() {
- return "i = " + i;
- }
-
- @Test
- void sizeBucketComputations() throws Exception {
- assertSizeBucket(0, 16 * 1024);
- assertSizeBucket(1, 24 * 1024);
- assertSizeBucket(2, 32 * 1024);
- assertSizeBucket(3, 48 * 1024);
- assertSizeBucket(4, 64 * 1024);
- assertSizeBucket(5, 96 * 1024);
- assertSizeBucket(6, 128 * 1024);
- assertSizeBucket(7, 192 * 1024);
- assertSizeBucket(8, 256 * 1024);
- assertSizeBucket(9, 384 * 1024);
- assertSizeBucket(10, 512 * 1024);
- assertSizeBucket(11, 768 * 1024);
- assertSizeBucket(12, 1024 * 1024);
- assertSizeBucket(13, 1792 * 1024);
- assertSizeBucket(14, 2048 * 1024);
- assertSizeBucket(15, 3072 * 1024);
- // The sizeBucket function will be used for sizes up to 8 MiB
- assertSizeBucket(15, 4 * 1024 * 1024);
- assertSizeBucket(15, 5 * 1024 * 1024);
- assertSizeBucket(15, 6 * 1024 * 1024);
- assertSizeBucket(15, 7 * 1024 * 1024);
- assertSizeBucket(15, 8 * 1024 * 1024);
- }
-
+class AdaptivePoolingAllocatorTest {
@Test
void sizeClassComputations() throws Exception {
final int[] sizeClasses = AdaptivePoolingAllocator.getSizeClasses();
@@ -75,20 +34,7 @@ void sizeClassComputations() throws Exception {
private static void assertSizeClassOf(int expectedSizeClass, int previousSizeIncluded, int maxSizeIncluded) {
for (int size = previousSizeIncluded; size <= maxSizeIncluded; size++) {
- final int sizeToTest = size;
- Supplier messageSupplier = new Supplier() {
- @Override
- public String get() {
- return "size = " + sizeToTest;
- }
- };
- assertEquals(expectedSizeClass, AdaptivePoolingAllocator.sizeClassIndexOf(size), messageSupplier);
- }
- }
-
- private void assertSizeBucket(int expectedSizeBucket, int maxSizeIncluded) {
- for (; i <= maxSizeIncluded; i++) {
- assertEquals(expectedSizeBucket, AdaptivePoolingAllocator.sizeToBucket(i), this);
+ assertEquals(expectedSizeClass, AdaptivePoolingAllocator.sizeClassIndexOf(size), "size = " + size);
}
}
}
diff --git a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java
index ecc01065210..64638f8e1cb 100644
--- a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java
+++ b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java
@@ -20,6 +20,7 @@
import io.netty.util.concurrent.FastThreadLocalThread;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
+import io.netty.util.internal.ThrowableUtil;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -30,6 +31,7 @@
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -349,13 +351,13 @@ public void testAllocateSmallOffset() {
}
@Test
- @Timeout(value = 10, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
+ @Timeout(value = 20, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
public void testThreadCacheDestroyedByThreadCleaner() throws InterruptedException {
testThreadCacheDestroyed(false);
}
@Test
- @Timeout(value = 10, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
+ @Timeout(value = 20, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
public void testThreadCacheDestroyedAfterExitRun() throws InterruptedException {
testThreadCacheDestroyed(true);
}
@@ -408,7 +410,6 @@ public void run() {
while (allocator.metric().numThreadLocalCaches() > 0) {
// Signal we want to have a GC run to ensure we can process our ThreadCleanerReference
System.gc();
- System.runFinalization();
LockSupport.parkNanos(MILLISECONDS.toNanos(100));
}
@@ -416,8 +417,8 @@ public void run() {
}
@Test
- @Timeout(value = 3000, unit = MILLISECONDS)
- public void testNumThreadCachesWithNoDirectArenas() throws InterruptedException {
+ @Timeout(10)
+ public void testNumThreadCachesWithNoDirectArenas() throws Exception {
int numHeapArenas = 1;
final PooledByteBufAllocator allocator =
new PooledByteBufAllocator(numHeapArenas, 0, 8192, 1);
@@ -436,11 +437,11 @@ public void testNumThreadCachesWithNoDirectArenas() throws InterruptedException
}
@Test
- @Timeout(value = 3000, unit = MILLISECONDS)
- public void testNumThreadCachesAccountForDirectAndHeapArenas() throws InterruptedException {
- int numHeapArenas = 1;
+ @Timeout(10)
+ public void testNumThreadCachesAccountForDirectAndHeapArenas() throws Exception {
+ int numArenas = 1;
final PooledByteBufAllocator allocator =
- new PooledByteBufAllocator(numHeapArenas, 0, 8192, 1);
+ new PooledByteBufAllocator(numArenas, numArenas, 8192, 1);
ThreadCache tcache0 = createNewThreadCache(allocator, false);
assertEquals(1, allocator.metric().numThreadLocalCaches());
@@ -456,8 +457,8 @@ public void testNumThreadCachesAccountForDirectAndHeapArenas() throws Interrupte
}
@Test
- @Timeout(value = 3000, unit = MILLISECONDS)
- public void testThreadCacheToArenaMappings() throws InterruptedException {
+ @Timeout(10)
+ public void testThreadCacheToArenaMappings() throws Exception {
int numArenas = 2;
final PooledByteBufAllocator allocator =
new PooledByteBufAllocator(numArenas, numArenas, 8192, 1);
@@ -500,8 +501,7 @@ private static ThreadCache createNewThreadCache(final PooledByteBufAllocator all
throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch cacheLatch = new CountDownLatch(1);
- final Thread t = new FastThreadLocalThread(new Runnable() {
-
+ final FutureTask task = new FutureTask(new Runnable() {
@Override
public void run() {
final ByteBuf buf;
@@ -527,23 +527,35 @@ public void run() {
FastThreadLocal.removeAll();
}
- });
+ }, null);
+ final Thread t = new FastThreadLocalThread(task);
t.start();
// Wait until we allocated a buffer and so be sure the thread was started and the cache exists.
- cacheLatch.await();
+ try {
+ cacheLatch.await();
+ } catch (InterruptedException e) {
+ ThrowableUtil.interruptAndAttachAsyncStackTrace(t, e);
+ throw e;
+ }
return new ThreadCache() {
@Override
- public void destroy() throws InterruptedException {
+ public void destroy() throws Exception {
latch.countDown();
- t.join();
+ try {
+ task.get();
+ t.join();
+ } catch (InterruptedException e) {
+ ThrowableUtil.interruptAndAttachAsyncStackTrace(t, e);
+ throw e;
+ }
}
};
}
private interface ThreadCache {
- void destroy() throws InterruptedException;
+ void destroy() throws Exception;
}
@Test
diff --git a/buffer/src/test/java/io/netty/buffer/UnpooledTest.java b/buffer/src/test/java/io/netty/buffer/UnpooledTest.java
index efc1dafd1ed..fd705f1bed0 100644
--- a/buffer/src/test/java/io/netty/buffer/UnpooledTest.java
+++ b/buffer/src/test/java/io/netty/buffer/UnpooledTest.java
@@ -476,7 +476,7 @@ public void testUnmodifiableBuffer() throws Exception {
} catch (UnsupportedOperationException e) {
// Expected
}
- Mockito.verifyZeroInteractions(inputStream);
+ Mockito.verifyNoInteractions(inputStream);
ScatteringByteChannel scatteringByteChannel = Mockito.mock(ScatteringByteChannel.class);
try {
@@ -485,7 +485,7 @@ public void testUnmodifiableBuffer() throws Exception {
} catch (UnsupportedOperationException e) {
// Expected
}
- Mockito.verifyZeroInteractions(scatteringByteChannel);
+ Mockito.verifyNoInteractions(scatteringByteChannel);
buf.release();
}
diff --git a/codec-dns/pom.xml b/codec-dns/pom.xml
index 39846136332..00ed2a59064 100644
--- a/codec-dns/pom.xml
+++ b/codec-dns/pom.xml
@@ -20,7 +20,7 @@
io.nettynetty-parent
- 4.1.128.1.dse
+ 4.1.132.1.dsenetty-codec-dns
diff --git a/codec-dns/src/main/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoder.java b/codec-dns/src/main/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoder.java
index 2aea39159fe..80cf862ab6a 100644
--- a/codec-dns/src/main/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoder.java
+++ b/codec-dns/src/main/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoder.java
@@ -16,6 +16,8 @@
package io.netty.handler.codec.dns;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.CorruptedFrameException;
/**
* The default {@link DnsRecordDecoder} implementation.
@@ -99,6 +101,30 @@ protected DnsRecord decodeRecord(
DnsCodecUtil.decompressDomainName(
in.duplicate().setIndex(offset, offset + length)));
}
+ if (type == DnsRecordType.MX) {
+ // MX RDATA: 16-bit preference + exchange (domain name, possibly compressed)
+ if (length < 3) {
+ throw new CorruptedFrameException("MX record RDATA is too short: " + length);
+ }
+ final int pref = in.getUnsignedShort(offset);
+ ByteBuf exchange = null;
+ try {
+ exchange = DnsCodecUtil.decompressDomainName(
+ in.duplicate().setIndex(offset + 2, offset + length));
+
+ // Build decompressed RDATA = [preference][expanded exchange name]
+ final ByteBuf out = in.alloc().buffer(2 + exchange.readableBytes());
+ out.writeShort(pref);
+ out.writeBytes(exchange);
+
+ return new DefaultDnsRawRecord(name, type, dnsClass, timeToLive, out);
+ } finally {
+ if (exchange != null) {
+ exchange.release();
+ }
+ }
+ }
+
return new DefaultDnsRawRecord(
name, type, dnsClass, timeToLive, in.retainedDuplicate().setIndex(offset, offset + length));
}
diff --git a/codec-dns/src/test/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoderTest.java b/codec-dns/src/test/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoderTest.java
index a8379f6d8d7..d66b994b604 100644
--- a/codec-dns/src/test/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoderTest.java
+++ b/codec-dns/src/test/java/io/netty/handler/codec/dns/DefaultDnsRecordDecoderTest.java
@@ -166,6 +166,51 @@ public void testDecodeCompressionRDataPointer() throws Exception {
}
}
+ @Test
+ public void testDecodeCompressionRDataPointerMX() throws Exception {
+ DefaultDnsRecordDecoder decoder = new DefaultDnsRecordDecoder();
+ byte[] compressionPointer = {
+ 5, 'n', 'e', 't', 't', 'y', 2, 'i', 'o', 0,
+ 0, 10, // preference = 10
+ (byte) 0xC0, 0 // record is a pointer to netty.io
+ };
+
+ byte[] expected = {
+ 0, 10, // pref = 10
+ 5, 'n', 'e', 't', 't', 'y', 2, 'i', 'o', 0
+ };
+ ByteBuf buffer = Unpooled.wrappedBuffer(compressionPointer);
+ DefaultDnsRawRecord mxRecord = null;
+ ByteBuf expectedBuf = null;
+ try {
+ mxRecord = (DefaultDnsRawRecord) decoder.decodeRecord(
+ "mail.example.com",
+ DnsRecordType.MX,
+ DnsRecord.CLASS_IN,
+ 60,
+ buffer,
+ 10,
+ 4);
+
+ expectedBuf = Unpooled.wrappedBuffer(expected);
+
+ assertEquals(0, ByteBufUtil.compare(expectedBuf, mxRecord.content()),
+ "The rdata of MX-type record should be decompressed in advance");
+ assertEquals(10, mxRecord.content().getUnsignedShort(0));
+
+ ByteBuf exchangerName = mxRecord.content().duplicate().setIndex(2, mxRecord.content().writerIndex());
+ assertEquals("netty.io.", DnsCodecUtil.decodeDomainName(exchangerName));
+ } finally {
+ buffer.release();
+ if (expectedBuf != null) {
+ expectedBuf.release();
+ }
+ if (mxRecord != null) {
+ mxRecord.release();
+ }
+ }
+ }
+
@Test
public void testDecodeMessageCompression() throws Exception {
// See https://www.ietf.org/rfc/rfc1035 [4.1.4. Message compression]
diff --git a/codec-haproxy/pom.xml b/codec-haproxy/pom.xml
index b7a0e7202a1..139ef90e84d 100644
--- a/codec-haproxy/pom.xml
+++ b/codec-haproxy/pom.xml
@@ -20,7 +20,7 @@
io.nettynetty-parent
- 4.1.128.1.dse
+ 4.1.132.1.dsenetty-codec-haproxy
diff --git a/codec-http/pom.xml b/codec-http/pom.xml
index 9d37046e74e..0c6b82ac520 100644
--- a/codec-http/pom.xml
+++ b/codec-http/pom.xml
@@ -20,7 +20,7 @@
io.nettynetty-parent
- 4.1.128.1.dse
+ 4.1.132.1.dsenetty-codec-http
diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultFullHttpRequest.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultFullHttpRequest.java
index 3cd8d0c6985..a4762516846 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultFullHttpRequest.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultFullHttpRequest.java
@@ -92,7 +92,15 @@ public DefaultFullHttpRequest(HttpVersion httpVersion, HttpMethod method, String
*/
public DefaultFullHttpRequest(HttpVersion httpVersion, HttpMethod method, String uri,
ByteBuf content, HttpHeaders headers, HttpHeaders trailingHeader) {
- super(httpVersion, method, uri, headers);
+ this(httpVersion, method, uri, content, headers, trailingHeader, true);
+ }
+
+ /**
+ * Create a full HTTP response with the given HTTP version, method, URI, contents, and header and trailer objects.
+ */
+ public DefaultFullHttpRequest(HttpVersion httpVersion, HttpMethod method, String uri,
+ ByteBuf content, HttpHeaders headers, HttpHeaders trailingHeader, boolean validateRequestLine) {
+ super(httpVersion, method, uri, headers, validateRequestLine);
this.content = checkNotNull(content, "content");
this.trailingHeader = checkNotNull(trailingHeader, "trailingHeader");
}
diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java
index 271b6069a02..437598503e6 100644
--- a/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java
+++ b/codec-http/src/main/java/io/netty/handler/codec/http/DefaultHttpRequest.java
@@ -75,9 +75,25 @@ public DefaultHttpRequest(HttpVersion httpVersion, HttpMethod method, String uri
* @param headers the Headers for this Request
*/
public DefaultHttpRequest(HttpVersion httpVersion, HttpMethod method, String uri, HttpHeaders headers) {
+ this(httpVersion, method, uri, headers, true);
+ }
+
+ /**
+ * Creates a new instance.
+ *
+ * @param httpVersion the HTTP version of the request
+ * @param method the HTTP method of the request
+ * @param uri the URI or path of the request
+ * @param headers the Headers for this Request
+ */
+ public DefaultHttpRequest(HttpVersion httpVersion, HttpMethod method, String uri, HttpHeaders headers,
+ boolean validateRequestLine) {
super(httpVersion, headers);
this.method = checkNotNull(method, "method");
this.uri = checkNotNull(uri, "uri");
+ if (validateRequestLine) {
+ HttpUtil.validateRequestLineTokens(method, uri);
+ }
}
@Override
diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkLineValidatingByteProcessor.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkLineValidatingByteProcessor.java
new file mode 100644
index 00000000000..6839ce8d8db
--- /dev/null
+++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkLineValidatingByteProcessor.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2026 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.handler.codec.http;
+
+import io.netty.util.ByteProcessor;
+
+import java.util.BitSet;
+
+/**
+ * Validates the chunk start line. That is, the chunk size and chunk extensions, until the CR LF pair.
+ * See RFC 9112 section 7.1.
+ *
+ *