xukc commited on
Commit
99dc157
·
1 Parent(s): c8aad12

[fix]udp runtime error

Browse files
include/udp_client.h CHANGED
@@ -3,7 +3,6 @@
3
 
4
  #include "hv/hloop.h"
5
  #include "hv/UdpClient.h"
6
- #include "timer_thread.h"
7
  #include "bolt/crypt.h"
8
  #include "udp_client_map.h"
9
  #include <string>
@@ -26,23 +25,24 @@ class UdpClientProxy
26
  {
27
  public:
28
  UdpClientProxy(struct sockaddr_in t_addr, struct sockaddr_in u_addr, uint32_t session_id) : t_addr(t_addr), u_addr(u_addr), session_id(session_id) {}
29
- hv::UdpClient *getClient() { return &cli; }
30
- bool init();
31
  void close();
32
  bool isDns() {
33
  return ntohs(t_addr.sin_port) == 53;
34
  };
35
 
36
  private:
37
- void onRecv(const hv::SocketChannelPtr &channel, hv::Buffer *buf);
38
- void onWrited(const hv::SocketChannelPtr &channel, hv::Buffer *buf);
 
39
 
40
  public:
41
  struct sockaddr_in t_addr, u_addr;
42
 
43
  private:
44
- hv::UdpClient cli;
45
- TimerThread timer;
46
  uint32_t session_id;
47
  long long ts;
48
  };
 
3
 
4
  #include "hv/hloop.h"
5
  #include "hv/UdpClient.h"
 
6
  #include "bolt/crypt.h"
7
  #include "udp_client_map.h"
8
  #include <string>
 
25
  {
26
  public:
27
  UdpClientProxy(struct sockaddr_in t_addr, struct sockaddr_in u_addr, uint32_t session_id) : t_addr(t_addr), u_addr(u_addr), session_id(session_id) {}
28
+ hio_t *getClientIO() { return cli_io; }
29
+ bool init(hloop_t* loop);
30
  void close();
31
  bool isDns() {
32
  return ntohs(t_addr.sin_port) == 53;
33
  };
34
 
35
  private:
36
+ void onRecv(void* buf, int readbytes);
37
+ void onWrited(const void* buf, int writebytes);
38
+ void onClosed();
39
 
40
  public:
41
  struct sockaddr_in t_addr, u_addr;
42
 
43
  private:
44
+ hio_t *cli_io;
45
+ htimer_t* timer;
46
  uint32_t session_id;
47
  long long ts;
48
  };
main.cpp CHANGED
@@ -75,6 +75,7 @@ static HTHREAD_RETTYPE udpbind_thread(void *userdata)
75
  }
76
  int bindfd = hio_fd(bindio);
77
  spdlog::info("ProxyServer start: udp->{}", port);
 
78
  hio_setcb_read(bindio, udp_on_recvfrom);
79
  hio_setcb_write(bindio, udp_on_writed);
80
  hio_setcb_close(bindio, udp_on_close);
@@ -113,7 +114,6 @@ int main(int argc, char **argv)
113
  if (type == 1)
114
  {
115
  // Udp
116
- spdlog::info("ProxyServer start: udp->{}", port);
117
  udpbind_loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
118
  udpbind_thread(udpbind_loop);
119
  }
 
75
  }
76
  int bindfd = hio_fd(bindio);
77
  spdlog::info("ProxyServer start: udp->{}", port);
78
+ hevent_set_userdata(bindio, loop); //必须
79
  hio_setcb_read(bindio, udp_on_recvfrom);
80
  hio_setcb_write(bindio, udp_on_writed);
81
  hio_setcb_close(bindio, udp_on_close);
 
114
  if (type == 1)
115
  {
116
  // Udp
 
117
  udpbind_loop = hloop_new(HLOOP_FLAG_AUTO_FREE);
118
  udpbind_thread(udpbind_loop);
119
  }
src/tcp_client.cpp CHANGED
@@ -92,7 +92,7 @@ bool TcpClientBolt::handShake(void *buf, int readbytes)
92
 
93
  if (!this->connect((struct sockaddr *)&t_addr))
