/*
 * The DotSlash daemon (dotsd), called by mod_dots.c
 *
 * (c) Columbia University, 2004-2006, All Rights Reserved.
 * Author: Weibin Zhao
 */

#include "mod_dots.h"

/*------------------------- Local helpers ---------------------------*/

/*
 * Element count of a true array.  NOTE(review): this is applied to
 * ctable->entry and ptable->heap on the assumption that both are arrays
 * embedded in their structs (not pointers) -- confirm in mod_dots.h.
 */
#define DOTSD_ARRAY_LEN(a) ((int) (sizeof(a) / sizeof((a)[0])))

/*
 * Copy the NUL-terminated string src into dst (capacity size bytes),
 * truncating if necessary.  dst is always NUL-terminated.
 */
static void dotsd_copy(char *dst, size_t size, const char *src)
{
    snprintf(dst, size, "%s", src);
}

/*
 * Copy at most len bytes of src into dst (capacity size bytes) and
 * NUL-terminate the result, truncating if len does not fit.
 */
static void dotsd_copy_n(char *dst, size_t size, const char *src, size_t len)
{
    if (len >= size)
        len = size - 1;
    memcpy(dst, src, len);
    dst[len] = '\0';
}

/*
 * Return 1 if name is usable as a single path component and as a shell
 * word: non-empty, made only of [A-Za-z0-9._-], not starting with '.' or
 * '-', and without a ".." sequence.  Return 0 otherwise.
 */
static int dotsd_is_safe_name(const char *name)
{
    const char *c;

    if (name == NULL || name[0] == '\0' || name[0] == '.' || name[0] == '-')
        return 0;
    for (c = name; *c != '\0'; c++) {
        if (!isalnum((unsigned char) *c) &&
            *c != '.' && *c != '-' && *c != '_')
            return 0;
        if (c[0] == '.' && c[1] == '.')
            return 0;
    }
    return 1;
}

/*
 * Remove the directory ScriptRoot/orig_serv.  orig_serv originates from a
 * peer's SOS message, so it is validated before being placed on a shell
 * command line; an unsafe or over-long name is logged and skipped.
 */
static void dotsd_remove_scripts(const char *orig_serv)
{
    char cmd[MAXLINE];
    int n;

    if (!dotsd_is_safe_name(orig_serv)) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                     "refusing to remove scripts for unsafe server name");
        return;
    }
    n = snprintf(cmd, sizeof(cmd), "rm -rf %s/%s", ScriptRoot, orig_serv);
    if (n < 0 || n >= (int) sizeof(cmd)) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                     "script path too long for %s", orig_serv);
        return;
    }
    system(cmd);
}

/*--------------------------- Candidate ---------------------------------*/

/*
 * Claim the first CS_Ready candidate: mark it CS_Used and decrement nfree.
 * Returns the entry, or NULL when no candidate is ready.
 */
static dots_cnode_t *alloc_candidate(void)
{
    int i;
    dots_cnode_t *p = NULL;

    pthread_mutex_lock(&ctable_mutex);
    for (i = 0; i < ctable->count; i++) {
        if (ctable->entry[i].status == CS_Ready) {
            ctable->entry[i].status = CS_Used;
            p = &ctable->entry[i];
            ctable->nfree--;
            break;
        }
    }
    pthread_mutex_unlock(&ctable_mutex);

    return p;
}

/*
 * Return a CS_Used candidate called name to the CS_Ready pool.
 * A NULL name, an unknown name, or an entry in any other state is ignored.
 */
static void release_candidate(char *name)
{
    int i;

    if (name == NULL)
        return;

    pthread_mutex_lock(&ctable_mutex);
    for (i = 0; i < ctable->count; i++) {
        if (strcmp(ctable->entry[i].name, name) == 0 &&
            ctable->entry[i].status == CS_Used) {
            ctable->entry[i].status = CS_Ready;
            ctable->nfree++;
            break;
        }
    }
    pthread_mutex_unlock(&ctable_mutex);
}

/*
 * Mark the candidate called name as CS_Busy (a bad candidate) so it is not
 * allocated again until append_ctable() sees it anew.  A NULL or unknown
 * name is ignored.
 */
static void remove_candidate(char *name)
{
    int i;

    if (name == NULL)
        return;

    pthread_mutex_lock(&ctable_mutex);
    for (i = 0; i < ctable->count; i++) {
        if (strcmp(ctable->entry[i].name, name) == 0) {
            /* a ready entry was still counted as free; keep nfree exact */
            if (ctable->entry[i].status == CS_Ready)
                ctable->nfree--;
            ctable->entry[i].status = CS_Busy;  /* bad candidate */
            break;
        }
    }
    pthread_mutex_unlock(&ctable_mutex);
}

/* Empty the candidate table. */
void reset_ctable(void)
{
    ctable->count = 0;
    ctable->nfree = 0;
}

/*
 * Add or refresh one candidate.
 *
 * url:  "[scheme://]name[:port]"; the default dotsd port is used when no
 *       port is given.
 * attr: attribute text searched for "LinkRate=", "CpuRate=" and "IP=";
 *       the IP value runs up to the next ')'.
 *
 * The record describing this host itself is skipped.  An existing entry
 * has its rates updated and, if it was CS_Busy, becomes CS_Ready again.
 * A new entry is appended as CS_Ready unless the table is full.
 */
void append_ctable(char *url, char *attr)
{
    char *p, *q;
    char name[MAXNAME], ip[MAXADDR];
    int i, count, port, link_rate, is_new_entry = 1;
    double cpu_rate;

    if (url == NULL || attr == NULL)
        return;

    p = strstr(url, "://");             /* has scheme? */
    if (p == NULL) {
        p = url;                        /* no scheme */
    } else {
        p += 3;                         /* skip the scheme and :// */
    }

    q = strchr(p, ':');                 /* has port? */
    if (q == NULL) {
        dotsd_copy(name, sizeof(name), p);  /* no port */
        port = Default_DotsdPort;           /* use default port */
    } else {
        dotsd_copy_n(name, sizeof(name), p, (size_t) (q - p));
        port = atoi(q + 1);             /* get port */
    }

    if (strcmp(my_fullname, name) == 0)
        return;                         /* skip my own record */

    p = strstr(attr, "LinkRate=");
    if (p == NULL) {
        link_rate = 500;                /* use some default rescue rate */
    } else {
        link_rate = atoi(p + 9);
    }

    p = strstr(attr, "CpuRate=");
    if (p == NULL) {
        cpu_rate = 0.1;                 /* use some default rescue rate */
    } else {
        cpu_rate = atof(p + 8);
    }

    p = strstr(attr, "IP=");            /* get static IP address */
    if (p == NULL) {
        dotsd_copy(ip, sizeof(ip), "NULL");
    } else {
        p += 3;                         /* skip "IP=" */
        q = strchr(p, ')');             /* find the end of IP */
        if (q == NULL)
            q = p + strlen(p);          /* unterminated: take the rest */
        dotsd_copy_n(ip, sizeof(ip), p, (size_t) (q - p));
    }

    pthread_mutex_lock(&ctable_mutex);
    for (i = 0; i < ctable->count; i++) {
        if (strcmp(ctable->entry[i].name, name) == 0) {
            ctable->entry[i].link_rate = link_rate;  /* update rate */
            ctable->entry[i].cpu_rate = cpu_rate;    /* update rate */
            if (ctable->entry[i].status == CS_Busy) {
                ctable->entry[i].status = CS_Ready;  /* update status */
                ctable->nfree++;
            }
            is_new_entry = 0;
            break;
        }
    }
    if (is_new_entry == 1) {
        count = ctable->count;
        if (count >= DOTSD_ARRAY_LEN(ctable->entry)) {
            ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                         "candidate table is full, dropping %s", name);
        } else {
            dotsd_copy(ctable->entry[count].name,
                       sizeof(ctable->entry[count].name), name);
            dotsd_copy(ctable->entry[count].ip,
                       sizeof(ctable->entry[count].ip), ip);
            ctable->entry[count].port = port;
            ctable->entry[count].link_rate = link_rate;
            ctable->entry[count].cpu_rate = cpu_rate;
            ctable->entry[count].status = CS_Ready;
            ctable->count++;
            ctable->nfree++;
        }
    }
    pthread_mutex_unlock(&ctable_mutex);
}

