From a45977185a485624095bff1a15024e9199eee676 Mon Sep 17 00:00:00 2001 From: Javier Date: Fri, 1 Jan 2016 22:05:42 +0100 Subject: reorganize source files into SAP and agents --- sap/sapsocket.cc | 250 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 250 insertions(+) create mode 100644 sap/sapsocket.cc (limited to 'sap/sapsocket.cc') diff --git a/sap/sapsocket.cc b/sap/sapsocket.cc new file mode 100644 index 0000000..390aef0 --- /dev/null +++ b/sap/sapsocket.cc @@ -0,0 +1,250 @@ +#include +#include +#include + +#include "sappeer.h" +#include "sapconnection.h" +#include "sapsocket.h" + +#define DELAYED_ACK_TIME 1000 +#define RESEND_TIME 5000 +#define WINDOW_SIZE_MSGS 10 + +SAPSocket::SAPSocket(SAPConnection *conn, int sessionId, const SAPChannelInfo &chanInfo) : + QObject(conn), _sessionId(sessionId), _info(chanInfo), _open(false), + _outLastAck(0), _outFlyingPkts(0), _inLastSeqNum(0), _inLastAck(0) +{ +} + +SAPPeer * SAPSocket::peer() +{ + return connection()->peer(); +} + +SAPConnection * SAPSocket::connection() +{ + return static_cast(parent()); +} + +SAPChannelInfo SAPSocket::channelInfo() const +{ + return _info; +} + +bool SAPSocket::isOpen() const +{ + return _open; +} + +bool SAPSocket::messageAvailable() const +{ + return !_in.empty(); +} + +QByteArray SAPSocket::receive() +{ + if (!_in.empty()) { + return _in.dequeue(); + } else { + return QByteArray(); + } +} + +bool SAPSocket::send(const QByteArray &data) +{ + if (!isOpen()) { + qWarning() << "Socket is not yet open"; + return false; + } + + if (data.size() > 65000) { + qWarning() << "Fragmentation is not yet supported"; + return false; + } + + if (isReliable()) { + int seqNum = (_outLastAck + _out.size() + 1) % std::numeric_limits::max(); + if (_outFlyingPkts >= WINDOW_SIZE_MSGS) { + // Send buffer is not empty; enqueue + qDebug() << _sessionId << "Enqueuing" << seqNum << " size=" << data.size(); + } else { + qDebug() << _sessionId << "Sending" << seqNum << " size=" << data.size(); + sendPacket(seqNum, data); + _outFlyingPkts++; + } + _out.append(data); + if (!_resendTimer.isActive()) { + _resendTimer.start(RESEND_TIME, Qt::CoarseTimer, this); + } + qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts; + } else { + // Just send the packet as is. + sendPacket(0, data); + } + + return true; +} + +void SAPSocket::setOpen(bool open) +{ + _open = open; +} + +void SAPSocket::acceptIncomingData(const QByteArray &data) +{ + SAProtocol::DataFrame frame = SAProtocol::unpackDataFrame(data, + isReliable(), supportsFragmentation()); + + if (isReliable()) { + quint16 expectedSeqNum = _inLastSeqNum + 1; + if (frame.seqNum != expectedSeqNum) { + qWarning() << "Unexpected sequence number" << frame.seqNum + << "on session" << _sessionId + << "(expected " << expectedSeqNum << ")"; + } else { + _inLastSeqNum = frame.seqNum; + qDebug() << _sessionId << "Got seqNum" << frame.seqNum << " size=" << data.size(); + + if (!_ackTimer.isActive()) { + _ackTimer.start(DELAYED_ACK_TIME, Qt::CoarseTimer, this); + } + } + } + + _in.enqueue(frame.data); + + emit messageReceived(); +} + +void SAPSocket::acceptIncomingControl(const QByteArray &data) +{ + SAProtocol::ControlFrame frame = SAProtocol::unpackControlFrame(data); + + if (!isReliable()) { + qWarning() << "Received a control frame but socket is not in QoS mode"; + return; + } + + switch (frame.type) { + case SAProtocol::ControlFrameBlockAck: + handleBlockAck(frame.seqNum); + break; + default: + qWarning() << "Unhandled control frame type" << frame.type; + break; + } +} + +int SAPSocket::sessionId() const +{ + return _sessionId; +} + +void SAPSocket::timerEvent(QTimerEvent *event) +{ + if (event->timerId() == _ackTimer.timerId()) { + qDebug() << "Ack timer tick"; + if (_inLastSeqNum != _inLastAck) { + sendBlockAck(_inLastSeqNum); + + _inLastAck = _inLastSeqNum; + } + _ackTimer.stop(); + } else if (event->timerId() == _resendTimer.timerId()) { + qDebug() << "Resend timer tick"; + if (!_out.isEmpty()) { + _outFlyingPkts = 0; // Assume everything went wrong + sendPacketsFromQueue(); + } else { + _resendTimer.stop(); + } + } else { + QObject::timerEvent(event); + } +} + +bool SAPSocket::isReliable() const +{ + return _info.qosType() == SAPChannelInfo::QoSReliabilityEnable; +} + +bool SAPSocket::supportsFragmentation() const +{ + return _info.qosType() == SAPChannelInfo::QoSReliabilityEnable || + _info.qosType() == SAPChannelInfo::QoSReliabilityDisable; +} + +void SAPSocket::sendBlockAck(int seqNum) +{ + SAProtocol::ControlFrame frame; + frame.type = SAProtocol::ControlFrameBlockAck; + frame.seqNum = seqNum; + peer()->writeControlToSession(_sessionId, SAProtocol::packControlFrame(frame)); +} + +void SAPSocket::sendPacket(int seqNum, const QByteArray &data) +{ + SAProtocol::DataFrame frame; + frame.withSeqNum = isReliable(); + frame.seqNum = seqNum; + frame.withFragStatus = supportsFragmentation(); + frame.fragStatus = SAProtocol::FragmentNone; + frame.data = data; + peer()->writeDataToSession(_sessionId, SAProtocol::packDataFrame(frame)); +} + +void SAPSocket::handleBlockAck(int seqNum) +{ + qDebug() << _sessionId << "Received block ack for" << seqNum; + int n = seqNum - _outLastAck; + if (n < 0) n += std::numeric_limits::max(); + + qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts << " n" << n; + + if (n <= 0) { + qWarning() << _sessionId << "repeated ack"; + return; + } + + if (n <= _out.size()) { + _out.erase(_out.begin(), _out.begin() + n); + _outLastAck += static_cast(n); + _outFlyingPkts -= n; + qDebug() << _sessionId << "new outlastack" << _outLastAck; + } + + qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts; + + Q_ASSERT(_outFlyingPkts <= _out.size()); + + if (_outFlyingPkts == 0 || _out.size() == 0) { + qDebug() << _sessionId << "Stopping resend timer"; + _resendTimer.stop(); + } + + sendPacketsFromQueue(); +} + +void SAPSocket::sendPacketsFromQueue() +{ + const int n = qMin(WINDOW_SIZE_MSGS, _out.size()) - _outFlyingPkts; + + Q_ASSERT(n >= 0); + + for (int i = _outFlyingPkts; i < _outFlyingPkts + n; i++) { + int seqNum = _outLastAck + i + 1; + const QByteArray &pkt = _out.at(i); + qDebug() << _sessionId << "Sending packet" << seqNum << "size=" << pkt.size(); + sendPacket(seqNum, pkt); + } + + _outFlyingPkts += n; + + Q_ASSERT(_outFlyingPkts <= _out.size()); + + qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts << " n" << n; + + if (n > 0 && !_resendTimer.isActive()) { + _resendTimer.start(RESEND_TIME, Qt::CoarseTimer, this); + } +} -- cgit v1.2.3