#include #include "sapmanager.h" #include "sapconnection.h" #include "sapconnectionrequest.h" #include "sapsocket.h" #include "sapchannelinfo.h" #include "capabilityagent.h" #include "capabilitypeer.h" #include "wmspeer.h" #include "sappeer.h" SAPPeer::SAPPeer(SAProtocol::Role role, const QString &localName, const QString &peerName, QObject *parent) : QObject(parent), _wms(new WMSPeer(role, localName, peerName, this)), _role(role), _localName(localName), _peerName(peerName) { } SAPPeer::~SAPPeer() { qDeleteAll(_conns); qDeleteAll(_sessions); _conns.clear(); _sessions.clear(); } SAPConnection * SAPPeer::createServiceConnection(const QString &profile, const QString &requesterProfile, SAPServiceInfo::Role requesterRole) { SAPManager *manager = SAPManager::instance(); int initiator = manager->registeredAgentId(requesterProfile, requesterRole); if (initiator < 0) { qWarning() << "Requester profile not found:" << requesterProfile; return 0; } // Search remote agent id in capability database. CapabilityPeer *capPeer = capabilityPeer(); int acceptor = capPeer->remoteAgentId(profile, SAPServiceInfo::oppositeRole(requesterRole)); if (acceptor < 0) { qWarning() << "Remote profile not found:" << profile; return 0; } SAPServiceInfo sInfo = manager->serviceInfo(initiator); SAPConnection *conn = new SAPConnection(this, profile); SAProtocol::ServiceConnectionRequestFrame request; request.messageType = SAProtocol::ServiceConnectionRequest; request.initiatorId = initiator; request.acceptorId = acceptor; request.profile = profile; if (request.acceptorId == SAProtocol::capabilityDiscoveryAgentId) { // Addressing the capability discovery service? Be anonymous about it request.initiatorId = SAProtocol::capabilityDiscoveryAgentId; } foreach (const SAPChannelInfo &cInfo, sInfo.channels()) { SAProtocol::ServiceConnectionRequestSession session; int sessionId = findUnusedSessionId(); session.sessionId = sessionId; session.channelId = cInfo.channelId(); session.qosType = cInfo.qosType(); session.qosDataRate = cInfo.qosDataRate(); session.qosPriority = cInfo.qosPriority(); session.payloadType = cInfo.payloadType(); request.sessions.append(session); SAPSocket *socket = new SAPSocket(conn, sessionId, cInfo); conn->setSocket(session.channelId, socket); _sessions.insert(sessionId, socket); } _conns.insert(profile, conn); writeDataToSession(SAProtocol::defaultSessionId, SAProtocol::packServiceConnectionRequestFrame(request)); return conn; } bool SAPPeer::terminateServiceConnection(const QString &profile, const QString &requesterProfile, SAPServiceInfo::Role requesterRole) { SAPManager *manager = SAPManager::instance(); int initiator = manager->registeredAgentId(requesterProfile, requesterRole); if (initiator < 0) { qWarning() << "Requester profile not found:" << requesterProfile; return false; } // Search remote agent id in capability database. CapabilityPeer *capPeer = capabilityPeer(); int acceptor = capPeer->remoteAgentId(profile, SAPServiceInfo::oppositeRole(requesterRole)); if (acceptor < 0) { qWarning() << "Remote profile not found:" << profile; return false; } SAPConnection *conn = _conns.value(profile, 0); if (conn) { SAProtocol::ServiceTerminationRequestFrame request; request.messageType = SAProtocol::ServiceTerminationRequest; request.initiatorId = initiator; request.acceptorId = acceptor; request.profile = profile; writeDataToSession(SAProtocol::defaultSessionId, SAProtocol::packServiceTerminationRequestFrame(request)); return true; } else { qWarning() << "Connection does not exist:" << profile; return false; } } SAProtocol::Role SAPPeer::role() const { return _role; } QString SAPPeer::localName() const { return _localName; } QString SAPPeer::peerName() const { return _peerName; } CapabilityPeer *SAPPeer::capabilityPeer() { return findChild(); } void SAPPeer::writeDataToSession(int session, const QByteArray &data) { sendFrame(SAProtocol::packFrame(session, data, SAProtocol::FrameData)); } void SAPPeer::writeControlToSession(int session, const QByteArray &data) { sendFrame(SAProtocol::packFrame(session, data, SAProtocol::FrameControl)); } void SAPPeer::acceptServiceConnection(SAPConnectionRequest *connReq, int statusCode) { SAPConnection *conn = connReq->connection(); SAProtocol::ServiceConnectionResponseFrame resp; resp.messageType = SAProtocol::ServiceConnectionResponse; resp.acceptorId = connReq->acceptorId(); resp.initiatorId = connReq->initiatorId(); resp.profile = conn->profile(); resp.statusCode = statusCode; foreach (const SAPSocket *socket, conn->sockets()) { resp.sessions.push_back(socket->sessionId()); } qDebug() << "Accepting service conection with status" << statusCode; writeDataToSession(SAProtocol::defaultSessionId, SAProtocol::packServiceConnectionResponseFrame(resp)); if (statusCode == 0) { foreach (SAPSocket *socket, conn->sockets()) { socket->setOpen(true); emit socket->connected(); } emit conn->connected(); } else { // Cancel any partial opened sessions foreach (SAPSocket *socket, conn->sockets()) { _sessions.remove(socket->sessionId()); } _conns.remove(conn->profile()); delete conn; } // Can't delete connReq now because we must return to it. // It will take care of itself. } void SAPPeer::handleSessionData(int session, const QByteArray &data) { if (session == SAProtocol::defaultSessionId) { // Default session data frame. handleDefaultSessionMessage(data); return; } else { SAPSocket *socket = _sessions.value(session, 0); if (socket && socket->isOpen()) { socket->acceptIncomingData(data); } else { qWarning() << "Got information for a session that's not yet open!" << session; } } } void SAPPeer::handleSessionControl(int session, const QByteArray &data) { // Default session ID should not receive control messages, so we don't check for it SAPSocket *socket = _sessions.value(session, 0); if (socket && socket->isOpen()) { socket->acceptIncomingControl(data); } else { qWarning() << "Got information for a session that's not yet open!" << session; } } void SAPPeer::handleConnected() { emit connected(); // Call in all the agents SAPManager *manager = SAPManager::instance(); foreach (SAPAgent *agent, manager->allAgents()) { agent->peerFound(this); } } void SAPPeer::handleDisconnected() { // Clear out all active sessions emit disconnected(); qDeleteAll(_conns); // Deleting connections will clean up all sockets too. _sessions.clear(); _conns.clear(); // TODO Figure out who should actually reconnect } int SAPPeer::findUnusedSessionId() const { if (_sessions.size() > SAProtocol::maxSessionId) { qWarning() << "Ran out of session ids!"; return -1; } int id = 1; while (_sessions.contains(id)) { id++; } Q_ASSERT(id <= SAProtocol::maxSessionId && id != SAProtocol::defaultSessionId); return id; } void SAPPeer::handleDefaultSessionMessage(const QByteArray &message) { const quint8 message_type = message[0]; switch (message_type) { case SAProtocol::ServiceConnectionRequest: { SAPManager *manager = SAPManager::instance(); SAProtocol::ServiceConnectionRequestFrame req = SAProtocol::unpackServiceConnectionRequestFrame(message); bool ok = true; qDebug() << "Service connection request to profile" << req.profile; SAPAgent *agent = manager->agent(req.acceptorId); if (!agent) { qWarning() << "Requested agent does not exist"; ok = false; } foreach (const SAProtocol::ServiceConnectionRequestSession &s, req.sessions) { if (_sessions.contains(s.sessionId)) { qWarning() << "Requested session" << s.sessionId << "is already active"; ok = false; } } if (!ok) { // Send a negative status code message back SAProtocol::ServiceConnectionResponseFrame resp; resp.messageType = SAProtocol::ServiceConnectionResponse; resp.acceptorId = req.acceptorId; resp.initiatorId = req.initiatorId; resp.profile = req.profile; resp.statusCode = 1; foreach (const SAProtocol::ServiceConnectionRequestSession &s, req.sessions) { resp.sessions.push_back(s.sessionId); } writeDataToSession(SAProtocol::defaultSessionId, SAProtocol::packServiceConnectionResponseFrame(resp)); return; } SAPConnection *conn = new SAPConnection(this, req.profile); foreach (const SAProtocol::ServiceConnectionRequestSession &s, req.sessions) { SAPChannelInfo cInfo; cInfo.setChannelId(s.channelId); cInfo.setQoSType(static_cast(s.qosType)); cInfo.setQoSDataRate(static_cast(s.qosDataRate)); cInfo.setQoSPriority(static_cast(s.qosPriority)); cInfo.setPayloadType(static_cast(s.payloadType)); SAPSocket *socket = new SAPSocket(conn, s.sessionId, cInfo); conn->setSocket(s.channelId, socket); qDebug() << " opening channel" << s.channelId << "as session" << s.sessionId; qDebug() << " " << cInfo; _sessions.insert(s.sessionId, socket); } _conns.insert(req.profile, conn); SAPConnectionRequest *connReq = new SAPConnectionRequest(conn, req.initiatorId, req.acceptorId); agent->requestConnection(connReq); break; } case SAProtocol::ServiceConnectionResponse: { SAProtocol::ServiceConnectionResponseFrame frame = SAProtocol::unpackServiceConnectionResponseFrame(message); bool ok = frame.statusCode == 0; if (!ok) { qWarning() << "Failure to create a service connection"; } else { qDebug() << "Service connection OK"; } if (frame.sessions.isEmpty()) { qWarning() << "No sessions in frame, nothing to do"; return; } SAPSocket *firstSocket = _sessions.value(frame.sessions.first(), 0); if (!firstSocket) { qWarning() << "Unknown session id:" << frame.sessions.first(); return; } SAPConnection *conn = firstSocket->connection(); if (ok) { foreach (int session, frame.sessions) { SAPSocket *socket = _sessions.value(session, 0); if (socket) { if (socket->isOpen()) { qWarning() << "Session" << session << "was already open?"; } qDebug() << "Session" << session << "now live"; socket->setOpen(true); emit socket->connected(); } else { qWarning() << "Unknown session id:" << session; } } emit conn->connected(); } else { emit conn->error(frame.statusCode); emit conn->disconnected(); delete conn; } break; } case SAProtocol::ServiceTerminationRequest: { SAProtocol::ServiceTerminationRequestFrame req = SAProtocol::unpackServiceTerminationRequestFrame(message); SAPConnection *conn = _conns.value(req.profile, 0); qDebug() << "Service termination request to profile" << req.profile; if (!conn) { // We did not find this profile; send a error back SAProtocol::ServiceTerminationResponseFrame resp; resp.messageType = SAProtocol::ServiceTerminationResponse; resp.acceptorId = req.acceptorId; resp.initiatorId = req.initiatorId; resp.profile = req.profile; resp.statusCode = 1; qWarning() << "Profile" << req.profile << "not connected, sending negative response"; writeDataToSession(SAProtocol::defaultSessionId, SAProtocol::packServiceTerminationResponseFrame(resp)); return; } // Ok, proceed to terminate the connection foreach (SAPSocket *socket, conn->sockets()) { emit socket->disconnected(); _sessions.remove(socket->sessionId()); } emit conn->disconnected(); _conns.remove(conn->profile()); delete conn; // Acknowledge everything was succesful SAProtocol::ServiceTerminationResponseFrame resp; resp.messageType = SAProtocol::ServiceTerminationResponse; resp.acceptorId = req.acceptorId; resp.initiatorId = req.initiatorId; resp.profile = req.profile; resp.statusCode = 0; writeDataToSession(SAProtocol::defaultSessionId, SAProtocol::packServiceTerminationResponseFrame(resp)); break; } default: qWarning() << "Unknown default session message type:" << message_type; } }