/*---------------------------- Ptable ------------------------------*/

/*
 * Reset the peer table to its initial state: S_Normal, all counters and
 * TTLs zero, empty used list, and every heap node chained on the free list.
 */
void init_ptable(void)
{
    int i;
    const int nheap = DOTSD_ARRAY_LEN(ptable->heap);

    ptable->status = S_Normal;
    ptable->qcache_on = 0;
    ptable->alloc_ttl = 0;
    ptable->prob_update_ttl = 0;
    ptable->resc_idle_ttl = 0;
    ptable->need_resc_cap = 0;
    ptable->no_resc_cap = 0;
    ptable->redi_prob = 0;
    ptable->pused = NULL;
    ptable->pfree = (dots_pnode_t *) &ptable->heap[0];

    for (i = 0; i < nheap; i++) {
        if (i < nheap - 1) {
            ptable->heap[i].next = (dots_pnode_t *) &(ptable->heap[i+1]);
        } else {
            ptable->heap[i].next = NULL;    /* last node ends the list */
        }
    }
}

/* Close the socket of every node in use, then re-initialize the table. */
void reset_ptable(void)
{
    dots_pnode_t *p;

    for (p = ptable->pused; p != NULL; p = p->next) {
        if (p->sockfd != -1)
            close(p->sockfd);
    }
    init_ptable();
}

/* Set the peer table status under ptable_mutex. */
static void set_status(int status)
{
    apr_proc_mutex_lock(ptable_mutex);
    ptable->status = status;
    apr_proc_mutex_unlock(ptable_mutex);
}

/*
 * Fall back to S_Normal when the used list holds no C_Rescue or C_Origin
 * node; otherwise leave the status unchanged.
 */
static void update_status(void)
{
    dots_pnode_t *p;

    apr_proc_mutex_lock(ptable_mutex);
    p = ptable->pused;
    while (p != NULL) {
        if (p->type == C_Rescue || p->type == C_Origin)
            break;
        p = p->next;
    }
    if (p == NULL)
        ptable->status = S_Normal;      /* no C_Rescue/C_Origin left */
    apr_proc_mutex_unlock(ptable_mutex);
}

/*
 * Store the redirect rates on p, derive the per-request and per-byte load
 * weights from them, and clear the accumulated load and counters.
 * A zero rate is logged and its weight is set to 1.0.
 * Callers in this file hold ptable_mutex.
 */
static void set_rate(dots_pnode_t *p, int redi_rate, int redi_data_rate)
{
    p->redi_rate = redi_rate;
    if (redi_rate != 0) {
        p->cpu_lw = 1.0/(redi_rate * ControlInterval);
    } else {
        p->cpu_lw = 1.0;
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL, "redi_rate is zero");
    }
    p->cpu_lf = 0;
    p->num_resc = 0;

    p->redi_data_rate = redi_data_rate;
    if (redi_data_rate != 0) {
        p->link_lw = 1.0/(redi_data_rate * ControlInterval * 1000);
    } else {
        p->link_lw = 1.0;
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                     "redi_data_rate is zero");
    }
    p->link_lf = 0;
    p->resc_bytes = 0;
}

/*
 * Fill in a peer node once the SOS handshake has completed and mark it
 * set_done.  The origin fields are written only for C_Origin nodes (the
 * orig_* arguments may be NULL/0 otherwise).  keep_alive becomes the send
 * interval on a C_Origin node and, plus 20, the receive timeout on a
 * C_Rescue node.  Strings are truncated to the size of their destination
 * field (assumes the fields are char arrays).
 */
static void set_pnode(dots_pnode_t *p, char *orig_serv, char *orig_ip,
                      int orig_port, char *resc_serv, char *resc_ip,
                      int resc_port, int redi_rate, int redi_data_rate,
                      int keep_alive, int qcache_ttl)
{
    apr_proc_mutex_lock(ptable_mutex);
    if (p->type == C_Origin) {
        dotsd_copy(p->orig_serv, sizeof(p->orig_serv), orig_serv);
        dotsd_copy(p->orig_ip, sizeof(p->orig_ip), orig_ip);
        p->orig_port = orig_port;
    }
    dotsd_copy(p->resc_serv, sizeof(p->resc_serv), resc_serv);
    dotsd_copy(p->resc_ip, sizeof(p->resc_ip), resc_ip);
    p->resc_port = resc_port;

    set_rate(p, redi_rate, redi_data_rate);
    p->rate_update_ttl = 0;
    p->orig_idle_ttl = 0;
    p->pastorig_ttl = 0;

    if (p->type == C_Rescue) {
        p->ka_timeout = keep_alive + 20;
        p->recv_ka_ttl = p->ka_timeout;
    } else if (p->type == C_Origin) {
        p->ka_interval = keep_alive;
        p->send_ka_ttl = keep_alive;
    }
    p->qcache_ttl = qcache_ttl;
    p->set_done = 1;
    apr_proc_mutex_unlock(ptable_mutex);
}

/*
 * Move p from the used list to the free list, first withdrawing its DNS
 * round-robin entry if one was added.  If p is no longer on the used list
 * (the timer thread already recycled it) the lists are left untouched, so
 * a node can never be inserted into the free list twice.
 */
static void release_pnode(dots_pnode_t *p)
{
    dots_pnode_t *q;
    int found = 0;

    if (p == NULL)
        return;                         /* nothing to release */

    if (dns_rr_set && p->dns_rr_added) {
        /* remove rescue entry for DNS-RR */
        dns_update(DNS_DEL, my_fullname, p->resc_ip);
        p->dns_rr_added = 0;
    }

    apr_proc_mutex_lock(ptable_mutex);
    if (p == ptable->pused) {
        ptable->pused = p->next;
        found = 1;
    } else {
        q = ptable->pused;
        while (q != NULL && q->next != p)
            q = q->next;
        if (q != NULL) {
            q->next = p->next;
            found = 1;
        }
    }
    if (found) {
        p->next = ptable->pfree;
        ptable->pfree = p;
    }
    apr_proc_mutex_unlock(ptable_mutex);
}

