Skip to content

Commit 1d85bf2

Browse files
committed
Fix error handling edge cases.
1 parent 0361015 commit 1d85bf2

File tree

4 files changed

+77
-32
lines changed

4 files changed

+77
-32
lines changed

lib/protocol/http/body/stream.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,8 @@ def close_write(error = nil)
273273

274274
# Close the input and output bodies.
275275
def close(error = nil)
276-
self.close_read
277-
self.close_write
276+
self.close_read(error)
277+
self.close_write(error)
278278

279279
return nil
280280
ensure

lib/protocol/http/body/streamable.rb

+21-24
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,12 @@ module Body
1717
#
1818
# When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server.
1919
module Streamable
20-
def self.new(*arguments)
21-
if arguments.size == 1
22-
DeferredBody.new(*arguments)
23-
else
24-
Body.new(*arguments)
25-
end
26-
end
27-
2820
def self.request(&block)
29-
DeferredBody.new(block)
21+
RequestBody.new(block)
3022
end
3123

3224
def self.response(request, &block)
33-
Body.new(block, request.body)
25+
ResponseBody.new(block, request.body)
3426
end
3527

3628
class Output
@@ -109,32 +101,37 @@ def call(stream)
109101
raise
110102
end
111103

112-
# Closing a stream indicates we are no longer interested in reading from it.
113-
def close(error = nil)
114-
if output = @output
115-
@output = nil
116-
# Closing the output here may take some time, as it may need to finish handling the stream:
117-
output.close(error)
118-
end
119-
104+
def close_input(error = nil)
120105
if input = @input
121106
@input = nil
122107
input.close(error)
123108
end
124109
end
110+
111+
def close_output(error = nil)
112+
@output&.close(error)
113+
end
114+
end
115+
116+
# A response body is used on the server side to generate the response body using a block.
117+
class ResponseBody < Body
118+
def close(error = nil)
119+
# Close will be invoked when all the output is written.
120+
self.close_output(error)
121+
end
125122
end
126123

127-
# A deferred body has an extra `stream` method which can be used to stream data into the body, as the response body won't be available until the request has been sent.
128-
class DeferredBody < Body
124+
# A request body is used on the client side to generate the request body using a block.
125+
#
126+
# As the response body isn't available until the request is sent, the response body must be {stream}ed into the request body.
127+
class RequestBody < Body
129128
def initialize(block)
130129
super(block, Writable.new)
131130
end
132131

133-
# Closing a stream indicates we are no longer interested in reading from it, but in this case that does not mean that the output block is finished generating data.
134132
def close(error = nil)
135-
if error
136-
super
137-
end
133+
# Close will be invoked when all the input is read.
134+
self.close_input(error)
138135
end
139136

140137
# Stream the response body into the block's input.

lib/protocol/http/body/writable.rb

+13-2
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,14 @@ def read
5555
raise @error
5656
end
5757

58-
@queue.pop
58+
# This operation may result in @error being set.
59+
chunk = @queue.pop
60+
61+
if @error
62+
raise @error
63+
end
64+
65+
return chunk
5966
end
6067

6168
# Write a single chunk to the body. Signal completion by calling `#finish`.
@@ -118,7 +125,11 @@ def output
118125
end
119126

120127
def inspect
121-
"\#<#{self.class} #{@count} chunks written, #{status}>"
128+
if @error
129+
"\#<#{self.class} #{@count} chunks written, #{status}, error=#{@error}>"
130+
else
131+
"\#<#{self.class} #{@count} chunks written, #{status}>"
132+
end
122133
end
123134

124135
private

test/protocol/http/body/streamable.rb

+41-4
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,23 @@
1717
end
1818
end
1919

20-
let(:body) {subject.new(block)}
20+
let(:body) {subject.request(&block)}
21+
22+
with ".request" do
23+
it "can create a new body" do
24+
body = subject.request(&block)
25+
expect(body).to be_a(Protocol::HTTP::Body::Streamable::RequestBody)
26+
end
27+
end
28+
29+
with ".response" do
30+
let(:request) {Protocol::HTTP::Request.new("GET", "/")}
31+
32+
it "can create a new body" do
33+
body = subject.response(request, &block)
34+
expect(body).to be_a(Protocol::HTTP::Body::Streamable::Body)
35+
end
36+
end
2137

2238
with "#stream?" do
2339
it "should be streamable" do
@@ -40,8 +56,6 @@
4056
end
4157
end
4258

43-
let(:body) {subject.new(block)}
44-
4559
it "can close the output body" do
4660
expect(body.read).to be == nil
4761
end
@@ -141,7 +155,8 @@
141155
end
142156
end
143157

144-
let(:body) {subject.new(block, input)}
158+
let(:response) {Protocol::HTTP::Response[200, body: input]}
159+
let(:body) {subject.response(response, &block)}
145160

146161
it "can read from input" do
147162
expect(body.read).to be == "Hello"
@@ -160,6 +175,8 @@
160175

161176
with '#close' do
162177
it "can close the body" do
178+
expect(input).not.to receive(:close)
179+
163180
expect(body.read).to be == "Hello"
164181
body.close
165182
end
@@ -172,6 +189,8 @@
172189
while chunk = stream.read_partial
173190
stream.write(chunk)
174191
end
192+
rescue => error
193+
stream.close(error)
175194
end
176195
end
177196

@@ -186,5 +205,23 @@
186205

187206
body.close
188207
end
208+
209+
it "can stream to output with an error" do
210+
input = Protocol::HTTP::Body::Buffered.new(["Hello", " ", "World"])
211+
212+
mock(input) do |mock|
213+
mock.replace(:read) do
214+
raise "Oh no!"
215+
end
216+
end
217+
218+
expect do
219+
body.stream(input)
220+
end.to raise_exception(RuntimeError, message: be =~ /Oh no!/)
221+
222+
expect do
223+
body.read
224+
end.to raise_exception(RuntimeError, message: be =~ /Oh no!/)
225+
end
189226
end
190227
end

0 commit comments

Comments
 (0)