Skip to content

Commit

Permalink
ledger: new timer design
Browse files Browse the repository at this point in the history
  • Loading branch information
tianyuan129 committed Nov 2, 2022
1 parent aeaadbf commit d27f160
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 182 deletions.
18 changes: 16 additions & 2 deletions src/ledger-module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ NDN_LOG_INIT(cledger.benchmark.ledger);
NDN_LOG_INIT(cledger.ledger);
#endif

LedgerModule::LedgerModule(ndn::Face& face, ndn::KeyChain& keyChain, const std::string& configPath)
LedgerModule::LedgerModule(ndn::Face& face, ndn::KeyChain& keyChain, const std::string& configPath, time::milliseconds replyPeriod)
: m_face(face)
, m_keyChain(keyChain)
, m_replyPeriod(replyPeriod)
{
// load the config and create storage
m_config.load(configPath);
Expand Down Expand Up @@ -67,6 +68,9 @@ LedgerModule::LedgerModule(ndn::Face& face, ndn::KeyChain& keyChain, const std::
m_sync = std::make_unique<sync::SyncModule>(m_syncOps, m_secOps, m_face,
m_storage->getInterface(),
[this] (const Record& record) {
// refresh the timer anyway
refreshReplyTimer();

auto stateName = m_dag->add(record);
updateStatesTracker(stateName);
if (record.getType() != tlv::REPLY_RECORD) {
Expand Down Expand Up @@ -314,7 +318,7 @@ LedgerModule::afterValidation(const Data& data)
newRecord.setType(tlv::GENESIS_RECORD);
// a gensis record must be its first SVS publication,
// therefore we can guess the name
auto genesisName = m_sync->getSyncBase()->getMyDataName(1);
auto genesisName = m_sync->getNextName();
NDN_LOG_DEBUG("Referencing to [Genesis] " << genesisName);
newRecord.setPointers({genesisName});
}
Expand Down Expand Up @@ -447,4 +451,14 @@ LedgerModule::sendResponse(const Name& name, const Block& block, bool realtime)
});
}

void
LedgerModule::refreshReplyTimer()
{
if (m_replyEvent) {
m_replyEvent.cancel();
}
m_replyEvent = m_scheduler.schedule(m_replyPeriod, [this] {
publishReply();
});
}
} // namespace cledger::ledger
9 changes: 8 additions & 1 deletion src/ledger-module.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ using appendtlv::AppendStatus;
class LedgerModule : boost::noncopyable
{
public:
LedgerModule(ndn::Face& face, ndn::KeyChain& keyChain, const std::string& configPath);
LedgerModule(ndn::Face& face, ndn::KeyChain& keyChain, const std::string& configPath, time::milliseconds replyPeriod = 3600_s);

const std::unique_ptr<storage::LedgerStorage>&
getLedgerStorage()
Expand Down Expand Up @@ -75,6 +75,9 @@ class LedgerModule : boost::noncopyable
void
updateStatesTracker(const Name& stateName, bool interlocked = false);

void
refreshReplyTimer();

ndn::Face& m_face;
LedgerConfig m_config;
Scheduler m_scheduler{m_face.getIoService()};
Expand All @@ -99,6 +102,10 @@ class LedgerModule : boost::noncopyable
std::unique_ptr<dag::DagModule> m_dag;
// this shouldn't keep growing, so it's safe to put into the memory
std::set<Name> m_repliedRecords;

// reply management
time::milliseconds m_replyPeriod;
ndn::scheduler::EventId m_replyEvent;
};

} // namespace cledger::ledger
Expand Down
22 changes: 0 additions & 22 deletions src/sync/ledger-svs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,4 @@ LedgerSVSDataStore::insert(const Data& data)
m_storageIntf.adder(data.getName(), data.wireEncode());
}

LedgerSVSBase::LedgerSVSBase(const Name& syncPrefix,
const Name& nodePrefix,
ndn::Face& face,
const UpdateCallback& updateCallback,
const SecurityOptions& securityOptions,
std::shared_ptr<LedgerSVSDataStore> dataStore)
: SVSyncBase(syncPrefix, Name(nodePrefix).append(syncPrefix), nodePrefix,
face, updateCallback, securityOptions, std::move(dataStore))
{}

Name
LedgerSVSBase::getDataName(const NodeID& nid, const SeqNo& seqNo)
{
return Name(nid).append(m_syncPrefix).appendNumber(seqNo);
}