/*
 * Take a node from the free list, put it at the head of the used list and
 * initialize it for a new connection.  candidate is remembered so the
 * candidate entry can be released later.  Returns NULL when no node is free.
 */
static dots_pnode_t *alloc_pnode(int sockfd, int type, char *candidate)
{
    dots_pnode_t *p = NULL;

    apr_proc_mutex_lock(ptable_mutex);
    if (ptable->pfree != NULL) {
        p = ptable->pfree;
        ptable->pfree = ptable->pfree->next;
        p->next = ptable->pused;
        ptable->pused = p;

        p->type = type;
        p->sockfd = sockfd;
        p->candidate = candidate;       /* for release purpose */
        p->msg_seqno = 1;
        p->req_seqno = -1;              /* not used yet */
        p->set_done = 0;
        p->dns_rr_added = 0;            /* do not inherit a stale flag */
    }
    apr_proc_mutex_unlock(ptable_mutex);

    return p;
}

/*
 * Tear down the bookkeeping of a finished peer connection.
 * C_Rescue: release the candidate and the node.
 * C_Origin: release the node if the handshake never completed; otherwise
 * keep it as C_PastOrig and remove the origin's script directory.
 * Finally recompute the table status.
 */
static void remove_conn(dots_pnode_t *p)
{
    if (p->type == C_Rescue) {
        release_candidate(p->candidate);
        release_pnode(p);
    } else if (p->type == C_Origin) {
        if (p->set_done == 0) {
            release_pnode(p);
        } else {
            apr_proc_mutex_lock(ptable_mutex);
            p->sockfd = -1;
            p->type = C_PastOrig;
            p->pastorig_ttl = 0;
            apr_proc_mutex_unlock(ptable_mutex);

            dotsd_remove_scripts(p->orig_serv);
        }
    }
    update_status();
}

/*--------------- Create a DNS alias for this machine ---------------*/

/*
 * Allocate the next alias "vh<N>.<my_aliasbase>", registering it in DNS if
 * it has not been pre-registered.  The result points to a static buffer
 * that the next call overwrites.
 * NOTE(review): the buffer and the shmptr counters are not protected by a
 * lock although several tcp_handler threads can get here.
 */
static char *new_alias(void)
{
    static char alias[MAXNAME];

    shmptr->alias_alloc_num++;
    snprintf(alias, sizeof(alias), "vh%d.%s",
             shmptr->alias_alloc_num, my_aliasbase);

    if (shmptr->alias_alloc_num > shmptr->alias_reg_num) {
        /* need to update DNS */
        dns_update(DNS_ADD, alias, my_ip);
        shmptr->alias_reg_num = shmptr->alias_alloc_num;
    }
    return alias;
}

/*--------- Process requests from peer Dotsds over the Internet ------*/

/*
 * Per-connection thread.  arg is the dots_pnode_t of the connection.
 * Reads one message per line ("<seqno> <command|reply-code> ...") and
 * handles SHUTDOWN, KEEPALIVE, RATE and SOS requests as well as the reply
 * to our own SOS.  When the loop ends the socket is closed and the
 * connection is removed from the peer table.  Always returns NULL.
 */
