1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
| class udp_relay_server
{
struct relay_policy
{
udp::endpoint from_, to_;
DWORD delay_;
};
struct relay_message
{
boost::shared_ptr<std::string> message_;
udp::endpoint destination_;
DWORD expire_clock_;
};
enum
{
MIN_SLEEP_DELAY = 10
};
public:
udp_relay_server(boost::asio::io_service& io_service, short port)
: socket_(io_service, udp::endpoint(udp::v4(), port))
, timer_(io_service)
{
do_receive();
do_timer();
}
udp_relay_server(boost::asio::io_service& io_service, const udp::endpoint & endpoint_ )
: socket_(io_service, endpoint_)
, timer_(io_service)
{
do_receive();
do_timer();
}
void add_policy( const udp::endpoint & from_, const udp::endpoint & to_, DWORD delay = 0 )
{
relay_policy p;
p.from_ = from_;
p.to_ = to_;
p.delay_ = delay;
policies.push_back(p);
}
void add_policy( string from_addr, short from_port, string to_addr, short to_port, DWORD delay = 0 )
{
udp::endpoint from_( boost::asio::ip::address::from_string(from_addr), from_port );
udp::endpoint to_( boost::asio::ip::address::from_string(to_addr), to_port );
add_policy( from_, to_, delay );
}
void clear_policy() { policies.clear(); }
private:
void do_timer()
{
timer_.expires_from_now(boost::posix_time::milliseconds(MIN_SLEEP_DELAY));
timer_.async_wait(boost::bind(&udp_relay_server::handle_timer,this,boost::asio::placeholders::error));
}
void handle_timer( const boost::system::error_code & ec )
{
if ( !queue_.empty() )
{
DWORD cur_clock = GetTickCount();
std::list< relay_message >::iterator itr = queue_.begin();
while ( itr != queue_.end() )
{
relay_message & m = *itr;
if ( cur_clock > m.expire_clock_ )
{
socket_.async_send_to(boost::asio::buffer(*m.message_), m.destination_,
boost::bind(&udp_relay_server::handle_send, this, m.message_, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred) );
itr = queue_.erase(itr++);
}
else
{
itr++;
}
}
}
do_timer();
}
void do_receive()
{
socket_.async_receive_from(
boost::asio::buffer(recv_buffer_), remote_endpoint_,
boost::bind(&udp_relay_server::handle_receive, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handle_receive(const boost::system::error_code& error, std::size_t bytes_transferred)
{
if (!error)
{
bool bRelayed = false;
for(size_t i = 0 ; i < policies.size() ; i ++ )
{
relay_policy & p = policies[i];
if ( p.from_ == remote_endpoint_ )
{
udp::endpoint relay_to(p.to_);
boost::shared_ptr<std::string> message(new string(recv_buffer_.data(),bytes_transferred));
if ( p.delay_ > 0 )
{
relay_message m;
m.message_ = message;
m.destination_ = relay_to;
m.expire_clock_ = GetTickCount() + p.delay_;
queue_.push_back( m );
}
else
{
do_send( message, relay_to );
}
bRelayed = true;
break;
}
}
do_receive();
}
}
void do_send( boost::shared_ptr<std::string> message, const udp::endpoint & destination )
{
socket_.async_send_to(boost::asio::buffer(*message), destination,
boost::bind(&udp_relay_server::handle_send, this,
message, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred) );
}
void handle_send(boost::shared_ptr<std::string> message, const boost::system::error_code& error, std::size_t bytes_transferred )
{
}
udp::socket socket_;
udp::endpoint remote_endpoint_;
boost::array<char, 128> recv_buffer_;
vector<relay_policy> policies;
boost::asio::deadline_timer timer_;
std::list< relay_message > queue_;
};
|