Name
LedgerSVSBase::getMyDataName(const SeqNo& seqNo)
{
return Name(m_dataPrefix).appendNumber(seqNo);
}

} // namespace cledger::sync
18 changes: 0 additions & 18 deletions src/sync/ledger-svs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,6 @@ class LedgerSVSDataStore : public DataStore
storage::Interface m_storageIntf;
};

class LedgerSVSBase : public SVSyncBase
{
public:
LedgerSVSBase(const Name& syncPrefix,
const Name& nodePrefix,
ndn::Face& face,
const UpdateCallback& updateCallback,
const SecurityOptions& securityOptions,
std::shared_ptr<LedgerSVSDataStore> dataStore);

Name
getDataName(const NodeID& nid, const SeqNo& seqNo) override;

Name
getMyDataName(const SeqNo& seqNo);
};


} // namespace cledger::sync

#endif // CLEDGER_SYNC_LEDGER_SVS_HPP
125 changes: 22 additions & 103 deletions src/sync/sync-module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,115 +11,34 @@ SyncModule::SyncModule(const SyncOptions &options, const SecurityOptions& secOps
, m_storageIntf(storageIntf)
, m_yieldCb(yield)
{
m_svs = std::make_shared<LedgerSVSBase>(m_syncOptions.prefix,
m_syncOptions.id,
m_face, std::bind(&SyncModule::onMissingData, this, _1), m_secOptions,
std::make_shared<LedgerSVSDataStore>(m_storageIntf));
m_ps = std::make_shared<SVSPubSub>(m_syncOptions.prefix,
m_syncOptions.id,
m_face, [] (auto&&) {}, m_secOptions,
std::make_shared<LedgerSVSDataStore>(m_storageIntf));
// Subscribe to all data packets with prefix /chat (the "topic")
m_ps->subscribe(Name("/"), [this] (const auto& subData) {
// assume no segmentation
auto name = subData.name;
auto contentBlock = subData.data;
m_yieldCb(Record(name, Block(contentBlock)));
});
}

std::tuple<NodeID, SeqNo>
SyncModule::parseDataName(const Name& name)
{
return std::make_tuple<NodeID, SeqNo>(name.getPrefix(-1 - m_syncOptions.prefix.size()),
name.get(-1).toNumber());
}

void
SyncModule::onMissingData(const std::vector<MissingDataInfo>& vectors)
{
// an accumlator to collect all missing records along the way
for (auto& v : vectors) {
for (SeqNo curr = v.low; curr <= v.high; curr++) {
recursiveFetcher(m_svs->getDataName(v.nodeId, curr));
}
}
}

void
SyncModule::fetcher(const NodeID& nid, const SeqNo& s)
{
auto searchStorage = [this] (const Name& n) {
try {
m_storageIntf.getter(n);
return true;
}
catch (std::exception& e) {
return false;
}
};

// check if exist in storage
if (searchStorage(m_svs->getDataName(nid, s))) {
NDN_LOG_TRACE("Already fetched " << m_svs->getDataName(nid, s));
return;
}

NDN_LOG_TRACE("Trying getting data " << m_svs->getDataName(nid, s));
m_svs->fetchData(nid, s, [this, searchStorage] (const ndn::Data& data) {
NDN_LOG_DEBUG("Getting data " << data.getName());
// check again if exist in storage
if (searchStorage(data.getName())) {
NDN_LOG_TRACE("Already fetched " << data.getName());
return;
}
else {
m_storageIntf.adder(data.getName(), data.wireEncode());
NDN_LOG_TRACE("Yielding " << data.getName());
m_yieldCb(Record(data.getName(), data.getContent()));
}
},
MAX_RETRIES);
}
void
SyncModule::recursiveFetcher(const Name& recordName)
Name
SyncModule::publishRecord(Record& record)
{
auto searchStorage = [this] (const Name& n) {
try {
m_storageIntf.getter(n);
return true;
}
catch (std::exception& e) {
return false;
}
};

// check again if exist in acc or storage
if (searchStorage(recordName)) {
NDN_LOG_TRACE("Already fetched " << recordName);
return;
}

NDN_LOG_TRACE("Trying getting data " << recordName);
auto tuple = parseDataName(recordName);
m_svs->fetchData(std::get<0>(tuple), std::get<1>(tuple), [this, searchStorage] (const ndn::Data& data) {
NDN_LOG_DEBUG("Getting data " << data.getName());
Record record = Record(data.getName(), data.getContent());

if (!searchStorage(data.getName())) {
m_storageIntf.adder(data.getName(), data.wireEncode());
NDN_LOG_TRACE("Yielding " << data.getName());
m_yieldCb(record);
}
if (record.isGenesis()) {
NDN_LOG_INFO(data.getName() << " is an new genesis record");
}
else {
// keep fetching
for (auto& pointer : record.getPointers()) {
recursiveFetcher(pointer);
}
}
},
MAX_RETRIES);
auto name = getNextName();
auto content = record.prepareContent();
content->encode();
NDN_LOG_INFO("Publishing " << name);
m_seqNo = m_ps->publish(name, make_span(reinterpret_cast<const uint8_t*>(content->data()), content->size()));
return name;
}

Name
SyncModule::publishRecord(Record& record)
{
SeqNo seq = m_svs->publishData(*record.prepareContent(), ndn::time::milliseconds(3000));
auto puiblishedName = m_svs->getDataName(m_syncOptions.id, seq);
NDN_LOG_DEBUG("Published " << puiblishedName);
return puiblishedName;
SyncModule::getNextName()
{
return Name(m_syncOptions.id).appendSequenceNumber(m_seqNo + 1);
}

} // namespace cledger::sync
27 changes: 10 additions & 17 deletions src/sync/sync-module.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <vector>

#include <ndn-svs/svsync-base.hpp>
#include <ndn-svs/svspubsub.hpp>

namespace cledger::sync {

Expand All @@ -21,9 +22,13 @@ using ndn::svs::SecurityOptions;
using ndn::svs::UpdateCallback;
using ndn::svs::MissingDataInfo;
using ndn::svs::DataStore;
using ndn::svs::SVSPubSub;

using YieldRecordCallback = std::function<void(const Record&)>;




class SyncModule
{
public:
Expand All @@ -36,32 +41,20 @@ class SyncModule
Name
publishRecord(Record& record);

std::shared_ptr<LedgerSVSBase>
getSyncBase()
{
return m_svs;
}
Name
getNextName();

CLEDGER_PUBLIC_WITH_TESTS_ELSE_PRIVATE:
std::tuple<NodeID, SeqNo>
parseDataName(const Name& name);

void
onMissingData(const std::vector<MissingDataInfo>& vectors);

void
recursiveFetcher(const Name& recordName);

void
fetcher(const NodeID& nid, const SeqNo& s);

SyncOptions m_syncOptions;
SecurityOptions m_secOptions;
ndn::Face& m_face;

std::shared_ptr<LedgerSVSBase> m_svs;
std::shared_ptr<SVSPubSub> m_ps;
storage::Interface m_storageIntf;
YieldRecordCallback m_yieldCb;

SeqNo m_seqNo = 0;
};

} // namespace cledger::sync
Expand Down
15 changes: 2 additions & 13 deletions tools/ndncledger-ledger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,6 @@ handleSignal(const boost::system::error_code& error, int signalNum)
exit(1);
}

