27#include "log_enable.h"
33#include <condition_variable>
43struct SecureDhtConfig;
59 std::string proxy_server;
60 std::string push_node_id;
67 get(
id, bindGetCb(cb), donecb, f, w);
70 void get(InfoHash
id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) {
71 get(
id, bindGetCb(cb), donecb, f, w);
74 void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f={}, Where w = {});
76 void get(InfoHash
id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) {
77 get(
id, cb, bindDoneCb(donecb), f, w);
79 void get(
const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = Value::AllFilter(), Where w = {});
82 void get(InfoHash hash, std::function<
bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb={})
84 get(hash, [=](
const std::vector<std::shared_ptr<Value>>& vals) {
85 return cb(unpackVector<T>(vals));
91 void get(InfoHash hash, std::function<
bool(T&&)> cb, DoneCallbackSimple dcb={})
93 get(hash, [=](
const std::vector<std::shared_ptr<Value>>& vals) {
94 for (
const auto& v : vals) {
96 if (not cb(Value::unpack<T>(*v)))
98 }
catch (
const std::exception&) {
108 std::future<std::vector<std::shared_ptr<dht::Value>>> get(InfoHash key, Value::Filter f = Value::AllFilter(), Where w = {}) {
109 auto p = std::make_shared<std::promise<std::vector<std::shared_ptr< dht::Value >>>>();
110 auto values = std::make_shared<std::vector<std::shared_ptr< dht::Value >>>();
111 get(key, [=](
const std::vector<std::shared_ptr<dht::Value>>& vlist) {
112 values->insert(values->end(), vlist.begin(), vlist.end());
115 p->set_value(std::move(*values));
118 return p->get_future();
122 std::future<std::vector<T>> get(InfoHash key) {
123 auto p = std::make_shared<std::promise<std::vector<T>>>();
124 auto values = std::make_shared<std::vector<T>>();
125 get<T>(key, [=](T&& v) {
126 values->emplace_back(std::move(v));
129 p->set_value(std::move(*values));
131 return p->get_future();
134 void query(
const InfoHash& hash, QueryCallback cb, DoneCallback done_cb = {}, Query q = {});
135 void query(
const InfoHash& hash, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query q = {}) {
136 query(hash, cb, bindDoneCb(done_cb), q);
139 std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = Value::AllFilter(), Where w = {});
141 std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f={}, Where w={}) {
142 return listen(key, [cb](
const std::vector<Sp<Value>>& vals,
bool expired){
146 }, std::forward<Value::Filter>(f), std::forward<Where>(w));
148 std::future<size_t> listen(
const std::string& key, GetCallback vcb, Value::Filter f = Value::AllFilter(), Where w = {});
149 std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = Value::AllFilter(), Where w = {}) {
150 return listen(key, bindGetCb(cb), f, w);
154 std::future<size_t> listen(InfoHash hash, std::function<
bool(std::vector<T>&&)> cb)
156 return listen(hash, [=](
const std::vector<std::shared_ptr<Value>>& vals) {
157 return cb(unpackVector<T>(vals));
161 template <
typename T>
162 std::future<size_t> listen(InfoHash hash, std::function<
bool(T&&)> cb, Value::Filter f = Value::AllFilter(), Where w = {})
164 return listen(hash, [=](
const std::vector<std::shared_ptr<Value>>& vals) {
165 for (
const auto& v : vals) {
167 if (not cb(Value::unpack<T>(*v)))
169 }
catch (
const std::exception&) {
175 getFilterSet<T>(f), w);
178 void cancelListen(InfoHash h,
size_t token);
179 void cancelListen(InfoHash h, std::shared_future<size_t> token);
181 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, time_point created=time_point::max(),
bool permanent =
false);
182 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, time_point created=time_point::max(),
bool permanent =
false) {
183 put(hash, value, bindDoneCb(cb), created, permanent);
186 void put(InfoHash hash, Value&& value, DoneCallback cb={}, time_point created=time_point::max(),
bool permanent =
false);
187 void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, time_point created=time_point::max(),
bool permanent =
false) {
188 put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent);
190 void put(
const std::string& key, Value&& value, DoneCallbackSimple cb={}, time_point created=time_point::max(),
bool permanent =
false);
192 void cancelPut(
const InfoHash& h,
const Value::Id&
id);
194 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={});
195 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb) {
196 putSigned(hash, value, bindDoneCb(cb));
199 void putSigned(InfoHash hash, Value&& value, DoneCallback cb={});
200 void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb) {
201 putSigned(hash, std::forward<Value>(value), bindDoneCb(cb));
203 void putSigned(
const std::string& key, Value&& value, DoneCallbackSimple cb={});
205 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb={});
206 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb) {
207 putEncrypted(hash, to, value, bindDoneCb(cb));
210 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb={});
211 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb) {
212 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb));
214 void putEncrypted(
const std::string& key, InfoHash to, Value&& value, DoneCallback cb={});
220 void bootstrap(
const std::vector<SockAddr>& nodes, DoneCallbackSimple&& cb={});
221 void bootstrap(
const SockAddr& addr, DoneCallbackSimple&& cb={});
235 void bootstrap(
const std::string& host,
const std::string& service);
249 void dumpTables()
const;
260 return (f == AF_INET) ? bound4 : bound6;
268 return getBound(f).getPort();
271 std::pair<size_t, size_t> getStoreSize()
const;
273 void setStorageLimit(
size_t limit = DEFAULT_STORAGE_LIMIT);
275 std::vector<NodeExport> exportNodes()
const;
277 std::vector<ValuesExport> exportValues()
const;
286 void registerType(
const ValueType& type);
288 void importValues(
const std::vector<ValuesExport>& values);
290 bool isRunning()
const {
294 NodeStats getNodesStats(sa_family_t af)
const;
295 unsigned getNodesStats(sa_family_t af,
unsigned *good_return,
unsigned *dubious_return,
unsigned *cached_return,
unsigned *incoming_return)
const;
296 NodeInfo getNodeInfo()
const;
298 std::vector<unsigned> getNodeMessageStats(
bool in =
false)
const;
299 std::string getStorageLog()
const;
300 std::string getStorageLog(
const InfoHash&)
const;
301 std::string getRoutingTablesLog(sa_family_t af)
const;
302 std::string getSearchesLog(sa_family_t af = AF_UNSPEC)
const;
303 std::string getSearchLog(
const InfoHash&, sa_family_t af = AF_UNSPEC)
const;
304 std::vector<SockAddr> getPublicAddress(sa_family_t af = AF_UNSPEC);
305 std::vector<std::string> getPublicAddressStr(sa_family_t af = AF_UNSPEC);
309 void findCertificate(InfoHash hash, std::function<
void(
const std::shared_ptr<crypto::Certificate>)>);
310 void registerCertificate(std::shared_ptr<crypto::Certificate> cert);
311 void setLocalCertificateStore(CertificateStoreQuery&& query_method);
319 void run(in_port_t port = 4222,
const crypto::Identity identity = {},
bool threaded =
false, NetId network = 0) {
335 void run(in_port_t port, Config config);
350 void run(
const char* ip4,
const char* ip6,
const char* service,
Config config);
352 void setOnStatusChanged(StatusCallback&& cb) {
353 statusCb = std::move(cb);
362 std::lock_guard<std::mutex> lck(dht_mtx);
363 time_point wakeup = time_point::min();
367 startNetwork(bound4, bound6);
384 void setProxyServer(
const std::string& proxy,
const std::string& pushNodeId =
"");
406 void forwardAllMessages(
bool forward);
409 static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10};
417 void tryBootstrapContinuously();
423 return std::max(status4, status6);
427 std::unique_ptr<SecureDht> dht_;
430 std::unique_ptr<SecureDht> dht_via_proxy_;
433 std::atomic_bool use_proxy {
false};
445 SecureDht* activeDht()
const;
451 std::map<size_t, Listener> listeners_;
452 size_t listener_token_ {1};
454 mutable std::mutex dht_mtx {};
455 std::thread dht_thread {};
456 std::condition_variable cv {};
458 std::thread rcv_thread {};
459 std::mutex sock_mtx {};
461 struct ReceivedPacket {
466 std::queue<ReceivedPacket> rcv {};
469 std::atomic_bool bootstraping {
false};
471 std::vector<std::pair<std::string,std::string>> bootstrap_nodes_all {};
472 std::vector<std::pair<std::string,std::string>> bootstrap_nodes {};
473 std::thread bootstrap_thread {};
475 std::mutex bootstrap_mtx {};
476 std::condition_variable bootstrap_cv {};
478 std::queue<std::function<void(SecureDht&)>> pending_ops_prio {};
479 std::queue<std::function<void(SecureDht&)>> pending_ops {};
480 std::mutex storage_mtx {};
482 std::atomic_bool running {
false};
483 std::atomic_bool running_network {
false};
485 NodeStatus status4 {NodeStatus::Disconnected},
486 status6 {NodeStatus::Disconnected};
487 StatusCallback statusCb {
nullptr};
489 int s4 {-1}, s6 {-1};
494 std::string pushToken_;
in_port_t getBoundPort(sa_family_t f=AF_INET) const
void bootstrap(const std::string &host, const std::string &service)
void setPushNotificationToken(const std::string &token)
void connectivityChanged()
void run(const SockAddr &local4, const SockAddr &local6, Config config)
void pushNotificationReceived(const std::map< std::string, std::string > &data)
void run(const char *ip4, const char *ip6, const char *service, Config config)
void bootstrap(const std::vector< NodeExport > &nodes)
const SockAddr & getBound(sa_family_t f=AF_INET) const
void setLogFilter(const InfoHash &f={})
void enableProxy(bool proxify)
void shutdown(ShutdownCallback cb)
void bootstrap(const std::vector< SockAddr > &nodes, DoneCallbackSimple &&cb={})
void run(in_port_t port=4222, const crypto::Identity identity={}, bool threaded=false, NetId network=0)
void NOLOG(char const *, va_list)
std::vector< uint8_t > Blob
Serializable dht::Value filter.