Skip to content

Commit

Permalink
[standard]move event to logger package, and provide logger.
Browse files Browse the repository at this point in the history
  • Loading branch information
vistart committed Jun 27, 2024
1 parent 8f3f06c commit 4134c20
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 64 deletions.
14 changes: 9 additions & 5 deletions workflow/standard/context/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
// deadlines, and request-scoped values across Goroutines.
package context

import "context"
import (
"context"

"github.com/rhosocial/go-dag/workflow/standard/logger"
)

// Context defines the methods that a context implementation must satisfy.
type Context interface {
Expand All @@ -16,7 +20,7 @@ type Context interface {
GetIdentifier() IdentifierInterface
GetOptions() OptionsInterface
GetReports() ReportsInterface
GetEventManager() EventManagerInterface
GetEventManager() logger.EventManagerInterface
}

// BaseContext represents a context instance that encapsulates a context.BaseContext and
Expand Down Expand Up @@ -71,7 +75,7 @@ type BaseContext struct {
// This field is optional; if not specified, it means events will not be listened to or sent
// to any subscribers for this execution. If event-driven communication is not needed,
// this field can be left unspecified.
eventManager EventManagerInterface
eventManager logger.EventManagerInterface
}

// Cancel cancels the context with the provided error cause.
Expand All @@ -87,7 +91,7 @@ func (c *BaseContext) GetOptions() OptionsInterface { return c.options }

func (c *BaseContext) GetReports() ReportsInterface { return c.reports }

func (c *BaseContext) GetEventManager() EventManagerInterface { return c.eventManager }
func (c *BaseContext) GetEventManager() logger.EventManagerInterface { return c.eventManager }

// Option is a function type for defining context configuration options.
type Option func(*BaseContext) error
Expand Down Expand Up @@ -157,7 +161,7 @@ func WithReports(reports ReportsInterface) Option {
// You must explicitly start the Listen() method on the event manager at the appropriate time
// in your application. Failure to do so will result in the workflow and transit workers being
// blocked when they attempt to send data to the event channel.
func WithEventManager(eventManager EventManagerInterface) Option {
func WithEventManager(eventManager logger.EventManagerInterface) Option {
return func(context *BaseContext) error {
context.eventManager = eventManager
return nil
Expand Down
9 changes: 6 additions & 3 deletions workflow/standard/context/interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"testing"

"github.com/rhosocial/go-dag/workflow/standard/logger"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -66,11 +67,13 @@ func (o MockReports) AddTransit(transit string, key string, value any) error {
}

type MockEventManager struct {
EventManagerInterface
logger.EventManagerInterface
}

func (o MockEventManager) Listen(ctx context.Context) {}

func (o MockEventManager) GetLogger() logger.Interface { return nil }

type MockContext struct {
Context
}
Expand All @@ -85,7 +88,7 @@ func (m MockContext) GetOptions() OptionsInterface { return nil }

func (m MockContext) GetReports() ReportsInterface { return nil }

func (m MockContext) GetEventManager() EventManagerInterface { return nil }
func (m MockContext) GetEventManager() logger.EventManagerInterface { return nil }

// Ensure MockIdentifier implements IdentifierInterface
var _ IdentifierInterface = (*MockIdentifier)(nil)
Expand All @@ -94,7 +97,7 @@ var _ IdentifierInterface = (*MockIdentifier)(nil)
var _ OptionsInterface = (*MockOptions)(nil)

// Ensure MockEventManager implements EventManagerInterface
var _ EventManagerInterface = (*MockEventManager)(nil)
var _ logger.EventManagerInterface = (*MockEventManager)(nil)

var _ Context = (*MockContext)(nil)

Expand Down
8 changes: 8 additions & 0 deletions workflow/standard/logger/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# logger

The `logger` package provides "event manager", "basic event definitions", and "a simple log transmitter".

- **Event Manager**: Used to receive events and dispatch them to all subscribers.
- **Event Definition**: Defines the most basic events, allowing for extension.
- **Log Transmitter**: The event manager can derive a log transmitter,
allowing log events to be passed to the log transmitter when events occur.
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,32 @@
// Use of this source code is governed by Apache-2.0 license
// that can be found in the LICENSE file.

package context
// Package logger provides an event subscriber and a basic log event.
package logger

import "context"

// EventInterface represents an event sent by workflow or transit workers.
//
// If you want to customize events, please extend this interface.
type EventInterface interface{}

// SubscriberInterface represents a subscriber that listens for events.
type SubscriberInterface interface {
// ReceiveEvent is called when an event is received.
//
// Note that if the event is nil, it will not be dispatched.
ReceiveEvent(event EventInterface)
}

// EventManagerInterface represents an interface for managing events.
type EventManagerInterface interface {
// Listen listens for events and sends them to subscribers.
//
// This method is actually called asynchronously by the event manager,
// so you don't have to worry about this method being time-consuming and blocking other processes.
Listen(ctx context.Context)
GetLogger() Interface
}

// EventManager manages events and subscribers.
Expand All @@ -28,9 +37,18 @@ type EventManager struct {

// subscribers is a map of subscriber identifiers to subscriber instances.
subscribers map[string]SubscriberInterface

logger Interface
}

// NewEventManager creates a new EventManager instance.
//
// Note that the new instance cannot receive events directly and needs to start listening first. For example:
//
// manager, err := NewEventManager(WithSubscriber("identifier", subscriber))
// ctxBg, cancelBg := context.WithCancel(context.Background())
// go manager.Listen(ctxBg)
// <receive events...>
func NewEventManager(options ...EventManagerOption) (*EventManager, error) {
eventManager := &EventManager{
// Initialize eventChannel to handle EventInterface.
Expand All @@ -39,6 +57,7 @@ func NewEventManager(options ...EventManagerOption) (*EventManager, error) {
// Initialize subscribers as an empty map.
subscribers: make(map[string]SubscriberInterface),
}
eventManager.logger = NewLogger(eventManager.eventChannel)
for _, option := range options {
// Apply each option to the event manager.
err := option(eventManager)
Expand All @@ -53,6 +72,15 @@ func NewEventManager(options ...EventManagerOption) (*EventManager, error) {
type EventManagerOption func(*EventManager) error

// WithSubscriber adds a subscriber to the event manager.
//
// - identifier: represents the identifier of the subscriber.
// - subscriber: the subscriber instance, which needs to implement the SubscriberInterface.
//
// This method can be called multiple times to add multiple subscribers. When an event is received,
// it will be dispatched to each subscriber.
// The dispatch order is not guaranteed to match the order in which subscribers were added,
// so you cannot rely on the order of subscriber addition to determine the event dispatch order.
// If no subscribers are added, received events will be discarded and not dispatched.
func WithSubscriber(identifier string, subscriber SubscriberInterface) EventManagerOption {
return func(manager *EventManager) error {
manager.subscribers[identifier] = subscriber
Expand Down Expand Up @@ -87,18 +115,20 @@ func (em *EventManager) Listen(ctx context.Context) {
}
}

// KeyEventManagerChannel is the context key for the event manager channel.
const KeyEventManagerChannel = "__go_dag_workflow_event_channel"
// GetLogger returns the logger instance.
func (em *EventManager) GetLogger() Interface {
return em.logger
}

const KeyEventManagerLogger = "__go_dag_workflow_event_logger"

// WorkerContextWithEventManager creates a new context with the provided event manager.
func WorkerContextWithEventManager(ctx context.Context, em *EventManager) context.Context {
return context.WithValue(ctx, KeyEventManagerChannel, em.eventChannel)
func WorkerContextWithLogger(ctx context.Context, logger Interface) context.Context {
return context.WithValue(ctx, KeyEventManagerLogger, logger)
}

// GetEventChannelFromWorkerContext retrieves the event channel from the context.
func GetEventChannelFromWorkerContext(ctx context.Context) chan<- EventInterface {
if em, ok := ctx.Value(KeyEventManagerChannel).(chan EventInterface); ok {
return em
func GetLoggerFromWorkerContext(ctx context.Context) Interface {
if logger, ok := ctx.Value(KeyEventManagerLogger).(Interface); ok {
return logger
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by Apache-2.0 license
// that can be found in the LICENSE file.

package context
package logger

import (
"context"
Expand Down Expand Up @@ -41,18 +41,25 @@ func TestEventManager(t *testing.T) {
ctxBg, cancelBg := context.WithCancel(context.Background())
go em.Listen(ctxBg)

// Create a context with eventManager
ctx := WorkerContextWithEventManager(context.Background(), em)
// Get logger
logger := em.GetLogger()
if logger == nil {
t.Fatalf("logger is nil")
}

if logger_ := GetLoggerFromWorkerContext(context.Background()); logger_ != nil {
t.Fatalf("logger is not nil")
}

// Retrieve event channel from context
eventChannel := GetEventChannelFromWorkerContext(ctx)
if eventChannel == nil {
t.Fatal("Failed to retrieve event channel from context")
// Create a context with logger
ctx := WorkerContextWithLogger(context.Background(), em.GetLogger())
if logger_ := GetLoggerFromWorkerContext(ctx); logger_ == nil {
t.Fatalf("logger is nil")
}

// Publish an event
event := &MockEvent{Message: "Test event"}
eventChannel <- event
logger.Log(event)

// Wait for a short time to ensure event processing
time.Sleep(100 * time.Millisecond)
Expand All @@ -73,7 +80,7 @@ func TestEventManager(t *testing.T) {
delete(em.subscribers, "mock_subscriber")

// Publish another event after removing subscriber
eventChannel <- &MockEvent{Message: "Test event 2"}
logger.Log(&MockEvent{Message: "Test event 2"})

// Wait for a short time to ensure event processing
time.Sleep(100 * time.Millisecond)
Expand All @@ -86,28 +93,6 @@ func TestEventManager(t *testing.T) {
cancelBg()
}

func TestContextWithEventManager(t *testing.T) {
// Create a new eventManager
em, err := NewEventManager()
if err != nil {
t.Fatalf("Error creating eventManager: %v", err)
}

// Create a context with eventManager
ctx := WorkerContextWithEventManager(context.Background(), em)

// Retrieve event channel from context
retrieveChan := GetEventChannelFromWorkerContext(ctx)
if retrieveChan == nil {
t.Fatal("Failed to retrieve eventManager from context")
}

// Ensure that retrieved eventManager matches the original one
if retrieveChan != em.eventChannel {
t.Fatal("Retrieved eventManager does not match the original one")
}
}

func WithSubscriberError() EventManagerOption {
return func(em *EventManager) error {
return errors.New("with subscriber error")
Expand All @@ -119,9 +104,3 @@ func TestEventManagerWithError(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, em)
}

func TestGetEventChannelFromContextNil(t *testing.T) {
// Retrieve event channel from context
retrieveChan := GetEventChannelFromWorkerContext(context.Background())
assert.Nil(t, retrieveChan)
}
34 changes: 34 additions & 0 deletions workflow/standard/logger/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2023 - 2024. vistart. All rights reserved.
// Use of this source code is governed by Apache-2.0 license
// that can be found in the LICENSE file.

package logger

// Interface defines the methods that should be implemented to record events.
type Interface interface {
// Log records the event.
//
// Note that this method is actually executed asynchronously.
Log(event EventInterface)
}

// Logger implements a simple log transmitter.
type Logger struct {
// eventChannel stores the event manager's event receiving channel.
//
// Note that this channel must be initialized, otherwise it will cause a panicking.
eventChannel chan EventInterface
Interface
}

// Log records the event.
func (l *Logger) Log(event EventInterface) {
l.eventChannel <- event
}

// NewLogger instantiates a new logger.
func NewLogger(eventChannel chan EventInterface) Interface {
return &Logger{
eventChannel: eventChannel,
}
}
Loading

0 comments on commit 4134c20

Please sign in to comment.