static void *tcp_handler(void *arg)
{
    dots_pnode_t *p = arg;
    int sockfd = p->sockfd;
    int i, n, msg_seqno, rep_code;
    socklen_t len;
    char mesg[MAXLINE];
    char *opcmd, *s;
    char *orig_serv, *orig_ip, *resc_serv, *resc_ip;
    int orig_port, resc_port, redi_rate, redi_data_rate, keepalive;
    int qcache_ttl;
    char peername[MAXNAME];
    struct sockaddr_in peeraddr;
    struct hostent *hp;

    pthread_detach(pthread_self());

    /* Name the peer for logging; fall back to its address if the reverse
     * lookup fails instead of dereferencing a NULL hostent. */
    memset(&peeraddr, 0, sizeof(peeraddr));
    len = sizeof(peeraddr);
    if (getpeername(sockfd, (struct sockaddr *) &peeraddr, &len) < 0) {
        dotsd_copy(peername, sizeof(peername), "unknown");
    } else {
        hp = gethostbyaddr((char *) &peeraddr.sin_addr,
                           sizeof(peeraddr.sin_addr), AF_INET);
        if (hp != NULL && hp->h_name != NULL)
            dotsd_copy(peername, sizeof(peername), hp->h_name);
        else
            dotsd_copy(peername, sizeof(peername),
                       inet_ntoa(peeraddr.sin_addr));
    }
    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                 "[start] peering with %s", peername);

    for (;;) {
        n = readline(sockfd, mesg, MAXLINE);
        if (n <= 0)
            break;                      /* EOF or read error */
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                     "[From %s] %s", peername, mesg);

        /* get msg_seqno, cmd_str/reply_code */
        i = 0;
        s = get_token(mesg, &i);
        if (s == NULL) {
            ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                         "No message sequence number");
            continue;
        }
        msg_seqno = atoi(s);

        opcmd = get_token(mesg, &i);
        if (opcmd == NULL) {
            ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                         "No command string or response code");
            continue;
        }

        /* process the request or response */
        if (strcasecmp(opcmd, "SHUTDOWN") == 0) {
            s = get_token(mesg, &i);
            if (s == NULL) {
                ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                             "Missing shutdown reason");
            } else if (strncasecmp(s, "overload", 8) == 0 &&
                       p->candidate != NULL) {
                remove_candidate(p->candidate);
                ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                             "remove_candidate: %s", p->candidate);
            }
            break;
        } else if (strcasecmp(opcmd, "KEEPALIVE") == 0) {
            apr_proc_mutex_lock(ptable_mutex);
            p->recv_ka_ttl = p->ka_timeout;
            apr_proc_mutex_unlock(ptable_mutex);
        } else if (strcasecmp(opcmd, "RATE") == 0) {
            s = get_token(mesg, &i);
            if (s == NULL) {
                ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                             "Missing RedirectRate");
                continue;
            }
            redi_rate = atoi(s);

            s = get_token(mesg, &i);
            if (s == NULL) {
                ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                             "Missing RedirectDataRate");
                continue;
            }
            redi_data_rate = atoi(s);

            apr_proc_mutex_lock(ptable_mutex);
            set_rate(p, redi_rate, redi_data_rate);
            apr_proc_mutex_unlock(ptable_mutex);
        } else if (strcasecmp(opcmd, "SOS") == 0) {
            if (ptable->status != S_Normal ||
                ometer->cpu_load > cpu_optload ||
                ometer->link_load > link_optload) {
                /*
                 * can be a rescue server for ONLY one origin server for now
                 */
                snprintf(mesg, sizeof(mesg), "%d %d Reject\n",
                         msg_seqno, A_Reject);
                write(sockfd, mesg, strlen(mesg));
                ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                             "[To %s] %s", peername, mesg);
                break;
            } else {
                orig_serv = get_token(mesg, &i);
                if (orig_serv == NULL) {
                    ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                                 "Missing server name for SOS");
                    continue;
                }
                /* the name is later used as a directory name */
                if (!dotsd_is_safe_name(orig_serv)) {
                    ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                                 "Invalid server name for SOS");
                    continue;
                }

                orig_ip = get_token(mesg, &i);
                if (orig_ip == NULL) {
                    ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                                 "Missing server IP for SOS");
                    continue;
                }

                s = get_token(mesg, &i);
                if (s == NULL) {
                    ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                                 "Missing server port for SOS");
                    continue;
                }
                orig_port = atoi(s);

                s = get_token(mesg, &i);
                if (s == NULL) {
                    ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                                 "Missing KeepAliveTTL");
                    continue;
                }
                keepalive = atoi(s);

                s = get_token(mesg, &i);
                if (s == NULL) {
                    ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                                 "Missing qcache ttl for SOS");
                    continue;
                }
                qcache_ttl = atoi(s);

                resc_serv = new_alias();
                redi_rate = (int) ((cpu_upper_thd - ometer->cpu_load)
                                   * MaxDreqRate);
                redi_data_rate = (int) ((link_upper_thd - ometer->link_load)
                                        * MaxDataRate);
                set_pnode(p, orig_serv, orig_ip, orig_port,
                          resc_serv, my_ip, ApachePort,
                          redi_rate, redi_data_rate, keepalive, qcache_ttl);
                set_status(S_Rescue);

                snprintf(mesg, sizeof(mesg), "%d %d OK %s %s %d %d %d\n",
                         msg_seqno, A_OK, resc_serv, my_ip, ApachePort,
                         redi_rate, redi_data_rate);
                write(sockfd, mesg, strlen(mesg));
                ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                             "[To %s] %s", peername, mesg);
            }
        } else if (isdigit((unsigned char) opcmd[0])) {     /* a response */
            rep_code = atoi(opcmd);
            if (p->req_seqno == msg_seqno &&
                strcasecmp(p->req_cmd, "SOS") == 0) {
                s = get_token(mesg, &i);    /* response string */
                if (s == NULL) {
                    /* informational only, so not fatal */
                    ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                                 "Missing response string");
                }
                if (rep_code == A_OK) {     /* OK */
                    /* A truncated OK reply cannot be used: drop the peer
                     * rather than pass NULL to atoi()/string copies. */
                    resc_serv = get_token(mesg, &i);
                    if (resc_serv == NULL) {
                        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                                     "Missing rescue server alias");
                        break;
                    }
                    resc_ip = get_token(mesg, &i);
                    if (resc_ip == NULL) {
                        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                                     "Missing rescue server IP");
                        break;
                    }
                    s = get_token(mesg, &i);
                    if (s == NULL) {
                        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                                     "Missing rescue server port");
                        break;
                    }
                    resc_port = atoi(s);

                    s = get_token(mesg, &i);
                    if (s == NULL) {
                        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                                     "Missing RedirectRate");
                        break;
                    }
                    redi_rate = atoi(s);

                    s = get_token(mesg, &i);
                    if (s == NULL) {
                        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                                     "Missing RedirectDataRate");
                        break;
                    }
                    redi_data_rate = atoi(s);

                    set_pnode(p, NULL, NULL, 0, resc_serv, resc_ip,
                              resc_port, redi_rate, redi_data_rate,
                              KeepAliveTTL, QCacheTTL);
                    if (dns_rr_set) {
                        /* add rescue entry for DNS-RR */
                        dns_update(DNS_ADD, my_fullname, resc_ip);
                        p->dns_rr_added = 1;
                    }
                    if (ptable->status == S_Normal)
                        set_status(S_SOS);
                } else {
                    remove_candidate(p->candidate);
                    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                        "Rescue request is rejected, remove_candidate: %s",
                        p->candidate != NULL ? p->candidate : "(none)");
                    break;
                }
            }
        }
    }

    close(sockfd);
    remove_conn(p);
    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                 "[end] peering with %s", peername);
    return (NULL);
}

/*------- UDP Server: Process (reset) requests from local Apache --------*/

/*
 * Thread that listens on UDP port DotsdPort.  The only command handled is
 * "RESET", which resets the output meter, the peer table and the candidate
 * table.  Returns NULL if the socket cannot be set up or fails.
 */
static void *udp_server(void *dummy)
{
    int udpfd, i;
    ssize_t n;
    socklen_t clilen;
    char mesg[MAXLINE];
    struct sockaddr_in cliaddr, servaddr;
    char *opcmd;

    udpfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (udpfd < 0) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                     "udp_server: socket error: %d", errno);
        return (NULL);
    }

    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(DotsdPort);

    if (bind(udpfd, (SA *)&servaddr, sizeof(servaddr)) < 0) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                     "udp_server: bind error: %d", errno);
        close(udpfd);
        return (NULL);
    }

    for (;;) {
        clilen = sizeof(cliaddr);
        /* leave room for the terminating NUL */
        n = recvfrom(udpfd, mesg, MAXLINE - 1, 0, (SA *)&cliaddr, &clilen);
        if (n < 0) {
            if (errno == EINTR)
                continue;
            ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                         "udp_server: recvfrom error: %d", errno);
            break;
        }
        mesg[n] = 0;

        i = 0;
        opcmd = get_token(mesg, &i);
        if (opcmd == NULL) {
            ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                         "No command string");
            continue;
        }

        if (strcasecmp(opcmd, "RESET") == 0) {
            reset_ometer();
            reset_ptable();
            reset_ctable();
        }
    }

    close(udpfd);
    return (NULL);
}

/*-------------------------- ControlInterval Tasks ---------------------*/

/*
 * Rescue server allocation: claim a candidate, connect to it, send an SOS
 * request and start a tcp_handler thread for the reply.  Every failure
 * path closes the socket and gives the candidate back (or marks it bad
 * when the candidate itself is unreachable).
 */
static void new_rescserv(void)
{
    dots_pnode_t *p;
    dots_cnode_t *pc;                   /* points to allocated candidate */
    pthread_t tid;
    int sockfd;
    struct sockaddr_in servaddr;
    char request[MAXLINE];

    pc = alloc_candidate();
    if (pc == NULL) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                     "No more rescue server candidates");
        return;
    }

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                     "socket error: %d", errno);
        release_candidate(pc->name);
        return;
    }

    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    /* NOTE(review): the local DotsdPort is used here, not the pc->port
     * parsed by append_ctable() -- confirm this is intended. */
    servaddr.sin_port = htons(DotsdPort);

    /* use rescue IP directly; it is "NULL" when none was advertised */
    if (inet_pton(AF_INET, pc->ip, &servaddr.sin_addr) != 1) {
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                     "Invalid IP address for %s", pc->name);
        close(sockfd);
        remove_candidate(pc->name);
        return;
    }

    if (connect(sockfd, (SA *)&servaddr, sizeof(servaddr)) < 0) {
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                     "Connection error to %s", pc->name);
        close(sockfd);
        remove_candidate(pc->name);
        return;
    }

    p = alloc_pnode(sockfd, C_Rescue, pc->name);
    if (p == NULL) {
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                     "alloc_pnode error");
        close(sockfd);
        release_candidate(pc->name);
        return;
    }

    snprintf(request, sizeof(request), "%d SOS %s %s %d %d %d\n",
             p->msg_seqno, my_fullname, my_ip, ApachePort,
             KeepAliveTTL, QCacheTTL);
    write(p->sockfd, request, strlen(request));
    dotsd_copy(p->req_cmd, sizeof(p->req_cmd), "SOS");
    p->req_seqno = p->msg_seqno;
    p->msg_seqno++;
    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                 "[To %s] %s", pc->name, request);

    if (pthread_create(&tid, NULL, &tcp_handler, (void *)p) != 0) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                     "pthread_create error for %s", pc->name);
        close(sockfd);
        remove_conn(p);
    }
}

