diff --git a/.rubocop.yml b/.rubocop.yml index 7d8926b..9c2a6ad 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -63,7 +63,7 @@ Style/EmptyElse: Enabled: false RSpec/DescribedClass: - EnforcedStyle: explicit + Enabled: false # does not work for nested classes Style/DoubleNegation: Enabled: false diff --git a/Readme.md b/Readme.md index 7e14d38..59c8138 100644 --- a/Readme.md +++ b/Readme.md @@ -189,6 +189,22 @@ Thread.new { loop { queue << rand(100); sleep 2 } } # job producer Parallel.map(Proc.new { queue.pop }, in_processes: 3) { |f| f ? puts("#{f} received") : sleep(1) } ``` +### Processes vs Wire serializer: security risk for hardened environments + +Worker processes talk to the parent over an anonymous pipe using `Marshal` by default. + +If you've hardened your host against `ptrace`/`/proc/<pid>/mem` access (e.g. `ptrace_scope >= 2`) and +want to also close the `/proc/<pid>/fd/<n>` pipe-reopen vector, use the HMAC serializer. + +It length-prefixes and HMAC-SHA256 signs each message with a per-worker secret generated before `fork`, +so a same-UID attacker that reopens the pipe can't inject a forged `Marshal` payload into the parent (which would be RCE). + +Raises `SecurityError` on mismatch (not a `StandardError`). + +```ruby +Parallel.map(items, in_processes: 2, serializer: Parallel::Serializer::Hmac.new) { ... 
} +``` + Tips ==== diff --git a/lib/parallel.rb b/lib/parallel.rb index 349e201..c83615d 100644 --- a/lib/parallel.rb +++ b/lib/parallel.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require 'rbconfig' require 'parallel/version' +require 'parallel/serializer' module Parallel Stop = Object.new.freeze @@ -63,10 +64,11 @@ class Worker attr_reader :pid, :read, :write attr_accessor :thread - def initialize(read, write, pid) + def initialize(read, write, pid, serializer) @read = read @write = write @pid = pid + @serializer = serializer end def stop @@ -83,13 +85,13 @@ def close_pipes def work(data) begin - Marshal.dump(data, write) + @serializer.dump(data, write) rescue Errno::EPIPE raise DeadWorker end result = begin - Marshal.load(read) + @serializer.load(read) rescue EOFError raise DeadWorker end @@ -622,6 +624,7 @@ def create_workers(job_factory, options, &block) def worker(job_factory, options, &block) child_read, parent_write = IO.pipe parent_read, child_write = IO.pipe + options[:serializer] ||= Serializer::Marshal pid = Process.fork do self.worker_number = options[:worker_number] @@ -642,12 +645,13 @@ def worker(job_factory, options, &block) child_read.close child_write.close - Worker.new(parent_read, parent_write, pid) + Worker.new(parent_read, parent_write, pid, options[:serializer]) end def process_incoming_jobs(read, write, job_factory, options, &block) + serializer = options.fetch(:serializer) until read.eof? 
- data = Marshal.load(read) + data = serializer.load(read) item, index = job_factory.unpack(data) result = @@ -661,7 +665,7 @@ def process_incoming_jobs(read, write, job_factory, options, &block) end begin - Marshal.dump(result, write) + serializer.dump(result, write) rescue Errno::EPIPE return # parent thread already dead end diff --git a/lib/parallel/serializer.rb b/lib/parallel/serializer.rb new file mode 100644 index 0000000..91abc58 --- /dev/null +++ b/lib/parallel/serializer.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true +require 'openssl' +require 'securerandom' + +module Parallel + # Pluggable wire serializers. Each must respond to `dump(data, io)` / + # `load(io)` (used directly by Worker) and `dump(data)` / `load(string)` + # (used by wrappers like Hmac). + module Serializer + # Raw Marshal. Fast but trusts anything written to the pipe — a same-UID + # attacker that reopens /proc/<pid>/fd/<n> can inject Marshal gadgets (RCE). + Marshal = ::Marshal + + # Wraps any inner serializer with a length-prefixed HMAC-SHA256 frame keyed + # on a per-worker secret generated before fork. Forged frames from a + # pipe-injector fail verification. + class Hmac + LENGTH_FORMAT = 'N' # 32-bit big-endian unsigned int + LENGTH_BYTES = 4 + MAC_BYTES = 32 # SHA256 + + def initialize(inner: Marshal, secret: SecureRandom.bytes(32)) + @inner = inner + @secret = secret + end + + def dump(data, io) + payload = @inner.dump(data) + mac = OpenSSL::HMAC.digest('SHA256', @secret, payload) + io.write([payload.bytesize].pack(LENGTH_FORMAT), mac, payload) + end + + def load(io) + # nil at frame boundary = clean EOF (worker died / pipe closed between messages) + header = io.read(LENGTH_BYTES) || raise(EOFError) # eof stops worker + raise SecurityError, "truncated frame header" if header.bytesize != LENGTH_BYTES + + length = header.unpack1(LENGTH_FORMAT) + mac = io.read(MAC_BYTES) + raise SecurityError, "truncated frame mac" if mac.nil? 
|| mac.bytesize != MAC_BYTES + + payload = io.read(length) + raise SecurityError, "truncated frame payload" if payload.nil? || payload.bytesize != length + + expected = OpenSSL::HMAC.digest('SHA256', @secret, payload) + raise SecurityError, "HMAC mismatch on worker pipe" unless OpenSSL.fixed_length_secure_compare(mac, expected) + + @inner.load(payload) + end + end + end +end diff --git a/spec/parallel/serializer_spec.rb b/spec/parallel/serializer_spec.rb new file mode 100644 index 0000000..3956ced --- /dev/null +++ b/spec/parallel/serializer_spec.rb @@ -0,0 +1,113 @@ +# frozen_string_literal: true +require 'spec_helper' +require 'json' + +describe Parallel::Serializer do + describe Parallel::Serializer::Hmac do + let(:serializer) { described_class.new } + + def with_pipe + read, write = IO.pipe + yield read, write + ensure + read.close unless read.closed? + write.close unless write.closed? + end + + def pipe_round_trip(serializer, data) + with_pipe do |read, write| + serializer.dump(data, write) + write.close + return serializer.load(read) + end + end + + it "round-trips a simple value" do + pipe_round_trip(serializer, "hello").should == "hello" + end + + it "round-trips a complex value" do + data = { a: [1, 2, 3], b: { c: "x" }, d: Set.new([1, 2]) } + pipe_round_trip(serializer, data).should == data + end + + it "round-trips multiple messages" do + with_pipe do |read, write| + [1, "two", [3, 4], { five: 5 }].each { |m| serializer.dump(m, write) } + write.close + [1, "two", [3, 4], { five: 5 }].each do |expected| + serializer.load(read).should == expected + end + read.eof?.should == true + end + end + + it "rejects payloads signed with a different secret" do + with_pipe do |read, write| + described_class.new.dump("HACKERMAN", write) + write.close + -> { serializer.load(read) }.should raise_error(SecurityError, /HMAC mismatch/) + end + end + + it "rejects payloads with a tampered body" do + frame = with_pipe do |read, write| + serializer.dump("untampered", write) + 
write.close + read.read + end + tampered = frame.dup + tampered[-1] = (tampered[-1].ord ^ 0x01).chr + + with_pipe do |read, write| + write.write(tampered) + write.close + -> { serializer.load(read) }.should raise_error(SecurityError, /HMAC mismatch/) + end + end + + it "raises SecurityError on a truncated frame" do + frame = with_pipe do |read, write| + serializer.dump("whatever", write) + write.close + read.read + end + + with_pipe do |read, write| + write.write(frame[0, frame.bytesize - 5]) # drop last 5 bytes of payload + write.close + -> { serializer.load(read) }.should raise_error(SecurityError, /truncated frame/) + end + end + + it "raises EOFError on a cleanly closed empty pipe (worker death, not tampering)" do + with_pipe do |read, write| + write.close + -> { serializer.load(read) }.should raise_error(EOFError) + end + end + + it "works end-to-end" do + items = (1..20).to_a + result = Parallel.map(items, in_processes: 3, serializer: serializer) { |i| i * 10 } + result.should == items.map { |i| i * 10 } + end + + it "propagates worker exceptions across the HMAC frame" do + lambda { + Parallel.map([1, 2, 3], in_processes: 2, serializer: serializer) { |i| raise "boom-#{i}" } # rubocop:disable Lint/UnreachableLoop + }.should raise_error(RuntimeError, /boom-\d/) + end + + it "round-trips large payloads (bigger than a pipe buffer)" do + size = 200_000 # > typical 64KiB pipe buffer + big = "x" * size + result = Parallel.map([1, 2, 3], in_processes: 2, serializer: serializer) { |i| [i, big] } + result.map { |i, s| [i, s == big] }.should == [[1, true], [2, true], [3, true]] + end + + it "supports a custom inner serializer" do + pipe_round_trip(described_class.new(inner: JSON), [1, :a, 3]).should == [1, "a", 3] + end + end +end