summaryrefslogtreecommitdiff
path: root/sapsocket.cc
diff options
context:
space:
mode:
authorJavier <dev.git@javispedro.com>2015-12-24 19:04:04 +0100
committerJavier <dev.git@javispedro.com>2015-12-24 19:04:04 +0100
commit14d20babe395e52d220bbc27e91cec2fddd1ed0f (patch)
tree4cd71caf6b0b8a8e4be7d4d3fb1306536a726346 /sapsocket.cc
parent309947a4316a94f4c5d43c9b39b733cc3bbab459 (diff)
downloadsapd-14d20babe395e52d220bbc27e91cec2fddd1ed0f.tar.gz
sapd-14d20babe395e52d220bbc27e91cec2fddd1ed0f.zip
still testing sap control flow
Diffstat (limited to 'sapsocket.cc')
-rw-r--r--sapsocket.cc140
1 files changed, 120 insertions, 20 deletions
diff --git a/sapsocket.cc b/sapsocket.cc
index e9a69ec..25e956f 100644
--- a/sapsocket.cc
+++ b/sapsocket.cc
@@ -1,16 +1,18 @@
#include <QtCore/QDebug>
#include <QtCore/QTimerEvent>
+#include <limits>
#include "sappeer.h"
#include "sapconnection.h"
#include "sapsocket.h"
#define DELAYED_ACK_TIME 1000
+#define RESEND_TIME 10000
#define WINDOW_SIZE_MSGS 10
SAPSocket::SAPSocket(SAPConnection *conn, int sessionId, const SAPChannelInfo &chanInfo) :
QObject(conn), _sessionId(sessionId), _info(chanInfo), _open(false),
- _outLastSeqNum(0), _outLastAck(0), _inLastSeqNum(0), _inLastAck(0)
+ _outLastAck(0), _outFlyingPkts(0), _inLastSeqNum(0), _inLastAck(0)
{
}
@@ -50,8 +52,6 @@ QByteArray SAPSocket::receive()
bool SAPSocket::send(const QByteArray &data)
{
- SAProtocol::DataFrame frame;
-
if (!isOpen()) {
qWarning() << "Socket is not yet open";
return false;
@@ -62,21 +62,26 @@ bool SAPSocket::send(const QByteArray &data)
return false;
}
- if (_out.size() > WINDOW_SIZE_MSGS) {
- // Send buffer is not empty; enqueue
- // TODO Realiability
- }
-
- frame.withSeqNum = isWithSeqNum();
if (isReliable()) {
- frame.seqNum = ++_outLastSeqNum;
+ int seqNum = (_outLastAck + _out.size() + 1) % std::numeric_limits<quint16>::max();
+ if (_outFlyingPkts >= WINDOW_SIZE_MSGS) {
+ // Send buffer is not empty; enqueue
+ qDebug() << _sessionId << "Enqueuing" << seqNum << " size=" << data.size();
+ } else {
+ qDebug() << _sessionId << "Sending" << seqNum << " size=" << data.size();
+ sendPacket(seqNum, data);
+ _outFlyingPkts++;
+ }
+ _out.append(data);
+ if (!_resendTimer.isActive()) {
+ _resendTimer.start(RESEND_TIME, Qt::CoarseTimer, this);
+ }
+ qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts;
} else {
- frame.seqNum = 0;
+ // Just send the packet as is.
+ sendPacket(0, data);
}
- frame.unk_1 = 0; // Is this related to fragmentation?
- frame.data = data;
- peer()->writeDataToSession(_sessionId, SAProtocol::packDataFrame(frame));
return true;
}
@@ -87,7 +92,6 @@ void SAPSocket::setOpen(bool open)
void SAPSocket::acceptIncomingData(const QByteArray &data)
{
- if (data.isEmpty()) return;
SAProtocol::DataFrame frame = SAProtocol::unpackDataFrame(data, isWithSeqNum());
if (isReliable()) {
@@ -98,9 +102,10 @@ void SAPSocket::acceptIncomingData(const QByteArray &data)
<< "(expected " << expectedSeqNum << ")";
} else {
_inLastSeqNum = frame.seqNum;
+ qDebug() << _sessionId << "Got seqNum" << frame.seqNum;
- if (!_timer.isActive()) {
- _timer.start(DELAYED_ACK_TIME, Qt::CoarseTimer, this);
+ if (!_ackTimer.isActive()) {
+ _ackTimer.start(DELAYED_ACK_TIME, Qt::CoarseTimer, this);
}
}
}
@@ -110,6 +115,25 @@ void SAPSocket::acceptIncomingData(const QByteArray &data)
emit messageReceived();
}
+void SAPSocket::acceptIncomingControl(const QByteArray &data)
+{
+ SAProtocol::ControlFrame frame = SAProtocol::unpackControlFrame(data);
+
+ if (!isReliable()) {
+ qWarning() << "Received a control frame but socket is not in QoS mode";
+ return;
+ }
+
+ switch (frame.type) {
+ case SAProtocol::ControlFrameBlockAck:
+ handleBlockAck(frame.seqNum);
+ break;
+ default:
+ qWarning() << "Unhandled control frame type" << frame.type;
+ break;
+ }
+}
+
int SAPSocket::sessionId() const
{
return _sessionId;
@@ -117,12 +141,20 @@ int SAPSocket::sessionId() const
void SAPSocket::timerEvent(QTimerEvent *event)
{
- if (event->timerId() == _timer.timerId()) {
+ if (event->timerId() == _ackTimer.timerId()) {
if (_inLastSeqNum != _inLastAck) {
- peer()->writeAckToSession(_sessionId, _inLastSeqNum);
+ sendBlockAck(_inLastSeqNum);
+
_inLastAck = _inLastSeqNum;
}
- _timer.stop();
+ _ackTimer.stop();
+ } else if (event->timerId() == _resendTimer.timerId()) {
+ if (!_out.isEmpty()) {
+ _outFlyingPkts = 0; // Assume everything went wrong
+ sendPacketsFromQueue();
+ } else {
+ _resendTimer.stop();
+ }
} else {
QObject::timerEvent(event);
}
@@ -138,3 +170,71 @@ bool SAPSocket::isWithSeqNum() const
return _info.qosType() == SAPChannelInfo::QoSReliabilityDisable ||
_info.qosType() == SAPChannelInfo::QoSReliabilityEnable;
}
+
+void SAPSocket::sendBlockAck(int seqNum)
+{
+ SAProtocol::ControlFrame frame;
+ frame.type = SAProtocol::ControlFrameBlockAck;
+ frame.seqNum = seqNum;
+ peer()->writeControlToSession(_sessionId, SAProtocol::packControlFrame(frame));
+}
+
+void SAPSocket::sendPacket(int seqNum, const QByteArray &data)
+{
+ SAProtocol::DataFrame frame;
+ frame.withSeqNum = isWithSeqNum();
+ frame.seqNum = seqNum;
+ frame.unk_1 = 0; // Is this related to fragmentation?
+ frame.data = data;
+ peer()->writeDataToSession(_sessionId, SAProtocol::packDataFrame(frame));
+}
+
+void SAPSocket::handleBlockAck(int seqNum)
+{
+ qDebug() << _sessionId << "Received block ack for" << seqNum;
+ int n = seqNum - _outLastAck;
+ if (n < 0) n += std::numeric_limits<quint16>::max();
+
+ qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts << " n" << n;
+
+ if (n <= 0) {
+ qWarning() << _sessionId << "repeated ack";
+ return;
+ }
+
+ if (n <= _out.size()) {
+ _out.erase(_out.begin(), _out.begin() + n);
+ _outLastAck += static_cast<unsigned int>(n);
+ _outFlyingPkts -= n;
+ qDebug() << _sessionId << "new outlastack" << _outLastAck;
+ }
+
+ qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts;
+
+ if (_outFlyingPkts == 0 || _out.size() == 0) {
+ qDebug() << _sessionId << "Stopping resend timer";
+ _resendTimer.stop();
+ }
+
+ sendPacketsFromQueue();
+}
+
+void SAPSocket::sendPacketsFromQueue()
+{
+ const int n = qMin(WINDOW_SIZE_MSGS - _outFlyingPkts, _out.size());
+
+ for (int i = 0; i < n; i++) {
+ int seqNum = _outLastAck + i + 1;
+ const QByteArray &pkt = _out.at(i);
+ qDebug() << _sessionId << "Sending packet" << seqNum << "size=" << pkt.size();
+ sendPacket(seqNum, pkt);
+ }
+
+ _outFlyingPkts += n;
+
+ qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts << " n" << n;
+
+ if (n > 0 && !_resendTimer.isActive()) {
+ _resendTimer.start(RESEND_TIME, Qt::CoarseTimer, this);
+ }
+}