/*
 * Check link utilization for outbound HTTP traffic.  Samples the reply
 * byte counter into slot ometer->curr and updates the smoothed link_load
 * (weight dots_alpha on the previous value).
 */
static void check_ometer(void)
{
    int curr, prev, bytes;
    static int first_time = 1;
    double duration;

    curr = ometer->curr;
    prev = !curr;
    gettimeofday(&ometer->link[curr].ts, NULL);

    apr_proc_mutex_lock(ometer_mutex);
    ometer->link[curr].out = ometer->reply_bytes;
    if (first_time == 1) {
        ometer->link[curr].load = 1.0 * ometer->link[curr].out
                                  / ControlInterval / 1000 / MaxDataRate;
        ometer->link_load = ometer->link[curr].load;
        first_time = 0;
    } else {
        bytes = ometer->link[curr].out - ometer->link[prev].out;
        duration = ometer->link[curr].ts.tv_sec
                   - ometer->link[prev].ts.tv_sec
                   + 1.0 * (ometer->link[curr].ts.tv_usec
                            - ometer->link[prev].ts.tv_usec) / 1000000;
        /* a zero or negative interval would make the load meaningless */
        if (bytes > 0 && duration > 0) {
            ometer->link[curr].load = 1.0 * bytes / duration
                                      / 1000 / MaxDataRate;
        } else {
            ometer->link[curr].load = 0;
        }
        ometer->link_load = dots_alpha * ometer->link_load
                            + (1 - dots_alpha) * ometer->link[curr].load;
    }
    if (ometer->link_load < 0.0001)
        ometer->link_load = 0;
    apr_proc_mutex_unlock(ometer_mutex);
}

/*
 * check CPU utilization based on /proc/stat, only works on Linux.
 * Parses the first line into slot ometer->curr and updates the smoothed
 * cpu_load.  Nothing is changed if the file cannot be read or parsed.
 */
static void check_cpu(void)
{
    FILE *fp;
    char line[80];
    static int first_time = 1;
    unsigned long long d_user, d_nice, d_system, d_idle, used, total;
    int curr = ometer->curr;
    int prev = !curr;
    int saved_errno;

    fp = fopen("/proc/stat", "r");
    if (fp == NULL) {
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                     "open error for /proc/stat: %d", errno);
        return;
    }

    if (fgets(line, sizeof(line), fp) == NULL) {
        saved_errno = errno;            /* fclose() may overwrite errno */
        fclose(fp);
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                     "fgets error for /proc/stat: %d", saved_errno);
        return;
    }
    fclose(fp);

    /* the counters follow the 5-character "cpu  " prefix */
    if (strncmp(line, "cpu", 3) != 0 || strlen(line) <= 5 ||
        sscanf(line + 5, "%llu %llu %llu %llu",
               &(ometer->cpu[curr].user), &(ometer->cpu[curr].nice),
               &(ometer->cpu[curr].system),
               &(ometer->cpu[curr].idle)) != 4) {
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                     "parse error for /proc/stat");
        return;
    }

    ometer->cpu_uticks = ometer->cpu[curr].user + ometer->cpu[curr].nice
                         + ometer->cpu[curr].system;
    ometer->cpu_aticks = ometer->cpu_uticks + ometer->cpu[curr].idle;

    if (first_time == 1) {
        used = ometer->cpu_uticks;
        total = used + ometer->cpu[curr].idle;
        ometer->cpu[curr].load = (total != 0) ? 1.0 * used / total : 0;
        ometer->cpu_load = ometer->cpu[curr].load;
        first_time = 0;
        return;
    }

    d_user = ometer->cpu[curr].user - ometer->cpu[prev].user;
    d_nice = ometer->cpu[curr].nice - ometer->cpu[prev].nice;
    d_system = ometer->cpu[curr].system - ometer->cpu[prev].system;
    d_idle = ometer->cpu[curr].idle - ometer->cpu[prev].idle;
    used = d_user + d_nice + d_system;
    total = used + d_idle;

    /* no ticks elapsed between samples: treat as idle, do not divide by 0 */
    ometer->cpu[curr].load = (total != 0) ? 1.0 * used / total : 0;
    ometer->cpu_load = dots_alpha * ometer->cpu_load
                       + (1 - dots_alpha) * ometer->cpu[curr].load;
    if (ometer->cpu_load < 0.0001)
        ometer->cpu_load = 0;
}

/*
 * Set redirect probability and alloc rescue servers, for origin server only.
 * S_Normal: request the first rescue server when the load exceeds the
 *           optimal level, and switch the query cache on/off.
 * S_SOS:    request another rescue server when more capacity was asked
 *           for, and raise or lower redi_prob against the upper and lower
 *           load thresholds.
 */
