From 333022ca00a182e4cdc2ac01b9206c6e370d38c4 Mon Sep 17 00:00:00 2001 From: Bedeho Mender Date: Mon, 27 Feb 2017 08:55:43 +0100 Subject: [PATCH 01/10] old mocks --- test/MockAlertManager.hpp | 35 +++++++++++++++++++++++++++++++++++ test/MockPeer.hpp | 27 +++++++++++++++++++++++++++ test/MockSession.hpp | 25 +++++++++++++++++++++++++ test/MockTorrent.hpp | 25 +++++++++++++++++++++++++ 4 files changed, 112 insertions(+) create mode 100644 test/MockAlertManager.hpp create mode 100644 test/MockPeer.hpp create mode 100644 test/MockSession.hpp create mode 100644 test/MockTorrent.hpp diff --git a/test/MockAlertManager.hpp b/test/MockAlertManager.hpp new file mode 100644 index 0000000..1171208 --- /dev/null +++ b/test/MockAlertManager.hpp @@ -0,0 +1,35 @@ +#ifndef EXTENSION_MOCK_ALERT_MANAGER_HPP +#define EXTENSION_MOCK_ALERT_MANAGER_HPP + +#include + +#include + +#include +#include + +namespace joystream { +namespace test { +namespace extension { + +class MockAlertManager : public interface::AlertManagerInterface { +public: + MOCK_METHOD0(native_handle, libtorrent::alert_manager*()); + /* + * Template methods can't be mocked directly in gmock. + * We hence mock overloaded methods as per our use. + */ + MOCK_METHOD1(plugin_emplace_alert, void(extension::status::Plugin)); + MOCK_METHOD1(request_emplace_alert, void(extension::alert::LoadedCallBack&)); + MOCK_METHOD6(anchorAnnounced_emplace_alert, void( + libtorrent::torrent_handle, libtorrent::tcp::endpoint&, + quint64, const Coin::typesafeOutPoint&, + const Coin::PublicKey&, const Coin::PubKeyHash& + )); +}; + +} +} +} + +#endif // EXTENSION_MOCK_ALERT_MANAGER_HPP diff --git a/test/MockPeer.hpp b/test/MockPeer.hpp new file mode 100644 index 0000000..c45ef3e --- /dev/null +++ b/test/MockPeer.hpp @@ -0,0 +1,27 @@ +#ifndef GMOCK_MOCK_PEER_HPP +#define GMOCK_MOCK_PEER_HPP + +#include + +#include + +namespace joystream { +namespace test { +namespace extension { + +class MockPeer : public interface::PeerInterface { + public: + MOCK_CONST_METHOD0(isOutgoing, bool()); + MOCK_CONST_METHOD0(connect, void()); + MOCK_CONST_METHOD1(getPeerInfo, void(libtorrent::peer_info &)); + MOCK_CONST_METHOD3(sendBuffer, void(char const*, int, int=0)); + MOCK_CONST_METHOD3(disconnect, void(libtorrent::error_code const&, libtorrent::operation_t, int=0)); + MOCK_CONST_METHOD0(pid, libtorrent::peer_id const&()); + MOCK_CONST_METHOD0(native_handle, libtorrent::peer_connection_handle()); +}; + +} +} +} + +#endif // GMOCK_MOCK_PEER_HPP diff --git a/test/MockSession.hpp b/test/MockSession.hpp new file mode 100644 index 0000000..4fe9c75 --- /dev/null +++ b/test/MockSession.hpp @@ -0,0 +1,25 @@ +#ifndef EXTENSION_MOCK_SESSION_HPP +#define EXTENSION_MOCK_SESSION_HPP + +#include + +#include + +namespace joystream { +namespace test { +namespace extension { + +class MockSession : public interface::SessionInterface { +public: + MOCK_METHOD0(pause, void()); + MOCK_METHOD0(resume, void()); + MOCK_METHOD0(native_handle, libtorrent::session_handle()); + MOCK_METHOD1(find, interface::TorrentInterface*(libtorrent::sha1_hash const &)); + MOCK_METHOD1(add, interface::TorrentInterface*(libtorrent::add_torrent_params const &)); +}; + +} +} +} + +#endif // EXTENSION_MOCK_SESSION_HPP diff --git a/test/MockTorrent.hpp b/test/MockTorrent.hpp new file mode 100644 index 0000000..9810fdb --- /dev/null +++ b/test/MockTorrent.hpp @@ -0,0 +1,25 @@ +#ifndef EXTENSION_MOCK_TORRENT_HPP +#define EXTENSION_MOCK_TORRENT_HPP + +#include + +#include +#include + +#include + +namespace joystream { +namespace test { +namespace extension { + +class MockTorrent : public interface::TorrentInterface { +public: + MOCK_CONST_METHOD0(infoHash, libtorrent::sha1_hash()); + MOCK_CONST_METHOD0(native_handle, libtorrent::torrent_handle()); +}; + +} +} +} + +#endif // EXTENSION_MOCK_TORRENT_HPP From e3a5946339472c5660304e691f9b965157e1ee22 Mon Sep 17 00:00:00 2001 From: Bedeho Mender Date: Mon, 27 Feb 2017 18:42:11 +0100 Subject: [PATCH 02/10] update ignore file --- .gitignore | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.gitignore b/.gitignore index ea77a9f..45d58e7 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,10 @@ build/ conanbuildinfo.* conaninfo.txt +# Default CLion Cmake build folder +cmake-build-debug +cmake-build-releas + +# CLion IDE settings directory +.idea + From af98f8a3538370d93caaad9579761f1599c41c80 Mon Sep 17 00:00:00 2001 From: Bedeho Mender Date: Sat, 4 Mar 2017 08:01:55 +0100 Subject: [PATCH 03/10] wip --- CMakeLists.txt | 29 ++++ test/PollableInterface.hpp | 45 +++++ test/TorrentClient.cpp | 336 ++++++++++++++++++++++++++++++++++++ test/TorrentClient.hpp | 172 ++++++++++++++++++ test/TorrentClientSwarm.cpp | 150 ++++++++++++++++ test/TorrentClientSwarm.hpp | 76 ++++++++ test/test_integrations.cpp | 74 ++++++++ 7 files changed, 882 insertions(+) create mode 100644 test/PollableInterface.hpp create mode 100644 test/TorrentClient.cpp create mode 100644 test/TorrentClient.hpp create mode 100644 test/TorrentClientSwarm.cpp create mode 100644 test/TorrentClientSwarm.hpp create mode 100644 test/test_integrations.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index cdecf30..3f09bd7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,3 +31,32 @@ set( # === build library === add_library(extension ${library_sources}) + +# === build tests === +if(build_tests) + + # test library prepared, + # mock headers not included, as they + set( + test_lib_source + test/TorrentClient.cpp + test/TorrentClientSwarm.cpp + ) + + # === test library === + include_directories("${CMAKE_SOURCE_DIR}/test") + + add_library(test_lib ${test_lib_source}) + + file(GLOB tests RELATIVE "${CMAKE_SOURCE_DIR}" "test/test_*.cpp") + + enable_testing() + + foreach(s ${tests}) + get_filename_component (sn ${s} NAME_WE) + add_executable(${sn} ${s}) + target_link_libraries(${sn} test_lib extension ${CONAN_LIBS}) + add_test(${sn} "bin/${sn}") + endforeach(s) + +endif() diff --git a/test/PollableInterface.hpp b/test/PollableInterface.hpp new file mode 100644 index 0000000..8dbbe81 --- /dev/null +++ b/test/PollableInterface.hpp @@ -0,0 +1,45 @@ +/** + * Copyright (C) JoyStream - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited + * Proprietary and confidential + * Written by Bedeho Mender , Feburary 18 2017 + */ + +#ifndef POLLABLEINTERFACE_HPP +#define POLLABLEINTERFACE_HPP + +#include +#include +#include + +/** + * @brief Interface for type which has time out based polling + */ +class PollableInterface { + +public: + + virtual void poll() = 0; +}; + +struct Poller { + + const std::vector subjects; + + template< class Rep, class Period > + void run(unsigned int iteration_counter, + const std::chrono::duration & iteration_sleep_duration) { + + for(unsigned int i = 0; i < iteration_counter; i++) { + + for(auto s : subjects) + s->poll(); + + std::this_thread::sleep_for(iteration_sleep_duration); + } + + } + +}; + +#endif // POLLABLEINTERFACE_HPP diff --git a/test/TorrentClient.cpp b/test/TorrentClient.cpp new file mode 100644 index 0000000..9396c7f --- /dev/null +++ b/test/TorrentClient.cpp @@ -0,0 +1,336 @@ +/** + * Copyright (C) JoyStream - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited + * Proprietary and confidential + * Written by Bedeho Mender , Feburary 18 2017 + */ + +#include "TorrentClient.hpp" + +#include + +TorrentClient::Buyer(const std::unique_ptr & session, + const boost::shared_ptr & plugin) + : _state(state::Init()) + , _session(session) + , _plugin(plugin) { +} + +void TorrentClient::start_torrent_plugin(const libtorrent::add_torrent_params & params) { + + if(boost::get(&_state)) { + + this->_state = state::AddingTorrent(); + + // Add torrent + plugin->submit(extension::request::AddTorrent(params, [this](libtorrent::error_code & ec, libtorrent::torrent_handle &) -> void { + + if(ec) + this->_state = state::AddingTorrentFailed(); + else { + this->_state = state::AddedTorrent(params); + + auto * added_torrent_state = boost::get(&this->_state); + + // Start plugin + plugin->submit(extension::request::Start(params.info_hash, [added_torrent_state](const std::exception_ptr & e) { + + if(e) + *(added_torrent_state) = state::StartingPluginFailed(); + else + *(added_torrent_state) = state::PluginStarted(); + + })); + + } + + })); + + // Run alert processing long enough to process all callbacks above to completion + RunPollerLoop(this, 3, 1*std::chrono_literals::s); + + } else + throw std::runtime_error("Cannot start, is already started."); +} + +void TorrentClient::connect(const libtorrent::tcp::endpoint & endpoint) { + + if(auto * added_torrent = boost::get(&_state)) { + + // Get torrent handle + libtorrent::torrent_handle h = _session->find_torrent(added_torrent->params.info_hash); + + // Make connection with peer on torrent + h.connect_peer(endpoint); + + } else + throw std::runtime_error("Cannot start, torrent not yet added."); +} + +state::AddedTorrent * has_plugin_started(const State * state) { + + if(auto * added_torrent_state = boost::get(state)) { + if(boost::get(added_torrent_state)) + return added_torrent_state; + else + throw std::runtime_error("Cannot start plugin, have to be in state: AddedTorrent::PluginStarted"); + } else + throw std::runtime_error("Cannot start, have to be in state: AddedTorrent."); +} + +void Buy::async_buy(const protocol_wire::BuyerTerms & terms) { + + state::AddedTorrent * added_torrent_state = has_plugin_started(&_state); + + *(added_torrent_state) = Buy(terms); + + auto * buy_state = boost::get(added_torrent_state); + + // To buy mode + plugin->submit(extension::request::ToBuyMode(added_torrent_state->params.info_hash, terms, [buy_state](const std::exception_ptr & e) { + + if(e) + (*buy_state) = state::AddedTorrent::Buy::StartingBuyModeFailed(); + else { + + (*buy_state) = state::AddedTorrent::Buy::BuyModeStarted(); + + // At this point, we have to wait for an asynchronous event, namely + // that a connection with a suitable seller is estalished, + // and we catch this in alert processor. + + } + + })); + +} + +void TorrentClient::async_sell(const protocol_wire::SellerTerms & terms) { + + state::AddedTorrent * added_torrent_state = has_plugin_started(&_state); + + *(added_torrent_state) = Sell(terms); + + auto * sell_state = boost::get(added_torrent_state); + + // To sell mode + plugin->submit(extension::request::ToSellMode(added_torrent_state->params.info_hash, terms, [sell_state](const std::exception_ptr & e) { + + if(e) + (*sell_state) = state::AddedTorrent::Sell::StartingSellModeFailed(); + else { + + (*sell_state) = state::AddedTorrent::Sell::SellModeStarted(); + + // At this point, we have to wait for an asynchronous event, namely + // that a connection with a suitable seller is estalished, + // and we catch this in alert processor. + + } + + })); +} + +void TorrentClient::async_observe() { + + state::AddedTorrent * added_torrent_state = has_plugin_started(&_state); + + *(added_torrent_state) = Observe(); + + auto * observe_state = boost::get(added_torrent_state); + + // To sell mode + plugin->submit(extension::request::ToObserveMode(added_torrent_state->params.info_hash, [observe_state](const std::exception_ptr & e) { + + if(e) + (*observe_state) = state::AddedTorrent::Observe::StartingObserveModeFailed(); + else { + + (*observe_state) = state::AddedTorrent::Observe::ObserveModeStarted(); + + // nothing more to do for this mode + + } + + })); +} + +void TorrentClient::poll() { + + /// The purpose of this routine is to facilitate communication between + /// the state machine and the session/plugin. + /// + /// a) To get messages from the latter to the former, session::pop_alerts + /// is used, and extension::alert::RequestResult are also processed, as they + /// can be used to send messages to the state machine also + /// + /// b) To allow former to ask latter for messages with state updates, + /// a timeout event is submitted to it. + + if(Started * s = boost::get(&_state)) { + + // Process alerts + std::vector alerts; + s->session->pop_alerts(&alerts); + + for(auto a : alerts) { + + if(extension::alert::RequestResult const * p = libtorrent::alert_cast(a)) + p->loadedCallback(); // Make loaded callback + else + s->process(a); + } + + // Get status update on torrent plugins + s->plugin->submit(extension::request::PostTorrentPluginStatusUpdates()); + } +} + +State TorrentClient::state() const noexcept { + return _state; +} + +void TorrentClient::process(const libtorrent::alert * a) { + + if(extension::alert::TorrentPluginStatusUpdateAlert const * p = libtorrent::alert_cast(a)) + process(p); + else if(extension::alert::PeerPluginStatusUpdateAlert const * p = libtorrent::alert_cast(a)) + process(p); + + // ** if peer disconnect occured, then that is a problem? ** +} + +void TorrentClient::process(const extension::alert::TorrentPluginStatusUpdateAlert * p) { + + for(auto m: p->statuses) + plugin->submit(extension::request::PostPeerPluginStatusUpdates(m.first)); +} + +struct SellerInformation { + + SellerInformation() {} + + SellerInformation(const protocol_wire::SellerTerms & terms, + const Coin::PublicKey & contractPk) + : terms(terms) + , contractPk(contractPk) { + } + + protocol_wire::SellerTerms terms; + Coin::PublicKey contractPk; +}; + +std::map select_N_sellers(unsigned int N, + const std::map & statuses); + +void TorrentClient::process(const extension::alert::PeerPluginStatusUpdateAlert * p) { + + state::AddedTorrent * s; + if(auto * added_torrent_state = boost::get(state)) { + if(s = boost::get(added_torrent_state)) + + else + return; + } else + return; + + + // figure out if torrent hasbeen added + // if no, then ignore + // if yes, then figure out if mod + // buy => call + // sell => + // observe + +} + +void TorrentClient::process(const extension::alert::PeerPluginStatusUpdateAlert *p) { + + if(state == State::buy_mode_started) { + + std::map sellers; + + try { + sellers = select_N_sellers(terms.minNumberOfSellers(), p->statuses); + } catch(const std::runtime_error & e) { + //log("Coulndt find sufficient number of suitable sellers"); + return; + } + + // Create contract commitments and download information + protocol_session::PeerToStartDownloadInformationMap map; + paymentchannel::ContractTransactionBuilder::Commitments commitments(sellers.size()); + + uint32_t output_index = 0; + for(const auto & s: sellers) { + + // fixed for now + int64_t value = 100000; + Coin::KeyPair buyerKeyPair(Coin::PrivateKey::generate()); // **replace later with determinisic key** + Coin::PubKeyHash buyerFinalPkHash; + + protocol_session::StartDownloadConnectionInformation inf(s.second.terms, + output_index, + value, + buyerKeyPair, + buyerFinalPkHash); + + map.insert(std::make_pair(s.first, inf)); + + + commitments[output_index] = paymentchannel::Commitment(value, + buyerKeyPair.pk(), + s.second.contractPk, // payeePK + Coin::RelativeLockTime(Coin::RelativeLockTime::Units::Time, s.second.terms.minLock())); + + output_index++; + } + + // Create contract transaction + paymentchannel::ContractTransactionBuilder c; + c.setCommitments(commitments); + + Coin::Transaction tx = c.transaction(); + + // Starting download + + // state = StartingDownload() + + plugin->submit(extension::request::StartDownloading(p->handle.info_hash(), tx, map, [=](const std::exception_ptr & e) -> void { + + if(e) + state = State::downloading_starting_failed; // StartingDownloadFailed() + else + state = State::downloading_started; // DownloadingStarted() + + // we are done, nothing more to do? + + })); + + } else if () {} + else if () {} +} + +std::map select_N_sellers(unsigned int N, const std::map & statuses) { + + std::map selected; + + for(auto s : statuses) { + + libtorrent::tcp::endpoint ep = s.first; + extension::status::PeerPlugin status = s.second; + + if(status.peerBEP10SupportStatus == extension::BEPSupportStatus::supported && + status.peerBitSwaprBEPSupportStatus == extension::BEPSupportStatus::supported && + status.connection.is_initialized()) { + + auto & machine = status.connection.get().machine; + + if(machine.innerStateTypeIndex == typeid(protocol_statemachine::PreparingContract) && selected.size() < N) + selected[ep] = SellerInformation(machine.announcedModeAndTermsFromPeer.sellModeTerms(), + machine.payor.payeeContractPk()); + } + } + + return selected; +} diff --git a/test/TorrentClient.hpp b/test/TorrentClient.hpp new file mode 100644 index 0000000..db7ac43 --- /dev/null +++ b/test/TorrentClient.hpp @@ -0,0 +1,172 @@ +/** + * Copyright (C) JoyStream - All Rights Reserved + * Unauthorized copying of this file, via any medium is strictly prohibited + * Proprietary and confidential + * Written by Bedeho Mender , Feburary 18 2017 + */ + +#ifndef TORRENT_CLIENT_HPP +#define TORRENT_CLIENT_HPP + +#include "PollableInterface.hpp" + +#include +#include +#include +#include + +#include + +#include + +using namespace joystream; + +namespace libtorrent { + class session; +} + +namespace state { + + struct Init {}; + struct AddingTorrent {}; + struct AddingTorrentFailed {}; + struct AddedTorrent { + + struct WaitingForMode {}; + + struct Buy { + + struct StartingBuyMode {}; + struct StartingBuyModeFailed {}; + struct BuyModeStarted {}; // <== from here, we listen to alerts + + struct StartingPlugin {}; + struct StartingPluginFailed {}; + struct PluginStarted {}; + + struct StartingDownload {}; + struct StartingDownloadFailed {}; + + struct DownloadingStarted {}; + + typedef boost::variant State; + + Buy(const protocol_wire::BuyerTerms & terms) + : state(StartingBuyMode()) + , terms(terms) {} + + State state; + protocol_wire::BuyerTerms terms; + }; + + struct Sell { + + struct StartingSellMode {}; + struct StartingSellModeFailed {}; + struct SellModeStarted {}; // <== from here, we listen to alerts + + struct StartingPlugin {}; + struct StartingPluginFailed {}; + struct PluginStarted {}; + + struct StartingUploading {}; + struct StartingUploadingFailed {}; + + struct Uploading {}; + + typedef boost::variant State; + Sell(const protocol_wire::SellerTerms & terms) + : state(StartingSellMode()) + , terms(terms) {} + + State state; + protocol_wire::SellerTerms terms; + }; + + struct Observe { + + struct StartingObserveMode {}; + struct StartingObserveModeFailed {}; + struct ObserveModeStarted {}; // Done + + struct StartingPlugin {}; + struct StartingPluginFailed {}; + struct PluginStarted {}; + + typedef boost::variant State; + Observe() + : state(StartingObserveMode()) {} + + State state; + }; + + typedef boost::variant State; + + AddedTorrent(const libtorrent::add_torrent_params & params) + : state(StartingPlugin()) + , params(params) {} + + State state; + libtorrent::add_torrent_params params; + }; + +} + +/// change the name of this? some how indicate that it is in charge of a torrent on a joystream session, +/// torrent client gets confusing +/** + * @brief Client for a torrent?? + */ +class TorrentClient : public PollableInterface { + +public: + + typedef boost::variant State; + + TorrentClient(libtorrent::session * session, + const boost::shared_ptr & plugin); + + void start_torrent_plugin(const libtorrent::add_torrent_params & params); + + void connect(const libtorrent::tcp::endpoint &); + + void async_buy(const protocol_wire::BuyerTerms & terms); + + void async_sell(const protocol_wire::SellerTerms & terms); + + void async_observe(); + + void poll(); + + State state() const noexcept; + +private: + + void process(const libtorrent::alert * a); + void process(const extension::alert::TorrentPluginStatusUpdateAlert * p); + void process(const extension::alert::PeerPluginStatusUpdateAlert * p); + + State _state; + libtorrent::session * session; + boost::shared_ptr plugin; +}; + +#endif // TORRENT_CLIENT_HPP diff --git a/test/TorrentClientSwarm.cpp b/test/TorrentClientSwarm.cpp new file mode 100644 index 0000000..d39ff83 --- /dev/null +++ b/test/TorrentClientSwarm.cpp @@ -0,0 +1,150 @@ +#include "TorrentClientSwarm.hpp" + +#define WORK_DIR_NAME "swarm_workdir" + +//TorrentClientSwarm::Participant + +TorrentClientSwarm::Participant::Participant(const std::shared_ptr & session, + const boost::shared_ptr & plugin) + : session(session) + , plugin(plugin) + , client(session, plugin) { +} + +libtorrent::tcp::endpoint TorrentClientSwarm::Participant::endpoint() const noexcept { + // ? +} + +//TorrentClientSwarm + +std::unique_ptr make_participant() { + + // Create session + auto session = std::make_shared(); + + // Create plugin and add it to plugin + boost::make_shared(1000, + session->native_handle(), + &(session->native_handle()->alerts())); + + std::unique_ptr ptr(new Participant(session, plugin)); + +} + +std::unique_ptr make_uploader_participant() { + + auto p = make_participant(); + + // Add torrent + libtorrent::add_torrent_params params; + params.save_path = data_source_folder; // <== needs valid save path + params.ti = ti; + //ti.info_hash(); <== do we store this somewhere? + + p->session->add_torrent(params); + +} + +std::unique_ptr make_downloader_participant() { + + auto p = make_participant(); + + p->session->add_torrent(params); +} + + +libtorrent::add_torrent_params make_seeder_params() { + +} + +TorrentClientSwarm::TorrentClientSwarm(const std::string & base_folder, + const std::string & data_source_folder, + libtorrent::torrent_info & ti, + uint16_t number_of_normal_seeder_clients, + uint16_t number_of_normal_leecher_clients, + uint16_t number_of_observer_clients, + const std::vector seller_client_terms, + const std::vector & buyer_client_terms) { + + // Clean up from prior run. + // Its better to do this at start, as a prior run cannot be + // trusted to do its own cleanup, e.g. if it gets interrupted. + // boost::delete_folder(base_folder, ec); + // asset(ec); + + // Create same folder again + // boost::create_folder(base_folder, ec); + // assert(ec); + + /// Setup clients + + std::vector all_endpoints; + std::vector all_clients; + + // Setup normal seeders + for(int i = 0;i < number_of_normal_seeder_clients;i++) { + + std::unique_ptr ptr = make_participant(); + + normal_seeders.push_back(std::move(ptr)); + all_endpoints.push_back(ptr->endpoint()); + all_clients.push_back(&ptr->client); + } + + // Setup normal leechers + for(int i = 0;i < number_of_normal_leecher_clients;i++) { + + std::unique_ptr ptr = make_participant(); + + ptr->client.session()->add_torrent(params); + + // Add torrent + + normal_leecher.push_back(std::move(ptr)); + + all_endpoints.push_back(ptr->endpoint()); + all_clients.push_back(&ptr->client); + } + + // Setup observers + for(int i = 0;i < number_of_observer_clients;i++) { + std::unique_ptr ptr = make_participant(); + + // Add torrent + libtorrent::add_torrent_params params; + params.ti = ti; + ptr-> + + + // Start + ptr->client.start_torrent_plugin(params); + + normal_leecher.push_back(std::move(ptr)); + + all_endpoints.push_back(ptr->endpoint()); + all_clients.push_back(&ptr->client); + } + + // Setup sellers + for(int i = 0; i < seller_client_terms.size();i++) { + + all_endpoints.push_back(ptr->endpoint()); + all_clients.push_back(&ptr->client); + } + + // Setup buyers + for(int i = 0; i < buyer_client_terms.size();i++) { + + all_endpoints.push_back(ptr->endpoint()); + all_clients.push_back(&ptr->client); + } + + // Connect all appropriately + + for(auto c: all_clients) + for(auto e: all_endpoints) { + + if(c->endpoint() != e) + c->connect(e); + } +} \ No newline at end of file diff --git a/test/TorrentClientSwarm.hpp b/test/TorrentClientSwarm.hpp new file mode 100644 index 0000000..ae3eb7b --- /dev/null +++ b/test/TorrentClientSwarm.hpp @@ -0,0 +1,76 @@ +// +// Created by Bedeho Mender on 28/02/17. +// + +#ifndef TORRENTCLIENTSWARM_HPP +#define TORRENTCLIENTSWARM_HPP + +#include "PollableInterface.hpp" +#include "TorrentClient.hpp" +#include + +struct TorrentClientSwarm { + + /** + * Creates a swarm with torrent clients + * @param base_folder + * @param data_source_folder + * @param ti + * @param number_of_normal_clients + * @param number_of_observer_clients + * @param seller_client_terms + * @param buyer_client_terms + */ + TorrentClientSwarm(const std::string & base_folder, + const std::string & data_source_folder, + libtorrent::torrent_info & ti, + uint16_t number_of_normal_seeder_clients, + uint16_t number_of_normal_leecher_clients, + uint16_t number_of_observer_clients, + const std::vector seller_client_terms, + const std::vector & buyer_client_terms); + + template< class Rep, class Period > + void run_event_loop(unsigned int iteration_counter, + const std::chrono::duration & iteration_sleep_duration) { + + // Create poller + Poller poller; + + // Add clients to poller + for(const Participant & p : normal) + poller.subjects.push_back(&p.client); + + for(const Participant & p : observers) + poller.subjects.push_back(&p.client); + + for(const Participant & p : sellers) + poller.subjects.push_back(&p.client); + + for(const Participant & p : buyers) + poller.subjects.push_back(&p.client); + + // Run event loop + poller.run(iteration_counter, iteration_sleep_duration); + } + + struct Participant { + + Participant(const std::shared_ptr & session, + const boost::shared_ptr & plugin); + + libtorrent::tcp::endpoint endpoint() const noexcept; + + std::shared_ptr session; + boost::shared_ptr plugin; + TorrentClient client; + }; + + std::vector> normal_seeders, + normal_leecher, + observers, + sellers, + buyers; +}; + +#endif //TORRENTCLIENTSWARM_HPP diff --git a/test/test_integrations.cpp b/test/test_integrations.cpp new file mode 100644 index 0000000..81aefb8 --- /dev/null +++ b/test/test_integrations.cpp @@ -0,0 +1,74 @@ +#include +#include + +#include "PollableInterface.hpp" +#include "TorrentClientSwarm.hpp" +#include + + +#include + +#define POLLING_COUNT 3 +#define POLLING_SLEEP_DURATION 1*std::chrono_literals::s + +libtorrent::session * basic_session(); +libtorrent::torrent_info load(const std::string file); + +// +// how to locate valid path to torrent file, and partial downloads, +// which live in source tree? +// how to cleanup files when done? + +TEST(IntegrationTesting, Connectivity) { + +} + +/** +// generate torrent content (not random) +EXPORT std::shared_ptr create_torrent(std::ostream* file = 0 + , char const* name = "temporary", int piece_size = 16 * 1024, int num_pieces = 13 + , bool add_tracker = true, std::string ssl_certificate = ""); +*/ + +TEST(IntegrationTesting, OneToOne) { + + std::string base_folder; // = , + std::string data_source_folder; // = + + // Create torrent file and content + libtorrent::torrent_info & ti; // create_torrent_file("base_folder_here") + + // neutral + uint16_t number_of_normal_seeder_clients = 0, + number_of_normal_leecher_clients = 0, + number_of_observer_clients = 0; + + // sellers + std::vector seller_client_terms; + + // buyers + std::vector buyer_client_terms; + + // Create swarm + TorrentClientSwarm swarm(base_folder, + data_source_folder, + ti, + number_of_normal_seeder_clients, + number_of_normal_leecher_clients, + number_of_observer_clients, + seller_client_terms, + buyer_client_terms); + + + // Run swarm event loop + swarm.run_event_loop(POLLING_COUNT, POLLING_SLEEP_DURATION); + + // *** assert something about final states *** + //assert(swarm.) +} + +int main(int argc, char *argv[]) +{ + ::testing::InitGoogleMock(&argc, argv); + return RUN_ALL_TESTS(); +} From 4b64f48d419889f0c6a6f77813d72f6b76a988a5 Mon Sep 17 00:00:00 2001 From: Bedeho Mender Date: Mon, 6 Mar 2017 08:03:42 +0100 Subject: [PATCH 04/10] wip2 --- test/test_integrations.cpp | 58 +++++++++++++++----------------------- 1 file changed, 22 insertions(+), 36 deletions(-) diff --git a/test/test_integrations.cpp b/test/test_integrations.cpp index 81aefb8..3f53148 100644 --- a/test/test_integrations.cpp +++ b/test/test_integrations.cpp @@ -1,63 +1,49 @@ +// +// Created by Bedeho Mender on 28/02/17. +// + #include #include -#include "PollableInterface.hpp" #include "TorrentClientSwarm.hpp" -#include - -#include +//#include #define POLLING_COUNT 3 #define POLLING_SLEEP_DURATION 1*std::chrono_literals::s -libtorrent::session * basic_session(); -libtorrent::torrent_info load(const std::string file); +// Generate torrent content (not random) +std::shared_ptr create_torrent(std::ostream* file = 0, + char const* name = "temporary", + int piece_size = 16 * 1024, + int num_pieces = 13, + bool add_tracker = true, + std::string ssl_certificate = "") { -// -// how to locate valid path to torrent file, and partial downloads, -// which live in source tree? -// how to cleanup files when done? +} TEST(IntegrationTesting, Connectivity) { } -/** -// generate torrent content (not random) -EXPORT std::shared_ptr create_torrent(std::ostream* file = 0 - , char const* name = "temporary", int piece_size = 16 * 1024, int num_pieces = 13 - , bool add_tracker = true, std::string ssl_certificate = ""); -*/ - TEST(IntegrationTesting, OneToOne) { - std::string base_folder; // = , - std::string data_source_folder; // = + std::string base_folder; // = boost::this_folder(); , + std::string data_source_folder; // = base_folder + boost::file_sep ? "xxx" // Create torrent file and content - libtorrent::torrent_info & ti; // create_torrent_file("base_folder_here") - - // neutral - uint16_t number_of_normal_seeder_clients = 0, - number_of_normal_leecher_clients = 0, - number_of_observer_clients = 0; - - // sellers - std::vector seller_client_terms; - - // buyers - std::vector buyer_client_terms; + //std::ostream* file + std::shared_ptr ti = create_torrent_file("base_folder_here"); // Create swarm TorrentClientSwarm swarm(base_folder, data_source_folder, ti, - number_of_normal_seeder_clients, - number_of_normal_leecher_clients, - number_of_observer_clients, - seller_client_terms, - buyer_client_terms); + 1, + 1, + 0, + std::vector(), + std::vector()); // Run swarm event loop From 8d578d1c5d2e437c386983b0258d61701ef9ab03 Mon Sep 17 00:00:00 2001 From: Bedeho Mender Date: Mon, 6 Mar 2017 18:27:27 +0100 Subject: [PATCH 05/10] wip3 --- CMakeLists.txt | 2 + test/PollableInterface.hpp | 20 +-- test/TorrentClient.cpp | 282 ++++++++++++++++++------------------ test/TorrentClient.hpp | 177 ++++++++++++---------- test/TorrentClientSwarm.cpp | 4 +- 5 files changed, 253 insertions(+), 232 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 3f09bd7..5028808 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,6 +2,8 @@ cmake_minimum_required(VERSION 2.8.12 FATAL_ERROR) project(ProtocolSession CXX) +option(build_tests "build tests" OFF) + set(CMAKE_POSITION_INDEPENDENT_CODE ON) set(CMAKE_CXX_STANDARD 11) diff --git a/test/PollableInterface.hpp b/test/PollableInterface.hpp index 8dbbe81..69bd2fd 100644 --- a/test/PollableInterface.hpp +++ b/test/PollableInterface.hpp @@ -24,21 +24,21 @@ class PollableInterface { struct Poller { - const std::vector subjects; + std::vector subjects; - template< class Rep, class Period > - void run(unsigned int iteration_counter, - const std::chrono::duration & iteration_sleep_duration) { + template< class Rep, class Period > + void run(unsigned int iteration_counter, + const std::chrono::duration & iteration_sleep_duration) { - for(unsigned int i = 0; i < iteration_counter; i++) { + for(unsigned int i = 0; i < iteration_counter; i++) { - for(auto s : subjects) - s->poll(); + for(auto s : subjects) + s->poll(); - std::this_thread::sleep_for(iteration_sleep_duration); - } + std::this_thread::sleep_for(iteration_sleep_duration); + } - } + } }; diff --git a/test/TorrentClient.cpp b/test/TorrentClient.cpp index 9396c7f..4db67d3 100644 --- a/test/TorrentClient.cpp +++ b/test/TorrentClient.cpp @@ -9,45 +9,37 @@ #include -TorrentClient::Buyer(const std::unique_ptr & session, - const boost::shared_ptr & plugin) - : _state(state::Init()) - , _session(session) - , _plugin(plugin) { +#include + +TorrentClient::TorrentClient(libtorrent::session * session, + extension::Plugin * plugin) + : _state(state::Init()) + , _session(session) + , _plugin(plugin) { } -void TorrentClient::start_torrent_plugin(const libtorrent::add_torrent_params & params) { +void TorrentClient::add(const libtorrent::add_torrent_params & params) { if(boost::get(&_state)) { - this->_state = state::AddingTorrent(); - - // Add torrent - plugin->submit(extension::request::AddTorrent(params, [this](libtorrent::error_code & ec, libtorrent::torrent_handle &) -> void { - - if(ec) - this->_state = state::AddingTorrentFailed(); - else { - this->_state = state::AddedTorrent(params); - - auto * added_torrent_state = boost::get(&this->_state); + this->_state = state::AddingTorrent(); - // Start plugin - plugin->submit(extension::request::Start(params.info_hash, [added_torrent_state](const std::exception_ptr & e) { + // Add torrent + _plugin->submit(extension::request::AddTorrent(params, [this](libtorrent::error_code & ec, libtorrent::torrent_handle & h) -> void { - if(e) - *(added_torrent_state) = state::StartingPluginFailed(); + if(ec) + this->_state = state::AddingTorrentFailed(); else - *(added_torrent_state) = state::PluginStarted(); - - })); + this->_state = state::AddedTorrent(params, h); - } + })); - })); + // Run alert processing long enough to process all callbacks above to completion + Poller(this, 3, 1*std::chrono::seconds); - // Run alert processing long enough to process all callbacks above to completion - RunPollerLoop(this, 3, 1*std::chrono_literals::s); + // If we are still adding the torrent, then we have failed + if(boost::get(&_state)) + throw std::runtime_error("Torrent was not added within poller expiration time."); } else throw std::runtime_error("Cannot start, is already started."); @@ -55,80 +47,88 @@ void TorrentClient::start_torrent_plugin(const libtorrent::add_torrent_params & void TorrentClient::connect(const libtorrent::tcp::endpoint & endpoint) { - if(auto * added_torrent = boost::get(&_state)) { + if(auto * added_torrent = boost::get(&_state)) + added_torrent->handle.connect_peer(endpoint); + else + throw std::runtime_error("Cannot start, torrent not yet added."); +} - // Get torrent handle - libtorrent::torrent_handle h = _session->find_torrent(added_torrent->params.info_hash); +/** +auto * added_torrent_state = boost::get(&this->_state); - // Make connection with peer on torrent - h.connect_peer(endpoint); +// Start plugin +_plugin->submit(extension::request::Start(params.info_hash, [added_torrent_state](const std::exception_ptr & e) { - } else - throw std::runtime_error("Cannot start, torrent not yet added."); -} + if(e) + *(added_torrent_state) = state::StartingPluginFailed(); + else + *(added_torrent_state) = state::PluginStarted(); + +})); +*/ state::AddedTorrent * has_plugin_started(const State * state) { - if(auto * added_torrent_state = boost::get(state)) { - if(boost::get(added_torrent_state)) - return added_torrent_state; - else - throw std::runtime_error("Cannot start plugin, have to be in state: AddedTorrent::PluginStarted"); - } else - throw std::runtime_error("Cannot start, have to be in state: AddedTorrent."); + if(auto * added_torrent_state = boost::get(state)) { + if(boost::get(added_torrent_state)) + return added_torrent_state; + else + throw std::runtime_error("Cannot start plugin, have to be in state: AddedTorrent::PluginStarted"); + } else + throw std::runtime_error("Cannot start, have to be in state: AddedTorrent."); } void Buy::async_buy(const protocol_wire::BuyerTerms & terms) { - state::AddedTorrent * added_torrent_state = has_plugin_started(&_state); + state::AddedTorrent * added_torrent_state = has_plugin_started(&_state); - *(added_torrent_state) = Buy(terms); + *(added_torrent_state) = Buy(terms); - auto * buy_state = boost::get(added_torrent_state); + auto * buy_state = boost::get(added_torrent_state); - // To buy mode - plugin->submit(extension::request::ToBuyMode(added_torrent_state->params.info_hash, terms, [buy_state](const std::exception_ptr & e) { + // To buy mode + _plugin->submit(extension::request::ToBuyMode(added_torrent_state->params.info_hash, terms, [buy_state](const std::exception_ptr & e) { - if(e) - (*buy_state) = state::AddedTorrent::Buy::StartingBuyModeFailed(); - else { + if(e) + (*buy_state) = state::AddedTorrent::Buy::StartingBuyModeFailed(); + else { - (*buy_state) = state::AddedTorrent::Buy::BuyModeStarted(); + (*buy_state) = state::AddedTorrent::Buy::BuyModeStarted(); - // At this point, we have to wait for an asynchronous event, namely - // that a connection with a suitable seller is estalished, - // and we catch this in alert processor. + // At this point, we have to wait for an asynchronous event, namely + // that a connection with a suitable seller is estalished, + // and we catch this in alert processor. - } + } - })); + })); } void TorrentClient::async_sell(const protocol_wire::SellerTerms & terms) { - state::AddedTorrent * added_torrent_state = has_plugin_started(&_state); + state::AddedTorrent * added_torrent_state = has_plugin_started(&_state); - *(added_torrent_state) = Sell(terms); + *(added_torrent_state) = Sell(terms); - auto * sell_state = boost::get(added_torrent_state); + auto * sell_state = boost::get(added_torrent_state); - // To sell mode - plugin->submit(extension::request::ToSellMode(added_torrent_state->params.info_hash, terms, [sell_state](const std::exception_ptr & e) { + // To sell mode + _plugin->submit(extension::request::ToSellMode(added_torrent_state->params.info_hash, terms, [sell_state](const std::exception_ptr & e) { - if(e) - (*sell_state) = state::AddedTorrent::Sell::StartingSellModeFailed(); - else { + if(e) + (*sell_state) = state::AddedTorrent::Sell::StartingSellModeFailed(); + else { - (*sell_state) = state::AddedTorrent::Sell::SellModeStarted(); + (*sell_state) = state::AddedTorrent::Sell::SellModeStarted(); - // At this point, we have to wait for an asynchronous event, namely - // that a connection with a suitable seller is estalished, - // and we catch this in alert processor. + // At this point, we have to wait for an asynchronous event, namely + // that a connection with a suitable seller is estalished, + // and we catch this in alert processor. - } + } - })); + })); } void TorrentClient::async_observe() { @@ -140,15 +140,15 @@ void TorrentClient::async_observe() { auto * observe_state = boost::get(added_torrent_state); // To sell mode - plugin->submit(extension::request::ToObserveMode(added_torrent_state->params.info_hash, [observe_state](const std::exception_ptr & e) { + _plugin->submit(extension::request::ToObserveMode(added_torrent_state->params.info_hash, [observe_state](const std::exception_ptr & e) { if(e) - (*observe_state) = state::AddedTorrent::Observe::StartingObserveModeFailed(); + (*observe_state) = state::AddedTorrent::Observe::StartingObserveModeFailed(); else { - (*observe_state) = state::AddedTorrent::Observe::ObserveModeStarted(); + (*observe_state) = state::AddedTorrent::Observe::ObserveModeStarted(); - // nothing more to do for this mode + // nothing more to do for this mode } @@ -182,12 +182,12 @@ void TorrentClient::poll() { } // Get status update on torrent plugins - s->plugin->submit(extension::request::PostTorrentPluginStatusUpdates()); + s->_plugin->submit(extension::request::PostTorrentPluginStatusUpdates()); } } State TorrentClient::state() const noexcept { - return _state; + return _state; } void TorrentClient::process(const libtorrent::alert * a) { @@ -203,7 +203,7 @@ void TorrentClient::process(const libtorrent::alert * a) { void TorrentClient::process(const extension::alert::TorrentPluginStatusUpdateAlert * p) { for(auto m: p->statuses) - plugin->submit(extension::request::PostPeerPluginStatusUpdates(m.first)); + _plugin->submit(extension::request::PostPeerPluginStatusUpdates(m.first)); } struct SellerInformation { @@ -212,8 +212,8 @@ struct SellerInformation { SellerInformation(const protocol_wire::SellerTerms & terms, const Coin::PublicKey & contractPk) - : terms(terms) - , contractPk(contractPk) { + : terms(terms) + , contractPk(contractPk) { } protocol_wire::SellerTerms terms; @@ -225,14 +225,14 @@ std::map select_N_sellers(unsigned void TorrentClient::process(const extension::alert::PeerPluginStatusUpdateAlert * p) { - state::AddedTorrent * s; - if(auto * added_torrent_state = boost::get(state)) { - if(s = boost::get(added_torrent_state)) + state::AddedTorrent * s; + if(auto * added_torrent_state = boost::get(state)) { + if(s = boost::get(added_torrent_state)) - else - return; - } else - return; + else + return; + } else + return; // figure out if torrent hasbeen added @@ -246,69 +246,69 @@ void TorrentClient::process(const extension::alert::PeerPluginStatusUpdateAlert void TorrentClient::process(const extension::alert::PeerPluginStatusUpdateAlert *p) { - if(state == State::buy_mode_started) { + if(state == State::buy_mode_started) { - std::map sellers; + std::map sellers; - try { - sellers = select_N_sellers(terms.minNumberOfSellers(), p->statuses); - } catch(const std::runtime_error & e) { - //log("Coulndt find sufficient number of suitable sellers"); - return; - } + try { + sellers = select_N_sellers(terms.minNumberOfSellers(), p->statuses); + } catch(const std::runtime_error & e) { + //log("Coulndt find sufficient number of suitable sellers"); + return; + } - // Create contract commitments and download information - protocol_session::PeerToStartDownloadInformationMap map; - paymentchannel::ContractTransactionBuilder::Commitments commitments(sellers.size()); + // Create contract commitments and download information + protocol_session::PeerToStartDownloadInformationMap map; + paymentchannel::ContractTransactionBuilder::Commitments commitments(sellers.size()); - uint32_t output_index = 0; - for(const auto & s: sellers) { + uint32_t output_index = 0; + for(const auto & s: sellers) { - // fixed for now - int64_t value = 100000; - Coin::KeyPair buyerKeyPair(Coin::PrivateKey::generate()); // **replace later with determinisic key** - Coin::PubKeyHash buyerFinalPkHash; + // fixed for now + int64_t value = 100000; + Coin::KeyPair buyerKeyPair(Coin::PrivateKey::generate()); // **replace later with determinisic key** + Coin::PubKeyHash buyerFinalPkHash; - protocol_session::StartDownloadConnectionInformation inf(s.second.terms, - output_index, - value, - buyerKeyPair, - buyerFinalPkHash); + protocol_session::StartDownloadConnectionInformation inf(s.second.terms, + output_index, + value, + buyerKeyPair, + buyerFinalPkHash); - map.insert(std::make_pair(s.first, inf)); + map.insert(std::make_pair(s.first, inf)); - commitments[output_index] = paymentchannel::Commitment(value, - buyerKeyPair.pk(), - s.second.contractPk, // payeePK - Coin::RelativeLockTime(Coin::RelativeLockTime::Units::Time, s.second.terms.minLock())); + commitments[output_index] = paymentchannel::Commitment(value, + buyerKeyPair.pk(), + s.second.contractPk, // payeePK + Coin::RelativeLockTime(Coin::RelativeLockTime::Units::Time, s.second.terms.minLock())); - output_index++; - } + output_index++; + } - // Create contract transaction - paymentchannel::ContractTransactionBuilder c; - c.setCommitments(commitments); + // Create contract transaction + paymentchannel::ContractTransactionBuilder c; + c.setCommitments(commitments); - Coin::Transaction tx = c.transaction(); + Coin::Transaction tx = c.transaction(); - // Starting download + // Starting download - // state = StartingDownload() + // state = StartingDownload() - plugin->submit(extension::request::StartDownloading(p->handle.info_hash(), tx, map, [=](const std::exception_ptr & e) -> void { + _plugin->submit(extension::request::StartDownloading(p->handle.info_hash(), tx, map, [=](const std::exception_ptr & e) -> void { - if(e) - state = State::downloading_starting_failed; // StartingDownloadFailed() - else - state = State::downloading_started; // DownloadingStarted() + if(e) + state = State::downloading_starting_failed; // StartingDownloadFailed() + else + state = State::downloading_started; // DownloadingStarted() - // we are done, nothing more to do? + // we are done, nothing more to do? - })); + })); - } else if () {} - else if () {} + } else if () {} + else if () {} } std::map select_N_sellers(unsigned int N, const std::map & statuses) { @@ -317,19 +317,19 @@ std::map select_N_sellers(unsigned for(auto s : statuses) { - libtorrent::tcp::endpoint ep = s.first; - extension::status::PeerPlugin status = s.second; + libtorrent::tcp::endpoint ep = s.first; + extension::status::PeerPlugin status = s.second; - if(status.peerBEP10SupportStatus == extension::BEPSupportStatus::supported && - status.peerBitSwaprBEPSupportStatus == extension::BEPSupportStatus::supported && - status.connection.is_initialized()) { + if(status.peerBEP10SupportStatus == extension::BEPSupportStatus::supported && + status.peerBitSwaprBEPSupportStatus == extension::BEPSupportStatus::supported && + status.connection.is_initialized()) { - auto & machine = status.connection.get().machine; + auto & machine = status.connection.get().machine; - if(machine.innerStateTypeIndex == typeid(protocol_statemachine::PreparingContract) && selected.size() < N) - selected[ep] = SellerInformation(machine.announcedModeAndTermsFromPeer.sellModeTerms(), - machine.payor.payeeContractPk()); - } + if(machine.innerStateTypeIndex == typeid(protocol_statemachine::PreparingContract) && selected.size() < N) + selected[ep] = SellerInformation(machine.announcedModeAndTermsFromPeer.sellModeTerms(), + machine.payor.payeeContractPk()); + } } return selected; diff --git a/test/TorrentClient.hpp b/test/TorrentClient.hpp index db7ac43..28a186d 100644 --- a/test/TorrentClient.hpp +++ b/test/TorrentClient.hpp @@ -9,20 +9,29 @@ #define TORRENT_CLIENT_HPP #include "PollableInterface.hpp" - #include #include +#include #include -#include - #include - +#include #include using namespace joystream; namespace libtorrent { class session; + struct alert; +} + +namespace joystream { + namespace extension { + namespace alert { + struct TorrentPluginStatusUpdateAlert; + struct PeerPluginStatusUpdateAlert; + } + class Plugin; + } } namespace state { @@ -32,103 +41,112 @@ namespace state { struct AddingTorrentFailed {}; struct AddedTorrent { - struct WaitingForMode {}; + struct WaitingForMode {}; - struct Buy { + struct Buy { - struct StartingBuyMode {}; - struct StartingBuyModeFailed {}; - struct BuyModeStarted {}; // <== from here, we listen to alerts + struct StartingBuyMode {}; + struct StartingBuyModeFailed {}; - struct StartingPlugin {}; - struct StartingPluginFailed {}; - struct PluginStarted {}; + struct StartingPlugin {}; + struct StartingPluginFailed {}; + struct PluginStarted {}; - struct StartingDownload {}; - struct StartingDownloadFailed {}; + struct StartingDownload {}; + struct StartingDownloadFailed {}; + struct DownloadingStarted {}; - struct DownloadingStarted {}; + typedef boost::variant< + StartingBuyMode, + StartingBuyModeFailed, + StartingPlugin, + StartingPluginFailed, + PluginStarted, + StartingDownload, + StartingDownloadFailed, + StartingDownloadFailed, + DownloadingStarted> State; - typedef boost::variant State; + Buy(const protocol_wire::BuyerTerms & terms) + : state(StartingBuyMode()) + , terms(terms) {} - Buy(const protocol_wire::BuyerTerms & terms) - : state(StartingBuyMode()) - , terms(terms) {} + State state; + protocol_wire::BuyerTerms terms; + }; - State state; - protocol_wire::BuyerTerms terms; - }; + struct Sell { - struct Sell { + struct StartingSellMode {}; + struct StartingSellModeFailed {}; - struct StartingSellMode {}; - struct StartingSellModeFailed {}; - struct SellModeStarted {}; // <== from here, we listen to alerts + struct StartingPlugin {}; + struct StartingPluginFailed {}; + struct PluginStarted {}; - struct StartingPlugin {}; - struct StartingPluginFailed {}; - struct PluginStarted {}; + struct StartingUploading {}; + struct StartingUploadingFailed {}; - struct StartingUploading {}; - struct StartingUploadingFailed {}; + struct Uploading {}; - struct Uploading {}; + typedef boost::variant< + StartingSellMode, + StartingSellModeFailed, + StartingPlugin, + StartingPluginFailed, + PluginStarted, + StartingUploading, + StartingUploadingFailed, + Uploading> State; - typedef boost::variant State; - Sell(const protocol_wire::SellerTerms & terms) - : state(StartingSellMode()) - , terms(terms) {} + Sell(const protocol_wire::SellerTerms & terms) + : state(StartingSellMode()) + , terms(terms) {} - State state; - protocol_wire::SellerTerms terms; - }; + State state; + protocol_wire::SellerTerms terms; + }; - struct Observe { + struct Observe { - struct StartingObserveMode {}; - struct StartingObserveModeFailed {}; - struct ObserveModeStarted {}; // Done + struct StartingObserveMode {}; + struct StartingObserveModeFailed {}; - struct StartingPlugin {}; - struct StartingPluginFailed {}; - struct PluginStarted {}; + struct StartingPlugin {}; + struct StartingPluginFailed {}; + struct PluginStarted {}; - typedef boost::variant State; - Observe() - : state(StartingObserveMode()) {} + typedef boost::variant< + StartingObserveMode, + StartingObserveModeFailed, + StartingPlugin, + StartingPluginFailed, + PluginStarted> State; - State state; + Observe() + : state(StartingObserveMode()) {} + + State state; }; - typedef boost::variant State; + typedef boost::variant< + Buy, + Sell, + Observe> State; - AddedTorrent(const libtorrent::add_torrent_params & params) - : state(StartingPlugin()) - , params(params) {} + AddedTorrent(const libtorrent::add_torrent_params & params, + const libtorrent::torrent_handle & handle) + : state(WaitingForMode()) + , params(params) + , handle(handle) {} State state; libtorrent::add_torrent_params params; + libtorrent::torrent_handle handle; }; } -/// change the name of this? some how indicate that it is in charge of a torrent on a joystream session, -/// torrent client gets confusing /** * @brief Client for a torrent?? */ @@ -136,15 +154,16 @@ class TorrentClient : public PollableInterface { public: - typedef boost::variant State; + typedef boost::variant< + state::Init, + state::AddingTorrent, + state::AddingTorrentFailed, + state::AddedTorrent> State; TorrentClient(libtorrent::session * session, - const boost::shared_ptr & plugin); + extension::Plugin * plugin); - void start_torrent_plugin(const libtorrent::add_torrent_params & params); + void add(const libtorrent::add_torrent_params & params); void connect(const libtorrent::tcp::endpoint &); @@ -165,8 +184,8 @@ class TorrentClient : public PollableInterface { void process(const extension::alert::PeerPluginStatusUpdateAlert * p); State _state; - libtorrent::session * session; - boost::shared_ptr plugin; + libtorrent::session * _session; + extension::Plugin * _plugin; }; #endif // TORRENT_CLIENT_HPP diff --git a/test/TorrentClientSwarm.cpp b/test/TorrentClientSwarm.cpp index d39ff83..24526fb 100644 --- a/test/TorrentClientSwarm.cpp +++ b/test/TorrentClientSwarm.cpp @@ -116,8 +116,8 @@ TorrentClientSwarm::TorrentClientSwarm(const std::string & base_folder, ptr-> - // Start - ptr->client.start_torrent_plugin(params); + // Start + ptr->client.start_torrent_plugin(params); normal_leecher.push_back(std::move(ptr)); From f8c9c59313bc1c67636917c0f14d051b4834503e Mon Sep 17 00:00:00 2001 From: Bedeho Mender Date: Wed, 8 Mar 2017 07:34:08 +0100 Subject: [PATCH 06/10] wip4 --- test/Utilities.hpp | 17 +++++++++++++++++ test/test_integrations.cpp | 8 -------- 2 files changed, 17 insertions(+), 8 deletions(-) create mode 100644 test/Utilities.hpp diff --git a/test/Utilities.hpp b/test/Utilities.hpp new file mode 100644 index 0000000..7238102 --- /dev/null +++ b/test/Utilities.hpp @@ -0,0 +1,17 @@ +// +// Created by Bedeho Mender on 08/03/17. +// + +#ifndef UTILITIES_HPP +#define UTILITIES_HPP + +// NB: Taken from libtorrent/test/setup_transfer.(hpp/cpp) +std::shared_ptr create_torrent(std::ostream* file = 0, + char const* name = "temporary", + int piece_size = 16 * 1024, + int num_pieces = 13, + bool add_tracker = true, + std::string ssl_certificate = ""); + + +#endif //UTILITIES_HPP diff --git a/test/test_integrations.cpp b/test/test_integrations.cpp index 3f53148..a2ca528 100644 --- a/test/test_integrations.cpp +++ b/test/test_integrations.cpp @@ -12,15 +12,7 @@ #define POLLING_COUNT 3 #define POLLING_SLEEP_DURATION 1*std::chrono_literals::s -// Generate torrent content (not random) -std::shared_ptr create_torrent(std::ostream* file = 0, - char const* name = "temporary", - int piece_size = 16 * 1024, - int num_pieces = 13, - bool add_tracker = true, - std::string ssl_certificate = "") { -} TEST(IntegrationTesting, Connectivity) { From 15c49dd6c4cd7ddcfaec2b1e9ec828d822e7ace6 Mon Sep 17 00:00:00 2001 From: Bedeho Mender Date: Wed, 8 Mar 2017 21:19:56 +0100 Subject: [PATCH 07/10] wip5 --- test/TorrentClient.cpp | 7 +- test/TorrentClientSwarm.cpp | 180 +++++++++++++++++++----------------- test/TorrentClientSwarm.hpp | 64 +++++++------ test/Utilities.cpp | 78 ++++++++++++++++ test/Utilities.hpp | 17 ++-- test/test_integrations.cpp | 33 +++---- 6 files changed, 233 insertions(+), 146 deletions(-) create mode 100644 test/Utilities.cpp diff --git a/test/TorrentClient.cpp b/test/TorrentClient.cpp index 4db67d3..52b5740 100644 --- a/test/TorrentClient.cpp +++ b/test/TorrentClient.cpp @@ -75,7 +75,7 @@ state::AddedTorrent * has_plugin_started(const State * state) { else throw std::runtime_error("Cannot start plugin, have to be in state: AddedTorrent::PluginStarted"); } else - throw std::runtime_error("Cannot start, have to be in state: AddedTorrent."); + throw std::runtime_error("Cannot start plugin, have to be in state: AddedTorrent."); } void Buy::async_buy(const protocol_wire::BuyerTerms & terms) { @@ -228,8 +228,6 @@ void TorrentClient::process(const extension::alert::PeerPluginStatusUpdateAlert state::AddedTorrent * s; if(auto * added_torrent_state = boost::get(state)) { if(s = boost::get(added_torrent_state)) - - else return; } else return; @@ -307,8 +305,7 @@ void TorrentClient::process(const extension::alert::PeerPluginStatusUpdateAlert })); - } else if () {} - else if () {} + } } std::map select_N_sellers(unsigned int N, const std::map & statuses) { diff --git a/test/TorrentClientSwarm.cpp b/test/TorrentClientSwarm.cpp index 24526fb..b58c7ff 100644 --- a/test/TorrentClientSwarm.cpp +++ b/test/TorrentClientSwarm.cpp @@ -1,6 +1,13 @@ +#include +#include #include "TorrentClientSwarm.hpp" +#include "Utilities.hpp" -#define WORK_DIR_NAME "swarm_workdir" +#include + +#include + +#define WORK_DIR_NAME "swarm" //TorrentClientSwarm::Participant @@ -8,143 +15,148 @@ TorrentClientSwarm::Participant::Participant(const std::shared_ptr & plugin) : session(session) , plugin(plugin) - , client(session, plugin) { -} - -libtorrent::tcp::endpoint TorrentClientSwarm::Participant::endpoint() const noexcept { - // ? + , client(session.get(), plugin.get()) { } //TorrentClientSwarm -std::unique_ptr make_participant() { +TorrentClientSwarm::Participant * add_participant(std::vector> & v, + const libtorrent::add_torrent_params & params) { + // Create session auto session = std::make_shared(); // Create plugin and add it to plugin - boost::make_shared(1000, - session->native_handle(), - &(session->native_handle()->alerts())); - - std::unique_ptr ptr(new Participant(session, plugin)); + boost::make_shared plugin(1000); -} + std::unique_ptr ptr(new TorrentClientSwarm::Participant(session, plugin)); -std::unique_ptr make_uploader_participant() { - - auto p = make_participant(); + v.push_back(std::move(ptr)); // Add torrent - libtorrent::add_torrent_params params; - params.save_path = data_source_folder; // <== needs valid save path - params.ti = ti; - //ti.info_hash(); <== do we store this somewhere? - - p->session->add_torrent(params); - -} - -std::unique_ptr make_downloader_participant() { - - auto p = make_participant(); + libtorrent::error_code ec; + session->add_torrent(params, ec); - p->session->add_torrent(params); + return ptr.get(); } - -libtorrent::add_torrent_params make_seeder_params() { - -} - -TorrentClientSwarm::TorrentClientSwarm(const std::string & base_folder, - const std::string & data_source_folder, - libtorrent::torrent_info & ti, +TorrentClientSwarm::TorrentClientSwarm(const boost::filesystem::path & base_folder, uint16_t number_of_normal_seeder_clients, uint16_t number_of_normal_leecher_clients, uint16_t number_of_observer_clients, - const std::vector seller_client_terms, + const std::vector & seller_client_terms, const std::vector & buyer_client_terms) { + /// Create work space + + // Working directory for swarm + const boost::filesystem::path work_folder = base_folder + WORK_DIR_NAME; + // Clean up from prior run. // Its better to do this at start, as a prior run cannot be // trusted to do its own cleanup, e.g. if it gets interrupted. - // boost::delete_folder(base_folder, ec); - // asset(ec); + if(boost::filesystem::exists(boost::filesystem::status(work_folder))) { - // Create same folder again - // boost::create_folder(base_folder, ec); - // assert(ec); + boost::system::error_code ec; + boost::filesystem::remove_all(work_folder, ec); - /// Setup clients + if(ec) { + std::cerr << "Could not delete pre-existing swarm directory: " << work_folder.string() << std::endl; + exit(1); + } - std::vector all_endpoints; - std::vector all_clients; + } - // Setup normal seeders - for(int i = 0;i < number_of_normal_seeder_clients;i++) { + // Create work folder + if (!boost::filesystem::create_directories(work_folder)) { + std::cerr << "Could not create swarm directory: " << work_folder.string() << std::endl; + exit(1); + } - std::unique_ptr ptr = make_participant(); + // Create torrent payload and file + boost::shared_ptr ti = make_single_file_torrent(work_folder, + "payload_file.dat", + 2, // 16KiB factor + 300); // #pieces + /// Setup clients - normal_seeders.push_back(std::move(ptr)); - all_endpoints.push_back(ptr->endpoint()); - all_clients.push_back(&ptr->client); - } + // Prepare params for uploaders: + libtorrent::add_torrent_params uploader_params; + uploader_params.save_path = work_folder.string(); + uploader_params.ti = ti; + + // Setup normal seeders + for(int i = 0;i < number_of_normal_seeder_clients;i++) + add_participant(normal_seeders, uploader_params); // Setup normal leechers for(int i = 0;i < number_of_normal_leecher_clients;i++) { - std::unique_ptr ptr = make_participant(); - - ptr->client.session()->add_torrent(params); - - // Add torrent + boost::filesystem::path download_folder = work_folder + ("normal_leecher_" + i); - normal_leecher.push_back(std::move(ptr)); + libtorrent::add_torrent_params params; + params.save_path = download_folder.string(); + params.ti = ti; - all_endpoints.push_back(ptr->endpoint()); - all_clients.push_back(&ptr->client); + add_participant(normal_leechers, params); } // Setup observers for(int i = 0;i < number_of_observer_clients;i++) { - std::unique_ptr ptr = make_participant(); - // Add torrent + //auto ptr = add_participant(observers, uploader_params); + + // Set interaction to block + //ptr-> + } + + // Setup sellers + for(int i = 0; i < seller_client_terms.size();i++) + add_participant(sellers, uploader_params); + + // Setup buyers + for(int i = 0; i < buyer_client_terms.size();i++) { + + boost::filesystem::path download_folder = work_folder + ("buyer_" + i); + libtorrent::add_torrent_params params; + params.save_path = download_folder.string(); params.ti = ti; - ptr-> + add_participant(buyers, params); + } - // Start - ptr->client.start_torrent_plugin(params); +} - normal_leecher.push_back(std::move(ptr)); +void TorrentClientSwarm::fully_connect() { - all_endpoints.push_back(ptr->endpoint()); - all_clients.push_back(&ptr->client); - } + /// Fully connect + std::vector all_clients; - // Setup sellers - for(int i = 0; i < seller_client_terms.size();i++) { + // List all + for(auto p : normal_seeders) + all_clients.push_back(p.get()); - all_endpoints.push_back(ptr->endpoint()); - all_clients.push_back(&ptr->client); - } + for(auto p : normal_leechers) + all_clients.push_back(p.get()); - // Setup buyers - for(int i = 0; i < buyer_client_terms.size();i++) { + for(auto p :observers) + all_clients.push_back(p.get()); - all_endpoints.push_back(ptr->endpoint()); - all_clients.push_back(&ptr->client); - } + for(auto p: sellers) + all_clients.push_back(p.get()); + + for(auto p : buyers) + all_clients.push_back(p.get()); // Connect all appropriately + for(int i = 0;i < all_clients.size();i++) + for(int j = i + 1;j < all_clients.size();j++) { - for(auto c: all_clients) - for(auto e: all_endpoints) { + libtorrent::tcp::endpoint ep = listening_endpoint(all_clients[j]->session.get()); - if(c->endpoint() != e) - c->connect(e); + all_clients[i]->client.connect(ep); } + } \ No newline at end of file diff --git a/test/TorrentClientSwarm.hpp b/test/TorrentClientSwarm.hpp index ae3eb7b..0ab2d2b 100644 --- a/test/TorrentClientSwarm.hpp +++ b/test/TorrentClientSwarm.hpp @@ -7,52 +7,33 @@ #include "PollableInterface.hpp" #include "TorrentClient.hpp" -#include +#include struct TorrentClientSwarm { /** * Creates a swarm with torrent clients * @param base_folder - * @param data_source_folder - * @param ti * @param number_of_normal_clients * @param number_of_observer_clients * @param seller_client_terms * @param buyer_client_terms */ - TorrentClientSwarm(const std::string & base_folder, - const std::string & data_source_folder, - libtorrent::torrent_info & ti, + TorrentClientSwarm(const boost::filesystem::path & base_folder, uint16_t number_of_normal_seeder_clients, uint16_t number_of_normal_leecher_clients, uint16_t number_of_observer_clients, - const std::vector seller_client_terms, + const std::vector & seller_client_terms, const std::vector & buyer_client_terms); + /** + * Connect all to each other. + */ + void fully_connect(); + template< class Rep, class Period > void run_event_loop(unsigned int iteration_counter, - const std::chrono::duration & iteration_sleep_duration) { - - // Create poller - Poller poller; - - // Add clients to poller - for(const Participant & p : normal) - poller.subjects.push_back(&p.client); - - for(const Participant & p : observers) - poller.subjects.push_back(&p.client); - - for(const Participant & p : sellers) - poller.subjects.push_back(&p.client); - - for(const Participant & p : buyers) - poller.subjects.push_back(&p.client); - - // Run event loop - poller.run(iteration_counter, iteration_sleep_duration); - } + const std::chrono::duration & iteration_sleep_duration); struct Participant { @@ -67,10 +48,35 @@ struct TorrentClientSwarm { }; std::vector> normal_seeders, - normal_leecher, + normal_leechers, observers, sellers, buyers; }; + +template< class Rep, class Period > +void TorrentClientSwarm::run_event_loop(unsigned int iteration_counter, + const std::chrono::duration & iteration_sleep_duration) { + + // Create poller + Poller poller; + + // Add clients to poller + for(auto p : normal_seeders) + poller.subjects.push_back(&p->client); + + for(auto p : observers) + poller.subjects.push_back(&p->client); + + for(auto p : sellers) + poller.subjects.push_back(&p->client); + + for(auto p : buyers) + poller.subjects.push_back(&p->client); + + // Run event loop + poller.run(iteration_counter, iteration_sleep_duration); +} + #endif //TORRENTCLIENTSWARM_HPP diff --git a/test/Utilities.cpp b/test/Utilities.cpp new file mode 100644 index 0000000..3284249 --- /dev/null +++ b/test/Utilities.cpp @@ -0,0 +1,78 @@ +// +// Created by bedeho on 08.03.17. +// + +#include "Utilities.hpp" +#include +#include +#include + +boost::shared_ptr make_single_file_torrent(const boost::filesystem::path & base_folder, + const std::string & payload_file_name, + unsigned int piece_size_factor, + unsigned int num_pieces) { + + /// Create file payload. + std::fstream fs; + + // Open/create + boost::filesystem::path payload_file = base_folder + payload_file_name; + + fs.open(payload_file.string(), std::fstream::out | std::fstream::binary); + + if(!fs.good()) { + std::cerr << "Error opening: " << payload_file.string() << std::endl; + exit(1); + } + + // Write piece data to file + unsigned int piece_size = piece_size_factor * 16 * 1024; // It must be a multiple of 16 kiB. + for(unsigned int piece_index = 0;piece_index < num_pieces;piece_index++) { + + std::vector piece_payload(piece_size, static_cast(piece_index)); + + fs.write(&piece_payload[0], piece_payload.size()); + } + + if(!fs.good()) { + std::cerr << "Error writing to file: " << payload_file.string() << std::endl; + exit(1); + } + + fs.close(); + + /// Create torrent file + + // Setup storage + libtorrent::file_storage storage; + storage.add_file(payload_file_name, piece_size * num_pieces); + + libtorrent::create_torrent t(storage, piece_size); + + // Create + libtorrent::entry torrent_dictionary = t.generate(); + + assert(torrent_dictionary.type() != libtorrent::entry::undefined_t); + + // Bencode dictionary + std::vector bencoded_torrent_dictionary; + std::back_insert_iterator > out(bencoded_torrent_dictionary); + libtorrent::bencode(out, torrent_dictionary); + + // Return torrent_info object + libtorrent::error_code ec; + + return boost::make_shared(&bencoded_torrent_dictionary[0], + bencoded_torrent_dictionary.size(), + boost::ref(ec), + 0); +} + +libtorrent::tcp::endpoint listening_endpoint(const libtorrent::session * s) { + + libtorrent::error_code ec; + libtorrent::tcp::endpoint ep(libtorrent::address::from_string("127.0.0.1", ec), s->listen_port()); + assert(ec); + + return ep; +} \ No newline at end of file diff --git a/test/Utilities.hpp b/test/Utilities.hpp index 7238102..87e2714 100644 --- a/test/Utilities.hpp +++ b/test/Utilities.hpp @@ -5,13 +5,16 @@ #ifndef UTILITIES_HPP #define UTILITIES_HPP -// NB: Taken from libtorrent/test/setup_transfer.(hpp/cpp) -std::shared_ptr create_torrent(std::ostream* file = 0, - char const* name = "temporary", - int piece_size = 16 * 1024, - int num_pieces = 13, - bool add_tracker = true, - std::string ssl_certificate = ""); +namespace libtorrent { + class session; +} +boost::shared_ptr make_single_file_torrent(const boost::filesystem::path & base_folder, + const std::string & payload_file_name, + unsigned int piece_size_factor, + unsigned int num_pieces); + + +libtorrent::tcp::endpoint listening_endpoint(const libtorrent::session * s); #endif //UTILITIES_HPP diff --git a/test/test_integrations.cpp b/test/test_integrations.cpp index a2ca528..a3c62f6 100644 --- a/test/test_integrations.cpp +++ b/test/test_integrations.cpp @@ -7,39 +7,30 @@ #include "TorrentClientSwarm.hpp" +#include //#include -#define POLLING_COUNT 3 -#define POLLING_SLEEP_DURATION 1*std::chrono_literals::s - - - TEST(IntegrationTesting, Connectivity) { } TEST(IntegrationTesting, OneToOne) { - std::string base_folder; // = boost::this_folder(); , - std::string data_source_folder; // = base_folder + boost::file_sep ? "xxx" - - // Create torrent file and content - //std::ostream* file - std::shared_ptr ti = create_torrent_file("base_folder_here"); - // Create swarm - TorrentClientSwarm swarm(base_folder, - data_source_folder, - ti, - 1, - 1, + // - one seller + // - one buyer + TorrentClientSwarm swarm(boost::filesystem::current_path(), + 0, + 0, 0, - std::vector(), - std::vector()); + { protocol_wire::SellerTerms(1, 1000, 2, 1, 1) }, + { protocol_wire::BuyerTerms(5, 2000, 1, 1) }); + // Establish full connectivity + swarm.fully_connect(); - // Run swarm event loop - swarm.run_event_loop(POLLING_COUNT, POLLING_SLEEP_DURATION); + // Run swarm event loop for five times at 1s intervals + swarm.run_event_loop(5, std::chrono::seconds(1)); // *** assert something about final states *** //assert(swarm.) From 0bc86ba92ca7dd4ea5c5241ee157d1a228823612 Mon Sep 17 00:00:00 2001 From: Bedeho Mender Date: Thu, 9 Mar 2017 16:09:24 +0100 Subject: [PATCH 08/10] wip6 --- CMakeLists.txt | 8 ++- test/AbstractSessionController.cpp | 13 +++++ test/AbstractSessionController.hpp | 62 ++++++++++++++++++++ test/BasicBuyer.cpp | 5 ++ test/BasicBuyer.hpp | 8 +++ test/BasicObserver.cpp | 94 ++++++++++++++++++++++++++++++ test/BasicObserver.hpp | 74 +++++++++++++++++++++++ test/Swarm.cpp | 77 ++++++++++++++++++++++++ test/Swarm.hpp | 46 +++++++++++++++ test/TorrentClient.cpp | 24 ++++++-- test/TorrentClientSwarm.cpp | 19 ++++-- test/Utilities.cpp | 32 +++++++++- test/Utilities.hpp | 11 ++++ test/test_integrations.cpp | 23 +++++++- 14 files changed, 480 insertions(+), 16 deletions(-) create mode 100644 test/AbstractSessionController.cpp create mode 100644 test/AbstractSessionController.hpp create mode 100644 test/BasicBuyer.cpp create mode 100644 test/BasicBuyer.hpp create mode 100644 test/BasicObserver.cpp create mode 100644 test/BasicObserver.hpp create mode 100644 test/Swarm.cpp create mode 100644 test/Swarm.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 5028808..7d4b87c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -41,8 +41,12 @@ if(build_tests) # mock headers not included, as they set( test_lib_source - test/TorrentClient.cpp - test/TorrentClientSwarm.cpp + test/Swarm.cpp + test/AbstractSessionController.cpp + test/BasicObserver.cpp + test/Utilities.cpp + #test/TorrentClient.cpp + #test/TorrentClientSwarm.cpp ) # === test library === diff --git a/test/AbstractSessionController.cpp b/test/AbstractSessionController.cpp new file mode 100644 index 0000000..312c58c --- /dev/null +++ b/test/AbstractSessionController.cpp @@ -0,0 +1,13 @@ +// +// Created by bedeho on 09.03.17. +// + +#include "AbstractSessionController.hpp" + +AbstractSessionController::AbstractSessionController(const std::string & name) + : _name(name) { +} + +std::string AbstractSessionController::name() const noexcept { + return _name; +} \ No newline at end of file diff --git a/test/AbstractSessionController.hpp b/test/AbstractSessionController.hpp new file mode 100644 index 0000000..20a850d --- /dev/null +++ b/test/AbstractSessionController.hpp @@ -0,0 +1,62 @@ +// +// Created by bedeho on 09.03.17. +// + +#ifndef PROTOCOLSESSION_SESSIONCONTROLLER_HPP_H +#define PROTOCOLSESSION_SESSIONCONTROLLER_HPP_H + + +#include + +#include +#include +#include +#include + +namespace libtorrent { + class session; + class alert; + struct torrent_info; + +} + +class AbstractSessionController { + +public: + + AbstractSessionController(const std::string & name); + + /** + * Invitation to join the swarm for the given (ti) torrent + * @param ti torrent file information + * @param working_directory clean working directory which can be used + */ + virtual void join(const boost::shared_ptr & ti, + const std::string & working_directory, + const std::string & payload_directory) = 0; + + /** + * Returns endpoint upon which this session is listenning. An unset + * optional indicates it is not listening. + * @return + */ + virtual boost::optional session_endpoint() const = 0; + + /** + * Notify controller about full peer list for swarm. + */ + virtual void swarm_peer_list_ready(const std::unordered_map &) = 0; // <= give list of endpoints and names, in order to have capcity to connect with howmever it wants, at the desired time in the future? simple policy is just to connect to all of them + + /** + * Timeout processing + */ + virtual void poll() = 0; + + std::string name() const noexcept; + +private: + + const std::string _name; +}; + +#endif //PROTOCOLSESSION_SESSIONCONTROLLER_HPP_H diff --git a/test/BasicBuyer.cpp b/test/BasicBuyer.cpp new file mode 100644 index 0000000..2ae39a6 --- /dev/null +++ b/test/BasicBuyer.cpp @@ -0,0 +1,5 @@ +// +// Created by bedeho on 09.03.17. +// + +#include "BasicBuyer.hpp" \ No newline at end of file diff --git a/test/BasicBuyer.hpp b/test/BasicBuyer.hpp new file mode 100644 index 0000000..ea4524f --- /dev/null +++ b/test/BasicBuyer.hpp @@ -0,0 +1,8 @@ +// +// Created by bedeho on 09.03.17. +// + +#ifndef BASICBUYER_HPP +#define BASICBUYER_HPP + +#endif //BASICBUYER_HPP diff --git a/test/BasicObserver.cpp b/test/BasicObserver.cpp new file mode 100644 index 0000000..ba71202 --- /dev/null +++ b/test/BasicObserver.cpp @@ -0,0 +1,94 @@ +// +// Created by bedeho on 09.03.17. +// + +#include "BasicObserver.hpp" +#include "Utilities.hpp" + +BasicObserver::BasicObserver(const std::string & name) + : AbstractSessionController(name) + , _state(Init()) + , _plugin(boost::make_shared(1000)){ + + _session.add_extension(boost::static_pointer_cast(_plugin)); +} + +void BasicObserver::join(const boost::shared_ptr & ti, const std::string & working_directory, const std::string & payload_directory) { + + if(boost::get(&_state)) { + + // Update state + _state = AddingTorrent(); + + libtorrent::add_torrent_params params; + params.ti = ti; + params.save_path = working_directory; // since payload is not here, libtorrent will - by default, attempt to download content from peers + + // **** change setting to some how prevent libtorrent from downloading + + // problem, if we allow going forward, without the torrent being added, + // then if someone else tries to connect to us, they will fail. + // sol 1: dont use addtorrent, use add_torrent, which is synchrnous + // sol 2: some how block in ::join call untill we know we can move forward?? + // + + // Add torrent using plugin + _plugin->submit(joystream::extension::request::AddTorrent(params, [this, params](libtorrent::error_code &ec, + libtorrent::torrent_handle &h) -> void { + + assert(boost::get(&_state)); + + if(ec) + this->_state = AddingTorrentFailed(); + else { + + this->_state = AddedTorrent(params, h); + + // If swarm peers were provided prior to torrent being added, then connect now + if(_swarm_peers) + connect_to_all(h, _swarm_peers.get()); + + + // bug ******** here ********* + + } + + })); + + } else + throw std::runtime_error("Cannot join, no longer in Init state."); +} + +boost::optional BasicObserver::session_endpoint() const { + return listening_endpoint(&_session); +} + +void BasicObserver::swarm_peer_list_ready(const std::unordered_map & swarm_peers) { + + // We can only process leer plist if torrent has been added successfully, if + // that has not yet occured, then we just hold on to list. + + if (auto *added_torrent_state = boost::get(&_state)) { + + // For now, we simply connect to all, in the future, + // user can supply block/accept name list + + connect_to_all(added_torrent_state->handle, swarm_peers); + + } else + _swarm_peers = swarm_peers; + +} + +void BasicObserver::poll() { + + process_pending_alert(&_session, + [this](libtorrent::alert * a) -> void { + + + if() { + + } + + }); +} \ No newline at end of file diff --git a/test/BasicObserver.hpp b/test/BasicObserver.hpp new file mode 100644 index 0000000..68e9d5a --- /dev/null +++ b/test/BasicObserver.hpp @@ -0,0 +1,74 @@ +// +// Created by bedeho on 09.03.17. +// + +#ifndef BASICOBSERVER_HPP +#define BASICOBSERVER_HPP + +#include "AbstractSessionController.hpp" + +#include + +class BasicObserver : public AbstractSessionController { + +public: + + struct Init {}; + struct AddingTorrent {}; + struct AddingTorrentFailed {}; + struct AddedTorrent { + + struct StartingObserveMode {}; + struct StartingObserveModeFailed {}; + + struct StartingPlugin {}; + struct StartingPluginFailed {}; + struct PluginStarted {}; + + typedef boost::variant< + StartingObserveMode, + StartingObserveModeFailed, + StartingPlugin, + StartingPluginFailed, + PluginStarted> State; + + AddedTorrent(const libtorrent::add_torrent_params & params, + const libtorrent::torrent_handle & handle) + : state(StartingObserveMode()) + , params(params) + , handle(handle) {} + + State state; + libtorrent::add_torrent_params params; + libtorrent::torrent_handle handle; + }; + + typedef boost::variant< + Init, + AddingTorrent, + AddingTorrentFailed, + AddedTorrent> State; + + BasicObserver(const std::string & name); + + virtual void join(const boost::shared_ptr & ti, + const std::string & working_directory, + const std::string & payload_directory); + + virtual boost::optional session_endpoint() const; + + virtual void swarm_peer_list_ready(const std::unordered_map &); + + virtual void poll(); + +private: + + State _state; + libtorrent::session _session; + boost::shared_ptr _plugin; + + + boost::optional> _swarm_peers; +}; + +#endif //BASICOBSERVER_HPP diff --git a/test/Swarm.cpp b/test/Swarm.cpp new file mode 100644 index 0000000..fff3875 --- /dev/null +++ b/test/Swarm.cpp @@ -0,0 +1,77 @@ +// +// Created by bedeho on 09.03.17. +// + +#include +#include +#include "Swarm.hpp" +#include "Utilities.hpp" + +#define WORK_DIR_NAME "swarm" + +void Swarm::add(AbstractSessionController * controller) { + + if(participants.count(controller->name()) > 0) + throw std::runtime_error("Name already taken"); + else + participants.insert(std::make_pair(controller->name(), controller)); + +} + +void Swarm::setup(const boost::filesystem::path & base_folder) { + + // Working directory for swarm + boost::filesystem::path work_folder(base_folder); + work_folder.append(WORK_DIR_NAME); + + // Clean up from prior run. + // Its better to do this at start, as a prior run cannot be + // trusted to do its own cleanup, e.g. if it gets interrupted. + if(boost::filesystem::exists(boost::filesystem::status(work_folder))) { + + boost::system::error_code ec; + boost::filesystem::remove_all(work_folder, ec); + + if(ec) + throw std::runtime_error(std::string("Could not delete pre-existing swarm directory: ") + work_folder.string()); + } + + // Create work folder + if (!boost::filesystem::create_directories(work_folder)) + throw std::runtime_error(std::string("Could not create swarm directory: ") + work_folder.string()); + + // Create torrent payload and file + boost::shared_ptr ti = make_single_file_torrent(work_folder, + "payload_file.dat", + 2, // 16KiB factor + 113); // number of pieces + + // Create working directory for each session, and ask to join swarm + for(auto m : participants) { + + boost::filesystem::path participant_folder(work_folder); + participant_folder.append(m.second->name()); + + // Create work folder + if (!boost::filesystem::create_directories(participant_folder)) + throw std::runtime_error(std::string("Could not create participant directory: ") + participant_folder.string()); + + // Ask to join + m.second->join(ti, participant_folder.string(), work_folder.string()); + } + + // Build swarm peer list + std::unordered_map endpoints; + + for(auto m : participants) { + + auto opt_endpoint = m.second->session_endpoint(); + + if (opt_endpoint) + endpoints.insert(std::make_pair(m.second->name(), opt_endpoint.get())); + } + + // Tell everyone about it + for(auto m: participants) + m.second->swarm_peer_list_ready(endpoints); +} \ No newline at end of file diff --git a/test/Swarm.hpp b/test/Swarm.hpp new file mode 100644 index 0000000..b050d7a --- /dev/null +++ b/test/Swarm.hpp @@ -0,0 +1,46 @@ +// +// Created by bedeho on 09.03.17. +// + +#ifndef SWARM_HPP +#define SWARM_HPP + +#include +#include +#include +#include "AbstractSessionController.hpp" + +namespace libtorrent { + struct alert; + class session; +} + + +class Swarm { + +public: + + void add(AbstractSessionController * controller); + + void setup(const boost::filesystem::path & base_folder); + + template< class Rep, class Period > + void run(unsigned int iteration_counter, + const std::chrono::duration & iteration_sleep_duration) { + + for(unsigned int i = 0; i < iteration_counter; i++) { + + for (auto m : participants) + m.second->poll(); + + std::this_thread::sleep_for(iteration_sleep_duration); + } + } + +private: + + std::unordered_map participants; +}; + + +#endif // SWARM_HPP diff --git a/test/TorrentClient.cpp b/test/TorrentClient.cpp index 52b5740..3959449 100644 --- a/test/TorrentClient.cpp +++ b/test/TorrentClient.cpp @@ -9,6 +9,8 @@ #include +#include + #include TorrentClient::TorrentClient(libtorrent::session * session, @@ -67,7 +69,8 @@ _plugin->submit(extension::request::Start(params.info_hash, [added_torrent_state })); */ -state::AddedTorrent * has_plugin_started(const State * state) { +/** +state::AddedTorrent * torrent_has_been_added(const State * state) { if(auto * added_torrent_state = boost::get(state)) { if(boost::get(added_torrent_state)) @@ -77,12 +80,25 @@ state::AddedTorrent * has_plugin_started(const State * state) { } else throw std::runtime_error("Cannot start plugin, have to be in state: AddedTorrent."); } + */ -void Buy::async_buy(const protocol_wire::BuyerTerms & terms) { +state::AddedTorrent * torrent_has_been_added(TorrentClient::State * state) { - state::AddedTorrent * added_torrent_state = has_plugin_started(&_state); + if(auto * added_torrent_state = boost::get(state)) + return added_torrent_state; + else + throw std::runtime_error("Torrent has not yet been added."); +} + +void TorrentClient::async_buy(const protocol_wire::BuyerTerms & terms) { + + state::AddedTorrent * added_torrent_state = torrent_has_been_added(&_state); + + if(!boost::get(added_torrent_state)) + throw std::runtime_error("Mode already set"); - *(added_torrent_state) = Buy(terms); + // + *(added_torrent_state) = state::AddedTorrent::Buy(terms); auto * buy_state = boost::get(added_torrent_state); diff --git a/test/TorrentClientSwarm.cpp b/test/TorrentClientSwarm.cpp index b58c7ff..6e06ce6 100644 --- a/test/TorrentClientSwarm.cpp +++ b/test/TorrentClientSwarm.cpp @@ -35,8 +35,7 @@ TorrentClientSwarm::Participant * add_participant(std::vectoradd_torrent(params, ec); + ptr->client.add(params); return ptr.get(); } @@ -105,15 +104,20 @@ TorrentClientSwarm::TorrentClientSwarm(const boost::filesystem::path & base_fold // Setup observers for(int i = 0;i < number_of_observer_clients;i++) { + // how to make sure no seeding happens! //auto ptr = add_participant(observers, uploader_params); // Set interaction to block - //ptr-> + //ptr->client.async_observe(); } // Setup sellers - for(int i = 0; i < seller_client_terms.size();i++) - add_participant(sellers, uploader_params); + for(int i = 0; i < seller_client_terms.size();i++) { + auto ptr = add_participant(sellers, uploader_params); + + // Sell + ptr->client.async_sell(seller_client_terms[i]); + } // Setup buyers for(int i = 0; i < buyer_client_terms.size();i++) { @@ -124,7 +128,10 @@ TorrentClientSwarm::TorrentClientSwarm(const boost::filesystem::path & base_fold params.save_path = download_folder.string(); params.ti = ti; - add_participant(buyers, params); + auto ptr = add_participant(buyers, params); + + // Buy + ptr->client.async_buy(buyer_client_terms[i]); } } diff --git a/test/Utilities.cpp b/test/Utilities.cpp index 3284249..d8b4071 100644 --- a/test/Utilities.cpp +++ b/test/Utilities.cpp @@ -4,8 +4,10 @@ #include "Utilities.hpp" #include -#include -#include +#include +#include +#include +#include boost::shared_ptr make_single_file_torrent(const boost::filesystem::path & base_folder, const std::string & payload_file_name, @@ -16,7 +18,8 @@ boost::shared_ptr make_single_file_torrent(const boost std::fstream fs; // Open/create - boost::filesystem::path payload_file = base_folder + payload_file_name; + boost::filesystem::path payload_file(base_folder); + payload_file.append(payload_file_name); fs.open(payload_file.string(), std::fstream::out | std::fstream::binary); @@ -75,4 +78,27 @@ libtorrent::tcp::endpoint listening_endpoint(const libtorrent::session * s) { assert(ec); return ep; +} + +void connect_to_all(const libtorrent::torrent_handle & h, const std::unordered_map & swarm_peers) { + + for(auto m : swarm_peers) + h.connect_peer(m.second); +} + + +void process_pending_alert(libtorrent::session * s, const AlertProcessor & processor) { + + // Process alerts + std::vector alerts; + s->pop_alerts(&alerts); + + for(auto a : alerts) { + + if(joystream::extension::alert::RequestResult const * p = libtorrent::alert_cast(a)) + p->loadedCallback(); // Make loaded callback + else + processor(a); + } + } \ No newline at end of file diff --git a/test/Utilities.hpp b/test/Utilities.hpp index 87e2714..df45eed 100644 --- a/test/Utilities.hpp +++ b/test/Utilities.hpp @@ -5,8 +5,14 @@ #ifndef UTILITIES_HPP #define UTILITIES_HPP +#include +#include +#include + namespace libtorrent { class session; + class torrent_handle; + class alert; } boost::shared_ptr make_single_file_torrent(const boost::filesystem::path & base_folder, @@ -17,4 +23,9 @@ boost::shared_ptr make_single_file_torrent(const boost libtorrent::tcp::endpoint listening_endpoint(const libtorrent::session * s); +void connect_to_all(const libtorrent::torrent_handle & h, const std::unordered_map & swarm_peers); + +typedef std::function AlertProcessor; +void process_pending_alert(libtorrent::session * s, const AlertProcessor & processor); + #endif //UTILITIES_HPP diff --git a/test/test_integrations.cpp b/test/test_integrations.cpp index a3c62f6..5503401 100644 --- a/test/test_integrations.cpp +++ b/test/test_integrations.cpp @@ -5,11 +5,14 @@ #include #include -#include "TorrentClientSwarm.hpp" +//#include "TorrentClientSwarm.hpp" +#include "Swarm.hpp" +#include "BasicObserver.hpp" #include //#include +/** TEST(IntegrationTesting, Connectivity) { } @@ -35,6 +38,24 @@ TEST(IntegrationTesting, OneToOne) { // *** assert something about final states *** //assert(swarm.) } +*/ + +TEST(IntegrationTesting, OneToOne) { + + BasicObserver observer("my_observer"); + + Swarm swarm; + + swarm.add(&observer); + + swarm.setup(boost::filesystem::current_path()); + + swarm.run(5, std::chrono::seconds(1)); + + // *** assert something about final states *** + //assert(observer.) +} + int main(int argc, char *argv[]) { From 6f68a88447da9ee564f711592bd8e258a18fb856 Mon Sep 17 00:00:00 2001 From: Bedeho Mender Date: Sat, 11 Mar 2017 12:19:33 +0100 Subject: [PATCH 09/10] wip7 --- test/AbstractSessionController.hpp | 9 ++--- test/BasicObserver.cpp | 16 +++++++-- test/PollableInterface.hpp | 18 ---------- test/Poller.cpp | 21 ++++++++++++ test/Poller.hpp | 53 ++++++++++++++++++++++++++++++ test/Swarm.hpp | 37 ++++++++++++++------- 6 files changed, 118 insertions(+), 36 deletions(-) create mode 100644 test/Poller.cpp create mode 100644 test/Poller.hpp diff --git a/test/AbstractSessionController.hpp b/test/AbstractSessionController.hpp index 20a850d..084fc05 100644 --- a/test/AbstractSessionController.hpp +++ b/test/AbstractSessionController.hpp @@ -2,9 +2,10 @@ // Created by bedeho on 09.03.17. // -#ifndef PROTOCOLSESSION_SESSIONCONTROLLER_HPP_H -#define PROTOCOLSESSION_SESSIONCONTROLLER_HPP_H +#ifndef SESSIONCONTROLLER_HPP +#define SESSIONCONTROLLER_HPP +#include "PollableInterface.hpp" #include @@ -20,7 +21,7 @@ namespace libtorrent { } -class AbstractSessionController { +class AbstractSessionController : public PollableInterface { public: @@ -59,4 +60,4 @@ class AbstractSessionController { const std::string _name; }; -#endif //PROTOCOLSESSION_SESSIONCONTROLLER_HPP_H +#endif //SESSIONCONTROLLER_HPP diff --git a/test/BasicObserver.cpp b/test/BasicObserver.cpp index ba71202..2690646 100644 --- a/test/BasicObserver.cpp +++ b/test/BasicObserver.cpp @@ -28,12 +28,15 @@ void BasicObserver::join(const boost::shared_ptr & ti, // problem, if we allow going forward, without the torrent being added, // then if someone else tries to connect to us, they will fail. - // sol 1: dont use addtorrent, use add_torrent, which is synchrnous - // sol 2: some how block in ::join call untill we know we can move forward?? + // sol 1: dont use addTorrent, use add_torrent, which is synchronous + // sol 2: some how block in ::join call until we know we can move forward?? // + Poller poller; + poller.add(this); // + // Add torrent using plugin - _plugin->submit(joystream::extension::request::AddTorrent(params, [this, params](libtorrent::error_code &ec, + _plugin->submit(joystream::extension::request::AddTorrent(params, [this, params, &poller](libtorrent::error_code &ec, libtorrent::torrent_handle &h) -> void { assert(boost::get(&_state)); @@ -53,8 +56,13 @@ void BasicObserver::join(const boost::shared_ptr & ti, } + poller.stop(); + })); + // --> + poller.start(); // add something about max time? + } else throw std::runtime_error("Cannot join, no longer in Init state."); } @@ -82,6 +90,8 @@ void BasicObserver::swarm_peer_list_ready(const std::unordered_map void { diff --git a/test/PollableInterface.hpp b/test/PollableInterface.hpp index 69bd2fd..2c29888 100644 --- a/test/PollableInterface.hpp +++ b/test/PollableInterface.hpp @@ -22,24 +22,6 @@ class PollableInterface { virtual void poll() = 0; }; -struct Poller { - std::vector subjects; - - template< class Rep, class Period > - void run(unsigned int iteration_counter, - const std::chrono::duration & iteration_sleep_duration) { - - for(unsigned int i = 0; i < iteration_counter; i++) { - - for(auto s : subjects) - s->poll(); - - std::this_thread::sleep_for(iteration_sleep_duration); - } - - } - -}; #endif // POLLABLEINTERFACE_HPP diff --git a/test/Poller.cpp b/test/Poller.cpp new file mode 100644 index 0000000..67d4d94 --- /dev/null +++ b/test/Poller.cpp @@ -0,0 +1,21 @@ +// +// Created by Bedeho Mender on 11/03/17. +// + +#include "Poller.hpp" + +Poller::Poller() + : _started(false) + , _stopOnNextIteration(false) {} + +void Poller::stop() { + + if(_stopOnNextIteration) + throw std::runtime_error("Poller already scheduled to stop"); + else + _stopOnNextIteration = true; +} + +bool Poller::isStoppingOnNextIteration() const noexcept { + return _stopOnNextIteration; +} \ No newline at end of file diff --git a/test/Poller.hpp b/test/Poller.hpp new file mode 100644 index 0000000..a4f8719 --- /dev/null +++ b/test/Poller.hpp @@ -0,0 +1,53 @@ +// +// Created by Bedeho Mender on 11/03/17. +// + +#ifndef POLLER_HPP +#define POLLER_HPP + +#include "PollableInterface.hpp" + +class Poller { + +public: + + Poller(); + + template< class Rep, class Period > + void start(unsigned int iteration_counter, + const std::chrono::duration & iteration_sleep_duration) { + + // Re-entrancy block + if(_started) + throw std::runtime_error("Already started"); + else + _started = true; + + // Restart + _stopOnNextIteration = false; + + for(unsigned int i = 0; i < iteration_counter && !_stopOnNextIteration; i++) { + + for(auto s : subjects) + s->poll(); + + std::this_thread::sleep_for(iteration_sleep_duration); + } + + _started = false; + } + + void stop(); + + bool isStoppingOnNextIteration() const noexcept; + + std::vector subjects; + +private: + + bool _started; + bool _stopOnNextIteration; + +}; + +#endif //POLLER_HPP diff --git a/test/Swarm.hpp b/test/Swarm.hpp index b050d7a..cb98283 100644 --- a/test/Swarm.hpp +++ b/test/Swarm.hpp @@ -15,30 +15,45 @@ namespace libtorrent { class session; } - class Swarm { public: - void add(AbstractSessionController * controller); + template< class Rep, class Period > + void start(unsigned int iteration_counter, + const std::chrono::duration & iteration_sleep_duration) { - void setup(const boost::filesystem::path & base_folder); + // Setup poller + Poller poller; - template< class Rep, class Period > - void run(unsigned int iteration_counter, - const std::chrono::duration & iteration_sleep_duration) { + for(auto m : participants) + poller.partcipants.push_back(m.second); - for(unsigned int i = 0; i < iteration_counter; i++) { + poller.run(iteration_counter, iteration_sleep_duration); - for (auto m : participants) - m.second->poll(); - std::this_thread::sleep_for(iteration_sleep_duration); - } } private: + Swarm(std::unordered_map participants); + + std::unordered_map participants; +}; + + +class SwarmBuilder { + +public: + + void add(AbstractSessionController * controller); + + Swarm build(const boost::filesystem::path & base_folder); + +private: + + void setup(const boost::filesystem::path & base_folder); + std::unordered_map participants; }; From 217e8f75bdaaa16ef9284ac55076d9ede27cfeaa Mon Sep 17 00:00:00 2001 From: Bedeho Mender Date: Mon, 13 Mar 2017 06:52:15 +0100 Subject: [PATCH 10/10] wip8 --- test/BasicObserver.cpp | 15 ++++++--------- test/Poller.hpp | 12 +++++++++--- test/Swarm.cpp | 23 ++++++++++++++++------- test/Swarm.hpp | 16 +++++----------- test/test_integrations.cpp | 6 +++--- 5 files changed, 39 insertions(+), 33 deletions(-) diff --git a/test/BasicObserver.cpp b/test/BasicObserver.cpp index 2690646..c16494c 100644 --- a/test/BasicObserver.cpp +++ b/test/BasicObserver.cpp @@ -24,16 +24,12 @@ void BasicObserver::join(const boost::shared_ptr & ti, params.ti = ti; params.save_path = working_directory; // since payload is not here, libtorrent will - by default, attempt to download content from peers - // **** change setting to some how prevent libtorrent from downloading - - // problem, if we allow going forward, without the torrent being added, - // then if someone else tries to connect to us, they will fail. - // sol 1: dont use addTorrent, use add_torrent, which is synchronous - // sol 2: some how block in ::join call until we know we can move forward?? - // + // We start a poller to allow blocking here, + // while still servicing this observers alert processing, + // until response from adding torrent returns Poller poller; - poller.add(this); // + poller.add(this); // Add torrent using plugin _plugin->submit(joystream::extension::request::AddTorrent(params, [this, params, &poller](libtorrent::error_code &ec, @@ -56,11 +52,12 @@ void BasicObserver::join(const boost::shared_ptr & ti, } + // Signal that poller can stop poller.stop(); })); - // --> + // Start blocking poller poller.start(); // add something about max time? } else diff --git a/test/Poller.hpp b/test/Poller.hpp index a4f8719..2d058c4 100644 --- a/test/Poller.hpp +++ b/test/Poller.hpp @@ -13,9 +13,13 @@ class Poller { Poller(); - template< class Rep, class Period > - void start(unsigned int iteration_counter, - const std::chrono::duration & iteration_sleep_duration) { + // allow specifying iteration_counter, or n iteration couner, which just + // continues until ::stop is called, have defualt value be + + // have default value for sleep value, but fix it in ms, to drop the complex templating + + void start(const std::chrono::milliseconds duration & iteration_sleep_duration, + const boost::optional & use_iteration_counter = boost::optional()) { // Re-entrancy block if(_started) @@ -26,6 +30,8 @@ class Poller { // Restart _stopOnNextIteration = false; + // <=== need to mix + for(unsigned int i = 0; i < iteration_counter && !_stopOnNextIteration; i++) { for(auto s : subjects) diff --git a/test/Swarm.cpp b/test/Swarm.cpp index fff3875..8af6500 100644 --- a/test/Swarm.cpp +++ b/test/Swarm.cpp @@ -9,16 +9,21 @@ #define WORK_DIR_NAME "swarm" -void Swarm::add(AbstractSessionController * controller) { +// Swarm - if(participants.count(controller->name()) > 0) + +// SwarmBuilder + +void SwarmBuilder::add(AbstractSessionController * controller) { + + if(_participants.count(controller->name()) > 0) throw std::runtime_error("Name already taken"); else - participants.insert(std::make_pair(controller->name(), controller)); + _participants.insert(std::make_pair(controller->name(), controller)); } -void Swarm::setup(const boost::filesystem::path & base_folder) { +Swarm SwarmBuilder::build(const boost::filesystem::path & base_folder) { // Working directory for swarm boost::filesystem::path work_folder(base_folder); @@ -47,7 +52,7 @@ void Swarm::setup(const boost::filesystem::path & base_folder) { 113); // number of pieces // Create working directory for each session, and ask to join swarm - for(auto m : participants) { + for(auto m : _participants) { boost::filesystem::path participant_folder(work_folder); participant_folder.append(m.second->name()); @@ -63,7 +68,7 @@ void Swarm::setup(const boost::filesystem::path & base_folder) { // Build swarm peer list std::unordered_map endpoints; - for(auto m : participants) { + for(auto m : _participants) { auto opt_endpoint = m.second->session_endpoint(); @@ -72,6 +77,10 @@ void Swarm::setup(const boost::filesystem::path & base_folder) { } // Tell everyone about it - for(auto m: participants) + for(auto m: _participants) m.second->swarm_peer_list_ready(endpoints); + + // Returns swarm for given set of of participants + return Swarm(_participants); + } \ No newline at end of file diff --git a/test/Swarm.hpp b/test/Swarm.hpp index cb98283..962b19f 100644 --- a/test/Swarm.hpp +++ b/test/Swarm.hpp @@ -9,11 +9,9 @@ #include #include #include "AbstractSessionController.hpp" +#include "Poller.hpp" -namespace libtorrent { - struct alert; - class session; -} +class SwarmBuilder; class Swarm { @@ -29,19 +27,17 @@ class Swarm { for(auto m : participants) poller.partcipants.push_back(m.second); + // Poll poller.run(iteration_counter, iteration_sleep_duration); - - } private: - Swarm(std::unordered_map participants); + friend class SwarmBuilder; std::unordered_map participants; }; - class SwarmBuilder { public: @@ -52,9 +48,7 @@ class SwarmBuilder { private: - void setup(const boost::filesystem::path & base_folder); - - std::unordered_map participants; + std::unordered_map _participants; }; diff --git a/test/test_integrations.cpp b/test/test_integrations.cpp index 5503401..8458424 100644 --- a/test/test_integrations.cpp +++ b/test/test_integrations.cpp @@ -44,11 +44,11 @@ TEST(IntegrationTesting, OneToOne) { BasicObserver observer("my_observer"); - Swarm swarm; + SwarmBuilder builder; - swarm.add(&observer); + builder.add(&observer); - swarm.setup(boost::filesystem::current_path()); + Swarm swarm = builder.build(boost::filesystem::current_path()); swarm.run(5, std::chrono::seconds(1));