From c3a1946675855b299a2b36550cdf2c2f69d153aa Mon Sep 17 00:00:00 2001 From: "Javier S. Pedro" Date: Mon, 17 Sep 2012 23:03:03 +0200 Subject: initial import --- distfoldd/agent.cc | 337 +++++++++++++++++++++++++++++++++++++++++++++++ distfoldd/agent.h | 202 ++++++++++++++++++++++++++++ distfoldd/clientagent.cc | 174 ++++++++++++++++++++++++ distfoldd/clientagent.h | 37 ++++++ distfoldd/compressor.cc | 94 +++++++++++++ distfoldd/compressor.h | 16 +++ distfoldd/discoverer.cc | 195 +++++++++++++++++++++++++++ distfoldd/discoverer.h | 82 ++++++++++++ distfoldd/distfoldd.pro | 35 +++++ distfoldd/distfolder.cc | 200 ++++++++++++++++++++++++++++ distfoldd/distfolder.h | 57 ++++++++ distfoldd/main.cc | 28 ++++ distfoldd/server.cc | 56 ++++++++ distfoldd/server.h | 26 ++++ distfoldd/serveragent.cc | 225 +++++++++++++++++++++++++++++++ distfoldd/serveragent.h | 23 ++++ distfoldd/watcher.cc | 132 +++++++++++++++++++ distfoldd/watcher.h | 38 ++++++ 18 files changed, 1957 insertions(+) create mode 100644 distfoldd/agent.cc create mode 100644 distfoldd/agent.h create mode 100644 distfoldd/clientagent.cc create mode 100644 distfoldd/clientagent.h create mode 100644 distfoldd/compressor.cc create mode 100644 distfoldd/compressor.h create mode 100644 distfoldd/discoverer.cc create mode 100644 distfoldd/discoverer.h create mode 100644 distfoldd/distfoldd.pro create mode 100644 distfoldd/distfolder.cc create mode 100644 distfoldd/distfolder.h create mode 100644 distfoldd/main.cc create mode 100644 distfoldd/server.cc create mode 100644 distfoldd/server.h create mode 100644 distfoldd/serveragent.cc create mode 100644 distfoldd/serveragent.h create mode 100644 distfoldd/watcher.cc create mode 100644 distfoldd/watcher.h (limited to 'distfoldd') diff --git a/distfoldd/agent.cc b/distfoldd/agent.cc new file mode 100644 index 0000000..a718230 --- /dev/null +++ b/distfoldd/agent.cc @@ -0,0 +1,337 @@ +#include "agent.h" + +#ifdef Q_OS_UNIX +#include +#include +#endif + +Agent::Agent(QSslSocket *socket, const QDir& dir, SyncFlags flags, QObject *parent) : + QObject(parent), _dir(dir), _subPath("/"), _flags(flags), _socket(socket) +{ + connect(_socket, SIGNAL(readyRead()), SLOT(handleDataAvailable())); + connect(_socket, SIGNAL(sslErrors(QList)), SLOT(handleSslErrors(QList))); + connect(_socket, SIGNAL(disconnected()), SLOT(handleDisconnected())); +} + +Agent::RemoteFileInfo::RemoteFileInfo() : + _name(), _type(FILE_TYPE_NONE), _mtime(), _size(0) +{ +} + +Agent::RemoteFileInfo::RemoteFileInfo(const QString &name, FileType type, const QDateTime &mtime, qint64 size) : + _name(name), _type(type), _mtime(mtime), _size(size) +{ +} + +Agent::ActionInfo::ActionInfo(FileAction action, const QFileInfo &fileInfo) : + _action(action), _info(fileInfo) +{ +} + +Agent::RemoteActionInfo::RemoteActionInfo(FileAction action, const QString &name, FileType type, const QDateTime &mtime, qint64 size) : + _action(action), _info(name, type, mtime, size) +{ + +} + +void Agent::sendMessage(MessageType msg, const QByteArray &content) +{ + const int header_len = sizeof(MessageHeader); + QByteArray ba(header_len + content.size(), '\0'); + + MessageHeader *h = reinterpret_cast(ba.data()); + h->msg = msg; + h->len = content.size(); + if (h->len > 0) { + ba.replace(header_len, content.size(), content); + } + Q_ASSERT(ba.size() == header_len + content.size()); + + qDebug() << "Sending " << msg << content.size(); + + _socket->write(ba); +} + +int Agent::inBufRequiredData() const +{ + const int header_len = sizeof(MessageHeader); + if (_inBuf.size() < header_len) { + return header_len; + } else { + const MessageHeader *h; + h = reinterpret_cast(_inBuf.data()); + return header_len + h->len; + } +} + +bool Agent::equalDateTime(const QDateTime& dt1, const QDateTime& dt2) +{ + const qint64 threshold = 5 * 1000; + qint64 msecs = dt1.msecsTo(dt2); + return msecs > -threshold && msecs < threshold; +} + +void Agent::setLocalFileDateTime(const QString &path, const QDateTime &dt) +{ +#ifdef Q_OS_UNIX + const char *filename = path.toLocal8Bit().constData(); + struct utimbuf times; + times.actime = dt.toTime_t(); + times.modtime = dt.toTime_t(); + int rc = utime(filename, ×); + if (rc != 0) { + qWarning() << "Could not set local mtime of" << path; + } +#endif +} + +QString Agent::wireToLocalPath(const QString &path) +{ + return _dir.absolutePath() + _subPath + path; +} + +QString Agent::localToWirePath(const QString& path) +{ + const QString& basePath = _dir.absolutePath() + _subPath; + Q_ASSERT(_subPath.endsWith('/')); + QString wire_path(path); + if (path.startsWith(basePath)) { + wire_path.remove(0, basePath.size()); + } else if (path + '/' == basePath) { + wire_path = QString(); // This is the root dir. + } else { + qWarning() << "Where does this come from?" << path; + } + return wire_path; +} + +QString Agent::wireParentPath(const QString &path) +{ + int index = path.lastIndexOf('/'); + if (index == -1) return QString(""); + else return path.left(index); +} + +QString Agent::findExistingCommonAncestor(const QString& path, + const QHash& local_files, + const QHash& remote_files) +{ + QString ancestor = path; + do { + ancestor = wireParentPath(ancestor); + } while (!local_files.contains(ancestor) || !remote_files.contains(ancestor)); + return ancestor; +} + +bool Agent::lessPathDepthThan(const QString& s1, const QString& s2) +{ + int d1 = s1.count('/'), d2 = s2.count('/'); + return d1 < d2; +} + +bool Agent::morePathDepthThan(const QString& s1, const QString& s2) +{ + int d1 = s1.count('/'), d2 = s2.count('/'); + return d1 > d2; +} + +QFileInfoList Agent::scanFiles(const QDir& dir) +{ + QFileInfoList all; + QFileInfoList sub = dir.entryInfoList(QDir::Dirs | QDir::Files | QDir::NoDotAndDotDot); + + all << QFileInfo(dir, "."); + + foreach (const QFileInfo &info, sub) { + if (info.isDir()) { + all << scanFiles(QDir(info.absoluteFilePath())); + } else { + all << info; + } + } + + return all; +} + +QByteArray Agent::encodeFileInfoList(const QFileInfoList& list) +{ + QByteArray ba; + if (list.empty()) return ba; + + ba.reserve(list.size() * (sizeof(FileListItem) + 10)); + + foreach (const QFileInfo& info, list) { + QString path = localToWirePath(info.absoluteFilePath()); + FileListItem item; + item.size = info.size(); + + if (info.isDir() && _flags & SYNC_PULL) { + // If we are pulling, then simulate all of our local directories being old, + // so that we get all the new file creations + // Existing files should be compared with the proper mtimes though. + item.mtime = 0; + } else { + item.mtime = info.lastModified().toTime_t(); + } + + if (info.isDir()) { + item.type = FILE_TYPE_DIR; + } else if (info.isFile()) { + item.type = FILE_TYPE_FILE; + } else { + qWarning() << "What is this?" << path; + continue; + } + QByteArray utfPath = encodeFileName(path); + + item.name_len = utfPath.size(); + ba.append(reinterpret_cast(&item), sizeof(FileListItem)); + ba.append(utfPath.constData()); + } + + return ba; +} + +Agent::RemoteFileInfoList Agent::decodeFileInfoList(const QByteArray& ba) +{ + RemoteFileInfoList list; + + int pos = 0; + while (pos < ba.size()) { + const FileListItem *item; + item = reinterpret_cast(&(ba.data()[pos])); + + list.append(RemoteFileInfo(QString::fromUtf8(&item->name[0], + item->name_len), + static_cast(item->type), + QDateTime::fromTime_t(item->mtime), + item->size)); + + pos += sizeof(FileListItem) + item->name_len; + } + + return list; +} + +QByteArray Agent::encodeActionInfoList(const ActionInfoList &list) +{ + QByteArray ba; + + ba.reserve(list.size() * (sizeof(ActionItem) + 10)); + + foreach (const ActionInfo& info, list) { + const QFileInfo fileInfo = info.fileInfo(); + QString path = localToWirePath(fileInfo.absoluteFilePath()); + ActionItem item; + item.action = info.action(); + item.size = fileInfo.size(); + item.mtime = fileInfo.lastModified().toTime_t(); + + if (fileInfo.isDir()) { + item.type = FILE_TYPE_DIR; + } else if (fileInfo.isFile()) { + item.type = FILE_TYPE_FILE; + } else { + item.type = FILE_TYPE_NONE; // Actions might refer to no-longer existing files + } + + QByteArray utfPath = path.toUtf8(); + + item.name_len = utfPath.size(); + ba.append(reinterpret_cast(&item), sizeof(item)); + ba.append(utfPath.constData()); + } + + return ba; +} + +Agent::RemoteActionInfoList Agent::decodeActionInfoList(const QByteArray& ba) +{ + RemoteActionInfoList list; + + int pos = 0; + while (pos < ba.size()) { + const ActionItem *item; + item = reinterpret_cast(&(ba.data()[pos])); + + list.append(RemoteActionInfo(static_cast(item->action), + QString::fromUtf8(&item->name[0], + item->name_len), + static_cast(item->type), + QDateTime::fromTime_t(item->mtime), + item->size)); + + pos += sizeof(ActionItem) + item->name_len; + } + + return list; +} + +bool Agent::lessPathDepthThan(const ActionInfo& a1, const ActionInfo& a2) +{ + return lessPathDepthThan(a1.fileInfo().absoluteFilePath(), + a2.fileInfo().absoluteFilePath()); +} + +bool Agent::morePathDepthThan(const ActionInfo& a1, const ActionInfo& a2) +{ + return morePathDepthThan(a1.fileInfo().absoluteFilePath(), + a2.fileInfo().absoluteFilePath()); +} + +QByteArray Agent::encodeFileName(const QString& wire_path) +{ + return wire_path.toUtf8(); +} + +QString Agent::decodeFileName(const QByteArray& ba) +{ + return QString::fromUtf8(ba); +} + +QByteArray Agent::encodeFileNameItem(const QString& wire_path) +{ + QByteArray utf8 = encodeFileName(wire_path); + QByteArray ba(sizeof(FileNameItem), 0); + FileNameItem *item = reinterpret_cast(ba.data()); + item->name_len = utf8.length(); + ba.append(utf8); + return ba; +} + +QString Agent::decodeFileNameItem(const QByteArray& ba, int* length) +{ + const FileNameItem *item = reinterpret_cast(ba.data()); + *length = sizeof(FileNameItem) + item->name_len; + QString wire_path = decodeFileName(ba.mid(sizeof(FileNameItem), item->name_len)); + return wire_path; +} + +void Agent::handleDataAvailable() +{ + do { + _inBuf.append(_socket->readAll()); + + while (_inBuf.size() >= inBufRequiredData()) { + MessageHeader *h = reinterpret_cast(_inBuf.data()); + QByteArray data; + if (h->len > 0) { + data = _inBuf.mid(sizeof(MessageHeader), h->len); + } + handleMessage(static_cast(h->msg), data); + _inBuf.remove(0, inBufRequiredData()); + } + } while (_socket->bytesAvailable() > 0); +} + +void Agent::handleSslErrors(const QList &errors) +{ + qDebug() << "SSL errors:" << errors; + _socket->ignoreSslErrors(); // TODO For now +} + +void Agent::handleDisconnected() +{ + qDebug() << "Disconnected at" << QDateTime::currentDateTime(); + deleteLater(); +} diff --git a/distfoldd/agent.h b/distfoldd/agent.h new file mode 100644 index 0000000..fa202f5 --- /dev/null +++ b/distfoldd/agent.h @@ -0,0 +1,202 @@ +#ifndef AGENT_H +#define AGENT_H + +#include +#include +#include + +#include "compressor.h" + +class Agent : public QObject +{ + Q_OBJECT + Q_FLAGS(SyncFlag SyncFlags) + +public: + enum SyncFlag { + SYNC_NORMAL = 0, + SYNC_PULL = 0x1, + SYNC_READ_ONLY = 0x2, + SYNC_COMPRESS = 0x8 + }; + Q_DECLARE_FLAGS(SyncFlags, SyncFlag) + +public: + explicit Agent(QSslSocket *socket, const QDir& dir, SyncFlags flags, QObject *parent = 0); + + static const uint servicePort = 17451; + +signals: + void finished(); + +protected: +#pragma pack(push) +#pragma pack(1) + enum MessageType { + MSG_HELLO = 0, + MSG_HELLO_REPLY, + MSG_SET_SUBROOT, //args: string + MSG_FILE_LIST, //args: FileListItem[] + MSG_FILE_ACTIONS_REPLY, //args: ActionItem[] + MSG_PULL_FILE, //args: string + MSG_PULL_FILE_REPLY, //args: data + MSG_PUSH_FILE, //args: FileNameItem + data + MSG_PUSH_FILE_METADATA, //args: FileListItem[] + MSG_DELETE_FILE, //args: string + MSG_BYE + }; + struct MessageHeader { + quint32 msg; + quint32 len; + }; + enum FileType { + FILE_TYPE_NONE = 0, + FILE_TYPE_FILE, + FILE_TYPE_DIR + }; + struct FileListItem { + quint64 size; + quint32 mtime; + quint16 name_len; + quint8 type; + char name[]; + }; + enum FileAction { + ACTION_NONE = 0, + ACTION_PULL_METADATA, + ACTION_PUSH_METADATA, + ACTION_PULL, + ACTION_PUSH, + ACTION_PULL_DELETE, + ACTION_PUSH_DELETE + }; + struct ActionItem { + quint64 size; + quint32 mtime; + quint16 name_len; + quint8 type; + quint8 action; + char name[]; + }; + struct FileNameItem { + quint16 name_len; + char name[]; + }; +#pragma pack(pop) + +protected: + class RemoteFileInfo { + public: + RemoteFileInfo(); + RemoteFileInfo(const QString &name, FileType type, const QDateTime &mtime, qint64 size); + + inline qint64 size() const { + return _size; + } + + inline QDateTime lastModified() const { + return _mtime; + } + + inline QString name() const { + return _name; + } + + inline bool isDir() const { + return _type == FILE_TYPE_DIR; + } + + private: + QString _name; + FileType _type; + QDateTime _mtime; + qint64 _size; + }; + typedef QList RemoteFileInfoList; + + class ActionInfo { + public: + ActionInfo(FileAction action, const QFileInfo& fileInfo); + + inline FileAction action() const { + return _action; + } + + inline QFileInfo fileInfo() const { + return _info; + } + + private: + FileAction _action; + QFileInfo _info; + }; + typedef QList ActionInfoList; + + class RemoteActionInfo { + public: + RemoteActionInfo(FileAction action, const QString& name, FileType type, const QDateTime& mtime, qint64 size); + + inline FileAction action() const { + return _action; + } + + inline RemoteFileInfo fileInfo() const { + return _info; + } + + private: + FileAction _action; + RemoteFileInfo _info; + }; + typedef QList RemoteActionInfoList; + +protected: + void sendMessage(MessageType msg, const QByteArray& data = QByteArray()); + virtual void handleMessage(MessageType msg, const QByteArray& data) = 0; + + int inBufRequiredData() const; + + bool equalDateTime(const QDateTime& dt1, const QDateTime& dt2); + void setLocalFileDateTime(const QString &path, const QDateTime& dt); + + QString wireToLocalPath(const QString& path); + QString localToWirePath(const QString& path); + + QString wireParentPath(const QString& path); + QString findExistingCommonAncestor(const QString& path, + const QHash& local_files, + const QHash& remote_files); + static bool lessPathDepthThan(const QString& s1, const QString& s2); + static bool morePathDepthThan(const QString& s1, const QString& s2); + + static QFileInfoList scanFiles(const QDir& dir); + QByteArray encodeFileInfoList(const QFileInfoList& list); + RemoteFileInfoList decodeFileInfoList(const QByteArray& ba); + + QByteArray encodeActionInfoList(const ActionInfoList& list); + RemoteActionInfoList decodeActionInfoList(const QByteArray& ba); + static bool lessPathDepthThan(const ActionInfo& a1, const ActionInfo& a2); + static bool morePathDepthThan(const ActionInfo& a1, const ActionInfo& a2); + + QByteArray encodeFileName(const QString& wire_path); + QString decodeFileName(const QByteArray& ba); + + QByteArray encodeFileNameItem(const QString& wire_path); + QString decodeFileNameItem(const QByteArray& ba, int* length); + +protected: + QDir _dir; + QString _subPath; + SyncFlags _flags; + QSslSocket *_socket; + QByteArray _inBuf; + +private slots: + void handleDataAvailable(); + void handleSslErrors(const QList& errors); + void handleDisconnected(); +}; + +Q_DECLARE_OPERATORS_FOR_FLAGS(Agent::SyncFlags) + +#endif // AGENT_H diff --git a/distfoldd/clientagent.cc b/distfoldd/clientagent.cc new file mode 100644 index 0000000..f77a21a --- /dev/null +++ b/distfoldd/clientagent.cc @@ -0,0 +1,174 @@ +#include + +#include "clientagent.h" + +ClientAgent::ClientAgent(const QHostAddress& addr, uint port, const QDir& local_dir, SyncFlags flags, QObject *parent) : + Agent(new QSslSocket, local_dir, flags, parent) +{ + qDebug() << "Starting client agent at" << QDateTime::currentDateTime(); + _socket->setParent(this); // Can't set parent until QObject constructed + _socket->connectToHostEncrypted(addr.toString(), port); + sendMessage(MSG_HELLO); + _state = STATE_HELLO; +} + +void ClientAgent::handleMessage(MessageType msg, const QByteArray &data) +{ + qDebug() << "Client::handleMessage" << msg << data.size(); + switch (msg) { + case MSG_HELLO_REPLY: + Q_ASSERT(_state == STATE_HELLO); + qDebug() << "Hello reply"; + _state = STATE_FILE_LIST; + sendFileList(); + break; + case MSG_FILE_ACTIONS_REPLY: + Q_ASSERT(_state == STATE_FILE_LIST); + handleActionInfoList(decodeActionInfoList(data)); + break; + case MSG_PULL_FILE_REPLY: + Q_ASSERT(_state == STATE_FILE_ACTIONS && !_pendingActions.empty()); + handlePulledFile(data); + break; + default: + qWarning() << "Unknown message"; + break; + } +} + +void ClientAgent::sendFileList() +{ + QFileInfoList list = scanFiles(QDir(wireToLocalPath(_subPath))); + sendMessage(MSG_FILE_LIST, encodeFileInfoList(list)); +} + +void ClientAgent::handleActionInfoList(const RemoteActionInfoList &list) +{ + foreach (const RemoteActionInfo& info, list) { + switch (info.action()) { + case ACTION_NONE: + qDebug() << " = " << info.fileInfo().name(); + break; + case ACTION_PULL: + qDebug() << " < " << info.fileInfo().name(); + break; + case ACTION_PULL_METADATA: + qDebug() << " " << info.fileInfo().name(); + break; + case ACTION_PUSH_METADATA: + qDebug() << " >m" << info.fileInfo().name(); + break; + case ACTION_PUSH_DELETE: + qDebug() << " >d" << info.fileInfo().name(); + break; + } + } + + _pendingActions = list; + _state = STATE_FILE_ACTIONS; + executeNextAction(); +} + +void ClientAgent::executeNextAction() +{ + if (_pendingActions.empty()) { + qDebug() << "Done"; + emit finished(); + sendMessage(MSG_BYE); + _socket->flush(); + _socket->close(); + return; + } + + qDebug() << "Remaining actions" << _pendingActions.count(); + + const RemoteActionInfo& info = _pendingActions.first(); + const QString wire_path = info.fileInfo().name(); + const QString local_path = wireToLocalPath(wire_path); + switch (info.action()) { + case ACTION_PULL: + if (_flags & SYNC_READ_ONLY) break; + if (info.fileInfo().isDir()) { + if (!QDir().mkpath(local_path)) { + qWarning() << "Failed to create local path" << local_path; + } + } else { + sendMessage(MSG_PULL_FILE, encodeFileName(wire_path)); + return; // Wait for the pulled file message. + } + break; + case ACTION_PULL_METADATA: + if (_flags & SYNC_READ_ONLY) break; + setLocalFileDateTime(local_path, info.fileInfo().lastModified()); + break; + case ACTION_PULL_DELETE: + handleDeleteFile(wire_path); + break; + case ACTION_PUSH: + handlePushFile(wire_path); + break; + case ACTION_PUSH_METADATA: + sendMessage(MSG_PUSH_FILE_METADATA, + encodeFileInfoList(QFileInfoList() << QFileInfo(local_path))); + break; + case ACTION_PUSH_DELETE: + sendMessage(MSG_DELETE_FILE, encodeFileName(wire_path)); + break; + default: + qWarning() << "Unknown action" << info.action(); + break; + } + _pendingActions.removeFirst(); + executeNextAction(); +} + +void ClientAgent::handlePulledFile(const QByteArray &data) +{ + const RemoteActionInfo& info = _pendingActions.takeFirst(); + if (_flags & SYNC_READ_ONLY) return; + QString local_path(wireToLocalPath(info.fileInfo().name())); + QFile file(local_path); + if (file.open(QIODevice::WriteOnly | QIODevice::Truncate)) { + file.write(Compressor::decompress(data)); + file.close(); + setLocalFileDateTime(local_path, info.fileInfo().lastModified()); + } else { + qWarning() << "Failed to open" << file.fileName() << "for writing"; + } + executeNextAction(); +} + +void ClientAgent::handlePushFile(const QString &path) +{ + QFile file(wireToLocalPath(path)); + if (file.open(QIODevice::ReadOnly)) { + QByteArray file_data = file.readAll(); + if (_flags & SYNC_COMPRESS) file_data = Compressor::compress(file_data); + QByteArray ba = encodeFileNameItem(path) + file_data; + sendMessage(MSG_PUSH_FILE, ba); + } else { + qWarning() << "Failed to open file" << file.fileName() << "for reading"; + } +} + +void ClientAgent::handleDeleteFile(const QString &wire_path) +{ + if (_flags & SYNC_READ_ONLY) return; + QFileInfo local(wireToLocalPath(wire_path)); + QString local_path = local.absoluteFilePath(); + if (local.isDir()) { + if (!QDir().rmdir(local_path)) { + qWarning() << "Failed to remove local dir" << local_path; + } + } else { + if (!QDir().remove(local_path)) { + qWarning() << "Failed to remove local file" << local_path; + } + } +} diff --git a/distfoldd/clientagent.h b/distfoldd/clientagent.h new file mode 100644 index 0000000..d54bc69 --- /dev/null +++ b/distfoldd/clientagent.h @@ -0,0 +1,37 @@ +#ifndef CLIENTAGENT_H +#define CLIENTAGENT_H + +#include +#include + +#include "agent.h" + +class ClientAgent : public Agent +{ + Q_OBJECT +public: + explicit ClientAgent(const QHostAddress& addr, uint port, const QDir& local_dir, SyncFlags flags, QObject *parent = 0); + + enum State { + STATE_HELLO, + STATE_FILE_LIST, + STATE_FILE_ACTIONS + }; + +protected: + void handleMessage(MessageType msg, const QByteArray &data); + +private: + void sendFileList(); + void handleActionInfoList(const RemoteActionInfoList& list); + void executeNextAction(); + void handlePulledFile(const QByteArray& data); + void handlePushFile(const QString& wire_path); + void handleDeleteFile(const QString& wire_path); + +private: + State _state; + RemoteActionInfoList _pendingActions; +}; + +#endif // CLIENTAGENT_H diff --git a/distfoldd/compressor.cc b/distfoldd/compressor.cc new file mode 100644 index 0000000..98eaa92 --- /dev/null +++ b/distfoldd/compressor.cc @@ -0,0 +1,94 @@ +#include + +#include +#include "compressor.h" + +Compressor::Compressor() +{ +} + +QByteArray Compressor::compress(const QByteArray& data) +{ + if (data.isEmpty()) return data; + + QByteArray in = data, out; + z_stream strm; + int ret; + + memset(&strm, 0, sizeof(strm)); + out.resize(qMax(in.size(), 1024)); + + strm.avail_in = in.size(); + strm.next_in = reinterpret_cast(in.data()); + strm.avail_out = out.size(); + strm.next_out = reinterpret_cast(out.data()); + + ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION); + if (ret != Z_OK) { + qWarning() << "deflateInit failed"; + return data; + } + + do { + if (strm.avail_out == 0) { + int cur_size = out.size(); + out.resize(cur_size * 2); + strm.avail_out = cur_size; + strm.next_out = reinterpret_cast(&out.data()[cur_size]); + } + ret = deflate(&strm, Z_FINISH); + } while (ret == Z_OK || ret == Z_BUF_ERROR); + + Q_ASSERT(ret == Z_STREAM_END); + Q_ASSERT(strm.avail_in == 0); + if (strm.avail_out > 0) { + out.resize(out.size() - strm.avail_out); + } + + deflateEnd(&strm); + + return out; +} + +QByteArray Compressor::decompress(const QByteArray& data) +{ + if (data.isEmpty()) return data; + + QByteArray in = data, out; + z_stream strm; + int ret; + + memset(&strm, 0, sizeof(strm)); + out.resize(qMax(static_cast(in.size() * 1.5), 1024)); + + strm.avail_in = in.size(); + strm.next_in = reinterpret_cast(in.data()); + strm.avail_out = out.size(); + strm.next_out = reinterpret_cast(out.data()); + + ret = inflateInit(&strm); + if (ret != Z_OK) { + qWarning() << "inflateInit failed"; + return data; + } + + do { + if (strm.avail_out == 0) { + int cur_size = out.size(); + out.resize(cur_size * 2); + strm.avail_out = cur_size; + strm.next_out = reinterpret_cast(&out.data()[cur_size]); + } + ret = inflate(&strm, Z_FINISH); + } while (ret == Z_OK || ret == Z_BUF_ERROR); + + Q_ASSERT(ret == Z_STREAM_END); + Q_ASSERT(strm.avail_in == 0); + if (strm.avail_out > 0) { + out.resize(out.size() - strm.avail_out); + } + + inflateEnd(&strm); + + return out; +} diff --git a/distfoldd/compressor.h b/distfoldd/compressor.h new file mode 100644 index 0000000..6ab8e13 --- /dev/null +++ b/distfoldd/compressor.h @@ -0,0 +1,16 @@ +#ifndef COMPRESSOR_H +#define COMPRESSOR_H + +#include + +class Compressor +{ +private: + Compressor(); + +public: + static QByteArray compress(const QByteArray& data); + static QByteArray decompress(const QByteArray& data); +}; + +#endif // COMPRESSOR_H diff --git a/distfoldd/discoverer.cc b/distfoldd/discoverer.cc new file mode 100644 index 0000000..629a955 --- /dev/null +++ b/distfoldd/discoverer.cc @@ -0,0 +1,195 @@ +#include + +#include "discoverer.h" + +Discoverer::Discoverer(const QUuid &uuid, uint port, const QString &serviceName, QObject *parent) : + QObject(parent), _folderUuid(uuid), _hostUuid(QUuid::createUuid()), + _serviceName(serviceName), _port(port), + _mtime(), _mtimeChanged(false), + _receiver(new QUdpSocket(this)), _sender(new QUdpSocket(this)), + _netConfig(new QNetworkConfigurationManager(this)), + _netInfo(new QSystemNetworkInfo(this)), + _timer(new QSystemAlignedTimer(this)), _fallbackTimer(new QTimer(this)) +{ + connect(_receiver, SIGNAL(readyRead()), SLOT(handleDataAvailable())); + if (!_receiver->bind(servicePort, QUdpSocket::ShareAddress)) { + qWarning() << "Failed to bind to discoverer port" << servicePort; + } + connect(_netConfig, SIGNAL(onlineStateChanged(bool)), SLOT(handleOnlineStateChanged(bool))); + connect(_timer, SIGNAL(timeout()), SLOT(handleTimerTimeout())); + connect(_fallbackTimer, SIGNAL(timeout()), SLOT(handleTimerTimeout())); + connect(_timer, SIGNAL(error(QSystemAlignedTimer::AlignedTimerError)), SLOT(handleTimerError())); + _timer->setSingleShot(true); + _fallbackTimer->setSingleShot(true); + if (_netConfig->isOnline() || + _netConfig->allConfigurations(QNetworkConfiguration::Defined).empty()) { + // Either only, or _netConfig has no configs so it is useless. + qDebug() << "Start timer"; + _timer->start(0, activeBroadcastInterval); + if (_timer && _timer->lastError() != QSystemAlignedTimer::NoError) { + qDebug() << "QSystemAlignedTimer broken at start"; + delete _timer; + _timer = 0; + _fallbackTimer->start(activeBroadcastInterval * 1000); + } + } else { + qDebug() << "I am NOT online"; + } +} + +void Discoverer::setLastModifiedTime(const QDateTime& dateTime) +{ + _mtime = dateTime; + if (_knownHosts.empty()) { + // Idle mode: do nothing. + } else { + // Active mode + switchToActiveMode(); + } + _mtimeChanged = true; +} + +QByteArray Discoverer::encodeMessage() const +{ + QByteArray ba; + QByteArray hostUuid = _hostUuid.toString().toAscii(); + QByteArray folderUuid = _folderUuid.toString().toAscii(); + QByteArray serviceNameUtf8 = _serviceName.toUtf8(); + Message msg; + + Q_ASSERT(hostUuid.length() == sizeof(msg.hostUuid)); + Q_ASSERT(folderUuid.length() == sizeof(msg.folderUuid)); + + ba.reserve(sizeof(msg) + serviceNameUtf8.size()); + + msg.mtime = _mtime.toTime_t(); + strncpy(msg.hostUuid, hostUuid.data(), sizeof(msg.hostUuid)); + msg.port = _port; + strncpy(msg.folderUuid, folderUuid.data(), sizeof(msg.folderUuid)); + + ba.append(reinterpret_cast(&msg), sizeof(msg)); + ba.append(serviceNameUtf8); + return ba; +} + +void Discoverer::switchToActiveMode() +{ + if (_timer) { + if (!_timer->isActive() || _timer->maximumInterval() > activeBroadcastInterval) { + if (_timer->isActive()) _timer->wokeUp(); + _timer->start(0, activeBroadcastInterval); + } + } else if (!_fallbackTimer->isActive() || _fallbackTimer->interval() > activeBroadcastInterval * 1000) { + _fallbackTimer->start(activeBroadcastInterval * 1000); + } +} + +void Discoverer::broadcastMessage() +{ + _sender->writeDatagram(encodeMessage(), QHostAddress::Broadcast, servicePort); + _lastBroadcast = QDateTime::currentDateTime(); + _mtimeChanged = false; + qDebug() << "Broadcast message at" << _lastBroadcast; +} + +void Discoverer::handleDataAvailable() +{ + while (_receiver->hasPendingDatagrams()) { + QByteArray data; + QHostAddress sender; + data.resize(_receiver->pendingDatagramSize()); + _receiver->readDatagram(data.data(), data.size(), &sender); + + handleBroadcastMessage(sender, data); + } +} + +void Discoverer::handleBroadcastMessage(const QHostAddress& sender, const QByteArray &data) +{ + const char *dataPtr = data.data(); + const Message *msg = reinterpret_cast(dataPtr); + QString serviceName = QString::fromUtf8(&dataPtr[sizeof(Message)]); + char uuidData[sizeof(msg->hostUuid)]; + + strncpy(uuidData, msg->hostUuid, sizeof(uuidData)); + QUuid hostUuid(QString::fromAscii(uuidData, sizeof(uuidData))); + strncpy(uuidData, msg->folderUuid, sizeof(uuidData)); + QUuid folderUuid(QString::fromAscii(uuidData, sizeof(uuidData))); + + if (hostUuid.isNull() || folderUuid.isNull()) { + qWarning() << "Null uuid"; + return; + } + if (hostUuid == _hostUuid) { + // A message from myself, ignore + return; + } + if (folderUuid != _folderUuid) { + // A message not about the same folder, ignore + return; + } + + qDebug() << "Got message from" << sender << msg->port; + handleHostSeen(hostUuid); + + QDateTime mtime = QDateTime::fromTime_t(msg->mtime); + qDebug() << mtime << _mtime << _mtime.secsTo(mtime); + if (_mtime.secsTo(mtime) > dateTimeCompareMinDelta) { + // We are outdated, let's try to connect to them. + emit foundMoreRecentHost(sender, msg->port, mtime); + } +} + +void Discoverer::handleHostSeen(const QUuid &hostUuid) +{ + bool host_previously_seen = _knownHosts.contains(hostUuid); + _knownHosts[hostUuid].lastSeen = QDateTime::currentDateTime(); + if (!host_previously_seen || _mtimeChanged) { + switchToActiveMode(); + } +} + +void Discoverer::handleTimerTimeout() +{ + qDebug() << "Timer"; + broadcastMessage(); + QDateTime now = QDateTime::currentDateTime(); + QHash::iterator i = _knownHosts.begin(); + while (i != _knownHosts.end()) { + if (i.value().lastSeen.secsTo(now) > lostHostTimeout) { + i = _knownHosts.erase(i); + } else { + ++i; + } + } + qDebug() << "Known hosts" << _knownHosts.size(); + if (_timer) { + _timer->start(idleBroadcastInterval * 0.7f, idleBroadcastInterval * 1.3f); + } else { + _fallbackTimer->start(idleBroadcastInterval * 1000); + } +} + +void Discoverer::handleTimerError() +{ + qDebug() << "SystemAlignedTimer broken"; + _timer->deleteLater(); + _timer = 0; + _fallbackTimer->start(activeBroadcastInterval * 1000); +} + +void Discoverer::handleOnlineStateChanged(bool online) +{ + if (online) { + qDebug() << "Online"; + if (_netInfo->currentMode() == QSystemNetworkInfo::UnknownMode || + _netInfo->currentMode() == QSystemNetworkInfo::WlanMode) { + switchToActiveMode(); + } + } else { + qDebug() << "Offline"; + _knownHosts.clear(); + _fallbackTimer->stop(); + if (_timer) _timer->stop(); + } +} diff --git a/distfoldd/discoverer.h b/distfoldd/discoverer.h new file mode 100644 index 0000000..ae106de --- /dev/null +++ b/distfoldd/discoverer.h @@ -0,0 +1,82 @@ +#ifndef DISCOVERER_H +#define DISCOVERER_H + +#include +#include +#include +#include +#include +#include +#include +#include + +QTM_USE_NAMESPACE + +class Discoverer : public QObject +{ + Q_OBJECT + +public: + Discoverer(const QUuid& uuid, uint port, const QString& serviceName, QObject *parent = 0); + + static const uint servicePort = 17451; + static const int dateTimeCompareMinDelta = 5; + + // In seconds + static const int idleBroadcastInterval = 15 * 60; + static const int activeBroadcastInterval = 5; + static const int lostHostTimeout = idleBroadcastInterval * 1.4f; + +signals: + void foundMoreRecentHost(const QHostAddress& address, uint port, const QDateTime& dateTime); + +public slots: + void setLastModifiedTime(const QDateTime& dateTime); + +private: +#pragma pack(push) +#pragma pack(1) + struct Message { + quint32 mtime; + char hostUuid[38]; + quint16 port; + char folderUuid[38]; + char name[]; + }; +#pragma pack() + struct HostInfo { + QDateTime lastSeen; + }; + + QByteArray encodeMessage() const; + + void switchToActiveMode(); + +private slots: + void broadcastMessage(); + void handleDataAvailable(); + void handleBroadcastMessage(const QHostAddress& sender, const QByteArray &data); + void handleHostSeen(const QUuid& hostUuid); + void handleTimerTimeout(); + void handleTimerError(); + void handleOnlineStateChanged(bool online); + +private: + QUuid _folderUuid; + QUuid _hostUuid; + QString _serviceName; + uint _port; + QDateTime _mtime; + bool _mtimeChanged; + QUdpSocket *_receiver; + QUdpSocket *_sender; + quint32 _myId; + QNetworkConfigurationManager *_netConfig; + QSystemNetworkInfo *_netInfo; + QDateTime _lastBroadcast; + QSystemAlignedTimer *_timer; + QTimer *_fallbackTimer; + QHash _knownHosts; +}; + +#endif // DISCOVERER_H diff --git a/distfoldd/distfoldd.pro b/distfoldd/distfoldd.pro new file mode 100644 index 0000000..3eb804e --- /dev/null +++ b/distfoldd/distfoldd.pro @@ -0,0 +1,35 @@ +TARGET = distfoldd +TEMPLATE = app +CONFIG += console +CONFIG -= app_bundle + +QT += core network +QT -= gui + +CONFIG += mobility +MOBILITY += systeminfo + +SOURCES += main.cc \ + distfolder.cc \ + server.cc \ + watcher.cc \ + clientagent.cc \ + serveragent.cc \ + agent.cc \ + discoverer.cc \ + compressor.cc + +HEADERS += \ + distfolder.h \ + server.h \ + watcher.h \ + clientagent.h \ + serveragent.h \ + agent.h \ + discoverer.h \ + compressor.h + +contains(MEEGO_EDITION,harmattan) { + target.path = /opt/distfold/bin + INSTALLS += target +} diff --git a/distfoldd/distfolder.cc b/distfoldd/distfolder.cc new file mode 100644 index 0000000..211aafe --- /dev/null +++ b/distfoldd/distfolder.cc @@ -0,0 +1,200 @@ +#include + +#include "clientagent.h" +#include "serveragent.h" +#include "distfolder.h" + +DistFolder::DistFolder(const QUuid& uuid, const QString& localPath, QObject *parent) : + QObject(parent), _uuid(uuid), _localPath(localPath), + _mtime(), _mtimeChanged(false), + _watcher(new Watcher(localPath, this)), + _server(new Server(this)), + _discoverer(new Discoverer(uuid, _server->serverPort(), QString("Files on %1").arg(_localPath.canonicalPath()), this)), + _syncFlags(Agent::SYNC_NORMAL), + _numAgents(0) +{ + Q_ASSERT(_localPath.isReadable()); + connect(_watcher, SIGNAL(pathAdded(QString)), SLOT(handlePathAdded(QString))); + connect(_watcher, SIGNAL(pathChanged(QString)), SLOT(handlePathChanged(QString))); + connect(_watcher, SIGNAL(pathRemoved(QString)), SLOT(handlePathRemoved(QString))); + updateLastModTime(); + connect(_discoverer, SIGNAL(foundMoreRecentHost(QHostAddress,uint,QDateTime)), + SLOT(handleMoreRecentHost(QHostAddress,uint,QDateTime))); + connect(_server, SIGNAL(newConnection()), SLOT(handleNewConnection())); + qDebug() << "Ready Folder UUID:" << _uuid; +} + +bool DistFolder::readOnlySync() const +{ + return _syncFlags & Agent::SYNC_READ_ONLY; +} + +void DistFolder::setReadOnlySync(bool read_only) +{ + if (read_only) { + qDebug() << "Enabling read only sync"; + _syncFlags |= Agent::SYNC_READ_ONLY; + } else { + _syncFlags &= ~Agent::SYNC_READ_ONLY; + } +} + +bool DistFolder::pullMode() const +{ + return _syncFlags & Agent::SYNC_PULL; +} + +void DistFolder::setPullMode(bool pull_mode) +{ + if (pull_mode) { + qDebug() << "Enabling pull sync"; + _syncFlags |= Agent::SYNC_PULL; + } else { + _syncFlags &= ~Agent::SYNC_PULL; + } + updateLastModTime(); +} + +bool DistFolder::compress() const +{ + return _syncFlags & Agent::SYNC_COMPRESS; +} + +void DistFolder::setCompress(bool compress) +{ + if (compress) { + qDebug() << "Enabling compression"; + _syncFlags |= Agent::SYNC_COMPRESS; + } else { + _syncFlags &= ~Agent::SYNC_COMPRESS; + } +} + +QDateTime DistFolder::scanLastModTime() +{ + return scanLastModTime(_localPath); +} + +void DistFolder::updateLastModTime(const QDateTime& dt) +{ + QDateTime curMtime = _mtime; + if (pullMode()) { + _mtime = QDateTime::fromTime_t(0); // Force being the puller for the next sync + } else { + if (dt.isNull()) { + _mtime = scanLastModTime(); + } else if (dt > _mtime) { + _mtime = dt; + } + } + if (_mtime != curMtime) { + if (_numAgents > 0) { + _mtimeChanged = true; + } else { + _discoverer->setLastModifiedTime(_mtime); + _mtimeChanged = false; + } + } +} + +QDateTime DistFolder::scanLastModTime(const QDir& dir) +{ + QFileInfoList list = dir.entryInfoList(QDir::Dirs | QDir::Files | QDir::NoDotAndDotDot); + QDateTime max = QFileInfo(dir.absolutePath()).lastModified(); + + foreach (const QFileInfo &info, list) { + QDateTime mtime; + if (info.isDir()) { + mtime = scanLastModTime(QDir(info.absoluteFilePath())); + } else { + mtime = info.lastModified(); + } + if (mtime > max) { + max = mtime; + } + } + + return max; +} + +void DistFolder::handleNewConnection() +{ + qDebug() << "Incoming connection"; + ServerAgent *agent = + new ServerAgent(static_cast(_server->nextPendingConnection()), + _localPath, _syncFlags, this); + connect(agent, SIGNAL(destroyed()), SLOT(handleDestroyedAgent())); + _numAgents++; + qDebug() << "Num agents" << _numAgents; +} + +void DistFolder::handlePathAdded(const QString &path) +{ + QFileInfo info(path); + qDebug() << "added: " << path; + if (info.lastModified() > _mtime) { + updateLastModTime(info.lastModified()); + } +} + +void DistFolder::handlePathChanged(const QString &path) +{ + QFileInfo info(path); + qDebug() << "changed: " << path; + if (info.lastModified() > _mtime) { + updateLastModTime(info.lastModified()); + } +} + +void DistFolder::handlePathRemoved(const QString &path) +{ + QFileInfo info(path); + qDebug() << "removed: " << path; + // Find a parent that has not been removed + const QString base_path = _localPath.absolutePath(); + do { + info = QFileInfo(info.absolutePath()); + } while (!info.exists() && // Until it exists and as long as we are under the base dir. + info.absoluteFilePath().startsWith(base_path)); + if (info.lastModified() > _mtime) { + updateLastModTime(info.lastModified()); + } +} + +void DistFolder::handleMoreRecentHost(const QHostAddress &address, uint port, const QDateTime &dateTime) +{ + Q_UNUSED(dateTime); + if (_numAgents == 0) { + qDebug() << "Trying to connect to" << address.toString() << port; + ClientAgent *agent = new ClientAgent(address, port, _localPath, _syncFlags, this); + connect(agent, SIGNAL(destroyed()), SLOT(handleDestroyedAgent())); + connect(agent, SIGNAL(finished()), SLOT(handleClientFinished())); + _numAgents++; + qDebug() << "Num agents" << _numAgents; + } else { + qDebug() << "Busy but a more recent host was found"; + // TODO + } +} + +void DistFolder::handleClientFinished() +{ + if (pullMode()) { + // Stop pulling once a succesful sync is done + qDebug() << "Stopping pull mode"; + setPullMode(false); + } +} + +void DistFolder::handleDestroyedAgent() +{ + _numAgents--; + qDebug() << "Num agents" << _numAgents; + if (_numAgents == 0) { + // All sync agents have finished + if (_mtimeChanged) { + _discoverer->setLastModifiedTime(_mtime); + _mtimeChanged = false; + } + } +} diff --git a/distfoldd/distfolder.h b/distfoldd/distfolder.h new file mode 100644 index 0000000..ae7a0ea --- /dev/null +++ b/distfoldd/distfolder.h @@ -0,0 +1,57 @@ +#ifndef DISTFOLDER_H +#define DISTFOLDER_H + +#include +#include +#include +#include + +#include "watcher.h" +#include "server.h" +#include "agent.h" +#include "discoverer.h" + +class DistFolder : public QObject +{ + Q_OBJECT + Q_PROPERTY(bool readOnlySync READ readOnlySync WRITE setReadOnlySync) + Q_PROPERTY(bool pullMode READ pullMode WRITE setPullMode) + Q_PROPERTY(bool compress READ compress WRITE setCompress) + +public: + explicit DistFolder(const QUuid& uuid, const QString& path, QObject *parent = 0); + + bool readOnlySync() const; + void setReadOnlySync(bool read_only); + bool pullMode() const; + void setPullMode(bool pull_mode); + bool compress() const; + void setCompress(bool compress); + +private: + QDateTime scanLastModTime(); + QDateTime scanLastModTime(const QDir& dir); + void updateLastModTime(const QDateTime& dt = QDateTime()); + +private slots: + void handleNewConnection(); + void handlePathAdded(const QString& path); + void handlePathChanged(const QString& path); + void handlePathRemoved(const QString& path); + void handleMoreRecentHost(const QHostAddress& address, uint port, const QDateTime& dateTime); + void handleClientFinished(); + void handleDestroyedAgent(); + +private: + QUuid _uuid; + QDir _localPath; + QDateTime _mtime; + bool _mtimeChanged; + Watcher *_watcher; + Server *_server; + Discoverer *_discoverer; + Agent::SyncFlags _syncFlags; + int _numAgents; +}; + +#endif // DISTFOLDER_H diff --git a/distfoldd/main.cc b/distfoldd/main.cc new file mode 100644 index 0000000..fbecf5a --- /dev/null +++ b/distfoldd/main.cc @@ -0,0 +1,28 @@ +#include +#include +#include + +#include "distfolder.h" + +int main(int argc, char *argv[]) +{ + QCoreApplication a(argc, argv); + a.setOrganizationName("distfold"); + a.setOrganizationDomain("com.javispedro.distfold"); + a.setApplicationName("distfoldd"); + a.setApplicationVersion("0.1"); + + QSettings settings; + foreach (const QString& group, settings.childGroups()) { + settings.beginGroup(group); + QUuid uuid(settings.value("uuid").toString()); + QString path = settings.value("path").toString(); + DistFolder *f = new DistFolder(uuid, path); + f->setReadOnlySync(settings.value("ro", false).toBool()); + f->setPullMode(settings.value("pull", false).toBool()); + f->setCompress(settings.value("compress", true).toBool()); + settings.endGroup(); + } + + return a.exec(); +} diff --git a/distfoldd/server.cc b/distfoldd/server.cc new file mode 100644 index 0000000..0361466 --- /dev/null +++ b/distfoldd/server.cc @@ -0,0 +1,56 @@ +#include +#include +#include + +#include "server.h" + +Server::Server(QObject *parent) : + QTcpServer(parent) +{ + loadKeys(); + if (!listen()) { + qWarning() << "Failed to start server socket"; + } +} + +void Server::loadKeys() +{ + QDir config_dir(QDir::home().absoluteFilePath(".config/distfold")); + QFile cert_file(config_dir.absoluteFilePath("server.crt")); + if (cert_file.open(QIODevice::ReadOnly)) { + _cert = QSslCertificate(&cert_file, QSsl::Pem); + cert_file.close(); + } + if (_cert.isNull()) { + qWarning() << "Could not load server certificate"; + } + QFile key_file(config_dir.absoluteFilePath("server.key")); + if (key_file.open(QIODevice::ReadOnly)) { + _key = QSslKey(&key_file, QSsl::Rsa, QSsl::Pem); + key_file.close(); + } + if (_key.isNull()) { + qWarning() << "Could not load private key"; + } +} + +void Server::incomingConnection(int socketDescriptor) +{ + QSslSocket *socket = new QSslSocket(this); + connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), + SLOT(handleSocketError(QAbstractSocket::SocketError))); + if (socket->setSocketDescriptor(socketDescriptor)) { + socket->setLocalCertificate(_cert); + socket->setPrivateKey(_key); + socket->startServerEncryption(); + addPendingConnection(socket); + } else { + delete socket; + } +} + +void Server::handleSocketError(QAbstractSocket::SocketError error) +{ + QSslSocket *socket = qobject_cast(sender()); + qDebug() << "Server socket error:" << socket->error() << socket->errorString(); +} diff --git a/distfoldd/server.h b/distfoldd/server.h new file mode 100644 index 0000000..0ed9100 --- /dev/null +++ b/distfoldd/server.h @@ -0,0 +1,26 @@ +#ifndef SERVER_H +#define SERVER_H + +#include +#include +#include + +class Server : public QTcpServer +{ + Q_OBJECT +public: + explicit Server(QObject *parent = 0); + +protected: + void loadKeys(); + void incomingConnection(int handle); + +private slots: + void handleSocketError(QAbstractSocket::SocketError error); + +private: + QSslCertificate _cert; + QSslKey _key; +}; + +#endif // SERVER_H diff --git a/distfoldd/serveragent.cc b/distfoldd/serveragent.cc new file mode 100644 index 0000000..13e7848 --- /dev/null +++ b/distfoldd/serveragent.cc @@ -0,0 +1,225 @@ +#include + +#include "serveragent.h" + +ServerAgent::ServerAgent(QSslSocket *socket, const QDir& local_dir, SyncFlags flags, QObject *parent) : + Agent(socket, local_dir, flags, parent) +{ + qDebug() << "Starting server agent at" << QDateTime::currentDateTime(); +} + +void ServerAgent::handleMessage(MessageType msg, const QByteArray& data) +{ + qDebug() << "Server::handleMessage" << msg << data.size(); + switch (msg) { + case MSG_HELLO: + sendMessage(MSG_HELLO_REPLY); + break; + case MSG_FILE_LIST: + handleClientFileList(decodeFileInfoList(data)); + break; + case MSG_PULL_FILE: + handlePullFile(decodeFileName(data)); + break; + case MSG_PUSH_FILE: + handlePushedFile(data); + break; + case MSG_PUSH_FILE_METADATA: + handlePushedMetadata(decodeFileInfoList(data)); + break; + case MSG_DELETE_FILE: + handleDeleteFile(decodeFileName(data)); + break; + case MSG_BYE: + qDebug() << "Got Bye"; + emit finished(); + _socket->close(); + break; + default: + qWarning() << "Unknown message"; + break; + } +} + +void ServerAgent::handleClientFileList(const RemoteFileInfoList& list) +{ + QFileInfoList files = scanFiles(QDir(wireToLocalPath(_subPath))); + ActionInfoList actions_create_dirs, actions_with_files, actions_remove_dirs, actions_update_dirs; + QHash local_files; + QHash remote_files; + QSet new_local_files; + + foreach (const QFileInfo& file, files) { + const QString wire_path = localToWirePath(file.absoluteFilePath()); + local_files[wire_path] = file; + new_local_files.insert(wire_path); + } + foreach (const RemoteFileInfo& file, list) { + remote_files[file.name()] = file; + } + + foreach (const RemoteFileInfo& remote, list) { + // Main synchronization logic goes here + QString wire_path = remote.name(); + qDebug() << "Server Handling file" << wire_path; + new_local_files.remove(wire_path); + if (local_files.contains(wire_path)) { + // Both remote and local have the file + QFileInfo local = local_files[wire_path]; + QString parent_name = wireParentPath(wire_path); + QFileInfo parent_local = local_files[parent_name]; + RemoteFileInfo parent_remote = remote_files[parent_name]; + Q_ASSERT(local.exists()); + if (remote.isDir() && local.isDir()) { + if (equalDateTime(remote.lastModified(), local.lastModified())) { + if (local.lastModified().toTime_t() == 0) { + // Workaround an issue if the local folder is a mountpoint + actions_update_dirs += ActionInfo(ACTION_PULL_METADATA, local); + } + // Nothing to do + } else if (remote.lastModified() > local.lastModified()) { + actions_update_dirs += ActionInfo(ACTION_PUSH_METADATA, local); + } else { + actions_update_dirs += ActionInfo(ACTION_PULL_METADATA, local); + } + } else if (remote.isDir() == local.isDir() && + equalDateTime(remote.lastModified(), local.lastModified()) && + remote.size() == local.size()) { + // Nothing to do + } else if (remote.isDir() != local.isDir()) { + // Dir to file transformation, delete the most recent file + if (parent_remote.lastModified() > parent_local.lastModified()) { + if (local.isDir()) { + actions_remove_dirs += ActionInfo(ACTION_PUSH_DELETE, local); + } else { + actions_with_files += ActionInfo(ACTION_PUSH_DELETE, local); + } + } else { + if (remote.isDir()) { + actions_remove_dirs += ActionInfo(ACTION_PULL_DELETE, local); + } else { + actions_with_files += ActionInfo(ACTION_PULL_DELETE, local); + } + } + } else if (remote.lastModified() > local.lastModified()) { + if (remote.isDir()) { + actions_create_dirs += ActionInfo(ACTION_PUSH, local); + } else { + actions_with_files += ActionInfo(ACTION_PUSH, local); + } + } else { + if (remote.isDir()) { + actions_create_dirs += ActionInfo(ACTION_PULL, local); + } else { + actions_with_files += ActionInfo(ACTION_PULL, local); + } + } + } else { + // File is in remote, but not in local + qDebug() << " file deleted"; + QString parent_name = findExistingCommonAncestor(wire_path, local_files, remote_files); + QFileInfo local(wireToLocalPath(wire_path)); // Create invalid QFileInfo + QFileInfo parent_local = local_files[parent_name]; + RemoteFileInfo parent_remote = remote_files[parent_name]; + if (parent_remote.lastModified() > parent_local.lastModified()) { + if (remote.isDir()) { + actions_create_dirs += ActionInfo(ACTION_PUSH, local); + actions_update_dirs += ActionInfo(ACTION_PUSH_METADATA, local); + } else { + actions_with_files += ActionInfo(ACTION_PUSH, local); + } + } else { + if (remote.isDir()) { + actions_remove_dirs += ActionInfo(ACTION_PULL_DELETE, local); + } else { + actions_with_files += ActionInfo(ACTION_PULL_DELETE, local); + } + } + } + } + + foreach (const QString& path, new_local_files) { + // File is in local, but not in remote + QString parent_name = findExistingCommonAncestor(path, local_files, remote_files); + QFileInfo local = local_files[path]; + QFileInfo parent_local = local_files[parent_name]; + RemoteFileInfo parent_remote = remote_files[parent_name]; + if (parent_remote.lastModified() > parent_local.lastModified()) { + if (local.isDir()) { + actions_remove_dirs += ActionInfo(ACTION_PUSH_DELETE, local); + } else { + actions_with_files += ActionInfo(ACTION_PUSH_DELETE, local); + } + } else { + if (local.isDir()) { + actions_create_dirs += ActionInfo(ACTION_PULL, local); + actions_update_dirs += ActionInfo(ACTION_PULL_METADATA, local); + } else { + actions_with_files += ActionInfo(ACTION_PULL, local); + } + } + } + + qSort(actions_create_dirs.begin(), actions_create_dirs.end(), + static_cast(lessPathDepthThan)); + qSort(actions_remove_dirs.begin(), actions_remove_dirs.end(), + static_cast(morePathDepthThan)); + qSort(actions_update_dirs.begin(), actions_update_dirs.end(), + static_cast(morePathDepthThan)); + + ActionInfoList actions = actions_create_dirs + + actions_with_files + actions_remove_dirs + + actions_update_dirs; + sendMessage(MSG_FILE_ACTIONS_REPLY, encodeActionInfoList(actions)); +} + +void ServerAgent::handlePullFile(const QString &path) +{ + QFile file(wireToLocalPath(path)); + if (file.open(QIODevice::ReadOnly)) { + QByteArray ba = file.readAll(); + if (_flags & SYNC_COMPRESS) { + int old_size = ba.size(); + ba = Compressor::compress(ba); + qDebug() << "Compress" << old_size << "->" << ba.size(); + } + file.close(); + sendMessage(MSG_PULL_FILE_REPLY, ba); + } else { + qWarning() << "Failed to open file" << file.fileName() << "for reading"; + } +} + +void ServerAgent::handlePushedFile(const QByteArray &ba) +{ + if (_flags & SYNC_READ_ONLY) return; + int len; + QString wire_path = decodeFileNameItem(ba, &len); + QByteArray file_data = Compressor::decompress(ba.mid(len)); + + QFile file(wireToLocalPath(wire_path)); + if (file.open(QIODevice::WriteOnly | QIODevice::Truncate)) { + file.write(file_data); + file.close(); + } else { + qWarning() << "Failed to open file" << file.fileName() << "for writing"; + } +} + +void ServerAgent::handlePushedMetadata(const RemoteFileInfoList& list) +{ + if (_flags & SYNC_READ_ONLY) return; + foreach (const RemoteFileInfo& remote, list) { + QString local_path = wireToLocalPath(remote.name()); + setLocalFileDateTime(local_path, remote.lastModified()); + } +} + +void ServerAgent::handleDeleteFile(const QString &path) +{ + if (_flags & SYNC_READ_ONLY) return; + QString local_path = wireToLocalPath(path); + if (!QDir().remove(local_path)) { + qWarning() << "Failed to remove file" << local_path; + } +} diff --git a/distfoldd/serveragent.h b/distfoldd/serveragent.h new file mode 100644 index 0000000..c692e60 --- /dev/null +++ b/distfoldd/serveragent.h @@ -0,0 +1,23 @@ +#ifndef SERVERAGENT_H +#define SERVERAGENT_H + +#include "agent.h" + +class ServerAgent : public Agent +{ + Q_OBJECT +public: + explicit ServerAgent(QSslSocket *socket, const QDir& local_dir, SyncFlags flags, QObject *parent = 0); + +protected: + void handleMessage(MessageType msg, const QByteArray& data); + +private: + void handleClientFileList(const RemoteFileInfoList& list); + void handlePullFile(const QString& path); + void handlePushedFile(const QByteArray& data); + void handlePushedMetadata(const RemoteFileInfoList& list); + void handleDeleteFile(const QString& path); +}; + +#endif // SERVERAGENT_H diff --git a/distfoldd/watcher.cc b/distfoldd/watcher.cc new file mode 100644 index 0000000..664ea58 --- /dev/null +++ b/distfoldd/watcher.cc @@ -0,0 +1,132 @@ +#include + +#include +#include +#include + +#include "watcher.h" + + +Watcher::Watcher(const QString& path, QObject *parent) : + QObject(parent) +{ + _fd = inotify_init(); + fcntl(_fd, F_SETFL, O_NONBLOCK); + _buffer.resize(sizeof(struct inotify_event) + NAME_MAX + 1); + _mask = IN_CLOSE_WRITE | IN_CREATE | IN_DELETE | + IN_MOVED_FROM | IN_MOVED_TO | IN_ONLYDIR; + + _notifier = new QSocketNotifier(_fd, QSocketNotifier::Read, this); + connect(_notifier, SIGNAL(activated(int)), SLOT(readInotify())); + + QStringList l = scanDirs(QDir(path)); + foreach (const QString& s, l) { + addWatch(s); + } +} + +void Watcher::readInotify() +{ + inotify_event *event; + const ssize_t struct_size = sizeof(struct inotify_event); + ssize_t pos, nread; + + do { + nread = read(_fd, _buffer.data(), _buffer.size()); + if (nread < 0) { + int err = errno; + if (err == EWOULDBLOCK) return; + qWarning() << "Error while reading from inotify" << err; + } + + pos = 0; + while (pos + struct_size <= nread) { + event = reinterpret_cast(&(_buffer.data()[pos])); + if (event->wd == -1) { + // Special treatment + // TODO + continue; + } + + QString path = _watches[event->wd]; + QString name; + QString filePath = path; + if (event->len > 0) { + name = QString::fromLocal8Bit(event->name); + filePath += "/" + name; + } + + if (event->mask & (IN_CREATE|IN_MOVED_TO)) { + if (event->mask & IN_ISDIR) { + // TODO There might already be folders in here. + addWatch(filePath); + } + emit pathAdded(filePath); + } + if (event->mask & IN_CLOSE_WRITE) { + emit pathChanged(filePath); + } + if (event->mask & (IN_MOVED_FROM|IN_DELETE)) { + if (event->mask & IN_ISDIR) { + removeWatch(filePath); + } + emit pathRemoved(filePath); + } + + pos += struct_size + event->len; + } + } while (nread > 0); +} + +QStringList Watcher::scanDirs(const QDir &dir) +{ + Q_ASSERT(dir.isReadable()); + QStringList l(dir.absolutePath()); + QStringList sub_dirs = dir.entryList(QDir::Dirs | QDir::NoDotAndDotDot); + + foreach (const QString& s, sub_dirs) { + const QString abs_path = dir.absoluteFilePath(s); + l << scanDirs(QDir(abs_path)); + } + + return l; +} + +void Watcher::addWatch(const QString& path) +{ + int wd = inotify_add_watch(_fd, path.toLocal8Bit().constData(), _mask); + if (wd == -1) { + qWarning() << "Failed to add watch for" << path; + return; + } + if (_watches.contains(wd)) { + qWarning() << "Watch for" << path << "was already setup"; + return; + } + + qDebug() << "Watching" << path; + _watches[wd] = path; + _dirs[path] = wd; +} + +void Watcher::removeWatch(const QString& path) +{ + if (!_dirs.contains(path)) { + qWarning() << "Watch for" << path << "not found"; + return; + } + int wd = _dirs[path]; + removeWatch(wd); +} + +void Watcher::removeWatch(int wd) +{ + if (!_watches.contains(wd)) { + qWarning() << "Watch" << wd << "not found"; + } + QString path = _watches[wd]; + qDebug() << "Unwatching" << path; + inotify_rm_watch(_fd, wd); + _dirs.remove(path); + _watches.remove(wd); +} diff --git a/distfoldd/watcher.h b/distfoldd/watcher.h new file mode 100644 index 0000000..861248e --- /dev/null +++ b/distfoldd/watcher.h @@ -0,0 +1,38 @@ +#ifndef WATCHER_H +#define WATCHER_H + +#include +#include +#include +#include + +class Watcher : public QObject +{ + Q_OBJECT +public: + explicit Watcher(const QString& path, QObject *parent = 0); + +signals: + void pathAdded(const QString& path); + void pathRemoved(const QString& path); + void pathChanged(const QString& path); + +private slots: + void readInotify(); + +private: + QStringList scanDirs(const QDir& dir); + void addWatch(const QString& path); + void removeWatch(const QString& path); + void removeWatch(int wd); + +private: + int _fd; + QSocketNotifier *_notifier; + quint32 _mask; + QByteArray _buffer; + QMap _watches; + QHash _dirs; +}; + +#endif // WATCHER_H -- cgit v1.2.3