diff options
Diffstat (limited to 'src/network/core/tcp_connect.cpp')
-rw-r--r-- | src/network/core/tcp_connect.cpp | 301 |
1 files changed, 263 insertions, 38 deletions
diff --git a/src/network/core/tcp_connect.cpp b/src/network/core/tcp_connect.cpp index 81c4d8c26..cca9f09b7 100644 --- a/src/network/core/tcp_connect.cpp +++ b/src/network/core/tcp_connect.cpp @@ -15,6 +15,8 @@ #include "tcp.h" #include "../network_internal.h" +#include <deque> + #include "../../safeguards.h" /** List of connections that are currently being created */ @@ -24,38 +26,271 @@ static std::vector<TCPConnecter *> _tcp_connecters; * Create a new connecter for the given address * @param connection_string the address to connect to */ -TCPConnecter::TCPConnecter(const std::string &connection_string, uint16 default_port) : - connected(false), - aborted(false), - killed(false), - sock(INVALID_SOCKET) +TCPConnecter::TCPConnecter(const std::string &connection_string, uint16 default_port) { - this->address = ParseConnectionString(connection_string, default_port); + this->connection_string = NormalizeConnectionString(connection_string, default_port); _tcp_connecters.push_back(this); - if (!StartNewThread(nullptr, "ottd:tcp", &TCPConnecter::ThreadEntry, this)) { - this->Connect(); + + if (!StartNewThread(nullptr, "ottd:resolve", &TCPConnecter::ResolveThunk, this)) { + this->Resolve(); + } +} + +TCPConnecter::~TCPConnecter() +{ + for (const auto &socket : this->sockets) { + close(socket); } + + freeaddrinfo(this->ai); } -/** The actual connection function */ -void TCPConnecter::Connect() +/** + * Start a connection to the indicated address. + * @param address The address to connection to. + */ +void TCPConnecter::Connect(addrinfo *address) { - this->sock = this->address.Connect(); - if (this->sock == INVALID_SOCKET) { - this->aborted = true; - } else { - this->connected = true; + SOCKET sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol); + if (sock == INVALID_SOCKET) { + DEBUG(net, 0, "Could not create %s %s socket: %s", NetworkAddress::SocketTypeAsString(address->ai_socktype), NetworkAddress::AddressFamilyAsString(address->ai_family), NetworkError::GetLast().AsString()); + return; + } + + if (!SetNoDelay(sock)) DEBUG(net, 1, "Setting TCP_NODELAY failed"); + if (!SetNonBlocking(sock)) DEBUG(net, 0, "Setting non-blocking mode failed"); + + NetworkAddress network_address = NetworkAddress(address->ai_addr, (int)address->ai_addrlen); + DEBUG(net, 4, "Attempting to connect to %s", network_address.GetAddressAsString().c_str()); + + int err = connect(sock, address->ai_addr, (int)address->ai_addrlen); + if (err != 0 && !NetworkError::GetLast().IsConnectInProgress()) { + closesocket(sock); + + DEBUG(net, 1, "Could not connect to %s: %s", network_address.GetAddressAsString().c_str(), NetworkError::GetLast().AsString()); + return; + } + + this->sockets.push_back(sock); +} + +/** + * Start the connect() for the next address in the list. + * @return True iff a new connect() is attempted. + */ +bool TCPConnecter::TryNextAddress() +{ + if (this->current_address >= this->addresses.size()) return false; + + this->last_attempt = std::chrono::steady_clock::now(); + this->Connect(this->addresses[this->current_address++]); + + return true; +} + +void TCPConnecter::OnResolved(addrinfo *ai) +{ + std::deque<addrinfo *> addresses_ipv4, addresses_ipv6; + + /* Apply "Happy Eyeballs" if it is likely IPv6 is functional. */ + + /* Detect if IPv6 is likely to succeed or not. */ + bool seen_ipv6 = false; + bool resort = true; + for (addrinfo *runp = ai; runp != nullptr; runp = runp->ai_next) { + if (runp->ai_family == AF_INET6) { + seen_ipv6 = true; + } else if (!seen_ipv6) { + /* We see an IPv4 before an IPv6; this most likely means there is + * no IPv6 available on the system, so keep the order of this + * list. */ + resort = false; + break; + } + } + + /* Convert the addrinfo into NetworkAddresses. */ + for (addrinfo *runp = ai; runp != nullptr; runp = runp->ai_next) { + if (resort) { + if (runp->ai_family == AF_INET6) { + addresses_ipv6.emplace_back(runp); + } else { + addresses_ipv4.emplace_back(runp); + } + } else { + this->addresses.emplace_back(runp); + } + } + + /* If we want to resort, make the list like IPv6 / IPv4 / IPv6 / IPv4 / .. + * for how ever many (round-robin) DNS entries we have. */ + if (resort) { + while (!addresses_ipv4.empty() || !addresses_ipv6.empty()) { + if (!addresses_ipv6.empty()) { + this->addresses.push_back(addresses_ipv6.front()); + addresses_ipv6.pop_front(); + } + if (!addresses_ipv4.empty()) { + this->addresses.push_back(addresses_ipv4.front()); + addresses_ipv4.pop_front(); + } + } + } + + if (_debug_net_level >= 5) { + DEBUG(net, 5, "%s resolved in:", this->connection_string.c_str()); + for (const auto &address : this->addresses) { + DEBUG(net, 5, "- %s", NetworkAddress(address->ai_addr, (int)address->ai_addrlen).GetAddressAsString().c_str()); + } + } + + this->current_address = 0; +} + +void TCPConnecter::Resolve() +{ + /* Port is already guaranteed part of the connection_string. */ + NetworkAddress address = ParseConnectionString(this->connection_string, 0); + + addrinfo hints; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_flags = AI_ADDRCONFIG; + hints.ai_socktype = SOCK_STREAM; + + char port_name[6]; + seprintf(port_name, lastof(port_name), "%u", address.GetPort()); + + static bool getaddrinfo_timeout_error_shown = false; + auto start = std::chrono::steady_clock::now(); + + addrinfo *ai; + int e = getaddrinfo(address.GetHostname(), port_name, &hints, &ai); + + auto end = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast<std::chrono::seconds>(end - start); + if (!getaddrinfo_timeout_error_shown && duration >= std::chrono::seconds(5)) { + DEBUG(net, 0, "getaddrinfo() for address \"%s\" took %i seconds", this->connection_string.c_str(), (int)duration.count()); + DEBUG(net, 0, " This is likely an issue in the DNS name resolver's configuration causing it to time out"); + getaddrinfo_timeout_error_shown = true; } + + if (e != 0) { + DEBUG(misc, 0, "Failed to resolve DNS for %s", this->connection_string.c_str()); + this->OnFailure(); + return; + } + + this->ai = ai; + this->OnResolved(ai); + this->is_resolved = true; +} + +/* static */ void TCPConnecter::ResolveThunk(TCPConnecter *connecter) +{ + connecter->Resolve(); } /** - * Entry point for the new threads. - * @param param the TCPConnecter instance to call Connect on. + * Check if there was activity for this connecter. + * @return True iff the TCPConnecter is done and can be cleaned up. */ -/* static */ void TCPConnecter::ThreadEntry(TCPConnecter *param) +bool TCPConnecter::CheckActivity() { - param->Connect(); + if (!this->is_resolved.load()) return false; + + /* If there are no attempts pending, connect to the next. */ + if (this->sockets.empty()) { + if (!this->TryNextAddress()) { + /* There were no more addresses to try, so we failed. */ + this->OnFailure(); + return true; + } + return false; + } + + fd_set write_fd; + FD_ZERO(&write_fd); + for (const auto &socket : this->sockets) { + FD_SET(socket, &write_fd); + } + + timeval tv; + tv.tv_usec = 0; + tv.tv_sec = 0; + int n = select(FD_SETSIZE, NULL, &write_fd, NULL, &tv); + /* select() failed; hopefully next try it doesn't. */ + if (n < 0) { + /* select() normally never fails; so hopefully it works next try! */ + DEBUG(net, 1, "select() failed with %s", NetworkError::GetLast().AsString()); + return false; + } + + /* No socket updates. */ + if (n == 0) { + /* Wait 250ms between attempting another address. */ + if (std::chrono::steady_clock::now() < this->last_attempt + std::chrono::milliseconds(250)) return false; + + /* Try the next address in the list. */ + if (this->TryNextAddress()) return false; + + /* Wait up to 3 seconds since the last connection we started. */ + if (std::chrono::steady_clock::now() < this->last_attempt + std::chrono::milliseconds(3000)) return false; + + /* More than 3 seconds no socket reported activity, and there are no + * more address to try. Timeout the attempt. */ + DEBUG(net, 0, "Timeout while connecting to %s", this->connection_string.c_str()); + + for (const auto &socket : this->sockets) { + closesocket(socket); + } + this->OnFailure(); + return true; + } + + /* Check for errors on any of the sockets. */ + for (auto it = this->sockets.begin(); it != this->sockets.end(); /* nothing */) { + NetworkError socket_error = GetSocketError(*it); + if (socket_error.HasError()) { + DEBUG(net, 1, "Could not connect to %s: %s", NetworkAddress::GetPeerName(*it).c_str(), socket_error.AsString()); + closesocket(*it); + it = this->sockets.erase(it); + } else { + it++; + } + } + + /* In case all sockets had an error, queue a new one. */ + if (this->sockets.empty()) { + if (!this->TryNextAddress()) { + /* There were no more addresses to try, so we failed. */ + this->OnFailure(); + return true; + } + return false; + } + + /* At least one socket is connected. The first one that does is the one + * we will be using, and we close all other sockets. */ + SOCKET connected_socket = INVALID_SOCKET; + for (auto it = this->sockets.begin(); it != this->sockets.end(); /* nothing */) { + if (connected_socket == INVALID_SOCKET && FD_ISSET(*it, &write_fd)) { + connected_socket = *it; + } else { + closesocket(*it); + } + it = this->sockets.erase(it); + } + assert(connected_socket != INVALID_SOCKET); + + DEBUG(net, 1, "Connected to %s", this->connection_string.c_str()); + if (_debug_net_level >= 5) { + DEBUG(net, 5, "- using %s", NetworkAddress::GetPeerName(connected_socket).c_str()); + } + + this->OnConnect(connected_socket); + return true; } /** @@ -68,32 +303,22 @@ void TCPConnecter::Connect() { for (auto iter = _tcp_connecters.begin(); iter < _tcp_connecters.end(); /* nothing */) { TCPConnecter *cur = *iter; - const bool connected = cur->connected.load(); - const bool aborted = cur->aborted.load(); - if ((connected || aborted) && cur->killed) { - iter = _tcp_connecters.erase(iter); - if (cur->sock != INVALID_SOCKET) closesocket(cur->sock); - delete cur; - continue; - } - if (connected) { - iter = _tcp_connecters.erase(iter); - cur->OnConnect(cur->sock); - delete cur; - continue; - } - if (aborted) { + + if (cur->CheckActivity()) { iter = _tcp_connecters.erase(iter); - cur->OnFailure(); delete cur; - continue; + } else { + iter++; } - iter++; } } /** Kill all connection attempts. */ /* static */ void TCPConnecter::KillAll() { - for (TCPConnecter *conn : _tcp_connecters) conn->killed = true; + for (auto iter = _tcp_connecters.begin(); iter < _tcp_connecters.end(); /* nothing */) { + TCPConnecter *cur = *iter; + iter = _tcp_connecters.erase(iter); + delete cur; + } } |