diff --git a/.gitignore b/.gitignore index ea77a9f..45d58e7 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,10 @@ build/ conanbuildinfo.* conaninfo.txt +# Default CLion Cmake build folder +cmake-build-debug +cmake-build-releas + +# CLion IDE settings directory +.idea + diff --git a/CMakeLists.txt b/CMakeLists.txt index cdecf30..7d4b87c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,6 +2,8 @@ cmake_minimum_required(VERSION 2.8.12 FATAL_ERROR) project(ProtocolSession CXX) +option(build_tests "build tests" OFF) + set(CMAKE_POSITION_INDEPENDENT_CODE ON) set(CMAKE_CXX_STANDARD 11) @@ -31,3 +33,36 @@ set( # === build library === add_library(extension ${library_sources}) + +# === build tests === +if(build_tests) + + # test library prepared, + # mock headers not included, as they + set( + test_lib_source + test/Swarm.cpp + test/AbstractSessionController.cpp + test/BasicObserver.cpp + test/Utilities.cpp + #test/TorrentClient.cpp + #test/TorrentClientSwarm.cpp + ) + + # === test library === + include_directories("${CMAKE_SOURCE_DIR}/test") + + add_library(test_lib ${test_lib_source}) + + file(GLOB tests RELATIVE "${CMAKE_SOURCE_DIR}" "test/test_*.cpp") + + enable_testing() + + foreach(s ${tests}) + get_filename_component (sn ${s} NAME_WE) + add_executable(${sn} ${s}) + target_link_libraries(${sn} test_lib extension ${CONAN_LIBS}) + add_test(${sn} "bin/${sn}") + endforeach(s) + +endif() diff --git a/test/AbstractSessionController.cpp b/test/AbstractSessionController.cpp new file mode 100644 index 0000000..312c58c --- /dev/null +++ b/test/AbstractSessionController.cpp @@ -0,0 +1,13 @@ +// +// Created by bedeho on 09.03.17. +// + +#include "AbstractSessionController.hpp" + +AbstractSessionController::AbstractSessionController(const std::string & name) + : _name(name) { +} + +std::string AbstractSessionController::name() const noexcept { + return _name; +} \ No newline at end of file diff --git a/test/AbstractSessionController.hpp b/test/AbstractSessionController.hpp new file mode 100644 index 0000000..084fc05 --- /dev/null +++ b/test/AbstractSessionController.hpp @@ -0,0 +1,63 @@ +// +// Created by bedeho on 09.03.17. +// + +#ifndef SESSIONCONTROLLER_HPP +#define SESSIONCONTROLLER_HPP + +#include "PollableInterface.hpp" + +#include + +#include +#include +#include +#include + +namespace libtorrent { + class session; + class alert; + struct torrent_info; + +} + +class AbstractSessionController : public PollableInterface { + +public: + + AbstractSessionController(const std::string & name); + + /** + * Invitation to join the swarm for the given (ti) torrent + * @param ti torrent file information + * @param working_directory clean working directory which can be used + */ + virtual void join(const boost::shared_ptr & ti, + const std::string & working_directory, + const std::string & payload_directory) = 0; + + /** + * Returns endpoint upon which this session is listenning. An unset + * optional indicates it is not listening. + * @return + */ + virtual boost::optional session_endpoint() const = 0; + + /** + * Notify controller about full peer list for swarm. + */ + virtual void swarm_peer_list_ready(const std::unordered_map &) = 0; // <= give list of endpoints and names, in order to have capcity to connect with howmever it wants, at the desired time in the future? simple policy is just to connect to all of them + + /** + * Timeout processing + */ + virtual void poll() = 0; + + std::string name() const noexcept; + +private: + + const std::string _name; +}; + +#endif //SESSIONCONTROLLER_HPP diff --git a/test/BasicBuyer.cpp b/test/BasicBuyer.cpp new file mode 100644 index 0000000..2ae39a6 --- /dev/null +++ b/test/BasicBuyer.cpp @@ -0,0 +1,5 @@ +// +// Created by bedeho on 09.03.17. +// + +#include "BasicBuyer.hpp" \ No newline at end of file diff --git a/test/BasicBuyer.hpp b/test/BasicBuyer.hpp new file mode 100644 index 0000000..ea4524f --- /dev/null +++ b/test/BasicBuyer.hpp @@ -0,0 +1,8 @@ +// +// Created by bedeho on 09.03.17. +// + +#ifndef BASICBUYER_HPP +#define BASICBUYER_HPP + +#endif //BASICBUYER_HPP diff --git a/test/BasicObserver.cpp b/test/BasicObserver.cpp new file mode 100644 index 0000000..c16494c --- /dev/null +++ b/test/BasicObserver.cpp @@ -0,0 +1,101 @@ +// +// Created by bedeho on 09.03.17. +// + +#include "BasicObserver.hpp" +#include "Utilities.hpp" + +BasicObserver::BasicObserver(const std::string & name) + : AbstractSessionController(name) + , _state(Init()) + , _plugin(boost::make_shared(1000)){ + + _session.add_extension(boost::static_pointer_cast(_plugin)); +} + +void BasicObserver::join(const boost::shared_ptr & ti, const std::string & working_directory, const std::string & payload_directory) { + + if(boost::get(&_state)) { + + // Update state + _state = AddingTorrent(); + + libtorrent::add_torrent_params params; + params.ti = ti; + params.save_path = working_directory; // since payload is not here, libtorrent will - by default, attempt to download content from peers + + // We start a poller to allow blocking here, + // while still servicing this observers alert processing, + // until response from adding torrent returns + + Poller poller; + poller.add(this); + + // Add torrent using plugin + _plugin->submit(joystream::extension::request::AddTorrent(params, [this, params, &poller](libtorrent::error_code &ec, + libtorrent::torrent_handle &h) -> void { + + assert(boost::get(&_state)); + + if(ec) + this->_state = AddingTorrentFailed(); + else { + + this->_state = AddedTorrent(params, h); + + // If swarm peers were provided prior to torrent being added, then connect now + if(_swarm_peers) + connect_to_all(h, _swarm_peers.get()); + + + // bug ******** here ********* + + } + + // Signal that poller can stop + poller.stop(); + + })); + + // Start blocking poller + poller.start(); // add something about max time? + + } else + throw std::runtime_error("Cannot join, no longer in Init state."); +} + +boost::optional BasicObserver::session_endpoint() const { + return listening_endpoint(&_session); +} + +void BasicObserver::swarm_peer_list_ready(const std::unordered_map & swarm_peers) { + + // We can only process leer plist if torrent has been added successfully, if + // that has not yet occured, then we just hold on to list. + + if (auto *added_torrent_state = boost::get(&_state)) { + + // For now, we simply connect to all, in the future, + // user can supply block/accept name list + + connect_to_all(added_torrent_state->handle, swarm_peers); + + } else + _swarm_peers = swarm_peers; + +} + +void BasicObserver::poll() { + + // should some of this really be in based class?? + + process_pending_alert(&_session, + [this](libtorrent::alert * a) -> void { + + + if() { + + } + + }); +} \ No newline at end of file diff --git a/test/BasicObserver.hpp b/test/BasicObserver.hpp new file mode 100644 index 0000000..68e9d5a --- /dev/null +++ b/test/BasicObserver.hpp @@ -0,0 +1,74 @@ +// +// Created by bedeho on 09.03.17. +// + +#ifndef BASICOBSERVER_HPP +#define BASICOBSERVER_HPP + +#include "AbstractSessionController.hpp" + +#include + +class BasicObserver : public AbstractSessionController { + +public: + + struct Init {}; + struct AddingTorrent {}; + struct AddingTorrentFailed {}; + struct AddedTorrent { + + struct StartingObserveMode {}; + struct StartingObserveModeFailed {}; + + struct StartingPlugin {}; + struct StartingPluginFailed {}; + struct PluginStarted {}; + + typedef boost::variant< + StartingObserveMode, + StartingObserveModeFailed, + StartingPlugin, + StartingPluginFailed, + PluginStarted> State; + + AddedTorrent(const libtorrent::add_torrent_params & params, + const libtorrent::torrent_handle & handle) + : state(StartingObserveMode()) + , params(params) + , handle(handle) {} + + State state; + libtorrent::add_torrent_params params; + libtorrent::torrent_handle handle; + }; + + typedef boost::variant< + Init, + AddingTorrent, + AddingTorrentFailed, + AddedTorrent> State; + + BasicObserver(const std::string & name); + + virtual void join(const boost::shared_ptr & ti, + const std::string & working_directory, + const std::string & payload_directory); + + virtual boost::optional session_endpoint() const; + + virtual void swarm_peer_list_ready(const std::unordered_map &); + + virtual void poll(); + +private: + + State _state; + libtorrent::session _session; + boost::shared_ptr _plugin; + + + boost::optional> _swarm_peers; +}; + +#endif //BASICOBSERVER_HPP diff --git a/test/MockAlertManager.hpp b/test/MockAlertManager.hpp new file mode 100644 index 0000000..1171208 --- /dev/null +++ b/test/MockAlertManager.hpp @@ -0,0 +1,35 @@ +#ifndef EXTENSION_MOCK_ALERT_MANAGER_HPP +#define EXTENSION_MOCK_ALERT_MANAGER_HPP + +#include + +#include + +#include +#include + +namespace joystream { +namespace test { +namespace extension { + +class MockAlertManager : public interface::AlertManagerInterface { +public: + MOCK_METHOD0(native_handle, libtorrent::alert_manager*()); + /* + * Template methods can't be mocked directly in gmock. + * We hence mock overloaded methods as per our use. + */ + MOCK_METHOD1(plugin_emplace_alert, void(extension::status::Plugin)); + MOCK_METHOD1(request_emplace_alert, void(extension::alert::LoadedCallBack&)); + MOCK_METHOD6(anchorAnnounced_emplace_alert, void( + libtorrent::torrent_handle, libtorrent::tcp::endpoint&, + quint64, const Coin::typesafeOutPoint&, + const Coin::PublicKey&, const Coin::PubKeyHash& + )); +}; + +} +} +} + +#endif // EXTENSION_MOCK_ALERT_MANAGER_HPP diff --git a/test/MockPeer.hpp b/test/MockPeer.hpp new file mode 100644 index 0000000..c45ef3e --- /dev/null +++ b/test/MockPeer.hpp @@ -0,0 +1,27 @@ +#ifndef GMOCK_MOCK_PEER_HPP +#define GMOCK_MOCK_PEER_HPP + +#include + +#include + +namespace joystream { +namespace test { +namespace extension { + +class MockPeer : public interface::PeerInterface { + public: + MOCK_CONST_METHOD0(isOutgoing, bool()); + MOCK_CONST_METHOD0(connect, void()); + MOCK_CONST_METHOD1(getPeerInfo, void(libtorrent::peer_info &)); + MOCK_CONST_METHOD3(sendBuffer, void(char const*, int, int=0)); + MOCK_CONST_METHOD3(disconnect, void(libtorrent::error_code const&, libtorrent::operation_t, int=0)); + MOCK_CONST_METHOD0(pid, libtorrent::peer_id const&()); + MOCK_CONST_METHOD0(native_handle, libtorrent::peer_connection_handle()); +}; + +} +} +} + +#endif // GMOCK_MOCK_PEER_HPP diff --git a/test/MockSession.hpp b/test/MockSession.hpp new file mode 100644 index 0000000..4fe9c75 --- /dev/null +++ b/test/MockSession.hpp @@ -0,0 +1,25 @@ +#ifndef EXTENSION_MOCK_SESSION_HPP +#define EXTENSION_MOCK_SESSION_HPP + +#include + +#include + +namespace joystream { +namespace test { +namespace extension { + +class MockSession : public interface::SessionInterface { +public: + MOCK_METHOD0(pause, void()); + MOCK_METHOD0(resume, void()); + MOCK_METHOD0(native_handle, libtorrent::session_handle()); + MOCK_METHOD1(find, interface::TorrentInterface*(libtorrent::sha1_hash const &)); + MOCK_METHOD1(add, interface::TorrentInterface*(libtorrent::add_torrent_params const &)); +}; + +} +} +} + +#endif // EXTENSION_MOCK_SESSION_HPP diff --git a/test/MockTorrent.hpp b/test/MockTorrent.hpp new file mode 100644 index 0000000..9810fdb --- /dev/null +++ b/test/MockTorrent.hpp @@ -0,0 +1,25 @@ +#ifndef EXTENSION_MOCK_TORRENT_HPP +#define EXTENSION_MOCK_TORRENT_HPP + +#include + +#include +#include + +#include + +namespace joystream { +namespace test { +namespace extension { + +class MockTorrent : public interface::TorrentInterface { +public: + MOCK_CONST_METHOD0(infoHash, libtorrent::sha1_hash()); + MOCK_CONST_METHOD0(native_handle, libtorrent::torrent_handle()); +}; + +} +} +} + +#endif // EXTENSION_MOCK_TORRENT_HPP diff --git a/test/PollableInterface.hpp b/test/PollableInterface.hpp new file mode 100644 index 0000000..2c29888 --- /dev/null +++ b/test/PollableInterface.hpp @@ -0,0 +1,27 @@ +/** + * Copyright (C) JoyStream - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited + * Proprietary and confidential + * Written by Bedeho Mender , Feburary 18 2017 + */ + +#ifndef POLLABLEINTERFACE_HPP +#define POLLABLEINTERFACE_HPP + +#include +#include +#include + +/** + * @brief Interface for type which has time out based polling + */ +class PollableInterface { + +public: + + virtual void poll() = 0; +}; + + + +#endif // POLLABLEINTERFACE_HPP diff --git a/test/Poller.cpp b/test/Poller.cpp new file mode 100644 index 0000000..67d4d94 --- /dev/null +++ b/test/Poller.cpp @@ -0,0 +1,21 @@ +// +// Created by Bedeho Mender on 11/03/17. +// + +#include "Poller.hpp" + +Poller::Poller() + : _started(false) + , _stopOnNextIteration(false) {} + +void Poller::stop() { + + if(_stopOnNextIteration) + throw std::runtime_error("Poller already scheduled to stop"); + else + _stopOnNextIteration = true; +} + +bool Poller::isStoppingOnNextIteration() const noexcept { + return _stopOnNextIteration; +} \ No newline at end of file diff --git a/test/Poller.hpp b/test/Poller.hpp new file mode 100644 index 0000000..2d058c4 --- /dev/null +++ b/test/Poller.hpp @@ -0,0 +1,59 @@ +// +// Created by Bedeho Mender on 11/03/17. +// + +#ifndef POLLER_HPP +#define POLLER_HPP + +#include "PollableInterface.hpp" + +class Poller { + +public: + + Poller(); + + // allow specifying iteration_counter, or n iteration couner, which just + // continues until ::stop is called, have defualt value be + + // have default value for sleep value, but fix it in ms, to drop the complex templating + + void start(const std::chrono::milliseconds duration & iteration_sleep_duration, + const boost::optional & use_iteration_counter = boost::optional()) { + + // Re-entrancy block + if(_started) + throw std::runtime_error("Already started"); + else + _started = true; + + // Restart + _stopOnNextIteration = false; + + // <=== need to mix + + for(unsigned int i = 0; i < iteration_counter && !_stopOnNextIteration; i++) { + + for(auto s : subjects) + s->poll(); + + std::this_thread::sleep_for(iteration_sleep_duration); + } + + _started = false; + } + + void stop(); + + bool isStoppingOnNextIteration() const noexcept; + + std::vector subjects; + +private: + + bool _started; + bool _stopOnNextIteration; + +}; + +#endif //POLLER_HPP diff --git a/test/Swarm.cpp b/test/Swarm.cpp new file mode 100644 index 0000000..8af6500 --- /dev/null +++ b/test/Swarm.cpp @@ -0,0 +1,86 @@ +// +// Created by bedeho on 09.03.17. +// + +#include +#include +#include "Swarm.hpp" +#include "Utilities.hpp" + +#define WORK_DIR_NAME "swarm" + +// Swarm + + +// SwarmBuilder + +void SwarmBuilder::add(AbstractSessionController * controller) { + + if(_participants.count(controller->name()) > 0) + throw std::runtime_error("Name already taken"); + else + _participants.insert(std::make_pair(controller->name(), controller)); + +} + +Swarm SwarmBuilder::build(const boost::filesystem::path & base_folder) { + + // Working directory for swarm + boost::filesystem::path work_folder(base_folder); + work_folder.append(WORK_DIR_NAME); + + // Clean up from prior run. + // Its better to do this at start, as a prior run cannot be + // trusted to do its own cleanup, e.g. if it gets interrupted. + if(boost::filesystem::exists(boost::filesystem::status(work_folder))) { + + boost::system::error_code ec; + boost::filesystem::remove_all(work_folder, ec); + + if(ec) + throw std::runtime_error(std::string("Could not delete pre-existing swarm directory: ") + work_folder.string()); + } + + // Create work folder + if (!boost::filesystem::create_directories(work_folder)) + throw std::runtime_error(std::string("Could not create swarm directory: ") + work_folder.string()); + + // Create torrent payload and file + boost::shared_ptr ti = make_single_file_torrent(work_folder, + "payload_file.dat", + 2, // 16KiB factor + 113); // number of pieces + + // Create working directory for each session, and ask to join swarm + for(auto m : _participants) { + + boost::filesystem::path participant_folder(work_folder); + participant_folder.append(m.second->name()); + + // Create work folder + if (!boost::filesystem::create_directories(participant_folder)) + throw std::runtime_error(std::string("Could not create participant directory: ") + participant_folder.string()); + + // Ask to join + m.second->join(ti, participant_folder.string(), work_folder.string()); + } + + // Build swarm peer list + std::unordered_map endpoints; + + for(auto m : _participants) { + + auto opt_endpoint = m.second->session_endpoint(); + + if (opt_endpoint) + endpoints.insert(std::make_pair(m.second->name(), opt_endpoint.get())); + } + + // Tell everyone about it + for(auto m: _participants) + m.second->swarm_peer_list_ready(endpoints); + + // Returns swarm for given set of of participants + return Swarm(_participants); + +} \ No newline at end of file diff --git a/test/Swarm.hpp b/test/Swarm.hpp new file mode 100644 index 0000000..962b19f --- /dev/null +++ b/test/Swarm.hpp @@ -0,0 +1,55 @@ +// +// Created by bedeho on 09.03.17. +// + +#ifndef SWARM_HPP +#define SWARM_HPP + +#include +#include +#include +#include "AbstractSessionController.hpp" +#include "Poller.hpp" + +class SwarmBuilder; + +class Swarm { + +public: + + template< class Rep, class Period > + void start(unsigned int iteration_counter, + const std::chrono::duration & iteration_sleep_duration) { + + // Setup poller + Poller poller; + + for(auto m : participants) + poller.partcipants.push_back(m.second); + + // Poll + poller.run(iteration_counter, iteration_sleep_duration); + } + +private: + + friend class SwarmBuilder; + + std::unordered_map participants; +}; + +class SwarmBuilder { + +public: + + void add(AbstractSessionController * controller); + + Swarm build(const boost::filesystem::path & base_folder); + +private: + + std::unordered_map _participants; +}; + + +#endif // SWARM_HPP diff --git a/test/TorrentClient.cpp b/test/TorrentClient.cpp new file mode 100644 index 0000000..3959449 --- /dev/null +++ b/test/TorrentClient.cpp @@ -0,0 +1,349 @@ +/** + * Copyright (C) JoyStream - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited + * Proprietary and confidential + * Written by Bedeho Mender , Feburary 18 2017 + */ + +#include "TorrentClient.hpp" + +#include + +#include + +#include + +TorrentClient::TorrentClient(libtorrent::session * session, + extension::Plugin * plugin) + : _state(state::Init()) + , _session(session) + , _plugin(plugin) { +} + +void TorrentClient::add(const libtorrent::add_torrent_params & params) { + + if(boost::get(&_state)) { + + this->_state = state::AddingTorrent(); + + // Add torrent + _plugin->submit(extension::request::AddTorrent(params, [this](libtorrent::error_code & ec, libtorrent::torrent_handle & h) -> void { + + if(ec) + this->_state = state::AddingTorrentFailed(); + else + this->_state = state::AddedTorrent(params, h); + + })); + + // Run alert processing long enough to process all callbacks above to completion + Poller(this, 3, 1*std::chrono::seconds); + + // If we are still adding the torrent, then we have failed + if(boost::get(&_state)) + throw std::runtime_error("Torrent was not added within poller expiration time."); + + } else + throw std::runtime_error("Cannot start, is already started."); +} + +void TorrentClient::connect(const libtorrent::tcp::endpoint & endpoint) { + + if(auto * added_torrent = boost::get(&_state)) + added_torrent->handle.connect_peer(endpoint); + else + throw std::runtime_error("Cannot start, torrent not yet added."); +} + +/** +auto * added_torrent_state = boost::get(&this->_state); + +// Start plugin +_plugin->submit(extension::request::Start(params.info_hash, [added_torrent_state](const std::exception_ptr & e) { + + if(e) + *(added_torrent_state) = state::StartingPluginFailed(); + else + *(added_torrent_state) = state::PluginStarted(); + +})); +*/ + +/** +state::AddedTorrent * torrent_has_been_added(const State * state) { + + if(auto * added_torrent_state = boost::get(state)) { + if(boost::get(added_torrent_state)) + return added_torrent_state; + else + throw std::runtime_error("Cannot start plugin, have to be in state: AddedTorrent::PluginStarted"); + } else + throw std::runtime_error("Cannot start plugin, have to be in state: AddedTorrent."); +} + */ + +state::AddedTorrent * torrent_has_been_added(TorrentClient::State * state) { + + if(auto * added_torrent_state = boost::get(state)) + return added_torrent_state; + else + throw std::runtime_error("Torrent has not yet been added."); +} + +void TorrentClient::async_buy(const protocol_wire::BuyerTerms & terms) { + + state::AddedTorrent * added_torrent_state = torrent_has_been_added(&_state); + + if(!boost::get(added_torrent_state)) + throw std::runtime_error("Mode already set"); + + // + *(added_torrent_state) = state::AddedTorrent::Buy(terms); + + auto * buy_state = boost::get(added_torrent_state); + + // To buy mode + _plugin->submit(extension::request::ToBuyMode(added_torrent_state->params.info_hash, terms, [buy_state](const std::exception_ptr & e) { + + if(e) + (*buy_state) = state::AddedTorrent::Buy::StartingBuyModeFailed(); + else { + + (*buy_state) = state::AddedTorrent::Buy::BuyModeStarted(); + + // At this point, we have to wait for an asynchronous event, namely + // that a connection with a suitable seller is estalished, + // and we catch this in alert processor. + + } + + })); + +} + +void TorrentClient::async_sell(const protocol_wire::SellerTerms & terms) { + + state::AddedTorrent * added_torrent_state = has_plugin_started(&_state); + + *(added_torrent_state) = Sell(terms); + + auto * sell_state = boost::get(added_torrent_state); + + // To sell mode + _plugin->submit(extension::request::ToSellMode(added_torrent_state->params.info_hash, terms, [sell_state](const std::exception_ptr & e) { + + if(e) + (*sell_state) = state::AddedTorrent::Sell::StartingSellModeFailed(); + else { + + (*sell_state) = state::AddedTorrent::Sell::SellModeStarted(); + + // At this point, we have to wait for an asynchronous event, namely + // that a connection with a suitable seller is estalished, + // and we catch this in alert processor. + + } + + })); +} + +void TorrentClient::async_observe() { + + state::AddedTorrent * added_torrent_state = has_plugin_started(&_state); + + *(added_torrent_state) = Observe(); + + auto * observe_state = boost::get(added_torrent_state); + + // To sell mode + _plugin->submit(extension::request::ToObserveMode(added_torrent_state->params.info_hash, [observe_state](const std::exception_ptr & e) { + + if(e) + (*observe_state) = state::AddedTorrent::Observe::StartingObserveModeFailed(); + else { + + (*observe_state) = state::AddedTorrent::Observe::ObserveModeStarted(); + + // nothing more to do for this mode + + } + + })); +} + +void TorrentClient::poll() { + + /// The purpose of this routine is to facilitate communication between + /// the state machine and the session/plugin. + /// + /// a) To get messages from the latter to the former, session::pop_alerts + /// is used, and extension::alert::RequestResult are also processed, as they + /// can be used to send messages to the state machine also + /// + /// b) To allow former to ask latter for messages with state updates, + /// a timeout event is submitted to it. + + if(Started * s = boost::get(&_state)) { + + // Process alerts + std::vector alerts; + s->session->pop_alerts(&alerts); + + for(auto a : alerts) { + + if(extension::alert::RequestResult const * p = libtorrent::alert_cast(a)) + p->loadedCallback(); // Make loaded callback + else + s->process(a); + } + + // Get status update on torrent plugins + s->_plugin->submit(extension::request::PostTorrentPluginStatusUpdates()); + } +} + +State TorrentClient::state() const noexcept { + return _state; +} + +void TorrentClient::process(const libtorrent::alert * a) { + + if(extension::alert::TorrentPluginStatusUpdateAlert const * p = libtorrent::alert_cast(a)) + process(p); + else if(extension::alert::PeerPluginStatusUpdateAlert const * p = libtorrent::alert_cast(a)) + process(p); + + // ** if peer disconnect occured, then that is a problem? ** +} + +void TorrentClient::process(const extension::alert::TorrentPluginStatusUpdateAlert * p) { + + for(auto m: p->statuses) + _plugin->submit(extension::request::PostPeerPluginStatusUpdates(m.first)); +} + +struct SellerInformation { + + SellerInformation() {} + + SellerInformation(const protocol_wire::SellerTerms & terms, + const Coin::PublicKey & contractPk) + : terms(terms) + , contractPk(contractPk) { + } + + protocol_wire::SellerTerms terms; + Coin::PublicKey contractPk; +}; + +std::map select_N_sellers(unsigned int N, + const std::map & statuses); + +void TorrentClient::process(const extension::alert::PeerPluginStatusUpdateAlert * p) { + + state::AddedTorrent * s; + if(auto * added_torrent_state = boost::get(state)) { + if(s = boost::get(added_torrent_state)) + return; + } else + return; + + + // figure out if torrent hasbeen added + // if no, then ignore + // if yes, then figure out if mod + // buy => call + // sell => + // observe + +} + +void TorrentClient::process(const extension::alert::PeerPluginStatusUpdateAlert *p) { + + if(state == State::buy_mode_started) { + + std::map sellers; + + try { + sellers = select_N_sellers(terms.minNumberOfSellers(), p->statuses); + } catch(const std::runtime_error & e) { + //log("Coulndt find sufficient number of suitable sellers"); + return; + } + + // Create contract commitments and download information + protocol_session::PeerToStartDownloadInformationMap map; + paymentchannel::ContractTransactionBuilder::Commitments commitments(sellers.size()); + + uint32_t output_index = 0; + for(const auto & s: sellers) { + + // fixed for now + int64_t value = 100000; + Coin::KeyPair buyerKeyPair(Coin::PrivateKey::generate()); // **replace later with determinisic key** + Coin::PubKeyHash buyerFinalPkHash; + + protocol_session::StartDownloadConnectionInformation inf(s.second.terms, + output_index, + value, + buyerKeyPair, + buyerFinalPkHash); + + map.insert(std::make_pair(s.first, inf)); + + + commitments[output_index] = paymentchannel::Commitment(value, + buyerKeyPair.pk(), + s.second.contractPk, // payeePK + Coin::RelativeLockTime(Coin::RelativeLockTime::Units::Time, s.second.terms.minLock())); + + output_index++; + } + + // Create contract transaction + paymentchannel::ContractTransactionBuilder c; + c.setCommitments(commitments); + + Coin::Transaction tx = c.transaction(); + + // Starting download + + // state = StartingDownload() + + _plugin->submit(extension::request::StartDownloading(p->handle.info_hash(), tx, map, [=](const std::exception_ptr & e) -> void { + + if(e) + state = State::downloading_starting_failed; // StartingDownloadFailed() + else + state = State::downloading_started; // DownloadingStarted() + + // we are done, nothing more to do? + + })); + + } +} + +std::map select_N_sellers(unsigned int N, const std::map & statuses) { + + std::map selected; + + for(auto s : statuses) { + + libtorrent::tcp::endpoint ep = s.first; + extension::status::PeerPlugin status = s.second; + + if(status.peerBEP10SupportStatus == extension::BEPSupportStatus::supported && + status.peerBitSwaprBEPSupportStatus == extension::BEPSupportStatus::supported && + status.connection.is_initialized()) { + + auto & machine = status.connection.get().machine; + + if(machine.innerStateTypeIndex == typeid(protocol_statemachine::PreparingContract) && selected.size() < N) + selected[ep] = SellerInformation(machine.announcedModeAndTermsFromPeer.sellModeTerms(), + machine.payor.payeeContractPk()); + } + } + + return selected; +} diff --git a/test/TorrentClient.hpp b/test/TorrentClient.hpp new file mode 100644 index 0000000..28a186d --- /dev/null +++ b/test/TorrentClient.hpp @@ -0,0 +1,191 @@ +/** + * Copyright (C) JoyStream - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited + * Proprietary and confidential + * Written by Bedeho Mender , Feburary 18 2017 + */ + +#ifndef TORRENT_CLIENT_HPP +#define TORRENT_CLIENT_HPP + +#include "PollableInterface.hpp" +#include +#include +#include +#include +#include +#include +#include + +using namespace joystream; + +namespace libtorrent { + class session; + struct alert; +} + +namespace joystream { + namespace extension { + namespace alert { + struct TorrentPluginStatusUpdateAlert; + struct PeerPluginStatusUpdateAlert; + } + class Plugin; + } +} + +namespace state { + + struct Init {}; + struct AddingTorrent {}; + struct AddingTorrentFailed {}; + struct AddedTorrent { + + struct WaitingForMode {}; + + struct Buy { + + struct StartingBuyMode {}; + struct StartingBuyModeFailed {}; + + struct StartingPlugin {}; + struct StartingPluginFailed {}; + struct PluginStarted {}; + + struct StartingDownload {}; + struct StartingDownloadFailed {}; + struct DownloadingStarted {}; + + typedef boost::variant< + StartingBuyMode, + StartingBuyModeFailed, + StartingPlugin, + StartingPluginFailed, + PluginStarted, + StartingDownload, + StartingDownloadFailed, + StartingDownloadFailed, + DownloadingStarted> State; + + Buy(const protocol_wire::BuyerTerms & terms) + : state(StartingBuyMode()) + , terms(terms) {} + + State state; + protocol_wire::BuyerTerms terms; + }; + + struct Sell { + + struct StartingSellMode {}; + struct StartingSellModeFailed {}; + + struct StartingPlugin {}; + struct StartingPluginFailed {}; + struct PluginStarted {}; + + struct StartingUploading {}; + struct StartingUploadingFailed {}; + + struct Uploading {}; + + typedef boost::variant< + StartingSellMode, + StartingSellModeFailed, + StartingPlugin, + StartingPluginFailed, + PluginStarted, + StartingUploading, + StartingUploadingFailed, + Uploading> State; + + Sell(const protocol_wire::SellerTerms & terms) + : state(StartingSellMode()) + , terms(terms) {} + + State state; + protocol_wire::SellerTerms terms; + }; + + struct Observe { + + struct StartingObserveMode {}; + struct StartingObserveModeFailed {}; + + struct StartingPlugin {}; + struct StartingPluginFailed {}; + struct PluginStarted {}; + + typedef boost::variant< + StartingObserveMode, + StartingObserveModeFailed, + StartingPlugin, + StartingPluginFailed, + PluginStarted> State; + + Observe() + : state(StartingObserveMode()) {} + + State state; + }; + + typedef boost::variant< + Buy, + Sell, + Observe> State; + + AddedTorrent(const libtorrent::add_torrent_params & params, + const libtorrent::torrent_handle & handle) + : state(WaitingForMode()) + , params(params) + , handle(handle) {} + + State state; + libtorrent::add_torrent_params params; + libtorrent::torrent_handle handle; + }; + +} + +/** + * @brief Client for a torrent?? + */ +class TorrentClient : public PollableInterface { + +public: + + typedef boost::variant< + state::Init, + state::AddingTorrent, + state::AddingTorrentFailed, + state::AddedTorrent> State; + + TorrentClient(libtorrent::session * session, + extension::Plugin * plugin); + + void add(const libtorrent::add_torrent_params & params); + + void connect(const libtorrent::tcp::endpoint &); + + void async_buy(const protocol_wire::BuyerTerms & terms); + + void async_sell(const protocol_wire::SellerTerms & terms); + + void async_observe(); + + void poll(); + + State state() const noexcept; + +private: + + void process(const libtorrent::alert * a); + void process(const extension::alert::TorrentPluginStatusUpdateAlert * p); + void process(const extension::alert::PeerPluginStatusUpdateAlert * p); + + State _state; + libtorrent::session * _session; + extension::Plugin * _plugin; +}; + +#endif // TORRENT_CLIENT_HPP diff --git a/test/TorrentClientSwarm.cpp b/test/TorrentClientSwarm.cpp new file mode 100644 index 0000000..6e06ce6 --- /dev/null +++ b/test/TorrentClientSwarm.cpp @@ -0,0 +1,169 @@ +#include +#include +#include "TorrentClientSwarm.hpp" +#include "Utilities.hpp" + +#include + +#include + +#define WORK_DIR_NAME "swarm" + +//TorrentClientSwarm::Participant + +TorrentClientSwarm::Participant::Participant(const std::shared_ptr & session, + const boost::shared_ptr & plugin) + : session(session) + , plugin(plugin) + , client(session.get(), plugin.get()) { +} + +//TorrentClientSwarm + +TorrentClientSwarm::Participant * add_participant(std::vector> & v, + const libtorrent::add_torrent_params & params) { + + + // Create session + auto session = std::make_shared(); + + // Create plugin and add it to plugin + boost::make_shared plugin(1000); + + std::unique_ptr ptr(new TorrentClientSwarm::Participant(session, plugin)); + + v.push_back(std::move(ptr)); + + // Add torrent + ptr->client.add(params); + + return ptr.get(); +} + +TorrentClientSwarm::TorrentClientSwarm(const boost::filesystem::path & base_folder, + uint16_t number_of_normal_seeder_clients, + uint16_t number_of_normal_leecher_clients, + uint16_t number_of_observer_clients, + const std::vector & seller_client_terms, + const std::vector & buyer_client_terms) { + + /// Create work space + + // Working directory for swarm + const boost::filesystem::path work_folder = base_folder + WORK_DIR_NAME; + + // Clean up from prior run. + // Its better to do this at start, as a prior run cannot be + // trusted to do its own cleanup, e.g. if it gets interrupted. + if(boost::filesystem::exists(boost::filesystem::status(work_folder))) { + + boost::system::error_code ec; + boost::filesystem::remove_all(work_folder, ec); + + if(ec) { + std::cerr << "Could not delete pre-existing swarm directory: " << work_folder.string() << std::endl; + exit(1); + } + + } + + // Create work folder + if (!boost::filesystem::create_directories(work_folder)) { + std::cerr << "Could not create swarm directory: " << work_folder.string() << std::endl; + exit(1); + } + + // Create torrent payload and file + boost::shared_ptr ti = make_single_file_torrent(work_folder, + "payload_file.dat", + 2, // 16KiB factor + 300); // #pieces + /// Setup clients + + // Prepare params for uploaders: + libtorrent::add_torrent_params uploader_params; + uploader_params.save_path = work_folder.string(); + uploader_params.ti = ti; + + // Setup normal seeders + for(int i = 0;i < number_of_normal_seeder_clients;i++) + add_participant(normal_seeders, uploader_params); + + // Setup normal leechers + for(int i = 0;i < number_of_normal_leecher_clients;i++) { + + boost::filesystem::path download_folder = work_folder + ("normal_leecher_" + i); + + libtorrent::add_torrent_params params; + params.save_path = download_folder.string(); + params.ti = ti; + + add_participant(normal_leechers, params); + } + + // Setup observers + for(int i = 0;i < number_of_observer_clients;i++) { + + // how to make sure no seeding happens! + //auto ptr = add_participant(observers, uploader_params); + + // Set interaction to block + //ptr->client.async_observe(); + } + + // Setup sellers + for(int i = 0; i < seller_client_terms.size();i++) { + auto ptr = add_participant(sellers, uploader_params); + + // Sell + ptr->client.async_sell(seller_client_terms[i]); + } + + // Setup buyers + for(int i = 0; i < buyer_client_terms.size();i++) { + + boost::filesystem::path download_folder = work_folder + ("buyer_" + i); + + libtorrent::add_torrent_params params; + params.save_path = download_folder.string(); + params.ti = ti; + + auto ptr = add_participant(buyers, params); + + // Buy + ptr->client.async_buy(buyer_client_terms[i]); + } + +} + +void TorrentClientSwarm::fully_connect() { + + /// Fully connect + std::vector all_clients; + + // List all + for(auto p : normal_seeders) + all_clients.push_back(p.get()); + + for(auto p : normal_leechers) + all_clients.push_back(p.get()); + + for(auto p :observers) + all_clients.push_back(p.get()); + + for(auto p: sellers) + all_clients.push_back(p.get()); + + for(auto p : buyers) + all_clients.push_back(p.get()); + + // Connect all appropriately + for(int i = 0;i < all_clients.size();i++) + for(int j = i + 1;j < all_clients.size();j++) { + + libtorrent::tcp::endpoint ep = listening_endpoint(all_clients[j]->session.get()); + + all_clients[i]->client.connect(ep); + } + +} \ No newline at end of file diff --git a/test/TorrentClientSwarm.hpp b/test/TorrentClientSwarm.hpp new file mode 100644 index 0000000..0ab2d2b --- /dev/null +++ b/test/TorrentClientSwarm.hpp @@ -0,0 +1,82 @@ +// +// Created by Bedeho Mender on 28/02/17. +// + +#ifndef TORRENTCLIENTSWARM_HPP +#define TORRENTCLIENTSWARM_HPP + +#include "PollableInterface.hpp" +#include "TorrentClient.hpp" +#include + +struct TorrentClientSwarm { + + /** + * Creates a swarm with torrent clients + * @param base_folder + * @param number_of_normal_clients + * @param number_of_observer_clients + * @param seller_client_terms + * @param buyer_client_terms + */ + TorrentClientSwarm(const boost::filesystem::path & base_folder, + uint16_t number_of_normal_seeder_clients, + uint16_t number_of_normal_leecher_clients, + uint16_t number_of_observer_clients, + const std::vector & seller_client_terms, + const std::vector & buyer_client_terms); + + /** + * Connect all to each other. + */ + void fully_connect(); + + template< class Rep, class Period > + void run_event_loop(unsigned int iteration_counter, + const std::chrono::duration & iteration_sleep_duration); + + struct Participant { + + Participant(const std::shared_ptr & session, + const boost::shared_ptr & plugin); + + libtorrent::tcp::endpoint endpoint() const noexcept; + + std::shared_ptr session; + boost::shared_ptr plugin; + TorrentClient client; + }; + + std::vector> normal_seeders, + normal_leechers, + observers, + sellers, + buyers; +}; + + +template< class Rep, class Period > +void TorrentClientSwarm::run_event_loop(unsigned int iteration_counter, + const std::chrono::duration & iteration_sleep_duration) { + + // Create poller + Poller poller; + + // Add clients to poller + for(auto p : normal_seeders) + poller.subjects.push_back(&p->client); + + for(auto p : observers) + poller.subjects.push_back(&p->client); + + for(auto p : sellers) + poller.subjects.push_back(&p->client); + + for(auto p : buyers) + poller.subjects.push_back(&p->client); + + // Run event loop + poller.run(iteration_counter, iteration_sleep_duration); +} + +#endif //TORRENTCLIENTSWARM_HPP diff --git a/test/Utilities.cpp b/test/Utilities.cpp new file mode 100644 index 0000000..d8b4071 --- /dev/null +++ b/test/Utilities.cpp @@ -0,0 +1,104 @@ +// +// Created by bedeho on 08.03.17. +// + +#include "Utilities.hpp" +#include +#include +#include +#include +#include + +boost::shared_ptr make_single_file_torrent(const boost::filesystem::path & base_folder, + const std::string & payload_file_name, + unsigned int piece_size_factor, + unsigned int num_pieces) { + + /// Create file payload. + std::fstream fs; + + // Open/create + boost::filesystem::path payload_file(base_folder); + payload_file.append(payload_file_name); + + fs.open(payload_file.string(), std::fstream::out | std::fstream::binary); + + if(!fs.good()) { + std::cerr << "Error opening: " << payload_file.string() << std::endl; + exit(1); + } + + // Write piece data to file + unsigned int piece_size = piece_size_factor * 16 * 1024; // It must be a multiple of 16 kiB. + for(unsigned int piece_index = 0;piece_index < num_pieces;piece_index++) { + + std::vector piece_payload(piece_size, static_cast(piece_index)); + + fs.write(&piece_payload[0], piece_payload.size()); + } + + if(!fs.good()) { + std::cerr << "Error writing to file: " << payload_file.string() << std::endl; + exit(1); + } + + fs.close(); + + /// Create torrent file + + // Setup storage + libtorrent::file_storage storage; + storage.add_file(payload_file_name, piece_size * num_pieces); + + libtorrent::create_torrent t(storage, piece_size); + + // Create + libtorrent::entry torrent_dictionary = t.generate(); + + assert(torrent_dictionary.type() != libtorrent::entry::undefined_t); + + // Bencode dictionary + std::vector bencoded_torrent_dictionary; + std::back_insert_iterator > out(bencoded_torrent_dictionary); + libtorrent::bencode(out, torrent_dictionary); + + // Return torrent_info object + libtorrent::error_code ec; + + return boost::make_shared(&bencoded_torrent_dictionary[0], + bencoded_torrent_dictionary.size(), + boost::ref(ec), + 0); +} + +libtorrent::tcp::endpoint listening_endpoint(const libtorrent::session * s) { + + libtorrent::error_code ec; + libtorrent::tcp::endpoint ep(libtorrent::address::from_string("127.0.0.1", ec), s->listen_port()); + assert(ec); + + return ep; +} + +void connect_to_all(const libtorrent::torrent_handle & h, const std::unordered_map & swarm_peers) { + + for(auto m : swarm_peers) + h.connect_peer(m.second); +} + + +void process_pending_alert(libtorrent::session * s, const AlertProcessor & processor) { + + // Process alerts + std::vector alerts; + s->pop_alerts(&alerts); + + for(auto a : alerts) { + + if(joystream::extension::alert::RequestResult const * p = libtorrent::alert_cast(a)) + p->loadedCallback(); // Make loaded callback + else + processor(a); + } + +} \ No newline at end of file diff --git a/test/Utilities.hpp b/test/Utilities.hpp new file mode 100644 index 0000000..df45eed --- /dev/null +++ b/test/Utilities.hpp @@ -0,0 +1,31 @@ +// +// Created by Bedeho Mender on 08/03/17. +// + +#ifndef UTILITIES_HPP +#define UTILITIES_HPP + +#include +#include +#include + +namespace libtorrent { + class session; + class torrent_handle; + class alert; +} + +boost::shared_ptr make_single_file_torrent(const boost::filesystem::path & base_folder, + const std::string & payload_file_name, + unsigned int piece_size_factor, + unsigned int num_pieces); + + +libtorrent::tcp::endpoint listening_endpoint(const libtorrent::session * s); + +void connect_to_all(const libtorrent::torrent_handle & h, const std::unordered_map & swarm_peers); + +typedef std::function AlertProcessor; +void process_pending_alert(libtorrent::session * s, const AlertProcessor & processor); + +#endif //UTILITIES_HPP diff --git a/test/test_integrations.cpp b/test/test_integrations.cpp new file mode 100644 index 0000000..8458424 --- /dev/null +++ b/test/test_integrations.cpp @@ -0,0 +1,64 @@ +// +// Created by Bedeho Mender on 28/02/17. +// + +#include +#include + +//#include "TorrentClientSwarm.hpp" +#include "Swarm.hpp" +#include "BasicObserver.hpp" + +#include +//#include + +/** +TEST(IntegrationTesting, Connectivity) { + +} + +TEST(IntegrationTesting, OneToOne) { + + // Create swarm + // - one seller + // - one buyer + TorrentClientSwarm swarm(boost::filesystem::current_path(), + 0, + 0, + 0, + { protocol_wire::SellerTerms(1, 1000, 2, 1, 1) }, + { protocol_wire::BuyerTerms(5, 2000, 1, 1) }); + + // Establish full connectivity + swarm.fully_connect(); + + // Run swarm event loop for five times at 1s intervals + swarm.run_event_loop(5, std::chrono::seconds(1)); + + // *** assert something about final states *** + //assert(swarm.) +} +*/ + +TEST(IntegrationTesting, OneToOne) { + + BasicObserver observer("my_observer"); + + SwarmBuilder builder; + + builder.add(&observer); + + Swarm swarm = builder.build(boost::filesystem::current_path()); + + swarm.run(5, std::chrono::seconds(1)); + + // *** assert something about final states *** + //assert(observer.) +} + + +int main(int argc, char *argv[]) +{ + ::testing::InitGoogleMock(&argc, argv); + return RUN_ALL_TESTS(); +}