Loading lib/httpx/plugins/expect.rb +3 −0 Original line number Diff line number Diff line Loading @@ -70,6 +70,9 @@ module HTTPX module RequestMethods def initialize(*) super @informational_status = nil return if @body.empty? threshold = @options.expect_threshold_size Loading lib/httpx/selector.rb +3 −3 Original line number Diff line number Diff line Loading @@ -29,7 +29,7 @@ module HTTPX def_delegator :@timers, :after def_delegator :@selectables, :empty? def_delegator :@selectables, :each def initialize @timers = Timers.new Loading @@ -37,8 +37,8 @@ module HTTPX @is_timer_interval = false end def each(&blk) @selectables.each(&blk) def empty? @selectables.empty? && @timers.empty? end def next_tick Loading lib/httpx/session.rb +19 −38 Original line number Diff line number Diff line Loading @@ -325,53 +325,34 @@ module HTTPX # returns the array of HTTPX::Response objects corresponding to the array of HTTPX::Request +requests+. def receive_requests(requests, selector) responses = [] # : Array[response] # guarantee ordered responses loop do request = requests.first return responses unless request catch(:coalesced) { selector.next_tick } until (response = fetch_response(request, selector, request.options)) request.complete!(response) responses << response requests.shift break if requests.empty? waiting = 0 responses = requests.map do |request| fetch_response(request, selector, request.options).tap do |response| waiting += 1 if response.nil? end end next unless selector.empty? until waiting.zero? || selector.empty? # loop on selector until at least one response has been received. catch(:coalesced) { selector.next_tick } # in some cases, the pool of connections might have been drained because there was some # handshake error, and the error responses have already been emitted, but there was no # opportunity to traverse the requests, hence we're returning only a fraction of the errors # we were supposed to. This effectively fetches the existing responses and return them. exit_from_loop = true responses.each_with_index do |response, idx| next unless response.nil? requests_to_remove = [] # : Array[Request] request = requests[idx] requests.each do |req| response = fetch_response(req, selector, request.options) response = fetch_response(request, selector, request.options) if exit_from_loop && response req.complete!(response) responses << response requests_to_remove << req else # fetch_response may resend requests. when that happens, we need to go back to the initial # loop and process the selector. we still do a pass-through on the remainder of requests, so # that every request that need to be resent, is resent. exit_from_loop = false next unless response raise Error, "something went wrong, responses not found and requests not resent" if selector.empty? request.complete!(response) responses[idx] = response waiting -= 1 end end break if exit_from_loop raise Error, "something went wrong, responses not found and requests not resent" unless waiting.zero? requests -= requests_to_remove end responses end Loading lib/httpx/timers.rb +4 −0 Original line number Diff line number Diff line Loading @@ -6,6 +6,10 @@ module HTTPX @intervals = [] end def empty? @intervals.empty? end def after(interval_in_secs, cb = nil, &blk) callback = cb || blk Loading sig/selector.rbs +3 −2 Original line number Diff line number Diff line Loading @@ -24,8 +24,6 @@ module HTTPX type io_select_selectable = (selectable | Array[selectable])? include _Each[selectable] extend Forwardable READABLE: Array[io_interests] Loading @@ -36,6 +34,9 @@ module HTTPX @selectables: Array[selectable] @is_timer_interval: bool def each: () -> ::Enumerator[selectable, self] | () { (selectable) -> void } -> self def next_tick: () -> void def terminate: () -> void Loading Loading
lib/httpx/plugins/expect.rb +3 −0 Original line number Diff line number Diff line Loading @@ -70,6 +70,9 @@ module HTTPX module RequestMethods def initialize(*) super @informational_status = nil return if @body.empty? threshold = @options.expect_threshold_size Loading
lib/httpx/selector.rb +3 −3 Original line number Diff line number Diff line Loading @@ -29,7 +29,7 @@ module HTTPX def_delegator :@timers, :after def_delegator :@selectables, :empty? def_delegator :@selectables, :each def initialize @timers = Timers.new Loading @@ -37,8 +37,8 @@ module HTTPX @is_timer_interval = false end def each(&blk) @selectables.each(&blk) def empty? @selectables.empty? && @timers.empty? end def next_tick Loading
lib/httpx/session.rb +19 −38 Original line number Diff line number Diff line Loading @@ -325,53 +325,34 @@ module HTTPX # returns the array of HTTPX::Response objects corresponding to the array of HTTPX::Request +requests+. def receive_requests(requests, selector) responses = [] # : Array[response] # guarantee ordered responses loop do request = requests.first return responses unless request catch(:coalesced) { selector.next_tick } until (response = fetch_response(request, selector, request.options)) request.complete!(response) responses << response requests.shift break if requests.empty? waiting = 0 responses = requests.map do |request| fetch_response(request, selector, request.options).tap do |response| waiting += 1 if response.nil? end end next unless selector.empty? until waiting.zero? || selector.empty? # loop on selector until at least one response has been received. catch(:coalesced) { selector.next_tick } # in some cases, the pool of connections might have been drained because there was some # handshake error, and the error responses have already been emitted, but there was no # opportunity to traverse the requests, hence we're returning only a fraction of the errors # we were supposed to. This effectively fetches the existing responses and return them. exit_from_loop = true responses.each_with_index do |response, idx| next unless response.nil? requests_to_remove = [] # : Array[Request] request = requests[idx] requests.each do |req| response = fetch_response(req, selector, request.options) response = fetch_response(request, selector, request.options) if exit_from_loop && response req.complete!(response) responses << response requests_to_remove << req else # fetch_response may resend requests. when that happens, we need to go back to the initial # loop and process the selector. we still do a pass-through on the remainder of requests, so # that every request that need to be resent, is resent. exit_from_loop = false next unless response raise Error, "something went wrong, responses not found and requests not resent" if selector.empty? request.complete!(response) responses[idx] = response waiting -= 1 end end break if exit_from_loop raise Error, "something went wrong, responses not found and requests not resent" unless waiting.zero? requests -= requests_to_remove end responses end Loading
lib/httpx/timers.rb +4 −0 Original line number Diff line number Diff line Loading @@ -6,6 +6,10 @@ module HTTPX @intervals = [] end def empty? @intervals.empty? end def after(interval_in_secs, cb = nil, &blk) callback = cb || blk Loading
sig/selector.rbs +3 −2 Original line number Diff line number Diff line Loading @@ -24,8 +24,6 @@ module HTTPX type io_select_selectable = (selectable | Array[selectable])? include _Each[selectable] extend Forwardable READABLE: Array[io_interests] Loading @@ -36,6 +34,9 @@ module HTTPX @selectables: Array[selectable] @is_timer_interval: bool def each: () -> ::Enumerator[selectable, self] | () { (selectable) -> void } -> self def next_tick: () -> void def terminate: () -> void Loading