Guava's EventBus - Simple Publisher/Subscriber


EventBus is a simple publisher/subscriber API included in Google's Guava. It's designed to help decouple components inside an application. We give a simple example of the API by using it to build a multi-user chat system.

Mon 10 October 2011

Looking over the recent additions in Release 10 of Google's Guava libraries, I noticed EventBus, a lightweight implementation of a publish-subscribe style messaging system. It is similar to the publish-subscribe model provided by JMS, except that the messages remain within the application rather than being broadcast externally.

EventBus allows you to create streams within your program to which objects can subscribe; they will then receive messages published to those streams. Although this kind of inter-object communication is not difficult to recreate using patterns such as singletons, EventBus provides a particularly simple and lightweight mechanism. Singletons also make it harder to have multiple event buses of a single type, and they are hard to test.

As an example I am going to create a simple multi-user chat program using sockets that several people will connect to via telnet. We will simply create an EventBus which will serve as a channel. Any messages that a user sends to the system will be published to all the other users.

So here is our UserThread object:

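        import java.io.BufferedReader;
        import java.io.IOException;
        import java.io.InputStreamReader;
        import java.io.PrintWriter;
        import java.net.Socket;
        import com.google.common.eventbus.EventBus;
        import com.google.common.eventbus.Subscribe;

        class UserThread extends Thread {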
            private Socket connection;
            private EventBus channel;
            private BufferedReader in;
            private PrintWriter out;
        
            public UserThread(Socket connection, EventBus channel) {
                this.connection = connection;
                this.channel = channel;
                try {
                    in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
                    out = new PrintWriter(connection.getOutputStream(), true);
                } catch (IOException e) {
                    e.printStackTrace();
                    System.exit(1);
                }
            }
        
            @Subscribe
            public void receiveMessage(String message) {
                if (out != null) {
                    out.println(message);
                }
            }
        
            @Override
            public void run() {
                try {
                    String input;
                    while ((input = in.readLine()) != null) {
                        channel.post(input);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                
                //reached eof
                channel.unregister(this);
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                in = null;
                out = null;
            }
        }
        

As can be seen, this is just a simple threaded object that holds the EventBus that serves as a channel, and the user's Socket. The run method simply reads lines from the socket and sends each one to the channel by calling the post method on the EventBus.

Receiving messages is then implemented by adding a public method with the @Subscribe annotation (see above). This signals the EventBus to call this method upon receiving a message of the type given in the method argument. Here I am sending Strings, but other object types can be used.

GOTCHA: The method annotated with @Subscribe MUST be public.
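To make the idea of dispatch by argument type concrete, here is a toy sketch that runs without Guava. It is not Guava's implementation: the `@Handles` annotation, `ToyBus`, and `Listener` are hypothetical names invented for this illustration. The toy only scans public methods, which mimics the gotcha above; how Guava itself discovers handler methods may differ between versions.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class ToyBusSketch {
    /** Hypothetical stand-in for @Subscribe; not part of any library. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Handles {}

    /** Toy bus: calls every public one-argument @Handles method whose parameter type fits the event. */
    public static class ToyBus {
        private final List<Object> listeners = new ArrayList<>();

        public void register(Object listener) {
            listeners.add(listener);
        }

        /** Delivers the event and returns how many handler methods were called. */
        public int post(Object event) {
            int delivered = 0;
            for (Object listener : listeners) {
                // getMethods() returns public methods only, so non-public handlers are never found.
                for (Method m : listener.getClass().getMethods()) {
                    if (m.isAnnotationPresent(Handles.class)
                            && m.getParameterCount() == 1
                            && m.getParameterTypes()[0].isInstance(event)) {
                        try {
                            m.invoke(listener, event);
                        } catch (ReflectiveOperationException e) {
                            throw new IllegalStateException(e);
                        }
                        delivered++;
                    }
                }
            }
            return delivered;
        }
    }

    public static class Listener {
        public final List<String> log = new ArrayList<>();

        @Handles
        public void onText(String message) {
            log.add("text:" + message);
        }

        @Handles
        public void onNumber(Integer number) {
            log.add("number:" + number);
        }

        @Handles
        void hidden(String message) { // package-private, so the toy bus skips it
            log.add("hidden:" + message);
        }
    }

    public static void main(String[] args) {
        ToyBus bus = new ToyBus();
        Listener listener = new Listener();
        bus.register(listener);
        bus.post("hello"); // reaches onText only
        bus.post(42);      // boxed to Integer, reaches onNumber only
        System.out.println(listener.log); // prints [text:hello, number:42]
    }
}
```

The point to take away is that the event's runtime type selects the handler, so one listener can subscribe to several kinds of message just by declaring several annotated methods.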

The receive method takes the message and writes it out to the user's connection. This will, of course, also echo the message back to the original sender, as the UserThread object itself receives the message that it published.
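If the echo is unwanted, one possible approach is to post a small message object that records its sender, and have each subscriber ignore its own messages. The sketch below assumes a hypothetical `ChatMessage` class and a `User` class standing in for UserThread; neither is part of the original program. To keep it runnable without Guava, the receive method is called directly where the bus would normally invoke it.

```java
import java.util.ArrayList;
import java.util.List;

public class EchoFilterSketch {
    /** Hypothetical message type that carries its sender alongside the text. */
    public static final class ChatMessage {
        public final Object sender;
        public final String text;

        public ChatMessage(Object sender, String text) {
            this.sender = sender;
            this.text = text;
        }
    }

    /** Stand-in for UserThread, reduced to the receive path. */
    public static class User {
        public final List<String> received = new ArrayList<>();

        // In the chat program this method would carry @Subscribe,
        // and run() would post new ChatMessage(this, input).
        public void receiveMessage(ChatMessage message) {
            if (message.sender == this) {
                return; // skip messages this user published
            }
            received.add(message.text);
        }
    }

    public static void main(String[] args) {
        User alice = new User();
        User bob = new User();
        ChatMessage message = new ChatMessage(alice, "hi");

        // Simulate the bus delivering the message to every subscriber.
        alice.receiveMessage(message);
        bob.receiveMessage(message);

        System.out.println(alice.received + " " + bob.received); // prints [] [hi]
    }
}
```

Because the subscriber method now takes a `ChatMessage` rather than a String, the post call would have to send a `ChatMessage` as well; plain Strings would no longer match this handler.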

All that is left is to create a simple server object that listens for connections and creates UserThread objects as needed.

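        import java.io.IOException;
        import java.net.ServerSocket;
        import java.net.Socket;
        import com.google.common.eventbus.EventBus;

        public class EventBusChat {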
            public static void main(String[] args) {
                EventBus channel = new EventBus();
                ServerSocket socket;
                try {
                    socket = new ServerSocket(4444);
                    while (true) {
                        Socket connection = socket.accept();
                        UserThread newUser = new UserThread(connection, channel);
                        channel.register(newUser);
                        newUser.start();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        

As shown, this creates the channel, accepts user connections and registers them with the EventBus. The important code to notice here is the call to the register method with the UserThread object as an argument. This call subscribes the object to the EventBus and indicates that it can process messages.

Once the server is started users can then connect to the chat server with the telnet command:

telnet 127.0.0.1 4444

And if you connect multiple instances you will see any message sent being relayed to the other instances.

Having viewed this example, you may be wondering what use an EventBus has. A good example is maintaining loose coupling between a user interface and backend code. User input would generate a message such as resize, lost focus or closing down. Backend components could then simply subscribe to these events and deal with them appropriately. The official documentation lists many other uses as well.

NB: EventBus isn't meant for general-purpose publisher-subscriber communication; this is just an example of how the API works.