My Project 1.7.4
C++ Distributed Hash Table
dhtrunner.h
1/*
2 * Copyright (C) 2014-2017 Savoir-faire Linux Inc.
3 * Authors: Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4 * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5 * Sébastien Blin <sebastien.blin@savoirfairelinux.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include "infohash.h"
24#include "value.h"
25#include "callbacks.h"
26#include "sockaddr.h"
27#include "log_enable.h"
28#include "def.h"
29
30#include <thread>
31#include <mutex>
32#include <atomic>
33#include <condition_variable>
34#include <future>
35#include <exception>
36#include <queue>
37#include <chrono>
38
39namespace dht {
40
41struct Node;
42class SecureDht;
43struct SecureDhtConfig;
44
51class OPENDHT_PUBLIC DhtRunner {
52
53public:
54 typedef std::function<void(NodeStatus, NodeStatus)> StatusCallback;
55
56 struct Config {
57 SecureDhtConfig dht_config;
58 bool threaded;
59 std::string proxy_server;
60 std::string push_node_id;
61 };
62
63 DhtRunner();
64 virtual ~DhtRunner();
65
66 void get(InfoHash id, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) {
67 get(id, bindGetCb(cb), donecb, f, w);
68 }
69
70 void get(InfoHash id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) {
71 get(id, bindGetCb(cb), donecb, f, w);
72 }
73
74 void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f={}, Where w = {});
75
76 void get(InfoHash id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = Value::AllFilter(), Where w = {}) {
77 get(id, cb, bindDoneCb(donecb), f, w);
78 }
79 void get(const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = Value::AllFilter(), Where w = {});
80
81 template <class T>
82 void get(InfoHash hash, std::function<bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb={})
83 {
84 get(hash, [=](const std::vector<std::shared_ptr<Value>>& vals) {
85 return cb(unpackVector<T>(vals));
86 },
87 dcb,
88 getFilterSet<T>());
89 }
90 template <class T>
91 void get(InfoHash hash, std::function<bool(T&&)> cb, DoneCallbackSimple dcb={})
92 {
93 get(hash, [=](const std::vector<std::shared_ptr<Value>>& vals) {
94 for (const auto& v : vals) {
95 try {
96 if (not cb(Value::unpack<T>(*v)))
97 return false;
98 } catch (const std::exception&) {
99 continue;
100 }
101 }
102 return true;
103 },
104 dcb,
105 getFilterSet<T>());
106 }
107
108 std::future<std::vector<std::shared_ptr<dht::Value>>> get(InfoHash key, Value::Filter f = Value::AllFilter(), Where w = {}) {
109 auto p = std::make_shared<std::promise<std::vector<std::shared_ptr< dht::Value >>>>();
110 auto values = std::make_shared<std::vector<std::shared_ptr< dht::Value >>>();
111 get(key, [=](const std::vector<std::shared_ptr<dht::Value>>& vlist) {
112 values->insert(values->end(), vlist.begin(), vlist.end());
113 return true;
114 }, [=](bool) {
115 p->set_value(std::move(*values));
116 },
117 f, w);
118 return p->get_future();
119 }
120
121 template <class T>
122 std::future<std::vector<T>> get(InfoHash key) {
123 auto p = std::make_shared<std::promise<std::vector<T>>>();
124 auto values = std::make_shared<std::vector<T>>();
125 get<T>(key, [=](T&& v) {
126 values->emplace_back(std::move(v));
127 return true;
128 }, [=](bool) {
129 p->set_value(std::move(*values));
130 });
131 return p->get_future();
132 }
133
134 void query(const InfoHash& hash, QueryCallback cb, DoneCallback done_cb = {}, Query q = {});
135 void query(const InfoHash& hash, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query q = {}) {
136 query(hash, cb, bindDoneCb(done_cb), q);
137 }
138
139 std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = Value::AllFilter(), Where w = {});
140
141 std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f={}, Where w={}) {
142 return listen(key, [cb](const std::vector<Sp<Value>>& vals, bool expired){
143 if (not expired)
144 return cb(vals);
145 return true;
146 }, std::forward<Value::Filter>(f), std::forward<Where>(w));
147 }
148 std::future<size_t> listen(const std::string& key, GetCallback vcb, Value::Filter f = Value::AllFilter(), Where w = {});
149 std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = Value::AllFilter(), Where w = {}) {
150 return listen(key, bindGetCb(cb), f, w);
151 }
152
153 template <class T>
154 std::future<size_t> listen(InfoHash hash, std::function<bool(std::vector<T>&&)> cb)
155 {
156 return listen(hash, [=](const std::vector<std::shared_ptr<Value>>& vals) {
157 return cb(unpackVector<T>(vals));
158 },
159 getFilterSet<T>());
160 }
161 template <typename T>
162 std::future<size_t> listen(InfoHash hash, std::function<bool(T&&)> cb, Value::Filter f = Value::AllFilter(), Where w = {})
163 {
164 return listen(hash, [=](const std::vector<std::shared_ptr<Value>>& vals) {
165 for (const auto& v : vals) {
166 try {
167 if (not cb(Value::unpack<T>(*v)))
168 return false;
169 } catch (const std::exception&) {
170 continue;
171 }
172 }
173 return true;
174 },
175 getFilterSet<T>(f), w);
176 }
177
178 void cancelListen(InfoHash h, size_t token);
179 void cancelListen(InfoHash h, std::shared_future<size_t> token);
180
181 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false);
182 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) {
183 put(hash, value, bindDoneCb(cb), created, permanent);
184 }
185
186 void put(InfoHash hash, Value&& value, DoneCallback cb={}, time_point created=time_point::max(), bool permanent = false);
187 void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, time_point created=time_point::max(), bool permanent = false) {
188 put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent);
189 }
190 void put(const std::string& key, Value&& value, DoneCallbackSimple cb={}, time_point created=time_point::max(), bool permanent = false);
191
192 void cancelPut(const InfoHash& h, const Value::Id& id);
193
194 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={});
195 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb) {
196 putSigned(hash, value, bindDoneCb(cb));
197 }
198
199 void putSigned(InfoHash hash, Value&& value, DoneCallback cb={});
200 void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb) {
201 putSigned(hash, std::forward<Value>(value), bindDoneCb(cb));
202 }
203 void putSigned(const std::string& key, Value&& value, DoneCallbackSimple cb={});
204
205 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb={});
206 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb) {
207 putEncrypted(hash, to, value, bindDoneCb(cb));
208 }
209
210 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb={});
211 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb) {
212 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb));
213 }
214 void putEncrypted(const std::string& key, InfoHash to, Value&& value, DoneCallback cb={});
215
220 void bootstrap(const std::vector<SockAddr>& nodes, DoneCallbackSimple&& cb={});
221 void bootstrap(const SockAddr& addr, DoneCallbackSimple&& cb={});
222
227 void bootstrap(const std::vector<NodeExport>& nodes);
228
235 void bootstrap(const std::string& host, const std::string& service);
236
241
248
249 void dumpTables() const;
250
251 InfoHash getId() const;
252
253 InfoHash getNodeId() const;
254
259 const SockAddr& getBound(sa_family_t f = AF_INET) const {
260 return (f == AF_INET) ? bound4 : bound6;
261 }
262
267 in_port_t getBoundPort(sa_family_t f = AF_INET) const {
268 return getBound(f).getPort();
269 }
270
271 std::pair<size_t, size_t> getStoreSize() const;
272
273 void setStorageLimit(size_t limit = DEFAULT_STORAGE_LIMIT);
274
275 std::vector<NodeExport> exportNodes() const;
276
277 std::vector<ValuesExport> exportValues() const;
278
279 void setLoggers(LogMethod err = NOLOG, LogMethod warn = NOLOG, LogMethod debug = NOLOG);
280
284 void setLogFilter(const InfoHash& f = {});
285
286 void registerType(const ValueType& type);
287
288 void importValues(const std::vector<ValuesExport>& values);
289
290 bool isRunning() const {
291 return running;
292 }
293
294 NodeStats getNodesStats(sa_family_t af) const;
295 unsigned getNodesStats(sa_family_t af, unsigned *good_return, unsigned *dubious_return, unsigned *cached_return, unsigned *incoming_return) const;
296 NodeInfo getNodeInfo() const;
297
298 std::vector<unsigned> getNodeMessageStats(bool in = false) const;
299 std::string getStorageLog() const;
300 std::string getStorageLog(const InfoHash&) const;
301 std::string getRoutingTablesLog(sa_family_t af) const;
302 std::string getSearchesLog(sa_family_t af = AF_UNSPEC) const;
303 std::string getSearchLog(const InfoHash&, sa_family_t af = AF_UNSPEC) const;
304 std::vector<SockAddr> getPublicAddress(sa_family_t af = AF_UNSPEC);
305 std::vector<std::string> getPublicAddressStr(sa_family_t af = AF_UNSPEC);
306
307 // securedht methods
308
309 void findCertificate(InfoHash hash, std::function<void(const std::shared_ptr<crypto::Certificate>)>);
310 void registerCertificate(std::shared_ptr<crypto::Certificate> cert);
311 void setLocalCertificateStore(CertificateStoreQuery&& query_method);
312
319 void run(in_port_t port = 4222, const crypto::Identity identity = {}, bool threaded = false, NetId network = 0) {
320 run(port, {
321 /*.dht_config = */{
322 /*.node_config = */{
323 /*.node_id = */{},
324 /*.network = */network,
325 /*.is_bootstrap = */false,
326 /*.maintain_storage*/false
327 },
328 /*.id = */identity
329 },
330 /*.threaded = */threaded,
331 /*.proxy_server = */"",
332 /*.push_node_id = */""
333 });
334 }
335 void run(in_port_t port, Config config);
336
345 void run(const SockAddr& local4, const SockAddr& local6, Config config);
346
350 void run(const char* ip4, const char* ip6, const char* service, Config config);
351
352 void setOnStatusChanged(StatusCallback&& cb) {
353 statusCb = std::move(cb);
354 }
355
361 time_point loop() {
362 std::lock_guard<std::mutex> lck(dht_mtx);
363 time_point wakeup = time_point::min();
364 try {
365 wakeup = loop_();
366 } catch (const dht::SocketException& e) {
367 startNetwork(bound4, bound6);
368 }
369 return wakeup;
370 }
371
375 void shutdown(ShutdownCallback cb);
376
382 void join();
383
384 void setProxyServer(const std::string& proxy, const std::string& pushNodeId = "");
385
391 void enableProxy(bool proxify);
392
393 /* Push notification methods */
394
398 void setPushNotificationToken(const std::string& token);
399
403 void pushNotificationReceived(const std::map<std::string, std::string>& data);
404
405 /* Proxy server mothods */
406 void forwardAllMessages(bool forward);
407
408private:
409 static constexpr std::chrono::seconds BOOTSTRAP_PERIOD {10};
410
417 void tryBootstrapContinuously();
418
419 void startNetwork(const SockAddr sin4, const SockAddr sin6);
420 time_point loop_();
421
422 NodeStatus getStatus() const {
423 return std::max(status4, status6);
424 }
425
427 std::unique_ptr<SecureDht> dht_;
428
430 std::unique_ptr<SecureDht> dht_via_proxy_;
431
433 std::atomic_bool use_proxy {false};
434
436 Config config_;
437
441 void resetDht();
445 SecureDht* activeDht() const;
446
450 struct Listener;
451 std::map<size_t, Listener> listeners_;
452 size_t listener_token_ {1};
453
454 mutable std::mutex dht_mtx {};
455 std::thread dht_thread {};
456 std::condition_variable cv {};
457
458 std::thread rcv_thread {};
459 std::mutex sock_mtx {};
460
461 struct ReceivedPacket {
462 Blob data;
463 SockAddr from;
464 time_point received;
465 };
466 std::queue<ReceivedPacket> rcv {};
467
469 std::atomic_bool bootstraping {false};
470 /* bootstrap nodes given as (host, service) pairs */
471 std::vector<std::pair<std::string,std::string>> bootstrap_nodes_all {};
472 std::vector<std::pair<std::string,std::string>> bootstrap_nodes {};
473 std::thread bootstrap_thread {};
475 std::mutex bootstrap_mtx {};
476 std::condition_variable bootstrap_cv {};
477
478 std::queue<std::function<void(SecureDht&)>> pending_ops_prio {};
479 std::queue<std::function<void(SecureDht&)>> pending_ops {};
480 std::mutex storage_mtx {};
481
482 std::atomic_bool running {false};
483 std::atomic_bool running_network {false};
484
485 NodeStatus status4 {NodeStatus::Disconnected},
486 status6 {NodeStatus::Disconnected};
487 StatusCallback statusCb {nullptr};
488
489 int s4 {-1}, s6 {-1};
490 SockAddr bound4 {};
491 SockAddr bound6 {};
492
494 std::string pushToken_;
495};
496
497}
in_port_t getBoundPort(sa_family_t f=AF_INET) const
Definition: dhtrunner.h:267
void clearBootstrap()
void bootstrap(const std::string &host, const std::string &service)
void setPushNotificationToken(const std::string &token)
void connectivityChanged()
time_point loop()
Definition: dhtrunner.h:361
void run(const SockAddr &local4, const SockAddr &local6, Config config)
void pushNotificationReceived(const std::map< std::string, std::string > &data)
void run(const char *ip4, const char *ip6, const char *service, Config config)
void bootstrap(const std::vector< NodeExport > &nodes)
const SockAddr & getBound(sa_family_t f=AF_INET) const
Definition: dhtrunner.h:259
void setLogFilter(const InfoHash &f={})
void enableProxy(bool proxify)
void shutdown(ShutdownCallback cb)
void bootstrap(const std::vector< SockAddr > &nodes, DoneCallbackSimple &&cb={})
void run(in_port_t port=4222, const crypto::Identity identity={}, bool threaded=false, NetId network=0)
Definition: dhtrunner.h:319
Definition: callbacks.h:34
void NOLOG(char const *, va_list)
Definition: log_enable.h:38
std::vector< uint8_t > Blob
Definition: utils.h:114
NodeStatus
Definition: callbacks.h:41
Serializable dht::Value filter.
Definition: value.h:740