static void set_redi_prob(void)
{
    float beta, beta1, new_prob;

    if (ptable->alloc_ttl > 0)
        ptable->alloc_ttl--;
    if (ptable->prob_update_ttl > 0)
        ptable->prob_update_ttl--;

    switch (ptable->status) {           /* only for S_Normal and S_SOS */
    case S_Normal:
        /* alloc the initial rescue server? */
        if (ptable->alloc_ttl == 0 &&
            ((cpu_monitor && ometer->cpu_load > cpu_optload &&
              ometer->link_load > 0.01) ||
             ometer->link_load > link_optload)) {
            ptable->alloc_ttl = Default_ControlTTL;
            if (ctable->nfree > 0) {
                new_rescserv();
                ptable->redi_prob = 0.5;
                ptable->resc_idle_ttl = 0;
            } else {
                ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                             "No rescue server available");
            }
        }

        if (ptable->qcache_on == 0 &&
            ((cpu_monitor && ometer->cpu_load > cpu_optload &&
              ometer->link_load > 0.01) ||
             ometer->link_load > link_optload)) {
            ptable->qcache_on = 1;
            ptable->qcache_ttl = Default_ControlTTL;
            ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                         "QCache has been activated in Normal state");
        } else if (ptable->qcache_on == 1 &&
                   ometer->cpu_load < cpu_lower_thd &&
                   ometer->link_load < link_lower_thd) {
            ptable->qcache_ttl--;
            if (ptable->qcache_ttl <= 0) {
                ptable->qcache_on = 0;
                ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                             "QCache has been de-activated in Normal state");
            }
        }
        break;

    case S_SOS:
        /* alloc a new rescue server? */
        if (ptable->need_resc_cap == 1 || ptable->no_resc_cap == 1) {
            apr_proc_mutex_lock(ptable_mutex);
            ptable->need_resc_cap = 0;
            ptable->no_resc_cap = 0;
            apr_proc_mutex_unlock(ptable_mutex);

            if (ptable->alloc_ttl == 0 &&
                ((cpu_monitor && ometer->cpu_load > cpu_optload) ||
                 ometer->link_load > link_optload)) {
                ptable->alloc_ttl = Default_ControlTTL;
                if (ctable->nfree > 0) {
                    new_rescserv();
                } else {
                    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                                 "No more rescue server available");
                }
            }
        }

        if (((cpu_monitor && ometer->cpu_load > cpu_upper_thd &&
              ometer->link_load > 0.01) ||
             ometer->link_load > link_upper_thd) &&
            ptable->redi_prob < 1 && ptable->prob_update_ttl == 0) {
            /* increase redi_prob */
            if (ptable->redi_prob > 0.99) {
                ptable->redi_prob = 1;
            } else if (ptable->redi_prob < 0.5) {
                ptable->redi_prob = 0.5;
            } else {
                beta = ometer->cpu_load / cpu_optload;
                beta1 = ometer->link_load / link_optload;
                if (beta1 > beta)
                    beta = beta1;
                new_prob = beta * ptable->redi_prob;
                if (new_prob > 1)
                    new_prob = 1;
                ptable->redi_prob = dots_alpha * ptable->redi_prob
                                    + (1 - dots_alpha) * new_prob;
            }
            ptable->prob_update_ttl = Default_ControlTTL;
        } else if (ometer->cpu_load < cpu_lower_thd &&
                   ometer->link_load < link_lower_thd &&
                   ptable->redi_prob > 0 && ptable->prob_update_ttl == 0) {
            /* decrease redi_prob */
            beta = ometer->cpu_load / cpu_lower_thd;
            beta1 = ometer->link_load / link_lower_thd;
            if (beta1 > beta)
                beta = beta1;
            if (ptable->redi_prob < 0.1) {
                ptable->redi_prob = 0;
            } else {
                new_prob = beta * ptable->redi_prob;
                ptable->redi_prob = dots_alpha * ptable->redi_prob
                                    + (1 - dots_alpha) * new_prob;
            }
            ptable->prob_update_ttl = Default_ControlTTL;
        }
        break;

    default:
        break;
    }

    /* the TTL was just re-armed, i.e. redi_prob changed in this round */
    if (ptable->prob_update_ttl == Default_ControlTTL)
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                     "redi_prob=%5.3f", ptable->redi_prob);
}

/*
 * Stop rescuing origin p: send "SHUTDOWN <reason>", close the socket, turn
 * the node into C_PastOrig and remove the origin's script directory.
 * Does nothing if the connection is already shut down (sockfd == -1).
 */
static void shutdown_orig(dots_pnode_t *p, char *reason)
{
    char request[MAXLINE];

    if (p->sockfd == -1)
        return;

    snprintf(request, sizeof(request), "%d SHUTDOWN %s\n",
             p->msg_seqno, reason);
    write(p->sockfd, request, strlen(request));
    p->msg_seqno++;
    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                 "[To %s] %s", p->orig_serv, request);

    close(p->sockfd);
    p->sockfd = -1;
    p->type = C_PastOrig;
    p->pastorig_ttl = 0;

    dotsd_remove_scripts(p->orig_serv);
}

/*
 * One control round for an established origin connection (rescue side):
 * send KEEPALIVE when due, adapt redi_rate / redi_data_rate to the local
 * CPU and link load, report changed rates with RATE, and shut the origin
 * down on overload with a zero rate or after too many idle rounds.
 * Called with ptable_mutex held.
 */
static void dotsd_service_origin(dots_pnode_t *p)
{
    float beta, new_rate;
    char request[MAXLINE];

    if (p->rate_update_ttl > 0)
        p->rate_update_ttl--;

    if (p->send_ka_ttl > 0) {
        p->send_ka_ttl--;
    } else if (p->send_ka_ttl == 0) {
        snprintf(request, sizeof(request), "%d KEEPALIVE\n", p->msg_seqno);
        write(p->sockfd, request, strlen(request));
        p->msg_seqno++;
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                     "[To %s] %s", p->orig_serv, request);
        p->send_ka_ttl = p->ka_interval;
    }

    if (p->resc_bytes <= 0) {
        /* nothing was served for this origin during the last interval */
        p->orig_idle_ttl++;
        if (p->orig_idle_ttl > OriginIdleTTL)
            shutdown_orig(p, "idle");
        return;
    }

    p->orig_idle_ttl = 0;
    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                 "orig_serv=%s, num_resc=%d, resc_bytes=%d",
                 p->orig_serv, p->num_resc, p->resc_bytes);

    /* check cpu_load and take actions */
    if (cpu_monitor && ometer->cpu_load > cpu_upper_thd) {
        if (p->redi_rate > 0) {                         /* dec */
            if (p->rate_update_ttl == 0) {
                beta = ometer->cpu_load / cpu_optload;
                if (beta > 2)
                    beta = 2;
                new_rate = p->redi_rate / beta;
                p->redi_rate = (int) (dots_alpha * p->redi_rate
                                      + (1 - dots_alpha) * new_rate);
                p->rate_update_ttl = Default_ControlTTL;
            }
        } else {
            shutdown_orig(p, "overload");
            return;     /* connection is gone: nothing more to adjust */
        }
    } else if (cpu_monitor && ometer->cpu_load < cpu_lower_thd) {
        if (p->rate_update_ttl == 0 &&                  /* inc */
            p->num_resc/ControlInterval > dots_gamma*p->redi_rate) {
            if (ometer->cpu_load < 0.5*cpu_optload) {
                beta = 0.5;
            } else {
                beta = ometer->cpu_load / cpu_optload;
            }
            new_rate = p->redi_rate / beta;
            p->redi_rate = (int) (dots_alpha * p->redi_rate
                                  + (1 - dots_alpha) * new_rate);
            p->rate_update_ttl = Default_ControlTTL;
        }
    }

    /* check link_load and take actions */
    if (ometer->link_load > link_upper_thd) {
        if (p->redi_data_rate > 0) {                    /* dec */
            if (p->rate_update_ttl == 0) {
                beta = ometer->link_load / link_optload;
                if (beta > 2)
                    beta = 2;
                new_rate = p->redi_data_rate / beta;
                p->redi_data_rate = (int) (dots_alpha * p->redi_data_rate
                                           + (1 - dots_alpha) * new_rate);
                p->rate_update_ttl = Default_ControlTTL;
            }
        } else {
            shutdown_orig(p, "overload");
            return;
        }
    } else if (ometer->link_load < link_lower_thd) {    /* inc */
        if (p->rate_update_ttl == 0 &&
            p->resc_bytes/ControlInterval/1000
                > dots_gamma*p->redi_data_rate) {
            if (ometer->link_load < 0.5*link_optload) {
                beta = 0.5;
            } else {
                beta = ometer->link_load / link_optload;
            }
            new_rate = p->redi_data_rate / beta;
            p->redi_data_rate = (int) (dots_alpha * p->redi_data_rate
                                       + (1 - dots_alpha) * new_rate);
            p->rate_update_ttl = Default_ControlTTL;
        }
    }

    /* the TTL was just re-armed, i.e. a rate changed in this round */
    if (p->rate_update_ttl == Default_ControlTTL) {
        snprintf(request, sizeof(request), "%d RATE %d %d\n",
                 p->msg_seqno, p->redi_rate, p->redi_data_rate);
        write(p->sockfd, request, strlen(request));
        p->msg_seqno++;
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                     "[To %s] %s", p->orig_serv, request);
    }
    p->num_resc = 0;
    p->resc_bytes = 0;
}