static void
scheduleWrapper(time::seconds backoffPeriod, ndn::scheduler::EventCallback eventCb)
{
auto randomized = std::experimental::randint(100, 110) / 100 * backoffPeriod;
scheduler.schedule(randomized, [backoffPeriod, eventCb] {
scheduleWrapper(backoffPeriod, eventCb);
eventCb();
});
}
static int
main(int argc, char* argv[])
{
Expand All @@ -64,7 +55,7 @@ main(int argc, char* argv[])
optsDesc.add_options()
("help,h", "print this help message and exit")
("config-file,c", po::value<std::string>(&configFilePath)->default_value(configFilePath), "path to configuration file")
("backoff-period,b", po::value<std::string>(&backoffPeriodStr)->default_value(backoffPeriodStr), "backoff period (in seconds) of generating reply record");
("backoff-period,b", po::value<std::string>(&backoffPeriodStr)->default_value(backoffPeriodStr), "backoff period (in millseconds) of generating reply record");

po::variables_map vm;
try {
Expand All @@ -87,9 +78,7 @@ main(int argc, char* argv[])
return 0;
}

LedgerModule ledger(face, keyChain, configFilePath);
auto backoffPeriod = time::seconds(std::stoul(backoffPeriodStr));
scheduleWrapper(backoffPeriod, [&ledger] {ledger.publishReply();});
LedgerModule ledger(face, keyChain, configFilePath, ndn::time::milliseconds(std::stoul(backoffPeriodStr)));
face.processEvents();
return 0;
}
Expand Down
Loading

0 comments on commit d27f160

Please sign in to comment.