#include "LibLsp/JsonRpc/MessageIssue.h"
#include "LibLsp/JsonRpc/WebSocketServer.h"
// NOTE(review): the names of the system/Boost headers in this include block
// were lost in the copy of the file that was reviewed; the ones below are
// what the code in this file requires -- confirm against the original.
#include <iostream>
#include <signal.h>
#include <utility>
#include "LibLsp/JsonRpc/stream.h"
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <vector>

namespace beast = boost::beast;         // from <boost/beast.hpp>
namespace http = beast::http;           // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio;            // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>

namespace lsp
{

/// One accepted WebSocket connection.
///
/// Performs the server-side handshake and then keeps one asynchronous read
/// outstanding. The bytes of every received message are appended to
/// `proxy_->on_request`; read/handshake errors are stored in
/// `proxy_->error_message`.
class server_session : public std::enable_shared_from_this<server_session>
{
    websocket::stream<beast::tcp_stream> ws_;
    beast::flat_buffer buffer_;
    std::string user_agent_;

public:
    /// Stream adaptor over `ws_` that is handed to the message processor.
    std::shared_ptr<websocket_stream_wrapper> proxy_;

    /// Takes ownership of the accepted socket.
    ///
    /// @param socket      Connected TCP socket; moved into the WebSocket stream.
    /// @param user_agent  Value sent in the `Server` field of the handshake response.
    explicit server_session(tcp::socket&& socket, std::string const& user_agent)
        : ws_(std::move(socket)), user_agent_(user_agent)
    {
        proxy_ = std::make_shared<websocket_stream_wrapper>(ws_);
    }

    /// Starts the session by dispatching `on_run` onto the stream's executor.
    void run()
    {
        // We need to be executing within a strand to perform async operations
        // on the I/O objects in this server_session. Although not strictly
        // necessary for single-threaded contexts, this code is written to be
        // thread-safe by default.
        net::dispatch(ws_.get_executor(), beast::bind_front_handler(&server_session::on_run, shared_from_this()));
    }

    /// Configures the stream and starts the asynchronous handshake.
    void on_run()
    {
        // Set suggested timeout settings for the websocket.
        ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::server));

        // Set a decorator to change the Server field of the handshake.
        // `this` is captured explicitly (implicit capture through [=] is
        // deprecated in C++20); the decorator is stored inside ws_, a member
        // of this object, so it cannot outlive it.
        ws_.set_option(websocket::stream_base::decorator(
            [this](websocket::response_type& res) { res.set(http::field::server, user_agent_.c_str()); }
        ));

        // Accept the websocket handshake.
        ws_.async_accept(beast::bind_front_handler(&server_session::on_accept, shared_from_this()));
    }

    /// Handshake completion handler: starts the first read on success.
    void on_accept(beast::error_code ec)
    {
        if (ec)
        {
            // Record the failure so that websocket_stream_wrapper::what()
            // can report it, as on_read() does for read errors.
            proxy_->error_message = ec.message();
            return;
        }

        // Read a message into our buffer.
        ws_.async_read(buffer_, beast::bind_front_handler(&server_session::on_read, shared_from_this()));
    }

    /// Read completion handler.
    ///
    /// On success the received bytes are queued on `proxy_->on_request` and the
    /// next read is started. On failure the error text is stored and the read
    /// loop ends.
    void on_read(beast::error_code ec, std::size_t bytes_transferred)
    {
        if (ec)
        {
            proxy_->error_message = ec.message();
            return;
        }

        char const* data = static_cast<char const*>(buffer_.data().data());
        std::vector<char> elements(data, data + bytes_transferred);
        // The bytes were copied above, so the buffer can be reused for the next message.
        buffer_.clear();
        proxy_->on_request.EnqueueAll(std::move(elements), false);

        // Read the next message into our buffer.
        ws_.async_read(buffer_, beast::bind_front_handler(&server_session::on_read, shared_from_this()));
    }

    /// Closes the WebSocket with a normal close code if it is open.
    /// Any error reported by the close is ignored.
    void close()
    {
        if (ws_.is_open())
        {
            boost::system::error_code ec;
            ws_.close(websocket::close_code::normal, ec);
        }
    }
};