94
  {
95
- spdlog::info("BOLT_CHANNEL_CMD_TCP_HANDSHAKE_REQUEST: t_addr={}:{}, u_addr={}:{} ==> connect fail", inet_ntoa(t_addr.sin_addr), htons(t_addr.sin_port), inet_ntoa(u_addr.sin_addr), htons(u_addr.sin_port));
96
  char buff[2];
97
  buff[0] = BOLT_CHANNEL_CMD_TCP_HANDSHAKE_RESPONSE;
98
  buff[1] = BOLT_TCP_HANDSHAKE_FAIL_TIMEOUT;
@@ -124,7 +124,7 @@ bool TcpClientBolt::handShake(void *buf, int readbytes)
124
  config.ept_type = config.encrypt ? CRYPT_TYPE::XOR : CRYPT_TYPE::NONE;
125
  config.ept_key = config.encrypt ? generateRandomKey() : 0;
126
 
127
- spdlog::info("BOLT_CHANNEL_CMD_BIND_REQUEST: requestId:{}, session_id:{}, encrypt:{}", request_id, session_id, config.encrypt);
128
 
129
  GENERATE_DECRYPT_KEY(extend_response, extend_response_len, config.ept_type, config.ept_key)
130
 
@@ -145,7 +145,7 @@ bool TcpClientBolt::handShake(void *buf, int readbytes)
145
  TcpClientMap<uint32_t, tcpBoltConfig>::getInstance().remove(session_id);
146
  this->close();
147
  hio_close_async(io);
148
- spdlog::info("BOLT_CHANNEL_CMD_UNBIND_REQUEST: t_addr={}:{}, u_addr={}:{}", inet_ntoa(t_addr.sin_addr), htons(t_addr.sin_port), inet_ntoa(u_addr.sin_addr), htons(u_addr.sin_port));
149
  }
150
  }
151
  }
@@ -179,14 +179,14 @@ bool TcpClientBolt::connect(struct sockaddr *addr)
179
 
180
  cli.onMessage = [this](const hv::SocketChannelPtr &channel, hv::Buffer *buf)
181
  {
182
- spdlog::info("<<< len:{} : {}}\n", (int)buf->size(), (char *)buf->data());
183
 
184
  onRecv(channel, buf);
185
  };
186
 
187
  cli.onWriteComplete = [this](const hv::SocketChannelPtr &channel, hv::Buffer *buf)
188
  {
189
- spdlog::info(">>> len:{} : {}}\n", (int)buf->size(), (char *)buf->data());
190
 
191
  onWrited(channel, buf);
192
  };
@@ -225,5 +225,6 @@ int TcpClientBolt::send(char *data, int size)
225
 
226
  void TcpClientBolt::close()
227
  {
228
- cli.closesocket();
 
229
  }
 
92
 
93
  if (!this->connect((struct sockaddr *)&t_addr))
94
  {
95
+ spdlog::info("[TCP]BOLT_CHANNEL_CMD_TCP_HANDSHAKE_REQUEST: t_addr={}:{}, u_addr={}:{} ==> connect fail", inet_ntoa(t_addr.sin_addr), htons(t_addr.sin_port), inet_ntoa(u_addr.sin_addr), htons(u_addr.sin_port));
96
  char buff[2];
97
  buff[0] = BOLT_CHANNEL_CMD_TCP_HANDSHAKE_RESPONSE;
98
  buff[1] = BOLT_TCP_HANDSHAKE_FAIL_TIMEOUT;
 
124
  config.ept_type = config.encrypt ? CRYPT_TYPE::XOR : CRYPT_TYPE::NONE;
125
  config.ept_key = config.encrypt ? generateRandomKey() : 0;
126
 
127
+ spdlog::info("[TCP]BOLT_CHANNEL_CMD_BIND_REQUEST: requestId:{}, session_id:{}, encrypt:{}", request_id, session_id, config.encrypt);
128
 
129
  GENERATE_DECRYPT_KEY(extend_response, extend_response_len, config.ept_type, config.ept_key)
130
 
 
145
  TcpClientMap<uint32_t, tcpBoltConfig>::getInstance().remove(session_id);
146
  this->close();
147
  hio_close_async(io);
148
+ spdlog::info("[TCP]BOLT_CHANNEL_CMD_UNBIND_REQUEST: requestId:{}, session_id:{} t_addr={}:{}, u_addr={}:{}", config->request_id, config->session_id, inet_ntoa(t_addr.sin_addr), htons(t_addr.sin_port), inet_ntoa(u_addr.sin_addr), htons(u_addr.sin_port));
149
  }
