#include #include #include #include "sappeer.h" #include "sapconnection.h" #include "sapsocket.h" #define DELAYED_ACK_TIME 1000 #define RESEND_TIME 5000 #define WINDOW_SIZE_MSGS 10 SAPSocket::SAPSocket(SAPConnection *conn, int sessionId, const SAPChannelInfo &chanInfo) : QObject(conn), _sessionId(sessionId), _info(chanInfo), _open(false), _outLastAck(0), _outFlyingPkts(0), _inLastSeqNum(0), _inLastAck(0) { } SAPPeer * SAPSocket::peer() { return connection()->peer(); } SAPConnection * SAPSocket::connection() { return static_cast(parent()); } SAPChannelInfo SAPSocket::channelInfo() const { return _info; } bool SAPSocket::isOpen() const { return _open; } bool SAPSocket::messageAvailable() const { return !_in.empty(); } QByteArray SAPSocket::receive() { if (!_in.empty()) { return _in.dequeue(); } else { return QByteArray(); } } bool SAPSocket::send(const QByteArray &data) { if (!isOpen()) { qWarning() << "Socket is not yet open"; return false; } if (data.size() > 65000) { qWarning() << "Fragmentation is not yet supported"; return false; } if (isReliable()) { int seqNum = (_outLastAck + _out.size() + 1) % std::numeric_limits::max(); if (_outFlyingPkts >= WINDOW_SIZE_MSGS) { // Send buffer is not empty; enqueue qDebug() << _sessionId << "Enqueuing" << seqNum << " size=" << data.size(); } else { qDebug() << _sessionId << "Sending" << seqNum << " size=" << data.size(); sendPacket(seqNum, data); _outFlyingPkts++; } _out.append(data); if (!_resendTimer.isActive()) { _resendTimer.start(RESEND_TIME, Qt::CoarseTimer, this); } qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts; } else { // Just send the packet as is. sendPacket(0, data); } return true; } void SAPSocket::setOpen(bool open) { _open = open; } void SAPSocket::acceptIncomingData(const QByteArray &data) { SAProtocol::DataFrame frame = SAProtocol::unpackDataFrame(data, isReliable(), supportsFragmentation()); if (isReliable()) { quint16 expectedSeqNum = _inLastSeqNum + 1; if (frame.seqNum != expectedSeqNum) { qWarning() << "Unexpected sequence number" << frame.seqNum << "on session" << _sessionId << "(expected " << expectedSeqNum << ")"; } else { _inLastSeqNum = frame.seqNum; qDebug() << _sessionId << "Got seqNum" << frame.seqNum << " size=" << data.size(); if (!_ackTimer.isActive()) { _ackTimer.start(DELAYED_ACK_TIME, Qt::CoarseTimer, this); } } } _in.enqueue(frame.data); emit messageReceived(); } void SAPSocket::acceptIncomingControl(const QByteArray &data) { SAProtocol::ControlFrame frame = SAProtocol::unpackControlFrame(data); if (!isReliable()) { qWarning() << "Received a control frame but socket is not in QoS mode"; return; } switch (frame.type) { case SAProtocol::ControlFrameBlockAck: handleBlockAck(frame.seqNum); break; default: qWarning() << "Unhandled control frame type" << frame.type; break; } } int SAPSocket::sessionId() const { return _sessionId; } void SAPSocket::timerEvent(QTimerEvent *event) { if (event->timerId() == _ackTimer.timerId()) { qDebug() << "Ack timer tick"; if (_inLastSeqNum != _inLastAck) { sendBlockAck(_inLastSeqNum); _inLastAck = _inLastSeqNum; } _ackTimer.stop(); } else if (event->timerId() == _resendTimer.timerId()) { qDebug() << "Resend timer tick"; if (!_out.isEmpty()) { _outFlyingPkts = 0; // Assume everything went wrong sendPacketsFromQueue(); } else { _resendTimer.stop(); } } else { QObject::timerEvent(event); } } bool SAPSocket::isReliable() const { return _info.qosType() == SAPChannelInfo::QoSReliabilityEnable; } bool SAPSocket::supportsFragmentation() const { return _info.qosType() == SAPChannelInfo::QoSReliabilityEnable || _info.qosType() == SAPChannelInfo::QoSReliabilityDisable; } void SAPSocket::sendBlockAck(int seqNum) { SAProtocol::ControlFrame frame; frame.type = SAProtocol::ControlFrameBlockAck; frame.seqNum = seqNum; peer()->writeControlToSession(_sessionId, SAProtocol::packControlFrame(frame)); } void SAPSocket::sendPacket(int seqNum, const QByteArray &data) { SAProtocol::DataFrame frame; frame.withSeqNum = isReliable(); frame.seqNum = seqNum; frame.withFragStatus = supportsFragmentation(); frame.fragStatus = SAProtocol::FragmentNone; frame.data = data; peer()->writeDataToSession(_sessionId, SAProtocol::packDataFrame(frame)); } void SAPSocket::handleBlockAck(int seqNum) { qDebug() << _sessionId << "Received block ack for" << seqNum; int n = seqNum - _outLastAck; if (n < 0) n += std::numeric_limits::max(); qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts << " n" << n; if (n <= 0) { qWarning() << _sessionId << "repeated ack"; return; } if (n <= _out.size()) { _out.erase(_out.begin(), _out.begin() + n); _outLastAck += static_cast(n); _outFlyingPkts -= n; qDebug() << _sessionId << "new outlastack" << _outLastAck; } qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts; Q_ASSERT(_outFlyingPkts <= _out.size()); if (_outFlyingPkts == 0 || _out.size() == 0) { qDebug() << _sessionId << "Stopping resend timer"; _resendTimer.stop(); } sendPacketsFromQueue(); } void SAPSocket::sendPacketsFromQueue() { const int n = qMin(WINDOW_SIZE_MSGS, _out.size()) - _outFlyingPkts; Q_ASSERT(n >= 0); for (int i = _outFlyingPkts; i < _outFlyingPkts + n; i++) { int seqNum = _outLastAck + i + 1; const QByteArray &pkt = _out.at(i); qDebug() << _sessionId << "Sending packet" << seqNum << "size=" << pkt.size(); sendPacket(seqNum, pkt); } _outFlyingPkts += n; Q_ASSERT(_outFlyingPkts <= _out.size()); qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts << " n" << n; if (n > 0 && !_resendTimer.isActive()) { _resendTimer.start(RESEND_TIME, Qt::CoarseTimer, this); } }