/*
 * S_SOS: cleanup cpu_lf and link_lf for each rescue server
 * S_Rescue: cleanup num_resc and resc_bytes for each origin server
 *
 * S_SOS also releases all rescue servers after RescueIdleTTL rounds
 * without redirects and drops rescue servers whose keep-alive timed out.
 * The other states service every established origin connection and free
 * C_PastOrig nodes older than PastOriginTTL.
 */
static void check_ptable(void)
{
    char request[MAXLINE];
    dots_pnode_t *p, *pp, *q, *next;

    switch (ptable->status) {           /* only for S_SOS and S_Rescue */
    case S_SOS:
        if (ptable->redi_prob == 0) {   /* no HTTP redirect */
            apr_proc_mutex_lock(ptable_mutex);
            ptable->resc_idle_ttl++;
            if (ptable->resc_idle_ttl >= RescueIdleTTL) {
                /* release rescue server */
                ptable->resc_idle_ttl = 0;
                p = ptable->pused;
                while (p != NULL) {
                    snprintf(request, sizeof(request),
                             "%d SHUTDOWN idle\n", p->msg_seqno);
                    write(p->sockfd, request, strlen(request));
                    p->msg_seqno++;
                    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                                 "[To %s] %s", p->resc_serv, request);
                    close(p->sockfd);
                    p = p->next;
                }
                init_ptable();
            }
            apr_proc_mutex_unlock(ptable_mutex);
        } else {                        /* has HTTP redirects */
            apr_proc_mutex_lock(ptable_mutex);
            ptable->resc_idle_ttl = 0;
            pp = NULL;
            p = ptable->pused;
            while (p != NULL) {
                if (p->cpu_lf > 0 || p->link_lf > 0) {
                    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                                 "resc_serv=%s, cpu_lf=%5.3f, link_lf=%5.3f",
                                 p->resc_serv, p->cpu_lf, p->link_lf);
                }
                p->cpu_lf = 0;
                p->link_lf = 0;

                if (p->set_done == 1) {
                    if (p->recv_ka_ttl > 0) {
                        p->recv_ka_ttl--;
                    } else if (p->recv_ka_ttl == 0) {
                        snprintf(request, sizeof(request),
                                 "%d SHUTDOWN timeout\n", p->msg_seqno);
                        write(p->sockfd, request, strlen(request));
                        p->msg_seqno++;
                        close(p->sockfd);
                        remove_candidate(p->candidate);
                        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                            "[To %s] %s, remove_candidate: %s",
                            p->resc_serv, request,
                            p->candidate != NULL ? p->candidate : "(none)");

                        q = p;          /* to be released */
                        if (p == ptable->pused)
                            ptable->pused = p->next;
                        else
                            pp->next = p->next;
                        p = p->next;
                        /* move the node from used to free */
                        q->next = ptable->pfree;
                        ptable->pfree = q;
                        continue;       /* pp still precedes p */
                    }
                }
                pp = p;
                p = p->next;
            }
            apr_proc_mutex_unlock(ptable_mutex);
        }
        break;

    default:                            /* S_Rescue or S_Normal */
        apr_proc_mutex_lock(ptable_mutex);
        pp = NULL;
        p = ptable->pused;
        while (p != NULL) {
            /* remember the successor now: releasing p rewrites p->next
             * to point into the free list */
            next = p->next;

            if (p->type == C_Origin && p->set_done == 1) {
                dotsd_service_origin(p);
            } else if (p->type == C_PastOrig) {
                p->pastorig_ttl++;
                if (p->pastorig_ttl > PastOriginTTL) {
                    /* release p */
                    if (p == ptable->pused)
                        ptable->pused = next;
                    else
                        pp->next = next;
                    p->next = ptable->pfree;
                    ptable->pfree = p;
                    p = next;
                    continue;           /* pp still precedes p */
                }
            }
            pp = p;
            p = next;
        }
        apr_proc_mutex_unlock(ptable_mutex);

        if (ptable->status == S_Rescue)
            update_status();
        break;
    }
}

/*
 * Register this daemon via SLP, advertising the spare link rate and CPU
 * share below the upper thresholds (zero when already above them).
 */
static void do_slp_register(void)
{
    int link_rate = 0;
    float cpu_rate = 0;

    if (link_upper_thd > ometer->link_load)
        link_rate = (int) ((link_upper_thd - ometer->link_load)
                           * MaxDataRate);
    if (cpu_upper_thd > ometer->cpu_load)
        cpu_rate = cpu_upper_thd - ometer->cpu_load;

    slp_register(my_fullname, DotsdPort, link_rate, cpu_rate);
}

/*
 * Service Registration and Service Discovery.
 * Both run when their countdown (initially 10 rounds) expires and are then
 * re-armed with RegisterTTL and DiscoverTTL respectively.
 */
static void check_slp(void)
{
    static int regTTL = 10;
    static int disTTL = 10;

    regTTL--;
    if (regTTL <= 0) {
        regTTL = RegisterTTL;
        do_slp_register();
    }

    disTTL--;
    if (disTTL <= 0) {
        disTTL = DiscoverTTL;
        slp_discover(0, 0.1);
    }
}

/*
 * Timer thread: do something in a fixed interval.
 * Every ControlInterval seconds: measure load, adjust the redirect
 * probability, maintain the peer table, run SLP tasks, and in S_Normal
 * pre-register one DNS alias while the pool is below alias_pool_size.
 */
static void *timer_server(void *dummy)
{
    static char alias[MAXNAME];

    ometer->curr = 0;
    for (;;) {
        check_ometer();
        if (ometer->link[ometer->curr].load > 0) {
            /* HAS HTTP requests */
            check_cpu();
            ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                         "cpu_load: %5.3f, %5.3f, link_load: %5.3f, %5.3f",
                         ometer->cpu[ometer->curr].load, ometer->cpu_load,
                         ometer->link[ometer->curr].load, ometer->link_load);
        }

        set_redi_prob();                /* for origin server only */
        check_ptable();
        if (srvloc_set == 1)
            check_slp();

        if (ptable->status == S_Normal &&
            shmptr->alias_reg_num - shmptr->alias_alloc_num
                < alias_pool_size) {
            shmptr->alias_reg_num++;
            snprintf(alias, sizeof(alias), "vh%d.%s",
                     shmptr->alias_reg_num, my_aliasbase);
            dns_update(DNS_ADD, alias, my_ip);
        }

        ometer->curr ^= 1;              /* switch to the other sample slot */
        sleep(ControlInterval);
    }
}