150
  }
151
  }
 
179
 
180
  cli.onMessage = [this](const hv::SocketChannelPtr &channel, hv::Buffer *buf)
181
  {
182
+ spdlog::info("<<< len:{} : {}", (int)buf->size(), (char *)buf->data());
183
 
184
  onRecv(channel, buf);
185
  };
186
 
187
  cli.onWriteComplete = [this](const hv::SocketChannelPtr &channel, hv::Buffer *buf)
188
  {
189
+ spdlog::info(">>> len:{} : {}", (int)buf->size(), (char *)buf->data());
190
 
191
  onWrited(channel, buf);
192
  };
 
225
 
226
  void TcpClientBolt::close()
227
  {
228
+ spdlog::info("TcpClientBolt::close()\n");
229
+ cli.stop();
230
  }
src/tcp_inbound.cpp CHANGED
@@ -11,8 +11,8 @@ static void tcp_on_close(hio_t* io) {
11
 
12
  auto cli = TcpConnMap<hio_t*, TcpClientBolt>::getInstance().get(io);
13
  if(cli) {
14
- TcpConnMap<hio_t*, TcpClientBolt>::getInstance().remove(io);
15
  cli->close();
 
16
  }
17
  }
18
 
 
11
 
12
  auto cli = TcpConnMap<hio_t*, TcpClientBolt>::getInstance().get(io);
13
  if(cli) {
 
14
  cli->close();
15
+ TcpConnMap<hio_t*, TcpClientBolt>::getInstance().remove(io);
16
  }
17
  }
18
 
