Sample Messaging Publishing/Subscribing System

 

Using the Luban programming language and its networking and file utilities, it is easy to write a simple topic-based messaging system. Clients subscribe to and publish messages by topic, while the server accepts client subscriptions, receives topic updates, and broadcasts them to the subscribing clients. The code required to implement such a messaging system is small, and it is listed in full below.

 

1           Messaging Server Code

 

namespace luban::demo;

 

stationary struct MsgServiceConfig

(

   static readonly host = "localhost";

   static readonly subport = 9876;

   static readonly pubport = 9877;

   static topics = {:};

);

 

 struct SubService( )

as process

{

               subport = net::listener( ::MsgServiceConfig.subport );

               while ( true )

               {

                           sck = subport.accept();

                           topic = sck.readobj();

                           std::println(obj="subscriber for "+topic);

                           first = false;

                           if ( not ::MsgServiceConfig.topics.contains( topic ) )

                           {

                                       ::MsgServiceConfig.topics[topic] = [null, []];

                                       first = true;

                           }

                           ::MsgServiceConfig.topics[topic][1].append(sck);

                           sck.writeobj("OK");

                           if ( not first )

                                       sck.writeobj( ::MsgServiceConfig.topics[topic][0] );

               }

}

 

struct PubService( )

as process

{

               pubport = net::listener( ::MsgServiceConfig.pubport );

               while ( true )

               {

                           sck = pubport.accept();

                           pack = sck.readobj();

                           std::println(obj=pack);

                           topic = pack[0];

                           value = pack[1];

                           sck.writeobj("OK");

              

                           if ( not ::MsgServiceConfig.topics.contains(topic) )

                           {

                                       ::MsgServiceConfig.topics[topic] = [ value, [] ];

                                       continue;

                           }

                           ::MsgServiceConfig.topics[topic][0] = value;

                           foreach( sub in ::MsgServiceConfig.topics[topic][1] )

                                       sub.writeobj(value);

               }

}

 

struct PubSubServer()

as process

{

   ::PubService(=) &

   ::SubService(=) &

}

 

The messaging server code consists of several components. The first, MsgServiceConfig, is a shared data holder. It contains several static properties that are accessible across different Luban structures. These static properties store information such as the server host name and port numbers. The server opens two ports: one to accept incoming subscriptions and another to accept incoming topic updates. The essential data property is the one named “topics”. It is initialized as an empty map and holds every topic with its value, plus the sockets of all its subscribers. The key of the map is the message topic, and the value is a two-element vector: the first element is the current value of the topic, and the second is a vector containing the connection sockets of all subscribers.
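The layout of the “topics” map described above can be modeled in a few lines. The following is a minimal sketch in Python rather than Luban, intended only to illustrate the data structure; the helper name `ensure_topic` and the placeholder string standing in for a socket are assumptions made for this example and do not appear in the original code.

```python
# Python model of the shared "topics" map:
#   topic name -> [latest value, list of subscriber sockets]
topics = {}


def ensure_topic(topics, topic, value=None):
    """Create the two-element entry for a topic if it does not exist yet.

    Hypothetical helper for illustration; the Luban listing does this inline.
    """
    if topic not in topics:
        topics[topic] = [value, []]
    return topics[topic]


entry = ensure_topic(topics, "weather")
entry[1].append("subscriber-socket-1")  # placeholder standing in for a socket
entry[0] = "sunny"                      # latest value of the topic
```

Keeping the value and the subscriber list in the same entry means a single lookup by topic is enough for both subscribing and publishing.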

The execution logic of the server is coded in two structures. The first, “SubService”, handles subscription requests. It accepts an incoming request, looks up the topic in the shared data map, and adds the socket to the vector associated with the subscribed topic. It then acknowledges the request and, if the topic already existed, sends the topic's stored value to the new subscriber. The second, “PubService”, accepts topic update requests. It takes the topic and value from the update, looks up the data map, updates the value of the topic, and then sends the new value to each subscriber through the saved sockets. If the topic is not yet in the map, it is created with the published value and an empty subscriber vector.
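The subscribe and publish steps can be traced without any networking. The sketch below is a Python model of the logic in the two Luban loops, not the Luban code itself; `FakeSocket` is a hypothetical stand-in that only records what the server would write, and the function names `subscribe` and `publish` are chosen for this example.

```python
class FakeSocket:
    """Hypothetical stand-in for a connection; records what the server writes."""

    def __init__(self):
        self.sent = []

    def writeobj(self, obj):
        self.sent.append(obj)


def subscribe(topics, topic, sck):
    """Model of one SubService iteration for an accepted socket."""
    first = topic not in topics
    if first:
        topics[topic] = [None, []]
    topics[topic][1].append(sck)
    sck.writeobj("OK")
    if not first:
        # Topic already existed: send its stored value to the new subscriber.
        sck.writeobj(topics[topic][0])


def publish(topics, topic, value, sck):
    """Model of one PubService iteration for an accepted socket."""
    sck.writeobj("OK")
    if topic not in topics:
        # No subscribers yet: just remember the value.
        topics[topic] = [value, []]
        return
    topics[topic][0] = value
    for sub in topics[topic][1]:
        sub.writeobj(value)


topics = {}
early, late, publisher = FakeSocket(), FakeSocket(), FakeSocket()
subscribe(topics, "news", early)            # subscribes before any value exists
publish(topics, "news", "hello", publisher)
subscribe(topics, "news", late)             # subscribes after the update
```

In this run both subscribers end up with the same two messages: the acknowledgement followed by the value, one through the broadcast and the other through the catch-up write on subscription.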

 

2           Messaging Service Client Code

 

namespace luban::demo;

 

asynch struct Subscriber

(

   input:

                string topic;

   output:

               updates;

)

as process

{

   socket=net::socket(::MsgServiceConfig.host,::MsgServiceConfig.subport );

   socket.writeobj(input.topic);

   ack = socket.readobj();

   std::println(obj=ack);

   while ( true )

     {

               newdata = socket.readobj();

               output.updates = newdata;

     }

}

 

struct Publisher

(

   input:

               string topic;

               updates;

   output:

               serverack;

)

as process

{

   socket=net::socket(::MsgServiceConfig.host,::MsgServiceConfig.pubport );

   socket.writeobj([input.topic, input.updates]);

   ack = socket.readobj();

   std::println(obj=ack);

   output.serverack = ack;

}

 

 

 

struct SubClient

(

   input:

               string topic;

)

as composition

{

   sub: ::Subscriber(topic=input.topic);

   printer: std::println(obj=sub.updates);

}

 

struct PubClient()

as process

{

   for(;;)

     {

               console = std::console();

               line = console.readline();

               tandv = line.split();

               ack = ::Publisher(topic=tandv[0], updates=tandv[1]).serverack;

               console.writeline(ack);

     }

}

 

 

The client code above is separated into “Subscriber” and “Publisher”. The logic of both structures is simple. Subscriber connects to the message server on the configured host and port and sends the topic it wants to subscribe to. It then holds the socket open, listens for updates, and writes each received update to its output property so that downstream components can process it. You may notice that the “Subscriber” structure is asynch. This is by design: an asynch structure fits naturally with a network messaging service because it runs in a separate thread and puts all its output in a queue. The “Publisher” also connects to the message server, though on a different port. It simply sends the message topic and the message itself in a two-element vector. “Publisher” is a regular structure rather than an asynch one.
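The thread-plus-queue behavior attributed to the asynch “Subscriber” can be approximated as follows. This is a sketch in Python under stated assumptions, not a description of how Luban implements asynch structures: a plain list stands in for the socket reads, and the `None` sentinel exists only so the example terminates (the Luban loop runs forever).

```python
import queue
import threading


def run_subscriber(source, updates):
    """Read each update from `source` and put it on the `updates` queue."""
    for item in source:
        updates.put(item)
    updates.put(None)  # sentinel: the stand-in source is exhausted


updates = queue.Queue()
incoming = ["price=10", "price=11", "price=12"]  # stand-in for socket reads
worker = threading.Thread(target=run_subscriber, args=(incoming, updates))
worker.start()

# Downstream consumer: takes updates off the queue in arrival order.
received = []
while True:
    item = updates.get()
    if item is None:
        break
    received.append(item)
worker.join()
```

The queue decouples the reader from the consumer, so a slow downstream component does not block reads from the network.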

The other two structures are for testing purposes. The “SubClient” structure is a composition that simply connects a “Subscriber” to a “std::println” structure, so every update the “Subscriber” receives from the network is printed to standard output. The “PubClient” reads lines from standard input in the format “topic message” and calls the “Publisher” structure to publish the topic and message.
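The “topic message” input format can be parsed so that the message keeps any spaces it contains. The listing passes only the second element of the split line to “Publisher”; whether that truncates a multi-word message depends on how Luban's split behaves, which is not verified here. The Python sketch below shows one way to split on the first run of whitespace only; the function name `parse_line` is hypothetical.

```python
def parse_line(line):
    """Split 'topic message' into (topic, message), keeping spaces in the message."""
    parts = line.strip().split(None, 1)  # split once, on the first whitespace run
    if len(parts) != 2:
        raise ValueError("expected input in the form 'topic message'")
    return parts[0], parts[1]


topic, message = parse_line("weather sunny and warm\n")
```

Rejecting lines without a message avoids publishing an update with a missing value.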

 

3           Potential Improvements to Messaging System

As the sample code above shows, the messaging system is practically useful even though its code is very simple. This thin layer presents network communication in a topic-based form that is far simpler and easier to use than raw sockets.

One potential improvement to this simple messaging service is to expand it into a messaging/database hybrid system. Put simply, if we persist all the messages to a file system, it becomes a simple name-keyed database system, and the things saved in it can be any Luban object. The hybrid system can be very interesting to use because it not only saves data by name but can also broadcast updated data to subscribers across the network.
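The persistence idea can be sketched as a store that writes the topic values to a file on every publish and reloads them on startup. The example below is a minimal Python model under stated assumptions: the class name `PersistentTopics`, the JSON file format, and the temporary file path are all chosen for illustration, and JSON limits values to simple types, whereas the text envisions storing any Luban object. Broadcasting to subscribers is omitted.

```python
import json
import os
import tempfile


class PersistentTopics:
    """Hypothetical key-value store that persists topic values to a JSON file."""

    def __init__(self, path):
        self.path = path
        self.values = {}
        if os.path.exists(path):
            with open(path, "r", encoding="utf-8") as f:
                self.values = json.load(f)

    def publish(self, topic, value):
        """Store the value, then rewrite the file via a temp file and rename."""
        self.values[topic] = value
        tmp = self.path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(self.values, f)
        os.replace(tmp, self.path)

    def get(self, topic):
        return self.values.get(topic)


path = os.path.join(tempfile.mkdtemp(), "topics.json")
store = PersistentTopics(path)
store.publish("weather", "sunny")

reloaded = PersistentTopics(path)  # simulates a server restart
```

Writing to a temporary file and renaming it means a crash in the middle of a write leaves the previous file intact.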