Skip to content

Commit

Permalink
executor observer now publishes rich events
Browse files Browse the repository at this point in the history
  • Loading branch information
robacarp committed Nov 8, 2024
1 parent e7220cd commit 5fc96ae
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 0 deletions.
1 change: 1 addition & 0 deletions spec/helpers/global_helpers.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module TestHelpers
backend.flush

TestingLogBackend.instance.clear
PubSub.instance.clear
yield
end
end
Expand Down
52 changes: 52 additions & 0 deletions spec/helpers/pub_sub.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
class PubSub
def self.instance
@@instance ||= new
end

def self.eavesdrop : Array(Mosquito::Backend::BroadcastMessage)
instance.receive_messages
yield
instance.stop_listening
instance.messages
end

getter messages = [] of Mosquito::Backend::BroadcastMessage
@channel = Channel(Mosquito::Backend::BroadcastMessage).new
@stopping_channel = Channel(Bool).new

def initialize
end

def receive_messages
@continue_receiving = true
spawn receive_loop
@channel = Mosquito.backend.subscribe "mosquito:*"
end

def stop_listening
@continue_receiving = false
end

def receive_loop
loop do
break unless @continue_receiving
select
when message = @channel.receive
@messages << message
when timeout(500.milliseconds)
end
end
@channel.close
end

delegate clear, to: @messages

module Helpers
delegate eavesdrop, to: PubSub
def assert_message_received(matcher : Regex) : Nil
PubSub.instance.messages.find do |message|
matcher === message.message
end
end
end
end
13 changes: 13 additions & 0 deletions spec/mosquito/api/executor_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,17 @@ describe Mosquito::Api::Executor do
# the heartbeat is stored as a unix epoch without millis
assert_equal now.at_beginning_of_second, api.heartbeat
end

it "publishes job started/finished events" do
job_run.store
job_run.build_job

eavesdrop do
observer.execute job_run, job.class.queue do
end
end

assert_message_received /job-started/
assert_message_received /job-finished/
end
end
3 changes: 3 additions & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ Mosquito.configure do |settings|
end

require "./helpers/*"
class Minitest::Test
include PubSub::Helpers
end

Mosquito.configuration.backend.flush

Expand Down
4 changes: 4 additions & 0 deletions src/mosquito/api.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "./backend"
require "./api/observability/*"
require "./api/*"

module Mosquito::Api
Expand All @@ -24,4 +25,7 @@ module Mosquito::Api
.map { |name| Overseer.new name }
end

def self.event_receiver : Channel(Backend::BroadcastMessage)
Mosquito.backend.subscribe "mosquito:*"
end
end
10 changes: 10 additions & 0 deletions src/mosquito/api/executor.cr
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ module Mosquito

module Observability
class Executor
include Publisher

private getter log : ::Log
def self.metadata_key(instance_id : String) : String
Backend.build_key "executor", instance_id
Expand All @@ -43,12 +45,19 @@ module Mosquito
def initialize(executor : Mosquito::Runners::Executor)
@metadata = Metadata.new self.class.metadata_key executor.object_id.to_s
@log = Log.for(executor.runnable_name)
@publish_context = PublishContext.new [:executor, executor.object_id]
end

def execute(job_run : JobRun, from_queue : Queue)
@metadata["current_job"] = job_run.id
@metadata["current_job_queue"] = from_queue.name
log.info { "#{"Starting:".colorize.magenta} #{job_run} from #{from_queue.name}" }
publish({
event: "job-started",
job_run: job_run.id,
from_queue: from_queue.name,
# expected_duration_ms: expected_duration
})

duration = Time.measure do
yield
Expand All @@ -60,6 +69,7 @@ module Mosquito
log_failure_message job_run, duration
end

publish({event: "job-finished", job_run: job_run.id})
@metadata["current_job"] = nil
@metadata["current_job_queue"] = nil
end
Expand Down
27 changes: 27 additions & 0 deletions src/mosquito/api/observability/publisher.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module Mosquito::Observability::Publisher
getter publish_context : PublishContext

def publish(data : NamedTuple)
Log.debug { "Publishing #{data} to #{@publish_context.originator}" }
Mosquito.backend.publish(
publish_context.originator,
data.to_json
)
end

class PublishContext
alias Context = Array(String | Symbol | UInt64)
property originator : String
property context : String

def initialize(context : Context)
@context = KeyBuilder.build context
@originator = KeyBuilder.build "mosquito", @context
end

def initialize(parent : self, context : Context)
@context = KeyBuilder.build context
@originator = KeyBuilder.build "mosquito", parent.context, context
end
end
end
10 changes: 10 additions & 0 deletions src/mosquito/backend.cr
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
module Mosquito
abstract class Backend
struct BroadcastMessage
property channel : String
property message : String

def initialize(@channel, @message)
end
end

QUEUES = %w(waiting scheduled pending dead)

KEY_PREFIX = {"mosquito"}
Expand Down Expand Up @@ -43,6 +51,8 @@ module Mosquito

abstract def unlock(key : String, value : String) : Nil
abstract def lock?(key : String, value : String, ttl : Time::Span) : Bool
abstract def publish(key : String, value : String) : Nil
abstract def subscribe(key : String) : Channel(BroadcastMessage)
end

macro inherited
Expand Down
2 changes: 2 additions & 0 deletions src/mosquito/base.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require "json"

module Mosquito
alias Id = Int64 | Int32

Expand Down
27 changes: 27 additions & 0 deletions src/mosquito/redis_backend.cr
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,33 @@ module Mosquito
remove_matching_key keys: [key], args: [value]
end

def self.publish(key : String, value : String) : Nil
redis.publish key, value
end

def self.subscribe(key : String) : Channel(Backend::BroadcastMessage)
stream = Channel(Backend::BroadcastMessage).new

spawn do
redis.psubscribe(key) do |subscription, connection|
subscription.on_message do |channel, message|
if stream.closed?
connection.unsubscribe channel
else
stream.send(
Backend::BroadcastMessage.new(
channel: channel,
message: message
)
)
end
end
end
end

stream
end

def schedule(job_run : JobRun, at scheduled_time : Time) : JobRun
redis.zadd scheduled_q, scheduled_time.to_unix_ms.to_s, job_run.id
job_run
Expand Down
7 changes: 7 additions & 0 deletions src/mosquito/test_backend.cr
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ module Mosquito
def self.unlock(key : String, value : String) : Nil
end

def self.publish(key : String, value : String) : Nil
end

def self.subscribe(key : String) : Channel(BroadcastMessage)
Channel(BroadcastMessage).new
end

struct EnqueuedJob
getter id : String
getter klass : Mosquito::Job.class
Expand Down

0 comments on commit 5fc96ae

Please sign in to comment.