My Project 1.7.4
C++ Distributed Hash Table
network_engine.h
1/*
2 * Copyright (C) 2014-2018 Savoir-faire Linux Inc.
3 * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4 * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20#pragma once
21
22#include "node_cache.h"
23#include "value.h"
24#include "infohash.h"
25#include "node.h"
26#include "scheduler.h"
27#include "utils.h"
28#include "rng.h"
29#include "rate_limiter.h"
30
31#include <vector>
32#include <string>
33#include <functional>
34#include <algorithm>
35#include <memory>
36#include <queue>
37
38namespace dht {
39namespace net {
40
41struct Request;
42struct Socket;
43struct TransId;
44
45#ifndef MSG_CONFIRM
46#define MSG_CONFIRM 0
47#endif
48
50public:
51 // sent to another peer (http-like).
52 static const constexpr uint16_t NON_AUTHORITATIVE_INFORMATION {203}; /* incomplete request packet. */
53 static const constexpr uint16_t UNAUTHORIZED {401}; /* wrong tokens. */
54 static const constexpr uint16_t NOT_FOUND {404}; /* storage not found */
55 // for internal use (custom).
56 static const constexpr uint16_t INVALID_TID_SIZE {421}; /* id was truncated. */
57 static const constexpr uint16_t UNKNOWN_TID {422}; /* unknown tid */
58 static const constexpr uint16_t WRONG_NODE_INFO_BUF_LEN {423}; /* node info length is wrong */
59
60 static const std::string GET_NO_INFOHASH; /* received "get" request with no infohash */
61 static const std::string LISTEN_NO_INFOHASH; /* got "listen" request without infohash */
62 static const std::string LISTEN_WRONG_TOKEN; /* wrong token in "listen" request */
63 static const std::string PUT_NO_INFOHASH; /* no infohash in "put" request */
64 static const std::string PUT_WRONG_TOKEN; /* got "put" request with wrong token */
65 static const std::string STORAGE_NOT_FOUND; /* got access request for an unknown storage */
66 static const std::string PUT_INVALID_ID; /* invalid id in "put" request */
67
68 DhtProtocolException(uint16_t code, const std::string& msg="", InfoHash failing_node_id={})
69 : DhtException(msg), msg(msg), code(code), failing_node_id(failing_node_id) {}
70
71 std::string getMsg() const { return msg; }
72 uint16_t getCode() const { return code; }
73 const InfoHash getNodeId() const { return failing_node_id; }
74
75private:
76 std::string msg;
77 uint16_t code;
78 const InfoHash failing_node_id;
79};
80
81struct ParsedMessage;
82
87 Blob ntoken {};
88 Value::Id vid {};
89 std::vector<Sp<Value>> values {};
90 std::vector<Value::Id> refreshed_values {};
91 std::vector<Value::Id> expired_values {};
92 std::vector<Sp<FieldValueIndex>> fields {};
93 std::vector<Sp<Node>> nodes4 {};
94 std::vector<Sp<Node>> nodes6 {};
95 RequestAnswer() {}
96 RequestAnswer(ParsedMessage&& msg);
97};
98
117class NetworkEngine final
118{
119private:
123 std::function<void(Sp<Request>, DhtProtocolException)> onError;
124
131 std::function<void(const Sp<Node>&, int)> onNewNode;
138 std::function<void(const InfoHash&, const SockAddr&)> onReportedAddr;
144 std::function<RequestAnswer(Sp<Node>)> onPing {};
153 std::function<RequestAnswer(Sp<Node>, const InfoHash&, want_t)> onFindNode {};
162 std::function<RequestAnswer(Sp<Node>, const InfoHash&, want_t, const Query&)> onGetValues {};
171 std::function<RequestAnswer(Sp<Node>,
172 const InfoHash&,
173 const Blob&,
174 Tid,
175 const Query&)> onListen {};
185 std::function<RequestAnswer(Sp<Node>,
186 const InfoHash&,
187 const Blob&,
188 const std::vector<Sp<Value>>&,
189 const time_point&)> onAnnounce {};
198 std::function<RequestAnswer(Sp<Node>,
199 const InfoHash&,
200 const Blob&,
201 const Value::Id&)> onRefresh {};
202
203public:
204 using RequestCb = std::function<void(const Request&, RequestAnswer&&)>;
205 using RequestExpiredCb = std::function<void(const Request&, bool)>;
206
207 NetworkEngine(Logger& log, Scheduler& scheduler, const int& s = -1, const int& s6 = -1);
208 NetworkEngine(InfoHash& myid, NetId net, const int& s, const int& s6, Logger& log, Scheduler& scheduler,
209 decltype(NetworkEngine::onError) onError,
210 decltype(NetworkEngine::onNewNode) onNewNode,
211 decltype(NetworkEngine::onReportedAddr) onReportedAddr,
212 decltype(NetworkEngine::onPing) onPing,
213 decltype(NetworkEngine::onFindNode) onFindNode,
214 decltype(NetworkEngine::onGetValues) onGetValues,
215 decltype(NetworkEngine::onListen) onListen,
216 decltype(NetworkEngine::onAnnounce) onAnnounce,
217 decltype(NetworkEngine::onRefresh) onRefresh);
218
219 virtual ~NetworkEngine();
220
221 void clear();
222
236 void tellListener(Sp<Node> n, Tid socket_id, const InfoHash& hash, want_t want, const Blob& ntoken,
237 std::vector<Sp<Node>>&& nodes, std::vector<Sp<Node>>&& nodes6,
238 std::vector<Sp<Value>>&& values, const Query& q);
239
240 void tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values);
241 void tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values);
242
243 bool isRunning(sa_family_t af) const;
244 inline want_t want () const { return dht_socket >= 0 && dht_socket6 >= 0 ? (WANT4 | WANT6) : -1; }
245
246 void connectivityChanged(sa_family_t);
247
248 /**************
249 * Requests *
250 **************/
251
261 Sp<Request>
262 sendPing(Sp<Node> n, RequestCb&& on_done, RequestExpiredCb&& on_expired);
273 Sp<Request>
274 sendPing(const sockaddr* sa, socklen_t salen, RequestCb&& on_done, RequestExpiredCb&& on_expired) {
275 return sendPing(std::make_shared<Node>(zeroes, sa, salen),
276 std::forward<RequestCb>(on_done),
277 std::forward<RequestExpiredCb>(on_expired));
278 }
291 Sp<Request> sendFindNode(Sp<Node> n,
292 const InfoHash& hash,
293 want_t want,
294 RequestCb&& on_done,
295 RequestExpiredCb&& on_expired);
310 Sp<Request> sendGetValues(Sp<Node> n,
311 const InfoHash& hash,
312 const Query& query,
313 want_t want,
314 RequestCb&& on_done,
315 RequestExpiredCb&& on_expired);
339 Sp<Request> sendListen(Sp<Node> n,
340 const InfoHash& hash,
341 const Query& query,
342 const Blob& token,
343 Sp<Request> previous,
344 RequestCb&& on_done,
345 RequestExpiredCb&& on_expired,
346 SocketCb&& socket_cb);
360 Sp<Request> sendAnnounceValue(Sp<Node> n,
361 const InfoHash& hash,
362 const Sp<Value>& v,
363 time_point created,
364 const Blob& token,
365 RequestCb&& on_done,
366 RequestExpiredCb&& on_expired);
380 Sp<Request> sendRefreshValue(Sp<Node> n,
381 const InfoHash& hash,
382 const Value::Id& vid,
383 const Blob& token,
384 RequestCb&& on_done,
385 RequestExpiredCb&& on_expired);
386
396 void processMessage(const uint8_t *buf, size_t buflen, const SockAddr& addr);
397
398 Sp<Node> insertNode(const InfoHash& myid, const SockAddr& addr) {
399 auto n = cache.getNode(myid, addr, scheduler.time(), 0);
400 onNewNode(n, 0);
401 return n;
402 }
403
404 std::vector<unsigned> getNodeMessageStats(bool in) {
405 auto& st = in ? in_stats : out_stats;
406 std::vector<unsigned> stats {st.ping, st.find, st.get, st.listen, st.put};
407 st = {};
408 return stats;
409 }
410
411 void blacklistNode(const Sp<Node>& n);
412
413 std::vector<Sp<Node>> getCachedNodes(const InfoHash& id, sa_family_t sa_f, size_t count) {
414 return cache.getCachedNodes(id, sa_f, count);
415 }
416
417private:
418
419 struct PartialMessage;
420
421 /***************
422 * Constants *
423 ***************/
424 static constexpr size_t MAX_REQUESTS_PER_SEC {1600};
425 /* the length of a node info buffer in ipv4 format */
426 static const constexpr size_t NODE4_INFO_BUF_LEN {HASH_LEN + sizeof(in_addr) + sizeof(in_port_t)};
427 /* the length of a node info buffer in ipv6 format */
428 static const constexpr size_t NODE6_INFO_BUF_LEN {HASH_LEN + sizeof(in6_addr) + sizeof(in_port_t)};
429 /* after a UDP reply, the period during which we tell the link layer about it */
430 static constexpr std::chrono::seconds UDP_REPLY_TIME {15};
431
432 /* Max. time to receive a full fragmented packet */
433 static constexpr std::chrono::seconds RX_MAX_PACKET_TIME {10};
434 /* Max. time between packet fragments */
435 static constexpr std::chrono::seconds RX_TIMEOUT {3};
436 /* The maximum number of nodes that we snub. There is probably little
437 reason to increase this value. */
438 static constexpr unsigned BLACKLISTED_MAX {10};
439
440 static constexpr size_t MTU {1280};
441 static constexpr size_t MAX_PACKET_VALUE_SIZE {600};
442
443 static const std::string my_v;
444
445 void process(std::unique_ptr<ParsedMessage>&&, const SockAddr& from);
446
447 bool rateLimit(const SockAddr& addr);
448
449 static bool isMartian(const SockAddr& addr);
450 bool isNodeBlacklisted(const SockAddr& addr) const;
451
452 void requestStep(Sp<Request> req);
453
458 void sendRequest(const Sp<Request>& request);
459
460 struct MessageStats {
461 unsigned ping {0};
462 unsigned find {0};
463 unsigned get {0};
464 unsigned put {0};
465 unsigned listen {0};
466 unsigned refresh {0};
467 };
468
469
470 // basic wrapper for socket sendto function
471 int send(const char *buf, size_t len, int flags, const SockAddr& addr);
472
473 void sendValueParts(const TransId& tid, const std::vector<Blob>& svals, const SockAddr& addr);
474 std::vector<Blob> packValueHeader(msgpack::sbuffer&, const std::vector<Sp<Value>>&);
475 void maintainRxBuffer(Tid tid);
476
477 /*************
478 * Answers *
479 *************/
480 /* answer to a ping request */
481 void sendPong(const SockAddr& addr, Tid tid);
482 /* answer to findnodes/getvalues request */
483 void sendNodesValues(const SockAddr& addr,
484 Tid tid,
485 const Blob& nodes,
486 const Blob& nodes6,
487 const std::vector<Sp<Value>>& st,
488 const Query& query,
489 const Blob& token);
490 Blob bufferNodes(sa_family_t af, const InfoHash& id, std::vector<Sp<Node>>& nodes);
491
492 std::pair<Blob, Blob> bufferNodes(sa_family_t af,
493 const InfoHash& id,
494 want_t want,
495 std::vector<Sp<Node>>& nodes,
496 std::vector<Sp<Node>>& nodes6);
497 /* answer to a listen request */
498 void sendListenConfirmation(const SockAddr& addr, Tid tid);
499 /* answer to put request */
500 void sendValueAnnounced(const SockAddr& addr, Tid, Value::Id);
501 /* answer in case of error */
502 void sendError(const SockAddr& addr,
503 Tid tid,
504 uint16_t code,
505 const std::string& message,
506 bool include_id=false);
507
508 void deserializeNodes(ParsedMessage& msg, const SockAddr& from);
509
510 /* DHT info */
511 const InfoHash& myid;
512 const NetId network {0};
513 const int& dht_socket;
514 const int& dht_socket6;
515 const Logger& DHT_LOG;
516
517 NodeCache cache {};
518
519 // global limiting should be triggered by at least 8 different IPs
520 using IpLimiter = RateLimiter<MAX_REQUESTS_PER_SEC/8>;
521 using IpLimiterMap = std::map<SockAddr, IpLimiter, SockAddr::ipCmp>;
522 IpLimiterMap address_rate_limiter {};
523 RateLimiter<MAX_REQUESTS_PER_SEC> rate_limiter {};
524 size_t limiter_maintenance {0};
525
526 // requests handling
527 std::map<Tid, Sp<Request>> requests {};
528 std::map<Tid, PartialMessage> partial_messages;
529
530 MessageStats in_stats {}, out_stats {};
531 std::set<SockAddr> blacklist {};
532
533 Scheduler& scheduler;
534};
535
536} /* namespace net */
537} /* namespace dht */
Job scheduler.
Definition: scheduler.h:37
const time_point & time() const
Definition: scheduler.h:116
An abstraction of communication protocol on the network.
Sp< Request > sendListen(Sp< Node > n, const InfoHash &hash, const Query &query, const Blob &token, Sp< Request > previous, RequestCb &&on_done, RequestExpiredCb &&on_expired, SocketCb &&socket_cb)
Sp< Request > sendPing(const sockaddr *sa, socklen_t salen, RequestCb &&on_done, RequestExpiredCb &&on_expired)
void tellListener(Sp< Node > n, Tid socket_id, const InfoHash &hash, want_t want, const Blob &ntoken, std::vector< Sp< Node > > &&nodes, std::vector< Sp< Node > > &&nodes6, std::vector< Sp< Value > > &&values, const Query &q)
void processMessage(const uint8_t *buf, size_t buflen, const SockAddr &addr)
Sp< Request > sendAnnounceValue(Sp< Node > n, const InfoHash &hash, const Sp< Value > &v, time_point created, const Blob &token, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendFindNode(Sp< Node > n, const InfoHash &hash, want_t want, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendGetValues(Sp< Node > n, const InfoHash &hash, const Query &query, want_t want, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendRefreshValue(Sp< Node > n, const InfoHash &hash, const Value::Id &vid, const Blob &token, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendPing(Sp< Node > n, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Definition: callbacks.h:34
std::vector< uint8_t > Blob
Definition: utils.h:114
Describes a query destined to another peer.
Definition: value.h:864