Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Style/EmptyElse:
Enabled: false

RSpec/DescribedClass:
EnforcedStyle: explicit
Enabled: false # does not work for nested classes

Style/DoubleNegation:
Enabled: false
Expand Down
16 changes: 16 additions & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,22 @@ Thread.new { loop { queue << rand(100); sleep 2 } } # job producer
Parallel.map(Proc.new { queue.pop }, in_processes: 3) { |f| f ? puts("#{f} received") : sleep(1) }
```

### Processes vs Wire serializer: security risk for hardened environments

Worker processes talk to the parent over an anonymous pipe using `Marshal` by default.

If you've hardened your host against `ptrace`/`/proc/<pid>/mem` access (e.g. `ptrace_scope >= 2`) and
want to also close the `/proc/<pid>/fd/<n>` pipe-reopen vector, use the HMAC serializer.

It length-prefixes and HMAC-SHA256 signs each message with a per-worker secret generated before `fork`,
so a same-UID attacker that reopens the pipe can't inject a forged `Marshal` payload into the parent (which would be RCE).

Raises `SecurityError` on mismatch (not a `StandardError`).

```ruby
Parallel.map(items, in_processes: 2, serializer: Parallel::Serializer::Hmac.new) { ... }
```

Tips
====

Expand Down
16 changes: 10 additions & 6 deletions lib/parallel.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true
require 'rbconfig'
require 'parallel/version'
require 'parallel/serializer'

module Parallel
Stop = Object.new.freeze
Expand Down Expand Up @@ -63,10 +64,11 @@ class Worker
attr_reader :pid, :read, :write
attr_accessor :thread

def initialize(read, write, pid)
def initialize(read, write, pid, serializer)
@read = read
@write = write
@pid = pid
@serializer = serializer
end

def stop
Expand All @@ -83,13 +85,13 @@ def close_pipes

def work(data)
begin
Marshal.dump(data, write)
@serializer.dump(data, write)
rescue Errno::EPIPE
raise DeadWorker
end

result = begin
Marshal.load(read)
@serializer.load(read)
rescue EOFError
raise DeadWorker
end
Expand Down Expand Up @@ -622,6 +624,7 @@ def create_workers(job_factory, options, &block)
def worker(job_factory, options, &block)
child_read, parent_write = IO.pipe
parent_read, child_write = IO.pipe
options[:serializer] ||= Serializer::Marshal

pid = Process.fork do
self.worker_number = options[:worker_number]
Expand All @@ -642,12 +645,13 @@ def worker(job_factory, options, &block)
child_read.close
child_write.close

Worker.new(parent_read, parent_write, pid)
Worker.new(parent_read, parent_write, pid, options[:serializer])
end

def process_incoming_jobs(read, write, job_factory, options, &block)
serializer = options.fetch(:serializer)
until read.eof?
data = Marshal.load(read)
data = serializer.load(read)
item, index = job_factory.unpack(data)

result =
Expand All @@ -661,7 +665,7 @@ def process_incoming_jobs(read, write, job_factory, options, &block)
end

begin
Marshal.dump(result, write)
serializer.dump(result, write)
rescue Errno::EPIPE
return # parent thread already dead
end
Expand Down
52 changes: 52 additions & 0 deletions lib/parallel/serializer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true
require 'openssl'
require 'securerandom'

module Parallel
# Pluggable wire serializers. Each must respond to `dump(data, io)` /
# `load(io)` (used directly by Worker) and `dump(data)` / `load(string)`
# (used by wrappers like Hmac).
module Serializer
# Raw Marshal. Fast but trusts anything written to the pipe — a same-UID
# attacker that reopens /proc/<pid>/fd/<n> can inject Marshal gadgets (RCE).
Marshal = ::Marshal

# Wraps any inner serializer with a length-prefixed HMAC-SHA256 frame keyed
# on a per-worker secret generated before fork. Forged frames from a
# pipe-injector fail verification.
class Hmac
LENGTH_FORMAT = 'N' # 32-bit big-endian unsigned int
LENGTH_BYTES = 4
MAC_BYTES = 32 # SHA256

def initialize(inner: Marshal, secret: SecureRandom.bytes(32))
@inner = inner
@secret = secret
end

def dump(data, io)
payload = @inner.dump(data)
mac = OpenSSL::HMAC.digest('SHA256', @secret, payload)
io.write([payload.bytesize].pack(LENGTH_FORMAT), mac, payload)
end

def load(io)
# nil at frame boundary = clean EOF (worker died / pipe closed between messages)
header = io.read(LENGTH_BYTES) || raise(EOFError) # eof stops worker
raise SecurityError, "truncated frame header" if header.bytesize != LENGTH_BYTES

length = header.unpack1(LENGTH_FORMAT)
mac = io.read(MAC_BYTES)
raise SecurityError, "truncated frame mac" if mac.nil? || mac.bytesize != MAC_BYTES

payload = io.read(length)
raise SecurityError, "truncated frame payload" if payload.nil? || payload.bytesize != length

expected = OpenSSL::HMAC.digest('SHA256', @secret, payload)
raise SecurityError, "HMAC mismatch on worker pipe" unless OpenSSL.fixed_length_secure_compare(mac, expected)

@inner.load(payload)
end
end
end
end
113 changes: 113 additions & 0 deletions spec/parallel/serializer_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# frozen_string_literal: true
require 'spec_helper'
require 'json'

describe Parallel::Serializer do
describe Parallel::Serializer::Hmac do
let(:serializer) { described_class.new }

def with_pipe
read, write = IO.pipe
yield read, write
ensure
read.close unless read.closed?
write.close unless write.closed?
end

def pipe_round_trip(serializer, data)
with_pipe do |read, write|
serializer.dump(data, write)
write.close
return serializer.load(read)
end
end

it "round-trips a simple value" do
pipe_round_trip(serializer, "hello").should == "hello"
end

it "round-trips a complex value" do
data = { a: [1, 2, 3], b: { c: "x" }, d: Set.new([1, 2]) }
pipe_round_trip(serializer, data).should == data
end

it "round-trips multiple messages" do
with_pipe do |read, write|
[1, "two", [3, 4], { five: 5 }].each { |m| serializer.dump(m, write) }
write.close
[1, "two", [3, 4], { five: 5 }].each do |expected|
serializer.load(read).should == expected
end
read.eof?.should == true
end
end

it "rejects payloads signed with a different secret" do
with_pipe do |read, write|
described_class.new.dump("HACKERMAN", write)
write.close
-> { serializer.load(read) }.should raise_error(SecurityError, /HMAC mismatch/)
end
end

it "rejects payloads with a tampered body" do
frame = with_pipe do |read, write|
serializer.dump("untampered", write)
write.close
read.read
end
tampered = frame.dup
tampered[-1] = (tampered[-1].ord ^ 0x01).chr

with_pipe do |read, write|
write.write(tampered)
write.close
-> { serializer.load(read) }.should raise_error(SecurityError, /HMAC mismatch/)
end
end

it "raises SecurityError on a truncated frame" do
frame = with_pipe do |read, write|
serializer.dump("whatever", write)
write.close
read.read
end

with_pipe do |read, write|
write.write(frame[0, frame.bytesize - 5]) # drop last 5 bytes of payload
write.close
-> { serializer.load(read) }.should raise_error(SecurityError, /truncated frame/)
end
end

it "raises EOFError on a cleanly closed empty pipe (worker death, not tampering)" do
with_pipe do |read, write|
write.close
-> { serializer.load(read) }.should raise_error(EOFError)
end
end

it "works end-to-end" do
items = (1..20).to_a
result = Parallel.map(items, in_processes: 3, serializer: serializer) { |i| i * 10 }
result.should == items.map { |i| i * 10 }
end

it "propagates worker exceptions across the HMAC frame" do
lambda {
Parallel.map([1, 2, 3], in_processes: 2, serializer: serializer) { |i| raise "boom-#{i}" } # rubocop:disable Lint/UnreachableLoop
}.should raise_error(RuntimeError, /boom-\d/)
end

it "round-trips large payloads (bigger than a pipe buffer)" do
size = 200_000 # > typical 64KiB pipe buffer
big = "x" * size
result = Parallel.map([1, 2, 3], in_processes: 2, serializer: serializer) { |i| [i, big] }
result.map { |i, s| [i, s == big] }.should == [[1, true], [2, true], [3, true]]
end

it "supports a custom inner serializer" do
pipe_round_trip(described_class.new(inner: JSON), [1, :a, 3]).should == [1, "a", 3]
end
end
end
Loading