Skip to content

Commit a096406

Browse files
committed
Use async in tests.
1 parent ed10818 commit a096406

File tree

3 files changed

+39
-187
lines changed

3 files changed

+39
-187
lines changed

gems.rb

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
gem "decode"
2323
gem "rubocop"
2424

25+
gem "sus-fixtures-async"
26+
2527
gem "bake-test"
2628
gem "bake-test-external"
2729
end

lib/protocol/http/body/streamable.rb

+33-152
Original file line numberDiff line numberDiff line change
@@ -17,85 +17,6 @@ module Body
1717
#
1818
# When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server.
1919
module Streamable
20-
# Raised when an operation is attempted on a closed stream.
21-
class ClosedError < StandardError
22-
end
23-
24-
# Raised when a streaming body is consumed more than once.
25-
class ConsumedError < StandardError
26-
end
27-
28-
# Single value queue that can be used to communicate between fibers.
29-
class Queue
30-
def self.consumer
31-
self.new(Fiber.current, nil)
32-
end
33-
34-
def self.generator(&block)
35-
self.new(Fiber.new(&block), Fiber.current)
36-
end
37-
38-
def initialize(generator, consumer)
39-
@generator = generator
40-
@consumer = consumer
41-
@closed = false
42-
end
43-
44-
# The generator fiber can push values into the queue.
45-
def push(value)
46-
raise ClosedError, "Queue is closed!" if @closed
47-
48-
if consumer = @consumer
49-
@consumer = nil
50-
@generator = Fiber.current
51-
52-
consumer.transfer(value)
53-
else
54-
raise ClosedError, "Queue is not being popped!"
55-
end
56-
end
57-
58-
# The consumer fiber can pop values from the queue.
59-
def pop
60-
return nil if @closed
61-
62-
if generator = @generator
63-
@generator = nil
64-
@consumer = Fiber.current
65-
66-
return generator.transfer
67-
else
68-
raise ClosedError, "Queue is not being pushed!"
69-
end
70-
end
71-
72-
def close(error = nil)
73-
@closed = true
74-
75-
if consumer = @consumer
76-
@consumer = nil
77-
78-
if consumer.alive?
79-
@generator = Fiber.current
80-
if error
81-
consumer.raise(error)
82-
else
83-
consumer.transfer(nil)
84-
end
85-
end
86-
elsif generator = @generator
87-
@generator = nil
88-
@consumer = Fiber.current
89-
90-
if error
91-
generator.raise(error)
92-
else
93-
generator.transfer(nil)
94-
end
95-
end
96-
end
97-
end
98-
9920
def self.new(*arguments)
10021
if arguments.size == 1
10122
DeferredBody.new(*arguments)
@@ -112,73 +33,35 @@ def self.response(request, &block)
11233
Body.new(block, request.body)
11334
end
11435

115-
class Input
116-
def initialize
117-
@queue = Queue.consumer
36+
class Output
37+
def self.schedule(input, block)
38+
self.new(input, block).tap(&:schedule)
11839
end
11940

120-
def read
121-
@queue.pop
41+
def initialize(input, block)
42+
@output = Writable.new
43+
@stream = Stream.new(input, @output)
44+
@block = block
12245
end
12346

124-
def write(chunk)
125-
@queue.push(chunk)
47+
def schedule
48+
@fiber ||= Fiber.schedule do
49+
@block.call(@stream)
50+
51+
end
12652
end
12753

128-
def close_write(error = nil)
129-
close(error)
54+
def read
55+
@output.read
13056
end
13157

13258
def close(error = nil)
133-
@queue.close(error)
134-
end
135-
136-
def stream(body)
137-
body&.each do |chunk|
138-
$stderr.puts "Input stream chunk: #{chunk.inspect}"
139-
self.write(chunk)
140-
end
141-
ensure
142-
$stderr.puts "Input stream closed: #{$!}"
143-
self.close_write
59+
@output.close_write(error)
14460
end
14561
end
14662

147-
# Represents an output wrapper around a stream, that can invoke a fiber when `#read`` is called.
148-
#
149-
# This behaves a little bit like a generator or lazy enumerator, in that it can be used to generate chunks of data on demand.
150-
#
151-
# When closing the the output, the block is invoked one last time with `nil` to indicate the end of the stream.
152-
class Output
153-
def initialize(input, block)
154-
stream = Stream.new(input, self)
155-
156-
@queue = Queue.generator do
157-
block.call(stream)
158-
end
159-
end
160-
161-
attr :stream
162-
163-
# Generator of the output can write chunks.
164-
def write(chunk)
165-
@queue.push(chunk)
166-
end
167-
168-
# Indicates that no further output will be generated.
169-
def close_write(error = nil)
170-
close(error)
171-
end
172-
173-
# Can be invoked by the block to close the stream. Closing the output means that no more chunks will be generated.
174-
def close(error = nil)
175-
@queue.close(error)
176-
end
177-
178-
# Consumer of the output can read chunks.
179-
def read
180-
@queue.pop
181-
end
63+
# Raised when a streaming body is consumed more than once.
64+
class ConsumedError < StandardError
18265
end
18366

