// My Project 1.7.4
// C++ Distributed Hash Table
// dht_proxy_client.h
/*
 *  Copyright (C) 2016-2018 Savoir-faire Linux Inc.
 *  Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
 *          Adrien Béraud <adrien.beraud@savoirfairelinux.com>
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program. If not, see <https://www.gnu.org/licenses/>.
 */

#if OPENDHT_PROXY_CLIENT

#pragma once

#include <algorithm>
#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "callbacks.h"
#include "def.h"
#include "dht_interface.h"
#include "scheduler.h"
#include "proxy.h"

// Forward declarations: only pointers/references to these types appear below.
namespace restbed {
    class Request;
}

namespace Json {
    class Value;
}

namespace dht {

class SearchCache;

/**
 * DhtInterface implementation declared for the proxy-client build
 * (OPENDHT_PROXY_CLIENT). Most operations are only declared here and are
 * defined out of line; several DhtInterface entry points are deliberately
 * stubbed out inline (empty bodies / default-constructed return values).
 */
class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
public:

    DhtProxyClient();

    /**
     * @param loopSignal    callable stored in loopSignal_; it is invoked by
     *                      connectivityChanged().
     * @param serverHost    proxy server host (presumably "host[:port]" --
     *                      confirm against the implementation).
     * @param pushClientId  optional client identifier, empty by default.
     */
    explicit DhtProxyClient(std::function<void()> loopSignal, const std::string& serverHost, const std::string& pushClientId = "");

    /**
     * Store the push-notification token in deviceKey_.
     * Does nothing when OPENDHT_PUSH_NOTIFICATIONS is disabled.
     */
    virtual void setPushNotificationToken(const std::string& token) {
#if OPENDHT_PUSH_NOTIFICATIONS
        deviceKey_ = token;
#else
        // Parameter intentionally unused in this configuration.
        static_cast<void>(token);
#endif
    }

    virtual ~DhtProxyClient();

    /**
     * @return the identifier held in myid.
     */
    inline const InfoHash& getNodeId() const { return myid; }

    /**
     * @return the node status for the address family af (defined out of line).
     */
    NodeStatus getStatus(sa_family_t af) const;

    /**
     * @return the greater of the AF_INET and AF_INET6 statuses.
     */
    NodeStatus getStatus() const {
        return std::max(getStatus(AF_INET), getStatus(AF_INET6));
    }

    /**
     * Shut the client down; cb is the shutdown callback (defined out of line).
     */
    void shutdown(ShutdownCallback cb);

    /**
     * @param af  address family to query; defaults to 0
     *            (presumably "any family" -- confirm in the implementation).
     */
    bool isRunning(sa_family_t af = 0) const;

