summaryrefslogtreecommitdiff
path: root/src/network/core/tcp_connect.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/network/core/tcp_connect.cpp')
-rw-r--r--src/network/core/tcp_connect.cpp301
1 files changed, 263 insertions, 38 deletions
diff --git a/src/network/core/tcp_connect.cpp b/src/network/core/tcp_connect.cpp
index 81c4d8c26..cca9f09b7 100644
--- a/src/network/core/tcp_connect.cpp
+++ b/src/network/core/tcp_connect.cpp
@@ -15,6 +15,8 @@
#include "tcp.h"
#include "../network_internal.h"
+#include <deque>
+
#include "../../safeguards.h"
/** List of connections that are currently being created */
@@ -24,38 +26,271 @@ static std::vector<TCPConnecter *> _tcp_connecters;
* Create a new connecter for the given address
* @param connection_string the address to connect to
*/
-TCPConnecter::TCPConnecter(const std::string &connection_string, uint16 default_port) :
- connected(false),
- aborted(false),
- killed(false),
- sock(INVALID_SOCKET)
+TCPConnecter::TCPConnecter(const std::string &connection_string, uint16 default_port)
{
- this->address = ParseConnectionString(connection_string, default_port);
+ this->connection_string = NormalizeConnectionString(connection_string, default_port);
_tcp_connecters.push_back(this);
- if (!StartNewThread(nullptr, "ottd:tcp", &TCPConnecter::ThreadEntry, this)) {
- this->Connect();
+
+ if (!StartNewThread(nullptr, "ottd:resolve", &TCPConnecter::ResolveThunk, this)) {
+ this->Resolve();
+ }
+}
+
+TCPConnecter::~TCPConnecter()
+{
+ for (const auto &socket : this->sockets) {
+ close(socket);
}
+
+ freeaddrinfo(this->ai);
}
-/** The actual connection function */
-void TCPConnecter::Connect()
+/**
+ * Start a connection to the indicated address.
+ * @param address The address to connection to.
+ */
+void TCPConnecter::Connect(addrinfo *address)
{
- this->sock = this->address.Connect();
- if (this->sock == INVALID_SOCKET) {
- this->aborted = true;
- } else {
- this->connected = true;
+ SOCKET sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
+ if (sock == INVALID_SOCKET) {
+ DEBUG(net, 0, "Could not create %s %s socket: %s", NetworkAddress::SocketTypeAsString(address->ai_socktype), NetworkAddress::AddressFamilyAsString(address->ai_family), NetworkError::GetLast().AsString());
+ return;
+ }
+
+ if (!SetNoDelay(sock)) DEBUG(net, 1, "Setting TCP_NODELAY failed");
+ if (!SetNonBlocking(sock)) DEBUG(net, 0, "Setting non-blocking mode failed");
+
+ NetworkAddress network_address = NetworkAddress(address->ai_addr, (int)address->ai_addrlen);
+ DEBUG(net, 4, "Attempting to connect to %s", network_address.GetAddressAsString().c_str());
+
+ int err = connect(sock, address->ai_addr, (int)address->ai_addrlen);
+ if (err != 0 && !NetworkError::GetLast().IsConnectInProgress()) {
+ closesocket(sock);
+
+ DEBUG(net, 1, "Could not connect to %s: %s", network_address.GetAddressAsString().c_str(), NetworkError::GetLast().AsString());
+ return;
+ }
+
+ this->sockets.push_back(sock);
+}
+
+/**
+ * Start the connect() for the next address in the list.
+ * @return True iff a new connect() is attempted.
+ */
+bool TCPConnecter::TryNextAddress()
+{
+ if (this->current_address >= this->addresses.size()) return false;
+
+ this->last_attempt = std::chrono::steady_clock::now();
+ this->Connect(this->addresses[this->current_address++]);
+
+ return true;
+}
+
+void TCPConnecter::OnResolved(addrinfo *ai)
+{
+ std::deque<addrinfo *> addresses_ipv4, addresses_ipv6;
+
+ /* Apply "Happy Eyeballs" if it is likely IPv6 is functional. */
+
+ /* Detect if IPv6 is likely to succeed or not. */
+ bool seen_ipv6 = false;
+ bool resort = true;
+ for (addrinfo *runp = ai; runp != nullptr; runp = runp->ai_next) {
+ if (runp->ai_family == AF_INET6) {
+ seen_ipv6 = true;
+ } else if (!seen_ipv6) {
+ /* We see an IPv4 before an IPv6; this most likely means there is
+ * no IPv6 available on the system, so keep the order of this
+ * list. */
+ resort = false;
+ break;
+ }
+ }
+
+ /* Convert the addrinfo into NetworkAddresses. */
+ for (addrinfo *runp = ai; runp != nullptr; runp = runp->ai_next) {
+ if (resort) {
+ if (runp->ai_family == AF_INET6) {
+ addresses_ipv6.emplace_back(runp);
+ } else {
+ addresses_ipv4.emplace_back(runp);
+ }
+ } else {
+ this->addresses.emplace_back(runp);
+ }
+ }
+
+ /* If we want to resort, make the list like IPv6 / IPv4 / IPv6 / IPv4 / ..
+ * for how ever many (round-robin) DNS entries we have. */
+ if (resort) {
+ while (!addresses_ipv4.empty() || !addresses_ipv6.empty()) {
+ if (!addresses_ipv6.empty()) {
+ this->addresses.push_back(addresses_ipv6.front());
+ addresses_ipv6.pop_front();
+ }
+ if (!addresses_ipv4.empty()) {
+ this->addresses.push_back(addresses_ipv4.front());
+ addresses_ipv4.pop_front();
+ }
+ }
+ }
+
+ if (_debug_net_level >= 5) {
+ DEBUG(net, 5, "%s resolved in:", this->connection_string.c_str());
+ for (const auto &address : this->addresses) {
+ DEBUG(net, 5, "- %s", NetworkAddress(address->ai_addr, (int)address->ai_addrlen).GetAddressAsString().c_str());
+ }
+ }
+
+ this->current_address = 0;
+}
+
+void TCPConnecter::Resolve()
+{
+ /* Port is already guaranteed part of the connection_string. */
+ NetworkAddress address = ParseConnectionString(this->connection_string, 0);
+
+ addrinfo hints;
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_flags = AI_ADDRCONFIG;
+ hints.ai_socktype = SOCK_STREAM;
+
+ char port_name[6];
+ seprintf(port_name, lastof(port_name), "%u", address.GetPort());
+
+ static bool getaddrinfo_timeout_error_shown = false;
+ auto start = std::chrono::steady_clock::now();
+
+ addrinfo *ai;
+ int e = getaddrinfo(address.GetHostname(), port_name, &hints, &ai);
+
+ auto end = std::chrono::steady_clock::now();
+ auto duration = std::chrono::duration_cast<std::chrono::seconds>(end - start);
+ if (!getaddrinfo_timeout_error_shown && duration >= std::chrono::seconds(5)) {
+ DEBUG(net, 0, "getaddrinfo() for address \"%s\" took %i seconds", this->connection_string.c_str(), (int)duration.count());
+ DEBUG(net, 0, " This is likely an issue in the DNS name resolver's configuration causing it to time out");
+ getaddrinfo_timeout_error_shown = true;
}
+
+ if (e != 0) {
+ DEBUG(misc, 0, "Failed to resolve DNS for %s", this->connection_string.c_str());
+ this->OnFailure();
+ return;
+ }
+
+ this->ai = ai;
+ this->OnResolved(ai);
+ this->is_resolved = true;
+}
+
+/* static */ void TCPConnecter::ResolveThunk(TCPConnecter *connecter)
+{
+ connecter->Resolve();
}
/**
- * Entry point for the new threads.
- * @param param the TCPConnecter instance to call Connect on.
+ * Check if there was activity for this connecter.
+ * @return True iff the TCPConnecter is done and can be cleaned up.
*/
-/* static */ void TCPConnecter::ThreadEntry(TCPConnecter *param)
+bool TCPConnecter::CheckActivity()
{
- param->Connect();
+ if (!this->is_resolved.load()) return false;
+
+ /* If there are no attempts pending, connect to the next. */
+ if (this->sockets.empty()) {
+ if (!this->TryNextAddress()) {
+ /* There were no more addresses to try, so we failed. */
+ this->OnFailure();
+ return true;
+ }
+ return false;
+ }
+
+ fd_set write_fd;
+ FD_ZERO(&write_fd);
+ for (const auto &socket : this->sockets) {
+ FD_SET(socket, &write_fd);
+ }
+
+ timeval tv;
+ tv.tv_usec = 0;
+ tv.tv_sec = 0;
+ int n = select(FD_SETSIZE, NULL, &write_fd, NULL, &tv);
+ /* select() failed; hopefully next try it doesn't. */
+ if (n < 0) {
+ /* select() normally never fails; so hopefully it works next try! */
+ DEBUG(net, 1, "select() failed with %s", NetworkError::GetLast().AsString());
+ return false;
+ }
+
+ /* No socket updates. */
+ if (n == 0) {
+ /* Wait 250ms between attempting another address. */
+ if (std::chrono::steady_clock::now() < this->last_attempt + std::chrono::milliseconds(250)) return false;
+
+ /* Try the next address in the list. */
+ if (this->TryNextAddress()) return false;
+
+ /* Wait up to 3 seconds since the last connection we started. */
+ if (std::chrono::steady_clock::now() < this->last_attempt + std::chrono::milliseconds(3000)) return false;
+
+ /* More than 3 seconds no socket reported activity, and there are no
+ * more address to try. Timeout the attempt. */
+ DEBUG(net, 0, "Timeout while connecting to %s", this->connection_string.c_str());
+
+ for (const auto &socket : this->sockets) {
+ closesocket(socket);
+ }
+ this->OnFailure();
+ return true;
+ }
+
+ /* Check for errors on any of the sockets. */
+ for (auto it = this->sockets.begin(); it != this->sockets.end(); /* nothing */) {
+ NetworkError socket_error = GetSocketError(*it);
+ if (socket_error.HasError()) {
+ DEBUG(net, 1, "Could not connect to %s: %s", NetworkAddress::GetPeerName(*it).c_str(), socket_error.AsString());
+ closesocket(*it);
+ it = this->sockets.erase(it);
+ } else {
+ it++;
+ }
+ }
+
+ /* In case all sockets had an error, queue a new one. */
+ if (this->sockets.empty()) {
+ if (!this->TryNextAddress()) {
+ /* There were no more addresses to try, so we failed. */
+ this->OnFailure();
+ return true;
+ }
+ return false;
+ }
+
+ /* At least one socket is connected. The first one that does is the one
+ * we will be using, and we close all other sockets. */
+ SOCKET connected_socket = INVALID_SOCKET;
+ for (auto it = this->sockets.begin(); it != this->sockets.end(); /* nothing */) {
+ if (connected_socket == INVALID_SOCKET && FD_ISSET(*it, &write_fd)) {
+ connected_socket = *it;
+ } else {
+ closesocket(*it);
+ }
+ it = this->sockets.erase(it);
+ }
+ assert(connected_socket != INVALID_SOCKET);
+
+ DEBUG(net, 1, "Connected to %s", this->connection_string.c_str());
+ if (_debug_net_level >= 5) {
+ DEBUG(net, 5, "- using %s", NetworkAddress::GetPeerName(connected_socket).c_str());
+ }
+
+ this->OnConnect(connected_socket);
+ return true;
}
/**
@@ -68,32 +303,22 @@ void TCPConnecter::Connect()
{
for (auto iter = _tcp_connecters.begin(); iter < _tcp_connecters.end(); /* nothing */) {
TCPConnecter *cur = *iter;
- const bool connected = cur->connected.load();
- const bool aborted = cur->aborted.load();
- if ((connected || aborted) && cur->killed) {
- iter = _tcp_connecters.erase(iter);
- if (cur->sock != INVALID_SOCKET) closesocket(cur->sock);
- delete cur;
- continue;
- }
- if (connected) {
- iter = _tcp_connecters.erase(iter);
- cur->OnConnect(cur->sock);
- delete cur;
- continue;
- }
- if (aborted) {
+
+ if (cur->CheckActivity()) {
iter = _tcp_connecters.erase(iter);
- cur->OnFailure();
delete cur;
- continue;
+ } else {
+ iter++;
}
- iter++;
}
}
/** Kill all connection attempts. */
/* static */ void TCPConnecter::KillAll()
{
- for (TCPConnecter *conn : _tcp_connecters) conn->killed = true;
+ for (auto iter = _tcp_connecters.begin(); iter < _tcp_connecters.end(); /* nothing */) {
+ TCPConnecter *cur = *iter;
+ iter = _tcp_connecters.erase(iter);
+ delete cur;
+ }
}