aClass) {
+ if (aClass == IRouter.class) {
+ return aClass.cast(this);
+ }
+ else {
+ return null;
}
}
}
diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/router/WeightedRoundRobinRouter.java b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/router/WeightedRoundRobinRouter.java
index b52a898a0..d686fb159 100644
--- a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/router/WeightedRoundRobinRouter.java
+++ b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/router/WeightedRoundRobinRouter.java
@@ -32,8 +32,8 @@
/**
* Weighted round-robin router implementation
*
- * @see http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
* @author Nils Sowen
+ * @see http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
*/
public class WeightedRoundRobinRouter extends RouterImpl {
@@ -52,7 +52,7 @@ public WeightedRoundRobinRouter(IContainer container, IConcurrentFactory concurr
/**
* Select peer by weighted round-robin scheduling
* As documented in http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
- *
+ *
*
* The weighted round-robin scheduling is designed to better handle servers
* with different processing capacities. Each server can be assigned a weight,
@@ -106,7 +106,7 @@ public WeightedRoundRobinRouter(IContainer container, IConcurrentFactory concurr
*
* This method is internally synchronized due to concurrent modifications to lastSelectedPeer and currentWeight.
* Please consider this when relying on heavy throughput.
- *
+ *
* Please note: if the list of availablePeers changes between calls (e.g. if a peer becomes active or inactive),
* the balancing algorithm is disturbed and might be distributed uneven.
* This is likely to happen if peers are flapping.
@@ -138,7 +138,7 @@ public IPeer selectPeer(List availablePeers) {
// Find best matching candidate. Synchronized here due to consistent changes on member variables
synchronized (this) {
- for ( ;; ) {
+ for (; ; ) {
lastSelectedPeer = (lastSelectedPeer + 1) % peerSize;
if (lastSelectedPeer == 0) {
currentWeight = currentWeight - gcd;
@@ -159,7 +159,7 @@ public IPeer selectPeer(List availablePeers) {
}
/**
- * Return greatest common divisor for two integers
+ * Returns greatest common divisor for two integers
* https://en.wikipedia.org/wiki/Greatest_common_divisor#Using_Euclid.27s_algorithm
*
* @param a
@@ -169,4 +169,24 @@ public IPeer selectPeer(List availablePeers) {
protected int gcd(int a, int b) {
return (b == 0) ? a : gcd(b, a % b);
}
+
+ /**
+ * Resets all round-robin counters/variables in order to make the whole algorithm
+ * start from scratch.
+ */
+ protected synchronized void resetRoundRobinContext() {
+ lastSelectedPeer = -1;
+ currentWeight = 0;
+ }
+
+ /**
+ * Gets a readable format of the current round-robin context, i.e. last selected
+ * peer and current weight
+ *
+ * @return readable summary of round-robin context
+ */
+ @Override
+ protected String dumpRoundRobinContext() {
+ return String.format("Current round-robin context is: lastSelectedPeer=[%d] currentWeight=[%d]", lastSelectedPeer, currentWeight);
+ }
}
diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPClientConnection.java b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPClientConnection.java
index 10521c373..8e6a98331 100644
--- a/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPClientConnection.java
+++ b/core/jdiameter/impl/src/main/java/org/jdiameter/client/impl/transport/tcp/TCPClientConnection.java
@@ -387,6 +387,15 @@ protected boolean processBufferedMessages(Event event) throws AvpDataException {
}
}
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("TCPClientConnection [createdTime=").append(createdTime)
+ .append(", cachedKey=").append(cachedKey).append(", isConnected=")
+ .append(isConnected()).append("]");
+ return builder.toString();
+ }
+
//------------------ helper classes ------------------------
private enum EventType {
CONNECTED, DISCONNECTED, MESSAGE_RECEIVED, DATA_EXCEPTION
diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/app/ro/IClientRoSessionContext.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/app/ro/IClientRoSessionContext.java
index 98bc3d523..66194c546 100644
--- a/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/app/ro/IClientRoSessionContext.java
+++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/app/ro/IClientRoSessionContext.java
@@ -1,44 +1,24 @@
- /*
- * TeleStax, Open Source Cloud Communications
- * Copyright 2011-2016, TeleStax Inc. and individual contributors
- * by the @authors tag.
- *
- * This program is free software: you can redistribute it and/or modify
- * under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation; either version 3 of
- * the License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see
- *
- * This file incorporates work covered by the following copyright and
- * permission notice:
- *
- * JBoss, Home of Professional Open Source
- * Copyright 2007-2011, Red Hat, Inc. and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
package org.jdiameter.common.api.app.ro;
@@ -51,6 +31,7 @@
*
* @author Bartosz Baranowski
* @author Alexandre Mendonca
+ * @author Grzegorz Figiel (ProIDS sp. z o.o.)
*/
public interface IClientRoSessionContext {
@@ -62,6 +43,8 @@ public interface IClientRoSessionContext {
int getDefaultDDFHValue();
+ int getDefaultCCSFValue();
+
void grantAccessOnDeliverFailure(ClientRoSession clientCCASessionImpl, Message request);
void denyAccessOnDeliverFailure(ClientRoSession clientCCASessionImpl, Message request);
diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/data/IRoutingAwareSessionDatasource.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/data/IRoutingAwareSessionDatasource.java
new file mode 100644
index 000000000..53715898f
--- /dev/null
+++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/api/data/IRoutingAwareSessionDatasource.java
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2010, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jdiameter.common.api.data;
+
+import org.jdiameter.api.SessionPersistenceStorage;
+import org.jdiameter.client.api.controller.IPeer;
+
+import java.util.List;
+
+/**
+ * Extends basic session storage with capabilities of CRUD operations
+ * for session persistence records which bind sessions with peers that
+ * are processing those sessions.
+ */
+public interface IRoutingAwareSessionDatasource extends ISessionDatasource, SessionPersistenceStorage {
+
+ /**
+ * Gets a name of the peer that is currently assigned to a given session.
+ *
+ * @param sessionId session identifier used as mapping key in session storage
+ * @return peer name
+ */
+ String getSessionPeer(String sessionId);
+
+ /**
+ * Binds a particular session with a given peer.
+ *
+ * @param sessionId session identifier used as mapping key in session storage
+ * @param peer object to bind
+ */
+ void setSessionPeer(String sessionId, IPeer peer);
+
+ /**
+ * Unbinds a particular session from a given peer.
+ *
+ * @param sessionId session identifier used as mapping key in session storage
+ * @return peer name that has just been unbound
+ */
+ String removeSessionPeer(String sessionId);
+
+ /**
+ * @param sessionId session identifier used as mapping key in session storage
+ */
+ void clearUnanswerablePeers(String sessionId);
+
+ /**
+ * @param sessionId session identifier used as mapping key in session storage
+ * @return list of peers that did not answer for request within Tx timer value period
+ */
+ List getUnanswerablePeers(String sessionId);
+}
diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/AppRoutingAwareSessionImpl.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/AppRoutingAwareSessionImpl.java
new file mode 100644
index 000000000..fb5e601a1
--- /dev/null
+++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/AppRoutingAwareSessionImpl.java
@@ -0,0 +1,168 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2006, Red Hat, Inc. and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jdiameter.common.impl.app;
+
+import org.jdiameter.api.app.AppEvent;
+import org.jdiameter.client.api.IMessage;
+import org.jdiameter.client.api.ISessionFactory;
+import org.jdiameter.client.api.controller.IPeer;
+import org.jdiameter.client.api.controller.IPeerTable;
+import org.jdiameter.common.api.app.IAppSessionData;
+import org.jdiameter.common.api.data.IRoutingAwareSessionDatasource;
+import org.jdiameter.common.api.data.ISessionDatasource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+import static org.jdiameter.client.impl.helpers.Parameters.SessionInactivityTimeOut;
+
+/**
+ * Routing aware extension of {@link AppSessionImpl} that enables proper diameter session
+ * load balancing. It provides diameter session persistence which maps a single diameter
+ * session to a single peer which is processing the session.
+ */
+public abstract class AppRoutingAwareSessionImpl extends AppSessionImpl {
+
+ private static final String SESSION_INACTIVITY_TIMER_NAME = "Ro_CLIENT_SESSION_INACTIVITY_TIMER";
+
+ private static final Logger logger = LoggerFactory.getLogger(AppRoutingAwareSessionImpl.class);
+
+ private transient IPeerTable peerTable = null;
+ private transient IRoutingAwareSessionDatasource sessionPersistenceStorage = null;
+
+ private final int sesInactivityTimerVal;
+ private Serializable sesInactivityTimerId = null;
+
+ /**
+ * Parameterized constructor. If session persistence is supposed to be enabled, sessionStorage
+ * argument should be of type {@link org.jdiameter.common.impl.data.RoutingAwareDataSource}.
+ *
+ * @param sessionStorage session datasource
+ * @param sessionFactory session factory
+ * @param appSessionData session data
+ */
+ public AppRoutingAwareSessionImpl(ISessionDatasource sessionStorage, ISessionFactory sessionFactory, IAppSessionData appSessionData) {
+ super(sessionFactory, appSessionData);
+ peerTable = sessionFactory.getContainer().getAssemblerFacility().getComponentInstance(IPeerTable.class);
+ sesInactivityTimerVal = sessionFactory.getContainer().getConfiguration().getIntValue(SessionInactivityTimeOut.ordinal(), (Integer)
+ SessionInactivityTimeOut.defValue()) * 1000;
+ if (sessionStorage instanceof IRoutingAwareSessionDatasource) {
+ sessionPersistenceStorage = (IRoutingAwareSessionDatasource) sessionStorage;
+ }
+ }
+
+ /**
+ * Tells whether session persistent routing is enabled for this session.
+ *
+ * @return true if enabled
+ */
+ protected boolean isSessionPersistenceEnabled() {
+ return this.sessionPersistenceStorage != null;
+ }
+
+ /**
+ * Initiates session persistence record, i.e. assigns the current session to a peer which is
+ * processing it. Session persistence record shall be created after a peer had answered the
+ * first (initial) request for that session.
+ *
+ * @param reqEvent request that had been sent beforehand
+ * @param ansEvent response that has been just received
+ */
+ protected void initSessionPersistenceContext(AppEvent reqEvent, AppEvent ansEvent) {
+ try {
+ IPeer peer = null;
+ if (reqEvent.getMessage() instanceof IMessage) {
+ sessionPersistenceStorage.clearUnanswerablePeers(this.getSessionId());
+ peer = ((IMessage) reqEvent.getMessage()).getPeer();
+ }
+ else {
+ logger.warn("Cannot retrieve message detailed context for Session-Id/activityId [{}]", this.getSessionId());
+ }
+
+ if (peer == null) {
+ logger.warn("Taking peer from Origin-Host AVP as no peer is assigned yet to the following message in session [{}]: [{}]", this.getSessionId(),
+ reqEvent.getMessage().getAvps());
+ peer = peerTable.getPeer(ansEvent.getOriginHost());
+ }
+
+ sessionPersistenceStorage.setSessionPeer(this.getSessionId(), peer);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Session persistent routing will be enforced for Session-Id [{}] with peer [{}]", this.getSessionId(), peer);
+ }
+
+ } catch (Exception ex) {
+ logger.error("Cannot update session persistence data, default routing will be applied", ex);
+ }
+ }
+
+ /**
+ * Removes mapping between current session and the peer that has been assigned so far.
+ *
+ * @return peer name that has been assigned so far
+ */
+ protected String flushSessionPersistenceContext() {
+ try {
+ return sessionPersistenceStorage.removeSessionPeer(this.getSessionId());
+ } catch (Exception ex) {
+ logger.error("Cannot update session persistence data", ex);
+ return null;
+ }
+ }
+
+ /**
+ * Starts maximum session inactivity timer which defines how much time the persistence record
+ * should be kept if there is no request sent within a session.
+ */
+ protected void startSessionInactivityTimer() {
+ logger.debug("Scheduling session inactivity timer equal to [{}] ms", sesInactivityTimerVal);
+ stopSessionInactivityTimer();
+ this.sesInactivityTimerId = this.timerFacility.schedule(this.getSessionId(), SESSION_INACTIVITY_TIMER_NAME, sesInactivityTimerVal);
+ }
+
+ /**
+ * Stops session inactivity timer.
+ */
+ protected void stopSessionInactivityTimer() {
+ if (this.sesInactivityTimerId != null) {
+ logger.debug("Stopping session inactivity timer [{}]", this.sesInactivityTimerId);
+ timerFacility.cancel(this.sesInactivityTimerId);
+ this.sesInactivityTimerId = null;
+ }
+ }
+
+ /**
+ * Handles expiry of session inactivity timer. Should be called by any subclasses which define
+ * any additional timers.
+ *
+ * @see org.jdiameter.common.impl.app.AppSessionImpl#onTimer(java.lang.String)
+ */
+ @Override
+ public void onTimer(String timerName) {
+ if (timerName.equals(SESSION_INACTIVITY_TIMER_NAME)) {
+ //no need to interfere with session state machine (simply remove routing context used for sticky sessions based routing)
+ String oldPeer = flushSessionPersistenceContext();
+ logger.debug("Session inactivity timer expired so routing context for peer [{}] was removed from session [{}]", oldPeer, this.getSessionId());
+ }
+ }
+}
diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/cca/AppCCASessionImpl.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/cca/AppCCASessionImpl.java
index 83919a918..d7a0e1123 100644
--- a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/cca/AppCCASessionImpl.java
+++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/cca/AppCCASessionImpl.java
@@ -52,14 +52,15 @@
import org.jdiameter.api.cca.CCASession;
import org.jdiameter.client.api.ISessionFactory;
import org.jdiameter.common.api.app.IAppSessionData;
-import org.jdiameter.common.impl.app.AppSessionImpl;
+import org.jdiameter.common.api.data.ISessionDatasource;
+import org.jdiameter.common.impl.app.AppRoutingAwareSessionImpl;
/**
*
* @author Bartosz Baranowski
* @author Alexandre Mendonca
*/
-public abstract class AppCCASessionImpl extends AppSessionImpl implements CCASession, NetworkReqListener {
+public abstract class AppCCASessionImpl extends AppRoutingAwareSessionImpl implements CCASession,NetworkReqListener {
protected Lock sendAndStateLock = new ReentrantLock();
@@ -68,8 +69,8 @@ public abstract class AppCCASessionImpl extends AppSessionImpl implements CCASes
//FIXME: use FastList ?
protected List stateListeners = new CopyOnWriteArrayList();
- public AppCCASessionImpl(ISessionFactory sf, IAppSessionData data) {
- super(sf, data);
+ public AppCCASessionImpl(ISessionDatasource sessionStorage, ISessionFactory sf, IAppSessionData data) {
+ super(sessionStorage, sf, data);
}
@Override
diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/cca/CCASessionFactoryImpl.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/cca/CCASessionFactoryImpl.java
index d8bf0d2f7..16d575126 100644
--- a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/cca/CCASessionFactoryImpl.java
+++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/cca/CCASessionFactoryImpl.java
@@ -278,7 +278,7 @@ public AppSession getSession(String sessionId, Class extends AppSession> aClas
ClientCCASessionImpl clientSession = null;
IClientCCASessionData data = (IClientCCASessionData) this.sessionDataFactory.getAppSessionData(ClientCCASession.class, sessionId);
- clientSession = new ClientCCASessionImpl(data, this.getMessageFactory(), sessionFactory, this.getClientSessionListener(),
+ clientSession = new ClientCCASessionImpl(data, this.getMessageFactory(), iss, sessionFactory, this.getClientSessionListener(),
this.getClientContextListener(), this.getStateListener());
clientSession.getSessions().get(0).setRequestListener(clientSession);
appSession = clientSession;
@@ -320,8 +320,8 @@ public AppSession getNewSession(String sessionId, Class extends AppSession> aC
}
IClientCCASessionData data = (IClientCCASessionData) this.sessionDataFactory.getAppSessionData(ClientCCASession.class, sessionId);
data.setApplicationId(applicationId);
- clientSession = new ClientCCASessionImpl(data, this.getMessageFactory(), sessionFactory, this.getClientSessionListener(),
- this.getClientContextListener(), this.getStateListener());
+ clientSession = new ClientCCASessionImpl(data, this.getMessageFactory(), iss, sessionFactory,
+ this.getClientSessionListener(), this.getClientContextListener(), this.getStateListener());
// this goes first!
iss.addSession(clientSession);
clientSession.getSessions().get(0).setRequestListener(clientSession);
diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/ro/AppRoSessionImpl.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/ro/AppRoSessionImpl.java
index cff2a7435..729c057b9 100644
--- a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/ro/AppRoSessionImpl.java
+++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/ro/AppRoSessionImpl.java
@@ -52,14 +52,15 @@
import org.jdiameter.api.app.StateMachine;
import org.jdiameter.client.api.ISessionFactory;
import org.jdiameter.common.api.app.ro.IRoSessionData;
-import org.jdiameter.common.impl.app.AppSessionImpl;
+import org.jdiameter.common.api.data.ISessionDatasource;
+import org.jdiameter.common.impl.app.AppRoutingAwareSessionImpl;
/**
*
* @author Bartosz Baranowski
* @author Alexandre Mendonca
*/
-public abstract class AppRoSessionImpl extends AppSessionImpl implements NetworkReqListener, StateMachine {
+public abstract class AppRoSessionImpl extends AppRoutingAwareSessionImpl implements NetworkReqListener, StateMachine {
protected Lock sendAndStateLock = new ReentrantLock();
@@ -67,8 +68,8 @@ public abstract class AppRoSessionImpl extends AppSessionImpl implements Network
//FIXME: change this to single ref!
protected transient List stateListeners = new CopyOnWriteArrayList();
- public AppRoSessionImpl(ISessionFactory sf, IRoSessionData sessionData) {
- super(sf, sessionData);
+ public AppRoSessionImpl(ISessionDatasource sessionStorage, ISessionFactory sf, IRoSessionData sessionData) {
+ super(sessionStorage, sf, sessionData);
}
@Override
diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/ro/RoSessionFactoryImpl.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/ro/RoSessionFactoryImpl.java
index e139b07ff..68097b0eb 100644
--- a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/ro/RoSessionFactoryImpl.java
+++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/app/ro/RoSessionFactoryImpl.java
@@ -19,13 +19,13 @@
package org.jdiameter.common.impl.app.ro;
-import java.util.concurrent.ScheduledFuture;
-
import org.jdiameter.api.Answer;
import org.jdiameter.api.ApplicationId;
import org.jdiameter.api.InternalException;
import org.jdiameter.api.Message;
+import org.jdiameter.api.Peer;
import org.jdiameter.api.Request;
+import org.jdiameter.api.RouteException;
import org.jdiameter.api.SessionFactory;
import org.jdiameter.api.app.AppAnswerEvent;
import org.jdiameter.api.app.AppRequestEvent;
@@ -39,6 +39,7 @@
import org.jdiameter.api.ro.ServerRoSessionListener;
import org.jdiameter.api.ro.events.RoCreditControlAnswer;
import org.jdiameter.api.ro.events.RoCreditControlRequest;
+import org.jdiameter.client.api.IMessage;
import org.jdiameter.client.api.ISessionFactory;
import org.jdiameter.client.impl.app.ro.ClientRoSessionImpl;
import org.jdiameter.client.impl.app.ro.IClientRoSessionData;
@@ -56,11 +57,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.ScheduledFuture;
+
/**
* Default Diameter Ro Session Factory implementation
*
* @author Alexandre Mendonca
* @author Bartosz Baranowski
+ * @author Grzegorz Figiel (ProIDS sp. z o.o.)
*/
public class RoSessionFactoryImpl implements IRoSessionFactory, ClientRoSessionListener, ServerRoSessionListener, StateChangeListener,
IRoMessageFactory, IServerRoSessionContext, IClientRoSessionContext {
@@ -68,6 +72,7 @@ public class RoSessionFactoryImpl implements IRoSessionFactory, ClientRoSessionL
// Message timeout value (in milliseconds)
protected int defaultDirectDebitingFailureHandling = 0;
protected int defaultCreditControlFailureHandling = 0;
+ protected int defaultCreditControlSessionFailover = IMessage.SESSION_FAILOVER_NOT_SUPPORTED_VALUE;
// its seconds
protected long defaultValidityTime = 60;
@@ -94,8 +99,8 @@ public RoSessionFactoryImpl(SessionFactory sessionFactory) {
this.sessionDataFactory = (IAppSessionDataFactory) this.iss.getDataFactory(IRoSessionData.class);
}
- public RoSessionFactoryImpl(SessionFactory sessionFactory, int defaultDirectDebitingFailureHandling, int defaultCreditControlFailureHandling,
- long defaultValidityTime, long defaultTxTimerValue) {
+ public RoSessionFactoryImpl(SessionFactory sessionFactory, int defaultDirectDebitingFailureHandling, int defaultCreditControlFailureHandling, long
+ defaultValidityTime, long defaultTxTimerValue) {
this(sessionFactory);
this.defaultDirectDebitingFailureHandling = defaultDirectDebitingFailureHandling;
@@ -107,7 +112,6 @@ public RoSessionFactoryImpl(SessionFactory sessionFactory, int defaultDirectDebi
/**
* @return the clientSessionListener
*/
- @Override
public ClientRoSessionListener getClientSessionListener() {
if (clientSessionListener != null) {
return clientSessionListener;
@@ -118,10 +122,8 @@ public ClientRoSessionListener getClientSessionListener() {
}
/**
- * @param clientSessionListener
- * the clientSessionListener to set
+ * @param clientSessionListener the clientSessionListener to set
*/
- @Override
public void setClientSessionListener(ClientRoSessionListener clientSessionListener) {
this.clientSessionListener = clientSessionListener;
}
@@ -129,7 +131,6 @@ public void setClientSessionListener(ClientRoSessionListener clientSessionListen
/**
* @return the serverSessionListener
*/
- @Override
public ServerRoSessionListener getServerSessionListener() {
if (serverSessionListener != null) {
return serverSessionListener;
@@ -140,10 +141,8 @@ public ServerRoSessionListener getServerSessionListener() {
}
/**
- * @param serverSessionListener
- * the serverSessionListener to set
+ * @param serverSessionListener the serverSessionListener to set
*/
- @Override
public void setServerSessionListener(ServerRoSessionListener serverSessionListener) {
this.serverSessionListener = serverSessionListener;
}
@@ -151,7 +150,6 @@ public void setServerSessionListener(ServerRoSessionListener serverSessionListen
/**
* @return the serverContextListener
*/
- @Override
public IServerRoSessionContext getServerContextListener() {
if (serverContextListener != null) {
return serverContextListener;
@@ -162,10 +160,8 @@ public IServerRoSessionContext getServerContextListener() {
}
/**
- * @param serverContextListener
- * the serverContextListener to set
+ * @param serverContextListener the serverContextListener to set
*/
- @Override
public void setServerContextListener(IServerRoSessionContext serverContextListener) {
this.serverContextListener = serverContextListener;
}
@@ -173,7 +169,6 @@ public void setServerContextListener(IServerRoSessionContext serverContextListen
/**
* @return the clientContextListener
*/
- @Override
public IClientRoSessionContext getClientContextListener() {
if (clientContextListener != null) {
return clientContextListener;
@@ -186,7 +181,6 @@ public IClientRoSessionContext getClientContextListener() {
/**
* @return the messageFactory
*/
- @Override
public IRoMessageFactory getMessageFactory() {
if (messageFactory != null) {
return messageFactory;
@@ -197,19 +191,15 @@ public IRoMessageFactory getMessageFactory() {
}
/**
- * @param messageFactory
- * the messageFactory to set
+ * @param messageFactory the messageFactory to set
*/
- @Override
public void setMessageFactory(IRoMessageFactory messageFactory) {
this.messageFactory = messageFactory;
}
/**
- * @param clientContextListener
- * the clientContextListener to set
+ * @param clientContextListener the clientContextListener to set
*/
- @Override
public void setClientContextListener(IClientRoSessionContext clientContextListener) {
this.clientContextListener = clientContextListener;
}
@@ -222,8 +212,7 @@ public SessionFactory getSessionFactory() {
}
/**
- * @param sessionFactory
- * the sessionFactory to set
+ * @param sessionFactory the sessionFactory to set
*/
public void setSessionFactory(SessionFactory sessionFactory) {
this.sessionFactory = (ISessionFactory) sessionFactory;
@@ -232,7 +221,6 @@ public void setSessionFactory(SessionFactory sessionFactory) {
/**
* @return the stateListener
*/
- @Override
public StateChangeListener getStateListener() {
if (this.stateListener != null) {
return stateListener;
@@ -243,15 +231,12 @@ public StateChangeListener getStateListener() {
}
/**
- * @param stateListener
- * the stateListener to set
+ * @param stateListener the stateListener to set
*/
- @Override
public void setStateListener(StateChangeListener stateListener) {
this.stateListener = stateListener;
}
- @Override
public AppSession getNewSession(String sessionId, Class extends AppSession> aClass, ApplicationId applicationId, Object[] args) {
AppSession appSession = null;
try {
@@ -270,8 +255,8 @@ public AppSession getNewSession(String sessionId, Class extends AppSession> aC
IClientRoSessionData sessionData = (IClientRoSessionData) this.sessionDataFactory.getAppSessionData(ClientRoSession.class, sessionId);
sessionData.setApplicationId(applicationId);
- clientSession = new ClientRoSessionImpl(sessionData, this.getMessageFactory(), sessionFactory, this.getClientSessionListener(),
- this.getClientContextListener(), this.getStateListener());
+ clientSession = new ClientRoSessionImpl(sessionData, this.getMessageFactory(), iss, sessionFactory, this.getClientSessionListener(), this
+ .getClientContextListener(), this.getStateListener());
// this goes first!
iss.addSession(clientSession);
clientSession.getSessions().get(0).setRequestListener(clientSession);
@@ -320,8 +305,8 @@ public AppSession getSession(String sessionId, Class extends AppSession> aClas
try {
if (aClass == ClientRoSession.class) {
IClientRoSessionData sessionData = (IClientRoSessionData) this.sessionDataFactory.getAppSessionData(ClientRoSession.class, sessionId);
- ClientRoSessionImpl clientSession = new ClientRoSessionImpl(sessionData, this.getMessageFactory(), sessionFactory, this.getClientSessionListener(),
- this.getClientContextListener(), this.getStateListener());
+ ClientRoSessionImpl clientSession = new ClientRoSessionImpl(sessionData, this.getMessageFactory(), iss, sessionFactory, this.getClientSessionListener
+ (), this.getClientContextListener(), this.getStateListener());
// this goes first!
clientSession.getSessions().get(0).setRequestListener(clientSession);
appSession = clientSession;
@@ -347,56 +332,58 @@ else if (aClass == ServerRoSession.class) {
// Message Handlers ---------------------------------------------------------
- @Override
public void doCreditControlRequest(ServerRoSession session, RoCreditControlRequest request) throws InternalException {
}
- @Override
public void doCreditControlAnswer(ClientRoSession session, RoCreditControlRequest request, RoCreditControlAnswer answer) throws InternalException {
}
- @Override
public void doReAuthRequest(ClientRoSession session, ReAuthRequest request) throws InternalException {
}
- @Override
public void doReAuthAnswer(ServerRoSession session, ReAuthRequest request, ReAuthAnswer answer) throws InternalException {
}
- @Override
public void doOtherEvent(AppSession session, AppRequestEvent request, AppAnswerEvent answer) throws InternalException {
}
+ public void doRequestTxTimeout(ClientRoSession session, Message msg, Peer peer) throws InternalException {
+
+ }
+
+ public void doRequestTimeout(ClientRoSession session, Message msg, Peer peer) throws InternalException {
+
+ }
+
+ public void doPeerUnavailability(RouteException cause, ClientRoSession session, Message msg, Peer peer) throws InternalException {
+
+ }
+
// Message Factory Methods --------------------------------------------------
- @Override
public RoCreditControlAnswer createCreditControlAnswer(Answer answer) {
return new RoCreditControlAnswerImpl(answer);
}
- @Override
public RoCreditControlRequest createCreditControlRequest(Request req) {
return new RoCreditControlRequestImpl(req);
}
- @Override
public ReAuthAnswer createReAuthAnswer(Answer answer) {
return new ReAuthAnswerImpl(answer);
}
- @Override
public ReAuthRequest createReAuthRequest(Request req) {
return new ReAuthRequestImpl(req);
}
// Context Methods ----------------------------------------------------------
- @Override
@SuppressWarnings("unchecked")
public void stateChanged(Enum oldState, Enum newState) {
logger.info("Diameter Ro SessionFactory :: stateChanged :: oldState[{}], newState[{}]", oldState, newState);
@@ -407,7 +394,6 @@ public void stateChanged(Enum oldState, Enum newState) {
*
* @see org.jdiameter.api.app.StateChangeListener#stateChanged(java.lang.Object, java.lang.Enum, java.lang.Enum)
*/
- @Override
@SuppressWarnings("unchecked")
public void stateChanged(AppSession source, Enum oldState, Enum newState) {
logger.info("Diameter Ro SessionFactory :: stateChanged :: source[{}], oldState[{}], newState[{}]", new Object[]{source, oldState, newState});
@@ -415,100 +401,85 @@ public void stateChanged(AppSession source, Enum oldState, Enum newState) {
// FIXME: add ctx methods proxy calls!
- @Override
public void sessionSupervisionTimerExpired(ServerRoSession session) {
// this.resourceAdaptor.sessionDestroyed(session.getSessions().get(0).getSessionId(), session);
session.release();
}
- @Override
@SuppressWarnings("unchecked")
public void sessionSupervisionTimerReStarted(ServerRoSession session, ScheduledFuture future) {
// TODO Complete this method.
}
- @Override
@SuppressWarnings("unchecked")
public void sessionSupervisionTimerStarted(ServerRoSession session, ScheduledFuture future) {
// TODO Complete this method.
}
- @Override
@SuppressWarnings("unchecked")
public void sessionSupervisionTimerStopped(ServerRoSession session, ScheduledFuture future) {
// TODO Complete this method.
}
- @Override
public void timeoutExpired(Request request) {
// FIXME What should we do when there's a timeout?
}
- @Override
public void denyAccessOnDeliverFailure(ClientRoSession clientRoSessionImpl, Message request) {
// TODO Complete this method.
}
- @Override
public void denyAccessOnFailureMessage(ClientRoSession clientRoSessionImpl) {
// TODO Complete this method.
}
- @Override
public void denyAccessOnTxExpire(ClientRoSession clientRoSessionImpl) {
// this.resourceAdaptor.sessionDestroyed(clientRoSessionImpl.getSessions().get(0).getSessionId(),
// clientRoSessionImpl);
clientRoSessionImpl.release();
}
- @Override
public int getDefaultCCFHValue() {
return defaultCreditControlFailureHandling;
}
- @Override
+ public int getDefaultCCSFValue() {
+ return defaultCreditControlSessionFailover;
+ }
+
public int getDefaultDDFHValue() {
return defaultDirectDebitingFailureHandling;
}
- @Override
public long getDefaultTxTimerValue() {
return defaultTxTimerValue;
}
- @Override
public void grantAccessOnDeliverFailure(ClientRoSession clientRoSessionImpl, Message request) {
// TODO Auto-generated method stub
}
- @Override
public void grantAccessOnFailureMessage(ClientRoSession clientRoSessionImpl) {
// TODO Auto-generated method stub
}
- @Override
public void grantAccessOnTxExpire(ClientRoSession clientRoSessionImpl) {
// TODO Auto-generated method stub
}
- @Override
public void indicateServiceError(ClientRoSession clientRoSessionImpl) {
// TODO Auto-generated method stub
}
- @Override
public void txTimerExpired(ClientRoSession session) {
- // this.resourceAdaptor.sessionDestroyed(session.getSessions().get(0).getSessionId(), session);
- session.release();
+ // TODO Auto-generated method stub
}
- @Override
public long[] getApplicationIds() {
// FIXME: What should we do here?
- return new long[] { 4 };
+ return new long[]{4};
}
- @Override
public long getDefaultValidityTime() {
return this.defaultValidityTime;
}
diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/data/LocalDataSource.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/data/LocalDataSource.java
index d066798e6..8a40cd84f 100644
--- a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/data/LocalDataSource.java
+++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/data/LocalDataSource.java
@@ -42,9 +42,6 @@
package org.jdiameter.common.impl.data;
-import java.util.HashMap;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.jdiameter.api.BaseSession;
import org.jdiameter.api.NetworkReqListener;
import org.jdiameter.client.api.IContainer;
@@ -77,6 +74,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* Local implementation of session datasource for {@link ISessionDatasource}
*
@@ -89,7 +89,7 @@ public class LocalDataSource implements ISessionDatasource {
protected HashMap, IAppSessionDataFactory extends IAppSessionData>> appSessionDataFactories =
new HashMap, IAppSessionDataFactory extends IAppSessionData>>();
- private ConcurrentHashMap sessionIdToEntry = new ConcurrentHashMap();
+ protected ConcurrentHashMap sessionIdToEntry = new ConcurrentHashMap();
private static final Logger logger = LoggerFactory.getLogger(LocalDataSource.class);
@@ -152,23 +152,34 @@ public NetworkReqListener removeSessionListener(String sessionId) {
@Override
public void addSession(BaseSession session) {
- logger.debug("addSession({})", session);
- SessionEntry se = null;
+ addSession(session, SessionEntry.class);
+ }
+
+ protected void addSession(BaseSession session, Class sessionWraperType) {
+ logger.debug("addSession({}) => {}", session.getSessionId(), session);
+ T se = null;
String sessionId = session.getSessionId();
//FIXME: check here replicable vs not replicable?
if (this.sessionIdToEntry.containsKey(sessionId)) {
- se = this.sessionIdToEntry.get(sessionId);
- if ( !(se.session instanceof ISession) || se.session.isReplicable()) { //must be not replicable so we can "overwrite"
- throw new IllegalArgumentException("Sessin with id: " + sessionId + ", already exists!");
- }
- else {
- this.sessionIdToEntry.put(sessionId, se);
+ se = sessionWraperType.cast(this.sessionIdToEntry.get(sessionId));
+ if( se != null && (!(se.session instanceof ISession) || se.session.isReplicable()) ) { //must be not replicable so we can "overwrite"
+ throw new IllegalArgumentException("Session with id: " + sessionId + ", already exists!");
}
}
- else {
- se = new SessionEntry();
+
+ if(se == null) {
+ try {
+ se = sessionWraperType.newInstance();
+ } catch (InstantiationException e) {
+ logger.warn("Cannot instantiate session object of type: " + sessionWraperType.getCanonicalName(), e);
+ throw new IllegalArgumentException("Cannot instantiate session object of type: " + sessionWraperType.getCanonicalName(), e);
+ } catch (IllegalAccessException e) {
+ logger.warn("Cannot instantiate session object of type: " + sessionWraperType.getCanonicalName(), e);
+ throw new IllegalArgumentException("Cannot instantiate session object of type: " + sessionWraperType.getCanonicalName(), e);
+ }
}
+
se.session = session;
this.sessionIdToEntry.put(session.getSessionId(), se);
}
@@ -217,7 +228,7 @@ public String toString() {
}
//simple class to reduce collections overhead.
- private class SessionEntry {
+ protected static class SessionEntry {
BaseSession session;
NetworkReqListener listener;
diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/data/RoutingAwareDataSource.java b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/data/RoutingAwareDataSource.java
new file mode 100644
index 000000000..7b8802d67
--- /dev/null
+++ b/core/jdiameter/impl/src/main/java/org/jdiameter/common/impl/data/RoutingAwareDataSource.java
@@ -0,0 +1,184 @@
+package org.jdiameter.common.impl.data;
+
+import org.jdiameter.api.BaseSession;
+import org.jdiameter.client.api.IContainer;
+import org.jdiameter.client.api.controller.IPeer;
+import org.jdiameter.common.api.data.IRoutingAwareSessionDatasource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of routing aware session datasource for {@link IRoutingAwareSessionDatasource}.
+ */
+public class RoutingAwareDataSource extends LocalDataSource implements IRoutingAwareSessionDatasource {
+
+ private static final Logger logger = LoggerFactory.getLogger(RoutingAwareDataSource.class);
+
+
+ /**
+ * Default constructor.
+ */
+ public RoutingAwareDataSource() {
+ super();
+ logger.debug("Constructor for RoutingAwareDataSource: nothing to do");
+ }
+
+ /**
+ * Parameterized constructor. Should be called by any subclasses.
+ *
+ * @param container container object
+ */
+ public RoutingAwareDataSource(IContainer container) {
+ super(container);
+ logger.debug("Constructor for RoutingAwareDataSource: nothing to do");
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.jdiameter.common.impl.data.LocalDataSource#addSession(org.jdiameter.api.BaseSession)
+ */
+ @Override
+ public void addSession(BaseSession session) {
+ addSession(session, RoutingAwareSessionEntry.class);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.jdiameter.common.api.data.IRoutingAwareSessionDatasource#setSessionPeer(java.lang.String, org.jdiameter.client.api.controller.IPeer)
+ */
+ @Override
+ public void setSessionPeer(String sessionId, IPeer peer) {
+ logger.debug("Assigning routing destination peer [{}] to session [{}]", peer, sessionId);
+ SessionEntry se = sessionIdToEntry.get(sessionId);
+ if (se == null) {
+ throw new IllegalArgumentException("No session entry for id: " + sessionId);
+ }
+ else if (!(se instanceof RoutingAwareSessionEntry)) {
+ throw new IllegalArgumentException("Session entry is of a wrong type for id: " + sessionId);
+ }
+ else {
+ ((RoutingAwareSessionEntry) se).peer = peer.getUri().getFQDN();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.jdiameter.common.api.data.IRoutingAwareSessionDatasource#getSessionPeer(java.lang.String)
+ */
+ @Override
+ public String getSessionPeer(String sessionId) {
+ SessionEntry se = sessionIdToEntry.get(sessionId);
+ logger.debug("Looking up routing peer for session [{}]: {}", sessionId, se);
+ return (se != null && se instanceof RoutingAwareSessionEntry) ? ((RoutingAwareSessionEntry) se).peer : null;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.jdiameter.common.api.data.IRoutingAwareSessionDatasource#removeSessionPeer(java.lang.String)
+ */
+ @Override
+ public String removeSessionPeer(String sessionId) {
+ SessionEntry se = sessionIdToEntry.get(sessionId);
+ logger.debug("Looking up routing peer for removal for session [{}]: {}", sessionId, se);
+ if (se != null && se instanceof RoutingAwareSessionEntry) {
+ String oldPeer = ((RoutingAwareSessionEntry) se).peer;
+ ((RoutingAwareSessionEntry) se).peer = null;
+ ((RoutingAwareSessionEntry) se).getUnanswerablePeers().add(oldPeer);
+ return oldPeer;
+ }
+ else {
+ return null;
+ }
+ }
+
+ @Override
+ public void clearUnanswerablePeers(String sessionId) {
+ SessionEntry se = sessionIdToEntry.get(sessionId);
+ if (se != null && se instanceof RoutingAwareSessionEntry) {
+ ((RoutingAwareSessionEntry) se).getUnanswerablePeers().clear();
+ }
+ }
+
+ @Override
+ public List getUnanswerablePeers(String sessionId) {
+ SessionEntry se = sessionIdToEntry.get(sessionId);
+ if (se != null && se instanceof RoutingAwareSessionEntry) {
+ return ((RoutingAwareSessionEntry) se).getUnanswerablePeers();
+ }
+ else {
+ return null;
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.jdiameter.api.SessionPersistenceStorage#dumpStickySessions(int)
+ */
+ @Override
+ public List dumpStickySessions(int maxLimit) {
+ int counter = 0;
+ DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
+ List sessions = maxLimit > 0 ? new ArrayList(maxLimit) : new ArrayList(sessionIdToEntry.size());
+
+ logger.debug("Reading [{}] sessions out of [{}]", maxLimit > 0 ? String.valueOf(maxLimit) : "unlimited", sessionIdToEntry.size());
+
+ for (Map.Entry entry : sessionIdToEntry.entrySet()) {
+ if (entry.getValue() instanceof RoutingAwareSessionEntry) {
+ RoutingAwareSessionEntry tmpEntry = (RoutingAwareSessionEntry) entry.getValue();
+ if (tmpEntry.peer != null) {
+ sessions.add(tmpEntry.preetyPrint(entry.getKey(), dateFormat));
+ if (maxLimit > 0 && ++counter >= maxLimit) {
+ break;
+ }
+ }
+ }
+ }
+
+ return sessions;
+ }
+
+ /**
+ * Extends basic session entry, which is used to store records in session storage, with extra info about
+ * a specific peer that is bound to a particular session. Extra info is used for session persistent routing.
+ */
+ protected static class RoutingAwareSessionEntry extends SessionEntry {
+ private List unanswerable = new ArrayList();
+ String peer;
+
+ public List getUnanswerablePeers() {
+ return unanswerable;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("RoutingAwareSessionEntry [peer=").append(peer).append(", unanswerable=[").append(Arrays.toString(unanswerable.toArray())).append("], " +
+ "toString()=").append(super.toString()).append("]");
+ return builder.toString();
+ }
+
+ /**
+ * Gets a readable and more user friendly format of an entry.
+ *
+ * @param key key used to store that entry in a session storage map
+ * @param dateFormat format used to print last session activity timestamp
+ * @return readable representation of this session entry
+ */
+ public String preetyPrint(String key, DateFormat dateFormat) {
+ StringBuilder builder = new StringBuilder("{id=[");
+ builder.append(key).append("], peer=[").append(peer)
+ .append("], timestamp=[").append(dateFormat.format(new Date(session.getLastAccessedTime())))
+ .append("], unanswerable=[").append(Arrays.toString(unanswerable.toArray()))
+ .append("]}").toString();
+ return builder.toString();
+ }
+ }
+}
diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/FailureAwareRouter.java b/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/FailureAwareRouter.java
new file mode 100644
index 000000000..2951467fe
--- /dev/null
+++ b/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/FailureAwareRouter.java
@@ -0,0 +1,22 @@
+package org.jdiameter.server.impl;
+
+import org.jdiameter.api.Configuration;
+import org.jdiameter.api.MetaData;
+import org.jdiameter.client.api.IContainer;
+import org.jdiameter.client.api.controller.IRealmTable;
+import org.jdiameter.common.api.concurrent.IConcurrentFactory;
+import org.jdiameter.server.api.IRouter;
+
+/**
+ * Just a simple counterpart of failure aware router defined for a client role.
+ */
+public class FailureAwareRouter extends org.jdiameter.client.impl.router.FailureAwareRouter implements IRouter {
+
+ /**
+ * Parameterized constructor. Should be called by any subclasses.
+ */
+ public FailureAwareRouter(IContainer container, IConcurrentFactory concurrentFactory, IRealmTable realmTable, Configuration config, MetaData aMetaData) {
+ super(container, concurrentFactory, realmTable, config, aMetaData);
+ }
+
+}
diff --git a/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/MutablePeerTableImpl.java b/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/MutablePeerTableImpl.java
index 895f52bb9..82311b65f 100644
--- a/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/MutablePeerTableImpl.java
+++ b/core/jdiameter/impl/src/main/java/org/jdiameter/server/impl/MutablePeerTableImpl.java
@@ -42,35 +42,6 @@
package org.jdiameter.server.impl;
-import static org.jdiameter.client.impl.helpers.Parameters.PeerName;
-import static org.jdiameter.client.impl.helpers.Parameters.PeerTable;
-import static org.jdiameter.client.impl.helpers.Parameters.StopTimeOut;
-import static org.jdiameter.client.impl.helpers.Parameters.UseUriAsFqdn;
-import static org.jdiameter.common.api.concurrent.IConcurrentFactory.ScheduledExecServices.ConnectionTimer;
-import static org.jdiameter.common.api.concurrent.IConcurrentFactory.ScheduledExecServices.DuplicationMessageTimer;
-import static org.jdiameter.common.api.concurrent.IConcurrentFactory.ScheduledExecServices.PeerOverloadTimer;
-import static org.jdiameter.server.impl.helpers.Parameters.AcceptUndefinedPeer;
-import static org.jdiameter.server.impl.helpers.Parameters.DuplicateProtection;
-import static org.jdiameter.server.impl.helpers.Parameters.DuplicateSize;
-import static org.jdiameter.server.impl.helpers.Parameters.DuplicateTimer;
-import static org.jdiameter.server.impl.helpers.Parameters.PeerAttemptConnection;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.URISyntaxException;
-import java.net.UnknownServiceException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
import org.jdiameter.api.Avp;
import org.jdiameter.api.AvpDataException;
import org.jdiameter.api.Configuration;
@@ -100,6 +71,7 @@
import org.jdiameter.client.api.io.TransportException;
import org.jdiameter.client.api.parser.IMessageParser;
import org.jdiameter.client.impl.controller.PeerTableImpl;
+import org.jdiameter.client.impl.helpers.Parameters;
import org.jdiameter.common.api.concurrent.IConcurrentFactory;
import org.jdiameter.common.api.statistic.IStatisticManager;
import org.jdiameter.server.api.IFsmFactory;
@@ -114,8 +86,35 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URISyntaxException;
+import java.net.UnknownServiceException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.jdiameter.client.impl.helpers.Parameters.PeerName;
+import static org.jdiameter.client.impl.helpers.Parameters.StopTimeOut;
+import static org.jdiameter.client.impl.helpers.Parameters.UseUriAsFqdn;
+import static org.jdiameter.common.api.concurrent.IConcurrentFactory.ScheduledExecServices.ConnectionTimer;
+import static org.jdiameter.common.api.concurrent.IConcurrentFactory.ScheduledExecServices.DuplicationMessageTimer;
+import static org.jdiameter.common.api.concurrent.IConcurrentFactory.ScheduledExecServices.PeerOverloadTimer;
+import static org.jdiameter.server.impl.helpers.Parameters.AcceptUndefinedPeer;
+import static org.jdiameter.server.impl.helpers.Parameters.DuplicateProtection;
+import static org.jdiameter.server.impl.helpers.Parameters.DuplicateSize;
+import static org.jdiameter.server.impl.helpers.Parameters.DuplicateTimer;
+import static org.jdiameter.server.impl.helpers.Parameters.PeerAttemptConnection;
+
/**
- *
* @author erick.svenson@yahoo.com
* @author Alexandre Mendonca
* @author Bartosz Baranowski
@@ -142,7 +141,7 @@ public class MutablePeerTableImpl extends PeerTableImpl implements IMutablePeerT
protected ScheduledFuture duplicationHandler = null;
protected ConcurrentHashMap storageAnswers = new ConcurrentHashMap();
- protected boolean isAcceptUndefinedPeer = false;
+ protected boolean isAcceptUndefinedPeer = false;
// Connections handling -----------------------------------------------------
private ConcurrentHashMap incConnections;
@@ -190,9 +189,9 @@ public String getDuplicationKey() {
}
public MutablePeerTableImpl(Configuration config, MetaData metaData, IContainer stack, org.jdiameter.server.api.IRouter router,
- ISessionFactory sessionFactory, IFsmFactory fsmFactory, ITransportLayerFactory trFactory,
- IMessageParser parser, INetwork network, IOverloadManager ovrManager,
- IStatisticManager statisticFactory, IConcurrentFactory concurrentFactory) {
+ ISessionFactory sessionFactory, IFsmFactory fsmFactory, ITransportLayerFactory trFactory,
+ IMessageParser parser, INetwork network, IOverloadManager ovrManager,
+ IStatisticManager statisticFactory, IConcurrentFactory concurrentFactory) {
logger.debug("MutablePeerTableImpl is being created");
this.metaData = metaData;
this.config = config;
@@ -213,8 +212,8 @@ public MutablePeerTableImpl(Configuration config, MetaData metaData, IContainer
this.duplicateTimer = config.getLongValue(DuplicateTimer.ordinal(), (Long) DuplicateTimer.defValue());
this.duplicateSize = config.getIntValue(DuplicateSize.ordinal(), (Integer) DuplicateSize.defValue());
}
- logger.debug("Duplicate Protection Configuration: Enabled? {}, Timer: {}, Size: {}",
- new Object[]{this.duplicateProtection, this.duplicateTimer, this.duplicateSize});
+ logger.debug("Duplicate Protection Configuration: Enabled? {}, Timer: {}, Size: {}", new Object[]{this.duplicateProtection, this.duplicateTimer, this
+ .duplicateSize});
if (predefinedPeerTable == null) {
predefinedPeerTable = new CopyOnWriteArraySet();
}
@@ -227,12 +226,36 @@ public MutablePeerTableImpl(Configuration config, MetaData metaData, IContainer
logger.debug("MutablePeerTableImpl has finished initialisation");
}
+ protected Configuration getPeerConfig(String fqdn) throws URISyntaxException, UnknownServiceException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Searching configuration for peer fqdn: " + fqdn);
+ }
+ Configuration result = null;
+ Configuration[] peers = config.getChildren(Parameters.PeerTable.ordinal());
+ if (peers != null && peers.length > 0) {
+ for (Configuration peerConfig : peers) {
+ if (peerConfig.isAttributeExist(PeerName.ordinal())) {
+ String peerConfigFqdn = new URI(peerConfig.getStringValue(PeerName.ordinal(), "")).getFQDN();
+ if (fqdn.equals(peerConfigFqdn)) {
+ result = peerConfig;
+ break;
+ }
+ }
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Peer configuration {}found for FQDN: {}", (result == null ? "not " : ""), fqdn);
+ }
+ return result;
+ }
+
+
@Override
protected Peer createPeer(int rating, String uri, String ip, String portRange, MetaData metaData, Configuration globalConfig,
- Configuration peerConfig, org.jdiameter.client.api.fsm.IFsmFactory fsmFactory,
- org.jdiameter.client.api.io.ITransportLayerFactory transportFactory,
- IStatisticManager statisticFactory, IConcurrentFactory concurrentFactory,
- IMessageParser parser) throws InternalException, TransportException, URISyntaxException, UnknownServiceException {
+ Configuration peerConfig, org.jdiameter.client.api.fsm.IFsmFactory fsmFactory,
+ org.jdiameter.client.api.io.ITransportLayerFactory transportFactory,
+ IStatisticManager statisticFactory, IConcurrentFactory concurrentFactory,
+ IMessageParser parser) throws InternalException, TransportException, URISyntaxException, UnknownServiceException {
logger.debug("Creating Peer for URI [{}]", uri);
if (predefinedPeerTable == null) {
logger.debug("Creating new empty predefined peer table");
@@ -241,7 +264,8 @@ protected Peer createPeer(int rating, String uri, String ip, String portRange, M
logger.debug("Adding URI [{}] to predefinedPeerTable", uri);
predefinedPeerTable.add(new URI(uri).getFQDN());
if (peerConfig.getBooleanValue(PeerAttemptConnection.ordinal(), false)) {
- logger.debug("Peer at URI [{}] is configured to attempt a connection (acting as a client) and a new peer instance will be created and returned", uri);
+ logger.debug("Peer at URI [{}] is configured to attempt a connection (i.e. acting as a client) and a new peer instance will be created and returned",
+ uri);
return newPeerInstance(rating, new URI(uri), ip, portRange, true, null,
metaData, globalConfig, peerConfig, (IFsmFactory) fsmFactory,
(ITransportLayerFactory) transportFactory, parser, statisticFactory, concurrentFactory);
@@ -253,24 +277,22 @@ protected Peer createPeer(int rating, String uri, String ip, String portRange, M
}
protected IPeer newPeerInstance(int rating, URI uri, String ip, String portRange, boolean attCnn, IConnection connection,
- MetaData metaData, Configuration globalConfig, Configuration peerConfig, IFsmFactory fsmFactory,
- ITransportLayerFactory transportFactory, IMessageParser parser,
- IStatisticManager statisticFactory, IConcurrentFactory concurrentFactory)
- throws URISyntaxException, UnknownServiceException, InternalException, TransportException {
+ MetaData metaData, Configuration globalConfig, Configuration peerConfig, IFsmFactory fsmFactory,
+ ITransportLayerFactory transportFactory, IMessageParser parser,
+ IStatisticManager statisticFactory, IConcurrentFactory concurrentFactory) throws URISyntaxException,
+ UnknownServiceException, InternalException, TransportException {
logger.debug("Creating and returning a new Peer Instance for URI [{}].", uri);
return new org.jdiameter.server.impl.PeerImpl(
- rating, uri, ip, portRange, attCnn, connection,
- this, (org.jdiameter.server.api.IMetaData) metaData, globalConfig, peerConfig, sessionFactory,
- fsmFactory, transportFactory, statisticFactory, concurrentFactory, parser, network, ovrManager, sessionDatasource
- );
+ rating, uri, ip, portRange, attCnn, connection,
+ this, (org.jdiameter.server.api.IMetaData) metaData, globalConfig, peerConfig, sessionFactory,
+ fsmFactory, transportFactory, statisticFactory, concurrentFactory, parser, network, ovrManager, sessionDatasource
+ );
}
- @Override
public void setPeerTableListener(PeerTableListener peerTableListener) {
this.peerTableListener = peerTableListener;
}
- @Override
public boolean elementChanged(int i, Object data) {
Configuration newConf = (Configuration) data;
stopTimeOut = newConf.getLongValue(StopTimeOut.ordinal(), (Long) StopTimeOut.defValue());
@@ -283,7 +305,6 @@ public boolean isDuplicateProtection() {
return duplicateProtection;
}
- @Override
public void start() throws IllegalDiameterStateException, IOException { // TODO: use parent method
logger.debug("Starting MutablePeerTableImpl. Starting router, overload scheduler, connection check timer, etc.");
router.start();
@@ -291,7 +312,6 @@ public void start() throws IllegalDiameterStateException, IOException { // TODO:
overloadScheduler = concurrentFactory.getScheduledExecutorService(PeerOverloadTimer.name());
Runnable overloadTask = new Runnable() {
- @Override
public void run() {
if (ovrManager != null) {
for (Peer p : peerTable.values()) {
@@ -305,12 +325,11 @@ public void run() {
if (duplicateProtection) {
duplicationScheduler = concurrentFactory.getScheduledExecutorService(DuplicationMessageTimer.name());
Runnable duplicateTask = new Runnable() {
- @Override
public void run() {
long now = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
- logger.debug("Running Duplicate Cleaning Task. Duplicate Storage size is: {}. Removing entries with time <= '{}'",
- storageAnswers.size(), now - duplicateTimer);
+ logger.debug("Running Duplicate Cleaning Task. Duplicate Storage size is: {}. Removing entries with time <= '{}'", storageAnswers.size(), now -
+ duplicateTimer);
}
for (StorageEntry s : storageAnswers.values()) {
if (s != null && s.getTime() + duplicateTimer <= now) {
@@ -326,8 +345,8 @@ public void run() {
}
}
if (logger.isDebugEnabled()) {
- logger.debug("Completed Duplicate Cleaning Task. New Duplicate Storage size is: {}. Total task runtime: {}ms",
- storageAnswers.size(), System.currentTimeMillis() - now);
+ logger.debug("Completed Duplicate Cleaning Task. New Duplicate Storage size is: {}. Total task runtime: {}ms", storageAnswers.size(), System
+ .currentTimeMillis() - now);
}
}
};
@@ -336,7 +355,6 @@ public void run() {
//
connScheduler = concurrentFactory.getScheduledExecutorService(ConnectionTimer.name());
Runnable connectionCheckTask = new Runnable() {
- @Override
public void run() {
Map connections = getIncConnections();
for (IConnection connection : connections.values()) {
@@ -407,25 +425,21 @@ private INetworkGuard createNetworkGuard(final ITransportLayerFactory transportF
metaData.getLocalPeer().getIPAddresses(),
metaData.getLocalPeer().getUri().getPort(),
new INetworkConnectionListener() {
- @Override
public void newNetworkConnection(final IConnection connection) {
//PCB added logging
logger.debug("newNetworkConnection. connection [{}]", connection.getKey());
synchronized (regLock) {
final IConnectionListener listener = new IConnectionListener() {
- @Override
public void connectionOpened(String connKey) {
logger.debug("Connection [{}] opened", connKey);
}
- @Override
@SuppressWarnings("unchecked")
public void connectionClosed(String connKey, List notSended) {
logger.debug("Connection [{}] closed", connKey);
unregister(true);
}
- @Override
public void messageReceived(String connKey, IMessage message) {
logger.debug("Message [{}] received to peer [{}]", message, connKey);
if (message.isRequest() && message.getCommandCode() == Message.CAPABILITIES_EXCHANGE_REQUEST) {
@@ -445,7 +459,8 @@ public void messageReceived(String connKey, IMessage message) {
try {
realm = message.getAvps().getAvp(Avp.ORIGIN_REALM).getDiameterIdentity();
logger.debug("Origin-Realm in new received message is [{}]", realm);
- } catch (AvpDataException e) {
+ }
+ catch (AvpDataException e) {
logger.warn("Unable to retrieve find Origin-Realm AVP in CER", e);
unregister(true);
return;
@@ -510,7 +525,7 @@ public void messageReceived(String connKey, IMessage message) {
}
peer = newPeerInstance(0, uri, connection.getRemoteAddress().getHostAddress(), null, false, connection,
- metaData, config, null, fsmFactory, transportFactory, parser, statisticFactory, concurrentFactory);
+ metaData, config, getPeerConfig(uri.getFQDN()), fsmFactory, transportFactory, parser, statisticFactory, concurrentFactory);
logger.debug("Created new peer instance [{}] and adding to peer table", peer);
peer.setRealm(realm);
appendPeerToPeerTable(peer);
@@ -534,7 +549,6 @@ public void messageReceived(String connKey, IMessage message) {
}
}
- @Override
public void internalError(String connKey, IMessage message, TransportException cause) {
logger.debug("Connection [{}] internalError [{}]", connKey, cause);
unregister(true);
@@ -563,7 +577,7 @@ public void unregister(boolean release) {
}
}
}
- );
+ );
}
private void appendPeerToPeerTable(IPeer peer) {
@@ -585,7 +599,6 @@ private void appendPeerToPeerTable(IPeer peer) {
}
}
- @Override
public void stopping(int disconnectCause) {
super.stopping(disconnectCause);
if (networkGuard != null) {
@@ -634,20 +647,12 @@ public void stopping(int disconnectCause) {
}
- @Override
public Peer addPeer(URI peerURI, String realm, boolean connecting) {
//TODO: add sKey here, now it adds peer to all realms.
//TODO: better, separate addPeer from realm!
try {
- Configuration peerConfig = null;
- Configuration[] peers = config.getChildren(PeerTable.ordinal());
- // find peer config
- for (Configuration c : peers) {
- if (peerURI.getFQDN().equals(c.getStringValue(PeerName.ordinal(), ""))) {
- peerConfig = c;
- break;
- }
- }
+ Configuration peerConfig = getPeerConfig(peerURI.getFQDN());
+
if (peerConfig == null) {
peerConfig = new EmptyConfiguration(false).add(PeerAttemptConnection, connecting);
}
@@ -660,7 +665,7 @@ public Peer addPeer(URI peerURI, String realm, boolean connecting) {
appendPeerToPeerTable(peer);
boolean found = false;
- Collection realms = this.router.getRealmTable().getRealms(realm);
+ Collection realms = this.router.getRealmTable().getRealms(realm);
for (Realm r : realms) {
if (r.getName().equals(realm)) {
((IRealm) r).addPeerName(peerURI.toString());
@@ -686,7 +691,6 @@ public Set getAllRealms() {
return new HashSet