#include #include "sapmanager.h" #include "sapconnection.h" #include "sapconnectionrequest.h" #include "sapsocket.h" #include "sapchannelinfo.h" #include "capabilityagent.h" #include "capabilitypeer.h" #include "wmspeer.h" #include "sappeer.h" SAPPeer::SAPPeer(SAProtocol::Role role, const QString &localName, const QString &peerName, QObject *parent) : QObject(parent), _wms(new WMSPeer(role, localName, peerName, this)), _role(role), _localName(localName), _peerName(peerName) { } SAPConnection * SAPPeer::createServiceConnection(const QString &profile, const QString &requesterProfile, SAPServiceInfo::Role requesterRole) { SAPManager *manager = SAPManager::instance(); int initiator = manager->registeredAgentId(requesterProfile, requesterRole); if (initiator < 0) { qWarning() << "Requester profile not found:" << requesterProfile; return 0; } // Search remote agent id in capability database. CapabilityPeer *capPeer = capabilityPeer(); int acceptor = capPeer->remoteAgentId(profile, SAPServiceInfo::oppositeRole(requesterRole)); if (acceptor < 0) { qWarning() << "Remote profile not found:" << profile; return 0; } SAPServiceInfo sInfo = manager->serviceInfo(initiator); SAPConnection *conn = new SAPConnection(this, profile); SAProtocol::ServiceConnectionRequestFrame request; request.messageType = SAProtocol::ServiceConnectionRequest; request.initiatorId = initiator; request.acceptorId = acceptor; request.profile = profile; if (request.acceptorId == SAProtocol::capabilityDiscoveryAgentId) { // Addressing the capability discovery service? Be anonymous about it request.initiatorId = SAProtocol::capabilityDiscoveryAgentId; } foreach (const SAPChannelInfo &cInfo, sInfo.channels()) { SAProtocol::ServiceConnectionRequestSession session; int sessionId = findUnusedSessionId(); session.sessionId = sessionId; session.channelId = cInfo.channelId(); session.qosType = cInfo.qosType(); session.qosDataRate = cInfo.qosDataRate(); session.qosPriority = cInfo.qosPriority(); session.payloadType = cInfo.payloadType(); request.sessions.append(session); SAPSocket *socket = new SAPSocket(conn, sessionId, cInfo); conn->setSocket(session.channelId, socket); _sessions.insert(sessionId, socket); } writeDataToSession(SAProtocol::defaultSessionId, SAProtocol::packServiceConnectionRequestFrame(request)); return conn; } SAProtocol::Role SAPPeer::role() const { return _role; } QString SAPPeer::localName() const { return _localName; } QString SAPPeer::peerName() const { return _peerName; } CapabilityPeer *SAPPeer::capabilityPeer() { return findChild(); } void SAPPeer::writeDataToSession(int session, const QByteArray &data) { sendFrame(SAProtocol::packFrame(session, data, SAProtocol::FrameData)); } void SAPPeer::writeControlToSession(int session, const QByteArray &data) { sendFrame(SAProtocol::packFrame(session, data, SAProtocol::FrameControl)); } void SAPPeer::writeAckToSession(int session, int seqNum) { SAProtocol::ControlFrame frame; frame.type = SAProtocol::ControlFrameBlockAck; frame.seqNum = seqNum; writeControlToSession(session, SAProtocol::packControlFrame(frame)); } void SAPPeer::acceptServiceConnection(SAPConnectionRequest *connReq, int statusCode) { SAPConnection *conn = connReq->connection(); SAProtocol::ServiceConnectionResponseFrame resp; resp.messageType = SAProtocol::ServiceConnectionResponse; resp.acceptorId = connReq->acceptorId(); resp.initiatorId = connReq->initiatorId(); resp.profile = conn->profile(); resp.statusCode = statusCode; foreach (const SAPSocket *socket, conn->sockets()) { resp.sessions.push_back(socket->sessionId()); } qDebug() << "Accepting service conection with status" << statusCode; writeDataToSession(SAProtocol::defaultSessionId, SAProtocol::packServiceConnectionResponseFrame(resp)); if (statusCode == 0) { foreach (SAPSocket *socket, conn->sockets()) { socket->setOpen(true); emit socket->connected(); } emit conn->connected(); } else { // Cancel any partial opened sessions foreach (SAPSocket *socket, conn->sockets()) { _sessions.remove(socket->sessionId()); delete socket; } delete conn; // Also deletes child sockets } // Can't delete connReq now because we must return to it. // It will take care of itself. } void SAPPeer::handleSessionData(int session, const QByteArray &data) { if (session == SAProtocol::defaultSessionId) { // Default session data frame. handleDefaultSessionMessage(data); return; } else { SAPSocket *socket = _sessions.value(session, 0); if (socket && socket->isOpen()) { socket->acceptIncomingData(data); } else { qWarning() << "Got information for a session that's not yet open!" << session; } } } void SAPPeer::handleConnected() { emit connected(); // Manually call the capability agent in order to trigger initial capability discovery. CapabilityAgent *caps = CapabilityAgent::instance(); caps->peerFound(this); } void SAPPeer::handleDisconnected() { // TODO Figure out who should actually reconnect emit disconnected(); } int SAPPeer::findUnusedSessionId() const { if (_sessions.size() > SAProtocol::maxSessionId) { qWarning() << "Ran out of session ids!"; return -1; } int id = 1; while (_sessions.contains(id)) { id++; } Q_ASSERT(id <= SAProtocol::maxSessionId && id != SAProtocol::defaultSessionId); return id; } void SAPPeer::handleDefaultSessionMessage(const QByteArray &message) { const quint8 message_type = message[0]; switch (message_type) { case SAProtocol::ServiceConnectionRequest: { SAPManager *manager = SAPManager::instance(); SAProtocol::ServiceConnectionRequestFrame req = SAProtocol::unpackServiceConnectionRequestFrame(message); bool ok = true; qDebug() << "Service connection request to profile" << req.profile; SAPAgent *agent = manager->agent(req.acceptorId); if (!agent) { qWarning() << "Requested agent does not exist"; ok = false; } foreach (const SAProtocol::ServiceConnectionRequestSession &s, req.sessions) { if (_sessions.contains(s.sessionId)) { qWarning() << "Requested session" << s.sessionId << "is already active"; ok = false; } } if (!ok) { // Send a negative status code message back SAProtocol::ServiceConnectionResponseFrame resp; resp.messageType = SAProtocol::ServiceConnectionResponse; resp.acceptorId = req.acceptorId; resp.initiatorId = req.initiatorId; resp.profile = req.profile; resp.statusCode = 1; foreach (const SAProtocol::ServiceConnectionRequestSession &s, req.sessions) { resp.sessions.push_back(s.sessionId); } writeDataToSession(SAProtocol::defaultSessionId, SAProtocol::packServiceConnectionResponseFrame(resp)); return; } SAPConnection *conn = new SAPConnection(this, req.profile); foreach (const SAProtocol::ServiceConnectionRequestSession &s, req.sessions) { SAPChannelInfo cInfo; cInfo.setChannelId(s.channelId); cInfo.setQoSType(static_cast(s.qosType)); cInfo.setQoSDataRate(static_cast(s.qosDataRate)); cInfo.setQoSPriority(static_cast(s.qosPriority)); cInfo.setPayloadType(static_cast(s.payloadType)); SAPSocket *socket = new SAPSocket(conn, s.sessionId, cInfo); conn->setSocket(s.channelId, socket); qDebug() << " opening channel" << s.channelId << "as session" << s.sessionId; qDebug() << " " << cInfo; _sessions.insert(s.sessionId, socket); } SAPConnectionRequest *connReq = new SAPConnectionRequest(conn, req.initiatorId, req.acceptorId); agent->requestConnection(connReq); break; } case SAProtocol::ServiceConnectionResponse: { SAProtocol::ServiceConnectionResponseFrame frame = SAProtocol::unpackServiceConnectionResponseFrame(message); bool ok = frame.statusCode == 0; if (!ok) { qWarning() << "Failure to create a service connection"; } else { qDebug() << "Service connection OK"; } if (frame.sessions.isEmpty()) { qWarning() << "No sessions in frame, nothing to do"; return; } SAPSocket *firstSocket = _sessions.value(frame.sessions.first(), 0); if (!firstSocket) { qWarning() << "Unknown session id:" << frame.sessions.first(); return; } SAPConnection *conn = firstSocket->connection(); foreach (int session, frame.sessions) { SAPSocket *socket = _sessions.value(session, 0); if (socket) { if (socket->isOpen()) { qWarning() << "Session" << session << "was already open?"; } qDebug() << "Session" << session << "now live"; socket->setOpen(true); emit socket->connected(); } else { qWarning() << "Unknown session id:" << session; } } if (ok) { emit conn->connected(); } else { emit conn->error(frame.statusCode); emit conn->disconnected(); } break; } default: qWarning() << "Unknown default session message type:" << message_type; } }