diff options
| author | Javier S. Pedro <maemo@javispedro.com> | 2012-09-17 23:03:03 +0200 | 
|---|---|---|
| committer | Javier S. Pedro <maemo@javispedro.com> | 2012-09-17 23:03:03 +0200 | 
| commit | c3a1946675855b299a2b36550cdf2c2f69d153aa (patch) | |
| tree | f7adf66404cdc47994225b616bcaf082a07dd168 /distfoldd | |
| download | distfold-c3a1946675855b299a2b36550cdf2c2f69d153aa.tar.gz distfold-c3a1946675855b299a2b36550cdf2c2f69d153aa.zip | |
initial import
Diffstat (limited to 'distfoldd')
| -rw-r--r-- | distfoldd/agent.cc | 337 | ||||
| -rw-r--r-- | distfoldd/agent.h | 202 | ||||
| -rw-r--r-- | distfoldd/clientagent.cc | 174 | ||||
| -rw-r--r-- | distfoldd/clientagent.h | 37 | ||||
| -rw-r--r-- | distfoldd/compressor.cc | 94 | ||||
| -rw-r--r-- | distfoldd/compressor.h | 16 | ||||
| -rw-r--r-- | distfoldd/discoverer.cc | 195 | ||||
| -rw-r--r-- | distfoldd/discoverer.h | 82 | ||||
| -rw-r--r-- | distfoldd/distfoldd.pro | 35 | ||||
| -rw-r--r-- | distfoldd/distfolder.cc | 200 | ||||
| -rw-r--r-- | distfoldd/distfolder.h | 57 | ||||
| -rw-r--r-- | distfoldd/main.cc | 28 | ||||
| -rw-r--r-- | distfoldd/server.cc | 56 | ||||
| -rw-r--r-- | distfoldd/server.h | 26 | ||||
| -rw-r--r-- | distfoldd/serveragent.cc | 225 | ||||
| -rw-r--r-- | distfoldd/serveragent.h | 23 | ||||
| -rw-r--r-- | distfoldd/watcher.cc | 132 | ||||
| -rw-r--r-- | distfoldd/watcher.h | 38 | 
18 files changed, 1957 insertions, 0 deletions
| diff --git a/distfoldd/agent.cc b/distfoldd/agent.cc new file mode 100644 index 0000000..a718230 --- /dev/null +++ b/distfoldd/agent.cc @@ -0,0 +1,337 @@ +#include "agent.h" + +#ifdef Q_OS_UNIX +#include <sys/time.h> +#include <utime.h> +#endif + +Agent::Agent(QSslSocket *socket, const QDir& dir, SyncFlags flags, QObject *parent) : +    QObject(parent), _dir(dir), _subPath("/"), _flags(flags), _socket(socket) +{ +	connect(_socket, SIGNAL(readyRead()), SLOT(handleDataAvailable())); +	connect(_socket, SIGNAL(sslErrors(QList<QSslError>)), SLOT(handleSslErrors(QList<QSslError>))); +	connect(_socket, SIGNAL(disconnected()), SLOT(handleDisconnected())); +} + +Agent::RemoteFileInfo::RemoteFileInfo() : +    _name(), _type(FILE_TYPE_NONE), _mtime(), _size(0) +{ +} + +Agent::RemoteFileInfo::RemoteFileInfo(const QString &name, FileType type, const QDateTime &mtime, qint64 size) : +    _name(name), _type(type), _mtime(mtime), _size(size) +{ +} + +Agent::ActionInfo::ActionInfo(FileAction action, const QFileInfo &fileInfo) : +    _action(action), _info(fileInfo) +{ +} + +Agent::RemoteActionInfo::RemoteActionInfo(FileAction action, const QString &name, FileType type, const QDateTime &mtime, qint64 size) : +    _action(action), _info(name, type, mtime, size) +{ + +} + +void Agent::sendMessage(MessageType msg, const QByteArray &content) +{ +	const int header_len = sizeof(MessageHeader); +	QByteArray ba(header_len + content.size(), '\0'); + +	MessageHeader *h = reinterpret_cast<MessageHeader*>(ba.data()); +	h->msg = msg; +	h->len = content.size(); +	if (h->len > 0) { +		ba.replace(header_len, content.size(), content); +	} +	Q_ASSERT(ba.size() == header_len + content.size()); + +	qDebug() << "Sending " << msg << content.size(); + +	_socket->write(ba); +} + +int Agent::inBufRequiredData() const +{ +	const int header_len = sizeof(MessageHeader); +	if (_inBuf.size() < header_len) { +		return header_len; +	} else { +		const MessageHeader *h; +		h = reinterpret_cast<const MessageHeader*>(_inBuf.data()); +		return header_len + h->len; +	} +} + +bool Agent::equalDateTime(const QDateTime& dt1, const QDateTime& dt2) +{ +	const qint64 threshold = 5 * 1000; +	qint64 msecs = dt1.msecsTo(dt2); +	return msecs > -threshold && msecs < threshold; +} + +void Agent::setLocalFileDateTime(const QString &path, const QDateTime &dt) +{ +#ifdef Q_OS_UNIX +	const char *filename = path.toLocal8Bit().constData(); +	struct utimbuf times; +	times.actime = dt.toTime_t(); +	times.modtime = dt.toTime_t(); +	int rc = utime(filename, ×); +	if (rc != 0) { +		qWarning() << "Could not set local mtime of" << path; +	} +#endif +} + +QString Agent::wireToLocalPath(const QString &path) +{ +	return _dir.absolutePath() + _subPath + path; +} + +QString Agent::localToWirePath(const QString& path) +{ +	const QString& basePath = _dir.absolutePath() + _subPath; +	Q_ASSERT(_subPath.endsWith('/')); +	QString wire_path(path); +	if (path.startsWith(basePath)) { +		wire_path.remove(0, basePath.size()); +	} else if (path + '/' == basePath) { +		wire_path = QString(); // This is the root dir. +	} else { +		qWarning() << "Where does this come from?" << path; +	} +	return wire_path; +} + +QString Agent::wireParentPath(const QString &path) +{ +	int index = path.lastIndexOf('/'); +	if (index == -1) return QString(""); +	else return path.left(index); +} + +QString Agent::findExistingCommonAncestor(const QString& path, +                                          const QHash<QString, QFileInfo>& local_files, +                                          const QHash<QString, RemoteFileInfo>& remote_files) +{ +	QString ancestor = path; +	do { +		ancestor = wireParentPath(ancestor); +	} while (!local_files.contains(ancestor) || !remote_files.contains(ancestor)); +	return ancestor; +} + +bool Agent::lessPathDepthThan(const QString& s1, const QString& s2) +{ +	int d1 = s1.count('/'), d2 = s2.count('/'); +	return d1 < d2; +} + +bool Agent::morePathDepthThan(const QString& s1, const QString& s2) +{ +	int d1 = s1.count('/'), d2 = s2.count('/'); +	return d1 > d2; +} + +QFileInfoList Agent::scanFiles(const QDir& dir) +{ +	QFileInfoList all; +	QFileInfoList sub = dir.entryInfoList(QDir::Dirs | QDir::Files | QDir::NoDotAndDotDot); + +	all << QFileInfo(dir, "."); + +	foreach (const QFileInfo &info, sub) { +		if (info.isDir()) { +			all << scanFiles(QDir(info.absoluteFilePath())); +		} else { +			all << info; +		} +	} + +	return all; +} + +QByteArray Agent::encodeFileInfoList(const QFileInfoList& list) +{ +	QByteArray ba; +	if (list.empty()) return ba; + +	ba.reserve(list.size() * (sizeof(FileListItem) + 10)); + +	foreach (const QFileInfo& info, list) { +		QString path = localToWirePath(info.absoluteFilePath()); +		FileListItem item; +		item.size = info.size(); + +		if (info.isDir() && _flags & SYNC_PULL) { +			// If we are pulling, then simulate all of our local directories being old, +			// so that we get all the new file creations +			// Existing files should be compared with the proper mtimes though. +			item.mtime = 0; +		} else { +			item.mtime = info.lastModified().toTime_t(); +		} + +		if (info.isDir()) { +			item.type = FILE_TYPE_DIR; +		} else if (info.isFile()) { +			item.type = FILE_TYPE_FILE; +		} else { +			qWarning() << "What is this?" << path; +			continue; +		} +		QByteArray utfPath = encodeFileName(path); + +		item.name_len = utfPath.size(); +		ba.append(reinterpret_cast<char*>(&item), sizeof(FileListItem)); +		ba.append(utfPath.constData()); +	} + +	return ba; +} + +Agent::RemoteFileInfoList Agent::decodeFileInfoList(const QByteArray& ba) +{ +	RemoteFileInfoList list; + +	int pos = 0; +	while (pos < ba.size()) { +		const FileListItem *item; +		item = reinterpret_cast<const FileListItem*>(&(ba.data()[pos])); + +		list.append(RemoteFileInfo(QString::fromUtf8(&item->name[0], +		                                             item->name_len), +		                           static_cast<FileType>(item->type), +		                           QDateTime::fromTime_t(item->mtime), +		                           item->size)); + +		pos += sizeof(FileListItem) + item->name_len; +	} + +	return list; +} + +QByteArray Agent::encodeActionInfoList(const ActionInfoList &list) +{ +	QByteArray ba; + +	ba.reserve(list.size() * (sizeof(ActionItem) + 10)); + +	foreach (const ActionInfo& info, list) { +		const QFileInfo fileInfo = info.fileInfo(); +		QString path = localToWirePath(fileInfo.absoluteFilePath()); +		ActionItem item; +		item.action = info.action(); +		item.size = fileInfo.size(); +		item.mtime = fileInfo.lastModified().toTime_t(); + +		if (fileInfo.isDir()) { +			item.type = FILE_TYPE_DIR; +		} else if (fileInfo.isFile()) { +			item.type = FILE_TYPE_FILE; +		} else { +			item.type = FILE_TYPE_NONE; // Actions might refer to no-longer existing files +		} + +		QByteArray utfPath = path.toUtf8(); + +		item.name_len = utfPath.size(); +		ba.append(reinterpret_cast<char*>(&item), sizeof(item)); +		ba.append(utfPath.constData()); +	} + +	return ba; +} + +Agent::RemoteActionInfoList Agent::decodeActionInfoList(const QByteArray& ba) +{ +	RemoteActionInfoList list; + +	int pos = 0; +	while (pos < ba.size()) { +		const ActionItem *item; +		item = reinterpret_cast<const ActionItem*>(&(ba.data()[pos])); + +		list.append(RemoteActionInfo(static_cast<FileAction>(item->action), +		                             QString::fromUtf8(&item->name[0], +		                                               item->name_len), +		                             static_cast<FileType>(item->type), +		                             QDateTime::fromTime_t(item->mtime), +		                             item->size)); + +		pos += sizeof(ActionItem) + item->name_len; +	} + +	return list; +} + +bool Agent::lessPathDepthThan(const ActionInfo& a1, const ActionInfo& a2) +{ +	return lessPathDepthThan(a1.fileInfo().absoluteFilePath(), +	                         a2.fileInfo().absoluteFilePath()); +} + +bool Agent::morePathDepthThan(const ActionInfo& a1, const ActionInfo& a2) +{ +	return morePathDepthThan(a1.fileInfo().absoluteFilePath(), +	                         a2.fileInfo().absoluteFilePath()); +} + +QByteArray Agent::encodeFileName(const QString& wire_path) +{ +	return wire_path.toUtf8(); +} + +QString Agent::decodeFileName(const QByteArray& ba) +{ +	return QString::fromUtf8(ba); +} + +QByteArray Agent::encodeFileNameItem(const QString& wire_path) +{ +	QByteArray utf8 = encodeFileName(wire_path); +	QByteArray ba(sizeof(FileNameItem), 0); +	FileNameItem *item = reinterpret_cast<FileNameItem*>(ba.data()); +	item->name_len = utf8.length(); +	ba.append(utf8); +	return ba; +} + +QString Agent::decodeFileNameItem(const QByteArray& ba, int* length) +{ +	const FileNameItem *item = reinterpret_cast<const FileNameItem*>(ba.data()); +	*length = sizeof(FileNameItem) + item->name_len; +	QString wire_path = decodeFileName(ba.mid(sizeof(FileNameItem), item->name_len)); +	return wire_path; +} + +void Agent::handleDataAvailable() +{ +	do { +		_inBuf.append(_socket->readAll()); + +		while (_inBuf.size() >= inBufRequiredData()) { +			MessageHeader *h = reinterpret_cast<MessageHeader*>(_inBuf.data()); +			QByteArray data; +			if (h->len > 0) { +				data = _inBuf.mid(sizeof(MessageHeader), h->len); +			} +			handleMessage(static_cast<MessageType>(h->msg), data); +			_inBuf.remove(0, inBufRequiredData()); +		} +	} while (_socket->bytesAvailable() > 0); +} + +void Agent::handleSslErrors(const QList<QSslError> &errors) +{ +	qDebug() << "SSL errors:" << errors; +	_socket->ignoreSslErrors(); // TODO For now +} + +void Agent::handleDisconnected() +{ +	qDebug() << "Disconnected at" << QDateTime::currentDateTime(); +	deleteLater(); +} diff --git a/distfoldd/agent.h b/distfoldd/agent.h new file mode 100644 index 0000000..fa202f5 --- /dev/null +++ b/distfoldd/agent.h @@ -0,0 +1,202 @@ +#ifndef AGENT_H +#define AGENT_H + +#include <QtCore/QDateTime> +#include <QtCore/QDir> +#include <QtNetwork/QSslSocket> + +#include "compressor.h" + +class Agent : public QObject +{ +	Q_OBJECT +	Q_FLAGS(SyncFlag SyncFlags) + +public: +	enum SyncFlag { +		SYNC_NORMAL = 0, +		SYNC_PULL = 0x1, +		SYNC_READ_ONLY = 0x2, +		SYNC_COMPRESS = 0x8 +	}; +	Q_DECLARE_FLAGS(SyncFlags, SyncFlag) + +public: +	explicit Agent(QSslSocket *socket, const QDir& dir, SyncFlags flags, QObject *parent = 0); + +	static const uint servicePort = 17451; + +signals: +	void finished(); + +protected: +#pragma pack(push) +#pragma pack(1) +	enum MessageType { +		MSG_HELLO = 0, +		MSG_HELLO_REPLY, +		MSG_SET_SUBROOT, //args: string +		MSG_FILE_LIST, //args: FileListItem[] +		MSG_FILE_ACTIONS_REPLY, //args: ActionItem[] +		MSG_PULL_FILE, //args: string +		MSG_PULL_FILE_REPLY, //args: data +		MSG_PUSH_FILE, //args: FileNameItem + data +		MSG_PUSH_FILE_METADATA, //args: FileListItem[] +		MSG_DELETE_FILE, //args: string +		MSG_BYE +	}; +	struct MessageHeader { +		quint32 msg; +		quint32 len; +	}; +	enum FileType { +		FILE_TYPE_NONE = 0, +		FILE_TYPE_FILE, +		FILE_TYPE_DIR +	}; +	struct FileListItem { +		quint64 size; +		quint32 mtime; +		quint16 name_len; +		quint8 type; +		char name[]; +	}; +	enum FileAction { +		ACTION_NONE = 0, +		ACTION_PULL_METADATA, +		ACTION_PUSH_METADATA, +		ACTION_PULL, +		ACTION_PUSH, +		ACTION_PULL_DELETE, +		ACTION_PUSH_DELETE +	}; +	struct ActionItem { +		quint64 size; +		quint32 mtime; +		quint16 name_len; +		quint8  type; +		quint8  action; +		char name[]; +	}; +	struct FileNameItem { +		quint16 name_len; +		char name[]; +	}; +#pragma pack(pop) + +protected: +	class RemoteFileInfo { +	public: +		RemoteFileInfo(); +		RemoteFileInfo(const QString &name, FileType type, const QDateTime &mtime, qint64 size); + +		inline qint64 size() const { +			return _size; +		} + +		inline QDateTime lastModified() const { +			return _mtime; +		} + +		inline QString name() const { +			return _name; +		} + +		inline bool isDir() const { +			return _type == FILE_TYPE_DIR; +		} + +	private: +		QString _name; +		FileType _type; +		QDateTime _mtime; +		qint64 _size; +	}; +	typedef QList<RemoteFileInfo> RemoteFileInfoList; + +	class ActionInfo { +	public: +		ActionInfo(FileAction action, const QFileInfo& fileInfo); + +		inline FileAction action() const { +			return _action; +		} + +		inline QFileInfo fileInfo() const { +			return _info; +		} + +	private: +		FileAction _action; +		QFileInfo _info; +	}; +	typedef QList<ActionInfo> ActionInfoList; + +	class RemoteActionInfo { +	public: +		RemoteActionInfo(FileAction action, const QString& name, FileType type, const QDateTime& mtime, qint64 size); + +		inline FileAction action() const { +			return _action; +		} + +		inline RemoteFileInfo fileInfo() const { +			return _info; +		} + +	private: +		FileAction _action; +		RemoteFileInfo _info; +	}; +	typedef QList<RemoteActionInfo> RemoteActionInfoList; + +protected: +	void sendMessage(MessageType msg, const QByteArray& data = QByteArray()); +	virtual void handleMessage(MessageType msg, const QByteArray& data) = 0; + +	int inBufRequiredData() const; + +	bool equalDateTime(const QDateTime& dt1, const QDateTime& dt2); +	void setLocalFileDateTime(const QString &path, const QDateTime& dt); + +	QString wireToLocalPath(const QString& path); +	QString localToWirePath(const QString& path); + +	QString wireParentPath(const QString& path); +	QString findExistingCommonAncestor(const QString& path, +	                                   const QHash<QString, QFileInfo>& local_files, +	                                   const QHash<QString, RemoteFileInfo>& remote_files); +	static bool lessPathDepthThan(const QString& s1, const QString& s2); +	static bool morePathDepthThan(const QString& s1, const QString& s2); + +	static QFileInfoList scanFiles(const QDir& dir); +	QByteArray encodeFileInfoList(const QFileInfoList& list); +	RemoteFileInfoList decodeFileInfoList(const QByteArray& ba); + +	QByteArray encodeActionInfoList(const ActionInfoList& list); +	RemoteActionInfoList decodeActionInfoList(const QByteArray& ba); +	static bool lessPathDepthThan(const ActionInfo& a1, const ActionInfo& a2); +	static bool morePathDepthThan(const ActionInfo& a1, const ActionInfo& a2); + +	QByteArray encodeFileName(const QString& wire_path); +	QString decodeFileName(const QByteArray& ba); + +	QByteArray encodeFileNameItem(const QString& wire_path); +	QString decodeFileNameItem(const QByteArray& ba, int* length); + +protected: +	QDir _dir; +	QString _subPath; +	SyncFlags _flags; +	QSslSocket *_socket; +	QByteArray _inBuf; + +private slots: +	void handleDataAvailable(); +	void handleSslErrors(const QList<QSslError>& errors); +	void handleDisconnected(); +}; + +Q_DECLARE_OPERATORS_FOR_FLAGS(Agent::SyncFlags) + +#endif // AGENT_H diff --git a/distfoldd/clientagent.cc b/distfoldd/clientagent.cc new file mode 100644 index 0000000..f77a21a --- /dev/null +++ b/distfoldd/clientagent.cc @@ -0,0 +1,174 @@ +#include <QtCore/QDebug> + +#include "clientagent.h" + +ClientAgent::ClientAgent(const QHostAddress& addr, uint port, const QDir& local_dir, SyncFlags flags, QObject *parent) : +    Agent(new QSslSocket, local_dir, flags, parent) +{ +	qDebug() << "Starting client agent at" << QDateTime::currentDateTime(); +	_socket->setParent(this); // Can't set parent until QObject constructed +	_socket->connectToHostEncrypted(addr.toString(), port); +	sendMessage(MSG_HELLO); +	_state = STATE_HELLO; +} + +void ClientAgent::handleMessage(MessageType msg, const QByteArray &data) +{ +	qDebug() << "Client::handleMessage" << msg << data.size(); +	switch (msg) { +	case MSG_HELLO_REPLY: +		Q_ASSERT(_state == STATE_HELLO); +		qDebug() << "Hello reply"; +		_state = STATE_FILE_LIST; +		sendFileList(); +		break; +	case MSG_FILE_ACTIONS_REPLY: +		Q_ASSERT(_state == STATE_FILE_LIST); +		handleActionInfoList(decodeActionInfoList(data)); +		break; +	case MSG_PULL_FILE_REPLY: +		Q_ASSERT(_state == STATE_FILE_ACTIONS && !_pendingActions.empty()); +		handlePulledFile(data); +		break; +	default: +		qWarning() << "Unknown message"; +		break; +	} +} + +void ClientAgent::sendFileList() +{ +	QFileInfoList list = scanFiles(QDir(wireToLocalPath(_subPath))); +	sendMessage(MSG_FILE_LIST, encodeFileInfoList(list)); +} + +void ClientAgent::handleActionInfoList(const RemoteActionInfoList &list) +{ +	foreach (const RemoteActionInfo& info, list) { +		switch (info.action()) { +		case ACTION_NONE: +			qDebug() << " = " << info.fileInfo().name(); +			break; +		case ACTION_PULL: +			qDebug() << " < " << info.fileInfo().name(); +			break; +		case ACTION_PULL_METADATA: +			qDebug() << " <m" << info.fileInfo().name(); +			break; +		case ACTION_PULL_DELETE: +			qDebug() << " <d" << info.fileInfo().name(); +			break; +		case ACTION_PUSH: +			qDebug() << " > " << info.fileInfo().name(); +			break; +		case ACTION_PUSH_METADATA: +			qDebug() << " >m" << info.fileInfo().name(); +			break; +		case ACTION_PUSH_DELETE: +			qDebug() << " >d" << info.fileInfo().name(); +			break; +		} +	} + +	_pendingActions = list; +	_state = STATE_FILE_ACTIONS; +	executeNextAction(); +} + +void ClientAgent::executeNextAction() +{ +	if (_pendingActions.empty()) { +		qDebug() << "Done"; +		emit finished(); +		sendMessage(MSG_BYE); +		_socket->flush(); +		_socket->close(); +		return; +	} + +	qDebug() << "Remaining actions" << _pendingActions.count(); + +	const RemoteActionInfo& info = _pendingActions.first(); +	const QString wire_path = info.fileInfo().name(); +	const QString local_path = wireToLocalPath(wire_path); +	switch (info.action()) { +	case ACTION_PULL: +		if (_flags & SYNC_READ_ONLY) break; +		if (info.fileInfo().isDir()) { +			if (!QDir().mkpath(local_path)) { +				qWarning() << "Failed to create local path" << local_path; +			} +		} else { +			sendMessage(MSG_PULL_FILE, encodeFileName(wire_path)); +			return; // Wait for the pulled file message. +		} +		break; +	case ACTION_PULL_METADATA: +		if (_flags & SYNC_READ_ONLY) break; +		setLocalFileDateTime(local_path, info.fileInfo().lastModified()); +		break; +	case ACTION_PULL_DELETE: +		handleDeleteFile(wire_path); +		break; +	case ACTION_PUSH: +		handlePushFile(wire_path); +		break; +	case ACTION_PUSH_METADATA: +		sendMessage(MSG_PUSH_FILE_METADATA, +		            encodeFileInfoList(QFileInfoList() << QFileInfo(local_path))); +		break; +	case ACTION_PUSH_DELETE: +		sendMessage(MSG_DELETE_FILE, encodeFileName(wire_path)); +		break; +	default: +		qWarning() << "Unknown action" << info.action(); +		break; +	} +	_pendingActions.removeFirst(); +	executeNextAction(); +} + +void ClientAgent::handlePulledFile(const QByteArray &data) +{ +	const RemoteActionInfo& info = _pendingActions.takeFirst(); +	if (_flags & SYNC_READ_ONLY) return; +	QString local_path(wireToLocalPath(info.fileInfo().name())); +	QFile file(local_path); +	if (file.open(QIODevice::WriteOnly | QIODevice::Truncate)) { +		file.write(Compressor::decompress(data)); +		file.close(); +		setLocalFileDateTime(local_path, info.fileInfo().lastModified()); +	} else { +		qWarning() << "Failed to open" << file.fileName() << "for writing"; +	} +	executeNextAction(); +} + +void ClientAgent::handlePushFile(const QString &path) +{ +	QFile file(wireToLocalPath(path)); +	if (file.open(QIODevice::ReadOnly)) { +		QByteArray file_data = file.readAll(); +		if (_flags & SYNC_COMPRESS) file_data = Compressor::compress(file_data); +		QByteArray ba = encodeFileNameItem(path) + file_data; +		sendMessage(MSG_PUSH_FILE, ba); +	} else { +		qWarning() << "Failed to open file" << file.fileName() << "for reading"; +	} +} + +void ClientAgent::handleDeleteFile(const QString &wire_path) +{ +	if (_flags & SYNC_READ_ONLY) return; +	QFileInfo local(wireToLocalPath(wire_path)); +	QString local_path = local.absoluteFilePath(); +	if (local.isDir()) { +		if (!QDir().rmdir(local_path)) { +			qWarning() << "Failed to remove local dir" << local_path; +		} +	} else { +		if (!QDir().remove(local_path)) { +			qWarning() << "Failed to remove local file" << local_path; +		} +	} +} diff --git a/distfoldd/clientagent.h b/distfoldd/clientagent.h new file mode 100644 index 0000000..d54bc69 --- /dev/null +++ b/distfoldd/clientagent.h @@ -0,0 +1,37 @@ +#ifndef CLIENTAGENT_H +#define CLIENTAGENT_H + +#include <QtCore/QQueue> +#include <QtNetwork/QHostAddress> + +#include "agent.h" + +class ClientAgent : public Agent +{ +	Q_OBJECT +public: +	explicit ClientAgent(const QHostAddress& addr, uint port, const QDir& local_dir, SyncFlags flags, QObject *parent = 0); + +	enum State { +		STATE_HELLO, +		STATE_FILE_LIST, +		STATE_FILE_ACTIONS +	}; + +protected: +	void handleMessage(MessageType msg, const QByteArray &data); + +private: +	void sendFileList(); +	void handleActionInfoList(const RemoteActionInfoList& list); +	void executeNextAction(); +	void handlePulledFile(const QByteArray& data); +	void handlePushFile(const QString& wire_path); +	void handleDeleteFile(const QString& wire_path); + +private: +	State _state; +	RemoteActionInfoList _pendingActions; +}; + +#endif // CLIENTAGENT_H diff --git a/distfoldd/compressor.cc b/distfoldd/compressor.cc new file mode 100644 index 0000000..98eaa92 --- /dev/null +++ b/distfoldd/compressor.cc @@ -0,0 +1,94 @@ +#include <QtCore/QDebug> + +#include <zlib.h> +#include "compressor.h" + +Compressor::Compressor() +{ +} + +QByteArray Compressor::compress(const QByteArray& data) +{ +	if (data.isEmpty()) return data; + +	QByteArray in = data, out; +	z_stream strm; +	int ret; + +	memset(&strm, 0, sizeof(strm)); +	out.resize(qMax(in.size(), 1024)); + +	strm.avail_in = in.size(); +	strm.next_in = reinterpret_cast<Bytef*>(in.data()); +	strm.avail_out = out.size(); +	strm.next_out = reinterpret_cast<Bytef*>(out.data()); + +	ret = deflateInit(&strm, Z_DEFAULT_COMPRESSION); +	if (ret != Z_OK) { +		qWarning() << "deflateInit failed"; +		return data; +	} + +	do { +		if (strm.avail_out == 0) { +			int cur_size = out.size(); +			out.resize(cur_size * 2); +			strm.avail_out = cur_size; +			strm.next_out = reinterpret_cast<Bytef*>(&out.data()[cur_size]); +		} +		ret = deflate(&strm, Z_FINISH); +	} while (ret == Z_OK || ret == Z_BUF_ERROR); + +	Q_ASSERT(ret == Z_STREAM_END); +	Q_ASSERT(strm.avail_in == 0); +	if (strm.avail_out > 0) { +		out.resize(out.size() - strm.avail_out); +	} + +	deflateEnd(&strm); + +	return out; +} + +QByteArray Compressor::decompress(const QByteArray& data) +{ +	if (data.isEmpty()) return data; + +	QByteArray in = data, out; +	z_stream strm; +	int ret; + +	memset(&strm, 0, sizeof(strm)); +	out.resize(qMax(static_cast<int>(in.size() * 1.5), 1024)); + +	strm.avail_in = in.size(); +	strm.next_in = reinterpret_cast<Bytef*>(in.data()); +	strm.avail_out = out.size(); +	strm.next_out = reinterpret_cast<Bytef*>(out.data()); + +	ret = inflateInit(&strm); +	if (ret != Z_OK) { +		qWarning() << "inflateInit failed"; +		return data; +	} + +	do { +		if (strm.avail_out == 0) { +			int cur_size = out.size(); +			out.resize(cur_size * 2); +			strm.avail_out = cur_size; +			strm.next_out = reinterpret_cast<Bytef*>(&out.data()[cur_size]); +		} +		ret = inflate(&strm, Z_FINISH); +	} while (ret == Z_OK || ret == Z_BUF_ERROR); + +	Q_ASSERT(ret == Z_STREAM_END); +	Q_ASSERT(strm.avail_in == 0); +	if (strm.avail_out > 0) { +		out.resize(out.size() - strm.avail_out); +	} + +	inflateEnd(&strm); + +	return out; +} diff --git a/distfoldd/compressor.h b/distfoldd/compressor.h new file mode 100644 index 0000000..6ab8e13 --- /dev/null +++ b/distfoldd/compressor.h @@ -0,0 +1,16 @@ +#ifndef COMPRESSOR_H +#define COMPRESSOR_H + +#include <QtCore/QByteArray> + +class Compressor +{ +private: +	Compressor(); + +public: +	static QByteArray compress(const QByteArray& data); +	static QByteArray decompress(const QByteArray& data); +}; + +#endif // COMPRESSOR_H diff --git a/distfoldd/discoverer.cc b/distfoldd/discoverer.cc new file mode 100644 index 0000000..629a955 --- /dev/null +++ b/distfoldd/discoverer.cc @@ -0,0 +1,195 @@ +#include <QtCore/QDebug> + +#include "discoverer.h" + +Discoverer::Discoverer(const QUuid &uuid, uint port, const QString &serviceName, QObject *parent) : +    QObject(parent), _folderUuid(uuid), _hostUuid(QUuid::createUuid()), +    _serviceName(serviceName), _port(port), +    _mtime(), _mtimeChanged(false), +    _receiver(new QUdpSocket(this)), _sender(new QUdpSocket(this)), +    _netConfig(new QNetworkConfigurationManager(this)), +    _netInfo(new QSystemNetworkInfo(this)), +    _timer(new QSystemAlignedTimer(this)), _fallbackTimer(new QTimer(this)) +{ +	connect(_receiver, SIGNAL(readyRead()), SLOT(handleDataAvailable())); +	if (!_receiver->bind(servicePort, QUdpSocket::ShareAddress)) { +		qWarning() << "Failed to bind to discoverer port" << servicePort; +	} +	connect(_netConfig, SIGNAL(onlineStateChanged(bool)), SLOT(handleOnlineStateChanged(bool))); +	connect(_timer, SIGNAL(timeout()), SLOT(handleTimerTimeout())); +	connect(_fallbackTimer, SIGNAL(timeout()), SLOT(handleTimerTimeout())); +	connect(_timer, SIGNAL(error(QSystemAlignedTimer::AlignedTimerError)), SLOT(handleTimerError())); +	_timer->setSingleShot(true); +	_fallbackTimer->setSingleShot(true); +	if (_netConfig->isOnline() || +	    _netConfig->allConfigurations(QNetworkConfiguration::Defined).empty()) { +		// Either only, or _netConfig has no configs so it is useless. +		qDebug() << "Start timer"; +		_timer->start(0, activeBroadcastInterval); +		if (_timer && _timer->lastError() != QSystemAlignedTimer::NoError) { +			qDebug() << "QSystemAlignedTimer broken at start"; +			delete _timer; +			_timer = 0; +			_fallbackTimer->start(activeBroadcastInterval * 1000); +		} +	} else { +		qDebug() << "I am NOT online"; +	} +} + +void Discoverer::setLastModifiedTime(const QDateTime& dateTime) +{ +	_mtime = dateTime; +	if (_knownHosts.empty()) { +		// Idle mode: do nothing. +	} else { +		// Active mode +		switchToActiveMode(); +	} +	_mtimeChanged = true; +} + +QByteArray Discoverer::encodeMessage() const +{ +	QByteArray ba; +	QByteArray hostUuid = _hostUuid.toString().toAscii(); +	QByteArray folderUuid = _folderUuid.toString().toAscii(); +	QByteArray serviceNameUtf8 = _serviceName.toUtf8(); +	Message msg; + +	Q_ASSERT(hostUuid.length() == sizeof(msg.hostUuid)); +	Q_ASSERT(folderUuid.length() == sizeof(msg.folderUuid)); + +	ba.reserve(sizeof(msg) + serviceNameUtf8.size()); + +	msg.mtime = _mtime.toTime_t(); +	strncpy(msg.hostUuid, hostUuid.data(), sizeof(msg.hostUuid)); +	msg.port = _port; +	strncpy(msg.folderUuid, folderUuid.data(), sizeof(msg.folderUuid)); + +	ba.append(reinterpret_cast<char*>(&msg), sizeof(msg)); +	ba.append(serviceNameUtf8); +	return ba; +} + +void Discoverer::switchToActiveMode() +{ +	if (_timer) { +		if (!_timer->isActive() || _timer->maximumInterval() > activeBroadcastInterval) { +			if (_timer->isActive()) _timer->wokeUp(); +			_timer->start(0, activeBroadcastInterval); +		} +	} else if (!_fallbackTimer->isActive() || _fallbackTimer->interval() > activeBroadcastInterval * 1000) { +		_fallbackTimer->start(activeBroadcastInterval * 1000); +	} +} + +void Discoverer::broadcastMessage() +{ +	_sender->writeDatagram(encodeMessage(), QHostAddress::Broadcast, servicePort); +	_lastBroadcast = QDateTime::currentDateTime(); +	_mtimeChanged = false; +	qDebug() << "Broadcast message at" << _lastBroadcast; +} + +void Discoverer::handleDataAvailable() +{ +	while (_receiver->hasPendingDatagrams()) { +		QByteArray data; +		QHostAddress sender; +		data.resize(_receiver->pendingDatagramSize()); +		_receiver->readDatagram(data.data(), data.size(), &sender); + +		handleBroadcastMessage(sender, data); +	} +} + +void Discoverer::handleBroadcastMessage(const QHostAddress& sender, const QByteArray &data) +{ +	const char *dataPtr = data.data(); +	const Message *msg = reinterpret_cast<const Message*>(dataPtr); +	QString serviceName = QString::fromUtf8(&dataPtr[sizeof(Message)]); +	char uuidData[sizeof(msg->hostUuid)]; + +	strncpy(uuidData, msg->hostUuid, sizeof(uuidData)); +	QUuid hostUuid(QString::fromAscii(uuidData, sizeof(uuidData))); +	strncpy(uuidData, msg->folderUuid, sizeof(uuidData)); +	QUuid folderUuid(QString::fromAscii(uuidData, sizeof(uuidData))); + +	if (hostUuid.isNull() || folderUuid.isNull()) { +		qWarning() << "Null uuid"; +		return; +	} +	if (hostUuid == _hostUuid) { +		// A message from myself, ignore +		return; +	} +	if (folderUuid != _folderUuid) { +		// A message not about the same folder, ignore +		return; +	} + +	qDebug() << "Got message from" << sender << msg->port; +	handleHostSeen(hostUuid); + +	QDateTime mtime = QDateTime::fromTime_t(msg->mtime); +	qDebug() << mtime << _mtime << _mtime.secsTo(mtime); +	if (_mtime.secsTo(mtime) > dateTimeCompareMinDelta) { +		// We are outdated, let's try to connect to them. +		emit foundMoreRecentHost(sender, msg->port, mtime); +	} +} + +void Discoverer::handleHostSeen(const QUuid &hostUuid) +{ +	bool host_previously_seen = _knownHosts.contains(hostUuid); +	_knownHosts[hostUuid].lastSeen = QDateTime::currentDateTime(); +	if (!host_previously_seen || _mtimeChanged) { +		switchToActiveMode(); +	} +} + +void Discoverer::handleTimerTimeout() +{ +	qDebug() << "Timer"; +	broadcastMessage(); +	QDateTime now = QDateTime::currentDateTime(); +	QHash<QUuid, HostInfo>::iterator i = _knownHosts.begin(); +	while (i != _knownHosts.end()) { +		if (i.value().lastSeen.secsTo(now) > lostHostTimeout) { +			i = _knownHosts.erase(i); +		} else { +			++i; +		} +	} +	qDebug() << "Known hosts" << _knownHosts.size(); +	if (_timer) { +		_timer->start(idleBroadcastInterval * 0.7f, idleBroadcastInterval * 1.3f); +	} else { +		_fallbackTimer->start(idleBroadcastInterval * 1000); +	} +} + +void Discoverer::handleTimerError() +{ +	qDebug() << "SystemAlignedTimer broken"; +	_timer->deleteLater(); +	_timer = 0; +	_fallbackTimer->start(activeBroadcastInterval * 1000); +} + +void Discoverer::handleOnlineStateChanged(bool online) +{ +	if (online) { +		qDebug() << "Online"; +		if (_netInfo->currentMode() == QSystemNetworkInfo::UnknownMode || +		    _netInfo->currentMode() == QSystemNetworkInfo::WlanMode) { +			switchToActiveMode(); +		} +	} else { +		qDebug() << "Offline"; +		_knownHosts.clear(); +		_fallbackTimer->stop(); +		if (_timer) _timer->stop(); +	} +} diff --git a/distfoldd/discoverer.h b/distfoldd/discoverer.h new file mode 100644 index 0000000..ae106de --- /dev/null +++ b/distfoldd/discoverer.h @@ -0,0 +1,82 @@ +#ifndef DISCOVERER_H +#define DISCOVERER_H + +#include <QtCore/QTimer> +#include <QtCore/QUuid> +#include <QtCore/QDateTime> +#include <QtNetwork/QNetworkConfigurationManager> +#include <QtNetwork/QHostAddress> +#include <QtNetwork/QUdpSocket> +#include <QtSystemInfo/QSystemNetworkInfo> +#include <QtSystemInfo/QSystemAlignedTimer> + +QTM_USE_NAMESPACE + +class Discoverer : public QObject +{ +	Q_OBJECT + +public: +	Discoverer(const QUuid& uuid, uint port, const QString& serviceName, QObject *parent = 0); +	 +	static const uint servicePort = 17451; +	static const int dateTimeCompareMinDelta = 5; + +	// In seconds +	static const int idleBroadcastInterval = 15 * 60; +	static const int activeBroadcastInterval = 5; +	static const int lostHostTimeout =  idleBroadcastInterval * 1.4f; + +signals: +	void foundMoreRecentHost(const QHostAddress& address, uint port, const QDateTime& dateTime); + +public slots: +	void setLastModifiedTime(const QDateTime& dateTime); + +private: +#pragma pack(push) +#pragma pack(1) +	struct Message { +		quint32 mtime; +		char hostUuid[38]; +		quint16 port; +		char folderUuid[38]; +		char name[]; +	}; +#pragma pack() +	struct HostInfo { +		QDateTime lastSeen; +	}; + +	QByteArray encodeMessage() const; + +	void switchToActiveMode(); + +private slots: +	void broadcastMessage(); +	void handleDataAvailable(); +	void handleBroadcastMessage(const QHostAddress& sender, const QByteArray &data); +	void handleHostSeen(const QUuid& hostUuid); +	void handleTimerTimeout(); +	void handleTimerError(); +	void handleOnlineStateChanged(bool online); + +private: +	QUuid _folderUuid; +	QUuid _hostUuid; +	QString _serviceName; +	uint _port; +	QDateTime _mtime; +	bool _mtimeChanged; +	QUdpSocket *_receiver; +	QUdpSocket *_sender; +	quint32 _myId; +	QNetworkConfigurationManager *_netConfig; +	QSystemNetworkInfo *_netInfo; +	QDateTime _lastBroadcast; +	QSystemAlignedTimer *_timer; +	QTimer *_fallbackTimer; +	QHash<QUuid, HostInfo> _knownHosts; +}; + +#endif // DISCOVERER_H diff --git a/distfoldd/distfoldd.pro b/distfoldd/distfoldd.pro new file mode 100644 index 0000000..3eb804e --- /dev/null +++ b/distfoldd/distfoldd.pro @@ -0,0 +1,35 @@ +TARGET   = distfoldd +TEMPLATE = app +CONFIG   += console +CONFIG   -= app_bundle + +QT       += core network +QT       -= gui + +CONFIG   += mobility +MOBILITY += systeminfo + +SOURCES += main.cc \ +    distfolder.cc \ +    server.cc \ +    watcher.cc \ +    clientagent.cc \ +    serveragent.cc \ +    agent.cc \ +    discoverer.cc \ +    compressor.cc + +HEADERS += \ +    distfolder.h \ +    server.h \ +    watcher.h \ +    clientagent.h \ +    serveragent.h \ +    agent.h \ +    discoverer.h \ +    compressor.h + +contains(MEEGO_EDITION,harmattan) { +    target.path = /opt/distfold/bin +    INSTALLS += target +} diff --git a/distfoldd/distfolder.cc b/distfoldd/distfolder.cc new file mode 100644 index 0000000..211aafe --- /dev/null +++ b/distfoldd/distfolder.cc @@ -0,0 +1,200 @@ +#include <QtCore/QStringList> + +#include "clientagent.h" +#include "serveragent.h" +#include "distfolder.h" + +DistFolder::DistFolder(const QUuid& uuid, const QString& localPath, QObject *parent) : +    QObject(parent), _uuid(uuid), _localPath(localPath), +    _mtime(), _mtimeChanged(false), +    _watcher(new Watcher(localPath, this)), +    _server(new Server(this)), +    _discoverer(new Discoverer(uuid, _server->serverPort(), QString("Files on %1").arg(_localPath.canonicalPath()), this)), +    _syncFlags(Agent::SYNC_NORMAL), +    _numAgents(0) +{ +	Q_ASSERT(_localPath.isReadable()); +	connect(_watcher, SIGNAL(pathAdded(QString)), SLOT(handlePathAdded(QString))); +	connect(_watcher, SIGNAL(pathChanged(QString)), SLOT(handlePathChanged(QString))); +	connect(_watcher, SIGNAL(pathRemoved(QString)), SLOT(handlePathRemoved(QString))); +	updateLastModTime(); +	connect(_discoverer, SIGNAL(foundMoreRecentHost(QHostAddress,uint,QDateTime)), +	        SLOT(handleMoreRecentHost(QHostAddress,uint,QDateTime))); +	connect(_server, SIGNAL(newConnection()), SLOT(handleNewConnection())); +	qDebug() << "Ready Folder UUID:" << _uuid; +} + +bool DistFolder::readOnlySync() const +{ +	return _syncFlags & Agent::SYNC_READ_ONLY; +} + +void DistFolder::setReadOnlySync(bool read_only) +{ +	if (read_only) { +		qDebug() << "Enabling read only sync"; +		_syncFlags |= Agent::SYNC_READ_ONLY; +	} else { +		_syncFlags &= ~Agent::SYNC_READ_ONLY; +	} +} + +bool DistFolder::pullMode() const +{ +	return _syncFlags & Agent::SYNC_PULL; +} + +void DistFolder::setPullMode(bool pull_mode) +{ +	if (pull_mode) { +		qDebug() << "Enabling pull sync"; +		_syncFlags |= Agent::SYNC_PULL; +	} else { +		_syncFlags &= ~Agent::SYNC_PULL; +	} +	updateLastModTime(); +} + +bool DistFolder::compress() const +{ +	return _syncFlags & Agent::SYNC_COMPRESS; +} + +void DistFolder::setCompress(bool compress) +{ +	if (compress) { +		qDebug() << "Enabling compression"; +		_syncFlags |= Agent::SYNC_COMPRESS; +	} else { +		_syncFlags &= ~Agent::SYNC_COMPRESS; +	} +} + +QDateTime DistFolder::scanLastModTime() +{ +	return scanLastModTime(_localPath); +} + +void DistFolder::updateLastModTime(const QDateTime& dt) +{ +	QDateTime curMtime = _mtime; +	if (pullMode()) { +		_mtime = QDateTime::fromTime_t(0); // Force being the puller for the next sync +	} else { +		if (dt.isNull()) { +			_mtime = scanLastModTime(); +		} else if (dt > _mtime) { +			_mtime = dt; +		} +	} +	if (_mtime != curMtime) { +		if (_numAgents > 0) { +			_mtimeChanged = true; +		} else { +			_discoverer->setLastModifiedTime(_mtime); +			_mtimeChanged = false; +		} +	} +} + +QDateTime DistFolder::scanLastModTime(const QDir& dir) +{ +	QFileInfoList list = dir.entryInfoList(QDir::Dirs | QDir::Files | QDir::NoDotAndDotDot); +	QDateTime max = QFileInfo(dir.absolutePath()).lastModified(); + +	foreach (const QFileInfo &info, list) { +		QDateTime mtime; +		if (info.isDir()) { +			mtime = scanLastModTime(QDir(info.absoluteFilePath())); +		} else { +			mtime = info.lastModified(); +		} +		if (mtime > max) { +			max = mtime; +		} +	} + +	return max; +} + +void DistFolder::handleNewConnection() +{ +	qDebug() << "Incoming connection"; +	ServerAgent *agent = +	        new ServerAgent(static_cast<QSslSocket*>(_server->nextPendingConnection()), +	                        _localPath, _syncFlags, this); +	connect(agent, SIGNAL(destroyed()), SLOT(handleDestroyedAgent())); +	_numAgents++; +	qDebug() << "Num agents" << _numAgents; +} + +void DistFolder::handlePathAdded(const QString &path) +{ +	QFileInfo info(path); +	qDebug() << "added: " << path; +	if (info.lastModified() > _mtime) { +		updateLastModTime(info.lastModified()); +	} +} + +void DistFolder::handlePathChanged(const QString &path) +{ +	QFileInfo info(path); +	qDebug() << "changed: " << path; +	if (info.lastModified() > _mtime) { +		updateLastModTime(info.lastModified()); +	} +} + +void DistFolder::handlePathRemoved(const QString &path) +{ +	QFileInfo info(path); +	qDebug() << "removed: " << path; +	// Find a parent that has not been removed +	const QString base_path = _localPath.absolutePath(); +	do { +		info = QFileInfo(info.absolutePath()); +	} while (!info.exists() && // Until it exists and as long as we are under the base dir. +	         info.absoluteFilePath().startsWith(base_path)); +	if (info.lastModified() > _mtime) { +		updateLastModTime(info.lastModified()); +	} +} + +void DistFolder::handleMoreRecentHost(const QHostAddress &address, uint port, const QDateTime &dateTime) +{ +	Q_UNUSED(dateTime); +	if (_numAgents == 0) { +		qDebug() << "Trying to connect to" << address.toString() << port; +		ClientAgent *agent = new ClientAgent(address, port, _localPath, _syncFlags, this); +		connect(agent, SIGNAL(destroyed()), SLOT(handleDestroyedAgent())); +		connect(agent, SIGNAL(finished()), SLOT(handleClientFinished())); +		_numAgents++; +		qDebug() << "Num agents" << _numAgents; +	} else { +		qDebug() << "Busy but a more recent host was found"; +		// TODO +	} +} + +void DistFolder::handleClientFinished() +{ +	if (pullMode()) { +		// Stop pulling once a succesful sync is done +		qDebug() << "Stopping pull mode"; +		setPullMode(false); +	} +} + +void DistFolder::handleDestroyedAgent() +{ +	_numAgents--; +	qDebug() << "Num agents" << _numAgents; +	if (_numAgents == 0) { +		// All sync agents have finished +		if (_mtimeChanged) { +			_discoverer->setLastModifiedTime(_mtime); +			_mtimeChanged = false; +		} +	} +} diff --git a/distfoldd/distfolder.h b/distfoldd/distfolder.h new file mode 100644 index 0000000..ae7a0ea --- /dev/null +++ b/distfoldd/distfolder.h @@ -0,0 +1,57 @@ +#ifndef DISTFOLDER_H +#define DISTFOLDER_H + +#include <QtCore/QObject> +#include <QtCore/QDir> +#include <QtCore/QUuid> +#include <QtCore/QFileSystemWatcher> + +#include "watcher.h" +#include "server.h" +#include "agent.h" +#include "discoverer.h" + +class DistFolder : public QObject +{ +	Q_OBJECT +	Q_PROPERTY(bool readOnlySync READ readOnlySync WRITE setReadOnlySync) +	Q_PROPERTY(bool pullMode READ pullMode WRITE setPullMode) +	Q_PROPERTY(bool compress READ compress WRITE setCompress) + +public: +	explicit DistFolder(const QUuid& uuid, const QString& path, QObject *parent = 0); + +	bool readOnlySync() const; +	void setReadOnlySync(bool read_only); +	bool pullMode() const; +	void setPullMode(bool pull_mode); +	bool compress() const; +	void setCompress(bool compress); + +private: +	QDateTime scanLastModTime(); +	QDateTime scanLastModTime(const QDir& dir); +	void updateLastModTime(const QDateTime& dt = QDateTime()); + +private slots: +	void handleNewConnection(); +	void handlePathAdded(const QString& path); +	void handlePathChanged(const QString& path); +	void handlePathRemoved(const QString& path); +	void handleMoreRecentHost(const QHostAddress& address, uint port, const QDateTime& dateTime); +	void handleClientFinished(); +	void handleDestroyedAgent(); + +private: +	QUuid _uuid; +	QDir _localPath; +	QDateTime _mtime; +	bool _mtimeChanged; +	Watcher *_watcher; +	Server *_server; +	Discoverer *_discoverer; +	Agent::SyncFlags _syncFlags; +	int _numAgents; +}; + +#endif // DISTFOLDER_H diff --git a/distfoldd/main.cc b/distfoldd/main.cc new file mode 100644 index 0000000..fbecf5a --- /dev/null +++ b/distfoldd/main.cc @@ -0,0 +1,28 @@ +#include <QtCore/QCoreApplication> +#include <QtCore/QSettings> +#include <QtCore/QDebug> + +#include "distfolder.h" + +int main(int argc, char *argv[]) +{ +	QCoreApplication a(argc, argv); +	a.setOrganizationName("distfold"); +	a.setOrganizationDomain("com.javispedro.distfold"); +	a.setApplicationName("distfoldd"); +	a.setApplicationVersion("0.1"); + +	QSettings settings; +	foreach (const QString& group, settings.childGroups()) { +		settings.beginGroup(group); +		QUuid uuid(settings.value("uuid").toString()); +		QString path = settings.value("path").toString(); +		DistFolder *f = new DistFolder(uuid, path); +		f->setReadOnlySync(settings.value("ro", false).toBool()); +		f->setPullMode(settings.value("pull", false).toBool()); +		f->setCompress(settings.value("compress", true).toBool()); +		settings.endGroup(); +	} + +	return a.exec(); +} diff --git a/distfoldd/server.cc b/distfoldd/server.cc new file mode 100644 index 0000000..0361466 --- /dev/null +++ b/distfoldd/server.cc @@ -0,0 +1,56 @@ +#include <QtCore/QDebug> +#include <QtCore/QDir> +#include <QtNetwork/QSslSocket> + +#include "server.h" + +Server::Server(QObject *parent) : +    QTcpServer(parent) +{ +	loadKeys(); +	if (!listen()) { +		qWarning() << "Failed to start server socket"; +	} +} + +void Server::loadKeys() +{ +	QDir config_dir(QDir::home().absoluteFilePath(".config/distfold")); +	QFile cert_file(config_dir.absoluteFilePath("server.crt")); +	if (cert_file.open(QIODevice::ReadOnly)) { +		_cert = QSslCertificate(&cert_file, QSsl::Pem); +		cert_file.close(); +	} +	if (_cert.isNull()) { +		qWarning() << "Could not load server certificate"; +	} +	QFile key_file(config_dir.absoluteFilePath("server.key")); +	if (key_file.open(QIODevice::ReadOnly)) { +		_key = QSslKey(&key_file, QSsl::Rsa, QSsl::Pem); +		key_file.close(); +	} +	if (_key.isNull()) { +		qWarning() << "Could not load private key"; +	} +} + +void Server::incomingConnection(int socketDescriptor) +{ +	QSslSocket *socket = new QSslSocket(this); +	connect(socket, SIGNAL(error(QAbstractSocket::SocketError)), +	        SLOT(handleSocketError(QAbstractSocket::SocketError))); +	if (socket->setSocketDescriptor(socketDescriptor)) { +		socket->setLocalCertificate(_cert); +		socket->setPrivateKey(_key); +		socket->startServerEncryption(); +		addPendingConnection(socket); +	} else { +		delete socket; +	} +} + +void Server::handleSocketError(QAbstractSocket::SocketError error) +{ +	QSslSocket *socket = qobject_cast<QSslSocket*>(sender()); +	qDebug() << "Server socket error:" << socket->error() << socket->errorString(); +} diff --git a/distfoldd/server.h b/distfoldd/server.h new file mode 100644 index 0000000..0ed9100 --- /dev/null +++ b/distfoldd/server.h @@ -0,0 +1,26 @@ +#ifndef SERVER_H +#define SERVER_H + +#include <QtNetwork/QTcpServer> +#include <QtNetwork/QSslCertificate> +#include <QtNetwork/QSslKey> + +class Server : public QTcpServer +{ +	Q_OBJECT +public: +	explicit Server(QObject *parent = 0); + +protected: +	void loadKeys(); +	void incomingConnection(int handle); + +private slots: +	void handleSocketError(QAbstractSocket::SocketError error); + +private: +	QSslCertificate _cert; +	QSslKey _key; +}; + +#endif // SERVER_H diff --git a/distfoldd/serveragent.cc b/distfoldd/serveragent.cc new file mode 100644 index 0000000..13e7848 --- /dev/null +++ b/distfoldd/serveragent.cc @@ -0,0 +1,225 @@ +#include <QtCore/QDebug> + +#include "serveragent.h" + +ServerAgent::ServerAgent(QSslSocket *socket, const QDir& local_dir, SyncFlags flags, QObject *parent) : +    Agent(socket, local_dir, flags, parent) +{ +	qDebug() << "Starting server agent at" << QDateTime::currentDateTime(); +} + +void ServerAgent::handleMessage(MessageType msg, const QByteArray& data) +{ +	qDebug() << "Server::handleMessage" << msg << data.size(); +	switch (msg) { +	case MSG_HELLO: +		sendMessage(MSG_HELLO_REPLY); +		break; +	case MSG_FILE_LIST: +		handleClientFileList(decodeFileInfoList(data)); +		break; +	case MSG_PULL_FILE: +		handlePullFile(decodeFileName(data)); +		break; +	case MSG_PUSH_FILE: +		handlePushedFile(data); +		break; +	case MSG_PUSH_FILE_METADATA: +		handlePushedMetadata(decodeFileInfoList(data)); +		break; +	case MSG_DELETE_FILE: +		handleDeleteFile(decodeFileName(data)); +		break; +	case MSG_BYE: +		qDebug() << "Got Bye"; +		emit finished(); +		_socket->close(); +		break; +	default: +		qWarning() << "Unknown message"; +		break; +	} +} + +void ServerAgent::handleClientFileList(const RemoteFileInfoList& list) +{ +	QFileInfoList files = scanFiles(QDir(wireToLocalPath(_subPath))); +	ActionInfoList actions_create_dirs, actions_with_files, actions_remove_dirs, actions_update_dirs; +	QHash<QString, QFileInfo> local_files; +	QHash<QString, RemoteFileInfo> remote_files; +	QSet<QString> new_local_files; + +	foreach (const QFileInfo& file, files) { +		const QString wire_path = localToWirePath(file.absoluteFilePath()); +		local_files[wire_path] = file; +		new_local_files.insert(wire_path); +	} +	foreach (const RemoteFileInfo& file, list) { +		remote_files[file.name()] = file; +	} + +	foreach (const RemoteFileInfo& remote, list) { +		// Main synchronization logic goes here +		QString wire_path = remote.name(); +		qDebug() << "Server Handling file" << wire_path; +		new_local_files.remove(wire_path); +		if (local_files.contains(wire_path)) { +			// Both remote and local have the file +			QFileInfo local = local_files[wire_path]; +			QString parent_name = wireParentPath(wire_path); +			QFileInfo parent_local = local_files[parent_name]; +			RemoteFileInfo parent_remote = remote_files[parent_name]; +			Q_ASSERT(local.exists()); +			if (remote.isDir() && local.isDir()) { +				if (equalDateTime(remote.lastModified(), local.lastModified())) { +					if (local.lastModified().toTime_t() == 0) { +						// Workaround an issue if the local folder is a mountpoint +						actions_update_dirs += ActionInfo(ACTION_PULL_METADATA, local); +					} +				     // Nothing to do +				} else if (remote.lastModified() > local.lastModified()) { +					actions_update_dirs += ActionInfo(ACTION_PUSH_METADATA, local); +				} else { +					actions_update_dirs += ActionInfo(ACTION_PULL_METADATA, local); +				} +			} else if (remote.isDir() == local.isDir() && +			    equalDateTime(remote.lastModified(), local.lastModified()) && +			    remote.size() == local.size()) { +				// Nothing to do +			} else if (remote.isDir() != local.isDir()) { +				// Dir to file transformation, delete the most recent file +				if (parent_remote.lastModified() > parent_local.lastModified()) { +					if (local.isDir()) { +						actions_remove_dirs += ActionInfo(ACTION_PUSH_DELETE, local); +					} else { +						actions_with_files += ActionInfo(ACTION_PUSH_DELETE, local); +					} +				} else { +					if (remote.isDir()) { +						actions_remove_dirs += ActionInfo(ACTION_PULL_DELETE, local); +					} else { +						actions_with_files += ActionInfo(ACTION_PULL_DELETE, local); +					} +				} +			} else if (remote.lastModified() > local.lastModified()) { +				if (remote.isDir()) { +					actions_create_dirs += ActionInfo(ACTION_PUSH, local); +				} else { +					actions_with_files += ActionInfo(ACTION_PUSH, local); +				} +			} else { +				if (remote.isDir()) { +					actions_create_dirs += ActionInfo(ACTION_PULL, local); +				} else { +					actions_with_files += ActionInfo(ACTION_PULL, local); +				} +			} +		} else { +			// File is in remote, but not in local +			qDebug() << " file deleted"; +			QString parent_name = findExistingCommonAncestor(wire_path, local_files, remote_files); +			QFileInfo local(wireToLocalPath(wire_path)); // Create invalid QFileInfo +			QFileInfo parent_local = local_files[parent_name]; +			RemoteFileInfo parent_remote = remote_files[parent_name]; +			if (parent_remote.lastModified() > parent_local.lastModified()) { +				if (remote.isDir()) { +					actions_create_dirs += ActionInfo(ACTION_PUSH, local); +					actions_update_dirs += ActionInfo(ACTION_PUSH_METADATA, local); +				} else { +					actions_with_files += ActionInfo(ACTION_PUSH, local); +				} +			} else { +				if (remote.isDir()) { +					actions_remove_dirs += ActionInfo(ACTION_PULL_DELETE, local); +				} else { +					actions_with_files += ActionInfo(ACTION_PULL_DELETE, local); +				} +			} +		} +	} + +	foreach (const QString& path, new_local_files) { +		// File is in local, but not in remote +		QString parent_name = findExistingCommonAncestor(path, local_files, remote_files); +		QFileInfo local = local_files[path]; +		QFileInfo parent_local = local_files[parent_name]; +		RemoteFileInfo parent_remote = remote_files[parent_name]; +		if (parent_remote.lastModified() > parent_local.lastModified()) { +			if (local.isDir()) { +				actions_remove_dirs += ActionInfo(ACTION_PUSH_DELETE, local); +			} else { +				actions_with_files += ActionInfo(ACTION_PUSH_DELETE, local); +			} +		} else { +			if (local.isDir()) { +				actions_create_dirs += ActionInfo(ACTION_PULL, local); +				actions_update_dirs += ActionInfo(ACTION_PULL_METADATA, local); +			} else { +				actions_with_files += ActionInfo(ACTION_PULL, local); +			} +		} +	} + +	qSort(actions_create_dirs.begin(), actions_create_dirs.end(), +	      static_cast<bool (*)(const ActionInfo&, const ActionInfo&)>(lessPathDepthThan)); +	qSort(actions_remove_dirs.begin(), actions_remove_dirs.end(), +	      static_cast<bool (*)(const ActionInfo&, const ActionInfo&)>(morePathDepthThan)); +	qSort(actions_update_dirs.begin(), actions_update_dirs.end(), +	      static_cast<bool (*)(const ActionInfo&, const ActionInfo&)>(morePathDepthThan)); + +	ActionInfoList actions = actions_create_dirs + +	        actions_with_files + actions_remove_dirs + +	        actions_update_dirs; +	sendMessage(MSG_FILE_ACTIONS_REPLY, encodeActionInfoList(actions)); +} + +void ServerAgent::handlePullFile(const QString &path) +{ +	QFile file(wireToLocalPath(path)); +	if (file.open(QIODevice::ReadOnly)) { +		QByteArray ba = file.readAll(); +		if (_flags & SYNC_COMPRESS) { +			int old_size = ba.size(); +			ba = Compressor::compress(ba); +			qDebug() << "Compress" << old_size << "->" << ba.size(); +		} +		file.close(); +		sendMessage(MSG_PULL_FILE_REPLY, ba); +	} else { +		qWarning() << "Failed to open file" << file.fileName() << "for reading"; +	} +} + +void ServerAgent::handlePushedFile(const QByteArray &ba) +{ +	if (_flags & SYNC_READ_ONLY) return; +	int len; +	QString wire_path = decodeFileNameItem(ba, &len); +	QByteArray file_data = Compressor::decompress(ba.mid(len)); + +	QFile file(wireToLocalPath(wire_path)); +	if (file.open(QIODevice::WriteOnly | QIODevice::Truncate)) { +		file.write(file_data); +		file.close(); +	} else { +		qWarning() << "Failed to open file" << file.fileName() << "for writing"; +	} +} + +void ServerAgent::handlePushedMetadata(const RemoteFileInfoList& list) +{ +	if (_flags & SYNC_READ_ONLY) return; +	foreach (const RemoteFileInfo& remote, list) { +		QString local_path = wireToLocalPath(remote.name()); +		setLocalFileDateTime(local_path, remote.lastModified()); +	} +} + +void ServerAgent::handleDeleteFile(const QString &path) +{ +	if (_flags & SYNC_READ_ONLY) return; +	QString local_path = wireToLocalPath(path); +	if (!QDir().remove(local_path)) { +		qWarning() << "Failed to remove file" << local_path; +	} +} diff --git a/distfoldd/serveragent.h b/distfoldd/serveragent.h new file mode 100644 index 0000000..c692e60 --- /dev/null +++ b/distfoldd/serveragent.h @@ -0,0 +1,23 @@ +#ifndef SERVERAGENT_H +#define SERVERAGENT_H + +#include "agent.h" + +class ServerAgent : public Agent +{ +	Q_OBJECT +public: +	explicit ServerAgent(QSslSocket *socket, const QDir& local_dir, SyncFlags flags, QObject *parent = 0); + +protected: +	void handleMessage(MessageType msg, const QByteArray& data); + +private: +	void handleClientFileList(const RemoteFileInfoList& list); +	void handlePullFile(const QString& path); +	void handlePushedFile(const QByteArray& data); +	void handlePushedMetadata(const RemoteFileInfoList& list); +	void handleDeleteFile(const QString& path); +}; + +#endif // SERVERAGENT_H diff --git a/distfoldd/watcher.cc b/distfoldd/watcher.cc new file mode 100644 index 0000000..664ea58 --- /dev/null +++ b/distfoldd/watcher.cc @@ -0,0 +1,132 @@ +#include <QtCore/QDebug> + +#include <sys/errno.h> +#include <sys/fcntl.h> +#include <sys/inotify.h> + +#include "watcher.h" + + +Watcher::Watcher(const QString& path, QObject *parent) : +    QObject(parent) +{ +	_fd = inotify_init(); +	fcntl(_fd, F_SETFL, O_NONBLOCK); +	_buffer.resize(sizeof(struct inotify_event) + NAME_MAX + 1); +	_mask = IN_CLOSE_WRITE | IN_CREATE | IN_DELETE | +	        IN_MOVED_FROM | IN_MOVED_TO | IN_ONLYDIR; + +	_notifier = new QSocketNotifier(_fd, QSocketNotifier::Read, this); +	connect(_notifier, SIGNAL(activated(int)), SLOT(readInotify())); + +	QStringList l = scanDirs(QDir(path)); +	foreach (const QString& s, l) { +		addWatch(s); +	} +} + +void Watcher::readInotify() +{ +	inotify_event *event; +	const ssize_t struct_size = sizeof(struct inotify_event); +	ssize_t pos, nread; + +	do { +		nread = read(_fd, _buffer.data(), _buffer.size()); +		if (nread < 0) { +			int err = errno; +			if (err == EWOULDBLOCK) return; +			qWarning() << "Error while reading from inotify" << err; +		} + +		pos = 0; +		while (pos + struct_size <= nread) { +			event = reinterpret_cast<inotify_event*>(&(_buffer.data()[pos])); +			if (event->wd == -1) { +				// Special treatment +				// TODO +				continue; +			} + +			QString path = _watches[event->wd]; +			QString name; +			QString filePath = path; +			if (event->len > 0) { +				name = QString::fromLocal8Bit(event->name); +				filePath += "/" + name; +			} + +			if (event->mask & (IN_CREATE|IN_MOVED_TO)) { +				if (event->mask & IN_ISDIR) { +					// TODO There might already be folders in here. +					addWatch(filePath); +				} +				emit pathAdded(filePath); +			} +			if (event->mask & IN_CLOSE_WRITE) { +				emit pathChanged(filePath); +			} +			if (event->mask & (IN_MOVED_FROM|IN_DELETE)) { +				if (event->mask & IN_ISDIR) { +					removeWatch(filePath); +				} +				emit pathRemoved(filePath); +			} + +			pos += struct_size + event->len; +		} +	} while (nread > 0); +} + +QStringList Watcher::scanDirs(const QDir &dir) +{ +	Q_ASSERT(dir.isReadable()); +	QStringList l(dir.absolutePath()); +	QStringList sub_dirs = dir.entryList(QDir::Dirs | QDir::NoDotAndDotDot); + +	foreach (const QString& s, sub_dirs) { +		const QString abs_path = dir.absoluteFilePath(s); +		l << scanDirs(QDir(abs_path)); +	} + +	return l; +} + +void Watcher::addWatch(const QString& path) +{ +	int wd = inotify_add_watch(_fd, path.toLocal8Bit().constData(), _mask); +	if (wd == -1) { +		qWarning() << "Failed to add watch for" << path; +		return; +	} +	if (_watches.contains(wd)) { +		qWarning() << "Watch for" << path << "was already setup"; +		return; +	} + +	qDebug() << "Watching" << path; +	_watches[wd] = path; +	_dirs[path] = wd; +} + +void Watcher::removeWatch(const QString& path) +{ +	if (!_dirs.contains(path)) { +		qWarning() << "Watch for" << path << "not found"; +		return; +	} +	int wd = _dirs[path]; +	removeWatch(wd); +} + +void Watcher::removeWatch(int wd) +{ +	if (!_watches.contains(wd)) { +		qWarning() << "Watch" << wd << "not found"; +	} +	QString path = _watches[wd]; +	qDebug() << "Unwatching" << path; +	inotify_rm_watch(_fd, wd); +	_dirs.remove(path); +	_watches.remove(wd); +} diff --git a/distfoldd/watcher.h b/distfoldd/watcher.h new file mode 100644 index 0000000..861248e --- /dev/null +++ b/distfoldd/watcher.h @@ -0,0 +1,38 @@ +#ifndef WATCHER_H +#define WATCHER_H + +#include <QtCore/QDir> +#include <QtCore/QSocketNotifier> +#include <QtCore/QMap> +#include <QtCore/QHash> + +class Watcher : public QObject +{ +	Q_OBJECT +public: +	explicit Watcher(const QString& path, QObject *parent = 0); + +signals: +	void pathAdded(const QString& path); +	void pathRemoved(const QString& path); +	void pathChanged(const QString& path); +	 +private slots: +	void readInotify(); + +private: +	QStringList scanDirs(const QDir& dir); +	void addWatch(const QString& path); +	void removeWatch(const QString& path); +	void removeWatch(int wd); + +private: +	int _fd; +	QSocketNotifier *_notifier; +	quint32 _mask; +	QByteArray _buffer; +	QMap<int, QString> _watches; +	QHash<QString, int> _dirs; +}; + +#endif // WATCHER_H | 