18467
class Body < Readable
@@ -190,6 +73,10 @@ def initialize(block, input = nil)
19073

19174
attr :block
19275

76+
attr :input
77+
78+
attr :output
79+
19380
def stream?
19481
true
19582
end
@@ -202,7 +89,7 @@ def read
20289
raise ConsumedError, "Streaming body has already been consumed!"
20390
end
20491

205-
@output = Output.new(@input, @block)
92+
@output = Output.schedule(@input, @block)
20693
@block = nil
20794
end
20895

@@ -231,14 +118,12 @@ def call(stream)
231118

232119
# Closing a stream indicates we are no longer interested in reading from it.
233120
def close(error = nil)
234-
$stderr.puts "Closing output #{@output}..."
235121
if output = @output
236122
@output = nil
237123
# Closing the output here may take some time, as it may need to finish handling the stream:
238124
output.close(error)
239125
end
240126

241-
$stderr.puts "Closing input #{@input}..."
242127
if input = @input
243128
@input = nil
244129
input.close(error)
@@ -249,29 +134,25 @@ def close(error = nil)
249134
# A deferred body has an extra `stream` method which can be used to stream data into the body, as the response body won't be available until the request has been sent.
250135
class DeferredBody < Body
251136
def initialize(block)
252-
super(block, Input.new)
137+
super(block, Writable.new)
253138
end
254139

255-
# Closing a stream indicates we are no longer interested in reading from it.
140+
# Closing a stream indicates we are no longer interested in reading from it, but in this case that does not mean that the output block is finished generating data.
256141
def close(error = nil)
142+
if error
143+
super
144+
end
257145
end
258146

259147
# Stream the response body into the block's input.
260148
def stream(body)
261-
@input.stream(body)
262-
263-
$stderr.puts "Closing output #{@output}..."
264-
if output = @output
265-
@output = nil
266-
# Closing the output here may take some time, as it may need to finish handling the stream:
267-
output.close(error)
268-
end
269-
270-
$stderr.puts "Closing input #{@input}..."
271-
if input = @input
272-
@input = nil
273-
input.close(error)
149+
body&.each do |chunk|
150+
@input.write(chunk)
274151
end
152+
rescue => error
153+
raise
154+
ensure
155+
@input.close_write(error)
275156
end
276157
end
277158
end

test/protocol/http/body/streamable.rb

+4-35
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
# Copyright, 2024, by Samuel Williams.
55

66
require 'protocol/http/body/streamable'
7+
require 'sus/fixtures/async'
78

89
describe Protocol::HTTP::Body::Streamable do
10+
include Sus::Fixtures::Async::ReactorContext
11+
912
let(:block) do
1013
proc do |stream|
1114
stream.write("Hello")
@@ -34,37 +37,6 @@
3437
expect(body.read).to be == "World"
3538
expect(body.read).to be == nil
3639
end
37-
38-
with "block that doesn't close" do
39-
let(:block) do
40-
proc do |stream|
41-
stream.write("Hello")
42-
stream.write("World")
43-
end
44-
end
45-
46-
it "can read the body" do
47-
expect(body.read).to be == "Hello"
48-
expect(body.read).to be == "World"
49-
expect(body.read).to be == nil
50-
end
51-
end
52-
53-
with "a block that allows stream to escape" do
54-
let(:block) do
55-
proc do |stream|
56-
@stream = stream
57-
end
58-
end
59-
60-
it "can read the body" do
61-
expect(body.read).to be == nil
62-
63-
expect do
64-
@stream.write("!")
65-
end.to raise_exception(Protocol::HTTP::Body::Streamable::ClosedError, message: be =~ /Stream is not being read!/)
66-
end
67-
end
6840
end
6941

7042
with '#close_write' do
@@ -146,10 +118,7 @@
146118

147119
it "can raise an error on the block" do
148120
expect(body.read).to be == "Hello"
149-
150-
expect do
151-
body.close(RuntimeError.new("Oh no!"))
152-
end.to raise_exception(RuntimeError, message: be =~ /Oh no!/)
121+
body.close(RuntimeError.new("Oh no!"))
153122
end
154123
end
155124

0 commit comments

Comments
 (0)