From 5abd8e0359cfa1dc2437427f2f0446d8801441cb Mon Sep 17 00:00:00 2001 From: Javier Date: Mon, 14 Dec 2015 22:07:50 +0100 Subject: initial ack controlflow implementation --- sapd.pro | 2 +- sappeer.cc | 42 +++++++++++++++++++++--------------------- sappeer.h | 4 +++- saprotocol.cc | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- saprotocol.h | 20 ++++++++++++++++++-- sapsocket.cc | 45 +++++++++++++++++++++++++++++++++++++-------- sapsocket.h | 14 ++++++++++++-- webproxyconn.cc | 35 ++++++++++++++++++++++++++++------- webproxyconn.h | 12 ++++++++++-- 9 files changed, 184 insertions(+), 46 deletions(-) diff --git a/sapd.pro b/sapd.pro index 829254f..f8a4f00 100644 --- a/sapd.pro +++ b/sapd.pro @@ -1,6 +1,6 @@ TARGET = sapd TEMPLATE = app -QT += core gui dbus bluetooth +QT += core gui dbus network bluetooth CONFIG += console c++11 CONFIG += link_pkgconfig diff --git a/sappeer.cc b/sappeer.cc index 4660522..1e5be61 100644 --- a/sappeer.cc +++ b/sappeer.cc @@ -72,8 +72,8 @@ SAPConnection * SAPPeer::createServiceConnection(const QString &profile, const Q _sessions.insert(sessionId, socket); } - writeToSession(SAProtocol::defaultSessionId, - SAProtocol::packServiceConnectionRequestFrame(request)); + writeDataToSession(SAProtocol::defaultSessionId, + SAProtocol::packServiceConnectionRequestFrame(request)); return conn; } @@ -98,22 +98,22 @@ CapabilityPeer *SAPPeer::capabilityPeer() return findChild(); } -bool SAPPeer::writeToSession(int session, const QByteArray &data) +void SAPPeer::writeDataToSession(int session, const QByteArray &data) { - if (session == SAProtocol::defaultSessionId) { - // Session is default (always open) or already open - sendFrame(SAProtocol::packFrame(session, data)); - return true; - } else { - SAPSocket *socket = _sessions.value(session, 0); - if (socket && socket->isOpen()) { - sendFrame(SAProtocol::packFrame(session, data)); - return true; - } else { - qWarning() << "Session" << session << "not open yet"; - return false; - } - } + sendFrame(SAProtocol::packFrame(session, data, SAProtocol::FrameData)); +} + +void SAPPeer::writeControlToSession(int session, const QByteArray &data) +{ + sendFrame(SAProtocol::packFrame(session, data, SAProtocol::FrameControl)); +} + +void SAPPeer::writeAckToSession(int session, int seqNum) +{ + SAProtocol::ControlFrame frame; + frame.type = SAProtocol::ControlFrameBlockAck; + frame.seqNum = seqNum; + writeControlToSession(session, SAProtocol::packControlFrame(frame)); } void SAPPeer::acceptServiceConnection(SAPConnectionRequest *connReq, int statusCode) @@ -133,8 +133,8 @@ void SAPPeer::acceptServiceConnection(SAPConnectionRequest *connReq, int statusC qDebug() << "Accepting service conection with status" << statusCode; - writeToSession(SAProtocol::defaultSessionId, - SAProtocol::packServiceConnectionResponseFrame(resp)); + writeDataToSession(SAProtocol::defaultSessionId, + SAProtocol::packServiceConnectionResponseFrame(resp)); if (statusCode == 0) { foreach (SAPSocket *socket, conn->sockets()) { @@ -243,8 +243,8 @@ void SAPPeer::handleDefaultSessionMessage(const QByteArray &message) resp.sessions.push_back(s.sessionId); } - writeToSession(SAProtocol::defaultSessionId, - SAProtocol::packServiceConnectionResponseFrame(resp)); + writeDataToSession(SAProtocol::defaultSessionId, + SAProtocol::packServiceConnectionResponseFrame(resp)); return; } diff --git a/sappeer.h b/sappeer.h index 5e565db..8ac81b7 100644 --- a/sappeer.h +++ b/sappeer.h @@ -37,7 +37,9 @@ protected: CapabilityPeer *capabilityPeer(); /** Writes data to the remote. */ - bool writeToSession(int session, const QByteArray &data); + void writeDataToSession(int session, const QByteArray &data); + void writeControlToSession(int session, const QByteArray &control); + void writeAckToSession(int session, int seqNum); void acceptServiceConnection(SAPConnectionRequest *connReq, int statusCode); diff --git a/saprotocol.cc b/saprotocol.cc index 22b3111..94594a1 100644 --- a/saprotocol.cc +++ b/saprotocol.cc @@ -219,11 +219,11 @@ QByteArray SAProtocol::packFrame(const Frame &frame) return data.prepend(reinterpret_cast(&header), 2); } -QByteArray SAProtocol::packFrame(quint16 sessionId, const QByteArray &data) +QByteArray SAProtocol::packFrame(quint16 sessionId, const QByteArray &data, FrameType type) { Frame frame; frame.protocolVersion = currentProtocolVersion; - frame.type = FrameData; + frame.type = type; frame.sessionId = sessionId; frame.data = data; return packFrame(frame); @@ -253,6 +253,58 @@ QByteArray SAProtocol::packDataFrame(const DataFrame &frame) return data; } +SAProtocol::ControlFrame SAProtocol::unpackControlFrame(const QByteArray &data) +{ + ControlFrame frame; + int offset = 0; + int num_ranges; + + frame.type = static_cast(read(data, offset)); + + switch (frame.type) { + case ControlFrameImmediateAck: + case ControlFrameBlockAck: + frame.seqNum = read(data, offset); + break; + case ControlFrameNak: + num_ranges = read(data, offset); + frame.seqNums.reserve(num_ranges); + for (int i = 0; i < num_ranges; i++) { + SeqNumRange r; + r.first = read(data, offset); + r.second = read(data, offset); + frame.seqNums.append(r); + } + break; + default: + qWarning() << "Unknown frame type"; + break; + } + + return frame; +} + +QByteArray SAProtocol::packControlFrame(const ControlFrame &frame) +{ + QByteArray data; + append(data, frame.type); + switch (frame.type) { + case ControlFrameImmediateAck: + case ControlFrameBlockAck: + Q_ASSERT(frame.seqNums.empty()); + append(data, frame.seqNum); + break; + case ControlFrameNak: + append(data, frame.seqNums.size()); + foreach (const SeqNumRange &r, frame.seqNums) { + append(data, r.first); + append(data, r.second); + } + break; + } + return data; +} + SAProtocol::ServiceConnectionRequestFrame SAProtocol::unpackServiceConnectionRequestFrame(const QByteArray &data) { ServiceConnectionRequestFrame frame; diff --git a/saprotocol.h b/saprotocol.h index 79c6bcd..8d78dc4 100644 --- a/saprotocol.h +++ b/saprotocol.h @@ -91,8 +91,7 @@ public: static Frame unpackFrame(const QByteArray &data); static QByteArray packFrame(const Frame& frame); - static QByteArray packFrame(quint16 sessionId, const QByteArray &data); - + static QByteArray packFrame(quint16 sessionId, const QByteArray &data, FrameType type = FrameData); struct DataFrame { bool withSeqNum; // (not actually present in frame) @@ -105,6 +104,23 @@ public: static DataFrame unpackDataFrame(const QByteArray &data, bool withSeqNum); static QByteArray packDataFrame(const DataFrame& frame); + enum ControlFrameType { + ControlFrameImmediateAck = 0, + ControlFrameBlockAck = 1, + ControlFrameNak = 2 + }; + + typedef QPair SeqNumRange; + + struct ControlFrame { + ControlFrameType type; + QList seqNums; // Used for Naks only + quint16 seqNum; + }; + + static ControlFrame unpackControlFrame(const QByteArray &data); + static QByteArray packControlFrame(const ControlFrame& frame); + /* Default session messages */ enum DefaultSessionMessageType { diff --git a/sapsocket.cc b/sapsocket.cc index 472865c..b41e8f5 100644 --- a/sapsocket.cc +++ b/sapsocket.cc @@ -1,12 +1,15 @@ #include +#include #include "sappeer.h" #include "sapconnection.h" #include "sapsocket.h" +#define DELAYED_ACK_TIME 1000 + SAPSocket::SAPSocket(SAPConnection *conn, int sessionId, const SAPChannelInfo &chanInfo) : QObject(conn), _sessionId(sessionId), _info(chanInfo), _open(false), - _seqNum(0), _expectedSeqNum(0) + _outLastSeqNum(0), _inLastSeqNum(0), _inLastAck(0) { } @@ -48,16 +51,22 @@ bool SAPSocket::send(const QByteArray &data) { SAProtocol::DataFrame frame; + if (!isOpen()) { + qWarning() << "Socket is not yet open"; + return false; + } + frame.withSeqNum = isWithSeqNum(); if (isReliable()) { - frame.seqNum = _seqNum++; + frame.seqNum = ++_outLastSeqNum; } else { frame.seqNum = 0; } frame.unk_1 = 0; frame.data = data; - return peer()->writeToSession(_sessionId, SAProtocol::packDataFrame(frame)); + peer()->writeDataToSession(_sessionId, SAProtocol::packDataFrame(frame)); + return true; } void SAPSocket::setOpen(bool open) @@ -71,14 +80,20 @@ void SAPSocket::acceptIncomingData(const QByteArray &data) SAProtocol::DataFrame frame = SAProtocol::unpackDataFrame(data, isWithSeqNum()); if (isReliable()) { - if (frame.seqNum != _expectedSeqNum) { + quint16 expectedSeqNum = _inLastSeqNum + 1; + if (frame.seqNum != expectedSeqNum) { qWarning() << "Unexpected sequence number" << frame.seqNum << "on session" << _sessionId - << "(expected " << _expectedSeqNum << ")"; - } - _expectedSeqNum = frame.seqNum + 1; + << "(expected " << expectedSeqNum << ")"; + } else { + _inLastSeqNum = frame.seqNum; + + qDebug() << "Realiable received" << _inLastSeqNum; - // TODO Do we actually need to ack this somehow? + if (!_timer.isActive()) { + _timer.start(DELAYED_ACK_TIME, Qt::CoarseTimer, this); + } + } } _in.enqueue(frame.data); @@ -91,6 +106,20 @@ int SAPSocket::sessionId() const return _sessionId; } +void SAPSocket::timerEvent(QTimerEvent *event) +{ + if (event->timerId() == _timer.timerId()) { + if (_inLastSeqNum != _inLastAck) { + qDebug() << "Acking" << _inLastAck << _inLastSeqNum; + peer()->writeAckToSession(_sessionId, _inLastSeqNum); + _inLastAck = _inLastSeqNum; + } + _timer.stop(); + } else { + QObject::timerEvent(event); + } +} + bool SAPSocket::isReliable() const { return _info.qosType() == SAPChannelInfo::QoSReliabilityEnable; diff --git a/sapsocket.h b/sapsocket.h index 38d83ea..a2a360a 100644 --- a/sapsocket.h +++ b/sapsocket.h @@ -3,6 +3,7 @@ #include #include +#include #include "sapchannelinfo.h" @@ -38,6 +39,8 @@ protected: int sessionId() const; + virtual void timerEvent(QTimerEvent *event) override; + private: bool isReliable() const; bool isWithSeqNum() const; @@ -47,8 +50,15 @@ private: const SAPChannelInfo _info; bool _open; QQueue _in; - quint16 _seqNum; - quint16 _expectedSeqNum; + QBasicTimer _timer; + + /** Outgoing sequence number */ + quint16 _outLastSeqNum; + + /** Next expected incoming sequence number */ + quint16 _inLastSeqNum; + /** Last acknowledged sequence number */ + quint16 _inLastAck; friend class SAPPeer; }; diff --git a/webproxyconn.cc b/webproxyconn.cc index ae75e1a..1eb4909 100644 --- a/webproxyconn.cc +++ b/webproxyconn.cc @@ -19,23 +19,44 @@ WebProxyConn::RequestMessage WebProxyConn::unpackRequestMessage(const QByteArray int offset = 0; msg.command = read(data, offset); msg.subCommand = read(data, offset); - msg.type = read(data, offset); + msg.type = static_cast(read(data, offset)); msg.transactionId = read(data, offset); const quint32 len = read(data, offset); + msg.payload = data.mid(offset, len); - qDebug() << "command=" << msg.command << "sub=" << msg.subCommand - << "type=" << msg.type << "transaction=" << msg.transactionId; + return msg; +} + +void WebProxyConn::handleStartTransaction(const RequestMessage &msg) +{ + QString req = QString::fromUtf8(msg.payload); + qDebug() << req; +} - qDebug() << QString::fromUtf8(data.mid(offset, len)); +void WebProxyConn::handleCancelTransaction(const RequestMessage &msg) +{ - return msg; } void WebProxyConn::handleMessageReceived() { QByteArray data = _in->receive(); - qDebug() << data.toHex(); RequestMessage req = unpackRequestMessage(data); - qDebug() << "End of data"; + + if (req.command != 1 || req.subCommand != 1) { + qWarning() << "Invalid command/subcommand: " << req.command << "/" << req.subCommand; + return; + } + + switch (req.type) { + case RequestStartTransaction: + handleStartTransaction(req); + break; + case RequestCancelTransaction: + handleCancelTransaction(req); + break; + default: + qWarning() << "Unknown request type" << req.type; + } } diff --git a/webproxyconn.h b/webproxyconn.h index 264e792..705428d 100644 --- a/webproxyconn.h +++ b/webproxyconn.h @@ -13,16 +13,24 @@ public: WebProxyConn(SAPConnection *conn, QObject *parent = 0); protected: + enum RequestMessageType { + RequestStartTransaction = 1, + RequestCancelTransaction = 4 + }; + struct RequestMessage { quint8 command; // Seems to be always 1 - quint8 subCommand; - quint8 type; + quint8 subCommand; // Seems to be always 1 + RequestMessageType type; quint8 transactionId; // Monotonically increasing QByteArray payload; }; static RequestMessage unpackRequestMessage(const QByteArray &data); + void handleStartTransaction(const RequestMessage &msg); + void handleCancelTransaction(const RequestMessage &msg); + private slots: void handleMessageReceived(); -- cgit v1.2.3