summaryrefslogtreecommitdiff
path: root/sap/sapsocket.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sap/sapsocket.cc')
-rw-r--r--sap/sapsocket.cc250
1 files changed, 250 insertions, 0 deletions
diff --git a/sap/sapsocket.cc b/sap/sapsocket.cc
new file mode 100644
index 0000000..390aef0
--- /dev/null
+++ b/sap/sapsocket.cc
@@ -0,0 +1,250 @@
+#include <QtCore/QDebug>
+#include <QtCore/QTimerEvent>
+#include <limits>
+
+#include "sappeer.h"
+#include "sapconnection.h"
+#include "sapsocket.h"
+
+#define DELAYED_ACK_TIME 1000
+#define RESEND_TIME 5000
+#define WINDOW_SIZE_MSGS 10
+
+SAPSocket::SAPSocket(SAPConnection *conn, int sessionId, const SAPChannelInfo &chanInfo) :
+ QObject(conn), _sessionId(sessionId), _info(chanInfo), _open(false),
+ _outLastAck(0), _outFlyingPkts(0), _inLastSeqNum(0), _inLastAck(0)
+{
+}
+
+SAPPeer * SAPSocket::peer()
+{
+ return connection()->peer();
+}
+
+SAPConnection * SAPSocket::connection()
+{
+ return static_cast<SAPConnection*>(parent());
+}
+
+SAPChannelInfo SAPSocket::channelInfo() const
+{
+ return _info;
+}
+
+bool SAPSocket::isOpen() const
+{
+ return _open;
+}
+
+bool SAPSocket::messageAvailable() const
+{
+ return !_in.empty();
+}
+
+QByteArray SAPSocket::receive()
+{
+ if (!_in.empty()) {
+ return _in.dequeue();
+ } else {
+ return QByteArray();
+ }
+}
+
+bool SAPSocket::send(const QByteArray &data)
+{
+ if (!isOpen()) {
+ qWarning() << "Socket is not yet open";
+ return false;
+ }
+
+ if (data.size() > 65000) {
+ qWarning() << "Fragmentation is not yet supported";
+ return false;
+ }
+
+ if (isReliable()) {
+ int seqNum = (_outLastAck + _out.size() + 1) % std::numeric_limits<quint16>::max();
+ if (_outFlyingPkts >= WINDOW_SIZE_MSGS) {
+ // Send buffer is not empty; enqueue
+ qDebug() << _sessionId << "Enqueuing" << seqNum << " size=" << data.size();
+ } else {
+ qDebug() << _sessionId << "Sending" << seqNum << " size=" << data.size();
+ sendPacket(seqNum, data);
+ _outFlyingPkts++;
+ }
+ _out.append(data);
+ if (!_resendTimer.isActive()) {
+ _resendTimer.start(RESEND_TIME, Qt::CoarseTimer, this);
+ }
+ qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts;
+ } else {
+ // Just send the packet as is.
+ sendPacket(0, data);
+ }
+
+ return true;
+}
+
+void SAPSocket::setOpen(bool open)
+{
+ _open = open;
+}
+
+void SAPSocket::acceptIncomingData(const QByteArray &data)
+{
+ SAProtocol::DataFrame frame = SAProtocol::unpackDataFrame(data,
+ isReliable(), supportsFragmentation());
+
+ if (isReliable()) {
+ quint16 expectedSeqNum = _inLastSeqNum + 1;
+ if (frame.seqNum != expectedSeqNum) {
+ qWarning() << "Unexpected sequence number" << frame.seqNum
+ << "on session" << _sessionId
+ << "(expected " << expectedSeqNum << ")";
+ } else {
+ _inLastSeqNum = frame.seqNum;
+ qDebug() << _sessionId << "Got seqNum" << frame.seqNum << " size=" << data.size();
+
+ if (!_ackTimer.isActive()) {
+ _ackTimer.start(DELAYED_ACK_TIME, Qt::CoarseTimer, this);
+ }
+ }
+ }
+
+ _in.enqueue(frame.data);
+
+ emit messageReceived();
+}
+
+void SAPSocket::acceptIncomingControl(const QByteArray &data)
+{
+ SAProtocol::ControlFrame frame = SAProtocol::unpackControlFrame(data);
+
+ if (!isReliable()) {
+ qWarning() << "Received a control frame but socket is not in QoS mode";
+ return;
+ }
+
+ switch (frame.type) {
+ case SAProtocol::ControlFrameBlockAck:
+ handleBlockAck(frame.seqNum);
+ break;
+ default:
+ qWarning() << "Unhandled control frame type" << frame.type;
+ break;
+ }
+}
+
+int SAPSocket::sessionId() const
+{
+ return _sessionId;
+}
+
+void SAPSocket::timerEvent(QTimerEvent *event)
+{
+ if (event->timerId() == _ackTimer.timerId()) {
+ qDebug() << "Ack timer tick";
+ if (_inLastSeqNum != _inLastAck) {
+ sendBlockAck(_inLastSeqNum);
+
+ _inLastAck = _inLastSeqNum;
+ }
+ _ackTimer.stop();
+ } else if (event->timerId() == _resendTimer.timerId()) {
+ qDebug() << "Resend timer tick";
+ if (!_out.isEmpty()) {
+ _outFlyingPkts = 0; // Assume everything went wrong
+ sendPacketsFromQueue();
+ } else {
+ _resendTimer.stop();
+ }
+ } else {
+ QObject::timerEvent(event);
+ }
+}
+
+bool SAPSocket::isReliable() const
+{
+ return _info.qosType() == SAPChannelInfo::QoSReliabilityEnable;
+}
+
+bool SAPSocket::supportsFragmentation() const
+{
+ return _info.qosType() == SAPChannelInfo::QoSReliabilityEnable ||
+ _info.qosType() == SAPChannelInfo::QoSReliabilityDisable;
+}
+
+void SAPSocket::sendBlockAck(int seqNum)
+{
+ SAProtocol::ControlFrame frame;
+ frame.type = SAProtocol::ControlFrameBlockAck;
+ frame.seqNum = seqNum;
+ peer()->writeControlToSession(_sessionId, SAProtocol::packControlFrame(frame));
+}
+
+void SAPSocket::sendPacket(int seqNum, const QByteArray &data)
+{
+ SAProtocol::DataFrame frame;
+ frame.withSeqNum = isReliable();
+ frame.seqNum = seqNum;
+ frame.withFragStatus = supportsFragmentation();
+ frame.fragStatus = SAProtocol::FragmentNone;
+ frame.data = data;
+ peer()->writeDataToSession(_sessionId, SAProtocol::packDataFrame(frame));
+}
+
+void SAPSocket::handleBlockAck(int seqNum)
+{
+ qDebug() << _sessionId << "Received block ack for" << seqNum;
+ int n = seqNum - _outLastAck;
+ if (n < 0) n += std::numeric_limits<quint16>::max();
+
+ qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts << " n" << n;
+
+ if (n <= 0) {
+ qWarning() << _sessionId << "repeated ack";
+ return;
+ }
+
+ if (n <= _out.size()) {
+ _out.erase(_out.begin(), _out.begin() + n);
+ _outLastAck += static_cast<unsigned int>(n);
+ _outFlyingPkts -= n;
+ qDebug() << _sessionId << "new outlastack" << _outLastAck;
+ }
+
+ qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts;
+
+ Q_ASSERT(_outFlyingPkts <= _out.size());
+
+ if (_outFlyingPkts == 0 || _out.size() == 0) {
+ qDebug() << _sessionId << "Stopping resend timer";
+ _resendTimer.stop();
+ }
+
+ sendPacketsFromQueue();
+}
+
+void SAPSocket::sendPacketsFromQueue()
+{
+ const int n = qMin(WINDOW_SIZE_MSGS, _out.size()) - _outFlyingPkts;
+
+ Q_ASSERT(n >= 0);
+
+ for (int i = _outFlyingPkts; i < _outFlyingPkts + n; i++) {
+ int seqNum = _outLastAck + i + 1;
+ const QByteArray &pkt = _out.at(i);
+ qDebug() << _sessionId << "Sending packet" << seqNum << "size=" << pkt.size();
+ sendPacket(seqNum, pkt);
+ }
+
+ _outFlyingPkts += n;
+
+ Q_ASSERT(_outFlyingPkts <= _out.size());
+
+ qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts << " n" << n;
+
+ if (n > 0 && !_resendTimer.isActive()) {
+ _resendTimer.start(RESEND_TIME, Qt::CoarseTimer, this);
+ }
+}