Skip to content

Commit

Permalink
- Replaced MessageListener/StateListener/MembershipListener with Rece…
Browse files Browse the repository at this point in the history
…iver (https://issues.redhat.com/browse/JGRP-2441)

- Removed all no-op impls of suspect(), block(), unblock()
- Updated manual/tutorial
- Deprecated TOA
  • Loading branch information
belaban committed Jan 31, 2020
1 parent ad04083 commit e3ce110
Show file tree
Hide file tree
Showing 118 changed files with 441 additions and 708 deletions.
146 changes: 53 additions & 93 deletions doc/manual/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ The `Message` interface is defined below (edited for legibility):
public interface Message extends SizeStreamable, Constructable<Message> {
/** The type of the message */
short BYTES_MSG=0, NIO_MSG=1, EMPTY_MSG=2, OBJ_MSG=3, OBJ_MSG_SERIALIZABLE=4,
short BYTES_MSG=0, NIO_MSG=1, EMPTY_MSG=2, OBJ_MSG=3,
COMPOSITE_MSG=5, FRAG_MSG=6, LONG_MSG=7;
/** Returns the type of the message, e.g. BYTES_MSG, OBJ_MSG etc */
Expand Down Expand Up @@ -539,119 +539,79 @@ public void receive(Message msg) {
After all, the application knows what message types it sends, and can therefore safely downcast.


[[Interfaces]]
=== Interfaces
These interfaces are used with some of the APIs presented below, therefore they are listed first.

[[MessageListener]]
==== MessageListener

The MessageListener interface below provides callbacks for message reception and
for providing and setting the state:

[source,java]
----
public interface MessageListener {
void receive(Message msg);
void receive(MessageBatch batch);
void getState(OutputStream output) throws Exception;
void setState(InputStream input) throws Exception;
}
----

Method `receive()` is called whenever a message is received and `receive(MessageBatch)` is called when a message
batch is received.

The `getState()` and `setState()` methods are used to fetch and set the group state (e.g. when joining).
Refer to <<StateTransfer,State transfer>> for a discussion of state transfer.

[[Receiver]]
=== Receiver

[[MembershipListener]]
==== MembershipListener
The `Receiver` interface defines the callbacks that are invoked at the receiver side:

The MembershipListener interface is similar to the `MessageListener` interface above: every time a new view, a suspicion message,
or a block event is received, the corresponding method of the class implementing `MembershipListener` will be called.

[source,java]
----
public interface MembershipListener {
void viewAccepted(View new_view);
void suspect(Object suspected_mbr);
void block();
void unblock();
}
----
public interface Receiver {
Oftentimes the only callback that needs to be implemented will be
`viewAccepted()` which notifies the receiver that a new member has joined the
group or that an existing member has left or crashed. The `suspect()`
callback is invoked by JGroups whenever a member if suspected of having crashed, but not yet excluded
footnote:[It could be that the member is suspected falsely, in which case the next view would still contain the
suspected member (there is no unsuspect() method].


The `block()` method is called to notify the member that it will soon be blocked
sending messages. This is done by the FLUSH protocol, for example to ensure that nobody is sending
messages while a state transfer or view installation is in progress. When block() returns, any thread
sending messages will be blocked, until FLUSH unblocks the thread again, e.g. after the state has been
transferred successfully.

Therefore, `block()` can be used to send pending messages or complete some other work.
Note that block() should be brief, or else the entire `FLUSH` protocol is blocked.

The `unblock()` method is called to notify the member that the FLUSH protocol has completed and the member can resume
sending messages. If the member did not stop sending messages on block(), FLUSH simply blocked them and
will resume, so no action is required from a member. Implementation of the unblock() callback is optional.

NOTE: Note that it is oftentimes simpler to extend `ReceiverAdapter` (see below) and implement the needed
callbacks than to implement all methods of both of these interfaces, as most callbacks are not needed.


[[Receiver]]
==== Receiver
default void receive(Message msg) {
}
[source,java]
----
public interface Receiver extends MessageListener, MembershipListener;
----
default void receive(MessageBatch batch) {
for(Message msg: batch) {
try {
receive(msg);
}
catch(Throwable t) {
}
}
}
A `Receiver` can be used to receive messages and view changes; `receive()` will be invoked as soon as a
message has been received, and `viewAccepted()` will be called whenever a new view is installed.

default void viewAccepted(View new_view) {}
[[ReceiverAdapter]]
==== ReceiverAdapter
default void block() {}
This class implements Receiver with no-op implementations. When implementing a callback, we can simply
extend ReceiverAdapter and overwrite receive() in order to not having to implement all callbacks of the interface.

default void unblock() {}
ReceiverAdapter looks as follows:
default void getState(OutputStream output) throws Exception {
throw new UnsupportedOperationException();
}
[source,java]
----
public class ReceiverAdapter implements Receiver {
void receive(Message msg) {}
void receive(MessageBatch batch) {}
void getState(OutputStream output) throws Exception {}
void setState(InputStream input) throws Exception {}
void viewAccepted(View view) {}
void suspect(Address mbr) {}
void block() {}
void unblock() {}
default void setState(InputStream input) throws Exception {
throw new UnsupportedOperationException();
}
}
----

A ReceiverAdapter is the recommended way to implement callbacks.
[width="90%",cols="2,5",frame="topbot",options="header"]
|==========================
|Name |Description
|receive(Message)| A message is recived
|receive(MessageBatch)| A <<MessageBatch,message batch>> is received
|viewAccepted|Called when a change in membership has occurred. No long running actions, sending of messages
or anything that could block should be done in this callback. If some long running action
needs to be performed, it should be done in a separate thread.
|block|Called (usually by the <<FLUSH,FLUSH>> protocol), as an indication that the member should stop sending
messages. Any messages sent after returning from this callback might get blocked by the FLUSH protocol.
When the FLUSH protocol is done, and messages can be sent again, the FLUSH protocol will simply unblock
all pending messages. If a callback for unblocking is desired, implement `unblock()`.
|unblock|Called _after_ the FLUSH protocol has unblocked previously blocked senders, and messages can be sent again.
|getState|Allows an application to write the state to an OutputStream. After the state has been written, the
OutputStream doesn't need to be closed as stream closing is automatically done when a calling thread
returns from this callback.
|setState|Allows an application to read the state from an InputStream. After the state has been read, the InputStream
doesn't need to be closed as stream closing is automatically done when a calling thread
returns from this callback. +
Refer to <<StateTransfer,State transfer>> for a discussion of state transfer.
|==========================


CAUTION: Note that anything that could block should _not_ be done in a callback. This includes sending of messages;
CAUTION: Note that anything that could block should _not_ be done in a callback. This includes sending of messages:
if we have FLUSH on the stack, and send a message in a viewAccepted() callback, then the following happens:
the FLUSH protocol blocks all
(multicast) messages before installing a view, then installs the view, then unblocks. However,
because installation of the view triggers the viewAccepted() callback, sending of messages inside of
viewAccepted() will block. This in turn blocks the viewAccepted() thread, so the flush will never return! +
If we need to send a message in a callback, the sending should be done on a separate thread, or a
timer task should be submitted to the timer.
task should be submitted to the timer.

A `Receiver` callback is registered with the channels via `JChannel.setReceiver(Receiver)`.




[[ChannelListener]]
Expand Down
2 changes: 1 addition & 1 deletion doc/tutorial/code/SimpleChat.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.util.List;
import java.util.LinkedList;

public class SimpleChat extends ReceiverAdapter {
public class SimpleChat implements Receiver {
JChannel channel;
String user_name=System.getProperty("user.name", "n/a");
final List<String> state=new LinkedList<>();
Expand Down
9 changes: 4 additions & 5 deletions doc/tutorial/sampleapp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,14 @@ This is done in the next section below.

=== Receiving messages and view change notifications

Let's now register as a `Receiver` to receive message and view changes. To this end, we could implement
`org.jgroups.Receiver`, however, I chose to extend `ReceiverAdapter` which has default
implementations, and only override callbacks (`receive()` and `viewChange()`) we're interested in. We
now need to extend ReceiverAdapter:
Let's now register as a `Receiver` to receive message and view changes. To this end, we implement
`org.jgroups.Receiver`, which has default implementations, and override callbacks `receive()`
and `viewChange()`:


[source,java]
----
public class SimpleChat extends ReceiverAdapter {
public class SimpleChat implements Receiver {
----
, set the receiver in `start()`:

Expand Down
8 changes: 4 additions & 4 deletions src/org/jgroups/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -495,9 +495,9 @@ public void stopFlush(List<Address> flushParticipants) {
* Retrieves the full state from the target member.
* <p>
* State transfer is initiated by invoking getState on this channel. The state provider in turn
* invokes {@link MessageListener#getState(java.io.OutputStream)} callback and sends a state to
* invokes {@link Receiver#getState(java.io.OutputStream)} callback and sends a state to
* this node, the state receiver. After the state arrives to the state receiver
* {@link MessageListener#setState(java.io.InputStream)} callback is invoked to install the
* {@link Receiver#setState(java.io.InputStream)} callback is invoked to install the
* state.
*
* @param target
Expand All @@ -506,8 +506,8 @@ public void stopFlush(List<Address> flushParticipants) {
* The number of milliseconds to wait for the operation to complete successfully. 0
* waits until the state has been received
*
* @see MessageListener#getState(java.io.OutputStream)
* @see MessageListener#setState(java.io.InputStream)
* @see Receiver#getState(java.io.OutputStream)
* @see Receiver#setState(java.io.InputStream)
*
* @exception IllegalStateException
* The channel was closed or disconnected, or the flush (if present) failed
Expand Down
14 changes: 4 additions & 10 deletions src/org/jgroups/JChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -499,14 +499,14 @@ public JChannel send(Address dst, byte[] buf, int offset, int length) throws Exc
* Retrieves the full state from the target member.
* <p>
* The state transfer is initiated by invoking getState() on this channel. The state provider in turn invokes the
* {@link MessageListener#getState(java.io.OutputStream)} callback and sends the state to this node, the state receiver.
* After the state arrives at the state receiver, the {@link MessageListener#setState(java.io.InputStream)} callback
* {@link Receiver#getState(java.io.OutputStream)} callback and sends the state to this node, the state receiver.
* After the state arrives at the state receiver, the {@link Receiver#setState(java.io.InputStream)} callback
* is invoked to install the state.
* @param target the state provider. If null the coordinator is used by default
* @param timeout the number of milliseconds to wait for the operation to complete successfully. 0
* waits forever until the state has been received
* @see MessageListener#getState(java.io.OutputStream)
* @see MessageListener#setState(java.io.InputStream)
* @see Receiver#getState(java.io.OutputStream)
* @see Receiver#setState(java.io.InputStream)
* @exception IllegalStateException the channel was closed or disconnected, or the flush (if present) failed
* @exception StateTransferException raised if there was a problem during the state transfer
*/
Expand Down Expand Up @@ -883,12 +883,6 @@ protected Object invokeCallback(int type, Object arg) {
case Event.VIEW_CHANGE:
receiver.viewAccepted((View)arg);
break;
case Event.SUSPECT:
// todo: change this in 4.1 to only accept collections
Collection<Address> suspects=arg instanceof Address? Collections.singletonList((Address)arg)
: (Collection<Address>)arg;
suspects.forEach(receiver::suspect);
break;
case Event.GET_APPLSTATE:
byte[] tmp_state=null;
if(receiver != null) {
Expand Down
62 changes: 0 additions & 62 deletions src/org/jgroups/MembershipListener.java

This file was deleted.

36 changes: 0 additions & 36 deletions src/org/jgroups/MessageListener.java

This file was deleted.

Loading

0 comments on commit e3ce110

Please sign in to comment.