Changeset 345000


Ignore:
Timestamp:
04/24/10 20:14:49 (4 years ago)
Author:
Pavan Balaji <balaji@…>
Branches:
master
Children:
c65e43
Parents:
1aa237
Message:

[svn-r6544] For fork, allow the bootstrap server to just create a socketpair and
use it for communication instead of explicit TCP connections.

Location:
src/pm/hydra
Files:
20 edited

Legend:

Unmodified
Added
Removed
  • src/pm/hydra/pm/pmiserv/pmip.c

    ra98a989 r345000  
    168168int main(int argc, char **argv) 
    169169{ 
    170     int i, count, pid, ret_status, sent, closed; 
     170    int i, count, pid, ret_status, sent, closed, ret; 
    171171    enum HYD_pmcd_pmi_cmd cmd; 
    172172    HYD_status status = HYD_SUCCESS; 
     
    187187    HYDU_ERR_POP(status, "unable to initialize the demux engine\n"); 
    188188 
    189     /* Connect back upstream and add the socket to the demux engine */ 
    190     status = HYDU_sock_connect(HYD_pmcd_pmip.upstream.server_name, 
    191                                HYD_pmcd_pmip.upstream.server_port, 
    192                                &HYD_pmcd_pmip.upstream.control); 
    193     HYDU_ERR_POP2(status, "unable to connect to server %s at port %d\n", 
    194                   HYD_pmcd_pmip.upstream.server_name, HYD_pmcd_pmip.upstream.server_port); 
     189    /* See if HYDRA_CONTROL_FD is set before trying to connect upstream */ 
     190    ret = MPL_env2int("HYDRA_CONTROL_FD", &HYD_pmcd_pmip.upstream.control); 
     191    if (ret < 0) { 
     192        HYDU_ERR_POP(status, "error reading HYDRA_CONTROL_FD environment\n"); 
     193    } 
     194    else if (ret == 0) { 
     195        status = HYDU_sock_connect(HYD_pmcd_pmip.upstream.server_name, 
     196                                   HYD_pmcd_pmip.upstream.server_port, 
     197                                   &HYD_pmcd_pmip.upstream.control); 
     198        HYDU_ERR_POP2(status, "unable to connect to server %s at port %d\n", 
     199                      HYD_pmcd_pmip.upstream.server_name, HYD_pmcd_pmip.upstream.server_port); 
     200    } 
    195201 
    196202    status = HYDU_sock_write(HYD_pmcd_pmip.upstream.control, 
  • src/pm/hydra/pm/pmiserv/pmiserv.h

    r20e47b r345000  
    1010#include "pmi_common.h" 
    1111 
     12HYD_status HYD_pmcd_pmiserv_proxy_init_cb(int fd, HYD_event_t events, void *userp); 
    1213HYD_status HYD_pmcd_pmiserv_control_listen_cb(int fd, HYD_event_t events, void *userp); 
    1314HYD_status HYD_pmcd_pmiserv_cleanup(void); 
  • src/pm/hydra/pm/pmiserv/pmiserv_cb.c

    r20e47b r345000  
    246246} 
    247247 
    248 HYD_status HYD_pmcd_pmiserv_control_listen_cb(int fd, HYD_event_t events, void *userp) 
    249 { 
    250     int accept_fd, proxy_id, count, pgid, closed; 
     248HYD_status HYD_pmcd_pmiserv_proxy_init_cb(int fd, HYD_event_t events, void *userp) 
     249{ 
     250    int proxy_id, count, pgid, closed; 
    251251    struct HYD_pg *pg; 
    252252    struct HYD_proxy *proxy; 
    253     struct HYD_pmcd_pmi_pg_scratch *pg_scratch; 
    254     HYD_status status = HYD_SUCCESS; 
    255  
    256     HYDU_FUNC_ENTER(); 
    257  
    258     /* We got a control socket connection */ 
    259     status = HYDU_sock_accept(fd, &accept_fd); 
    260     HYDU_ERR_POP(status, "accept error\n"); 
     253    HYD_status status = HYD_SUCCESS; 
     254 
     255    HYDU_FUNC_ENTER(); 
    261256 
    262257    /* Get the PGID of the connection */ 
     
    264259 
    265260    /* Read the proxy ID */ 
    266     status = HYDU_sock_read(accept_fd, &proxy_id, sizeof(int), &count, &closed, 
     261    status = HYDU_sock_read(fd, &proxy_id, sizeof(int), &count, &closed, 
    267262                            HYDU_SOCK_COMM_MSGWAIT); 
    268263    HYDU_ERR_POP(status, "sock read returned error\n"); 
     
    285280                         "cannot find proxy with ID %d\n", proxy_id); 
    286281 
    287     pg_scratch = (struct HYD_pmcd_pmi_pg_scratch *) pg->pg_scratch; 
    288     pg_scratch->control_listen_fd = fd; 
    289  
    290282    /* This will be the control socket for this proxy */ 
    291     proxy->control_fd = accept_fd; 
     283    proxy->control_fd = fd; 
    292284 
    293285    /* Send out the executable information */ 
     
    295287    HYDU_ERR_POP(status, "unable to send exec info to proxy\n"); 
    296288 
    297     status = HYDT_dmx_register_fd(1, &accept_fd, HYD_POLLIN, proxy, control_cb); 
     289    status = HYDT_dmx_deregister_fd(fd); 
     290    HYDU_ERR_POP(status, "unable to register fd\n"); 
     291 
     292    status = HYDT_dmx_register_fd(1, &fd, HYD_POLLIN, proxy, control_cb); 
     293    HYDU_ERR_POP(status, "unable to register fd\n"); 
     294 
     295  fn_exit: 
     296    HYDU_FUNC_EXIT(); 
     297    return status; 
     298 
     299  fn_fail: 
     300    goto fn_exit; 
     301} 
     302 
     303HYD_status HYD_pmcd_pmiserv_control_listen_cb(int fd, HYD_event_t events, void *userp) 
     304{ 
     305    int accept_fd, pgid; 
     306    struct HYD_pg *pg; 
     307    struct HYD_pmcd_pmi_pg_scratch *pg_scratch; 
     308    HYD_status status = HYD_SUCCESS; 
     309 
     310    HYDU_FUNC_ENTER(); 
     311 
     312    /* Get the PGID of the connection */ 
     313    pgid = ((int) (size_t) userp); 
     314 
     315    /* Find the process group */ 
     316    for (pg = &HYD_handle.pg_list; pg; pg = pg->next) 
     317        if (pg->pgid == pgid) 
     318            break; 
     319    if (!pg) 
     320        HYDU_ERR_SETANDJUMP1(status, HYD_INTERNAL_ERROR, "could not find pg with ID %d\n", 
     321                             pgid); 
     322 
     323    pg_scratch = (struct HYD_pmcd_pmi_pg_scratch *) pg->pg_scratch; 
     324    pg_scratch->control_listen_fd = fd; 
     325 
     326    /* We got a control socket connection */ 
     327    status = HYDU_sock_accept(fd, &accept_fd); 
     328    HYDU_ERR_POP(status, "accept error\n"); 
     329 
     330    status = HYDT_dmx_register_fd(1, &accept_fd, HYD_POLLIN, userp, 
     331                                  HYD_pmcd_pmiserv_proxy_init_cb); 
    298332    HYDU_ERR_POP(status, "unable to register fd\n"); 
    299333 
  • src/pm/hydra/pm/pmiserv/pmiserv_pmci.c

    ra98a989 r345000  
    225225    char *proxy_args[HYD_NUM_TMP_STRINGS] = { NULL }, *control_port = NULL; 
    226226    char *pmi_fd = NULL; 
    227     int pmi_rank = -1, enable_stdin, ret; 
     227    int pmi_rank = -1, enable_stdin, ret, node_count, i, *control_fd; 
    228228    HYD_status status = HYD_SUCCESS; 
    229229 
     
    255255    /* Copy the host list to pass to the bootstrap server */ 
    256256    node_list = NULL; 
     257    node_count = 0; 
    257258    for (proxy = HYD_handle.pg_list.proxy_list; proxy; proxy = proxy->next) { 
    258259        HYDU_alloc_node(&node); 
     
    268269            tnode->next = node; 
    269270        } 
     271 
     272        node_count++; 
    270273    } 
    271274 
     
    287290    HYDU_ERR_POP(status, "unable to check if stdin is valid\n"); 
    288291 
    289     status = HYDT_bsci_launch_procs(proxy_args, node_list, enable_stdin, stdout_cb, 
     292    HYDU_MALLOC(control_fd, int *, node_count * sizeof(int), status); 
     293    for (i = 0; i < node_count; i++) 
     294        control_fd[i] = -1; 
     295 
     296    status = HYDT_bsci_launch_procs(proxy_args, node_list, control_fd, enable_stdin, stdout_cb, 
    290297                                    stderr_cb); 
    291298    HYDU_ERR_POP(status, "bootstrap server cannot launch processes\n"); 
     299 
     300    for (i = 0, proxy = HYD_handle.pg_list.proxy_list; proxy; proxy = proxy->next, i++) 
     301        if (control_fd[i] != -1) { 
     302            proxy->control_fd = control_fd[i]; 
     303 
     304            status = HYDT_dmx_register_fd(1, &control_fd[i], HYD_POLLIN, (void *) (size_t) 0, 
     305                                          HYD_pmcd_pmiserv_proxy_init_cb); 
     306            HYDU_ERR_POP(status, "unable to register fd\n"); 
     307        } 
     308 
     309    HYDU_FREE(control_fd); 
    292310 
    293311  fn_exit: 
  • src/pm/hydra/pm/pmiserv/pmiserv_pmi_v1.c

    ra98a989 r345000  
    557557    HYDU_FREE(pmi_fd); 
    558558 
    559     status = HYDT_bsci_launch_procs(proxy_args, node_list, 0, HYD_handle.stdout_cb, 
     559    status = HYDT_bsci_launch_procs(proxy_args, node_list, NULL, 0, HYD_handle.stdout_cb, 
    560560                                    HYD_handle.stderr_cb); 
    561561    HYDU_ERR_POP(status, "bootstrap server cannot launch processes\n"); 
  • src/pm/hydra/pm/pmiserv/pmiserv_pmi_v2.c

    ra98a989 r345000  
    740740    HYDU_FREE(pmi_fd); 
    741741 
    742     status = HYDT_bsci_launch_procs(proxy_args, node_list, 0, HYD_handle.stdout_cb, 
     742    status = HYDT_bsci_launch_procs(proxy_args, node_list, NULL, 0, HYD_handle.stdout_cb, 
    743743                                    HYD_handle.stderr_cb); 
    744744    HYDU_ERR_POP(status, "bootstrap server cannot launch processes\n"); 
  • src/pm/hydra/tools/bootstrap/fork/fork.h

    r0bd6b0 r345000  
    1111 
    1212HYD_status HYDT_bscd_fork_launch_procs(char **args, struct HYD_node *node_list, 
    13                                        int enable_stdin, 
     13                                       int *control_fd, int enable_stdin, 
    1414                                       HYD_status(*stdout_cb) (void *buf, int buflen), 
    1515                                       HYD_status(*stderr_cb) (void *buf, int buflen)); 
  • src/pm/hydra/tools/bootstrap/fork/fork_launch.c

    r608115 r345000  
    1313 
    1414HYD_status HYDT_bscd_fork_launch_procs(char **args, struct HYD_node *node_list, 
    15                                        int enable_stdin, 
     15                                       int *control_fd, int enable_stdin, 
    1616                                       HYD_status(*stdout_cb) (void *buf, int buflen), 
    1717                                       HYD_status(*stderr_cb) (void *buf, int buflen)) 
     
    1919    int num_hosts, idx, i, fd; 
    2020    int *pid, *fd_list; 
     21    int sockpair[2]; 
    2122    struct HYD_node *node; 
     23    struct HYD_env *env = NULL; 
    2224    char *targs[HYD_NUM_TMP_STRINGS]; 
    2325    HYD_status status = HYD_SUCCESS; 
     
    5759        targs[idx + 1] = NULL; 
    5860 
     61        if (control_fd) { 
     62            char *str; 
     63 
     64            if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockpair) < 0) 
     65                HYDU_ERR_SETANDJUMP(status, HYD_INTERNAL_ERROR, "pipe error\n"); 
     66 
     67            str = HYDU_int_to_str(sockpair[1]); 
     68            status = HYDU_env_create(&env, "HYDRA_CONTROL_FD", str); 
     69            HYDU_ERR_POP(status, "unable to create env\n"); 
     70            HYDU_FREE(str); 
     71 
     72            control_fd[i] = sockpair[0]; 
     73        } 
     74 
    5975        /* The stdin pointer will be some value for process_id 0; for 
    6076         * everyone else, it's NULL. */ 
    61         status = HYDU_create_process(targs, NULL, NULL, 
     77        status = HYDU_create_process(targs, NULL, env, 
    6278                                     ((i == 0 && enable_stdin) ? &fd_stdin : NULL), 
    6379                                     &fd_stdout, &fd_stderr, 
     
    8399                                      HYDT_bscu_inter_cb); 
    84100        HYDU_ERR_POP(status, "demux returned error registering fd\n"); 
     101 
     102        if (control_fd) { 
     103            close(sockpair[1]); 
     104            HYDU_env_free(env); 
     105        } 
    85106    } 
    86107 
  • src/pm/hydra/tools/bootstrap/include/bsci.h.in

    rb3fe85 r345000  
    3535    /** \brief Launch processes */ 
    3636    HYD_status(*launch_procs) ( 
    37         char **args, struct HYD_node *node_list, int enable_stdin, 
     37        char **args, struct HYD_node *node_list, int *control_fd, int enable_stdin, 
    3838        HYD_status(*stdout_cb) (void *buf, int buflen), 
    3939        HYD_status(*stderr_cb) (void *buf, int buflen)); 
     
    8484 * \param[in]   args            Arguments to be used for the launched processes 
    8585 * \param[in]   node_list       List of nodes to launch processes on 
    86  * \param[in[   enable_stdin    Whether to enable stdin or not 
     86 * \param[out]  control_fd      Control socket to communicate with the launched process 
     87 * \param[in]   enable_stdin    Whether to enable stdin or not 
    8788 * \param[in]   stdout_cb       Stdout callback function 
    8889 * \param[in]   stderr_cb       Stderr callback function 
     
    101102 */ 
    102103HYD_status HYDT_bsci_launch_procs( 
    103     char **args, struct HYD_node *node_list, int enable_stdin, 
     104    char **args, struct HYD_node *node_list, int *control_fd, int enable_stdin, 
    104105    HYD_status(*stdout_cb) (void *buf, int buflen), 
    105106    HYD_status(*stderr_cb) (void *buf, int buflen)); 
  • src/pm/hydra/tools/bootstrap/persist/persist_client.h

    r71ebda r345000  
    1313 
    1414HYD_status HYDT_bscd_persist_launch_procs(char **args, struct HYD_node *node_list, 
    15                                           int enable_stdin, 
     15                                          int *control_fd, int enable_stdin, 
    1616                                          HYD_status(*stdout_cb) (void *buf, int buflen), 
    1717                                          HYD_status(*stderr_cb) (void *buf, int buflen)); 
  • src/pm/hydra/tools/bootstrap/persist/persist_launch.c

    r711744 r345000  
    5959 
    6060HYD_status HYDT_bscd_persist_launch_procs(char **args, struct HYD_node *node_list, 
    61                                           int enable_stdin, 
     61                                          int *control_fd, int enable_stdin, 
    6262                                          HYD_status(*stdout_cb) (void *buf, int buflen), 
    6363                                          HYD_status(*stderr_cb) (void *buf, int buflen)) 
  • src/pm/hydra/tools/bootstrap/poe/poe.h

    r0bd6b0 r345000  
    1111 
    1212HYD_status HYDT_bscd_poe_launch_procs(char **args, struct HYD_node *node_list, 
    13                                       int enable_stdin, 
     13                                      int *control_fd, int enable_stdin, 
    1414                                      HYD_status(*stdout_cb) (void *buf, int buflen), 
    1515                                      HYD_status(*stderr_cb) (void *buf, int buflen)); 
  • src/pm/hydra/tools/bootstrap/poe/poe_launch.c

    r608115 r345000  
    6060 
    6161HYD_status HYDT_bscd_poe_launch_procs(char **args, struct HYD_node *node_list, 
    62                                       int enable_stdin, 
     62                                      int *control_fd, int enable_stdin, 
    6363                                      HYD_status(*stdout_cb) (void *buf, int buflen), 
    6464                                      HYD_status(*stderr_cb) (void *buf, int buflen)) 
  • src/pm/hydra/tools/bootstrap/rsh/rsh.h

    r0bd6b0 r345000  
    1111 
    1212HYD_status HYDT_bscd_rsh_launch_procs(char **args, struct HYD_node *node_list, 
    13                                       int enable_stdin, 
     13                                      int *control_fd, int enable_stdin, 
    1414                                      HYD_status(*stdout_cb) (void *buf, int buflen), 
    1515                                      HYD_status(*stderr_cb) (void *buf, int buflen)); 
  • src/pm/hydra/tools/bootstrap/rsh/rsh_launch.c

    r608115 r345000  
    1313 
    1414HYD_status HYDT_bscd_rsh_launch_procs(char **args, struct HYD_node *node_list, 
    15                                       int enable_stdin, 
     15                                      int *control_fd, int enable_stdin, 
    1616                                      HYD_status(*stdout_cb) (void *buf, int buflen), 
    1717                                      HYD_status(*stderr_cb) (void *buf, int buflen)) 
  • src/pm/hydra/tools/bootstrap/slurm/slurm.h

    r0bd6b0 r345000  
    1111 
    1212HYD_status HYDT_bscd_slurm_launch_procs(char **args, struct HYD_node *node_list, 
    13                                         int enable_stdin, 
     13                                        int *control_fd, int enable_stdin, 
    1414                                        HYD_status(*stdout_cb) (void *buf, int buflen), 
    1515                                        HYD_status(*stderr_cb) (void *buf, int buflen)); 
  • src/pm/hydra/tools/bootstrap/slurm/slurm_launch.c

    r608115 r345000  
    6060 
    6161HYD_status HYDT_bscd_slurm_launch_procs(char **args, struct HYD_node *node_list, 
    62                                         int enable_stdin, 
     62                                        int *control_fd, int enable_stdin, 
    6363                                        HYD_status(*stdout_cb) (void *buf, int buflen), 
    6464                                        HYD_status(*stderr_cb) (void *buf, int buflen)) 
  • src/pm/hydra/tools/bootstrap/src/bsci_launch.c

    rfd73df r345000  
    88#include "bsci.h" 
    99 
    10 HYD_status HYDT_bsci_launch_procs(char **args, struct HYD_node *node_list, int enable_stdin, 
     10HYD_status HYDT_bsci_launch_procs(char **args, struct HYD_node *node_list, int *control_fd, 
     11                                  int enable_stdin, 
    1112                                  HYD_status(*stdout_cb) (void *buf, int buflen), 
    1213                                  HYD_status(*stderr_cb) (void *buf, int buflen)) 
     
    1617    HYDU_FUNC_ENTER(); 
    1718 
    18     status = HYDT_bsci_fns.launch_procs(args, node_list, enable_stdin, stdout_cb, stderr_cb); 
     19    status = HYDT_bsci_fns.launch_procs(args, node_list, control_fd, enable_stdin, 
     20                                        stdout_cb, stderr_cb); 
    1921    HYDU_ERR_POP(status, "bootstrap device returned error while launching processes\n"); 
    2022 
  • src/pm/hydra/tools/bootstrap/ssh/ssh.h

    r1aba3bd r345000  
    2626 
    2727HYD_status HYDT_bscd_ssh_launch_procs(char **args, struct HYD_node *node_list, 
    28                                       int enable_stdin, 
     28                                      int *control_fd, int enable_stdin, 
    2929                                      HYD_status(*stdout_cb) (void *buf, int buflen), 
    3030                                      HYD_status(*stderr_cb) (void *buf, int buflen)); 
  • src/pm/hydra/tools/bootstrap/ssh/ssh_launch.c

    r00006d r345000  
    8686 
    8787HYD_status HYDT_bscd_ssh_launch_procs(char **args, struct HYD_node *node_list, 
    88                                       int enable_stdin, 
     88                                      int *control_fd, int enable_stdin, 
    8989                                      HYD_status(*stdout_cb) (void *buf, int buflen), 
    9090                                      HYD_status(*stderr_cb) (void *buf, int buflen)) 
Note: See TracChangeset for help on using the changeset viewer.