diff --git a/spec/std/crystal/evented/arena_spec.cr b/spec/std/crystal/event_loop/polling/arena_spec.cr similarity index 70% rename from spec/std/crystal/evented/arena_spec.cr rename to spec/std/crystal/event_loop/polling/arena_spec.cr index edf5fd90e11b..66e83be3b192 100644 --- a/spec/std/crystal/evented/arena_spec.cr +++ b/spec/std/crystal/event_loop/polling/arena_spec.cr @@ -1,11 +1,11 @@ -{% skip_file unless Crystal.has_constant?(:Evented) %} +{% skip_file unless Crystal::EventLoop.has_constant?(:Polling) %} require "spec" -describe Crystal::Evented::Arena do +describe Crystal::EventLoop::Polling::Arena do describe "#allocate_at?" do it "yields block when not allocated" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) pointer = nil index = nil called = 0 @@ -31,8 +31,8 @@ describe Crystal::Evented::Arena do end it "allocates up to capacity" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) - indexes = [] of Crystal::Evented::Arena::Index + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) + indexes = [] of Crystal::EventLoop::Polling::Arena::Index indexes = 32.times.map do |i| arena.allocate_at?(i) { |ptr, _| ptr.value = i } @@ -49,7 +49,7 @@ describe Crystal::Evented::Arena do end it "checks bounds" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) expect_raises(IndexError) { arena.allocate_at?(-1) { } } expect_raises(IndexError) { arena.allocate_at?(33) { } } end @@ -57,7 +57,7 @@ describe Crystal::Evented::Arena do describe "#get" do it "returns previously allocated object" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) pointer = nil index = arena.allocate_at(30) do |ptr| @@ -77,15 +77,15 @@ describe Crystal::Evented::Arena do end it "can't access unallocated object" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) expect_raises(RuntimeError) do - arena.get(Crystal::Evented::Arena::Index.new(10, 0)) { } + arena.get(Crystal::EventLoop::Polling::Arena::Index.new(10, 0)) { } end end it "checks generation" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) called = 0 index1 = arena.allocate_at(2) { called += 1 } @@ -102,15 +102,15 @@ describe Crystal::Evented::Arena do end it "checks out of bounds" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) - expect_raises(IndexError) { arena.get(Crystal::Evented::Arena::Index.new(-1, 0)) { } } - expect_raises(IndexError) { arena.get(Crystal::Evented::Arena::Index.new(33, 0)) { } } + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) + expect_raises(IndexError) { arena.get(Crystal::EventLoop::Polling::Arena::Index.new(-1, 0)) { } } + expect_raises(IndexError) { arena.get(Crystal::EventLoop::Polling::Arena::Index.new(33, 0)) { } } end end describe "#get?" do it "returns previously allocated object" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) pointer = nil index = arena.allocate_at(30) do |ptr| @@ -131,16 +131,16 @@ describe Crystal::Evented::Arena do end it "can't access unallocated index" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) called = 0 - ret = arena.get?(Crystal::Evented::Arena::Index.new(10, 0)) { called += 1 } + ret = arena.get?(Crystal::EventLoop::Polling::Arena::Index.new(10, 0)) { called += 1 } ret.should be_false called.should eq(0) end it "checks generation" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) called = 0 old_index = arena.allocate_at(2) { } @@ -166,11 +166,11 @@ describe Crystal::Evented::Arena do end it "checks out of bounds" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) called = 0 - arena.get?(Crystal::Evented::Arena::Index.new(-1, 0)) { called += 1 }.should be_false - arena.get?(Crystal::Evented::Arena::Index.new(33, 0)) { called += 1 }.should be_false + arena.get?(Crystal::EventLoop::Polling::Arena::Index.new(-1, 0)) { called += 1 }.should be_false + arena.get?(Crystal::EventLoop::Polling::Arena::Index.new(33, 0)) { called += 1 }.should be_false called.should eq(0) end @@ -178,7 +178,7 @@ describe Crystal::Evented::Arena do describe "#free" do it "deallocates the object" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) index1 = arena.allocate_at(3) { |ptr| ptr.value = 123 } arena.free(index1) { } @@ -192,7 +192,7 @@ describe Crystal::Evented::Arena do end it "checks generation" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) called = 0 old_index = arena.allocate_at(1) { } @@ -214,19 +214,19 @@ describe Crystal::Evented::Arena do end it "checks out of bounds" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) called = 0 - arena.free(Crystal::Evented::Arena::Index.new(-1, 0)) { called += 1 } - arena.free(Crystal::Evented::Arena::Index.new(33, 0)) { called += 1 } + arena.free(Crystal::EventLoop::Polling::Arena::Index.new(-1, 0)) { called += 1 } + arena.free(Crystal::EventLoop::Polling::Arena::Index.new(33, 0)) { called += 1 } called.should eq(0) end end it "#each_index" do - arena = Crystal::Evented::Arena(Int32, 96).new(32) - indices = [] of {Int32, Crystal::Evented::Arena::Index} + arena = Crystal::EventLoop::Polling::Arena(Int32, 96).new(32) + indices = [] of {Int32, Crystal::EventLoop::Polling::Arena::Index} arena.each_index { |i, index| indices << {i, index} } indices.should be_empty diff --git a/spec/std/crystal/evented/poll_descriptor_spec.cr b/spec/std/crystal/event_loop/polling/poll_descriptor_spec.cr similarity index 60% rename from spec/std/crystal/evented/poll_descriptor_spec.cr rename to spec/std/crystal/event_loop/polling/poll_descriptor_spec.cr index a5719f7ff7a8..04c090e7b83f 100644 --- a/spec/std/crystal/evented/poll_descriptor_spec.cr +++ b/spec/std/crystal/event_loop/polling/poll_descriptor_spec.cr @@ -1,9 +1,9 @@ -{% skip_file unless Crystal.has_constant?(:Evented) %} +{% skip_file unless Crystal::EventLoop.has_constant?(:Polling) %} require "spec" -class Crystal::Evented::FakeLoop < Crystal::Evented::EventLoop - getter operations = [] of {Symbol, Int32, Crystal::Evented::Arena::Index | Bool} +class Crystal::EventLoop::FakeLoop < Crystal::EventLoop::Polling + getter operations = [] of {Symbol, Int32, Arena::Index | Bool} private def system_run(blocking : Bool, & : Fiber ->) : Nil end @@ -27,13 +27,13 @@ class Crystal::Evented::FakeLoop < Crystal::Evented::EventLoop end end -describe Crystal::Evented::Waiters do +describe Crystal::EventLoop::Polling::Waiters do describe "#take_ownership" do it "associates a poll descriptor to an evloop instance" do fd = Int32::MAX - pd = Crystal::Evented::PollDescriptor.new - index = Crystal::Evented::Arena::Index.new(fd, 0) - evloop = Crystal::Evented::FakeLoop.new + pd = Crystal::EventLoop::Polling::PollDescriptor.new + index = Crystal::EventLoop::Polling::Arena::Index.new(fd, 0) + evloop = Crystal::EventLoop::Polling::FakeLoop.new pd.take_ownership(evloop, fd, index) pd.@event_loop.should be(evloop) @@ -45,11 +45,11 @@ describe Crystal::Evented::Waiters do it "moves a poll descriptor to another evloop instance" do fd = Int32::MAX - pd = Crystal::Evented::PollDescriptor.new - index = Crystal::Evented::Arena::Index.new(fd, 0) + pd = Crystal::EventLoop::Polling::PollDescriptor.new + index = Crystal::EventLoop::Polling::Arena::Index.new(fd, 0) - evloop1 = Crystal::Evented::FakeLoop.new - evloop2 = Crystal::Evented::FakeLoop.new + evloop1 = Crystal::EventLoop::Polling::FakeLoop.new + evloop2 = Crystal::EventLoop::Polling::FakeLoop.new pd.take_ownership(evloop1, fd, index) pd.take_ownership(evloop2, fd, index) @@ -67,10 +67,10 @@ describe Crystal::Evented::Waiters do it "can't move to the current evloop" do fd = Int32::MAX - pd = Crystal::Evented::PollDescriptor.new - index = Crystal::Evented::Arena::Index.new(fd, 0) + pd = Crystal::EventLoop::Polling::PollDescriptor.new + index = Crystal::EventLoop::Polling::Arena::Index.new(fd, 0) - evloop = Crystal::Evented::FakeLoop.new + evloop = Crystal::EventLoop::Polling::FakeLoop.new pd.take_ownership(evloop, fd, index) expect_raises(Exception) { pd.take_ownership(evloop, fd, index) } @@ -78,15 +78,15 @@ describe Crystal::Evented::Waiters do it "can't move with pending waiters" do fd = Int32::MAX - pd = Crystal::Evented::PollDescriptor.new - index = Crystal::Evented::Arena::Index.new(fd, 0) - event = Crystal::Evented::Event.new(:io_read, Fiber.current) + pd = Crystal::EventLoop::Polling::PollDescriptor.new + index = Crystal::EventLoop::Polling::Arena::Index.new(fd, 0) + event = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) - evloop1 = Crystal::Evented::FakeLoop.new + evloop1 = Crystal::EventLoop::Polling::FakeLoop.new pd.take_ownership(evloop1, fd, index) pd.@readers.add(pointerof(event)) - evloop2 = Crystal::Evented::FakeLoop.new + evloop2 = Crystal::EventLoop::Polling::FakeLoop.new expect_raises(RuntimeError) { pd.take_ownership(evloop2, fd, index) } pd.@event_loop.should be(evloop1) diff --git a/spec/std/crystal/evented/timers_spec.cr b/spec/std/crystal/event_loop/polling/timers_spec.cr similarity index 51% rename from spec/std/crystal/evented/timers_spec.cr rename to spec/std/crystal/event_loop/polling/timers_spec.cr index 9dccbf4f56f2..6f6b8a670b08 100644 --- a/spec/std/crystal/evented/timers_spec.cr +++ b/spec/std/crystal/event_loop/polling/timers_spec.cr @@ -1,13 +1,13 @@ -{% skip_file unless Crystal.has_constant?(:Evented) %} +{% skip_file unless Crystal::EventLoop.has_constant?(:Polling) %} require "spec" -describe Crystal::Evented::Timers do +describe Crystal::EventLoop::Polling::Timers do it "#empty?" do - timers = Crystal::Evented::Timers.new + timers = Crystal::EventLoop::Polling::Timers.new timers.empty?.should be_true - event = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 7.seconds) + event = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 7.seconds) timers.add(pointerof(event)) timers.empty?.should be_false @@ -17,13 +17,13 @@ describe Crystal::Evented::Timers do it "#next_ready?" do # empty - timers = Crystal::Evented::Timers.new + timers = Crystal::EventLoop::Polling::Timers.new timers.next_ready?.should be_nil # with events - event1s = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 1.second) - event3m = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 3.minutes) - event5m = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 5.minutes) + event1s = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 1.second) + event3m = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 3.minutes) + event5m = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 5.minutes) timers.add(pointerof(event5m)) timers.next_ready?.should eq(event5m.wake_at?) @@ -36,11 +36,11 @@ describe Crystal::Evented::Timers do end it "#dequeue_ready" do - timers = Crystal::Evented::Timers.new + timers = Crystal::EventLoop::Polling::Timers.new - event1 = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 0.seconds) - event2 = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 0.seconds) - event3 = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 1.minute) + event1 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 0.seconds) + event2 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 0.seconds) + event3 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 1.minute) # empty called = 0 @@ -48,12 +48,12 @@ describe Crystal::Evented::Timers do called.should eq(0) # add events in non chronological order - timers = Crystal::Evented::Timers.new + timers = Crystal::EventLoop::Polling::Timers.new timers.add(pointerof(event1)) timers.add(pointerof(event3)) timers.add(pointerof(event2)) - events = [] of Crystal::Evented::Event* + events = [] of Crystal::EventLoop::Polling::Event* timers.dequeue_ready { |event| events << event } events.should eq([ @@ -64,12 +64,12 @@ describe Crystal::Evented::Timers do end it "#add" do - timers = Crystal::Evented::Timers.new + timers = Crystal::EventLoop::Polling::Timers.new - event0 = Crystal::Evented::Event.new(:sleep, Fiber.current) - event1 = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 0.seconds) - event2 = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 2.minutes) - event3 = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 1.minute) + event0 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current) + event1 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 0.seconds) + event2 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 2.minutes) + event3 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 1.minute) # add events in non chronological order timers.add(pointerof(event1)).should be_true # added to the head (next ready) @@ -81,13 +81,13 @@ describe Crystal::Evented::Timers do end it "#delete" do - event1 = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 0.seconds) - event2 = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 0.seconds) - event3 = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 1.minute) - event4 = Crystal::Evented::Event.new(:sleep, Fiber.current, timeout: 4.minutes) + event1 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 0.seconds) + event2 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 0.seconds) + event3 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 1.minute) + event4 = Crystal::EventLoop::Polling::Event.new(:sleep, Fiber.current, timeout: 4.minutes) # add events in non chronological order - timers = Crystal::Evented::Timers.new + timers = Crystal::EventLoop::Polling::Timers.new timers.add(pointerof(event1)) timers.add(pointerof(event3)) timers.add(pointerof(event2)) diff --git a/spec/std/crystal/evented/waiters_spec.cr b/spec/std/crystal/event_loop/polling/waiters_spec.cr similarity index 61% rename from spec/std/crystal/evented/waiters_spec.cr rename to spec/std/crystal/event_loop/polling/waiters_spec.cr index 91e145f6f811..7a72b591fba2 100644 --- a/spec/std/crystal/evented/waiters_spec.cr +++ b/spec/std/crystal/event_loop/polling/waiters_spec.cr @@ -1,32 +1,32 @@ -{% skip_file unless Crystal.has_constant?(:Evented) %} +{% skip_file unless Crystal::EventLoop.has_constant?(:Polling) %} require "spec" -describe Crystal::Evented::Waiters do +describe Crystal::EventLoop::Polling::Waiters do describe "#add" do it "adds event to list" do - waiters = Crystal::Evented::Waiters.new + waiters = Crystal::EventLoop::Polling::Waiters.new - event = Crystal::Evented::Event.new(:io_read, Fiber.current) + event = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) ret = waiters.add(pointerof(event)) ret.should be_true end it "doesn't add the event when the list is ready (race condition)" do - waiters = Crystal::Evented::Waiters.new + waiters = Crystal::EventLoop::Polling::Waiters.new waiters.ready_one { true } - event = Crystal::Evented::Event.new(:io_read, Fiber.current) + event = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) ret = waiters.add(pointerof(event)) ret.should be_false waiters.@ready.should be_false end it "doesn't add the event when the list is always ready" do - waiters = Crystal::Evented::Waiters.new + waiters = Crystal::EventLoop::Polling::Waiters.new waiters.ready_all { } - event = Crystal::Evented::Event.new(:io_read, Fiber.current) + event = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) ret = waiters.add(pointerof(event)) ret.should be_false waiters.@always_ready.should be_true @@ -35,8 +35,8 @@ describe Crystal::Evented::Waiters do describe "#delete" do it "removes the event from the list" do - waiters = Crystal::Evented::Waiters.new - event = Crystal::Evented::Event.new(:io_read, Fiber.current) + waiters = Crystal::EventLoop::Polling::Waiters.new + event = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) waiters.add(pointerof(event)) waiters.delete(pointerof(event)) @@ -47,15 +47,15 @@ describe Crystal::Evented::Waiters do end it "does nothing when the event isn't in the list" do - waiters = Crystal::Evented::Waiters.new - event = Crystal::Evented::Event.new(:io_read, Fiber.current) + waiters = Crystal::EventLoop::Polling::Waiters.new + event = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) waiters.delete(pointerof(event)) end end describe "#ready_one" do it "marks the list as ready when empty (race condition)" do - waiters = Crystal::Evented::Waiters.new + waiters = Crystal::EventLoop::Polling::Waiters.new called = false waiters.ready_one { called = true } @@ -65,10 +65,10 @@ describe Crystal::Evented::Waiters do end it "dequeues events in FIFO order" do - waiters = Crystal::Evented::Waiters.new - event1 = Crystal::Evented::Event.new(:io_read, Fiber.current) - event2 = Crystal::Evented::Event.new(:io_read, Fiber.current) - event3 = Crystal::Evented::Event.new(:io_read, Fiber.current) + waiters = Crystal::EventLoop::Polling::Waiters.new + event1 = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) + event2 = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) + event3 = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) called = 0 waiters.add(pointerof(event1)) @@ -97,10 +97,10 @@ describe Crystal::Evented::Waiters do end it "dequeues events until the block returns true" do - waiters = Crystal::Evented::Waiters.new - event1 = Crystal::Evented::Event.new(:io_read, Fiber.current) - event2 = Crystal::Evented::Event.new(:io_read, Fiber.current) - event3 = Crystal::Evented::Event.new(:io_read, Fiber.current) + waiters = Crystal::EventLoop::Polling::Waiters.new + event1 = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) + event2 = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) + event3 = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) called = 0 waiters.add(pointerof(event1)) @@ -115,9 +115,9 @@ describe Crystal::Evented::Waiters do end it "dequeues events until empty and marks the list as ready" do - waiters = Crystal::Evented::Waiters.new - event1 = Crystal::Evented::Event.new(:io_read, Fiber.current) - event2 = Crystal::Evented::Event.new(:io_read, Fiber.current) + waiters = Crystal::EventLoop::Polling::Waiters.new + event1 = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) + event2 = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) called = 0 waiters.add(pointerof(event1)) @@ -134,7 +134,7 @@ describe Crystal::Evented::Waiters do describe "#ready_all" do it "marks the list as always ready" do - waiters = Crystal::Evented::Waiters.new + waiters = Crystal::EventLoop::Polling::Waiters.new called = false waiters.ready_all { called = true } @@ -144,10 +144,10 @@ describe Crystal::Evented::Waiters do end it "dequeues all events" do - waiters = Crystal::Evented::Waiters.new - event1 = Crystal::Evented::Event.new(:io_read, Fiber.current) - event2 = Crystal::Evented::Event.new(:io_read, Fiber.current) - event3 = Crystal::Evented::Event.new(:io_read, Fiber.current) + waiters = Crystal::EventLoop::Polling::Waiters.new + event1 = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) + event2 = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) + event3 = Crystal::EventLoop::Polling::Event.new(:io_read, Fiber.current) called = 0 waiters.add(pointerof(event1)) diff --git a/src/crystal/system/event_loop.cr b/src/crystal/event_loop.cr similarity index 86% rename from src/crystal/system/event_loop.cr rename to src/crystal/event_loop.cr index 33ff4f9dac85..45fc9e4f8558 100644 --- a/src/crystal/system/event_loop.cr +++ b/src/crystal/event_loop.cr @@ -2,20 +2,20 @@ abstract class Crystal::EventLoop # Creates an event loop instance def self.create : self {% if flag?(:wasi) %} - Crystal::Wasi::EventLoop.new + Crystal::EventLoop::Wasi.new {% elsif flag?(:unix) %} # TODO: enable more targets by default (need manual tests or fixes) {% if flag?("evloop=libevent") %} - Crystal::LibEvent::EventLoop.new + Crystal::EventLoop::LibEvent.new {% elsif flag?("evloop=epoll") || flag?(:android) || flag?(:linux) %} - Crystal::Epoll::EventLoop.new + Crystal::EventLoop::Epoll.new {% elsif flag?("evloop=kqueue") || flag?(:darwin) || flag?(:freebsd) %} - Crystal::Kqueue::EventLoop.new + Crystal::EventLoop::Kqueue.new {% else %} - Crystal::LibEvent::EventLoop.new + Crystal::EventLoop::LibEvent.new {% end %} {% elsif flag?(:win32) %} - Crystal::IOCP::EventLoop.new + Crystal::EventLoop::IOCP.new {% else %} {% raise "Event loop not supported" %} {% end %} @@ -85,19 +85,19 @@ abstract class Crystal::EventLoop end {% if flag?(:wasi) %} - require "./wasi/event_loop" + require "./event_loop/wasi" {% elsif flag?(:unix) %} {% if flag?("evloop=libevent") %} - require "./unix/event_loop_libevent" + require "./event_loop/libevent" {% elsif flag?("evloop=epoll") || flag?(:android) || flag?(:linux) %} - require "./unix/epoll/event_loop" + require "./event_loop/epoll" {% elsif flag?("evloop=kqueue") || flag?(:darwin) || flag?(:freebsd) %} - require "./unix/kqueue/event_loop" + require "./event_loop/kqueue" {% else %} - require "./unix/event_loop_libevent" + require "./event_loop/libevent" {% end %} {% elsif flag?(:win32) %} - require "./win32/event_loop_iocp" + require "./event_loop/iocp" {% else %} {% raise "Event loop not supported" %} {% end %} diff --git a/src/crystal/system/unix/epoll/event_loop.cr b/src/crystal/event_loop/epoll.cr similarity index 91% rename from src/crystal/system/unix/epoll/event_loop.cr rename to src/crystal/event_loop/epoll.cr index 4850e68739f2..2d7d08ce7c94 100644 --- a/src/crystal/system/unix/epoll/event_loop.cr +++ b/src/crystal/event_loop/epoll.cr @@ -1,9 +1,9 @@ -require "../evented/event_loop" -require "../epoll" -require "../eventfd" -require "../timerfd" +require "./polling" +require "../system/unix/epoll" +require "../system/unix/eventfd" +require "../system/unix/timerfd" -class Crystal::Epoll::EventLoop < Crystal::Evented::EventLoop +class Crystal::EventLoop::Epoll < Crystal::EventLoop::Polling def initialize # the epoll instance @epoll = System::Epoll.new @@ -50,7 +50,7 @@ class Crystal::Epoll::EventLoop < Crystal::Evented::EventLoop system_set_timer(@timers.next_ready?) # re-add all registered fds - Evented.arena.each_index { |fd, index| system_add(fd, index) } + Polling.arena.each_index { |fd, index| system_add(fd, index) } end {% end %} @@ -87,12 +87,12 @@ class Crystal::Epoll::EventLoop < Crystal::Evented::EventLoop end private def process_io(epoll_event : LibC::EpollEvent*, &) : Nil - index = Evented::Arena::Index.new(epoll_event.value.data.u64) + index = Polling::Arena::Index.new(epoll_event.value.data.u64) events = epoll_event.value.events Crystal.trace :evloop, "event", fd: index.index, index: index.to_i64, events: events - Evented.arena.get?(index) do |pd| + Polling.arena.get?(index) do |pd| if (events & (LibC::EPOLLERR | LibC::EPOLLHUP)) != 0 pd.value.@readers.ready_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } } pd.value.@writers.ready_all { |event| unsafe_resume_io(event) { |fiber| yield fiber } } @@ -116,7 +116,7 @@ class Crystal::Epoll::EventLoop < Crystal::Evented::EventLoop @eventfd.write(1) if @interrupted.test_and_set end - protected def system_add(fd : Int32, index : Evented::Arena::Index) : Nil + protected def system_add(fd : Int32, index : Polling::Arena::Index) : Nil Crystal.trace :evloop, "epoll_ctl", op: "add", fd: fd, index: index.to_i64 events = LibC::EPOLLIN | LibC::EPOLLOUT | LibC::EPOLLRDHUP | LibC::EPOLLET @epoll.add(fd, events, u64: index.to_u64) diff --git a/src/crystal/system/event_loop/file_descriptor.cr b/src/crystal/event_loop/file_descriptor.cr similarity index 100% rename from src/crystal/system/event_loop/file_descriptor.cr rename to src/crystal/event_loop/file_descriptor.cr diff --git a/src/crystal/system/win32/event_loop_iocp.cr b/src/crystal/event_loop/iocp.cr similarity index 89% rename from src/crystal/system/win32/event_loop_iocp.cr rename to src/crystal/event_loop/iocp.cr index 3089e36edfeb..ce3112fa9d1d 100644 --- a/src/crystal/system/win32/event_loop_iocp.cr +++ b/src/crystal/event_loop/iocp.cr @@ -1,11 +1,11 @@ require "c/ioapiset" require "crystal/system/print_error" -require "./iocp" +require "../system/win32/iocp" # :nodoc: -class Crystal::IOCP::EventLoop < Crystal::EventLoop +class Crystal::EventLoop::IOCP < Crystal::EventLoop # This is a list of resume and timeout events managed outside of IOCP. - @queue = Deque(Crystal::IOCP::Event).new + @queue = Deque(Event).new @lock = Crystal::SpinLock.new @interrupted = Atomic(Bool).new(false) @@ -63,7 +63,7 @@ class Crystal::IOCP::EventLoop < Crystal::EventLoop end wait_time = blocking ? (next_event.wake_at - now).total_milliseconds : 0 - timed_out = IOCP.wait_queued_completions(wait_time, alertable: blocking) do |fiber| + timed_out = System::IOCP.wait_queued_completions(wait_time, alertable: blocking) do |fiber| # This block may run multiple times. Every single fiber gets enqueued. fiber.enqueue end @@ -124,34 +124,34 @@ class Crystal::IOCP::EventLoop < Crystal::EventLoop LibC.QueueUserAPC(->(ptr : LibC::ULONG_PTR) { }, thread, LibC::ULONG_PTR.new(0)) end - def enqueue(event : Crystal::IOCP::Event) + def enqueue(event : Event) unless @queue.includes?(event) @queue << event end end - def dequeue(event : Crystal::IOCP::Event) + def dequeue(event : Event) @queue.delete(event) end # Create a new resume event for a fiber. def create_resume_event(fiber : Fiber) : Crystal::EventLoop::Event - Crystal::IOCP::Event.new(fiber) + Event.new(fiber) end def create_timeout_event(fiber) : Crystal::EventLoop::Event - Crystal::IOCP::Event.new(fiber, timeout: true) + Event.new(fiber, timeout: true) end def read(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32 - IOCP.overlapped_operation(file_descriptor, "ReadFile", file_descriptor.read_timeout) do |overlapped| + System::IOCP.overlapped_operation(file_descriptor, "ReadFile", file_descriptor.read_timeout) do |overlapped| ret = LibC.ReadFile(file_descriptor.windows_handle, slice, slice.size, out byte_count, overlapped) {ret, byte_count} end.to_i32 end def write(file_descriptor : Crystal::System::FileDescriptor, slice : Bytes) : Int32 - IOCP.overlapped_operation(file_descriptor, "WriteFile", file_descriptor.write_timeout, writing: true) do |overlapped| + System::IOCP.overlapped_operation(file_descriptor, "WriteFile", file_descriptor.write_timeout, writing: true) do |overlapped| ret = LibC.WriteFile(file_descriptor.windows_handle, slice, slice.size, out byte_count, overlapped) {ret, byte_count} end.to_i32 @@ -174,7 +174,7 @@ class Crystal::IOCP::EventLoop < Crystal::EventLoop def read(socket : ::Socket, slice : Bytes) : Int32 wsabuf = wsa_buffer(slice) - bytes_read = IOCP.wsa_overlapped_operation(socket, socket.fd, "WSARecv", socket.read_timeout, connreset_is_error: false) do |overlapped| + bytes_read = System::IOCP.wsa_overlapped_operation(socket, socket.fd, "WSARecv", socket.read_timeout, connreset_is_error: false) do |overlapped| flags = 0_u32 ret = LibC.WSARecv(socket.fd, pointerof(wsabuf), 1, out bytes_received, pointerof(flags), overlapped, nil) {ret, bytes_received} @@ -186,7 +186,7 @@ class Crystal::IOCP::EventLoop < Crystal::EventLoop def write(socket : ::Socket, slice : Bytes) : Int32 wsabuf = wsa_buffer(slice) - bytes = IOCP.wsa_overlapped_operation(socket, socket.fd, "WSASend", socket.write_timeout) do |overlapped| + bytes = System::IOCP.wsa_overlapped_operation(socket, socket.fd, "WSASend", socket.write_timeout) do |overlapped| ret = LibC.WSASend(socket.fd, pointerof(wsabuf), 1, out bytes_sent, 0, overlapped, nil) {ret, bytes_sent} end @@ -196,7 +196,7 @@ class Crystal::IOCP::EventLoop < Crystal::EventLoop def send_to(socket : ::Socket, slice : Bytes, address : ::Socket::Address) : Int32 wsabuf = wsa_buffer(slice) - bytes_written = IOCP.wsa_overlapped_operation(socket, socket.fd, "WSASendTo", socket.write_timeout) do |overlapped| + bytes_written = System::IOCP.wsa_overlapped_operation(socket, socket.fd, "WSASendTo", socket.write_timeout) do |overlapped| ret = LibC.WSASendTo(socket.fd, pointerof(wsabuf), 1, out bytes_sent, 0, address, address.size, overlapped, nil) {ret, bytes_sent} end @@ -222,7 +222,7 @@ class Crystal::IOCP::EventLoop < Crystal::EventLoop wsabuf = wsa_buffer(slice) flags = 0_u32 - bytes_read = IOCP.wsa_overlapped_operation(socket, socket.fd, "WSARecvFrom", socket.read_timeout) do |overlapped| + bytes_read = System::IOCP.wsa_overlapped_operation(socket, socket.fd, "WSARecvFrom", socket.read_timeout) do |overlapped| ret = LibC.WSARecvFrom(socket.fd, pointerof(wsabuf), 1, out bytes_received, pointerof(flags), sockaddr, pointerof(addrlen), overlapped, nil) {ret, bytes_received} end @@ -279,7 +279,7 @@ class Crystal::IOCP::EventLoop < Crystal::EventLoop end end -class Crystal::IOCP::Event +class Crystal::EventLoop::IOCP::Event include Crystal::EventLoop::Event getter fiber diff --git a/src/crystal/system/unix/kqueue/event_loop.cr b/src/crystal/event_loop/kqueue.cr similarity index 94% rename from src/crystal/system/unix/kqueue/event_loop.cr rename to src/crystal/event_loop/kqueue.cr index eb55fde0cf37..52a7701ef2b1 100644 --- a/src/crystal/system/unix/kqueue/event_loop.cr +++ b/src/crystal/event_loop/kqueue.cr @@ -1,7 +1,7 @@ -require "../evented/event_loop" -require "../kqueue" +require "./polling" +require "../system/unix/kqueue" -class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop +class Crystal::EventLoop::Kqueue < Crystal::EventLoop::Polling # the following are arbitrary numbers to identify specific events INTERRUPT_IDENTIFIER = 9 TIMER_IDENTIFIER = 10 @@ -66,7 +66,7 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop system_set_timer(@timers.next_ready?) # re-add all registered fds - Evented.arena.each_index { |fd, index| system_add(fd, index) } + Polling.arena.each_index { |fd, index| system_add(fd, index) } end {% end %} @@ -118,16 +118,16 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop private def process_io(kevent : LibC::Kevent*, &) : Nil index = {% if flag?(:bits64) %} - Evented::Arena::Index.new(kevent.value.udata.address) + Polling::Arena::Index.new(kevent.value.udata.address) {% else %} # assuming 32-bit target: rebuild the arena index - Evented::Arena::Index.new(kevent.value.ident.to_i32!, kevent.value.udata.address.to_u32!) + Polling::Arena::Index.new(kevent.value.ident.to_i32!, kevent.value.udata.address.to_u32!) {% end %} Crystal.trace :evloop, "event", fd: kevent.value.ident, index: index.to_i64, filter: kevent.value.filter, flags: kevent.value.flags, fflags: kevent.value.fflags - Evented.arena.get?(index) do |pd| + Polling.arena.get?(index) do |pd| if (kevent.value.fflags & LibC::EV_EOF) == LibC::EV_EOF # apparently some systems may report EOF on write with EVFILT_READ instead # of EVFILT_WRITE, so let's wake all waiters: @@ -167,7 +167,7 @@ class Crystal::Kqueue::EventLoop < Crystal::Evented::EventLoop {% end %} end - protected def system_add(fd : Int32, index : Evented::Arena::Index) : Nil + protected def system_add(fd : Int32, index : Polling::Arena::Index) : Nil Crystal.trace :evloop, "kevent", op: "add", fd: fd, index: index.to_i64 # register both read and write events diff --git a/src/crystal/system/unix/event_loop_libevent.cr b/src/crystal/event_loop/libevent.cr similarity index 95% rename from src/crystal/system/unix/event_loop_libevent.cr rename to src/crystal/event_loop/libevent.cr index 4594f07ffe66..21ad97030336 100644 --- a/src/crystal/system/unix/event_loop_libevent.cr +++ b/src/crystal/event_loop/libevent.cr @@ -1,8 +1,8 @@ -require "./event_libevent" +require "./libevent/event" # :nodoc: -class Crystal::LibEvent::EventLoop < Crystal::EventLoop - private getter(event_base) { Crystal::LibEvent::Event::Base.new } +class Crystal::EventLoop::LibEvent < Crystal::EventLoop + private getter(event_base) { Crystal::EventLoop::LibEvent::Event::Base.new } def after_fork_before_exec : Nil end @@ -23,14 +23,14 @@ class Crystal::LibEvent::EventLoop < Crystal::EventLoop end # Create a new resume event for a fiber. - def create_resume_event(fiber : Fiber) : Crystal::EventLoop::Event + def create_resume_event(fiber : Fiber) : Crystal::EventLoop::LibEvent::Event event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data| data.as(Fiber).enqueue end end # Creates a timeout_event. - def create_timeout_event(fiber) : Crystal::EventLoop::Event + def create_timeout_event(fiber) : Crystal::EventLoop::LibEvent::Event event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data| f = data.as(Fiber) if (select_action = f.timeout_select_action) diff --git a/src/crystal/system/unix/event_libevent.cr b/src/crystal/event_loop/libevent/event.cr similarity index 96% rename from src/crystal/system/unix/event_libevent.cr rename to src/crystal/event_loop/libevent/event.cr index 32578e5aba9a..d6b1a5dc0433 100644 --- a/src/crystal/system/unix/event_libevent.cr +++ b/src/crystal/event_loop/libevent/event.cr @@ -5,7 +5,7 @@ require "./lib_event2" {% end %} # :nodoc: -module Crystal::LibEvent +class Crystal::EventLoop::LibEvent < Crystal::EventLoop struct Event include Crystal::EventLoop::Event @@ -56,7 +56,7 @@ module Crystal::LibEvent def new_event(s : Int32, flags : LibEvent2::EventFlags, data, &callback : LibEvent2::Callback) event = LibEvent2.event_new(@base, s, flags, callback, data.as(Void*)) - Crystal::LibEvent::Event.new(event) + LibEvent::Event.new(event) end # NOTE: may return `true` even if no event has been triggered (e.g. diff --git a/src/crystal/system/unix/lib_event2.cr b/src/crystal/event_loop/libevent/lib_event2.cr similarity index 100% rename from src/crystal/system/unix/lib_event2.cr rename to src/crystal/event_loop/libevent/lib_event2.cr diff --git a/src/crystal/system/unix/evented/event_loop.cr b/src/crystal/event_loop/polling.cr similarity index 93% rename from src/crystal/system/unix/evented/event_loop.cr rename to src/crystal/event_loop/polling.cr index e33fb3d2ea99..0df0b134c7f4 100644 --- a/src/crystal/system/unix/evented/event_loop.cr +++ b/src/crystal/event_loop/polling.cr @@ -1,56 +1,16 @@ -require "./*" -require "./arena" +# forward declaration for the require below to not create a module +abstract class Crystal::EventLoop::Polling < Crystal::EventLoop; end + +require "./polling/*" module Crystal::System::FileDescriptor # user data (generation index for the arena) - property __evloop_data : Evented::Arena::Index = Evented::Arena::INVALID_INDEX + property __evloop_data : EventLoop::Polling::Arena::Index = EventLoop::Polling::Arena::INVALID_INDEX end module Crystal::System::Socket # user data (generation index for the arena) - property __evloop_data : Evented::Arena::Index = Evented::Arena::INVALID_INDEX -end - -module Crystal::Evented - # The generational arena: - # - # 1. decorrelates the fd from the IO since the evloop only really cares about - # the fd state and to resume pending fibers (it could monitor a fd without - # an IO object); - # - # 2. permits to avoid pushing raw pointers to IO objects into kernel data - # structures that are unknown to the GC, and to safely check whether the - # allocation is still valid before trying to dereference the pointer. Since - # `PollDescriptor` also doesn't have pointers to the actual IO object, it - # won't prevent the GC from collecting lost IO objects (and spares us from - # using weak references). - # - # 3. to a lesser extent, it also allows to keep the `PollDescriptor` allocated - # together in the same region, and polluting the IO object itself with - # specific evloop data (except for the generation index). - # - # The implementation takes advantage of the fd being unique per process and - # that the operating system will always reuse the lowest fd (POSIX compliance) - # and will only grow when the process needs that many file descriptors, so the - # allocated memory region won't grow larger than necessary. This assumption - # allows the arena to skip maintaining a list of free indexes. Some systems - # may deviate from the POSIX default, but all systems seem to follow it, as it - # allows optimizations to the OS (it can reuse already allocated resources), - # and either the man page explicitly says so (Linux), or they don't (BSD) and - # they must follow the POSIX definition. - # - # The block size is set to 64KB because it's a multiple of: - # - 4KB (usual page size) - # - 1024 (common soft limit for open files) - # - sizeof(Arena::Entry(PollDescriptor)) - protected class_getter arena = Arena(PollDescriptor, 65536).new(max_fds) - - private def self.max_fds : Int32 - if LibC.getrlimit(LibC::RLIMIT_NOFILE, out rlimit) == -1 - raise RuntimeError.from_errno("getrlimit(RLIMIT_NOFILE)") - end - rlimit.rlim_max.clamp(..Int32::MAX).to_i32! - end + property __evloop_data : EventLoop::Polling::Arena::Index = EventLoop::Polling::Arena::INVALID_INDEX end # Polling EventLoop. @@ -94,7 +54,47 @@ end # If the IO operation has a timeout, the event is also registered into `@timers` # before suspending the fiber, then after resume it will raise # `IO::TimeoutError` if the event timed out, and continue otherwise. -abstract class Crystal::Evented::EventLoop < Crystal::EventLoop +abstract class Crystal::EventLoop::Polling < Crystal::EventLoop + # The generational arena: + # + # 1. decorrelates the fd from the IO since the evloop only really cares about + # the fd state and to resume pending fibers (it could monitor a fd without + # an IO object); + # + # 2. permits to avoid pushing raw pointers to IO objects into kernel data + # structures that are unknown to the GC, and to safely check whether the + # allocation is still valid before trying to dereference the pointer. Since + # `PollDescriptor` also doesn't have pointers to the actual IO object, it + # won't prevent the GC from collecting lost IO objects (and spares us from + # using weak references). + # + # 3. to a lesser extent, it also allows to keep the `PollDescriptor` allocated + # together in the same region, and polluting the IO object itself with + # specific evloop data (except for the generation index). + # + # The implementation takes advantage of the fd being unique per process and + # that the operating system will always reuse the lowest fd (POSIX compliance) + # and will only grow when the process needs that many file descriptors, so the + # allocated memory region won't grow larger than necessary. This assumption + # allows the arena to skip maintaining a list of free indexes. Some systems + # may deviate from the POSIX default, but all systems seem to follow it, as it + # allows optimizations to the OS (it can reuse already allocated resources), + # and either the man page explicitly says so (Linux), or they don't (BSD) and + # they must follow the POSIX definition. + # + # The block size is set to 64KB because it's a multiple of: + # - 4KB (usual page size) + # - 1024 (common soft limit for open files) + # - sizeof(Arena::Entry(PollDescriptor)) + protected class_getter arena = Arena(PollDescriptor, 65536).new(max_fds) + + private def self.max_fds : Int32 + if LibC.getrlimit(LibC::RLIMIT_NOFILE, out rlimit) == -1 + raise RuntimeError.from_errno("getrlimit(RLIMIT_NOFILE)") + end + rlimit.rlim_max.clamp(..Int32::MAX).to_i32! + end + @lock = SpinLock.new # protects parallel accesses to @timers @timers = Timers.new @@ -299,7 +299,7 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop protected def evented_close(io) return unless (index = io.__evloop_data).valid? - Evented.arena.free(index) do |pd| + Polling.arena.free(index) do |pd| pd.value.@readers.ready_all do |event| pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber| Crystal::Scheduler.enqueue(fiber) @@ -319,7 +319,7 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop private def internal_remove(io) return unless (index = io.__evloop_data).valid? - Evented.arena.free(index) do |pd| + Polling.arena.free(index) do |pd| pd.value.remove(io.fd) { } # ignore system error end end @@ -350,33 +350,33 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop end end - private def wait(type : Evented::Event::Type, io, timeout, &) + private def wait(type : Polling::Event::Type, io, timeout, &) # prepare event (on the stack); we can't initialize it properly until we get # the arena index below; we also can't use a nilable since `pointerof` would # point to the union, not the event - event = uninitialized Evented::Event + event = uninitialized Event # add the event to the waiting list; in case we can't access or allocate the # poll descriptor into the arena, we merely return to let the caller handle # the situation (maybe the IO got closed?) if (index = io.__evloop_data).valid? - event = Evented::Event.new(type, Fiber.current, index, timeout) + event = Event.new(type, Fiber.current, index, timeout) - return false unless Evented.arena.get?(index) do |pd| + return false unless Polling.arena.get?(index) do |pd| yield pd, pointerof(event) end else # OPTIMIZE: failing to allocate may be a simple conflict with 2 fibers # starting to read or write on the same fd, we may want to detect any # error situation instead of returning and retrying a syscall - return false unless Evented.arena.allocate_at?(io.fd) do |pd, index| + return false unless Polling.arena.allocate_at?(io.fd) do |pd, index| # register the fd with the event loop (once), it should usually merely add # the fd to the current evloop but may "transfer" the ownership from # another event loop: io.__evloop_data = index pd.value.take_ownership(self, io.fd, index) - event = Evented::Event.new(type, Fiber.current, index, timeout) + event = Event.new(type, Fiber.current, index, timeout) yield pd, pointerof(event) end end @@ -402,14 +402,14 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop # internals: timers - protected def add_timer(event : Evented::Event*) + protected def add_timer(event : Event*) @lock.sync do is_next_ready = @timers.add(event) system_set_timer(event.value.wake_at) if is_next_ready end end - protected def delete_timer(event : Evented::Event*) : Bool + protected def delete_timer(event : Event*) : Bool @lock.sync do dequeued, was_next_ready = @timers.delete(event) # update system timer if we deleted the next timer @@ -424,7 +424,7 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop # Thread unsafe: we must hold the poll descriptor waiter lock for the whole # duration of the dequeue/resume_io otherwise we might conflict with timers # trying to cancel an IO event. - protected def unsafe_resume_io(event : Evented::Event*, &) : Bool + protected def unsafe_resume_io(event : Event*, &) : Bool # we only partially own the poll descriptor; thanks to the lock we know that # another thread won't dequeue it, yet it may still be in the timers queue, # which at worst may be waiting on the lock to be released, so event* can be @@ -449,7 +449,7 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop # collect ready timers before processing them —this is safe— to avoids a # deadlock situation when another thread tries to process a ready IO event # (in poll descriptor waiters) with a timeout (same event* in timers) - buffer = uninitialized StaticArray(Pointer(Evented::Event), 128) + buffer = uninitialized StaticArray(Pointer(Event), 128) size = 0 @lock.sync do @@ -468,7 +468,7 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop end end - private def process_timer(event : Evented::Event*, &) + private def process_timer(event : Event*, &) # we dequeued the event from timers, and by rule we own it, so event* can # safely be dereferenced: fiber = event.value.fiber @@ -477,12 +477,12 @@ abstract class Crystal::Evented::EventLoop < Crystal::EventLoop when .io_read? # reached read timeout: cancel io event; by rule the timer always wins, # even in case of conflict with #unsafe_resume_io we must resume the fiber - Evented.arena.get?(event.value.index) { |pd| pd.value.@readers.delete(event) } + Polling.arena.get?(event.value.index) { |pd| pd.value.@readers.delete(event) } event.value.timed_out! when .io_write? # reached write timeout: cancel io event; by rule the timer always wins, # even in case of conflict with #unsafe_resume_io we must resume the fiber - Evented.arena.get?(event.value.index) { |pd| pd.value.@writers.delete(event) } + Polling.arena.get?(event.value.index) { |pd| pd.value.@writers.delete(event) } event.value.timed_out! when .select_timeout? # always dequeue the event but only enqueue the fiber if we win the diff --git a/src/crystal/system/unix/evented/arena.cr b/src/crystal/event_loop/polling/arena.cr similarity index 97% rename from src/crystal/system/unix/evented/arena.cr rename to src/crystal/event_loop/polling/arena.cr index 57e408183679..a7bcb181a66f 100644 --- a/src/crystal/system/unix/evented/arena.cr +++ b/src/crystal/event_loop/polling/arena.cr @@ -13,7 +13,7 @@ # that something else will maintain the uniqueness of indexes and reuse indexes # as much as possible instead of growing. # -# For example this arena is used to hold `Crystal::Evented::PollDescriptor` +# For example this arena is used to hold `Crystal::EventLoop::Polling::PollDescriptor` # allocations for all the fd in a program, where the fd is used as the index. # They're unique to the process and the OS always reuses the lowest fd numbers # before growing. @@ -26,7 +26,7 @@ # Guarantees: blocks' memory is initialized to zero, which means `T` objects are # initialized to zero by default, then `#free` will also clear the memory, so # the next allocation shall be initialized to zero, too. -class Crystal::Evented::Arena(T, BLOCK_BYTESIZE) +class Crystal::EventLoop::Polling::Arena(T, BLOCK_BYTESIZE) INVALID_INDEX = Index.new(-1, 0) struct Index diff --git a/src/crystal/system/unix/evented/event.cr b/src/crystal/event_loop/polling/event.cr similarity index 97% rename from src/crystal/system/unix/evented/event.cr rename to src/crystal/event_loop/polling/event.cr index e6937cf4d044..93caf843b049 100644 --- a/src/crystal/system/unix/evented/event.cr +++ b/src/crystal/event_loop/polling/event.cr @@ -8,7 +8,7 @@ require "crystal/pointer_pairing_heap" # # The events can be found in different queues, for example `Timers` and/or # `Waiters` depending on their type. -struct Crystal::Evented::Event +struct Crystal::EventLoop::Polling::Event enum Type IoRead IoWrite diff --git a/src/crystal/system/unix/evented/fiber_event.cr b/src/crystal/event_loop/polling/fiber_event.cr similarity index 87% rename from src/crystal/system/unix/evented/fiber_event.cr rename to src/crystal/event_loop/polling/fiber_event.cr index 074dd67e926f..e21cf2b90526 100644 --- a/src/crystal/system/unix/evented/fiber_event.cr +++ b/src/crystal/event_loop/polling/fiber_event.cr @@ -1,8 +1,8 @@ -class Crystal::Evented::FiberEvent +class Crystal::EventLoop::Polling::FiberEvent include Crystal::EventLoop::Event - def initialize(@event_loop : EventLoop, fiber : Fiber, type : Evented::Event::Type) - @event = Evented::Event.new(type, fiber) + def initialize(@event_loop : EventLoop, fiber : Fiber, type : Event::Type) + @event = Event.new(type, fiber) end # sleep or select timeout diff --git a/src/crystal/system/unix/evented/poll_descriptor.cr b/src/crystal/event_loop/polling/poll_descriptor.cr similarity index 94% rename from src/crystal/system/unix/evented/poll_descriptor.cr rename to src/crystal/event_loop/polling/poll_descriptor.cr index 1ef318e454bb..801d1b148d89 100644 --- a/src/crystal/system/unix/evented/poll_descriptor.cr +++ b/src/crystal/event_loop/polling/poll_descriptor.cr @@ -1,11 +1,9 @@ -require "./event_loop" - # Information related to the evloop for a fd, such as the read and write queues # (waiting `Event`), as well as which evloop instance currently owns the fd. # # Thread-unsafe: parallel mutations must be protected with a lock. -struct Crystal::Evented::PollDescriptor - @event_loop : Evented::EventLoop? +struct Crystal::EventLoop::Polling::PollDescriptor + @event_loop : Polling? @readers = Waiters.new @writers = Waiters.new diff --git a/src/crystal/system/unix/evented/timers.cr b/src/crystal/event_loop/polling/timers.cr similarity index 86% rename from src/crystal/system/unix/evented/timers.cr rename to src/crystal/event_loop/polling/timers.cr index 7b6deac4f543..b9191f008f46 100644 --- a/src/crystal/system/unix/evented/timers.cr +++ b/src/crystal/event_loop/polling/timers.cr @@ -7,9 +7,9 @@ require "crystal/pointer_pairing_heap" # # NOTE: this is a struct because it only wraps a const pointer to an object # allocated in the heap. -struct Crystal::Evented::Timers +struct Crystal::EventLoop::Polling::Timers def initialize - @heap = PointerPairingHeap(Evented::Event).new + @heap = PointerPairingHeap(Event).new end def empty? : Bool @@ -24,7 +24,7 @@ struct Crystal::Evented::Timers # Dequeues and yields each ready timer (their `#wake_at` is lower than # `System::Time.monotonic`) from the oldest to the most recent (i.e. time # ascending). - def dequeue_ready(& : Evented::Event* -> Nil) : Nil + def dequeue_ready(& : Event* -> Nil) : Nil seconds, nanoseconds = System::Time.monotonic now = Time::Span.new(seconds: seconds, nanoseconds: nanoseconds) @@ -36,7 +36,7 @@ struct Crystal::Evented::Timers end # Add a new timer into the list. Returns true if it is the next ready timer. - def add(event : Evented::Event*) : Bool + def add(event : Event*) : Bool @heap.add(event) @heap.first? == event end @@ -44,7 +44,7 @@ struct Crystal::Evented::Timers # Remove a timer from the list. Returns a tuple(dequeued, was_next_ready) of # booleans. The first bool tells whether the event was dequeued, in which case # the second one tells if it was the next ready event. - def delete(event : Evented::Event*) : {Bool, Bool} + def delete(event : Event*) : {Bool, Bool} if @heap.first? == event @heap.shift? {true, true} diff --git a/src/crystal/system/unix/evented/waiters.cr b/src/crystal/event_loop/polling/waiters.cr similarity index 97% rename from src/crystal/system/unix/evented/waiters.cr rename to src/crystal/event_loop/polling/waiters.cr index 2d052718bae9..85d10fd6f5ba 100644 --- a/src/crystal/system/unix/evented/waiters.cr +++ b/src/crystal/event_loop/polling/waiters.cr @@ -1,5 +1,3 @@ -require "./event" - # A FIFO queue of `Event` waiting on the same operation (either read or write) # for a fd. See `PollDescriptor`. # @@ -7,7 +5,7 @@ require "./event" # always ready variables. # # Thread unsafe: parallel mutations must be protected with a lock. -struct Crystal::Evented::Waiters +struct Crystal::EventLoop::Polling::Waiters @list = PointerLinkedList(Event).new @ready = false @always_ready = false diff --git a/src/crystal/system/event_loop/socket.cr b/src/crystal/event_loop/socket.cr similarity index 96% rename from src/crystal/system/event_loop/socket.cr rename to src/crystal/event_loop/socket.cr index 8fa86e50affc..03b556b3be96 100644 --- a/src/crystal/system/event_loop/socket.cr +++ b/src/crystal/event_loop/socket.cr @@ -1,4 +1,4 @@ -# This file is only required when sockets are used (`require "./event_loop/socket"` in `src/crystal/system/socket.cr`) +# This file is only required when sockets are used (`require "crystal/event_loop/socket"` in `src/crystal/system/socket.cr`) # # It fills `Crystal::EventLoop::Socket` with abstract defs. diff --git a/src/crystal/system/wasi/event_loop.cr b/src/crystal/event_loop/wasi.cr similarity index 97% rename from src/crystal/system/wasi/event_loop.cr rename to src/crystal/event_loop/wasi.cr index 3cce9ba8361c..a91c469f406c 100644 --- a/src/crystal/system/wasi/event_loop.cr +++ b/src/crystal/event_loop/wasi.cr @@ -1,5 +1,5 @@ # :nodoc: -class Crystal::Wasi::EventLoop < Crystal::EventLoop +class Crystal::EventLoop::Wasi < Crystal::EventLoop # Runs the event loop. def run(blocking : Bool) : Bool raise NotImplementedError.new("Crystal::Wasi::EventLoop.run") @@ -129,7 +129,7 @@ class Crystal::Wasi::EventLoop < Crystal::EventLoop end end -struct Crystal::Wasi::Event +struct Crystal::EventLoop::Wasi::Event include Crystal::EventLoop::Event def add(timeout : Time::Span) : Nil diff --git a/src/crystal/scheduler.cr b/src/crystal/scheduler.cr index bed98ef4d05b..9b64823f3905 100644 --- a/src/crystal/scheduler.cr +++ b/src/crystal/scheduler.cr @@ -1,4 +1,4 @@ -require "crystal/system/event_loop" +require "crystal/event_loop" require "crystal/system/print_error" require "./fiber_channel" require "fiber" diff --git a/src/crystal/system/socket.cr b/src/crystal/system/socket.cr index 8d5e8c9afaf0..54648f17f7db 100644 --- a/src/crystal/system/socket.cr +++ b/src/crystal/system/socket.cr @@ -1,4 +1,4 @@ -require "./event_loop/socket" +require "../event_loop/socket" module Crystal::System::Socket # Creates a file descriptor / socket handle diff --git a/src/crystal/system/unix/socket.cr b/src/crystal/system/unix/socket.cr index 535f37f386c0..2ca502aa28f8 100644 --- a/src/crystal/system/unix/socket.cr +++ b/src/crystal/system/unix/socket.cr @@ -25,8 +25,8 @@ module Crystal::System::Socket end private def initialize_handle(fd) - {% if Crystal.has_constant?(:Evented) %} - @__evloop_data = Crystal::Evented::Arena::INVALID_INDEX + {% if Crystal::EventLoop.has_constant?(:Polling) %} + @__evloop_data = Crystal::EventLoop::Polling::Arena::INVALID_INDEX {% end %} end diff --git a/src/crystal/system/win32/addrinfo.cr b/src/crystal/system/win32/addrinfo.cr index 91ebb1620a43..da5cb6ce20c3 100644 --- a/src/crystal/system/win32/addrinfo.cr +++ b/src/crystal/system/win32/addrinfo.cr @@ -43,9 +43,9 @@ module Crystal::System::Addrinfo end end - Crystal::IOCP::GetAddrInfoOverlappedOperation.run(Crystal::EventLoop.current.iocp) do |operation| + IOCP::GetAddrInfoOverlappedOperation.run(Crystal::EventLoop.current.iocp) do |operation| completion_routine = LibC::LPLOOKUPSERVICE_COMPLETION_ROUTINE.new do |dwError, dwBytes, lpOverlapped| - orig_operation = Crystal::IOCP::GetAddrInfoOverlappedOperation.unbox(lpOverlapped) + orig_operation = IOCP::GetAddrInfoOverlappedOperation.unbox(lpOverlapped) LibC.PostQueuedCompletionStatus(orig_operation.iocp, 0, 0, lpOverlapped) end @@ -60,7 +60,7 @@ module Crystal::System::Addrinfo else case error = WinError.new(result.to_u32!) when .wsa_io_pending? - # used in `Crystal::IOCP::OverlappedOperation#try_cancel_getaddrinfo` + # used in `IOCP::OverlappedOperation#try_cancel_getaddrinfo` operation.cancel_handle = cancel_handle else raise ::Socket::Addrinfo::Error.from_os_error("GetAddrInfoExW", error, domain: domain, type: type, protocol: protocol, service: service) diff --git a/src/crystal/system/win32/file_descriptor.cr b/src/crystal/system/win32/file_descriptor.cr index 4265701cd8b2..894fcfaf5cb1 100644 --- a/src/crystal/system/win32/file_descriptor.cr +++ b/src/crystal/system/win32/file_descriptor.cr @@ -490,7 +490,7 @@ private module ConsoleUtils handle: handle, slice: slice, iocp: Crystal::EventLoop.current.iocp, - completion_key: Crystal::IOCP::CompletionKey.new(:stdin_read, ::Fiber.current), + completion_key: Crystal::System::IOCP::CompletionKey.new(:stdin_read, ::Fiber.current), ) @@read_cv.signal end @@ -509,7 +509,11 @@ private module ConsoleUtils units_read.to_i32 end - record ReadRequest, handle : LibC::HANDLE, slice : Slice(UInt16), iocp : LibC::HANDLE, completion_key : Crystal::IOCP::CompletionKey + record ReadRequest, + handle : LibC::HANDLE, + slice : Slice(UInt16), + iocp : LibC::HANDLE, + completion_key : Crystal::System::IOCP::CompletionKey @@read_cv = ::Thread::ConditionVariable.new @@read_requests = Deque(ReadRequest).new diff --git a/src/crystal/system/win32/iocp.cr b/src/crystal/system/win32/iocp.cr index 19c92c8f8725..fece9ada3a83 100644 --- a/src/crystal/system/win32/iocp.cr +++ b/src/crystal/system/win32/iocp.cr @@ -3,7 +3,7 @@ require "c/handleapi" require "crystal/system/thread_linked_list" # :nodoc: -module Crystal::IOCP +module Crystal::System::IOCP # :nodoc: class CompletionKey enum Tag @@ -11,10 +11,10 @@ module Crystal::IOCP StdinRead end - property fiber : Fiber? + property fiber : ::Fiber? getter tag : Tag - def initialize(@tag : Tag, @fiber : Fiber? = nil) + def initialize(@tag : Tag, @fiber : ::Fiber? = nil) end end @@ -88,7 +88,7 @@ module Crystal::IOCP private abstract def try_cancel : Bool @overlapped = LibC::OVERLAPPED.new - @fiber = Fiber.current + @fiber = ::Fiber.current @state : State = :started def self.run(*args, **opts, &) @@ -120,14 +120,14 @@ module Crystal::IOCP if timeout sleep timeout else - Fiber.suspend + ::Fiber.suspend end unless @state.done? if try_cancel # Wait for cancellation to complete. We must not free the operation # until it's completed. - Fiber.suspend + ::Fiber.suspend end end end @@ -225,7 +225,7 @@ module Crystal::IOCP error = WinError.new(result.to_u32!) yield error - raise Socket::Addrinfo::Error.from_os_error("GetAddrInfoExOverlappedResult", error) + raise ::Socket::Addrinfo::Error.from_os_error("GetAddrInfoExOverlappedResult", error) end @overlapped.union.pointer.as(LibC::ADDRINFOEXW**).value @@ -239,7 +239,7 @@ module Crystal::IOCP # Operation has already completed, do nothing return false else - raise Socket::Addrinfo::Error.from_os_error("GetAddrInfoExCancel", error) + raise ::Socket::Addrinfo::Error.from_os_error("GetAddrInfoExCancel", error) end end true diff --git a/src/io/evented.cr b/src/io/evented.cr index f59aa205c543..1f95d1870b0b 100644 --- a/src/io/evented.cr +++ b/src/io/evented.cr @@ -1,6 +1,6 @@ -require "crystal/system/event_loop" +require "crystal/event_loop" -{% skip_file unless flag?(:wasi) || Crystal.has_constant?(:LibEvent) %} +{% skip_file unless flag?(:wasi) || Crystal::EventLoop.has_constant?(:LibEvent) %} require "crystal/thread_local_value" diff --git a/src/kernel.cr b/src/kernel.cr index 1203d1c66a7e..2063acce95ae 100644 --- a/src/kernel.cr +++ b/src/kernel.cr @@ -620,7 +620,7 @@ end # This is a temporary workaround to ensure there is always something in the IOCP # event loop being awaited, since both the interrupt loop and the fiber stack # pool collector are disabled in interpreted code. Without this, asynchronous -# code that bypasses `Crystal::IOCP::OverlappedOperation` does not currently +# code that bypasses `Crystal::System::IOCP::OverlappedOperation` does not currently # work, see https://github.com/crystal-lang/crystal/pull/14949#issuecomment-2328314463 {% if flag?(:interpreted) && flag?(:win32) %} spawn(name: "Interpreter idle loop") do