src/udp_client.cpp CHANGED
@@ -21,13 +21,14 @@ bool UdpServerBoltProxy::analyzeData(struct sockaddr_in t_addr, struct sockaddr_
21
  auto client = _map.get(key);
22
  if (!client)
23
  {
 
24
  std::unique_ptr<UdpClientProxy> new_client = std::make_unique<UdpClientProxy>(t_addr, u_addr, session_id);
25
- if (!new_client->init())
26
  {
27
  return false;
28
  }
29
- _map.add(key, new_client);
30
  client = new_client.get();
 
31
  }
32
 
33
  // if (getConfig().ept_type == CRYPT_TYPE::XOR)
@@ -35,7 +36,7 @@ bool UdpServerBoltProxy::analyzeData(struct sockaddr_in t_addr, struct sockaddr_
35
  // xor_::crypt((char *)dest, dest_len, getConfig().ept_key);
36
  // }
37
 
38
- client->getClient()->sendto(dest, dest_len);
39
 
40
  return true;
41
  }
@@ -55,13 +56,7 @@ int UdpServerBoltProxy::sendData(struct sockaddr_in t_addr, struct sockaddr_in u
55
  void UdpServerBoltProxy::closeClient(struct sockaddr_in &t_addr, struct sockaddr_in &u_addr)
56
  {
57
  auto key = get_key(t_addr, u_addr);
58
- auto client = _map.get(key);
59
- if (client)
60
- {
61
- client->close();
62
- _map.remove(key);
63
- }
64
-
65
  }
66
 
67
  void UdpServerBoltProxy::recycle()
@@ -73,70 +68,106 @@ void UdpServerBoltProxy::recycle()
73
  _map.clear();
74
  }
75
 
76
- bool UdpClientProxy::init()
77
  {
78
 
79
- int sockfd = cli.createsocket(t_addr.sin_port, inet_ntoa(t_addr.sin_addr));
80
- if (sockfd < 0)
 
81
  {
82
  return false;
83
  }
84
- cli.onWriteComplete = [this](const hv::SocketChannelPtr &channel, hv::Buffer *buf)
85
- {
86
- onWrited(channel, buf);
87
- };
88
- cli.onMessage = [this](const hv::SocketChannelPtr &channel, hv::Buffer *buf)
89
- {
90
- onRecv(channel, buf);
91
- };
92
- hio_setcb_close(cli.channel.get()->io(), [](hio_t *io) {
93
-
94
  });
95
- cli.start();
96
-
97
- timer.setInterval(TIMEOUT_TIMEUNIT, [this](hv::TimerID timerID) {
98
- long timeout = isDns() ? DNS_TIMEOUT : TIMEOUT_TIMEUNIT;
99
- long time = currentTimeMillis() - ts;
100
- if (time > 0 && time < timeout)
 
 
 
 
101
  {
102
- long remaining = (timeout-time);
103
- if(remaining < TIMEOUT_TIMEUNIT) {
104
- } else {
 
105
 
106
- }
107
- } else {
108
- auto serverProxy = UdpConnMap<uint32_t, UdpServerBoltProxy>::getInstance().get(session_id);
109
- if (!serverProxy)
 
 
 
110
  {
111
- close();
112
- return;
 
 
 
 
 
 
 
113
  }
114
- serverProxy->closeClient(t_addr, u_addr);
115
- }
116
-
117
- });
 
 
 
 
118
  return true;
119
  }
120
 
121
  void UdpClientProxy::close()
122
  {
123
- cli.closesocket();
124
- timer.stop();
 
 
 
 
 
 
 
125
  }
126
 
127
- void UdpClientProxy::onRecv(const hv::SocketChannelPtr &channel, hv::Buffer *buf)
128
  {
129
- spdlog::info("<<< size:{} : data: {}", (int)buf->size(), (char *)buf->data());
130
  auto serverProxy = UdpConnMap<uint32_t, UdpServerBoltProxy>::getInstance().get(session_id);
131
  if (!serverProxy)
132
  {
133
  return;
134
  }
135
  ts = currentTimeMillis();
136
- serverProxy->sendData(t_addr, u_addr, buf->data(), buf->size());
137
  }
138
 
139
- void UdpClientProxy::onWrited(const hv::SocketChannelPtr &channel, hv::Buffer *buf)
140
  {
141
- spdlog::info(">>> size:{} : data: {}", (int)buf->size(), (char *)buf->data());
 
 
 
 
 
 
 
 
 
 
 
 
142
  }
 
21
  auto client = _map.get(key);
22
  if (!client)
23
  {
24
+ hloop_t *loop = (hloop_t *)hevent_userdata(io);
25
  std::unique_ptr<UdpClientProxy> new_client = std::make_unique<UdpClientProxy>(t_addr, u_addr, session_id);
26
+ if (!new_client->init(loop))
27
  {
28
  return false;
29
  }
 
30
  client = new_client.get();
31
+ _map.add(key, new_client);
32
  }
33
 
34
  // if (getConfig().ept_type == CRYPT_TYPE::XOR)
 
36
  // xor_::crypt((char *)dest, dest_len, getConfig().ept_key);
37
  // }
38
 
39
+ hio_write(client->getClientIO(), dest, dest_len);
40
 
41
  return true;
42
  }
 
56
  void UdpServerBoltProxy::closeClient(struct sockaddr_in &t_addr, struct sockaddr_in &u_addr)
57
  {
58
  auto key = get_key(t_addr, u_addr);
59
+ _map.remove(key);
 
 
 
 
 
 
60
  }
61
 
62
  void UdpServerBoltProxy::recycle()
 
68
  _map.clear();
69
  }
70
 
