public class DuplexTypedMessagesFactory extends java.lang.Object implements IDuplexTypedMessagesFactory
class MyReceiver
{
// Typed message receiver receiving 'Item' and responding 'String'.
private IDuplexTypedMessageReceiver<String, Item> myReceiver;
public void startListening() throws Exception
{
// Create message receiver receiving 'Item' and responding 'String'.
// Note: XmlStringSerializer is used by default.
IDuplexTypedMessagesFactory aReceiverFactory = new DuplexTypedMessagesFactory();
myReceiver = aReceiverFactory.createDuplexTypedMessageReceiver(String.class, Item.class);
// Subscribe to receive messages.
myReceiver.messageReceived().subscribe(myOnMessageHandler);
// Create TCP messaging.
IMessagingSystemFactory aMessaging = new TcpMessagingSystemFactory();
IDuplexInputChannel anInputChannel = aMessaging.createDuplexInputChannel("tcp://127.0.0.1:8033/");
// Attach input channel and start listening to messages.
myReceiver.attachDuplexInputChannel(anInputChannel);
}
public void stopListening()
{
// Detach input channel and stop listening.
myReceiver.detachDuplexInputChannel();
}
private void onMessageReceived(Object sender, TypedRequestReceivedEventArgs<Item> e)
throws Exception
{
// Get the request message.
Item anItem = e.getMessage();
...
// Send back a response message.
// Note: The response is declared as String.
String aResponseMsg = "Response message.";
myReceiver.sendResponseMessage(e.getResponseReceiverId(), aResponseMsg);
}
// Received message handler (the handler subscribed in startListening()).
EventHandler<TypedRequestReceivedEventArgs<Item>> myOnMessageHandler = new EventHandler<TypedRequestReceivedEventArgs<Item>>()
{
public void invoke(Object x, TypedRequestReceivedEventArgs<Item> y)
throws Exception
{
onMessageReceived(x, y);
}
};
}
// Message to be sent and received.
public class Item
{
public String name;
public int amount;
}
class MySender
{
// Typed message sender sending 'Item' and as a response receiving 'String'.
private IDuplexTypedMessageSender<String, Item> mySender;
public void openConnection() throws Exception
{
// Create message sender sending 'Item' and receiving 'String'.
// Note: XmlStringSerializer is used by default.
IDuplexTypedMessagesFactory aSenderFactory = new DuplexTypedMessagesFactory();
mySender = aSenderFactory.createDuplexTypedMessageSender(String.class, Item.class);
// Subscribe to receive response messages.
mySender.responseReceived().subscribe(myOnResponseHandler);
// Create TCP messaging.
IMessagingSystemFactory aMessaging = new TcpMessagingSystemFactory();
IDuplexOutputChannel anOutputChannel = aMessaging.createDuplexOutputChannel("tcp://127.0.0.1:8033/");
// Attach output channel and be able to send messages and receive responses.
mySender.attachDuplexOutputChannel(anOutputChannel);
}
public void closeConnection()
{
// Detach output channel and stop listening to response messages.
mySender.detachDuplexOutputChannel();
}
public void sendMessage(Item message) throws Exception
{
mySender.sendMessage(message);
}
private void onResponseReceived(Object sender, TypedResponseReceivedEventArgs<String> e)
{
// Get the response message.
String aReceivedResponse = e.getResponseMessage();
...
}
// Response message handler (the handler subscribed in openConnection()).
EventHandler<TypedResponseReceivedEventArgs<String>> myOnResponseHandler = new EventHandler<TypedResponseReceivedEventArgs<String>>()
{
public void invoke(Object x, TypedResponseReceivedEventArgs<String> y)
throws Exception
{
onResponseReceived(x, y);
}
};
}
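Both examples above pass String.class and Item.class to the factory in addition to the generic type arguments. Java erases generic types at run time, so a generic component needs such class tokens to rebuild typed objects from received data. The following is a minimal sketch of that pattern only. TypedDecoder, ClassTokenDemo, and the "name;amount" text format are hypothetical and not part of Eneter, which uses XmlStringSerializer by default.

```java
import java.util.function.Function;

class ClassTokenDemo {
    // Illustrative wire format "name;amount"; an assumption made for this sketch only.
    static Object parseItem(String raw) {
        String[] parts = raw.split(";");
        Item item = new Item();
        item.name = parts[0];
        item.amount = Integer.parseInt(parts[1]);
        return item;
    }

    public static void main(String[] args) {
        // The class token tells the decoder which type it is expected to produce.
        TypedDecoder<Item> decoder = new TypedDecoder<>(Item.class, ClassTokenDemo::parseItem);
        Item item = decoder.decode("apple;3");
        System.out.println(item.name + " x " + item.amount); // prints "apple x 3"
    }
}

// Message type from the examples above.
class Item {
    public String name;
    public int amount;
}

// Hypothetical helper (not an Eneter class): keeps the Class<T> token so the
// decoded object can be checked and cast to T at run time.
class TypedDecoder<T> {
    private final Class<T> clazz;
    private final Function<String, Object> parser;

    TypedDecoder(Class<T> clazz, Function<String, Object> parser) {
        this.clazz = clazz;
        this.parser = parser;
    }

    T decode(String raw) {
        // Class.cast throws ClassCastException if the parser produced another type.
        return clazz.cast(parser.apply(raw));
    }
}
```

Without the token, decode could not verify the type of the parsed object, and a mismatch would surface later as a ClassCastException at the call site.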
If you need synchronous communication, where the client must wait for the response,
you can use the synchronous sender:
// Message to be sent and received.
public class Item
{
public String name;
public int amount;
}
class MySender
{
// Typed message sender sending 'Item' and as a response receiving 'String'.
private ISyncDuplexTypedMessageSender<String, Item> mySender;
public void openConnection() throws Exception
{
// Create message sender sending 'Item' and receiving 'String'.
// Note: XmlStringSerializer is used by default.
IDuplexTypedMessagesFactory aSenderFactory = new DuplexTypedMessagesFactory();
mySender = aSenderFactory.createSyncDuplexTypedMessageSender(String.class, Item.class);
// Create TCP messaging.
IMessagingSystemFactory aMessaging = new TcpMessagingSystemFactory();
IDuplexOutputChannel anOutputChannel = aMessaging.createDuplexOutputChannel("tcp://127.0.0.1:8033/");
// Attach output channel and be able to send messages and receive responses.
mySender.attachDuplexOutputChannel(anOutputChannel);
}
public void closeConnection()
{
// Detach output channel and stop listening to response messages.
mySender.detachDuplexOutputChannel();
}
// Sends message and waits for the response.
public String sendMessage(Item message) throws Exception
{
String aResponse = mySender.sendMessage(message);
return aResponse;
}
}
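The synchronous sender above returns the response directly from sendMessage, and the factory exposes a timeout for that wait. The general technique of layering a blocking call over an asynchronous response callback can be sketched with the standard library alone. This illustrates the idea and is not Eneter's implementation: AsyncSender, BlockingSender, and the echo transport are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class SyncOverAsyncDemo {
    // Hypothetical asynchronous sender: the response is delivered through a callback.
    interface AsyncSender<TResponse, TRequest> {
        void send(TRequest request, Consumer<TResponse> onResponse);
    }

    // Wraps an asynchronous sender so the caller blocks until the response
    // arrives or the timeout elapses.
    static class BlockingSender<TResponse, TRequest> {
        private final AsyncSender<TResponse, TRequest> sender;
        private final int timeoutMs;

        BlockingSender(AsyncSender<TResponse, TRequest> sender, int timeoutMs) {
            this.sender = sender;
            this.timeoutMs = timeoutMs;
        }

        TResponse sendMessage(TRequest request) throws Exception {
            CompletableFuture<TResponse> pending = new CompletableFuture<>();
            // The callback completes the future, which releases the waiting caller.
            sender.send(request, pending::complete);
            // Throws TimeoutException if no response arrives within timeoutMs.
            return pending.get(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in transport that answers on another thread.
        AsyncSender<String, String> echo = (request, onResponse) ->
                CompletableFuture.runAsync(() -> onResponse.accept("Response to " + request));
        System.out.println(new BlockingSender<>(echo, 1000).sendMessage("ping"));

        // Stand-in transport that never answers, to show the timeout path.
        AsyncSender<String, String> silent = (request, onResponse) -> { };
        try {
            new BlockingSender<>(silent, 50).sendMessage("ping");
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

A blocking call like this should not be made from a thread that also has to deliver the response, otherwise the wait can only end by timeout.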
Constructor | Description |
---|---|
DuplexTypedMessagesFactory() | Constructs the factory with XmlStringSerializer. |
DuplexTypedMessagesFactory(ISerializer serializer) | Constructs the factory with specified serializer. |
Modifier and Type | Method | Description |
---|---|---|
<TResponse,TRequest> IDuplexTypedMessageReceiver<TResponse,TRequest> | createDuplexTypedMessageReceiver(java.lang.Class<TResponse> responseMessageClazz, java.lang.Class<TRequest> requestMessageClazz) | Creates message receiver (service) which can receive messages and send back response messages. |
<TResponse,TRequest> IDuplexTypedMessageSender<TResponse,TRequest> | createDuplexTypedMessageSender(java.lang.Class<TResponse> responseMessageClazz, java.lang.Class<TRequest> requestMessageClazz) | Creates message sender (client) which can send messages and receive response messages. |
<TResponse,TRequest> ISyncDuplexTypedMessageSender<TResponse,TRequest> | createSyncDuplexTypedMessageSender(java.lang.Class<TResponse> responseMessageClazz, java.lang.Class<TRequest> requestMessageClazz) | Creates message sender (client) which sends a request message and then waits for the response. |
ISerializer | getSerializer() | Gets serializer for messages. |
GetSerializerCallback | getSerializerProvider() | Gets callback for retrieving serializer based on response receiver id. |
IThreadDispatcherProvider | getSyncDuplexTypedSenderThreadMode() | Gets the threading mode which is used for receiving connectionOpened and connectionClosed events in SyncDuplexTypedMessageSender. |
int | getSyncResponseReceiveTimeout() | Gets the timeout which is used for SyncDuplexTypedMessageSender. |
DuplexTypedMessagesFactory | setSerializer(ISerializer serializer) | Sets serializer for messages. |
DuplexTypedMessagesFactory | setSerializerProvider(GetSerializerCallback serializerProvider) | Sets callback for retrieving serializer based on response receiver id. |
DuplexTypedMessagesFactory | setSyncDuplexTypedSenderThreadMode(IThreadDispatcherProvider threadingMode) | Sets the threading mode for receiving connectionOpened and connectionClosed events for SyncDuplexTypedMessageSender. |
DuplexTypedMessagesFactory | setSyncResponseReceiveTimeout(int milliseconds) | Sets the timeout which is used for SyncDuplexTypedMessageSender. |
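The serializer provider listed above is described as a callback that retrieves a serializer based on the response receiver id. One plausible use is choosing a different serializer per connected client. The sketch below shows that lookup with stand-in types only: TextSerializer and providerFor are hypothetical and do not reproduce Eneter's ISerializer or GetSerializerCallback signatures.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class SerializerProviderDemo {
    // Hypothetical stand-in for a serializer; Eneter's ISerializer differs.
    interface TextSerializer {
        String serialize(String message);
    }

    // Builds a callback that maps a response receiver id to its serializer
    // and falls back to a default serializer for unknown ids.
    static Function<String, TextSerializer> providerFor(
            Map<String, TextSerializer> perReceiver, TextSerializer fallback) {
        return responseReceiverId -> perReceiver.getOrDefault(responseReceiverId, fallback);
    }

    public static void main(String[] args) {
        Map<String, TextSerializer> perReceiver = new ConcurrentHashMap<>();
        perReceiver.put("client-1", m -> "<String>" + m + "</String>");
        TextSerializer fallback = m -> m;

        Function<String, TextSerializer> provider = providerFor(perReceiver, fallback);
        System.out.println(provider.apply("client-1").serialize("hi")); // prints "<String>hi</String>"
        System.out.println(provider.apply("client-2").serialize("hi")); // prints "hi"
    }
}
```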
public DuplexTypedMessagesFactory()
public DuplexTypedMessagesFactory(ISerializer serializer)
eneter.messaging.dataprocessing.serializing
serializer - serializer that will be used to serialize/deserialize messages.
public <TResponse,TRequest> IDuplexTypedMessageSender<TResponse,TRequest> createDuplexTypedMessageSender(java.lang.Class<TResponse> responseMessageClazz, java.lang.Class<TRequest> requestMessageClazz)
Specified by: createDuplexTypedMessageSender in interface IDuplexTypedMessagesFactory
responseMessageClazz - type of response messages
requestMessageClazz - type of request messages
public <TResponse,TRequest> ISyncDuplexTypedMessageSender<TResponse,TRequest> createSyncDuplexTypedMessageSender(java.lang.Class<TResponse> responseMessageClazz, java.lang.Class<TRequest> requestMessageClazz)
Specified by: createSyncDuplexTypedMessageSender in interface IDuplexTypedMessagesFactory
responseMessageClazz - type of response messages
requestMessageClazz - type of request messages
public <TResponse,TRequest> IDuplexTypedMessageReceiver<TResponse,TRequest> createDuplexTypedMessageReceiver(java.lang.Class<TResponse> responseMessageClazz, java.lang.Class<TRequest> requestMessageClazz)
Specified by: createDuplexTypedMessageReceiver in interface IDuplexTypedMessagesFactory
responseMessageClazz - type of response messages
requestMessageClazz - type of request messages
public DuplexTypedMessagesFactory setSyncDuplexTypedSenderThreadMode(IThreadDispatcherProvider threadingMode)
threadingMode - threading that shall be used for receiving connectionOpened and connectionClosed events.
public IThreadDispatcherProvider getSyncDuplexTypedSenderThreadMode()
public DuplexTypedMessagesFactory setSerializer(ISerializer serializer)
serializer - serializer
public ISerializer getSerializer()
public GetSerializerCallback getSerializerProvider()
public DuplexTypedMessagesFactory setSerializerProvider(GetSerializerCallback serializerProvider)
serializerProvider - callback for retrieving serializer based on response receiver id.
public DuplexTypedMessagesFactory setSyncResponseReceiveTimeout(int milliseconds)
milliseconds - timeout in milliseconds
public int getSyncResponseReceiveTimeout()