From a45977185a485624095bff1a15024e9199eee676 Mon Sep 17 00:00:00 2001 From: Javier Date: Fri, 1 Jan 2016 22:05:42 +0100 Subject: reorganize source files into SAP and agents --- sapsocket.cc | 250 ----------------------------------------------------------- 1 file changed, 250 deletions(-) delete mode 100644 sapsocket.cc (limited to 'sapsocket.cc') diff --git a/sapsocket.cc b/sapsocket.cc deleted file mode 100644 index 390aef0..0000000 --- a/sapsocket.cc +++ /dev/null @@ -1,250 +0,0 @@ -#include -#include -#include - -#include "sappeer.h" -#include "sapconnection.h" -#include "sapsocket.h" - -#define DELAYED_ACK_TIME 1000 -#define RESEND_TIME 5000 -#define WINDOW_SIZE_MSGS 10 - -SAPSocket::SAPSocket(SAPConnection *conn, int sessionId, const SAPChannelInfo &chanInfo) : - QObject(conn), _sessionId(sessionId), _info(chanInfo), _open(false), - _outLastAck(0), _outFlyingPkts(0), _inLastSeqNum(0), _inLastAck(0) -{ -} - -SAPPeer * SAPSocket::peer() -{ - return connection()->peer(); -} - -SAPConnection * SAPSocket::connection() -{ - return static_cast(parent()); -} - -SAPChannelInfo SAPSocket::channelInfo() const -{ - return _info; -} - -bool SAPSocket::isOpen() const -{ - return _open; -} - -bool SAPSocket::messageAvailable() const -{ - return !_in.empty(); -} - -QByteArray SAPSocket::receive() -{ - if (!_in.empty()) { - return _in.dequeue(); - } else { - return QByteArray(); - } -} - -bool SAPSocket::send(const QByteArray &data) -{ - if (!isOpen()) { - qWarning() << "Socket is not yet open"; - return false; - } - - if (data.size() > 65000) { - qWarning() << "Fragmentation is not yet supported"; - return false; - } - - if (isReliable()) { - int seqNum = (_outLastAck + _out.size() + 1) % std::numeric_limits::max(); - if (_outFlyingPkts >= WINDOW_SIZE_MSGS) { - // Send buffer is not empty; enqueue - qDebug() << _sessionId << "Enqueuing" << seqNum << " size=" << data.size(); - } else { - qDebug() << _sessionId << "Sending" << seqNum << " size=" << data.size(); - sendPacket(seqNum, data); - _outFlyingPkts++; - } - _out.append(data); - if (!_resendTimer.isActive()) { - _resendTimer.start(RESEND_TIME, Qt::CoarseTimer, this); - } - qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts; - } else { - // Just send the packet as is. - sendPacket(0, data); - } - - return true; -} - -void SAPSocket::setOpen(bool open) -{ - _open = open; -} - -void SAPSocket::acceptIncomingData(const QByteArray &data) -{ - SAProtocol::DataFrame frame = SAProtocol::unpackDataFrame(data, - isReliable(), supportsFragmentation()); - - if (isReliable()) { - quint16 expectedSeqNum = _inLastSeqNum + 1; - if (frame.seqNum != expectedSeqNum) { - qWarning() << "Unexpected sequence number" << frame.seqNum - << "on session" << _sessionId - << "(expected " << expectedSeqNum << ")"; - } else { - _inLastSeqNum = frame.seqNum; - qDebug() << _sessionId << "Got seqNum" << frame.seqNum << " size=" << data.size(); - - if (!_ackTimer.isActive()) { - _ackTimer.start(DELAYED_ACK_TIME, Qt::CoarseTimer, this); - } - } - } - - _in.enqueue(frame.data); - - emit messageReceived(); -} - -void SAPSocket::acceptIncomingControl(const QByteArray &data) -{ - SAProtocol::ControlFrame frame = SAProtocol::unpackControlFrame(data); - - if (!isReliable()) { - qWarning() << "Received a control frame but socket is not in QoS mode"; - return; - } - - switch (frame.type) { - case SAProtocol::ControlFrameBlockAck: - handleBlockAck(frame.seqNum); - break; - default: - qWarning() << "Unhandled control frame type" << frame.type; - break; - } -} - -int SAPSocket::sessionId() const -{ - return _sessionId; -} - -void SAPSocket::timerEvent(QTimerEvent *event) -{ - if (event->timerId() == _ackTimer.timerId()) { - qDebug() << "Ack timer tick"; - if (_inLastSeqNum != _inLastAck) { - sendBlockAck(_inLastSeqNum); - - _inLastAck = _inLastSeqNum; - } - _ackTimer.stop(); - } else if (event->timerId() == _resendTimer.timerId()) { - qDebug() << "Resend timer tick"; - if (!_out.isEmpty()) { - _outFlyingPkts = 0; // Assume everything went wrong - sendPacketsFromQueue(); - } else { - _resendTimer.stop(); - } - } else { - QObject::timerEvent(event); - } -} - -bool SAPSocket::isReliable() const -{ - return _info.qosType() == SAPChannelInfo::QoSReliabilityEnable; -} - -bool SAPSocket::supportsFragmentation() const -{ - return _info.qosType() == SAPChannelInfo::QoSReliabilityEnable || - _info.qosType() == SAPChannelInfo::QoSReliabilityDisable; -} - -void SAPSocket::sendBlockAck(int seqNum) -{ - SAProtocol::ControlFrame frame; - frame.type = SAProtocol::ControlFrameBlockAck; - frame.seqNum = seqNum; - peer()->writeControlToSession(_sessionId, SAProtocol::packControlFrame(frame)); -} - -void SAPSocket::sendPacket(int seqNum, const QByteArray &data) -{ - SAProtocol::DataFrame frame; - frame.withSeqNum = isReliable(); - frame.seqNum = seqNum; - frame.withFragStatus = supportsFragmentation(); - frame.fragStatus = SAProtocol::FragmentNone; - frame.data = data; - peer()->writeDataToSession(_sessionId, SAProtocol::packDataFrame(frame)); -} - -void SAPSocket::handleBlockAck(int seqNum) -{ - qDebug() << _sessionId << "Received block ack for" << seqNum; - int n = seqNum - _outLastAck; - if (n < 0) n += std::numeric_limits::max(); - - qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts << " n" << n; - - if (n <= 0) { - qWarning() << _sessionId << "repeated ack"; - return; - } - - if (n <= _out.size()) { - _out.erase(_out.begin(), _out.begin() + n); - _outLastAck += static_cast(n); - _outFlyingPkts -= n; - qDebug() << _sessionId << "new outlastack" << _outLastAck; - } - - qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts; - - Q_ASSERT(_outFlyingPkts <= _out.size()); - - if (_outFlyingPkts == 0 || _out.size() == 0) { - qDebug() << _sessionId << "Stopping resend timer"; - _resendTimer.stop(); - } - - sendPacketsFromQueue(); -} - -void SAPSocket::sendPacketsFromQueue() -{ - const int n = qMin(WINDOW_SIZE_MSGS, _out.size()) - _outFlyingPkts; - - Q_ASSERT(n >= 0); - - for (int i = _outFlyingPkts; i < _outFlyingPkts + n; i++) { - int seqNum = _outLastAck + i + 1; - const QByteArray &pkt = _out.at(i); - qDebug() << _sessionId << "Sending packet" << seqNum << "size=" << pkt.size(); - sendPacket(seqNum, pkt); - } - - _outFlyingPkts += n; - - Q_ASSERT(_outFlyingPkts <= _out.size()); - - qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts << " n" << n; - - if (n > 0 && !_resendTimer.isActive()) { - _resendTimer.start(RESEND_TIME, Qt::CoarseTimer, this); - } -} -- cgit v1.2.3