summaryrefslogtreecommitdiff
path: root/distfoldd
diff options
context:
space:
mode:
Diffstat (limited to 'distfoldd')
-rw-r--r--distfoldd/agent.cc337
-rw-r--r--distfoldd/agent.h202
-rw-r--r--distfoldd/clientagent.cc174
-rw-r--r--distfoldd/clientagent.h37
-rw-r--r--distfoldd/compressor.cc94
-rw-r--r--distfoldd/compressor.h16
-rw-r--r--distfoldd/discoverer.cc195
-rw-r--r--distfoldd/discoverer.h82
-rw-r--r--distfoldd/distfoldd.pro35
-rw-r--r--distfoldd/distfolder.cc200
-rw-r--r--distfoldd/distfolder.h57
-rw-r--r--distfoldd/main.cc28
-rw-r--r--distfoldd/server.cc56
-rw-r--r--distfoldd/server.h26
-rw-r--r--distfoldd/serveragent.cc225
-rw-r--r--distfoldd/serveragent.h23
-rw-r--r--distfoldd/watcher.cc132
-rw-r--r--distfoldd/watcher.h38
18 files changed, 1957 insertions, 0 deletions
diff --git a/distfoldd/agent.cc b/distfoldd/agent.cc
new file mode 100644
index 0000000..a718230
--- /dev/null
+++ b/distfoldd/agent.cc
@@ -0,0 +1,337 @@
+#include "agent.h"
+
+#ifdef Q_OS_UNIX
+#include <sys/time.h>
+#include <utime.h>
+#endif
+
+Agent::Agent(QSslSocket *socket, const QDir& dir, SyncFlags flags, QObject *parent) :
+ QObject(parent), _dir(dir), _subPath("/"), _flags(flags), _socket(socket)
+{
+ connect(_socket, SIGNAL(readyRead()), SLOT(handleDataAvailable()));
+ connect(_socket, SIGNAL(sslErrors(QList<QSslError>)), SLOT(handleSslErrors(QList<QSslError>)));
+ connect(_socket, SIGNAL(disconnected()), SLOT(handleDisconnected()));
+}
+
+Agent::RemoteFileInfo::RemoteFileInfo() :
+ _name(), _type(FILE_TYPE_NONE), _mtime(), _size(0)
+{
+}
+
+Agent::RemoteFileInfo::RemoteFileInfo(const QString &name, FileType type, const QDateTime &mtime, qint64 size) :
+ _name(name), _type(type), _mtime(mtime), _size(size)
+{
+}
+
+Agent::ActionInfo::ActionInfo(FileAction action, const QFileInfo &fileInfo) :
+ _action(action), _info(fileInfo)
+{
+}
+
+Agent::RemoteActionInfo::RemoteActionInfo(FileAction action, const QString &name, FileType type, const QDateTime &mtime, qint64 size) :
+ _action(action), _info(name, type, mtime, size)
+{
+
+}
+
+void Agent::sendMessage(MessageType msg, const QByteArray &content)
+{
+ const int header_len = sizeof(MessageHeader);
+ QByteArray ba(header_len + content.size(), '\0');
+
+ MessageHeader *h = reinterpret_cast<MessageHeader*>(ba.data());
+ h->msg = msg;
+ h->len = content.size();
+ if (h->len > 0) {
+ ba.replace(header_len, content.size(), content);
+ }
+ Q_ASSERT(ba.size() == header_len + content.size());
+
+ qDebug() << "Sending " << msg << content.size();
+
+ _socket->write(ba);
+}
+
+int Agent::inBufRequiredData() const
+{
+ const int header_len = sizeof(MessageHeader);
+ if (_inBuf.size() < header_len) {
+ return header_len;
+ } else {
+ const MessageHeader *h;
+ h = reinterpret_cast<const MessageHeader*>(_inBuf.data());
+ return header_len + h->len;
+ }
+}
+
+bool Agent::equalDateTime(const QDateTime& dt1, const QDateTime& dt2)
+{
+ const qint64 threshold = 5 * 1000;
+ qint64 msecs = dt1.msecsTo(dt2);
+ return msecs > -threshold && msecs < threshold;
+}
+
+void Agent::setLocalFileDateTime(const QString &path, const QDateTime &dt)
+{
+#ifdef Q_OS_UNIX
+ const char *filename = path.toLocal8Bit().constData();
+ struct utimbuf times;
+ times.actime = dt.toTime_t();
+ times.modtime = dt.toTime_t();
+ int rc = utime(filename, &times);
+ if (rc != 0) {
+ qWarning() << "Could not set local mtime of" << path;
+ }
+#endif
+}
+
+QString Agent::wireToLocalPath(const QString &path)
+{
+ return _dir.absolutePath() + _subPath + path;
+}
+
+QString Agent::localToWirePath(const QString& path)
+{
+ const QString& basePath = _dir.absolutePath() + _subPath;
+ Q_ASSERT(_subPath.endsWith('/'));
+ QString wire_path(path);
+ if (path.startsWith(basePath)) {
+ wire_path.remove(0, basePath.size());
+ } else if (path + '/' == basePath) {
+ wire_path = QString(); // This is the root dir.
+ } else {
+ qWarning() << "Where does this come from?" << path;
+ }
+ return wire_path;
+}
+
+QString Agent::wireParentPath(const QString &path)
+{
+ int index = path.lastIndexOf('/');
+ if (index == -1) return QString("");
+ else return path.left(index);
+}
+
+QString Agent::findExistingCommonAncestor(const QString& path,
+ const QHash<QString, QFileInfo>& local_files,
+ const QHash<QString, RemoteFileInfo>& remote_files)
+{
+ QString ancestor = path;
+ do {
+ ancestor = wireParentPath(ancestor);
+ } while (!local_files.contains(ancestor) || !remote_files.contains(ancestor));
+ return ancestor;
+}
+
+bool Agent::lessPathDepthThan(const QString& s1, const QString& s2)
+{
+ int d1 = s1.count('/'), d2 = s2.count('/');
+ return d1 < d2;
+}
+
+bool Agent::morePathDepthThan(const QString& s1, const QString& s2)
+{
+ int d1 = s1.count('/'), d2 = s2.count('/');
+ return d1 > d2;
+}
+
+QFileInfoList Agent::scanFiles(const QDir& dir)
+{
+ QFileInfoList all;
+ QFileInfoList sub = dir.entryInfoList(QDir::Dirs | QDir::Files | QDir::NoDotAndDotDot);
+
+ all << QFileInfo(dir, ".");
+
+ foreach (const QFileInfo &info, sub) {
+ if (info.isDir()) {
+ all << scanFiles(QDir(info.absoluteFilePath()));
+ } else {
+ all << info;
+ }
+ }
+
+ return all;
+}
+
+QByteArray Agent::encodeFileInfoList(const QFileInfoList& list)
+{
+ QByteArray ba;
+ if (list.empty()) return ba;
+
+ ba.reserve(list.size() * (sizeof(FileListItem) + 10));
+
+ foreach (const QFileInfo& info, list) {
+ QString path = localToWirePath(info.absoluteFilePath());
+ FileListItem item;
+ item.size = info.size();
+
+ if (info.isDir() && _flags & SYNC_PULL) {
+ // If we are pulling, then simulate all of our local directories being old,
+ // so that we get all the new file creations
+ // Existing files should be compared with the proper mtimes though.
+ item.mtime = 0;
+ } else {
+ item.mtime = info.lastModified().toTime_t();
+ }
+
+ if (info.isDir()) {
+ item.type = FILE_TYPE_DIR;
+ } else if (info.isFile()) {
+ item.type = FILE_TYPE_FILE;
+ } else {
+ qWarning() << "What is this?" << path;
+ continue;
+ }
+ QByteArray utfPath = encodeFileName(path);
+
+ item.name_len = utfPath.size();
+ ba.append(reinterpret_cast<char*>(&item), sizeof(FileListItem));
+ ba.append(utfPath.constData());
+ }
+
+ return ba;
+}
+
+Agent::RemoteFileInfoList Agent::decodeFileInfoList(const QByteArray& ba)
+{
+ RemoteFileInfoList list;
+
+ int pos = 0;
+ while (pos < ba.size()) {
+ const FileListItem *item;
+ item = reinterpret_cast<const FileListItem*>(&(ba.data()[pos]));
+
+ list.append(RemoteFileInfo(QString::fromUtf8(&item->name[0],
+ item->name_len),
+ static_cast<FileType>(item->type),
+ QDateTime::fromTime_t(item->mtime),
+ item->size));
+
+ pos += sizeof(FileListItem) + item->name_len;
+ }
+
+ return list;
+}
+
+QByteArray Agent::encodeActionInfoList(const ActionInfoList &list)
+{
+ QByteArray ba;
+
+ ba.reserve(list.size() * (sizeof(ActionItem) + 10));
+
+ foreach (const ActionInfo& info, list) {
+ const QFileInfo fileInfo = info.fileInfo();
+ QString path = localToWirePath(fileInfo.absoluteFilePath());
+ ActionItem item;
+ item.action = info.action();
+ item.size = fileInfo.size();
+ item.mtime = fileInfo.lastModified().toTime_t();
+
+ if (fileInfo.isDir()) {
+ item.type = FILE_TYPE_DIR;
+ } else if (fileInfo.isFile()) {
+ item.type = FILE_TYPE_FILE;
+ } else {
+ item.type = FILE_TYPE_NONE; // Actions might refer to no-longer existing files
+ }
+
+ QByteArray utfPath = path.toUtf8();
+
+ item.name_len = utfPath.size();
+ ba.append(reinterpret_cast<char*>(&item), sizeof(item));
+ ba.append(utfPath.constData());
+ }
+
+ return ba;
+}
+
+Agent::RemoteActionInfoList Agent::decodeActionInfoList(const QByteArray& ba)
+{
+ RemoteActionInfoList list;
+
+ int pos = 0;
+ while (pos < ba.size()) {
+ const ActionItem *item;
+ item = reinterpret_cast<const ActionItem*>(&(ba.data()[pos]));
+
+ list.append(RemoteActionInfo(static_cast<FileAction>(item->action),
+ QString::fromUtf8(&item->name[0],
+ item->name_len),
+ static_cast<FileType>(item->type),
+ QDateTime::fromTime_t(item->mtime),
+ item->size));
+
+ pos += sizeof(ActionItem) + item->name_len;
+ }
+
+ return list;
+}
+
+bool Agent::lessPathDepthThan(const ActionInfo& a1, const ActionInfo& a2)
+{
+ return lessPathDepthThan(a1.fileInfo().absoluteFilePath(),
+ a2.fileInfo().absoluteFilePath());
+}
+
+bool Agent::morePathDepthThan(const ActionInfo& a1, const ActionInfo& a2)
+{
+ return morePathDepthThan(a1.fileInfo().absoluteFilePath(),
+ a2.fileInfo().absoluteFilePath());
+}
+
+QByteArray Agent::encodeFileName(const QString& wire_path)
+{
+ return wire_path.toUtf8();
+}
+
+QString Agent::decodeFileName(const QByteArray& ba)
+{
+ return QString::fromUtf8(ba);
+}
+
+QByteArray Agent::encodeFileNameItem(const QString& wire_path)
+{
+ QByteArray utf8 = encodeFileName(wire_path);
+ QByteArray ba(sizeof(FileNameItem), 0);
+ FileNameItem *item = reinterpret_cast<FileNameItem*>(ba.data());
+ item->name_len = utf8.length();
+ ba.append(utf8);
+ return ba;
+}
+
+QString Agent::decodeFileNameItem(const QByteArray& ba, int* length)
+{
+ const FileNameItem *item = reinterpret_cast<const FileNameItem*>(ba.data());
+ *length = sizeof(FileNameItem) + item->name_len;
+ QString wire_path = decodeFileName(ba.mid(sizeof(FileNameItem), item->name_len));
+ return wire_path;
+}
+
+void Agent::handleDataAvailable()
+{
+ do {
+ _inBuf.append(_socket->readAll());
+
+ while (_inBuf.size() >= inBufRequiredData()) {
+ MessageHeader *h = reinterpret_cast<MessageHeader*>(_inBuf.data());
+ QByteArray data;
+ if (h->len > 0) {
+ data = _inBuf.mid(sizeof(MessageHeader), h->len);
+ }
+ handleMessage(static_cast<MessageType>(h->msg), data);
+ _inBuf.remove(0, inBufRequiredData());
+ }
+ } while (_socket->bytesAvailable() > 0);
+}
+
+void Agent::handleSslErrors(const QList<QSslError> &errors)
+{
+ qDebug() << "SSL errors:" << errors;
+ _socket->ignoreSslErrors(); // TODO For now
+}
+
+void Agent::handleDisconnected()
+{
+ qDebug() << "Disconnected at" << QDateTime::currentDateTime();
+ deleteLater();
+}
diff --git a/distfoldd/agent.h b/distfoldd/agent.h
new file mode 100644
index 0000000..fa202f5
--- /dev/null
+++ b/distfoldd/agent.h
@@ -0,0 +1,202 @@
+#ifndef AGENT_H
+#define AGENT_H
+
+#include <QtCore/QDateTime>
+#include <QtCore/QDir>
+#include <QtNetwork/QSslSocket>
+
+#include "compressor.h"
+
+class Agent : public QObject
+{
+ Q_OBJECT
+ Q_FLAGS(SyncFlag SyncFlags)
+
+public:
+ enum SyncFlag {
+ SYNC_NORMAL = 0,
+ SYNC_PULL = 0x1,
+ SYNC_READ_ONLY = 0x2,
+ SYNC_COMPRESS = 0x8
+ };
+ Q_DECLARE_FLAGS(SyncFlags, SyncFlag)
+
+public:
+ explicit Agent(QSslSocket *socket, const QDir& dir, SyncFlags flags, QObject *parent = 0);
+
+ static const uint servicePort = 17451;
+
+signals:
+ void finished();
+
+protected:
+#pragma pack(push)
+#pragma pack(1)
+ enum MessageType {
+ MSG_HELLO = 0,
+ MSG_HELLO_REPLY,
+ MSG_SET_SUBROOT, //args: string
+ MSG_FILE_LIST, //args: FileListItem[]
+ MSG_FILE_ACTIONS_REPLY, //args: ActionItem[]
+ MSG_PULL_FILE, //args: string
+ MSG_PULL_FILE_REPLY, //args: data
+ MSG_PUSH_FILE, //args: FileNameItem + data
+ MSG_PUSH_FILE_METADATA, //args: FileListItem[]
+ MSG_DELETE_FILE, //args: string
+ MSG_BYE
+ };
+ struct MessageHeader {
+ quint32 msg;
+ quint32 len;
+ };
+ enum FileType {
+ FILE_TYPE_NONE = 0,
+ FILE_TYPE_FILE,
+ FILE_TYPE_DIR
+ };
+ struct FileListItem {
+ quint64 size;
+ quint32 mtime;
+ quint16 name_len;
+ quint8 type;
+ char name[];
+ };
+ enum FileAction {
+ ACTION_NONE = 0,
+ ACTION_PULL_METADATA,
+ ACTION_PUSH_METADATA,
+ ACTION_PULL,
+ ACTION_PUSH,
+ ACTION_PULL_DELETE,
+ ACTION_PUSH_DELETE
+ };
+ struct ActionItem {
+ quint64 size;
+ quint32 mtime;
+ quint16 name_len;
+ quint8 type;
+ quint8 action;
+ char name[];
+ };
+ struct FileNameItem {
+ quint16 name_len;
+ char name[];
+ };
+#pragma pack(pop)
+
+protected:
+ class RemoteFileInfo {
+ public:
+ RemoteFileInfo();
+ RemoteFileInfo(const QString &name, FileType type, const QDateTime &mtime, qint64 size);
+
+ inline qint64 size() const {
+ return _size;
+ }
+
+ inline QDateTime lastModified() const {
+ return _mtime;
+ }
+
+ inline QString name() const {
+ return _name;
+ }
+
+ inline bool isDir() const {
+ return _type == FILE_TYPE_DIR;
+ }
+
+ private:
+ QString _name;
+ FileType _type;
+ QDateTime _mtime;
+ qint64 _size;
+ };
+ typedef QList<RemoteFileInfo> RemoteFileInfoList;
+
+ class ActionInfo {
+ public:
+ ActionInfo(FileAction action, const QFileInfo& fileInfo);
+
+ inline FileAction action() const {
+ return _action;
+ }
+
+ inline QFileInfo fileInfo() const {
+ return _info;
+ }
+
+ private:
+ FileAction _action;
+ QFileInfo _info;
+ };
+ typedef QList<ActionInfo> ActionInfoList;
+
+ class RemoteActionInfo {
+ public:
+ RemoteActionInfo(FileAction action, const QString& name, FileType type, const QDateTime& mtime, qint64 size);
+
+ inline FileAction action() const {
+ return _action;
+ }
+
+ inline RemoteFileInfo fileInfo() const {
+ return _info;
+ }
+
+ private:
+ FileAction _action;
+ RemoteFileInfo _info;
+ };
+ typedef QList<RemoteActionInfo> RemoteActionInfoList;
+
+protected:
+ void sendMessage(MessageType msg, const QByteArray& data = QByteArray());
+ virtual void handleMessage(MessageType msg, const QByteArray& data) = 0;
+
+ int inBufRequiredData() const;
+
+ bool equalDateTime(const QDateTime& dt1, const QDateTime& dt2);
+ void setLocalFileDateTime(const QString &path, const QDateTime& dt);
+
+ QString wireToLocalPath(const QString& path);
+ QString localToWirePath(const QString& path);
+
+ QString wireParentPath(const QString& path);
+ QString findExistingCommonAncestor(const QString& path,
+ const QHash<QString, QFileInfo>& local_files,
+ const QHash<QString, RemoteFileInfo>& remote_files);
+ static bool lessPathDepthThan(const QString& s1, const QString& s2);
+ static bool morePathDepthThan(const QString& s1, const QString& s2);
+
+ static QFileInfoList scanFiles(const QDir& dir);
+ QByteArray encodeFileInfoList(const QFileInfoList& list);
+ RemoteFileInfoList decodeFileInfoList(const QByteArray& ba);
+
+ QByteArray encodeActionInfoList(const ActionInfoList& list);
+ RemoteActionInfoList decodeActionInfoList(const QByteArray& ba);
+ static bool lessPathDepthThan(const ActionInfo& a1, const ActionInfo& a2);
+ static bool morePathDepthThan(const ActionInfo& a1, const ActionInfo& a2);
+
+ QByteArray encodeFileName(const QString& wire_path);
+ QString decodeFileName(const QByteArray& ba);
+
+ QByteArray encodeFileNameItem(const QString& wire_path);
+ QString decodeFileNameItem(const QByteArray& ba, int* length);
+
+protected:
+ QDir _dir;
+ QString _subPath;
+ SyncFlags _flags;
+ QSslSocket *_socket;
+ QByteArray _inBuf;
+
+private slots:
+ void handleDataAvailable();
+ void handleSslErrors(const QList<QSslError>& errors);
+ void handleDisconnected();
+};
+
+Q_DECLARE_OPERATORS_FOR_FLAGS(Agent::SyncFlags)
+
+#endif // AGENT_H
diff --git a/distfoldd/clientagent.cc b/distfoldd/clientagent.cc
new file mode 100644
index 0000000..f77a21a
--- /dev/null
+++ b/distfoldd/clientagent.cc
@@ -0,0 +1,174 @@
+#include <QtCore/QDebug>
+
+#include "clientagent.h"
+
+ClientAgent::ClientAgent(const QHostAddress& addr, uint port, const QDir& local_dir, SyncFlags flags, QObject *parent) :
+ Agent(new QSslSocket, local_dir, flags, parent)
+{
+ qDebug() << "Starting client agent at" << QDateTime::currentDateTime();
+ _socket->setParent(this); // Can't set parent until QObject constructed
+ _socket->connectToHostEncrypted(addr.toString(), port);
+ sendMessage(MSG_HELLO);
+ _state = STATE_HELLO;
+}
+
+void ClientAgent::handleMessage(MessageType msg, const QByteArray &data)
+{
+ qDebug() << "Client::handleMessage" << msg << data.size();
+ switch (msg) {
+ case MSG_HELLO_REPLY:
+ Q_ASSERT(_state == STATE_HELLO);
+ qDebug() << "Hello reply";
+ _state = STATE_FILE_LIST;
+ sendFileList();
+ break;
+ case MSG_FILE_ACTIONS_REPLY:
+ Q_ASSERT(_state == STATE_FILE_LIST);
+ handleActionInfoList(decodeActionInfoList(data));
+ break;
+ case MSG_PULL_FILE_REPLY:
+ Q_ASSERT(_state == STATE_FILE_ACTIONS && !_pendingActions.empty());
+ handlePulledFile(data);
+ break;
+ default:
+ qWarning() << "Unknown message";
+ break;
+ }
+}
+
+void ClientAgent::sendFileList()
+{
+ QFileInfoList list = scanFiles(QDir(wireToLocalPath(_subPath)));
+ sendMessage(MSG_FILE_LIST, encodeFileInfoList(list));
+}
+
+void ClientAgent::handleActionInfoList(const RemoteActionInfoList &list)
+{
+ foreach (const RemoteActionInfo& info, list) {
+ switch (info.action()) {
+ case ACTION_NONE:
+ qDebug() << " = " << info.fileInfo().name();
+ break;
+ case ACTION_PULL:
+ qDebug() << " < " << info.fileInfo().name();
+ break;
+ case ACTION_PULL_METADATA:
+ qDebug() << " <m" << info.fileInfo().name();
+ break;
+ case ACTION_PULL_DELETE:
+ qDebug() << " <d" << info.fileInfo().name();
+ break;
+ case ACTION_PUSH:
+ qDebug() << " > " << info.fileInfo().name();
+ break;
+ case ACTION_PUSH_METADATA:
+ qDebug() << " >m" << info.fileInfo().name();
+ break;
+ case ACTION_PUSH_DELETE:
+ qDebug() << " >d" << info.fileInfo().name();
+ break;
+ }
+ }
+
+ _pendingActions = list;
+ _state = STATE_FILE_ACTIONS;
+ executeNextAction();
+}
+
+void ClientAgent::executeNextAction()
+{
+ if (_pendingActions.empty()) {
+ qDebug() << "Done";
+ emit finished();
+ sendMessage(MSG_BYE);
+ _socket->flush();
+ _socket->close();
+ return;
+ }
+
+ qDebug() << "Remaining actions" << _pendingActions.count();
+
+ const RemoteActionInfo& info = _pendingActions.first();
+ const QString wire_path = info.fileInfo().name();
+ const QString local_path = wireToLocalPath(wire_path);
+ switch (info.action()) {
+ case ACTION_PULL:
+ if (_flags & SYNC_READ_ONLY) break;
+ if (info.fileInfo().isDir()) {
+ if (!QDir().mkpath(local_path)) {
+ qWarning() << "Failed to create local path" << local_path;
+ }
+ } else {
+ sendMessage(MSG_PULL_FILE, encodeFileName(wire_path));
+ return; // Wait for the pulled file message.
+ }
+ break;
+ case ACTION_PULL_METADATA:
+ if (_flags & SYNC_READ_ONLY) break;
+ setLocalFileDateTime(local_path, info.fileInfo().lastModified());
+ break;
+ case ACTION_PULL_DELETE:
+ handleDeleteFile(wire_path);
+ break;
+ case ACTION_PUSH:
+ handlePushFile(wire_path);
+ break;
+ case ACTION_PUSH_METADATA:
+ sendMessage(MSG_PUSH_FILE_METADATA,
+ encodeFileInfoList(QFileInfoList() << QFileInfo(local_path)));
+ break;
+ case ACTION_PUSH_DELETE:
+ sendMessage(MSG_DELETE_FILE, encodeFileName(wire_path));
+ break;
+ default:
+ qWarning() << "Unknown action" << info.action();
+ break;
+ }
+ _pendingActions.removeFirst();
+ executeNextAction();
+}
+
+void ClientAgent::handlePulledFile(const QByteArray &data)
+{
+ const RemoteActionInfo& info = _pendingActions.takeFirst();
+ if (_flags & SYNC_READ_ONLY) return;
+ QString local_path(wireToLocalPath(info.fileInfo().name()));
+ QFile file(local_path);
+ if (file.open(QIODevice::WriteOnly | QIODevice::Truncate)) {
+ file.write(Compressor::decompress(data));
+ file.close();
+ setLocalFileDateTime(local_path, info.fileInfo().lastModified());
+ } else {
+ qWarning() << "Failed to open" << file.fileName() << "for writing";
+ }
+ executeNextAction();
+}
+
+void ClientAgent::handlePushFile(const QString &path)
+{
+ QFile file(wireToLocalPath(path));
+ if (file.open(QIODevice::ReadOnly)) {
+ QByteArray file_data = file.readAll();
+ if (_flags & SYNC_COMPRESS) file_data = Compressor::compress(file_data);
+ QByteArray ba = encodeFileNameItem(path) + file_data;
+ sendMessage(MSG_PUSH_FILE, ba);
+ } else {
+ qWarning() << "Failed to open file" << file.fileName() << "for reading";
+ }
+}
+
+void ClientAgent::handleDeleteFile(const QString &wire_path)
+{
+ if (_flags & SYNC_READ_ONLY) return;
+ QFileInfo local(wireToLocalPath(wire_path));
+ QString local_path = local.absoluteFilePath();
+ if (local.isDir()) {
+ if (!QDir().rmdir(local_path)) {
+ qWarning() << "Failed to remove local dir" << local_path;
+ }
+ } else {
+ if (!QDir().remove(local_path)) {
+ qWarning() << "Failed to remove local file" << local_path;
+ }
+ }
+}
diff --git a/distfoldd/clientagent.h b/distfoldd/clientagent.h
new file mode 100644
index 0000000..d54bc69
--- /dev/null
+++ b/distfoldd/clientagent.h
@@ -0,0 +1,37 @@
+#ifndef CLIENTAGENT_H
+#define CLIENTAGENT_H
+
+#include <QtCore/QQueue>
+#include <QtNetwork/QHostAddress>
+
+#include "agent.h"
+
+class ClientAgent : public Agent
+{
+ Q_OBJECT
+public:
+ explicit ClientAgent(const QHostAddress& addr, uint port, const QDir& local_dir, SyncFlags flags, QObject *parent = 0);
+
+ enum State {
+ STATE_HELLO,
+ STATE_FILE_LIST,
+ STATE_FILE_ACTIONS
+ };
+
+protected:
+ void handleMessage(MessageType msg, const QByteArray &data);
+
+private:
+ void sendFileList();
+ void handleActionInfoList(const RemoteActionInfoList& list);
+ void executeNextAction();
+ void handlePulledFile(const QByteArray& data);
+ void handlePushFile(const QString& wire_path);
+ void handleDeleteFile(const QString& wire_path);
+
+private:
+ State _state;
+ RemoteActionInfoList _pendingActions;
+};
+
+#endif // CLIENTAGENT_H
diff --git a/distfoldd/compressor.cc b/distfoldd/compressor.cc
new file mode 100644
index 0000000..98eaa92
--- /dev/null
+++ b/distfoldd/compressor.cc
@@ -0,0 +1,94 @@
+#include <QtCore/QDebug>
+
+#include <zlib.h>
+#include "compressor.h"
+
+Compressor::Compressor()
+{
+}
+
+QByteArray Compressor::compress(const QByteArray& data)
+{
+ if (data.isEmpty()) return data;
+
+ QByteArray in = data, out;
+ z_stream strm;
+ int ret;
+
+ memset(&strm, 0, sizeof(strm));
+ out.resize(qMax(in.size(), 1024));
+
+ strm.avail_in = in.size();
+ strm.next_in = reinterpret_cast<Bytef*>(in.data());
+ strm.avail_out = out.size();
+ strm.next_out = reinterpret_cast<Bytef*>(out.data());
+
+ ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION);
+ if (ret != Z_OK) {
+ qWarning() << "deflateInit failed";
+ return data;
+ }
+
+ do {
+ if (strm.avail_out == 0) {
+ int cur_size = out.size();
+ out.resize(cur_size * 2);
+ strm.avail_out = cur_size;
+ strm.next_out = reinterpret_cast<Bytef*>(&out.data()[cur_size]);
+ }
+ ret = deflate(&strm, Z_FINISH);
+ } while (ret == Z_OK || ret == Z_BUF_ERROR);
+
+ Q_ASSERT(ret == Z_STREAM_END);
+ Q_ASSERT(strm.avail_in == 0);
+ if (strm.avail_out > 0) {
+ out.resize(out.size() - strm.avail_out);
+ }
+
+ deflateEnd(&strm);
+
+ return out;
+}
+
+QByteArray Compressor::decompress(const QByteArray& data)
+{
+ if (data.isEmpty()) return data;
+
+ QByteArray in = data, out;
+ z_stream strm;
+ int ret;
+
+ memset(&strm, 0, sizeof(strm));
+ out.resize(qMax(static_cast<int>(in.size() * 1.5), 1024));
+
+ strm.avail_in = in.size();
+ strm.next_in = reinterpret_cast<Bytef*>(in.data());
+ strm.avail_out = out.size();
+ strm.next_out = reinterpret_cast<Bytef*>(out.data());
+
+ ret = inflateInit(&strm);
+ if (ret != Z_OK) {
+ qWarning() << "inflateInit failed";
+ return data;
+ }
+
+ do {
+ if (strm.avail_out == 0) {
+ int cur_size = out.size();
+ out.resize(cur_size * 2);
+ strm.avail_out = cur_size;
+ strm.next_out = reinterpret_cast<Bytef*>(&out.data()[cur_size]);
+ }
+ ret = inflate(&strm, Z_FINISH);
+ } while (ret == Z_OK || ret == Z_BUF_ERROR);
+
+ Q_ASSERT(ret == Z_STREAM_END);
+ Q_ASSERT(strm.avail_in == 0);
+ if (strm.avail_out > 0) {
+ out.resize(out.size() - strm.avail_out);
+ }
+
+ inflateEnd(&strm);
+
+ return out;
+}
diff --git a/distfoldd/compressor.h b/distfoldd/compressor.h
new file mode 100644
index 0000000..6ab8e13
--- /dev/null
+++ b/distfoldd/compressor.h
@@ -0,0 +1,16 @@
+#ifndef COMPRESSOR_H
+#define COMPRESSOR_H
+
+#include <QtCore/QByteArray>
+
+class Compressor
+{
+private:
+ Compressor();
+
+public:
+ static QByteArray compress(const QByteArray& data);
+ static QByteArray decompress(const QByteArray& data);
+};
+
+#endif // COMPRESSOR_H
diff --git a/distfoldd/discoverer.cc b/distfoldd/discoverer.cc
new file mode 100644
index 0000000..629a955
--- /dev/null
+++ b/distfoldd/discoverer.cc
@@ -0,0 +1,195 @@
+#include <QtCore/QDebug>
+
+#include "discoverer.h"
+
+Discoverer::Discoverer(const QUuid &uuid, uint port, const QString &serviceName, QObject *parent) :
+ QObject(parent), _folderUuid(uuid), _hostUuid(QUuid::createUuid()),
+ _serviceName(serviceName), _port(port),
+ _mtime(), _mtimeChanged(false),
+ _receiver(new QUdpSocket(this)), _sender(new QUdpSocket(this)),
+ _netConfig(new QNetworkConfigurationManager(this)),
+ _netInfo(new QSystemNetworkInfo(this)),
+ _timer(new QSystemAlignedTimer(this)), _fallbackTimer(new QTimer(this))
+{
+ connect(_receiver, SIGNAL(readyRead()), SLOT(handleDataAvailable()));
+ if (!_receiver->bind(servicePort, QUdpSocket::ShareAddress)) {
+ qWarning() << "Failed to bind to discoverer port" << servicePort;
+ }
+ connect(_netConfig, SIGNAL(onlineStateChanged(bool)), SLOT(handleOnlineStateChanged(bool)));
+ connect(_timer, SIGNAL(timeout()), SLOT(handleTimerTimeout()));
+ connect(_fallbackTimer, SIGNAL(timeout()), SLOT(handleTimerTimeout()));
+ connect(_timer, SIGNAL(error(QSystemAlignedTimer::AlignedTimerError)), SLOT(handleTimerError()));
+ _timer->setSingleShot(true);
+ _fallbackTimer->setSingleShot(true);
+ if (_netConfig->isOnline() ||
+ _netConfig->allConfigurations(QNetworkConfiguration::Defined).empty()) {
+ // Either only, or _netConfig has no configs so it is useless.
+ qDebug() << "Start timer";
+ _timer->start(0, activeBroadcastInterval);
+ if (_timer && _timer->lastError() != QSystemAlignedTimer::NoError) {
+ qDebug() << "QSystemAlignedTimer broken at start";
+ delete _timer;
+ _timer = 0;
+ _fallbackTimer->start(activeBroadcastInterval * 1000);
+ }
+ } else {
+ qDebug() << "I am NOT online";
+ }
+}
+
+void Discoverer::setLastModifiedTime(const QDateTime& dateTime)
+{
+ _mtime = dateTime;
+ if (_knownHosts.empty()) {
+ // Idle mode: do nothing.
+ } else {
+ // Active mode
+ switchToActiveMode();
+ }
+ _mtimeChanged = true;
+}
+
+QByteArray Discoverer::encodeMessage() const
+{
+ QByteArray ba;
+ QByteArray hostUuid = _hostUuid.toString().toAscii();
+ QByteArray folderUuid = _folderUuid.toString().toAscii();
+ QByteArray serviceNameUtf8 = _serviceName.toUtf8();
+ Message msg;
+
+ Q_ASSERT(hostUuid.length() == sizeof(msg.hostUuid));
+ Q_ASSERT(folderUuid.length() == sizeof(msg.folderUuid));
+
+ ba.reserve(sizeof(msg) + serviceNameUtf8.size());
+
+ msg.mtime = _mtime.toTime_t();
+ strncpy(msg.hostUuid, hostUuid.data(), sizeof(msg.hostUuid));
+ msg.port = _port;
+ strncpy(msg.folderUuid, folderUuid.data(), sizeof(msg.folderUuid));
+
+ ba.append(reinterpret_cast<char*>(&msg), sizeof(msg));
+ ba.append(serviceNameUtf8);
+ return ba;
+}
+
+void Discoverer::switchToActiveMode()
+{
+ if (_timer) {
+ if (!_timer->isActive() || _timer->maximumInterval() > activeBroadcastInterval) {
+ if (_timer->isActive()) _timer->wokeUp();
+ _timer->start(0, activeBroadcastInterval);
+ }
+ } else if (!_fallbackTimer->isActive() || _fallbackTimer->interval() > activeBroadcastInterval * 1000) {
+ _fallbackTimer->start(activeBroadcastInterval * 1000);
+ }
+}
+
+void Discoverer::broadcastMessage()
+{
+ _sender->writeDatagram(encodeMessage(), QHostAddress::Broadcast, servicePort);
+ _lastBroadcast = QDateTime::currentDateTime();
+ _mtimeChanged = false;
+ qDebug() << "Broadcast message at" << _lastBroadcast;
+}
+
+void Discoverer::handleDataAvailable()
+{
+ while (_receiver->hasPendingDatagrams()) {
+ QByteArray data;
+ QHostAddress sender;
+ data.resize(_receiver->pendingDatagramSize());
+ _receiver->readDatagram(data.data(), data.size(), &sender);
+
+ handleBroadcastMessage(sender, data);
+ }
+}
+
+void Discoverer::handleBroadcastMessage(const QHostAddress& sender, const QByteArray &data)
+{
+ const char *dataPtr = data.data();
+ const Message *msg = reinterpret_cast<const Message*>(dataPtr);
+ QString serviceName = QString::fromUtf8(&dataPtr[sizeof(Message)]);
+ char uuidData[sizeof(msg->hostUuid)];
+
+ strncpy(uuidData, msg->hostUuid, sizeof(uuidData));
+ QUuid hostUuid(QString::fromAscii(uuidData, sizeof(uuidData)));
+ strncpy(uuidData, msg->folderUuid, sizeof(uuidData));
+ QUuid folderUuid(QString::fromAscii(uuidData, sizeof(uuidData)));
+
+ if (hostUuid.isNull() || folderUuid.isNull()) {
+ qWarning() << "Null uuid";
+ return;
+ }
+ if (hostUuid == _hostUuid) {
+ // A message from myself, ignore
+ return;
+ }
+ if (folderUuid != _folderUuid) {
+ // A message not about the same folder, ignore
+ return;
+ }
+
+ qDebug() << "Got message from" << sender << msg->port;
+ handleHostSeen(hostUuid);
+
+ QDateTime mtime = QDateTime::fromTime_t(msg->mtime);
+ qDebug() << mtime << _mtime << _mtime.secsTo(mtime);
+ if (_mtime.secsTo(mtime) > dateTimeCompareMinDelta) {
+ // We are outdated, let's try to connect to them.
+ emit foundMoreRecentHost(sender, msg->port, mtime);
+ }
+}
+
+void Discoverer::handleHostSeen(const QUuid &hostUuid)
+{
+ bool host_previously_seen = _knownHosts.contains(hostUuid);
+ _knownHosts[hostUuid].lastSeen = QDateTime::currentDateTime();
+ if (!host_previously_seen || _mtimeChanged) {
+ switchToActiveMode();
+ }
+}
+
+void Discoverer::handleTimerTimeout()
+{
+ qDebug() << "Timer";
+ broadcastMessage();
+ QDateTime now = QDateTime::currentDateTime();
+ QHash<QUuid, HostInfo>::iterator i = _knownHosts.begin();
+ while (i != _knownHosts.end()) {
+ if (i.value().lastSeen.secsTo(now) > lostHostTimeout) {
+ i = _knownHosts.erase(i);
+ } else {
+ ++i;
+ }
+ }
+ qDebug() << "Known hosts" << _knownHosts.size();
+ if (_timer) {
+ _timer->start(idleBroadcastInterval * 0.7f, idleBroadcastInterval * 1.3f);
+ } else {
+ _fallbackTimer->start(idleBroadcastInterval * 1000);
+ }
+}
+
+void Discoverer::handleTimerError()
+{
+ qDebug() << "SystemAlignedTimer broken";
+ _timer->deleteLater();
+ _timer = 0;
+ _fallbackTimer->start(activeBroadcastInterval * 1000);
+}
+
+void Discoverer::handleOnlineStateChanged(bool online)
+{
+ if (online) {
+ qDebug() << "Online";
+ if (_netInfo->currentMode() == QSystemNetworkInfo::UnknownMode ||
+ _netInfo->currentMode() == QSystemNetworkInfo::WlanMode) {
+ switchToActiveMode();
+ }
+ } else {
+ qDebug() << "Offline";
+ _knownHosts.clear();
+ _fallbackTimer->stop();
+ if (_timer) _timer->stop();
+ }
+}
diff --git a/distfoldd/discoverer.h b/distfoldd/discoverer.h
new file mode 100644
index 0000000..ae106de
--- /dev/null
+++ b/distfoldd/discoverer.h
@@ -0,0 +1,82 @@
+#ifndef DISCOVERER_H
+#define DISCOVERER_H
+
+#include <QtCore/QTimer>
+#include <QtCore/QUuid>
+#include <QtCore/QDateTime>
+#include <QtNetwork/QNetworkConfigurationManager>
+#include <QtNetwork/QHostAddress>
+#include <QtNetwork/QUdpSocket>
+#include <QtSystemInfo/QSystemNetworkInfo>
+#include <QtSystemInfo/QSystemAlignedTimer>
+
+QTM_USE_NAMESPACE
+
+class Discoverer : public QObject
+{
+ Q_OBJECT
+
+public:
+ Discoverer(const QUuid& uuid, uint port, const QString& serviceName, QObject *parent = 0);
+
+ static const uint servicePort = 17451;
+ static const int dateTimeCompareMinDelta = 5;
+
+ // In seconds
+ static const int idleBroadcastInterval = 15 * 60;
+ static const int activeBroadcastInterval = 5;
+ static const int lostHostTimeout = idleBroadcastInterval * 1.4f;
+
+signals:
+ void foundMoreRecentHost(const QHostAddress& address, uint port, const QDateTime& dateTime);
+
+public slots:
+ void setLastModifiedTime(const QDateTime& dateTime);
+
+private:
+#pragma pack(push)
+#pragma pack(1)
+ struct Message {
+ quint32 mtime;
+ char hostUuid[38];
+ quint16 port;
+ char folderUuid[38];
+ char name[];
+ };
+#pragma pack()
+ struct HostInfo {
+ QDateTime lastSeen;
+ };
+
+ QByteArray encodeMessage() const;
+
+ void switchToActiveMode();
+
+private slots:
+ void broadcastMessage();
+ void handleDataAvailable();
+ void handleBroadcastMessage(const QHostAddress& sender, const QByteArray &data);
+ void handleHostSeen(const QUuid& hostUuid);
+ void handleTimerTimeout();
+ void handleTimerError();
+ void handleOnlineStateChanged(bool online);
+
+private:
+ QUuid _folderUuid;
+ QUuid _hostUuid;
+ QString _serviceName;
+ uint _port;
+ QDateTime _mtime;
+ bool _mtimeChanged;
+ QUdpSocket *_receiver;
+ QUdpSocket *_sender;
+ quint32 _myId;
+ QNetworkConfigurationManager *_netConfig;
+ QSystemNetworkInfo *_netInfo;
+ QDateTime _lastBroadcast;
+ QSystemAlignedTimer *_timer;
+ QTimer *_fallbackTimer;
+ QHash<QUuid, HostInfo> _knownHosts;
+};
+
+#endif // DISCOVERER_H
diff --git a/distfoldd/distfoldd.pro b/distfoldd/distfoldd.pro
new file mode 100644
index 0000000..3eb804e
--- /dev/null
+++ b/distfoldd/distfoldd.pro
@@ -0,0 +1,35 @@
+TARGET = distfoldd
+TEMPLATE = app
+CONFIG += console
+CONFIG -= app_bundle
+
+QT += core network
+QT -= gui
+
+CONFIG += mobility
+MOBILITY += systeminfo
+
+SOURCES += main.cc \
+ distfolder.cc \
+ server.cc \
+ watcher.cc \
+ clientagent.cc \
+ serveragent.cc \
+ agent.cc \
+ discoverer.cc \
+ compressor.cc
+
+HEADERS += \
+ distfolder.h \
+ server.h \
+ watcher.h \
+ clientagent.h \
+ serveragent.h \
+ agent.h \
+ discoverer.h \
+ compressor.h
+
+contains(MEEGO_EDITION,harmattan) {
+ target.path = /opt/distfold/bin
+ INSTALLS += target
+}
diff --git a/distfoldd/distfolder.cc b/distfoldd/distfolder.cc
new file mode 100644
index 0000000..211aafe
--- /dev/null
+++ b/distfoldd/distfolder.cc
@@ -0,0 +1,200 @@
+#include <QtCore/QStringList>
+
+#include "clientagent.h"
+#include "serveragent.h"
+#include "distfolder.h"
+
+DistFolder::DistFolder(const QUuid& uuid, const QString& localPath, QObject *parent) :
+ QObject(parent), _uuid(uuid), _localPath(localPath),
+ _mtime(), _mtimeChanged(false),
+ _watcher(new Watcher(localPath, this)),
+ _server(new Server(this)),
+ _discoverer(new Discoverer(uuid, _server->serverPort(), QString("Files on %1").arg(_localPath.canonicalPath()), this)),
+ _syncFlags(Agent::SYNC_NORMAL),
+ _numAgents(0)
+{
+ Q_ASSERT(_localPath.isReadable());
+ connect(_watcher, SIGNAL(pathAdded(QString)), SLOT(handlePathAdded(QString)));
+ connect(_watcher, SIGNAL(pathChanged(QString)), SLOT(handlePathChanged(QString)));
+ connect(_watcher, SIGNAL(pathRemoved(QString)), SLOT(handlePathRemoved(QString)));
+ updateLastModTime();
+ connect(_discoverer, SIGNAL(foundMoreRecentHost(QHostAddress,uint,QDateTime)),
+ SLOT(handleMoreRecentHost(QHostAddress,uint,QDateTime)));
+ connect(_server, SIGNAL(newConnection()), SLOT(handleNewConnection()));
+ qDebug() << "Ready Folder UUID:" << _uuid;
+}
+
+bool DistFolder::readOnlySync() const
+{
+ return _syncFlags & Agent::SYNC_READ_ONLY;
+}
+
+void DistFolder::setReadOnlySync(bool read_only)
+{
+ if (read_only) {
+ qDebug() << "Enabling read only sync";
+ _syncFlags |= Agent::SYNC_READ_ONLY;
+ } else {
+ _syncFlags &= ~Agent::SYNC_READ_ONLY;
+ }
+}
+
+bool DistFolder::pullMode() const
+{
+ return _syncFlags & Agent::SYNC_PULL;
+}
+
+void DistFolder::setPullMode(bool pull_mode)
+{
+ if (pull_mode) {
+ qDebug() << "Enabling pull sync";
+ _syncFlags |= Agent::SYNC_PULL;
+ } else {
+ _syncFlags &= ~Agent::SYNC_PULL;
+ }
+ updateLastModTime();
+}
+
+bool DistFolder::compress() const
+{
+ return _syncFlags & Agent::SYNC_COMPRESS;
+}
+
+void DistFolder::setCompress(bool compress)
+{
+ if (compress) {
+ qDebug() << "Enabling compression";
+ _syncFlags |= Agent::SYNC_COMPRESS;
+ } else {
+ _syncFlags &= ~Agent::SYNC_COMPRESS;
+ }
+}
+
+QDateTime DistFolder::scanLastModTime()
+{
+ return scanLastModTime(_localPath);
+}
+
+void DistFolder::updateLastModTime(const QDateTime& dt)
+{
+ QDateTime curMtime = _mtime;
+ if (pullMode()) {
+ _mtime = QDateTime::fromTime_t(0); // Force being the puller for the next sync
+ } else {
+ if (dt.isNull()) {
+ _mtime = scanLastModTime();
+ } else if (dt > _mtime) {
+ _mtime = dt;
+ }
+ }
+ if (_mtime != curMtime) {
+ if (_numAgents > 0) {
+ _mtimeChanged = true;
+ } else {
+ _discoverer->setLastModifiedTime(_mtime);
+ _mtimeChanged = false;
+ }
+ }
+}
+
+QDateTime DistFolder::scanLastModTime(const QDir& dir)
+{
+ QFileInfoList list = dir.entryInfoList(QDir::Dirs | QDir::Files | QDir::NoDotAndDotDot);
+ QDateTime max = QFileInfo(dir.absolutePath()).lastModified();
+
+ foreach (const QFileInfo &info, list) {
+ QDateTime mtime;
+ if (info.isDir()) {
+ mtime = scanLastModTime(QDir(info.absoluteFilePath()));
+ } else {
+ mtime = info.lastModified();
+ }
+ if (mtime > max) {
+ max = mtime;
+ }
+ }
+
+ return max;
+}
+
+void DistFolder::handleNewConnection()
+{
+ qDebug() << "Incoming connection";
+ ServerAgent *agent =
+ new ServerAgent(static_cast<QSslSocket*>(_server->nextPendingConnection()),
+ _localPath, _syncFlags, this);
+ connect(agent, SIGNAL(destroyed()), SLOT(handleDestroyedAgent()));
+ _numAgents++;
+ qDebug() << "Num agents" << _numAgents;
+}
+
+void DistFolder::handlePathAdded(const QString &path)
+{
+ QFileInfo info(path);
+ qDebug() << "added: " << path;
+ if (info.lastModified() > _mtime) {
+ updateLastModTime(info.lastModified());
+ }
+}
+
+void DistFolder::handlePathChanged(const QString &path)
+{
+ QFileInfo info(path);
+ qDebug() << "changed: " << path;
+ if (info.lastModified() > _mtime) {
+ updateLastModTime(info.lastModified());
+ }
+}
+
+void DistFolder::handlePathRemoved(const QString &path)
+{
+ QFileInfo info(path);
+ qDebug() << "removed: " << path;
+ // Find a parent that has not been removed
+ const QString base_path = _localPath.absolutePath();
+ do {
+ info = QFileInfo(info.absolutePath());
+ } while (!info.exists() && // Until it exists and as long as we are under the base dir.
+ info.absoluteFilePath().startsWith(base_path));
+ if (info.lastModified() > _mtime) {
+ updateLastModTime(info.lastModified());
+ }
+}
+
+void DistFolder::handleMoreRecentHost(const QHostAddress &address, uint port, const QDateTime &dateTime)
+{
+ Q_UNUSED(dateTime);
+ if (_numAgents == 0) {
+ qDebug() << "Trying to connect to" << address.toString() << port;
+ ClientAgent *agent = new ClientAgent(address, port, _localPath, _syncFlags, this);
+ connect(agent, SIGNAL(destroyed()), SLOT(handleDestroyedAgent()));
+ connect(agent, SIGNAL(finished()), SLOT(handleClientFinished()));
+ _numAgents++;
+ qDebug() << "Num agents" << _numAgents;
+ } else {
+ qDebug() << "Busy but a more recent host was found";
+ // TODO
+ }
+}
+
+void DistFolder::handleClientFinished()
+{
+ if (pullMode()) {
+ // Stop pulling once a succesful sync is done
+ qDebug() << "Stopping pull mode";
+ setPullMode(false);
+ }
+}
+
+void DistFolder::handleDestroyedAgent()
+{
+ _numAgents--;
+ qDebug() << "Num agents" << _numAgents;
+ if (_numAgents == 0) {
+ // All sync agents have finished
+ if (_mtimeChanged) {
+ _discoverer->setLastModifiedTime(_mtime);
+ _mtimeChanged = false;
+ }
+ }
+}
diff --git a/distfoldd/distfolder.h b/distfoldd/distfolder.h
new file mode 100644
index 0000000..ae7a0ea
--- /dev/null
+++ b/distfoldd/distfolder.h
@@ -0,0 +1,57 @@
+#ifndef DISTFOLDER_H
+#define DISTFOLDER_H
+
+#include <QtCore/QObject>
+#include <QtCore/QDir>
+#include <QtCore/QUuid>
+#include <QtCore/QFileSystemWatcher>
+
+#include "watcher.h"
+#include "server.h"
+#include "agent.h"
+#include "discoverer.h"
+
+class DistFolder : public QObject
+{
+ Q_OBJECT
+ Q_PROPERTY(bool readOnlySync READ readOnlySync WRITE setReadOnlySync)
+ Q_PROPERTY(bool pullMode READ pullMode WRITE setPullMode)
+ Q_PROPERTY(bool compress READ compress WRITE setCompress)
+
+public:
+ explicit DistFolder(const QUuid& uuid, const QString& path, QObject *parent = 0);
+
+ bool readOnlySync() const;
+ void setReadOnlySync(bool read_only);
+ bool pullMode() const;
+ void setPullMode(bool pull_mode);
+ bool compress() const;
+ void setCompress(bool compress);
+
+private:
+ QDateTime scanLastModTime();
+ QDateTime scanLastModTime(const QDir& dir);
+ void updateLastModTime(const QDateTime& dt = QDateTime());
+
+private slots:
+ void handleNewConnection();
+ void handlePathAdded(const QString& path);
+ void handlePathChanged(const QString& path);
+ void handlePathRemoved(const QString& path);
+ void handleMoreRecentHost(const QHostAddress& address, uint port, const QDateTime& dateTime);
+ void handleClientFinished();
+ void handleDestroyedAgent();
+
+private:
+ QUuid _uuid;
+ QDir _localPath;
+ QDateTime _mtime;
+ bool _mtimeChanged;
+ Watcher *_watcher;
+ Server *_server;
+ Discoverer *_discoverer;
+ Agent::SyncFlags _syncFlags;
+ int _numAgents;
+};
+
+#endif // DISTFOLDER_H
diff --git a/distfoldd/main.cc b/distfoldd/main.cc
new file mode 100644
index 0000000..fbecf5a
--- /dev/null
+++ b/distfoldd/main.cc
@@ -0,0 +1,28 @@
+#include <QtCore/QCoreApplication>
+#include <QtCore/QSettings>
+#include <QtCore/QDebug>
+
+#include "distfolder.h"
+
+int main(int argc, char *argv[])
+{
+ QCoreApplication a(argc, argv);
+ a.setOrganizationName("distfold");
+ a.setOrganizationDomain("com.javispedro.distfold");
+ a.setApplicationName("distfoldd");
+ a.setApplicationVersion("0.1");
+
+ QSettings settings;
+ foreach (const QString& group, settings.childGroups()) {
+ settings.beginGroup(group);
+ QUuid uuid(settings.value("uuid").toString());
+ QString path = settings.value("path").toString();
+ DistFolder *f = new DistFolder(uuid, path);
+ f->setReadOnlySync(settings.value("ro", false).toBool());
+ f->setPullMode(settings.value("pull", false).toBool());
+ f->setCompress(settings.value("compress", true).toBool());
+ settings.endGroup();
+ }
+
+ return a.exec();
+}
diff --git a/distfoldd/server.cc b/distfoldd/server.cc
new file mode 100644
index 0000000..0361466
--- /dev/null
+++ b/distfoldd/server.cc
@@ -0,0 +1,56 @@
+#include <QtCore/QDebug>
+#include <QtCore/QDir>
+#include <QtNetwork/QSslSocket>
+
+#include "server.h"
+
+Server::Server(QObject *parent) :
+ QTcpServer(parent)
+{
+ loadKeys();
+ if (!listen()) {
+ qWarning() << "Failed to start server socket";
+ }
+}
+
+void Server::loadKeys()
+{
+ QDir config_dir(QDir::home().absoluteFilePath(".config/distfold"));
+ QFile cert_file(config_dir.absoluteFilePath("server.crt"));
+ if (cert_file.open(QIODevice::ReadOnly)) {
+ _cert = QSslCertificate(&cert_file, QSsl::Pem);
+ cert_file.close();
+ }
+ if (_cert.isNull()) {
+ qWarning() << "Could not load server certificate";
+ }
+ QFile key_file(config_dir.absoluteFilePath("server.key"));
+ if (key_file.open(QIODevice::ReadOnly)) {
+ _key = QSslKey(&key_file, QSsl::Rsa, QSsl::Pem);
+ key_file.close();
+ }
+ if (_key.isNull()) {
+ qWarning() << "Could not load private key";
+ }
+}
+
+void Server::incomingConnection(int socketDescriptor)
+{
+ QSslSocket *socket = new QSslSocket(this);
+ connect(socket, SIGNAL(error(QAbstractSocket::SocketError)),
+ SLOT(handleSocketError(QAbstractSocket::SocketError)));
+ if (socket->setSocketDescriptor(socketDescriptor)) {
+ socket->setLocalCertificate(_cert);
+ socket->setPrivateKey(_key);
+ socket->startServerEncryption();
+ addPendingConnection(socket);
+ } else {
+ delete socket;
+ }
+}
+
+void Server::handleSocketError(QAbstractSocket::SocketError error)
+{
+ QSslSocket *socket = qobject_cast<QSslSocket*>(sender());
+ qDebug() << "Server socket error:" << socket->error() << socket->errorString();
+}
diff --git a/distfoldd/server.h b/distfoldd/server.h
new file mode 100644
index 0000000..0ed9100
--- /dev/null
+++ b/distfoldd/server.h
@@ -0,0 +1,26 @@
+#ifndef SERVER_H
+#define SERVER_H
+
+#include <QtNetwork/QTcpServer>
+#include <QtNetwork/QSslCertificate>
+#include <QtNetwork/QSslKey>
+
+class Server : public QTcpServer
+{
+ Q_OBJECT
+public:
+ explicit Server(QObject *parent = 0);
+
+protected:
+ void loadKeys();
+ void incomingConnection(int handle);
+
+private slots:
+ void handleSocketError(QAbstractSocket::SocketError error);
+
+private:
+ QSslCertificate _cert;
+ QSslKey _key;
+};
+
+#endif // SERVER_H
diff --git a/distfoldd/serveragent.cc b/distfoldd/serveragent.cc
new file mode 100644
index 0000000..13e7848
--- /dev/null
+++ b/distfoldd/serveragent.cc
@@ -0,0 +1,225 @@
+#include <QtCore/QDebug>
+
+#include "serveragent.h"
+
+ServerAgent::ServerAgent(QSslSocket *socket, const QDir& local_dir, SyncFlags flags, QObject *parent) :
+ Agent(socket, local_dir, flags, parent)
+{
+ qDebug() << "Starting server agent at" << QDateTime::currentDateTime();
+}
+
+void ServerAgent::handleMessage(MessageType msg, const QByteArray& data)
+{
+ qDebug() << "Server::handleMessage" << msg << data.size();
+ switch (msg) {
+ case MSG_HELLO:
+ sendMessage(MSG_HELLO_REPLY);
+ break;
+ case MSG_FILE_LIST:
+ handleClientFileList(decodeFileInfoList(data));
+ break;
+ case MSG_PULL_FILE:
+ handlePullFile(decodeFileName(data));
+ break;
+ case MSG_PUSH_FILE:
+ handlePushedFile(data);
+ break;
+ case MSG_PUSH_FILE_METADATA:
+ handlePushedMetadata(decodeFileInfoList(data));
+ break;
+ case MSG_DELETE_FILE:
+ handleDeleteFile(decodeFileName(data));
+ break;
+ case MSG_BYE:
+ qDebug() << "Got Bye";
+ emit finished();
+ _socket->close();
+ break;
+ default:
+ qWarning() << "Unknown message";
+ break;
+ }
+}
+
+void ServerAgent::handleClientFileList(const RemoteFileInfoList& list)
+{
+ QFileInfoList files = scanFiles(QDir(wireToLocalPath(_subPath)));
+ ActionInfoList actions_create_dirs, actions_with_files, actions_remove_dirs, actions_update_dirs;
+ QHash<QString, QFileInfo> local_files;
+ QHash<QString, RemoteFileInfo> remote_files;
+ QSet<QString> new_local_files;
+
+ foreach (const QFileInfo& file, files) {
+ const QString wire_path = localToWirePath(file.absoluteFilePath());
+ local_files[wire_path] = file;
+ new_local_files.insert(wire_path);
+ }
+ foreach (const RemoteFileInfo& file, list) {
+ remote_files[file.name()] = file;
+ }
+
+ foreach (const RemoteFileInfo& remote, list) {
+ // Main synchronization logic goes here
+ QString wire_path = remote.name();
+ qDebug() << "Server Handling file" << wire_path;
+ new_local_files.remove(wire_path);
+ if (local_files.contains(wire_path)) {
+ // Both remote and local have the file
+ QFileInfo local = local_files[wire_path];
+ QString parent_name = wireParentPath(wire_path);
+ QFileInfo parent_local = local_files[parent_name];
+ RemoteFileInfo parent_remote = remote_files[parent_name];
+ Q_ASSERT(local.exists());
+ if (remote.isDir() && local.isDir()) {
+ if (equalDateTime(remote.lastModified(), local.lastModified())) {
+ if (local.lastModified().toTime_t() == 0) {
+ // Workaround an issue if the local folder is a mountpoint
+ actions_update_dirs += ActionInfo(ACTION_PULL_METADATA, local);
+ }
+ // Nothing to do
+ } else if (remote.lastModified() > local.lastModified()) {
+ actions_update_dirs += ActionInfo(ACTION_PUSH_METADATA, local);
+ } else {
+ actions_update_dirs += ActionInfo(ACTION_PULL_METADATA, local);
+ }
+ } else if (remote.isDir() == local.isDir() &&
+ equalDateTime(remote.lastModified(), local.lastModified()) &&
+ remote.size() == local.size()) {
+ // Nothing to do
+ } else if (remote.isDir() != local.isDir()) {
+ // Dir to file transformation, delete the most recent file
+ if (parent_remote.lastModified() > parent_local.lastModified()) {
+ if (local.isDir()) {
+ actions_remove_dirs += ActionInfo(ACTION_PUSH_DELETE, local);
+ } else {
+ actions_with_files += ActionInfo(ACTION_PUSH_DELETE, local);
+ }
+ } else {
+ if (remote.isDir()) {
+ actions_remove_dirs += ActionInfo(ACTION_PULL_DELETE, local);
+ } else {
+ actions_with_files += ActionInfo(ACTION_PULL_DELETE, local);
+ }
+ }
+ } else if (remote.lastModified() > local.lastModified()) {
+ if (remote.isDir()) {
+ actions_create_dirs += ActionInfo(ACTION_PUSH, local);
+ } else {
+ actions_with_files += ActionInfo(ACTION_PUSH, local);
+ }
+ } else {
+ if (remote.isDir()) {
+ actions_create_dirs += ActionInfo(ACTION_PULL, local);
+ } else {
+ actions_with_files += ActionInfo(ACTION_PULL, local);
+ }
+ }
+ } else {
+ // File is in remote, but not in local
+ qDebug() << " file deleted";
+ QString parent_name = findExistingCommonAncestor(wire_path, local_files, remote_files);
+ QFileInfo local(wireToLocalPath(wire_path)); // Create invalid QFileInfo
+ QFileInfo parent_local = local_files[parent_name];
+ RemoteFileInfo parent_remote = remote_files[parent_name];
+ if (parent_remote.lastModified() > parent_local.lastModified()) {
+ if (remote.isDir()) {
+ actions_create_dirs += ActionInfo(ACTION_PUSH, local);
+ actions_update_dirs += ActionInfo(ACTION_PUSH_METADATA, local);
+ } else {
+ actions_with_files += ActionInfo(ACTION_PUSH, local);
+ }
+ } else {
+ if (remote.isDir()) {
+ actions_remove_dirs += ActionInfo(ACTION_PULL_DELETE, local);
+ } else {
+ actions_with_files += ActionInfo(ACTION_PULL_DELETE, local);
+ }
+ }
+ }
+ }
+
+ foreach (const QString& path, new_local_files) {
+ // File is in local, but not in remote
+ QString parent_name = findExistingCommonAncestor(path, local_files, remote_files);
+ QFileInfo local = local_files[path];
+ QFileInfo parent_local = local_files[parent_name];
+ RemoteFileInfo parent_remote = remote_files[parent_name];
+ if (parent_remote.lastModified() > parent_local.lastModified()) {
+ if (local.isDir()) {
+ actions_remove_dirs += ActionInfo(ACTION_PUSH_DELETE, local);
+ } else {
+ actions_with_files += ActionInfo(ACTION_PUSH_DELETE, local);
+ }
+ } else {
+ if (local.isDir()) {
+ actions_create_dirs += ActionInfo(ACTION_PULL, local);
+ actions_update_dirs += ActionInfo(ACTION_PULL_METADATA, local);
+ } else {
+ actions_with_files += ActionInfo(ACTION_PULL, local);
+ }
+ }
+ }
+
+ qSort(actions_create_dirs.begin(), actions_create_dirs.end(),
+ static_cast<bool (*)(const ActionInfo&, const ActionInfo&)>(lessPathDepthThan));
+ qSort(actions_remove_dirs.begin(), actions_remove_dirs.end(),
+ static_cast<bool (*)(const ActionInfo&, const ActionInfo&)>(morePathDepthThan));
+ qSort(actions_update_dirs.begin(), actions_update_dirs.end(),
+ static_cast<bool (*)(const ActionInfo&, const ActionInfo&)>(morePathDepthThan));
+
+ ActionInfoList actions = actions_create_dirs +
+ actions_with_files + actions_remove_dirs +
+ actions_update_dirs;
+ sendMessage(MSG_FILE_ACTIONS_REPLY, encodeActionInfoList(actions));
+}
+
+void ServerAgent::handlePullFile(const QString &path)
+{
+ QFile file(wireToLocalPath(path));
+ if (file.open(QIODevice::ReadOnly)) {
+ QByteArray ba = file.readAll();
+ if (_flags & SYNC_COMPRESS) {
+ int old_size = ba.size();
+ ba = Compressor::compress(ba);
+ qDebug() << "Compress" << old_size << "->" << ba.size();
+ }
+ file.close();
+ sendMessage(MSG_PULL_FILE_REPLY, ba);
+ } else {
+ qWarning() << "Failed to open file" << file.fileName() << "for reading";
+ }
+}
+
+void ServerAgent::handlePushedFile(const QByteArray &ba)
+{
+ if (_flags & SYNC_READ_ONLY) return;
+ int len;
+ QString wire_path = decodeFileNameItem(ba, &len);
+ QByteArray file_data = Compressor::decompress(ba.mid(len));
+
+ QFile file(wireToLocalPath(wire_path));
+ if (file.open(QIODevice::WriteOnly | QIODevice::Truncate)) {
+ file.write(file_data);
+ file.close();
+ } else {
+ qWarning() << "Failed to open file" << file.fileName() << "for writing";
+ }
+}
+
+void ServerAgent::handlePushedMetadata(const RemoteFileInfoList& list)
+{
+ if (_flags & SYNC_READ_ONLY) return;
+ foreach (const RemoteFileInfo& remote, list) {
+ QString local_path = wireToLocalPath(remote.name());
+ setLocalFileDateTime(local_path, remote.lastModified());
+ }
+}
+
+void ServerAgent::handleDeleteFile(const QString &path)
+{
+ if (_flags & SYNC_READ_ONLY) return;
+ QString local_path = wireToLocalPath(path);
+ if (!QDir().remove(local_path)) {
+ qWarning() << "Failed to remove file" << local_path;
+ }
+}
diff --git a/distfoldd/serveragent.h b/distfoldd/serveragent.h
new file mode 100644
index 0000000..c692e60
--- /dev/null
+++ b/distfoldd/serveragent.h
@@ -0,0 +1,23 @@
+#ifndef SERVERAGENT_H
+#define SERVERAGENT_H
+
+#include "agent.h"
+
+class ServerAgent : public Agent
+{
+ Q_OBJECT
+public:
+ explicit ServerAgent(QSslSocket *socket, const QDir& local_dir, SyncFlags flags, QObject *parent = 0);
+
+protected:
+ void handleMessage(MessageType msg, const QByteArray& data);
+
+private:
+ void handleClientFileList(const RemoteFileInfoList& list);
+ void handlePullFile(const QString& path);
+ void handlePushedFile(const QByteArray& data);
+ void handlePushedMetadata(const RemoteFileInfoList& list);
+ void handleDeleteFile(const QString& path);
+};
+
+#endif // SERVERAGENT_H
diff --git a/distfoldd/watcher.cc b/distfoldd/watcher.cc
new file mode 100644
index 0000000..664ea58
--- /dev/null
+++ b/distfoldd/watcher.cc
@@ -0,0 +1,132 @@
+#include <QtCore/QDebug>
+
+#include <sys/errno.h>
+#include <sys/fcntl.h>
+#include <sys/inotify.h>
+
+#include "watcher.h"
+
+
+Watcher::Watcher(const QString& path, QObject *parent) :
+ QObject(parent)
+{
+ _fd = inotify_init();
+ fcntl(_fd, F_SETFL, O_NONBLOCK);
+ _buffer.resize(sizeof(struct inotify_event) + NAME_MAX + 1);
+ _mask = IN_CLOSE_WRITE | IN_CREATE | IN_DELETE |
+ IN_MOVED_FROM | IN_MOVED_TO | IN_ONLYDIR;
+
+ _notifier = new QSocketNotifier(_fd, QSocketNotifier::Read, this);
+ connect(_notifier, SIGNAL(activated(int)), SLOT(readInotify()));
+
+ QStringList l = scanDirs(QDir(path));
+ foreach (const QString& s, l) {
+ addWatch(s);
+ }
+}
+
+void Watcher::readInotify()
+{
+ inotify_event *event;
+ const ssize_t struct_size = sizeof(struct inotify_event);
+ ssize_t pos, nread;
+
+ do {
+ nread = read(_fd, _buffer.data(), _buffer.size());
+ if (nread < 0) {
+ int err = errno;
+ if (err == EWOULDBLOCK) return;
+ qWarning() << "Error while reading from inotify" << err;
+ }
+
+ pos = 0;
+ while (pos + struct_size <= nread) {
+ event = reinterpret_cast<inotify_event*>(&(_buffer.data()[pos]));
+ if (event->wd == -1) {
+ // Special treatment
+ // TODO
+ continue;
+ }
+
+ QString path = _watches[event->wd];
+ QString name;
+ QString filePath = path;
+ if (event->len > 0) {
+ name = QString::fromLocal8Bit(event->name);
+ filePath += "/" + name;
+ }
+
+ if (event->mask & (IN_CREATE|IN_MOVED_TO)) {
+ if (event->mask & IN_ISDIR) {
+ // TODO There might already be folders in here.
+ addWatch(filePath);
+ }
+ emit pathAdded(filePath);
+ }
+ if (event->mask & IN_CLOSE_WRITE) {
+ emit pathChanged(filePath);
+ }
+ if (event->mask & (IN_MOVED_FROM|IN_DELETE)) {
+ if (event->mask & IN_ISDIR) {
+ removeWatch(filePath);
+ }
+ emit pathRemoved(filePath);
+ }
+
+ pos += struct_size + event->len;
+ }
+ } while (nread > 0);
+}
+
+QStringList Watcher::scanDirs(const QDir &dir)
+{
+ Q_ASSERT(dir.isReadable());
+ QStringList l(dir.absolutePath());
+ QStringList sub_dirs = dir.entryList(QDir::Dirs | QDir::NoDotAndDotDot);
+
+ foreach (const QString& s, sub_dirs) {
+ const QString abs_path = dir.absoluteFilePath(s);
+ l << scanDirs(QDir(abs_path));
+ }
+
+ return l;
+}
+
+void Watcher::addWatch(const QString& path)
+{
+ int wd = inotify_add_watch(_fd, path.toLocal8Bit().constData(), _mask);
+ if (wd == -1) {
+ qWarning() << "Failed to add watch for" << path;
+ return;
+ }
+ if (_watches.contains(wd)) {
+ qWarning() << "Watch for" << path << "was already setup";
+ return;
+ }
+
+ qDebug() << "Watching" << path;
+ _watches[wd] = path;
+ _dirs[path] = wd;
+}
+
+void Watcher::removeWatch(const QString& path)
+{
+ if (!_dirs.contains(path)) {
+ qWarning() << "Watch for" << path << "not found";
+ return;
+ }
+ int wd = _dirs[path];
+ removeWatch(wd);
+}
+
+void Watcher::removeWatch(int wd)
+{
+ if (!_watches.contains(wd)) {
+ qWarning() << "Watch" << wd << "not found";
+ }
+ QString path = _watches[wd];
+ qDebug() << "Unwatching" << path;
+ inotify_rm_watch(_fd, wd);
+ _dirs.remove(path);
+ _watches.remove(wd);
+}
diff --git a/distfoldd/watcher.h b/distfoldd/watcher.h
new file mode 100644
index 0000000..861248e
--- /dev/null
+++ b/distfoldd/watcher.h
@@ -0,0 +1,38 @@
+#ifndef WATCHER_H
+#define WATCHER_H
+
+#include <QtCore/QDir>
+#include <QtCore/QSocketNotifier>
+#include <QtCore/QMap>
+#include <QtCore/QHash>
+
+class Watcher : public QObject
+{
+ Q_OBJECT
+public:
+ explicit Watcher(const QString& path, QObject *parent = 0);
+
+signals:
+ void pathAdded(const QString& path);
+ void pathRemoved(const QString& path);
+ void pathChanged(const QString& path);
+
+private slots:
+ void readInotify();
+
+private:
+ QStringList scanDirs(const QDir& dir);
+ void addWatch(const QString& path);
+ void removeWatch(const QString& path);
+ void removeWatch(int wd);
+
+private:
+ int _fd;
+ QSocketNotifier *_notifier;
+ quint32 _mask;
+ QByteArray _buffer;
+ QMap<int, QString> _watches;
+ QHash<QString, int> _dirs;
+};
+
+#endif // WATCHER_H