//------------------------------------------------------------------------------

/// Private implementation data of WebSocketServer.
struct WebSocketServer::Data
{
    using WorkGuard = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;

    Data(std::string const& user_agent, lsp::Log& log) : acceptor_(io_context_), user_agent_(user_agent), _log(log)
    {
    }

    /// The io_context used to perform asynchronous operations.
    boost::asio::io_context io_context_;

    /// Work guard created by the WebSocketServer constructor and released in stop().
    std::shared_ptr<WorkGuard> work;

    /// Acceptor used to listen for incoming connections.
    boost::asio::ip::tcp::acceptor acceptor_;

    /// The current session; replaced whenever a new connection is accepted.
    std::shared_ptr<server_session> _server_session;

    /// Value passed to every session for the handshake `Server` field.
    std::string user_agent_;

    lsp::Log& _log;
};

/// Wraps a WebSocket stream that is owned elsewhere.
///
/// @param _w  Stream to read from / write to; only a reference is kept.
websocket_stream_wrapper::websocket_stream_wrapper(boost::beast::websocket::stream<boost::beast::tcp_stream>& _w)
    : ws_(_w), request_waiter(new MultiQueueWaiter()), on_request(request_waiter)
{
}

/// Same as bad().
bool websocket_stream_wrapper::fail()
{
    return bad();
}

/// Same as bad(): end-of-stream is reported once the socket is closed.
bool websocket_stream_wrapper::eof()
{
    return bad();
}

/// True while the underlying socket is open.
bool websocket_stream_wrapper::good()
{
    return !bad();
}

/// Fills `str` with exactly `count` characters taken from `on_request`.
///
/// Whatever TryDequeueSome() hands back is copied first; the remaining
/// characters are fetched one at a time with get().
websocket_stream_wrapper& websocket_stream_wrapper::read(char* str, std::streamsize count)
{
    auto some = on_request.TryDequeueSome(static_cast<size_t>(count));
    // Skip memcpy when nothing was dequeued: data() may then be null.
    if (some.size() != 0)
    {
        std::memcpy(str, some.data(), some.size());
    }
    for (std::streamsize i = static_cast<std::streamsize>(some.size()); i < count; ++i)
    {
        str[i] = static_cast<char>(get());
    }
    return *this;
}

/// Returns the next character dequeued from `on_request`.
int websocket_stream_wrapper::get()
{
    return on_request.Dequeue();
}

/// True when the underlying TCP socket is not open.
bool websocket_stream_wrapper::bad()
{
    return !ws_.next_layer().socket().is_open();
}

/// Sends `c` as one WebSocket message (synchronous write).
websocket_stream_wrapper& websocket_stream_wrapper::write(std::string const& c)
{
    // The write is synchronous, so the buffer can refer to `c` directly.
    ws_.write(boost::asio::buffer(c));
    return *this;
}

/// Sends the decimal text of `_s` as one WebSocket message (synchronous write).
websocket_stream_wrapper& websocket_stream_wrapper::write(std::streamsize _s)
{
    std::string const text = std::to_string(_s);
    ws_.write(boost::asio::buffer(text));
    return *this;
}

/// No-op: every write() already sends a complete message.
websocket_stream_wrapper& websocket_stream_wrapper::flush()
{
    return *this;
}

/// No-op.
void websocket_stream_wrapper::clear()
{
}

/// Describes the current error state.
///
/// @return The stored error message if there is one, otherwise
///         "Socket is not open." when the socket is closed, otherwise an
///         empty string.
std::string websocket_stream_wrapper::what()
{
    if (!error_message.empty())
    {
        return error_message;
    }

    if (bad())
    {
        return "Socket is not open.";
    }
    return {};
}

WebSocketServer::~WebSocketServer()
{
    delete d_ptr;
}