    /**
     * Primary get() overload (defined out of line). The overloads below only
     * adapt "simple" callback types through bindGetCb()/bindDoneCb() and
     * forward to this one, moving the filter and the Where clause along.
     */
    virtual void get(const InfoHash& key, GetCallback cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {});
    virtual void get(const InfoHash& key, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f={}, Where&& w = {}) {
        get(key, cb, bindDoneCb(donecb), std::move(f), std::move(w));
    }
    virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) {
        get(key, bindGetCb(cb), donecb, std::move(f), std::move(w));
    }
    virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) {
        get(key, bindGetCb(cb), bindDoneCb(donecb), std::move(f), std::move(w));
    }

    /**
     * Primary put() overload (defined out of line). The overloads below
     * adapt their arguments (wrap the callback with bindDoneCb(), move the
     * Value into a shared pointer) and forward to this one.
     */
    void put(const InfoHash& key,
            Sp<Value>,
            DoneCallback cb=nullptr,
            time_point created=time_point::max(),
            bool permanent = false);
    void put(const InfoHash& key,
            const Sp<Value>& v,
            DoneCallbackSimple cb,
            time_point created=time_point::max(),
            bool permanent = false)
    {
        put(key, v, bindDoneCb(cb), created, permanent);
    }

    void put(const InfoHash& key,
            Value&& v,
            DoneCallback cb=nullptr,
            time_point created=time_point::max(),
            bool permanent = false)
    {
        put(key, std::make_shared<Value>(std::move(v)), cb, created, permanent);
    }
    void put(const InfoHash& key,
            Value&& v,
            DoneCallbackSimple cb,
            time_point created=time_point::max(),
            bool permanent = false)
    {
        put(key, std::move(v), bindDoneCb(cb), created, permanent);
    }

    /**
     * @return node statistics for the address family af (defined out of line).
     */
    NodeStats getNodesStats(sa_family_t af) const;

    /**
     * @param family  address family to report; defaults to 0
     *                (presumably "all families" -- confirm in the implementation).
     */
    std::vector<SockAddr> getPublicAddress(sa_family_t family = 0);

    /**
     * Primary listen() overload (defined out of line).
     * @return a token; cancelListen() takes such a token.
     */
    virtual size_t listen(const InfoHash&, ValueCallback, Value::Filter={}, Where={});

    /**
     * Adapter for GetCallback: cb is only invoked for non-expired value
     * batches; expired batches are acknowledged by returning true.
     */
    virtual size_t listen(const InfoHash& key, GetCallback cb, Value::Filter f={}, Where w={}) {
        return listen(key, [cb](const std::vector<Sp<Value>>& vals, bool expired){
            if (not expired)
                return cb(vals);
            return true;
        }, std::move(f), std::move(w));
    }
    virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w={}) {
        return listen(key, bindGetCb(cb), std::move(f), std::move(w));
    }
    virtual bool cancelListen(const InfoHash& key, size_t token);

    /**
     * Entry point for a received push notification, given as a string
     * key/value map (defined out of line).
     */
    void pushNotificationReceived(const std::map<std::string, std::string>& notification);

    time_point periodic(const uint8_t*, size_t, const SockAddr&);
    /** Convenience overload: wraps (from, fromlen) in a SockAddr. */
    time_point periodic(const uint8_t *buf, size_t buflen, const sockaddr* from, socklen_t fromlen) {
        return periodic(buf, buflen, SockAddr(from, fromlen));
    }


    /**
     * query() is a no-op in this class: the primary overload has an empty
     * body and the second overload merely forwards to it.
     */
    virtual void query(const InfoHash& /*key*/, QueryCallback /*cb*/, DoneCallback /*done_cb*/ = {}, Query&& /*q*/ = {}) { }
    virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) {
        query(key, cb, bindDoneCb(done_cb), std::move(q));
    }

    /** Defined out of line. */
    std::vector<Sp<Value>> getPut(const InfoHash&);

    /** Defined out of line. */
    Sp<Value> getPut(const InfoHash&, const Value::Id&);

    /** Defined out of line. */
    bool cancelPut(const InfoHash&, const Value::Id&);

    /** No-op in this class; the callback is never invoked here. */
    void pingNode(const sockaddr*, socklen_t, DoneCallbackSimple&& /*cb*/={}) { }

    /** Register a value type in the local TypeStore. */
    virtual void registerType(const ValueType& type) {
        types.registerType(type);
    }
    /** Look a value type up in the local TypeStore. */
    const ValueType& getType(ValueType::Id type_id) const {
        return types.getType(type_id);
    }

    std::vector<Sp<Value>> getLocal(const InfoHash& k, Value::Filter filter) const;
    Sp<Value> getLocalById(const InfoHash& k, Value::Id id) const;

    /*
     * The members below are inline stubs: they have empty bodies or return
     * default-constructed values.
     */
    void insertNode(const InfoHash&, const SockAddr&) { }
    void insertNode(const InfoHash&, const sockaddr*, socklen_t) { }
    void insertNode(const NodeExport&) { }
    std::pair<size_t, size_t> getStoreSize() const { return {}; }
    std::vector<NodeExport> exportNodes() { return {}; }
    std::vector<ValuesExport> exportValues() const { return {}; }
    void importValues(const std::vector<ValuesExport>&) {}
    std::string getStorageLog() const { return {}; }
    std::string getStorageLog(const InfoHash&) const { return {}; }
    std::string getRoutingTablesLog(sa_family_t) const { return {}; }
    std::string getSearchesLog(sa_family_t) const { return {}; }
    std::string getSearchLog(const InfoHash&, sa_family_t) const { return {}; }
    void dumpTables() const {}
    std::vector<unsigned> getNodeMessageStats(bool) { return {}; }
    void setStorageLimit(size_t) {}

    /** Per-family notification: only restarts the listeners. */
    void connectivityChanged(sa_family_t) {
        restartListeners();
    }
    /**
     * Global notification: refreshes the proxy infos, restarts the
     * listeners, then invokes loopSignal_.
     */
    void connectivityChanged() {
        getProxyInfos();
        restartListeners();
        loopSignal_();
    }

