public class AuthenticatedMessagingFactory extends java.lang.Object implements IMessagingSystemFactory
public class Program { private static class HandshakeProvider implements IGetHandshakeMessage { @Override public Object getHandshakeMessage(String channelId, String responseReceiverId, Object loginMessage) { // Check if login is ok. if (loginMessage instanceof String) { String aLoginName = (String) loginMessage; if (myUsers.containsKey(aLoginName)) { // Login is OK so generate the handshake message. // e.g. generate GUI. return UUID.randomUUID().toString(); } }
// Login was not ok so there is not handshake message // and the connection will be closed. EneterTrace.warning("Login was not ok. The connection will be closed."); return null; } }
private static class AuthenticateProvider implements IAuthenticate { @Override public boolean authenticate(String channelId, String responseReceiverId, Object loginMessage, Object handshakeMessage, Object handshakeResponseMessage) { if (loginMessage instanceof String) { // Get the password associated with the user. String aLoginName = (String) loginMessage; String aPassword = myUsers.get(aLoginName);
// E.g. handshake response may be encrypted original handshake message. // So decrypt incoming handshake response and check if it is equal to original handshake message. try { ISerializer aSerializer = new AesSerializer(aPassword); String aDecodedHandshakeResponse = aSerializer.deserialize(handshakeResponseMessage, String.class); String anOriginalHandshake = (String) handshakeMessage; if (anOriginalHandshake.equals(aDecodedHandshakeResponse)) { // The handshake response is correct so the connection can be established. return true; } } catch (Exception err) { // Decoding of the response message failed. // The authentication will not pass. EneterTrace.warning("Decoding handshake message failed.", err); } }
// Authentication did not pass. EneterTrace.warning("Authentication did not pass. The connection will be closed."); return false; } }
// Helper class subscribing to receive incoming messages. private static EventHandler<StringRequestReceivedEventArgs> myOnRequestReceived = new EventHandler<StringRequestReceivedEventArgs>() { @Override public void onEvent(Object sender, StringRequestReceivedEventArgs e) { onRequestReceived(sender, e); } };
// [login, password] private static HashMap<String, String> myUsers = new HashMap<String, String>(); private static IDuplexStringMessageReceiver myReceiver;
public static void main(String[] args) throws Exception { // Simulate users. myUsers.put("John", "password1"); myUsers.put("Steve", "password2");
// TCP messaging. IMessagingSystemFactory aTcpMessaging = new TcpMessagingSystemFactory();
// Authenticated messaging uses TCP as the underlying messaging. HandshakeProvider aHandshakeProvider = new HandshakeProvider(); AuthenticateProvider anAuthenticationProvider = new AuthenticateProvider(); IMessagingSystemFactory aMessaging = new AuthenticatedMessagingFactory(aTcpMessaging, aHandshakeProvider, anAuthenticationProvider); IDuplexInputChannel anInputChannel = aMessaging.createDuplexInputChannel("tcp://127.0.0.1:8092/");
// Use just simple text messages. myReceiver = new DuplexStringMessagesFactory().createDuplexStringMessageReceiver();
// Subscribe to receive messages. myReceiver.requestReceived().subscribe(myOnRequestReceived);
// Attach input channel and start listening. // Note: using AuthenticatedMessaging will ensure the connection will be established only // if the authentication procedure passes. myReceiver.attachDuplexInputChannel(anInputChannel);
System.out.println("Service is running. Press ENTER to stop."); new BufferedReader(new InputStreamReader(System.in)).readLine();
// Detach input channel and stop listening. // Note: tis will release the listening thread. myReceiver.detachDuplexInputChannel(); }
private static void onRequestReceived(Object sender, StringRequestReceivedEventArgs e) { // Process the incoming message here. System.out.println(e.getRequestMessage());
// Send back the response. try { myReceiver.sendResponseMessage(e.getResponseReceiverId(), "hello"); } catch (Exception err) { // Sending of response message failed. // e.g. if the client disconnected meanwhile. EneterTrace.error("Sending of response failed.", err); } } }
public class Program { private static class LoginProvider implements IGetLoginMessage { @Override public Object getLoginMessage(String channelId, String responseReceiverId) { return "John"; } }
private static class HandshakeResponseProvider implements IGetHandshakeResponseMessage { @Override public Object getHandshakeResponseMessage(String channelId, String responseReceiverId, Object handshakeMessage) { try { // Handshake response is encoded handshake message. ISerializer aSerializer = new AesSerializer("password1"); Object aHandshakeResponse = aSerializer.serialize((String)handshakeMessage, String.class);
return aHandshakeResponse; } catch (Exception err) { EneterTrace.warning("Processing handshake message failed. The connection will be closed.", err); }
return null; } }
private static EventHandler<StringResponseReceivedEventArgs> myOnResponseReceived = new EventHandler<StringResponseReceivedEventArgs>() { @Override public void onEvent(Object sender, StringResponseReceivedEventArgs e) { onResponseMessageReceived(sender, e); } };
private static IDuplexStringMessageSender mySender;
public static void main(String[] args) throws Exception { // TCP messaging. IMessagingSystemFactory aTcpMessaging = new TcpMessagingSystemFactory();
// Authenticated messaging uses TCP as the underlying messaging. LoginProvider aLoginProvider = new LoginProvider(); HandshakeResponseProvider aHandshakeResponseProvider = new HandshakeResponseProvider(); IMessagingSystemFactory aMessaging = new AuthenticatedMessagingFactory(aTcpMessaging, aLoginProvider, aHandshakeResponseProvider); IDuplexOutputChannel anOutputChannel = aMessaging.createDuplexOutputChannel("tcp://127.0.0.1:8092/");
// Use text messages. mySender = new DuplexStringMessagesFactory().createDuplexStringMessageSender();
// Subscribe to receive response messages. mySender.responseReceived().subscribe(myOnResponseReceived);
// Attach output channel and connect the service. mySender.attachDuplexOutputChannel(anOutputChannel);
// Send a message. mySender.sendMessage("Hello");
System.out.println("Client sent the message. Press ENTER to stop."); new BufferedReader(new InputStreamReader(System.in)).readLine();
// Detach output channel and stop listening. // Note: it releases the tread listening to responses. mySender.detachDuplexOutputChannel(); }
private static void onResponseMessageReceived(Object sender, StringResponseReceivedEventArgs e) { // Process the incoming response here. System.out.println(e.getResponseMessage()); }
}
Constructor and Description |
---|
AuthenticatedMessagingFactory(IMessagingSystemFactory underlyingMessagingSystem,
IGetHandshakeMessage getHandshakeMessageCallback,
IAuthenticate authenticateCallback)
Constructs factory that will be used only by a service.
|
AuthenticatedMessagingFactory(IMessagingSystemFactory underlyingMessagingSystem,
IGetHandshakeMessage getHandshakeMessageCallback,
IAuthenticate authenticateCallback,
IHandleAuthenticationCancelled handleAuthenticationCancelledCallback)
Constructs factory that will be used only by a service.
|
AuthenticatedMessagingFactory(IMessagingSystemFactory underlyingMessagingSystem,
IGetLoginMessage getLoginMessageCallback,
IGetHandshakeResponseMessage getHandshakeResponseMessageCallback)
Constructs factory that will be used only by a client.
|
AuthenticatedMessagingFactory(IMessagingSystemFactory underlyingMessagingSystem,
IGetLoginMessage getLoginMessageCallback,
IGetHandshakeResponseMessage getHandshakeResponseMessageCallback,
IGetHandshakeMessage getHandshakeMessageCallback,
IAuthenticate authenticateCallback,
IHandleAuthenticationCancelled handleAuthenticationCancelledCallback)
Constructs factory that can be used by client and service simultaneously.
|
Modifier and Type | Method and Description |
---|---|
IDuplexInputChannel |
createDuplexInputChannel(java.lang.String channelId)
Creates duplex input channel which performs the authentication procedure.
|
IDuplexOutputChannel |
createDuplexOutputChannel(java.lang.String channelId)
Creates duplex output channel which performs authentication procedure during opening the connection.
|
IDuplexOutputChannel |
createDuplexOutputChannel(java.lang.String channelId,
java.lang.String responseReceiverId)
Creates duplex output channel which performs authentication procedure during opening the connection.
|
long |
getAuthenticationTimeout()
Gets maximum time until the authentication procedure must be performed.
|
IThreadDispatcherProvider |
getOutputChannelThreading()
Gets the threading mode used by authenticated output channel.
|
AuthenticatedMessagingFactory |
setAuthenticationTimeout(long authenticationTimeout)
Sets maximum time until the authentication procedure must be performed.
|
AuthenticatedMessagingFactory |
setOutputChannelThreading(IThreadDispatcherProvider threadingMode)
Sets the threading mode for the authenticated output channel.
|
public AuthenticatedMessagingFactory(IMessagingSystemFactory underlyingMessagingSystem, IGetLoginMessage getLoginMessageCallback, IGetHandshakeResponseMessage getHandshakeResponseMessageCallback)
underlyingMessagingSystem
- underlying messaging upon which the authentication will work.
getLoginMessageCallback
- callback returning the login message.
getHandshakeResponseMessageCallback
- callback returning the response message for the handshake.
public AuthenticatedMessagingFactory(IMessagingSystemFactory underlyingMessagingSystem, IGetHandshakeMessage getHandshakeMessageCallback, IAuthenticate authenticateCallback)
underlyingMessagingSystem
- underlying messaging upon which the authentication will work.
getHandshakeMessageCallback
- callback returning the handshake message.
authenticateCallback
- callback performing the authentication.
public AuthenticatedMessagingFactory(IMessagingSystemFactory underlyingMessagingSystem, IGetHandshakeMessage getHandshakeMessageCallback, IAuthenticate authenticateCallback, IHandleAuthenticationCancelled handleAuthenticationCancelledCallback)
underlyingMessagingSystem
- underlying messaging upon which the authentication will work.
getHandshakeMessageCallback
- callback returning the handshake message.
authenticateCallback
- callback performing the authentication.
handleAuthenticationCancelledCallback
- callback called by the input channel to indicate the output channel closed the connection during the authentication procedure.
Can be null if your authentication code does not need to handle it.
public AuthenticatedMessagingFactory(IMessagingSystemFactory underlyingMessagingSystem, IGetLoginMessage getLoginMessageCallback, IGetHandshakeResponseMessage getHandshakeResponseMessageCallback, IGetHandshakeMessage getHandshakeMessageCallback, IAuthenticate authenticateCallback, IHandleAuthenticationCancelled handleAuthenticationCancelledCallback)
underlyingMessagingSystem
- underlying messaging upon which the authentication will work.
getLoginMessageCallback
- callback returning the login message.
getHandshakeResponseMessageCallback
- callback returning the response message for the handshake.
getHandshakeMessageCallback
- callback returning the handshake message.
authenticateCallback
- callback performing the authentication.
public IDuplexOutputChannel createDuplexOutputChannel(java.lang.String channelId) throws java.lang.Exception
createDuplexOutputChannel
in interface IMessagingSystemFactory
channelId
- address of the input channel.
java.lang.Exception
public IDuplexOutputChannel createDuplexOutputChannel(java.lang.String channelId, java.lang.String responseReceiverId) throws java.lang.Exception
createDuplexOutputChannel
in interface IMessagingSystemFactory
channelId
- address of the input channel.
responseReceiverId
- unique identifier of the output channel. If the value is null then the identifier is generated automatically.
java.lang.Exception
public IDuplexInputChannel createDuplexInputChannel(java.lang.String channelId) throws java.lang.Exception
createDuplexInputChannel
in interface IMessagingSystemFactory
channelId
- address of the input channel.
java.lang.Exception
public AuthenticatedMessagingFactory setAuthenticationTimeout(long authenticationTimeout)
authenticationTimeout
- timeout in milliseconds
public long getAuthenticationTimeout()
public AuthenticatedMessagingFactory setOutputChannelThreading(IThreadDispatcherProvider threadingMode)
threadingMode
- threading mode for the authenticated output channel
public IThreadDispatcherProvider getOutputChannelThreading()