71
+ bool UdpClientProxy::init(hloop_t *loop)
72
  {
73
 
74
+ spdlog::info("UdpClientProxy::init {}:{}<==>{}:{}", inet_ntoa(u_addr.sin_addr), ntohs(u_addr.sin_port), inet_ntoa(t_addr.sin_addr), ntohs(t_addr.sin_port));
75
+ cli_io = hloop_create_udp_client(loop, inet_ntoa(t_addr.sin_addr), ntohs(t_addr.sin_port));
76
+ if (cli_io == nullptr)
77
  {
78
  return false;
79
  }
80
+ hevent_set_userdata(cli_io, this);
81
+ hio_setcb_read(cli_io, [](hio_t *io, void *buf, int readbytes) {
82
+ UdpClientProxy *cli = (UdpClientProxy *)hevent_userdata(io);
83
+ if (cli)
84
+ {
85
+ cli->onRecv(buf, readbytes);
86
+ }
 
 
 
87
  });
88
+ hio_setcb_write(cli_io, [](hio_t *io, const void *buf, int writebytes) {
89
+ UdpClientProxy* cli = (UdpClientProxy*)hevent_userdata(io);
90
+ if (cli)
91
+ {
92
+ cli->onWrited(buf, writebytes);
93
+ }
94
+ });
95
+ hio_setcb_close(cli_io, [](hio_t *io) {
96
+ UdpClientProxy* cli = (UdpClientProxy*)hevent_userdata(io);
97
+ if (cli)
98
  {
99
+ cli->onClosed();
100
+ }
101
+ });
102
+ hio_read(cli_io);
103
 
104
+ timer = htimer_add(loop, [](htimer_t *timer) {
105
+ UdpClientProxy* cli = (UdpClientProxy*)hevent_userdata(timer);
106
+ if (cli)
107
+ {
108
+ long timeout = cli->isDns() ? DNS_TIMEOUT : TIMEOUT_TIMEUNIT;
109
+ long time = currentTimeMillis() - cli->ts;
110
+ if (time > 0 && time < timeout)
111
  {
112
+ long remaining = (timeout - time);
113
+ if (remaining < TIMEOUT_TIMEUNIT)
114
+ {
115
+ htimer_reset(timer, remaining);
116
+ }
117
+ else
118
+ {
119
+ htimer_reset(timer, TIMEOUT_TIMEUNIT);
120
+ }
121
  }
122
+ else
123
+ {
124
+ cli->close();
125
+ }
126
+ }
127
+ }, TIMEOUT_TIMEUNIT);
128
+ hevent_set_userdata(timer, this);
129
+
130
  return true;
131
  }
132
 
133
  void UdpClientProxy::close()
134
  {
135
+ if (timer)
136
+ {
137
+ htimer_del(timer);
138
+ timer = nullptr;
139
+ }
140
+ if (cli_io) {
141
+ hio_close(cli_io);
142
+ cli_io = nullptr;
143
+ }
144
  }
145
 
146
+ void UdpClientProxy::onRecv(void *buf, int readbytes)
147
  {
148
+ spdlog::info("UdpClientProxy::onRecv {}:{}<==>{}:{} <<< size:{} : data: {}", inet_ntoa(u_addr.sin_addr), ntohs(u_addr.sin_port), inet_ntoa(t_addr.sin_addr), ntohs(t_addr.sin_port), readbytes, std::string((char *)buf, readbytes).c_str());
149
  auto serverProxy = UdpConnMap<uint32_t, UdpServerBoltProxy>::getInstance().get(session_id);
150
  if (!serverProxy)
151
  {
152
  return;
153
  }
154
  ts = currentTimeMillis();
155
+ serverProxy->sendData(t_addr, u_addr, buf, readbytes);
156
  }
157
 
158
+ void UdpClientProxy::onWrited(const void *buf, int writebytes)
159
  {
160
+ spdlog::info("UdpClientProxy::onWrited {}:{}<==>{}:{} >>> size:{} : data: {}", inet_ntoa(u_addr.sin_addr), ntohs(u_addr.sin_port), inet_ntoa(t_addr.sin_addr), ntohs(t_addr.sin_port), writebytes, std::string((char *)buf, writebytes).c_str());
161
+ }
162
+
163
+ void UdpClientProxy::onClosed()
164
+ {
165
+ spdlog::info("UdpClientProxy::onClosed {}:{}<==>{}:{}", inet_ntoa(u_addr.sin_addr), ntohs(u_addr.sin_port), inet_ntoa(t_addr.sin_addr), ntohs(t_addr.sin_port));
166
+ auto key = get_key(t_addr, u_addr);
167
+ auto serverProxy = UdpConnMap<uint32_t, UdpServerBoltProxy>::getInstance().get(session_id);
168
+ if (!serverProxy)
169
+ {
170
+ return;
171
+ }
172
+ serverProxy->closeClient(t_addr, u_addr);
173
  }