/*---------------------------------- Main ---------------------------*/

/*
 * Apply a record lock through fcntl().  Returns the result of fcntl().
 */
static int lock_reg(int fd, int cmd, int type, off_t offset, int whence,
                    off_t len)
{
    struct flock lock;

    lock.l_type = type;         /* F_RDLCK, F_WRLCK, F_UNLCK */
    lock.l_start = offset;      /* byte offset, relative to l_whence */
    lock.l_whence = whence;     /* SEEK_SET, SEEK_CUR, SEEK_END */
    lock.l_len = len;           /* #bytes (0 means to EOF) */

    return( fcntl(fd, cmd, &lock) );
}

/*
 * Set my_ip based on my_staticname, which may be a domain name or a
 * dotted-decimal IPv4 address.  On success my_ip and shmptr->my_ip hold
 * the address text and 0 is returned; on failure my_ip is set to "NULL"
 * and -1 is returned.
 */
int init_my_ip(void)
{
    struct hostent *hp;
    struct in_addr in;

    /* try domain name first */
    hp = gethostbyname(my_staticname);
    if (hp != NULL && hp->h_addr_list[0] != NULL) {
        memcpy(&in.s_addr, hp->h_addr_list[0], sizeof(in.s_addr));
    } else if (inet_pton(AF_INET, my_staticname, &in) != 1) {
        /* also not a literal IP address */
        ap_log_error(APLOG_MARK, APLOG_CRIT, 0, NULL,
                     "cannot resolve: %s", my_staticname);
        strcpy(my_ip, "NULL");
        return -1;
    }

    sprintf(my_ip, "%s", inet_ntoa(in));
    strcpy(shmptr->my_ip, my_ip);
    return 0;
}

/*
 * Prepare the UDP socket and server address used for service location.
 * If no server was configured, the default mSLP DA is looked up via
 * DNS SRV first.  Returns 0 on success; on failure srvloc_sockfd is -1
 * (no service discovery) and -1 is returned.
 */
int init_srvloc(void)
{
    struct hostent *hp;

    if (SrvlocServerSet == 0) {
        /* discover default mSLP DA via DNS SRV */
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                     "Discover default mSLP DA via DNS SRV");
        dns_srv_query("_slpda._tcp.dot-slash.net.",
                      SrvlocServer, &SrvlocPort);
        shmptr->srvloc_port = SrvlocPort;
        strcpy(shmptr->srvloc_server, SrvlocServer);
    }

    srvloc_sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (srvloc_sockfd < 0) {
        ap_log_error(APLOG_MARK, APLOG_CRIT, 0, NULL,
                     "init_srvloc: socket error: %d", errno);
        srvloc_sockfd = -1;
        return -1;
    }

    bzero(&srvloc_servaddr, sizeof(srvloc_servaddr));
    srvloc_servaddr.sin_family = AF_INET;
    srvloc_servaddr.sin_port = htons(SrvlocPort);

    hp = gethostbyname(SrvlocServer);
    if (hp != NULL && hp->h_addr_list[0] != NULL) {
        /* use the first IP address of the list */
        memcpy(&srvloc_servaddr.sin_addr, hp->h_addr_list[0],
               sizeof(srvloc_servaddr.sin_addr));
    } else if (inet_pton(AF_INET, SrvlocServer,
                         &srvloc_servaddr.sin_addr) != 1) {
        ap_log_error(APLOG_MARK, APLOG_CRIT, 0, NULL,
                     "cannot resolve: %s", SrvlocServer);
        close(srvloc_sockfd);
        srvloc_sockfd = -1;             /* no service discovery */
        return -1;
    }
    return 0;
}

/*
 * Daemon entry point.  Takes a write lock on pidfile (returning at once if
 * another dotsd holds it) and records the process ID, initializes my_ip
 * and service location, starts the UDP and timer threads, then accepts
 * peer connections on TCP port DotsdPort, one tcp_handler thread each.
 * The pid file stays open while the daemon runs so the lock is kept.
 */
void dotsd(char *pidfile)
{
    int fd, listenfd, connfd, rv;
    socklen_t clilen;
    char buf[32];
    struct sockaddr_in cliaddr, servaddr;
    pthread_t tid;
    dots_pnode_t *p;

    if ( (fd = open(pidfile, O_WRONLY | O_CREAT, FILE_MODE)) < 0) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL, "dotsd: open error");
        return;
    }

    /* try and set a write lock on the entire file */
    if (write_lock(fd, 0, SEEK_SET, 0) < 0) {
        if (errno == EACCES || errno == EAGAIN) {
            ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL, "dotsd running");
        } else {
            ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL, "write_lock error");
        }
        close(fd);
        return;
    }

    /* truncate to zero length, now that we have the lock */
    if (ftruncate(fd, 0) < 0) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                     "dotsd: ftruncate error");
        close(fd);
        return;
    }

    /* and write our process ID */
    snprintf(buf, sizeof(buf), "%d\n", (int) getpid());
    if (write(fd, buf, strlen(buf)) != (ssize_t) strlen(buf)) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL, "dotsd: write error");
        close(fd);
        return;
    }

    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                 "Dostd has been started");

    /* initialize my_ip based on my_staticname */
    init_my_ip();

    /* initialize service discovery and perform initial registration */
    if (srvloc_set == 1) {
        init_srvloc();
        do_slp_register();
    } else {
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, NULL,
                     "Service discovery has not been enabled");
    }

    pthread_create(&tid, NULL, &udp_server, NULL);
    pthread_create(&tid, NULL, &timer_server, NULL);

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL, "dotsd: socket error");
        return;
    }

    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(DotsdPort);

    rv = bind(listenfd, (SA *)&servaddr, sizeof(servaddr));
    if (rv == -1) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL, "dotsd: bind error");
        close(listenfd);
        return;
    }

    if (listen(listenfd, 5) < 0) {
        ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL, "dotsd: listen error");
        close(listenfd);
        return;
    }

    for (;;) {
        clilen = sizeof(cliaddr);
        connfd = accept(listenfd, (SA *)&cliaddr, &clilen);
        if (connfd < 0) {
            if (errno == EINTR)
                continue;
            ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL, "accept error");
            break;
        }

        p = alloc_pnode(connfd, C_Origin, NULL);
        if (p == NULL) {
            /* table full: refuse this peer but keep serving the others */
            ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                         "alloc_pnode error");
            close(connfd);
            continue;
        }

        if (pthread_create(&tid, NULL, &tcp_handler, (void *)p) != 0) {
            ap_log_error(APLOG_MARK, APLOG_ERR, 0, NULL,
                         "pthread_create error");
            close(connfd);
            release_pnode(p);
        }
    }
}