Only in s2s/: .deps Only in s2s/: Makefile diff -ru /usr/local/src/jabberd-2.0s6/s2s/in.c s2s/in.c --- /usr/local/src/jabberd-2.0s6/s2s/in.c Fri Nov 26 16:36:53 2004 +++ s2s/in.c Tue Dec 14 09:47:02 2004 @@ -65,6 +65,7 @@ s2s_t s2s = (s2s_t) arg; struct sockaddr_storage sa; int namelen = sizeof(sa), port, nbytes; + char ipport[INET6_ADDRSTRLEN + 17]; switch(a) { case action_READ: @@ -90,9 +91,16 @@ jqueue_push(in->s2s->dead, (void *) in->s, 0); - xhash_zap(in->s2s->in, in->key); + /* remove from open streams hash if online, or open connections if not */ + if (in->online) + xhash_zap(in->s2s->in, in->key); + else { + snprintf(ipport, INET6_ADDRSTRLEN + 16, "%s/%d", in->ip, in->port); + xhash_zap(in->s2s->in_accept, ipport); + } xhash_free(in->states); + xhash_free(in->states_time); xhash_free(in->routes); if(in->key != NULL) free(in->key); @@ -120,6 +128,7 @@ in->port = port; in->states = xhash_new(101); + in->states_time = xhash_new(101); in->fd = fd; @@ -128,6 +137,10 @@ in->s = sx_new(s2s->sx_env, in->fd, _in_sx_callback, (void *) in); mio_app(m, in->fd, in_mio_callback, (void *) in); + /* add to incoming connections hash */ + snprintf(ipport, INET6_ADDRSTRLEN + 16, "%s/%d", in->ip, in->port); + xhash_put(s2s->in_accept, pstrdup(xhash_pool(s2s->in_accept),ipport), (void *) in); + #ifdef HAVE_SSL sx_server_init(in->s, S2S_DB_HEADER | ((s2s->local_pemfile != NULL) ? SX_SSL_STARTTLS_OFFER : 0) ); #else @@ -145,6 +158,7 @@ int len; sx_error_t *sxe; nad_t nad; + char ipport[INET6_ADDRSTRLEN + 17]; switch(e) { case event_WANT_READ: @@ -220,17 +234,25 @@ /* first time, bring them online */ if ((!in->online)||(strcmp(in->key,s->id)!=0)) { - log_debug(ZONE, "incoming conn from %s port %d is online (id %s)", in->ip, in->port, s->id); + log_write(in->s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] incoming stream online (id %s)", in->fd, in->ip, in->port, s->id); in->online = 1; /* record the id */ - if (in->key != NULL) + if (in->key != NULL) { log_debug(ZONE,"adding new SSL stream id %s for stream id %s", s->id, in->key); + + /* remove the initial (non-SSL) stream id from the in connections hash */ + xhash_zap(in->s2s->in, in->key); + } + in->key = strdup(s->id); - /* track it */ + /* track it - add to open streams hash and remove from new connections hash */ xhash_put(in->s2s->in, in->key, (void *) in); + + snprintf(ipport, INET6_ADDRSTRLEN + 16, "%s/%d", in->ip, in->port); + xhash_zap(in->s2s->in_accept, ipport); } break; @@ -238,6 +260,9 @@ case event_PACKET: nad = (nad_t) data; + /* update last packet timestamp */ + in->last_packet = time(NULL); + /* dialback packets */ if(NAD_NURI_L(nad, NAD_ENS(nad, 0)) == strlen(uri_DIALBACK) && strncmp(uri_DIALBACK, NAD_NURI(nad, NAD_ENS(nad, 0)), strlen(uri_DIALBACK)) == 0) { /* only result and verify mean anything */ @@ -305,6 +330,7 @@ char *rkey; nad_t verify; pkt_t pkt; + time_t now; attr = nad_find_attr(nad, 0, -1, "from", NULL); if(attr < 0 || (from = jid_new(in->s2s->pc, NAD_AVAL(nad, attr), NAD_AVAL_L(nad, attr))) == NULL) { @@ -323,7 +349,7 @@ rkey = s2s_route_key(NULL, to->domain, from->domain); - log_debug(ZONE, "auth request from id %s for route %s", in->key, rkey); + log_write(in->s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] received dialback auth request for route '%s'", in->fd, in->ip, in->port, rkey); /* get current state */ if((conn_state_t) xhash_get(in->states, rkey) == conn_VALID) { @@ -347,6 +373,13 @@ /* not valid, so we need to verify */ + /* set the route status to INPROGRESS and set timestamp */ + xhash_put(in->states, pstrdup(xhash_pool(in->states), rkey), (void *) conn_INPROGRESS); + + /* record the time that we set conn_INPROGRESS state */ + now = time(NULL); + xhash_put(in->states_time, pstrdup(xhash_pool(in->states_time), rkey), (void *) now); + /* need the key */ if(NAD_CDATA_L(nad, 0) <= 0) { log_debug(ZONE, "no dialback key given with db result packet"); @@ -442,6 +475,8 @@ type = "invalid"; } + log_write(in->s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] checking dialback verification from %s: sending %s", in->fd, in->ip, in->port, from->domain, type); + log_debug(ZONE, "letting them know"); /* now munge the packet and send it back to them */ @@ -469,14 +504,14 @@ attr = nad_find_attr(nad, 0, -1, "from", NULL); if(attr < 0 || (from = jid_new(in->s2s->pc, NAD_AVAL(nad, attr), NAD_AVAL_L(nad, attr))) == NULL) { - log_debug(ZONE, "missing or invalid from on db verify packet"); + log_debug(ZONE, "missing or invalid from on incoming packet"); nad_free(nad); return; } attr = nad_find_attr(nad, 0, -1, "to", NULL); if(attr < 0 || (to = jid_new(in->s2s->pc, NAD_AVAL(nad, attr), NAD_AVAL_L(nad, attr))) == NULL) { - log_debug(ZONE, "missing or invalid to on db verify packet"); + log_debug(ZONE, "missing or invalid to on incoming packet"); jid_free(from); nad_free(nad); return; @@ -486,9 +521,9 @@ log_debug(ZONE, "received packet from %s for %s", in->key, rkey); - /* if its invalid, drop it */ - if((conn_state_t) xhash_get(in->states, rkey) == conn_INVALID) { - log_debug(ZONE, "route not valid, dropping packet"); + /* drop packets received on routes not valid on that connection as per XMPP 8.3.10 */ + if((conn_state_t) xhash_get(in->states, rkey) != conn_VALID) { + log_write(in->s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] dropping packet on unvalidated route: '%s'", in->fd, in->ip, in->port, rkey); free(rkey); nad_free(nad); jid_free(from); diff -ru /usr/local/src/jabberd-2.0s6/s2s/main.c s2s/main.c --- /usr/local/src/jabberd-2.0s6/s2s/main.c Tue Dec 7 16:16:40 2004 +++ s2s/main.c Tue Dec 14 09:49:17 2004 @@ -138,10 +138,11 @@ if (s2s->local_pemfile != NULL) log_debug(ZONE,"loaded local pemfile for peer s2s connections"); - s2s->check_interval = j_atoi(config_get_one(s2s->config, "check.interval", 0), 0); - s2s->check_queue = j_atoi(config_get_one(s2s->config, "check.queue", 0), 0); - s2s->check_invalid = j_atoi(config_get_one(s2s->config, "check.invalid", 0), 0); + s2s->check_interval = j_atoi(config_get_one(s2s->config, "check.interval", 0), 60); + s2s->check_queue = j_atoi(config_get_one(s2s->config, "check.queue", 0), 60); s2s->check_keepalive = j_atoi(config_get_one(s2s->config, "check.keepalive", 0), 0); + s2s->check_idle = j_atoi(config_get_one(s2s->config, "check.idle", 0), 86400); + } static int _s2s_router_connect(s2s_t s2s) { @@ -161,10 +162,44 @@ return 0; } +int _s2s_check_conn_routes(s2s_t s2s, conn_t conn, const char *direction) +{ + char *rkey; + conn_state_t state; + time_t now, dialback_time; + + if(xhash_iter_first(conn->states)) + do { + xhash_iter_get(conn->states, (const char **) &rkey, (void *) &state); + if (state == conn_INPROGRESS) { + dialback_time = (time_t) xhash_get(conn->states_time, rkey); + + if(now > dialback_time + s2s->check_queue) { + log_write(s2s->log, LOG_NOTICE, "dialback for %s route %s timed out", direction, rkey); + + xhash_zap(conn->states, rkey); + xhash_zap(conn->states_time, rkey); + + /* stream error */ + sx_error(conn->s, stream_err_CONNECTION_TIMEOUT, "dialback timed out"); + + /* close connection as per XMPP/RFC3920 */ + sx_close(conn->s); + + /* indicate that we closed the connection */ + return 0; + } + } + } while(xhash_iter_next(conn->states)); + + /* all ok */ + return 1; +} + static void _s2s_time_checks(s2s_t s2s) { conn_t conn; time_t now; - char *domain, ipport[INET6_ADDRSTRLEN + 17], *key; + char *domain, ipport[INET6_ADDRSTRLEN + 17], *key, *rkey; jqueue_t q; dnscache_t dns; pkt_t pkt; @@ -188,18 +223,10 @@ if(dns->pending) { log_debug(ZONE, "dns lookup pending for %s", domain); if(now > dns->init_time + s2s->check_queue) { - log_debug(ZONE, "dns lookup expired for %s, bouncing packets in queue", domain); + log_write(s2s->log, LOG_NOTICE, "dns lookup for %s timed out", domain); + /* bounce queue */ - while((pkt = jqueue_pull(q)) != NULL) { - if(pkt->nad->ecur > 1 && NAD_NURI_L(pkt->nad, NAD_ENS(pkt->nad, 1)) == strlen(uri_CLIENT) && strncmp(NAD_NURI(pkt->nad, NAD_ENS(pkt->nad, 1)), uri_CLIENT, strlen(uri_CLIENT)) == 0) - sx_nad_write(s2s->router, stanza_tofrom(stanza_tofrom(stanza_error(pkt->nad, 1, stanza_err_REMOTE_SERVER_TIMEOUT), 1), 0)); - else - nad_free(pkt->nad); - - jid_free(pkt->to); - jid_free(pkt->from); - free(pkt); - } + out_bounce_queue(s2s, domain, stanza_err_REMOTE_SERVER_NOT_FOUND); /* expire pending dns entry */ xhash_zap(s2s->dnscache, dns->name); @@ -219,57 +246,63 @@ log_debug(ZONE, "no pending connection for %s, bouncing queue", domain); /* bounce queue */ - while((pkt = jqueue_pull(q)) != NULL) { - if(pkt->nad->ecur > 1 && NAD_NURI_L(pkt->nad, NAD_ENS(pkt->nad, 1)) == strlen(uri_CLIENT) && strncmp(NAD_NURI(pkt->nad, NAD_ENS(pkt->nad, 1)), uri_CLIENT, strlen(uri_CLIENT)) == 0) - sx_nad_write(s2s->router, stanza_tofrom(stanza_tofrom(stanza_error(pkt->nad, 1, stanza_err_REMOTE_SERVER_TIMEOUT), 1), 0)); - else - nad_free(pkt->nad); - - jid_free(pkt->to); - jid_free(pkt->from); - free(pkt); - } + out_bounce_queue(s2s, domain, stanza_err_REMOTE_SERVER_TIMEOUT); continue; } /* connect timeout check */ if(!conn->online && now > conn->init_time + s2s->check_queue) { - log_debug(ZONE, "connection to %s is not online yet, bouncing queue", domain); - /* !!! kill conn if necessary */ + log_write(s2s->log, LOG_NOTICE, "connection to %s timed out", domain); + /* bounce queue */ - while((pkt = jqueue_pull(q)) != NULL) { - if(pkt->nad->ecur > 1 && NAD_NURI_L(pkt->nad, NAD_ENS(pkt->nad, 1)) == strlen(uri_CLIENT) && strncmp(NAD_NURI(pkt->nad, NAD_ENS(pkt->nad, 1)), uri_CLIENT, strlen(uri_CLIENT)) == 0) - sx_nad_write(s2s->router, stanza_tofrom(stanza_tofrom(stanza_error(pkt->nad, 1, stanza_err_REMOTE_SERVER_TIMEOUT), 1), 0)); - else - nad_free(pkt->nad); - - jid_free(pkt->to); - jid_free(pkt->from); - free(pkt); - } + out_bounce_queue(s2s, domain, stanza_err_REMOTE_SERVER_TIMEOUT); + + /* close connection as per XMPP/RFC3920 */ + sx_close(conn->s); + } } while(xhash_iter_next(s2s->outq)); } - /* invalid expiry */ - if(s2s->check_invalid > 0 && now > s2s->last_invalid_check + s2s->check_invalid) { + /* expiry of connected routes in conn_INPROGRESS state */ + if(s2s->check_queue > 0) { + + /* outgoing connections */ if(xhash_iter_first(s2s->out)) do { xhash_iter_get(s2s->out, (const char **) &key, (void *) &conn); - log_debug(ZONE, "checking connection state for %s", key); - if(xhash_iter_first(conn->states)) - do { - xhash_iter_get(conn->states, NULL, (void *) &state); - - /* drop invalid */ - if(state == conn_INVALID) { - xhash_zap(conn->states, key); - log_debug(ZONE, "dropping invalid connection for conn key %s", key); - } - - } while(xhash_iter_next(conn->states)); + log_debug(ZONE, "checking dialback state for outgoing conn %s", key); + _s2s_check_conn_routes(s2s, conn, "outgoing"); } while(xhash_iter_next(s2s->out)); + + /* incoming open streams */ + if(xhash_iter_first(s2s->in)) + do { + xhash_iter_get(s2s->in, (const char **) &key, (void *) &conn); + + log_debug(ZONE, "checking dialback state for incoming conn %s", key); + if (_s2s_check_conn_routes(s2s, conn, "incoming")) + /* if the connection is still valid, check that dialbacks have been initiated */ + if(!xhash_count(conn->states) && now > conn->init_time + s2s->check_queue) { + log_write(s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] no dialback started", conn->fd, conn->ip, conn->port); + sx_error(conn->s, stream_err_CONNECTION_TIMEOUT, "no dialback initiated"); + sx_close(conn->s); + } + } while(xhash_iter_next(s2s->in)); + + /* incoming open connections (not yet streams) */ + if(xhash_iter_first(s2s->in_accept)) + do { + xhash_iter_get(s2s->in_accept, (const char **) &key, (void *) &conn); + + log_debug(ZONE, "checking stream connection state for incoming conn %i", conn->fd); + if(!conn->online && now > conn->init_time + s2s->check_queue) { + log_write(s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] stream initiation timed out", conn->fd, conn->ip, conn->port); + sx_close(conn->s); + } + } while(xhash_iter_next(s2s->in_accept)); + } /* keepalives */ @@ -285,6 +318,35 @@ mio_write(s2s->mio, conn->fd); } } while(xhash_iter_next(s2s->out)); + + /* idle timeouts - disconnect connections through which no packets have been sent for seconds */ + if(s2s->check_idle > 0) { + + /* outgoing connections */ + if(xhash_iter_first(s2s->out)) + do { + xhash_iter_get(s2s->out, (const char **) &key, (void *) &conn); + log_debug(ZONE, "checking idle state for %s", key); + if (conn->last_packet > 0 && now > conn->last_packet + s2s->check_idle && conn->s->state >= state_STREAM) { + log_write(s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] idle timeout", conn->fd, conn->ip, conn->port); + sx_close(conn->s); + } + } while(xhash_iter_next(s2s->out)); + + /* incoming connections */ + if(xhash_iter_first(s2s->in)) + do { + xhash_iter_get(s2s->in, (const char **) &key, (void *) &conn); + log_debug(ZONE, "checking idle state for %s", key); + if (conn->last_packet > 0 && now > conn->last_packet + s2s->check_idle && conn->s->state >= state_STREAM) { + log_write(s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] idle timeout", conn->fd, conn->ip, conn->port); + sx_close(conn->s); + } + } while(xhash_iter_next(s2s->in)); + + } + + return; } int main(int argc, char **argv) { @@ -384,6 +446,7 @@ s2s->outq = xhash_new(401); s2s->out = xhash_new(401); s2s->in = xhash_new(401); + s2s->in_accept = xhash_new(401); s2s->dnscache = xhash_new(401); s2s->pc = prep_cache_new(); @@ -491,6 +554,7 @@ xhash_free(s2s->outq); xhash_free(s2s->out); xhash_free(s2s->in); + xhash_free(s2s->in_accept); xhash_free(s2s->dnscache); prep_cache_free(s2s->pc); diff -ru /usr/local/src/jabberd-2.0s6/s2s/out.c s2s/out.c --- /usr/local/src/jabberd-2.0s6/s2s/out.c Sat Nov 13 18:48:19 2004 +++ s2s/out.c Tue Dec 14 09:52:47 2004 @@ -113,6 +113,9 @@ char *c, *dbkey; nad_t nad; int ns; + time_t now; + + now = time(NULL); c = strchr(rkey, '/'); *c = '\0'; @@ -134,6 +137,7 @@ *c = '/'; log_debug(ZONE, "sending auth request for %s (key %s)", rkey, dbkey); + log_write(out->s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] sending dialback auth request for route '%s'", out->fd, out->ip, out->port, rkey); /* off it goes */ sx_nad_write(out->s, nad); @@ -142,6 +146,9 @@ /* we're in progress now */ xhash_put(out->states, pstrdup(xhash_pool(out->states), rkey), (void *) conn_INPROGRESS); + + /* record the time that we set conn_INPROGRESS state */ + xhash_put(out->states_time, pstrdup(xhash_pool(out->states_time), rkey), (void *) now); } /** send a packet out */ @@ -225,9 +232,11 @@ /* new route key */ rkey = s2s_route_key(NULL, pkt->from->domain, pkt->to->domain); - if(out == NULL) { + /* if no connection, or connection not yet established, queue the packet */ + if((out == NULL) || (out->fd < 0)) _out_packet_queue(s2s, pkt); + if(out == NULL) { /* no conn, create one */ out = (conn_t) malloc(sizeof(struct conn_st)); memset(out, 0, sizeof(struct conn_st)); @@ -240,6 +249,7 @@ out->port = pkt->port; out->states = xhash_new(101); + out->states_time = xhash_new(101); out->routes = xhash_new(101); @@ -249,29 +259,34 @@ xhash_put(out->routes, pstrdup(xhash_pool(out->routes), rkey), (void *) 1); - log_debug(ZONE, "initiating connection to %s", ipport); - + out->fd = -1; + } + + if (out->fd < 0) { /* connect */ + log_debug(ZONE, "initiating connection to %s", ipport); + out->fd = mio_connect(s2s->mio, pkt->port, pkt->ip, _out_mio_callback, (void *) out); if (out->fd < 0) { log_write(out->s2s->log, LOG_NOTICE, "mio_connect error connecting to %s : %s (%d)", ipport, strerror(errno), errno); } else { log_write(out->s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] outgoing connection", out->fd, out->ip, out->port); - } - out->s = sx_new(s2s->sx_env, out->fd, _out_sx_callback, (void *) out); + out->s = sx_new(s2s->sx_env, out->fd, _out_sx_callback, (void *) out); #ifdef HAVE_SSL - /* Send a stream version of 1.0 if we can do STARTTLS */ - if(out->s2s->sx_ssl != NULL && out->s2s->local_pemfile != NULL) { - sx_client_init(out->s, S2S_DB_HEADER, "jabber:server", pkt->to->domain, NULL, "1.0"); - } else { - sx_client_init(out->s, S2S_DB_HEADER, "jabber:server", NULL, NULL, NULL); - } + /* Send a stream version of 1.0 if we can do STARTTLS */ + if(out->s2s->sx_ssl != NULL && out->s2s->local_pemfile != NULL) { + sx_client_init(out->s, S2S_DB_HEADER, "jabber:server", pkt->to->domain, NULL, "1.0"); + } else { + sx_client_init(out->s, S2S_DB_HEADER, "jabber:server", NULL, NULL, NULL); + } #else - sx_client_init(out->s, S2S_DB_HEADER, "jabber:server", NULL, NULL, NULL); + sx_client_init(out->s, S2S_DB_HEADER, "jabber:server", NULL, NULL, NULL); #endif + } + free(rkey); return; @@ -303,27 +318,8 @@ else sx_nad_write_elem(out->s, pkt->nad, 1); - jid_free(pkt->from); - jid_free(pkt->to); - free(pkt); - - free(rkey); - - return; - } - - /* invalid conns */ - if(state == conn_INVALID) { - log_debug(ZONE, "route %s is invalid, bouncing packet", rkey); - - /* error and bounce */ - stanza_error(pkt->nad, 1, stanza_err_SERVICE_UNAVAILABLE); - stanza_tofrom(pkt->nad, 1); - - stanza_tofrom(pkt->nad, 0); - nad_set_attr(pkt->nad, 0, -1, "from", s2s->id, 0); - - sx_nad_write(s2s->router, pkt->nad); + /* update timestamp */ + out->last_packet = time(NULL); jid_free(pkt->from); jid_free(pkt->to); @@ -343,7 +339,7 @@ return; } - /* go */ + /* this is a new route - send dialback auth request to piggyback on the existing connection */ _out_dialback(out, rkey); free(rkey); @@ -368,20 +364,9 @@ xhash_zap(s2s->dnscache, name->domain); /* bounce queue */ - q = xhash_get(s2s->outq, name->domain); - while((pkt = jqueue_pull(q)) != NULL) { - if(pkt->nad->ecur > 1 && NAD_NURI_L(pkt->nad, NAD_ENS(pkt->nad, 1)) == strlen(uri_CLIENT) && strncmp(NAD_NURI(pkt->nad, NAD_ENS(pkt->nad, 1)), uri_CLIENT, strlen(uri_CLIENT)) == 0) - sx_nad_write(s2s->router, stanza_tofrom(stanza_tofrom(stanza_error(pkt->nad, 1, stanza_err_REMOTE_SERVER_TIMEOUT), 1), 0)); - else - nad_free(pkt->nad); - - jid_free(pkt->to); - jid_free(pkt->from); - free(pkt); - } + out_bounce_queue(s2s, name->domain, stanza_err_REMOTE_SERVER_NOT_FOUND); jid_free(name); - nad_free(nad); return; @@ -466,14 +451,16 @@ case action_WRITE: log_debug(ZONE, "write action on fd %d", fd); + /* update activity timestamp */ + out->last_activity = time(NULL); + return sx_can_write(out->s); case action_CLOSE: log_debug(ZONE, "close action on fd %d", fd); - /* !!! logging */ - - /* !!! bounce queues */ + /* bounce queues */ + out_bounce_conn_queues(out, stanza_err_SERVICE_UNAVAILABLE); jqueue_push(out->s2s->dead, (void *) out->s, 0); @@ -485,6 +472,7 @@ xhash_zap(out->s2s->out, ipport); xhash_free(out->states); + xhash_free(out->states_time); xhash_free(out->routes); @@ -601,7 +589,7 @@ /* if no stream version from either side, kick off dialback for each route, */ /* otherwise wait for stream features */ - if ((out->s->res_version==NULL) || (out->s2s->sx_ssl == NULL)) { + if ((out->s->res_version==NULL) || (out->s2s->sx_ssl == NULL) || (out->s2s->local_pemfile == NULL)) { log_debug(ZONE, "no stream version, sending dialbacks for %s immediately", out->key); out->online = 1; send_dialbacks(out); @@ -714,7 +702,7 @@ /* key is valid */ if(nad_find_attr(nad, 0, -1, "type", "valid") >= 0) { - log_write(out->s2s->log, LOG_NOTICE, "outgoing route '%s' is now valid; destination=%s, port %d%s", rkey, out->ip, out->port, out->s->ssf ? ", SSL negotiated" : ""); + log_write(out->s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] outgoing route '%s' is now valid%s", out->fd, out->ip, out->port, rkey, out->s->ssf ? ", SSL negotiated" : ""); xhash_put(out->states, pstrdup(xhash_pool(out->states), rkey), (void *) conn_VALID); /* !!! small leak here */ @@ -756,34 +744,14 @@ /* invalid */ log_write(out->s2s->log, LOG_NOTICE, "outgoing route '%s' is now invalid; destination=%s, port %d", rkey, out->ip, out->port); - xhash_put(out->states, pstrdup(xhash_pool(out->states), rkey), (void *) conn_INVALID); /* !!! small leak here */ - - log_debug(ZONE, "%s invalid, flushing queue", rkey); - - /* to domain */ - c = strchr(rkey, '/'); - c++; - - /* flush the queue */ - q = (jqueue_t) xhash_get(out->s2s->outq, c); - npkt = jqueue_size(q); - - if(q == NULL || npkt == 0) { - /* weird */ - log_debug(ZONE, "nonexistent or empty queue for domain, we're done"); - free(rkey); - jid_free(from); - jid_free(to); - nad_free(nad); - return; - } + /* close connection */ + log_write(out->s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] closing connection", out->fd, out->ip, out->port); - log_debug(ZONE, "flushing %d packets to out_packet", npkt); + /* report stream error */ + sx_error(out->s, stream_err_INVALID_ID, "dialback negotiation failed"); - for(i = 0; i < npkt; i++) { - pkt = jqueue_pull(q); - out_packet(out->s2s, pkt); - } + /* close the stream */ + sx_close(out->s); free(rkey); @@ -799,6 +767,7 @@ jid_t from, to; conn_t in; char *rkey, *type; + int valid; attr = nad_find_attr(nad, 0, -1, "from", NULL); if(attr < 0 || (from = jid_new(out->s2s->pc, NAD_AVAL(nad, attr), NAD_AVAL_L(nad, attr))) == NULL) { @@ -838,13 +807,12 @@ attr = nad_find_attr(nad, 0, -1, "type", "valid"); if(attr >= 0) { - xhash_put(in->states, pstrdup(xhash_pool(out->states), rkey), (void *) conn_VALID); - log_write(in->s2s->log, LOG_NOTICE, "incoming route '%s' is now valid; source=%s, port %d%s", rkey, in->ip, in->port, in->s->ssf ? ", SSL negotiated" : ""); - type = "valid"; + xhash_put(in->states, pstrdup(xhash_pool(in->states), rkey), (void *) conn_VALID); + log_write(in->s2s->log, LOG_NOTICE, "[%d] [%s, port=%d] incoming route '%s' is now valid%s", in->fd, in->ip, in->port, rkey, in->s->ssf ? ", SSL negotiated" : ""); + valid = 1; } else { - xhash_put(in->states, pstrdup(xhash_pool(out->states), rkey), (void *) conn_INVALID); log_write(in->s2s->log, LOG_NOTICE, "incoming route '%s' is now invalid; source=%s, port %d", rkey, in->ip, in->port); - type = "invalid"; + valid = 0; } free(rkey); @@ -858,11 +826,62 @@ nad_append_elem(nad, ns, "result", 0); nad_append_attr(nad, -1, "to", from->domain); nad_append_attr(nad, -1, "from", to->domain); - nad_append_attr(nad, -1, "type", type); + nad_append_attr(nad, -1, "type", valid ? "valid" : "invalid"); /* off it goes */ sx_nad_write(in->s, nad); + /* if invalid, close the stream */ + if (!valid) { + /* generate stream error */ + sx_error(in->s, stream_err_INVALID_ID, "dialback negotiation failed"); + + /* close the incoming stream */ + sx_close(in->s); + } + jid_free(from); jid_free(to); +} + +/* bounce all packets in the queue for domain */ +int out_bounce_queue(s2s_t s2s, const char *domain, int err) +{ + jqueue_t q; + pkt_t pkt; + int pktcount = 0; + + q = xhash_get(s2s->outq, domain); + while((pkt = jqueue_pull(q)) != NULL) { + if(pkt->nad->ecur > 1 && NAD_NURI_L(pkt->nad, NAD_ENS(pkt->nad, 1)) == strlen(uri_CLIENT) && strncmp(NAD_NURI(pkt->nad, NAD_ENS(pkt->nad, 1)), uri_CLIENT, strlen(uri_CLIENT)) == 0) { + sx_nad_write(s2s->router, stanza_tofrom(stanza_tofrom(stanza_error(pkt->nad, 1, err), 1), 0)); + pktcount++; + } + else + nad_free(pkt->nad); + + jid_free(pkt->to); + jid_free(pkt->from); + free(pkt); + } + + return pktcount; +} + +int out_bounce_conn_queues(conn_t out, int err) +{ + char *c; + char *rkey; + + /* bounce queues for all domains handled by this connection - iterate through routes */ + if (xhash_iter_first(out->routes)) { + do { + xhash_iter_get(out->routes, (const char **) &rkey, NULL); + c = strchr(rkey, '/'); + c++; + out_bounce_queue(out->s2s, c, err); + } while(xhash_iter_next(out->routes)); + } + + return; } diff -ru /usr/local/src/jabberd-2.0s6/s2s/s2s.h s2s/s2s.h --- /usr/local/src/jabberd-2.0s6/s2s/s2s.h Tue Dec 7 18:38:05 2004 +++ s2s/s2s.h Tue Dec 14 09:23:22 2004 @@ -103,6 +103,7 @@ int check_queue; int check_invalid; int check_keepalive; + int check_idle; time_t last_queue_check; time_t last_invalid_check; @@ -130,6 +131,9 @@ /** incoming conns (key is stream id) */ xht in; + /** incoming conns prior to stream initiation (key is ip/port) */ + xht in_accept; + /** dns resolution cache */ xht dnscache; }; @@ -167,6 +171,9 @@ /** states of outgoing dialbacks (key is local/remote) */ xht states; + /** time of the last state change (key is local/remote) */ + xht states_time; + /** routes that this conn handles (key is local/remote) */ xht routes; @@ -175,6 +182,7 @@ int online; time_t last_activity; + time_t last_packet; }; /** one item in the dns resolution cache */