From 14d20babe395e52d220bbc27e91cec2fddd1ed0f Mon Sep 17 00:00:00 2001 From: Javier Date: Thu, 24 Dec 2015 19:04:04 +0100 Subject: still testing sap control flow --- sapsocket.cc | 140 ++++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 120 insertions(+), 20 deletions(-) (limited to 'sapsocket.cc') diff --git a/sapsocket.cc b/sapsocket.cc index e9a69ec..25e956f 100644 --- a/sapsocket.cc +++ b/sapsocket.cc @@ -1,16 +1,18 @@ #include #include +#include #include "sappeer.h" #include "sapconnection.h" #include "sapsocket.h" #define DELAYED_ACK_TIME 1000 +#define RESEND_TIME 10000 #define WINDOW_SIZE_MSGS 10 SAPSocket::SAPSocket(SAPConnection *conn, int sessionId, const SAPChannelInfo &chanInfo) : QObject(conn), _sessionId(sessionId), _info(chanInfo), _open(false), - _outLastSeqNum(0), _outLastAck(0), _inLastSeqNum(0), _inLastAck(0) + _outLastAck(0), _outFlyingPkts(0), _inLastSeqNum(0), _inLastAck(0) { } @@ -50,8 +52,6 @@ QByteArray SAPSocket::receive() bool SAPSocket::send(const QByteArray &data) { - SAProtocol::DataFrame frame; - if (!isOpen()) { qWarning() << "Socket is not yet open"; return false; @@ -62,21 +62,26 @@ bool SAPSocket::send(const QByteArray &data) return false; } - if (_out.size() > WINDOW_SIZE_MSGS) { - // Send buffer is not empty; enqueue - // TODO Realiability - } - - frame.withSeqNum = isWithSeqNum(); if (isReliable()) { - frame.seqNum = ++_outLastSeqNum; + int seqNum = (_outLastAck + _out.size() + 1) % std::numeric_limits::max(); + if (_outFlyingPkts >= WINDOW_SIZE_MSGS) { + // Send buffer is not empty; enqueue + qDebug() << _sessionId << "Enqueuing" << seqNum << " size=" << data.size(); + } else { + qDebug() << _sessionId << "Sending" << seqNum << " size=" << data.size(); + sendPacket(seqNum, data); + _outFlyingPkts++; + } + _out.append(data); + if (!_resendTimer.isActive()) { + _resendTimer.start(RESEND_TIME, Qt::CoarseTimer, this); + } + qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts; } else { - frame.seqNum = 0; + // Just send the packet as is. + sendPacket(0, data); } - frame.unk_1 = 0; // Is this related to fragmentation? - frame.data = data; - peer()->writeDataToSession(_sessionId, SAProtocol::packDataFrame(frame)); return true; } @@ -87,7 +92,6 @@ void SAPSocket::setOpen(bool open) void SAPSocket::acceptIncomingData(const QByteArray &data) { - if (data.isEmpty()) return; SAProtocol::DataFrame frame = SAProtocol::unpackDataFrame(data, isWithSeqNum()); if (isReliable()) { @@ -98,9 +102,10 @@ void SAPSocket::acceptIncomingData(const QByteArray &data) << "(expected " << expectedSeqNum << ")"; } else { _inLastSeqNum = frame.seqNum; + qDebug() << _sessionId << "Got seqNum" << frame.seqNum; - if (!_timer.isActive()) { - _timer.start(DELAYED_ACK_TIME, Qt::CoarseTimer, this); + if (!_ackTimer.isActive()) { + _ackTimer.start(DELAYED_ACK_TIME, Qt::CoarseTimer, this); } } } @@ -110,6 +115,25 @@ void SAPSocket::acceptIncomingData(const QByteArray &data) emit messageReceived(); } +void SAPSocket::acceptIncomingControl(const QByteArray &data) +{ + SAProtocol::ControlFrame frame = SAProtocol::unpackControlFrame(data); + + if (!isReliable()) { + qWarning() << "Received a control frame but socket is not in QoS mode"; + return; + } + + switch (frame.type) { + case SAProtocol::ControlFrameBlockAck: + handleBlockAck(frame.seqNum); + break; + default: + qWarning() << "Unhandled control frame type" << frame.type; + break; + } +} + int SAPSocket::sessionId() const { return _sessionId; @@ -117,12 +141,20 @@ int SAPSocket::sessionId() const void SAPSocket::timerEvent(QTimerEvent *event) { - if (event->timerId() == _timer.timerId()) { + if (event->timerId() == _ackTimer.timerId()) { if (_inLastSeqNum != _inLastAck) { - peer()->writeAckToSession(_sessionId, _inLastSeqNum); + sendBlockAck(_inLastSeqNum); + _inLastAck = _inLastSeqNum; } - _timer.stop(); + _ackTimer.stop(); + } else if (event->timerId() == _resendTimer.timerId()) { + if (!_out.isEmpty()) { + _outFlyingPkts = 0; // Assume everything went wrong + sendPacketsFromQueue(); + } else { + _resendTimer.stop(); + } } else { QObject::timerEvent(event); } @@ -138,3 +170,71 @@ bool SAPSocket::isWithSeqNum() const return _info.qosType() == SAPChannelInfo::QoSReliabilityDisable || _info.qosType() == SAPChannelInfo::QoSReliabilityEnable; } + +void SAPSocket::sendBlockAck(int seqNum) +{ + SAProtocol::ControlFrame frame; + frame.type = SAProtocol::ControlFrameBlockAck; + frame.seqNum = seqNum; + peer()->writeControlToSession(_sessionId, SAProtocol::packControlFrame(frame)); +} + +void SAPSocket::sendPacket(int seqNum, const QByteArray &data) +{ + SAProtocol::DataFrame frame; + frame.withSeqNum = isWithSeqNum(); + frame.seqNum = seqNum; + frame.unk_1 = 0; // Is this related to fragmentation? + frame.data = data; + peer()->writeDataToSession(_sessionId, SAProtocol::packDataFrame(frame)); +} + +void SAPSocket::handleBlockAck(int seqNum) +{ + qDebug() << _sessionId << "Received block ack for" << seqNum; + int n = seqNum - _outLastAck; + if (n < 0) n += std::numeric_limits::max(); + + qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts << " n" << n; + + if (n <= 0) { + qWarning() << _sessionId << "repeated ack"; + return; + } + + if (n <= _out.size()) { + _out.erase(_out.begin(), _out.begin() + n); + _outLastAck += static_cast(n); + _outFlyingPkts -= n; + qDebug() << _sessionId << "new outlastack" << _outLastAck; + } + + qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts; + + if (_outFlyingPkts == 0 || _out.size() == 0) { + qDebug() << _sessionId << "Stopping resend timer"; + _resendTimer.stop(); + } + + sendPacketsFromQueue(); +} + +void SAPSocket::sendPacketsFromQueue() +{ + const int n = qMin(WINDOW_SIZE_MSGS - _outFlyingPkts, _out.size()); + + for (int i = 0; i < n; i++) { + int seqNum = _outLastAck + i + 1; + const QByteArray &pkt = _out.at(i); + qDebug() << _sessionId << "Sending packet" << seqNum << "size=" << pkt.size(); + sendPacket(seqNum, pkt); + } + + _outFlyingPkts += n; + + qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts << " n" << n; + + if (n > 0 && !_resendTimer.isActive()) { + _resendTimer.start(RESEND_TIME, Qt::CoarseTimer, this); + } +} -- cgit v1.2.3