private:
    void startProxy();

    struct InfoState;
    void getProxyInfos();
    void onProxyInfos(const Json::Value& val, sa_family_t family);
    SockAddr parsePublicAddress(const Json::Value& val);

    void opFailed();

    size_t doListen(const InfoHash& key, ValueCallback, Value::Filter);
    bool doCancelListen(const InfoHash& key, size_t token);

    struct ListenState;
    void sendListen(const std::shared_ptr<restbed::Request>& request, const ValueCallback&, const Value::Filter& filter, const Sp<ListenState>& state);
    void sendSubscribe(const std::shared_ptr<restbed::Request>& request, const Sp<proxy::ListenToken>&, const Sp<ListenState>& state);

    void doPut(const InfoHash&, Sp<Value>, DoneCallback, time_point created, bool permanent);

    void getConnectivityStatus();
    void cancelAllListeners();
    void cancelAllOperations();

    std::string serverHost_;
    std::string pushClientId_;

    // NOTE(review): presumably guards the status/stats/address fields below
    // -- confirm against the implementation file.
    mutable std::mutex lockCurrentProxyInfos_;
    NodeStatus statusIpv4_ {NodeStatus::Disconnected};
    NodeStatus statusIpv6_ {NodeStatus::Disconnected};
    NodeStats stats4_ {};
    NodeStats stats6_ {};
    SockAddr publicAddressV4_;
    SockAddr publicAddressV6_;

    InfoHash myid {};

    // registered types
    TypeStore types;

    struct Listener;
    struct ProxySearch;

    size_t listenerToken_ {0};
    std::map<InfoHash, ProxySearch> searches_;
    mutable std::mutex searchLock_;

    /**
     * One tracked operation: a request, a thread, and a shared flag named
     * `finished`. How these are used is defined in the implementation file.
     */
    struct Operation
    {
        std::shared_ptr<restbed::Request> req;
        std::thread thread;
        std::shared_ptr<std::atomic_bool> finished;
    };
    std::vector<Operation> operations_;
    std::mutex lockOperations_;
    std::vector<std::function<void()>> callbacks_;
    std::mutex lockCallbacks;

    Sp<InfoState> infoState_;
    std::thread statusThread_;
    mutable std::mutex statusLock_;

    Scheduler scheduler;
    void confirmProxy();
    Sp<Scheduler::Job> nextProxyConfirmation {};
    void restartListeners();

    void resubscribe(const InfoHash& key, Listener& listener);

    // Written by setPushNotificationToken() when push notifications are enabled.
    std::string deviceKey_ {};

    // Invoked by connectivityChanged().
    const std::function<void()> loopSignal_;

#if OPENDHT_PUSH_NOTIFICATIONS
    void fillBodyToGetToken(std::shared_ptr<restbed::Request> request, unsigned token = 0);
    void getPushRequest(Json::Value&) const;
#endif // OPENDHT_PUSH_NOTIFICATIONS

    bool isDestroying_ {false};
};

}

#endif // OPENDHT_PROXY_CLIENT
// Definition: callbacks.h:34
// NodeStatus
// Definition: callbacks.h:41