src/udp_inbound.cpp CHANGED
@@ -4,6 +4,7 @@
4
  #include "utils.h"
5
  #include "bolt/datagram.h"
6
  #include "bolt/crypt.h"
 
7
 
8
  void udp_on_recvfrom(hio_t *io, void *buf, int readbytes)
9
  {
@@ -35,7 +36,7 @@ void udp_on_recvfrom(hio_t *io, void *buf, int readbytes)
35
 
36
  GENERATE_DECRYPT_KEY(extend_response, extend_response_len, serverProxy->getConfig().ept_type, serverProxy->getConfig().ept_key)
37
 
38
- char result[1] = {BOLT_BIND_RESPONSE_CODE_SUCESS};
39
 
40
  PACK_BIND_RESPONSE_DATA(bind_response, bind_response_len, BOLT_CHANNEL_CMD_BIND_RESPONSE, request_id, session_id, result)
41
 
@@ -44,6 +45,7 @@ void udp_on_recvfrom(hio_t *io, void *buf, int readbytes)
44
 
45
  auto proxy = std::unique_ptr<UdpServerBoltProxy>(serverProxy);
46
  UdpConnMap<uint32_t, UdpServerBoltProxy>::getInstance().add(session_id, proxy);
 
47
  }
48
  else if (dest_len == 9 && dest[0] == BOLT_CHANNEL_CMD_UNBIND_REQUEST)
49
  {
@@ -51,6 +53,7 @@ void udp_on_recvfrom(hio_t *io, void *buf, int readbytes)
51
 
52
  auto serverProxy = UdpConnMap<uint32_t, UdpServerBoltProxy>::getInstance().get(session_id);
53
  serverProxy->recycle();
 
54
  UdpConnMap<uint32_t, UdpServerBoltProxy>::getInstance().remove(session_id);
55
  }
56
  else
 
4
  #include "utils.h"
5
  #include "bolt/datagram.h"
6
  #include "bolt/crypt.h"
7
+ #include "spdlog/spdlog.h"
8
 
9
  void udp_on_recvfrom(hio_t *io, void *buf, int readbytes)
10
  {
 
36
 
37
  GENERATE_DECRYPT_KEY(extend_response, extend_response_len, serverProxy->getConfig().ept_type, serverProxy->getConfig().ept_key)
38
 
39
+ uint32_t result = BOLT_BIND_RESPONSE_CODE_SUCESS;
40
 
41
  PACK_BIND_RESPONSE_DATA(bind_response, bind_response_len, BOLT_CHANNEL_CMD_BIND_RESPONSE, request_id, session_id, result)
42
 
 
45
 
46
  auto proxy = std::unique_ptr<UdpServerBoltProxy>(serverProxy);
47
  UdpConnMap<uint32_t, UdpServerBoltProxy>::getInstance().add(session_id, proxy);
48
+ spdlog::info("[UDP]BOLT_CHANNEL_CMD_BIND_REQUEST: requestId:{}, session_id:{}, encrypt:{}", request_id, session_id, serverProxy->getConfig().encrypt);
49
  }
50
  else if (dest_len == 9 && dest[0] == BOLT_CHANNEL_CMD_UNBIND_REQUEST)
51
  {
 
53
 
54
  auto serverProxy = UdpConnMap<uint32_t, UdpServerBoltProxy>::getInstance().get(session_id);
55
  serverProxy->recycle();
56
+ spdlog::info("[UDP]BOLT_CHANNEL_CMD_UNBIND_REQUEST: requestId:{}, session_id:{}", serverProxy->getConfig().request_id, serverProxy->getConfig().session_id);
57
  UdpConnMap<uint32_t, UdpServerBoltProxy>::getInstance().remove(session_id);
58
  }
59
  else