/// Creates the server, binds it to `address`:`port` and starts accepting.
///
/// A bind failure is logged and leaves the server constructed but not
/// listening. Any other exception raised during set-up is propagated to the
/// caller after the private data has been released.
///
/// @param user_agent     Value for the `Server` field of handshake responses.
/// @param address        Host name or address to resolve and bind to.
/// @param port           Port or service name.
/// @param json_handler   Forwarded to `point`.
/// @param localEndPoint  Forwarded to `point`.
/// @param log            Logger used by the server; must outlive it.
/// @param _max_workers   Forwarded to `point` after narrowing.
///                       NOTE(review): the narrowing target type (uint8_t)
///                       was reconstructed -- confirm against RemoteEndPoint.
WebSocketServer::WebSocketServer(
    std::string const& user_agent, std::string const& address, std::string const& port,
    std::shared_ptr<MessageJsonHandler> json_handler, std::shared_ptr<Endpoint> localEndPoint, lsp::Log& log,
    uint32_t _max_workers
)
    : point(json_handler, localEndPoint, log, lsp::JSONStreamStyle::Standard, static_cast<uint8_t>(_max_workers)),
      d_ptr(new Data(user_agent, log))
{
    try
    {
        d_ptr->work = std::make_shared<Data::WorkGuard>(d_ptr->io_context_.get_executor());

        // Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
        boost::asio::ip::tcp::resolver resolver(d_ptr->io_context_);
        boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(address, port).begin();
        d_ptr->acceptor_.open(endpoint.protocol());
        d_ptr->acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
        try
        {
            d_ptr->acceptor_.bind(endpoint);
        }
        catch (boost::system::system_error& e)
        {
            std::string temp = "Socket Server bind failed: ";
            d_ptr->_log.log(lsp::Log::Level::INFO, temp + e.what());
            return;
        }
        d_ptr->acceptor_.listen();

        do_accept();
        std::string desc = "Socket WebSocketServer " + address + " " + port + " start.";
        d_ptr->_log.log(lsp::Log::Level::INFO, desc);
    }
    catch (...)
    {
        // The destructor does not run for a partially constructed object, so
        // release the private data here before letting the exception escape.
        delete d_ptr;
        d_ptr = nullptr;
        throw;
    }
}

/// Runs the server's io_context on the calling thread.
void WebSocketServer::run()
{
    // The io_context::run() call will block until all asynchronous operations
    // have finished. While the WebSocketServer is running, there is always at
    // least one asynchronous operation outstanding: the asynchronous accept
    // call waiting for new incoming connections.
    d_ptr->io_context_.run();
}

/// Releases the work guard and stops accepting; never throws.
void WebSocketServer::stop()
{
    try
    {
        d_ptr->work.reset();
        do_stop();
    }
    catch (...)
    {
        // Best effort: stop() swallows every exception.
    }
}

/// Starts one asynchronous accept; the handler re-arms itself.
///
/// Only one session is kept: a new connection closes the previous session and
/// restarts message processing on the new one.
void WebSocketServer::do_accept()
{
    d_ptr->acceptor_.async_accept(
        [this](boost::system::error_code ec, boost::asio::ip::tcp::socket socket)
        {
            // Check whether the WebSocketServer was stopped before this
            // completion handler had a chance to run.
            if (!d_ptr->acceptor_.is_open())
            {
                return;
            }

            if (ec)
            {
                d_ptr->_log.log(lsp::Log::Level::INFO, "Socket WebSocketServer accept failed: " + ec.message());
            }
            else
            {
                if (d_ptr->_server_session)
                {
                    try
                    {
                        d_ptr->_server_session->close();
                        point.stop();
                    }
                    catch (...)
                    {
                        // Best effort: a failure while tearing down the old
                        // session must not prevent serving the new one.
                    }
                }
                d_ptr->_server_session = std::make_shared<server_session>(std::move(socket), d_ptr->user_agent_);
                d_ptr->_server_session->run();

                point.startProcessingMessages(d_ptr->_server_session->proxy_, d_ptr->_server_session->proxy_);
            }

            // Keep accepting even after a failed accept, so that a transient
            // error does not permanently stop the server from taking
            // connections.
            do_accept();
        }
    );
}

/// Closes the acceptor and stops `point`.
void WebSocketServer::do_stop()
{
    d_ptr->acceptor_.close();

    point.stop();
}

} // namespace lsp