My Project 1.7.4
C++ Distributed Hash Table
dht_proxy_server.h
1/*
2 * Copyright (C) 2017-2018 Savoir-faire Linux Inc.
3 * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4 * Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20#if OPENDHT_PROXY_SERVER
21
22#pragma once
23
24#include "callbacks.h"
25#include "def.h"
26#include "infohash.h"
27#include "proxy.h"
28#include "scheduler.h"
29#include "sockaddr.h"
30#include "value.h"
31
32#include <thread>
33#include <memory>
34#include <mutex>
35#include <restbed>
36
// Forward declaration only: this header refers to Json::Value by reference,
// so the complete type is not needed here.
namespace Json {
    class Value;
}
40
41namespace dht {
42
43class DhtRunner;
44
48class OPENDHT_PUBLIC DhtProxyServer
49{
50public:
59 DhtProxyServer(std::shared_ptr<DhtRunner> dht, in_port_t port = 8000, const std::string& pushServer = "");
60 virtual ~DhtProxyServer();
61
62 DhtProxyServer(const DhtProxyServer& other) = delete;
63 DhtProxyServer(DhtProxyServer&& other) = delete;
64 DhtProxyServer& operator=(const DhtProxyServer& other) = delete;
65 DhtProxyServer& operator=(DhtProxyServer&& other) = delete;
66
67 struct ServerStats {
69 size_t listenCount;
71 size_t putCount;
73 size_t pushListenersCount;
75 double requestRate;
76
77 std::string toString() const {
78 std::ostringstream ss;
79 ss << "Listens: " << listenCount << " Puts: " << putCount << " PushListeners: " << pushListenersCount << std::endl;
80 ss << "Requests: " << requestRate << " per second." << std::endl;
81 return ss.str();
82 }
83 };
84 ServerStats getStats() const;
85
86 std::shared_ptr<DhtRunner> getNode() const { return dht_; }
87
91 void stop();
92
93private:
101 void getNodeInfo(const std::shared_ptr<restbed::Session>& session) const;
102
113 void get(const std::shared_ptr<restbed::Session>& session) const;
114
125 void listen(const std::shared_ptr<restbed::Session>& session);
126
136 void put(const std::shared_ptr<restbed::Session>& session);
137
138 void cancelPut(const InfoHash& key, Value::Id vid);
139
140#if OPENDHT_PROXY_SERVER_IDENTITY
150 void putSigned(const std::shared_ptr<restbed::Session>& session) const;
151
161 void putEncrypted(const std::shared_ptr<restbed::Session>& session) const;
162#endif // OPENDHT_PROXY_SERVER_IDENTITY
163
174 void getFiltered(const std::shared_ptr<restbed::Session>& session) const;
175
183 void handleOptionsMethod(const std::shared_ptr<restbed::Session>& session) const;
184
189 void removeClosedListeners(bool testSession = true);
190
191#if OPENDHT_PUSH_NOTIFICATIONS
201 void subscribe(const std::shared_ptr<restbed::Session>& session);
209 void unsubscribe(const std::shared_ptr<restbed::Session>& session);
215 void sendPushNotification(const std::string& key, const Json::Value& json, bool isAndroid) const;
216
223 void cancelPushListen(const std::string& pushToken, const InfoHash& key, const std::string& clientId);
224
225
226#endif //OPENDHT_PUSH_NOTIFICATIONS
227
228 using clock = std::chrono::steady_clock;
229 using time_point = clock::time_point;
230
231 std::thread server_thread {};
232 std::unique_ptr<restbed::Service> service_;
233 std::shared_ptr<DhtRunner> dht_;
234
235 std::mutex schedulerLock_;
236 std::condition_variable schedulerCv_;
237 Scheduler scheduler_;
238 std::thread schedulerThread_;
239
240 Sp<Scheduler::Job> printStatsJob_;
241 mutable std::mutex statsMutex_;
242 mutable NodeInfo nodeInfo_ {};
243
244 // Handle client quit for listen.
245 // NOTE: can be simplified when we will supports restbed 5.0
246 std::thread listenThread_;
247 struct SessionToHashToken {
248 std::shared_ptr<restbed::Session> session;
249 InfoHash hash;
250 std::future<size_t> token;
251 };
252 std::vector<SessionToHashToken> currentListeners_;
253 std::mutex lockListener_;
254 std::atomic_bool stopListeners {false};
255
256 struct PermanentPut;
257 struct SearchPuts;
258 std::map<InfoHash, SearchPuts> puts_;
259
260 mutable std::atomic<size_t> requestNum_ {0};
261 mutable std::atomic<time_point> lastStatsReset_ {time_point::min()};
262
263 const std::string pushServer_;
264
265#if OPENDHT_PUSH_NOTIFICATIONS
266 struct Listener;
267 struct PushListener;
268 std::mutex lockPushListeners_;
269 std::map<std::string, PushListener> pushListeners_;
270 proxy::ListenToken tokenPushNotif_ {0};
271#endif //OPENDHT_PUSH_NOTIFICATIONS
272};
273
274}
275
276#endif //OPENDHT_PROXY_SERVER
OPENDHT_PUBLIC Blob hash(const Blob &data, size_t hash_length=512/8)
Definition: callbacks.h:34