From 14d20babe395e52d220bbc27e91cec2fddd1ed0f Mon Sep 17 00:00:00 2001 From: Javier Date: Thu, 24 Dec 2015 19:04:04 +0100 Subject: still testing sap control flow --- sapbtlistener.cc | 2 +- sapbtpeer.cc | 11 ++++- sapbtpeer.h | 1 + sappeer.cc | 22 +++++---- sappeer.h | 5 +- sapsocket.cc | 140 +++++++++++++++++++++++++++++++++++++++++++++++-------- sapsocket.h | 13 ++++-- webproxyconn.cc | 8 +--- 8 files changed, 159 insertions(+), 43 deletions(-) diff --git a/sapbtlistener.cc b/sapbtlistener.cc index cd8c015..9776b59 100644 --- a/sapbtlistener.cc +++ b/sapbtlistener.cc @@ -146,7 +146,7 @@ void SAPBTListener::nudge(const QBluetoothAddress &address) // For some reason, using UUIDs here fails on SailfishOS socket->connectToService(address, 1); #else - socket->connectToService(address, SAProtocol::nudgeServiceUuid); + socket->connectToService(address, SAProtocol::nudgeServiceUuid); #endif } diff --git a/sapbtpeer.cc b/sapbtpeer.cc index bf3bddb..188c37b 100644 --- a/sapbtpeer.cc +++ b/sapbtpeer.cc @@ -5,7 +5,7 @@ #include "sapsocket.h" #include "sapbtpeer.h" -#define PROTO_DEBUG 1 +#define PROTO_DEBUG 0 SAPBTPeer::SAPBTPeer(SAProtocol::Role role, QBluetoothSocket *socket, QObject *parent) : SAPPeer(role, socket->localAddress().toString(), socket->peerAddress().toString(), parent), @@ -137,7 +137,7 @@ void SAPBTPeer::handleFrame(const QByteArray &data) handleDataFrame(frame); break; case SAProtocol::FrameControl: - qWarning() << "Got control frame, what to do?"; + handleControlFrame(frame); break; default: qWarning() << "Unknown frame type" << frame.type; @@ -153,6 +153,13 @@ void SAPBTPeer::handleDataFrame(const SAProtocol::Frame &frame) handleSessionData(frame.sessionId, frame.data); } +void SAPBTPeer::handleControlFrame(const SAProtocol::Frame &frame) +{ + Q_ASSERT(frame.type == SAProtocol::FrameControl); + + handleSessionControl(frame.sessionId, frame.data); +} + void SAPBTPeer::handleAuthenticationFrame(const QByteArray &data) { SAProtocol::SecurityFrame sframe = SAProtocol::unpackSecurityFrame(data); diff --git a/sapbtpeer.h b/sapbtpeer.h index 66f8cbb..75af77f 100644 --- a/sapbtpeer.h +++ b/sapbtpeer.h @@ -23,6 +23,7 @@ private slots: private: void handleFrame(const QByteArray &data); void handleDataFrame(const SAProtocol::Frame &frame); + void handleControlFrame(const SAProtocol::Frame &frame); void handleAuthenticationFrame(const QByteArray &data); private: diff --git a/sappeer.cc b/sappeer.cc index 960e515..7f570eb 100644 --- a/sappeer.cc +++ b/sappeer.cc @@ -132,7 +132,6 @@ CapabilityPeer *SAPPeer::capabilityPeer() void SAPPeer::writeDataToSession(int session, const QByteArray &data) { - qDebug() << "frame size" << data.size(); sendFrame(SAProtocol::packFrame(session, data, SAProtocol::FrameData)); } @@ -141,14 +140,6 @@ void SAPPeer::writeControlToSession(int session, const QByteArray &data) sendFrame(SAProtocol::packFrame(session, data, SAProtocol::FrameControl)); } -void SAPPeer::writeAckToSession(int session, int seqNum) -{ - SAProtocol::ControlFrame frame; - frame.type = SAProtocol::ControlFrameBlockAck; - frame.seqNum = seqNum; - writeControlToSession(session, SAProtocol::packControlFrame(frame)); -} - void SAPPeer::acceptServiceConnection(SAPConnectionRequest *connReq, int statusCode) { SAPConnection *conn = connReq->connection(); @@ -206,6 +197,17 @@ void SAPPeer::handleSessionData(int session, const QByteArray &data) } } +void SAPPeer::handleSessionControl(int session, const QByteArray &data) +{ + // Default session ID should not receive control messages, so we don't check for it + SAPSocket *socket = _sessions.value(session, 0); + if (socket && socket->isOpen()) { + socket->acceptIncomingControl(data); + } else { + qWarning() << "Got information for a session that's not yet open!" << session; + } +} + void SAPPeer::handleConnected() { emit connected(); @@ -301,6 +303,8 @@ void SAPPeer::handleDefaultSessionMessage(const QByteArray &message) _sessions.insert(s.sessionId, socket); } + _conns.insert(req.profile, conn); + SAPConnectionRequest *connReq = new SAPConnectionRequest(conn, req.initiatorId, req.acceptorId); agent->requestConnection(connReq); diff --git a/sappeer.h b/sappeer.h index abb36e4..ffddea4 100644 --- a/sappeer.h +++ b/sappeer.h @@ -39,14 +39,17 @@ protected: /** Writes data to the remote. */ void writeDataToSession(int session, const QByteArray &data); + /** Writes a control frame to the remote. */ void writeControlToSession(int session, const QByteArray &control); - void writeAckToSession(int session, int seqNum); void acceptServiceConnection(SAPConnectionRequest *connReq, int statusCode); /** Distributes data to the appropiate socket. */ void handleSessionData(int session, const QByteArray &data); + /** Distributes a control message to the apropiate socket. */ + void handleSessionControl(int session, const QByteArray &data); + /** Perform service discovery once connected. */ void handleConnected(); void handleDisconnected(); diff --git a/sapsocket.cc b/sapsocket.cc index e9a69ec..25e956f 100644 --- a/sapsocket.cc +++ b/sapsocket.cc @@ -1,16 +1,18 @@ #include #include +#include #include "sappeer.h" #include "sapconnection.h" #include "sapsocket.h" #define DELAYED_ACK_TIME 1000 +#define RESEND_TIME 10000 #define WINDOW_SIZE_MSGS 10 SAPSocket::SAPSocket(SAPConnection *conn, int sessionId, const SAPChannelInfo &chanInfo) : QObject(conn), _sessionId(sessionId), _info(chanInfo), _open(false), - _outLastSeqNum(0), _outLastAck(0), _inLastSeqNum(0), _inLastAck(0) + _outLastAck(0), _outFlyingPkts(0), _inLastSeqNum(0), _inLastAck(0) { } @@ -50,8 +52,6 @@ QByteArray SAPSocket::receive() bool SAPSocket::send(const QByteArray &data) { - SAProtocol::DataFrame frame; - if (!isOpen()) { qWarning() << "Socket is not yet open"; return false; @@ -62,21 +62,26 @@ bool SAPSocket::send(const QByteArray &data) return false; } - if (_out.size() > WINDOW_SIZE_MSGS) { - // Send buffer is not empty; enqueue - // TODO Realiability - } - - frame.withSeqNum = isWithSeqNum(); if (isReliable()) { - frame.seqNum = ++_outLastSeqNum; + int seqNum = (_outLastAck + _out.size() + 1) % std::numeric_limits::max(); + if (_outFlyingPkts >= WINDOW_SIZE_MSGS) { + // Send buffer is not empty; enqueue + qDebug() << _sessionId << "Enqueuing" << seqNum << " size=" << data.size(); + } else { + qDebug() << _sessionId << "Sending" << seqNum << " size=" << data.size(); + sendPacket(seqNum, data); + _outFlyingPkts++; + } + _out.append(data); + if (!_resendTimer.isActive()) { + _resendTimer.start(RESEND_TIME, Qt::CoarseTimer, this); + } + qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts; } else { - frame.seqNum = 0; + // Just send the packet as is. + sendPacket(0, data); } - frame.unk_1 = 0; // Is this related to fragmentation? - frame.data = data; - peer()->writeDataToSession(_sessionId, SAProtocol::packDataFrame(frame)); return true; } @@ -87,7 +92,6 @@ void SAPSocket::setOpen(bool open) void SAPSocket::acceptIncomingData(const QByteArray &data) { - if (data.isEmpty()) return; SAProtocol::DataFrame frame = SAProtocol::unpackDataFrame(data, isWithSeqNum()); if (isReliable()) { @@ -98,9 +102,10 @@ void SAPSocket::acceptIncomingData(const QByteArray &data) << "(expected " << expectedSeqNum << ")"; } else { _inLastSeqNum = frame.seqNum; + qDebug() << _sessionId << "Got seqNum" << frame.seqNum; - if (!_timer.isActive()) { - _timer.start(DELAYED_ACK_TIME, Qt::CoarseTimer, this); + if (!_ackTimer.isActive()) { + _ackTimer.start(DELAYED_ACK_TIME, Qt::CoarseTimer, this); } } } @@ -110,6 +115,25 @@ void SAPSocket::acceptIncomingData(const QByteArray &data) emit messageReceived(); } +void SAPSocket::acceptIncomingControl(const QByteArray &data) +{ + SAProtocol::ControlFrame frame = SAProtocol::unpackControlFrame(data); + + if (!isReliable()) { + qWarning() << "Received a control frame but socket is not in QoS mode"; + return; + } + + switch (frame.type) { + case SAProtocol::ControlFrameBlockAck: + handleBlockAck(frame.seqNum); + break; + default: + qWarning() << "Unhandled control frame type" << frame.type; + break; + } +} + int SAPSocket::sessionId() const { return _sessionId; @@ -117,12 +141,20 @@ int SAPSocket::sessionId() const void SAPSocket::timerEvent(QTimerEvent *event) { - if (event->timerId() == _timer.timerId()) { + if (event->timerId() == _ackTimer.timerId()) { if (_inLastSeqNum != _inLastAck) { - peer()->writeAckToSession(_sessionId, _inLastSeqNum); + sendBlockAck(_inLastSeqNum); + _inLastAck = _inLastSeqNum; } - _timer.stop(); + _ackTimer.stop(); + } else if (event->timerId() == _resendTimer.timerId()) { + if (!_out.isEmpty()) { + _outFlyingPkts = 0; // Assume everything went wrong + sendPacketsFromQueue(); + } else { + _resendTimer.stop(); + } } else { QObject::timerEvent(event); } @@ -138,3 +170,71 @@ bool SAPSocket::isWithSeqNum() const return _info.qosType() == SAPChannelInfo::QoSReliabilityDisable || _info.qosType() == SAPChannelInfo::QoSReliabilityEnable; } + +void SAPSocket::sendBlockAck(int seqNum) +{ + SAProtocol::ControlFrame frame; + frame.type = SAProtocol::ControlFrameBlockAck; + frame.seqNum = seqNum; + peer()->writeControlToSession(_sessionId, SAProtocol::packControlFrame(frame)); +} + +void SAPSocket::sendPacket(int seqNum, const QByteArray &data) +{ + SAProtocol::DataFrame frame; + frame.withSeqNum = isWithSeqNum(); + frame.seqNum = seqNum; + frame.unk_1 = 0; // Is this related to fragmentation? + frame.data = data; + peer()->writeDataToSession(_sessionId, SAProtocol::packDataFrame(frame)); +} + +void SAPSocket::handleBlockAck(int seqNum) +{ + qDebug() << _sessionId << "Received block ack for" << seqNum; + int n = seqNum - _outLastAck; + if (n < 0) n += std::numeric_limits::max(); + + qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts << " n" << n; + + if (n <= 0) { + qWarning() << _sessionId << "repeated ack"; + return; + } + + if (n <= _out.size()) { + _out.erase(_out.begin(), _out.begin() + n); + _outLastAck += static_cast(n); + _outFlyingPkts -= n; + qDebug() << _sessionId << "new outlastack" << _outLastAck; + } + + qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts; + + if (_outFlyingPkts == 0 || _out.size() == 0) { + qDebug() << _sessionId << "Stopping resend timer"; + _resendTimer.stop(); + } + + sendPacketsFromQueue(); +} + +void SAPSocket::sendPacketsFromQueue() +{ + const int n = qMin(WINDOW_SIZE_MSGS - _outFlyingPkts, _out.size()); + + for (int i = 0; i < n; i++) { + int seqNum = _outLastAck + i + 1; + const QByteArray &pkt = _out.at(i); + qDebug() << _sessionId << "Sending packet" << seqNum << "size=" << pkt.size(); + sendPacket(seqNum, pkt); + } + + _outFlyingPkts += n; + + qDebug() << _sessionId << "pending" << _out.size() << " flying" << _outFlyingPkts << " n" << n; + + if (n > 0 && !_resendTimer.isActive()) { + _resendTimer.start(RESEND_TIME, Qt::CoarseTimer, this); + } +} diff --git a/sapsocket.h b/sapsocket.h index 0452e9d..ce3c98b 100644 --- a/sapsocket.h +++ b/sapsocket.h @@ -36,6 +36,7 @@ signals: protected: void setOpen(bool open); void acceptIncomingData(const QByteArray &data); + void acceptIncomingControl(const QByteArray &data); int sessionId() const; @@ -45,18 +46,24 @@ private: bool isReliable() const; bool isWithSeqNum() const; + void sendBlockAck(int seqNum); + void sendPacket(int seqNum, const QByteArray &data); + + void handleBlockAck(int seqNum); + void sendPacketsFromQueue(); + private: const int _sessionId; const SAPChannelInfo _info; bool _open; QQueue _in; QQueue _out; - QBasicTimer _timer; + QBasicTimer _ackTimer; + QBasicTimer _resendTimer; - /** Outgoing sequence number */ - quint16 _outLastSeqNum; /** Last acknowledged sent message. */ quint16 _outLastAck; + quint16 _outFlyingPkts; /** Next expected incoming sequence number */ quint16 _inLastSeqNum; diff --git a/webproxyconn.cc b/webproxyconn.cc index 8369417..fa4bc9d 100644 --- a/webproxyconn.cc +++ b/webproxyconn.cc @@ -78,9 +78,6 @@ QByteArray WebProxyConn::removeHeaders(const QByteArray &req) int offset = 0; int next; - qDebug() << req; - qDebug() << "--- end ---"; - while ((next = req.indexOf('\n', offset)) > 0) { QByteArray line = req.mid(offset, next - offset).trimmed(); if (line.isEmpty()) { @@ -110,7 +107,7 @@ void WebProxyConn::handleRequest(const Message &msg) connect(trans, SIGNAL(dataReceived(QByteArray)), this, SLOT(handleTransDataReceived(QByteArray))); connect(trans, SIGNAL(disconnected()), this, SLOT(handleTransDisconnected())); - // Discard request body if it was a CONNECT request. + // Discard request body, but if it was a CONNECT request. if (hdr.connect) { payload = removeHeaders(payload); } @@ -119,7 +116,6 @@ void WebProxyConn::handleRequest(const Message &msg) } if (!payload.isEmpty()) { - qDebug() << "Sending" << msg.transactionId << QString::fromLatin1(payload.mid(0, 30)) << "-"; trans->write(payload); } } @@ -168,8 +164,6 @@ void WebProxyConn::handleTransDataReceived(const QByteArray &data) msg.transactionId = trans->transactionId(); msg.payload = data; - qDebug() << "Receiving" << msg.transactionId << QString::fromLatin1(data.mid(0, 30)) << "-"; - sendMessage(msg); } -- cgit v1.2.3