diff --git a/src/ledger-module.cpp b/src/ledger-module.cpp index 43aacec..0acfd8e 100644 --- a/src/ledger-module.cpp +++ b/src/ledger-module.cpp @@ -19,9 +19,10 @@ NDN_LOG_INIT(cledger.benchmark.ledger); NDN_LOG_INIT(cledger.ledger); #endif -LedgerModule::LedgerModule(ndn::Face& face, ndn::KeyChain& keyChain, const std::string& configPath) +LedgerModule::LedgerModule(ndn::Face& face, ndn::KeyChain& keyChain, const std::string& configPath, time::milliseconds replyPeriod) : m_face(face) , m_keyChain(keyChain) + , m_replyPeriod(replyPeriod) { // load the config and create storage m_config.load(configPath); @@ -67,6 +68,9 @@ LedgerModule::LedgerModule(ndn::Face& face, ndn::KeyChain& keyChain, const std:: m_sync = std::make_unique(m_syncOps, m_secOps, m_face, m_storage->getInterface(), [this] (const Record& record) { + // refresh the timer anyway + refreshReplyTimer(); + auto stateName = m_dag->add(record); updateStatesTracker(stateName); if (record.getType() != tlv::REPLY_RECORD) { @@ -314,7 +318,7 @@ LedgerModule::afterValidation(const Data& data) newRecord.setType(tlv::GENESIS_RECORD); // a gensis record must be its first SVS publication, // therefore we can guess the name - auto genesisName = m_sync->getSyncBase()->getMyDataName(1); + auto genesisName = m_sync->getNextName(); NDN_LOG_DEBUG("Referencing to [Genesis] " << genesisName); newRecord.setPointers({genesisName}); } @@ -447,4 +451,14 @@ LedgerModule::sendResponse(const Name& name, const Block& block, bool realtime) }); } +void +LedgerModule::refreshReplyTimer() +{ + if (m_replyEvent) { + m_replyEvent.cancel(); + } + m_replyEvent = m_scheduler.schedule(m_replyPeriod, [this] { + publishReply(); + }); +} } // namespace cledger::ledger diff --git a/src/ledger-module.hpp b/src/ledger-module.hpp index 5662145..7608b2d 100644 --- a/src/ledger-module.hpp +++ b/src/ledger-module.hpp @@ -21,7 +21,7 @@ using appendtlv::AppendStatus; class LedgerModule : boost::noncopyable { public: - LedgerModule(ndn::Face& face, ndn::KeyChain& keyChain, const std::string& configPath); + LedgerModule(ndn::Face& face, ndn::KeyChain& keyChain, const std::string& configPath, time::milliseconds replyPeriod = 3600_s); const std::unique_ptr& getLedgerStorage() @@ -75,6 +75,9 @@ class LedgerModule : boost::noncopyable void updateStatesTracker(const Name& stateName, bool interlocked = false); + void + refreshReplyTimer(); + ndn::Face& m_face; LedgerConfig m_config; Scheduler m_scheduler{m_face.getIoService()}; @@ -99,6 +102,10 @@ class LedgerModule : boost::noncopyable std::unique_ptr m_dag; // this shouldn't keep growing, so it's safe to put into the memory std::set m_repliedRecords; + + // reply management + time::milliseconds m_replyPeriod; + ndn::scheduler::EventId m_replyEvent; }; } // namespace cledger::ledger diff --git a/src/sync/ledger-svs.cpp b/src/sync/ledger-svs.cpp index 7c824e0..e3d9d10 100644 --- a/src/sync/ledger-svs.cpp +++ b/src/sync/ledger-svs.cpp @@ -24,26 +24,4 @@ LedgerSVSDataStore::insert(const Data& data) m_storageIntf.adder(data.getName(), data.wireEncode()); } -LedgerSVSBase::LedgerSVSBase(const Name& syncPrefix, - const Name& nodePrefix, - ndn::Face& face, - const UpdateCallback& updateCallback, - const SecurityOptions& securityOptions, - std::shared_ptr dataStore) - : SVSyncBase(syncPrefix, Name(nodePrefix).append(syncPrefix), nodePrefix, - face, updateCallback, securityOptions, std::move(dataStore)) -{} - -Name -LedgerSVSBase::getDataName(const NodeID& nid, const SeqNo& seqNo) -{ - return Name(nid).append(m_syncPrefix).appendNumber(seqNo); -} - -Name -LedgerSVSBase::getMyDataName(const SeqNo& seqNo) -{ - return Name(m_dataPrefix).appendNumber(seqNo); -} - } // namespace cledger::sync \ No newline at end of file diff --git a/src/sync/ledger-svs.hpp b/src/sync/ledger-svs.hpp index 4297b6b..0b7c4b5 100644 --- a/src/sync/ledger-svs.hpp +++ b/src/sync/ledger-svs.hpp @@ -41,24 +41,6 @@ class LedgerSVSDataStore : public DataStore storage::Interface m_storageIntf; }; -class LedgerSVSBase : public SVSyncBase -{ -public: - LedgerSVSBase(const Name& syncPrefix, - const Name& nodePrefix, - ndn::Face& face, - const UpdateCallback& updateCallback, - const SecurityOptions& securityOptions, - std::shared_ptr dataStore); - - Name - getDataName(const NodeID& nid, const SeqNo& seqNo) override; - - Name - getMyDataName(const SeqNo& seqNo); -}; - - } // namespace cledger::sync #endif // CLEDGER_SYNC_LEDGER_SVS_HPP \ No newline at end of file diff --git a/src/sync/sync-module.cpp b/src/sync/sync-module.cpp index 680b8b8..3a6908b 100644 --- a/src/sync/sync-module.cpp +++ b/src/sync/sync-module.cpp @@ -11,115 +11,34 @@ SyncModule::SyncModule(const SyncOptions &options, const SecurityOptions& secOps , m_storageIntf(storageIntf) , m_yieldCb(yield) { - m_svs = std::make_shared(m_syncOptions.prefix, - m_syncOptions.id, - m_face, std::bind(&SyncModule::onMissingData, this, _1), m_secOptions, - std::make_shared(m_storageIntf)); + m_ps = std::make_shared(m_syncOptions.prefix, + m_syncOptions.id, + m_face, [] (auto&&) {}, m_secOptions, + std::make_shared(m_storageIntf)); + // Subscribe to all data packets with prefix /chat (the "topic") + m_ps->subscribe(Name("/"), [this] (const auto& subData) { + // assume no segmentation + auto name = subData.name; + auto contentBlock = subData.data; + m_yieldCb(Record(name, Block(contentBlock))); + }); } -std::tuple -SyncModule::parseDataName(const Name& name) -{ - return std::make_tuple(name.getPrefix(-1 - m_syncOptions.prefix.size()), - name.get(-1).toNumber()); -} - -void -SyncModule::onMissingData(const std::vector& vectors) -{ - // an accumlator to collect all missing records along the way - for (auto& v : vectors) { - for (SeqNo curr = v.low; curr <= v.high; curr++) { - recursiveFetcher(m_svs->getDataName(v.nodeId, curr)); - } - } -} - -void -SyncModule::fetcher(const NodeID& nid, const SeqNo& s) -{ - auto searchStorage = [this] (const Name& n) { - try { - m_storageIntf.getter(n); - return true; - } - catch (std::exception& e) { - return false; - } - }; - - // check if exist in storage - if (searchStorage(m_svs->getDataName(nid, s))) { - NDN_LOG_TRACE("Already fetched " << m_svs->getDataName(nid, s)); - return; - } - - NDN_LOG_TRACE("Trying getting data " << m_svs->getDataName(nid, s)); - m_svs->fetchData(nid, s, [this, searchStorage] (const ndn::Data& data) { - NDN_LOG_DEBUG("Getting data " << data.getName()); - // check again if exist in storage - if (searchStorage(data.getName())) { - NDN_LOG_TRACE("Already fetched " << data.getName()); - return; - } - else { - m_storageIntf.adder(data.getName(), data.wireEncode()); - NDN_LOG_TRACE("Yielding " << data.getName()); - m_yieldCb(Record(data.getName(), data.getContent())); - } - }, - MAX_RETRIES); -} -void -SyncModule::recursiveFetcher(const Name& recordName) +Name +SyncModule::publishRecord(Record& record) { - auto searchStorage = [this] (const Name& n) { - try { - m_storageIntf.getter(n); - return true; - } - catch (std::exception& e) { - return false; - } - }; - - // check again if exist in acc or storage - if (searchStorage(recordName)) { - NDN_LOG_TRACE("Already fetched " << recordName); - return; - } - - NDN_LOG_TRACE("Trying getting data " << recordName); - auto tuple = parseDataName(recordName); - m_svs->fetchData(std::get<0>(tuple), std::get<1>(tuple), [this, searchStorage] (const ndn::Data& data) { - NDN_LOG_DEBUG("Getting data " << data.getName()); - Record record = Record(data.getName(), data.getContent()); - - if (!searchStorage(data.getName())) { - m_storageIntf.adder(data.getName(), data.wireEncode()); - NDN_LOG_TRACE("Yielding " << data.getName()); - m_yieldCb(record); - } - if (record.isGenesis()) { - NDN_LOG_INFO(data.getName() << " is an new genesis record"); - } - else { - // keep fetching - for (auto& pointer : record.getPointers()) { - recursiveFetcher(pointer); - } - } - }, - MAX_RETRIES); + auto name = getNextName(); + auto content = record.prepareContent(); + content->encode(); + NDN_LOG_INFO("Publishing " << name); + m_seqNo = m_ps->publish(name, make_span(reinterpret_cast(content->data()), content->size())); + return name; } Name -SyncModule::publishRecord(Record& record) -{ - SeqNo seq = m_svs->publishData(*record.prepareContent(), ndn::time::milliseconds(3000)); - auto puiblishedName = m_svs->getDataName(m_syncOptions.id, seq); - NDN_LOG_DEBUG("Published " << puiblishedName); - return puiblishedName; +SyncModule::getNextName() +{ + return Name(m_syncOptions.id).appendSequenceNumber(m_seqNo + 1); } } // namespace cledger::sync \ No newline at end of file diff --git a/src/sync/sync-module.hpp b/src/sync/sync-module.hpp index 72c386c..3486fe9 100644 --- a/src/sync/sync-module.hpp +++ b/src/sync/sync-module.hpp @@ -11,6 +11,7 @@ #include #include +#include namespace cledger::sync { @@ -21,9 +22,13 @@ using ndn::svs::SecurityOptions; using ndn::svs::UpdateCallback; using ndn::svs::MissingDataInfo; using ndn::svs::DataStore; +using ndn::svs::SVSPubSub; using YieldRecordCallback = std::function; + + + class SyncModule { public: @@ -36,32 +41,20 @@ class SyncModule Name publishRecord(Record& record); - std::shared_ptr - getSyncBase() - { - return m_svs; - } + Name + getNextName(); CLEDGER_PUBLIC_WITH_TESTS_ELSE_PRIVATE: - std::tuple - parseDataName(const Name& name); - - void - onMissingData(const std::vector& vectors); - - void - recursiveFetcher(const Name& recordName); - - void - fetcher(const NodeID& nid, const SeqNo& s); SyncOptions m_syncOptions; SecurityOptions m_secOptions; ndn::Face& m_face; - std::shared_ptr m_svs; + std::shared_ptr m_ps; storage::Interface m_storageIntf; YieldRecordCallback m_yieldCb; + + SeqNo m_seqNo = 0; }; } // namespace cledger::sync diff --git a/tools/ndncledger-ledger.cpp b/tools/ndncledger-ledger.cpp index 0533fac..0de6192 100644 --- a/tools/ndncledger-ledger.cpp +++ b/tools/ndncledger-ledger.cpp @@ -39,15 +39,6 @@ handleSignal(const boost::system::error_code& error, int signalNum) exit(1); } -static void -scheduleWrapper(time::seconds backoffPeriod, ndn::scheduler::EventCallback eventCb) -{ - auto randomized = std::experimental::randint(100, 110) / 100 * backoffPeriod; - scheduler.schedule(randomized, [backoffPeriod, eventCb] { - scheduleWrapper(backoffPeriod, eventCb); - eventCb(); - }); -} static int main(int argc, char* argv[]) { @@ -64,7 +55,7 @@ main(int argc, char* argv[]) optsDesc.add_options() ("help,h", "print this help message and exit") ("config-file,c", po::value(&configFilePath)->default_value(configFilePath), "path to configuration file") - ("backoff-period,b", po::value(&backoffPeriodStr)->default_value(backoffPeriodStr), "backoff period (in seconds) of generating reply record"); + ("backoff-period,b", po::value(&backoffPeriodStr)->default_value(backoffPeriodStr), "backoff period (in millseconds) of generating reply record"); po::variables_map vm; try { @@ -87,9 +78,7 @@ main(int argc, char* argv[]) return 0; } - LedgerModule ledger(face, keyChain, configFilePath); - auto backoffPeriod = time::seconds(std::stoul(backoffPeriodStr)); - scheduleWrapper(backoffPeriod, [&ledger] {ledger.publishReply();}); + LedgerModule ledger(face, keyChain, configFilePath, ndn::time::milliseconds(std::stoul(backoffPeriodStr))); face.processEvents(); return 0; } diff --git a/trust-schema.conf.sample b/trust-schema.conf.sample index 3a034fb..5afac6c 100644 --- a/trust-schema.conf.sample +++ b/trust-schema.conf.sample @@ -30,7 +30,7 @@ rule key-locator { type name - name /ndn/site1 + name /ndn relation is-prefix-of } } @@ -53,7 +53,7 @@ rule key-locator { type name - name /ndn/site1 + name /ndn relation is-prefix-of } } @@ -75,7 +75,7 @@ rule key-locator { type name - name /ndn/site1 + name /ndn relation is-prefix-of } } @@ -97,7 +97,7 @@ rule key-locator { type name - name /ndn/site1 + name /ndn relation is-prefix-of } } @@ -119,7 +119,7 @@ rule key-locator { type name - name /ndn/site1 + name /ndn relation is-prefix-of } } @@ -152,8 +152,14 @@ rule } checker { - type hierarchical + type customized sig-type ecdsa-sha256 + key-locator + { + type name + name /ndn + relation is-prefix-of + } } }