Scopira 20080306

clusteragent.h

00001 
00002 /*
00003  *  Copyright (c) 2005    National Research Council
00004  *
00005  *  All rights reserved.
00006  *
00007  *  This material is confidential and proprietary information of
00008  *  National Research Council Canada ("Confidential Information").
00009  *  This Confidential Information may only be used and reproduced
00010  *  in accordance with the terms of the license agreement.
00011  *
00012  */
00013 
00014 #ifndef __INCLUDED_SCOPIRA_AGENT_CLUSTERAGENT_H__
00015 #define __INCLUDED_SCOPIRA_AGENT_CLUSTERAGENT_H__
00016 
00017 #include <queue>
00018 
00019 #include <scopira/tool/netflow.h>
00020 #include <scopira/tool/url.h>
00021 #include <scopira/tool/uuid.h>
00022 #include <scopira/tool/time.h>
00023 #include <scopira/core/register.h>
00024 #include <scopira/agent/localagent.h>
00025 
00026 namespace scopira
00027 {
00028   namespace agent
00029   {
          // Forward declarations. node_spec and cluster_agent are defined
          // further down in this header.
00030     class node_spec;
00031     class cluster_agent;
00032 
          // uptime_task is only named here (cluster_agent declares it a friend);
          // its definition is not part of this header.
00033     class uptime_task;//forward
00034   }
00035 }
00036 
// Plain data record describing one node: its uuid, whether it is a master,
// its URL, whether direct peer links are possible, and its job filter.
// All data members are public (pm_ prefix); cluster_agent keeps one instance
// and exposes it through cluster_agent::spec().
00044 class scopira::agent::node_spec
00045 {
00046   public:
          // Job-filter constants.
          // NOTE(review): presumably the values stored in pm_jobfilter -- confirm
          // against the implementation.
00047     enum {
00048       all_jobs_c,
00049       my_jobs_c,
00050       no_jobs_c,
00051     };
00052 
00053     scopira::tool::uuid pm_uuid;
00054     bool pm_ismaster;           // is this a master? (default, no)
          // NOTE(review): pm_listening is mentioned in the comment below but is not
          // a member of this class as declared here.
00055     scopira::tool::url pm_url;    // url, DEDUCED (to make it NATable?) (only really valid if pm_listening==true)
00056     bool pm_isdirectroute;   //can direct peer-links be made to this node? (if not, all messages will be routed via the master node) (default, yes)
00057     short pm_jobfilter;        // what kind of jobs will this thing accept (default all)
00058   public:
          // Default constructor. The member comments above name the intended
          // defaults; the actual initialisation is in the implementation file.
00060     node_spec(void);
00061 
          // Reads this spec from the given input flow.
          // NOTE(review): the bool presumably reports success -- confirm.
00062     bool load(scopira::tool::itflow_i& in);
          // Writes this spec to the given output flow.
00063     void save(scopira::tool::otflow_i& out) const;
00064 
          // Updates this spec from an options string. The accepted option syntax is
          // defined by the implementation and is not visible in this header.
00065     void parse_options(const std::string &options);
00066 };
00067 
// Stream-output operator writing a machine_spec to an oflow_i and returning the flow.
// NOTE(review): this overload takes machine_spec (declared outside this header),
// not the node_spec defined just above -- confirm that is intentional.
00068 scopira::tool::oflow_i& operator<<(scopira::tool::oflow_i& o, const scopira::agent::machine_spec &spec);
00069 
// Agent class derived from local_agent. On top of the base class it declares:
//   - a family of message classes (admin_msg -> network_msg -> reply_msg, plus
//     many forward-declared concrete messages),
//   - net_link / link, which pair a socket with send/receive threads and a
//     per-peer send queue,
//   - several lock-protected state areas (peer, admin, meta, xtion, resolve),
//   - listen, udp and admin threads.
// Only declarations are visible here; all behaviour lives in the implementation file.
// NOTE(review): std::list, std::map, std::vector, std::string and assert are used
// below while this header only includes <queue> directly -- presumably they
// arrive transitively through the scopira headers; confirm.
00075 class scopira::agent::cluster_agent : public scopira::agent::local_agent
00076 {
00077   public:
          // Port constant (5555).
          // NOTE(review): presumably the default listen port -- confirm in the implementation.
00079     enum { default_port_c = 5555 };
00080 
00081   private:
          // Shorthand for the base class.
00082     typedef local_agent parent_type;
00083 
          // uptime_task gets access to this class's private members.
00084     friend class scopira::agent::uptime_task;
00085 
00086   public:
          // Default constructor.
00088     cluster_agent(void);
          // Virtual destructor.
00090     virtual ~cluster_agent();
00091 
          // The virtual member functions in this section carry no override marker.
          // NOTE(review): they presumably override the agent interface inherited
          // through local_agent -- the base declarations are not visible here.
00092     virtual void notify_stop(void);
00095     //virtual void wait_stop(void);
00096 
00097     virtual void enqueue_network_quit(void);
00098 
00101     virtual bool failed(void) const;
00102     virtual void set_agenterror_reactor(agenterror_reactor_i *r);
00103 
          // Service lookup: results are delivered through `out`.
          // NOTE(review): meaning of the int return is not visible here -- confirm.
00104     virtual int find_services(scopira::tool::uuid &serviceid, scopira::basekit::narray<scopira::tool::uuid> &out);
00105 
00106     virtual void reg_context(scopira::tool::uuid &ctxid, taskmsg_reactor_i *reac);
00107     virtual void unreg_context(scopira::tool::uuid ctxid);
00108 
00109     virtual int universe_size(void);
00110     virtual scopira::tool::uuid get_agent_id(void);
00111 
          // Task management entry points.
00112     virtual scopira::tool::uuid launch_task(const std::type_info &t, scopira::tool::uuid where);
00113     virtual scopira::tool::uuid launch_group(int numps, const std::type_info &t);
00114     virtual void launch_slaves(scopira::tool::uuid masterid, int numtotalps, const std::type_info &t,
00115       scopira::basekit::narray<scopira::tool::uuid> &peers);
00116     virtual void kill_task(scopira::tool::uuid ps);
          // NOTE(review): msec is presumably a timeout in milliseconds -- confirm.
00117     virtual bool wait_task(scopira::tool::uuid ps, int msec);
00118     virtual bool is_alive_task(scopira::tool::uuid ps);
00119     //SCOPIRA_EXPORT virtual bool is_killed_task(scopira::tool::uuid ps) = 0;  // using the local agent one directly!
00120 
00121     /* using local_agent's version for all of these:
00122     virtual bool wait_msg(scopira::tool::uuid src, scopira::tool::uuid dest, int timeout);
00123     virtual void send_msg(scopira::tool::uuid src, scopira::tool::uuid dest, scopira::tool::bufferflow *buf);
00124     virtual void recv_msg(scopira::tool::uuid &src, scopira::tool::uuid dest, scopira::tool::count_ptr<scopira::tool::bufferflow> &buf);*/
00125     virtual void send_msg_bcast(scopira::tool::uuid src, scopira::tool::uuid destserviceid, scopira::tool::bufferflow *buf);
00126 
00127     // needed by agent_i::get_cluster_server_url
          // Read-only access to this agent's node_spec (dm_nodespec).
00128     const node_spec & spec(void) const { return dm_nodespec; }
00129 
00130   protected:
          // NOTE(review): the la_ prefix suggests these are hooks declared by
          // local_agent -- confirm against localagent.h.
00131     virtual void la_send_msg(scopira::tool::uuid src, scopira::tool::uuid dest, scopira::tool::bufferflow *buf);
00132     virtual void la_dead_task(scopira::tool::uuid taskid);
00133 
00134   private:
00135     class link; //fwd
00136     class net_link; //fwd
00137 
          // Abstract base of all messages handled by the agent: a debug name, a
          // type code (type_normal_c unless overridden) and an execute hook.
00142     class admin_msg : public virtual scopira::tool::object
00143     {
00144       public:
              // Values returned by get_type().
00145         enum {
00146           type_normal_c,
00147           type_instant_c,   // use with care, executed in caller-thread and/or receive thread
00148           type_tmp_c,       // a special control msg that can be used while the links are in tmp mode
00149         };
00150       public:
              // Name of the concrete message class, for diagnostics.
00151         virtual const char * get_debug_name(void) const = 0;
              // Type code of this message; the default is type_normal_c.
00152         virtual short get_type(void) const { return type_normal_c; }
              // Performs the message's action against the given agent.
00153         virtual void execute_agent(cluster_agent &e) = 0;
00154     };
          // An admin_msg that can be serialized (load/save) and that carries a
          // transaction tag (xtion_t). Concrete messages implement the
          // two-argument execute_agent(), which also receives a link pointer.
00159     class network_msg : public admin_msg
00160     {
00161       public:
00162         // internal class
              // Transaction tag: a source uuid plus an integer id.
00163         struct xtion_t {
00164           scopira::tool::uuid pm_src;
00165           int pm_id;
00166           xtion_t(void) : pm_id(0) { }
00167           xtion_t(scopira::tool::uuid src, int id)
00168             : pm_src(src), pm_id(id) { }
                // Serialization order is pm_src first, then pm_id (same order in load and save).
00169           bool load(scopira::tool::itflow_i& in) { return pm_src.load(in) && in.read_int(pm_id); }
00170           void save(scopira::tool::otflow_i& out) const { pm_src.save(out); out.write_int(pm_id); }
00171         };
00172       public:
00175         virtual bool load(scopira::tool::iobjflow_i& in);
00176         virtual void save(scopira::tool::oobjflow_i& out) const;
00177 
              // Implements admin_msg::execute_agent.
              // NOTE(review): presumably forwards to the two-argument overload below -- confirm.
00178         virtual void execute_agent(cluster_agent &e);
00179         virtual void execute_agent(cluster_agent &e, link *lk) = 0;
00180 
              // NOTE(review): presumably stores l in dm_link -- confirm in the implementation.
00181         void set_net_link(net_link *l);
00182 
00183         void set_xtion(const xtion_t &x) { dm_xtion = x; }
00184         const xtion_t & get_xtion(void) const { return dm_xtion; }
00185 
00186       protected:
              // Protected: only derived message classes may construct a network_msg.
00187         network_msg(void);
00188       private:
00189         scopira::tool::count_ptr<net_link> dm_link;
00190 
00191       protected:  // protected so reply_msg can reach them
00192         // these are used by both network_msg (for sending) and reply_msg (for replies)
00193         xtion_t dm_xtion;
00194     };
          // Reply to a network_msg; carries an int error code (default 0) and
          // reports its type as type_instant_c.
00205     class reply_msg : public network_msg
00206     {
00207       public:
              // Not marked explicit, so an int converts implicitly to a reply_msg.
00209         reply_msg(int errorcode = 0);
00210 
00211         virtual bool load(scopira::tool::iobjflow_i& in);
00212         virtual void save(scopira::tool::oobjflow_i& out) const;
00213 
00215         virtual const char * get_debug_name(void) const { return "reply_msg"; }
00216         virtual short get_type(void) const { return type_instant_c; }
00217 
00218         virtual void execute_agent(cluster_agent &e, link *lk);
00219 
              // The error code given at construction (or loaded).
00220         int get_code(void) const { return dm_errorcode; }
00221       private:
00222         int dm_errorcode;
00223     };
00224 
          // Forward declarations of the concrete message classes; their definitions
          // are not in this header.
00226     class make_link_msg;
00227     class quit_msg;
00228     class remove_link_msg;
00229     // network_msg - announces remote's uuid
00230     class peer_hello_msg;
00231     class nottmp_hello_msg;
00232     class make_back_link_msg;
00233     class all_quit_msg;
00234     class resolve_agent_msg;
00235     class reply_resolve_agent_msg;
00236     class routed_msg;
00237     class master_launch_task_msg;
00238     class reply_master_launch_task_msg;
00239     class make_task_msg;
00240     class client_wait_msg;
00241     class client_wait_task;
00242     class client_kill_msg;
00243     class client_is_alive_msg;
00244     class send_data_msg;
00245     class bcast_data_msg;
00246     class find_services_msg;
00247     class reply_find_services_msg;
00248     class reg_context_msg;
00249     class dead_task_msg;
00250     class default_group_size_msg;
          // One static register_flow object per message class.
          // NOTE(review): presumably these register each class with the object-flow
          // serialization system -- see scopira/core/register.h.
          // make_link_msg, quit_msg, remove_link_msg and client_wait_task have no
          // register_flow member here.
00251     static scopira::core::register_flow<cluster_agent::reply_msg> r81123;
00252     static scopira::core::register_flow<cluster_agent::peer_hello_msg> r09344;
00253     static scopira::core::register_flow<cluster_agent::nottmp_hello_msg> r55123;
00254     static scopira::core::register_flow<cluster_agent::make_back_link_msg> r54788;
00255     static scopira::core::register_flow<cluster_agent::all_quit_msg> r72218;
00256     static scopira::core::register_flow<cluster_agent::resolve_agent_msg> r11928;
00257     static scopira::core::register_flow<cluster_agent::reply_resolve_agent_msg> r62383;
00258     static scopira::core::register_flow<cluster_agent::routed_msg> r49045;
00259     static scopira::core::register_flow<cluster_agent::master_launch_task_msg> r88312;
00260     static scopira::core::register_flow<cluster_agent::reply_master_launch_task_msg> r00922;
00261     static scopira::core::register_flow<cluster_agent::make_task_msg> r79933;
00262     static scopira::core::register_flow<cluster_agent::client_wait_msg> r28334;
00263     static scopira::core::register_flow<cluster_agent::client_kill_msg> r45666;
00264     static scopira::core::register_flow<cluster_agent::client_is_alive_msg> r44512;
00265     static scopira::core::register_flow<cluster_agent::send_data_msg> r49075;
00266     static scopira::core::register_flow<cluster_agent::bcast_data_msg> r49076;
00267     static scopira::core::register_flow<cluster_agent::find_services_msg> r4834444;
00268     static scopira::core::register_flow<cluster_agent::reply_find_services_msg> r4834445;
00269     static scopira::core::register_flow<cluster_agent::reg_context_msg> r89063;
00270     static scopira::core::register_flow<cluster_agent::dead_task_msg> r85221;
00271     static scopira::core::register_flow<cluster_agent::default_group_size_msg> r85553;
00272 
          // Wraps one socket (netflow) together with a receive thread and a send
          // thread. It holds at most one pending outgoing message; the owning
          // `link` keeps the real queue.
00278     class net_link : public virtual scopira::tool::object
00279     {
00280       private:
00281         friend class network_msg;
00282 
              // NOTE(review): blocksize_c (8 KiB) is presumably a transfer buffer size
              // and magic_c a marker value for the connection -- usage is not visible here.
00283         enum {
00284           blocksize_c = 1024*8,
00285           magic_c = 0xE23A34F7,
00286         };
00287 
              // State wrapped by dm_queue (an event_area).
00288         struct queue_area
00289         {
                // The link this net_link is attached to; null until set (see ctor below).
00290           link *pm_link;
00291 
00292           // The next message the sending thread should send.
00293           // Upon sending the message, the sending thread will signal the host-link
00294           // queue to remove the sent message from its queue, hence we don't
00295           // need a full queue here
00296           // This is like a queue of size 1
00297           scopira::tool::count_ptr<network_msg> pm_sendmsg;
00298 
                // Starts out true.
00299           bool pm_alive;            // should this link die asap?
00300 
00301           queue_area(void) : pm_link(0), pm_alive(true) { }
00302         };
00303 
00304         scopira::tool::event_area<queue_area> dm_queue;
00305 
00306         scopira::tool::count_ptr<scopira::tool::netflow> dm_sock;
00307 
              // NOTE(review): presumably run recv_thread_func / send_thread_func
              // respectively -- confirm in the implementation.
00309         scopira::tool::thread dm_recv_thread;
00311         scopira::tool::thread dm_send_thread;
00312 
00313       public:
              // Factory taking the remote url; the new net_link is delivered through nl.
              // NOTE(review): the bool presumably reports whether the link could be made -- confirm.
00322         static bool make_link(const scopira::tool::url &remote, scopira::tool::count_ptr<net_link> &nl);
00323 
              // Constructs around an existing netflow.
00325         net_link(scopira::tool::netflow *fl);
00327         virtual ~net_link();
00328 
              // NOTE(review): presumably sets queue_area::pm_link -- confirm.
00331         void set_link(link *l);
00332 
              // Raw pointer to the underlying socket; ownership stays with dm_sock.
00334         scopira::tool::netflow * get_sock(void) const { return dm_sock.get(); }
00335 
              // NOTE(review): presumably places msg in the single pm_sendmsg slot -- confirm.
00340         void set_sendmsg(network_msg *msg);
00341 
00342       private:
              // Thread entry points; herep is an opaque pointer.
              // NOTE(review): presumably the net_link instance -- confirm.
00343         static void* recv_thread_func(void *herep);
00344         static void* send_thread_func(void *herep);
00345     };
00346 
          // Connection to one peer as seen by the agent: the peer's url/uuid, a
          // queue of outgoing network messages and the net_link (if any) that
          // carries them.
00351     class link : public virtual scopira::tool::object
00352     {
00353       private:
00354         typedef std::list<scopira::tool::count_ptr<network_msg> > msgqueue_t;
00355         // type_tmp_c type msgs are my friends
00356         friend class peer_hello_msg;
00357         friend class nottmp_hello_msg;
00358 
              // The agent this link was constructed with (raw pointer).
00359         cluster_agent *dm_agent;
00360 
              // NOTE(review): presumably true when this side initiated the connection -- confirm.
00362         bool dm_initter;
00364         scopira::tool::url dm_peerurl;
              // While true, get_uuid() must not be called (it asserts on this flag).
00366         bool dm_tmpaddress;
00367 
00370         scopira::tool::uuid dm_peeruuid;
00371 
              // State wrapped by dm_queue (an event_area).
00372         struct queue_area
00373         {
00374           public:
00375             msgqueue_t pm_sendq;    // pop off the front, append to the back
00376             bool pm_remotetmp;                // true if the REMOTE end IS tmp
00377             bool pm_remotemaster;       // true if the remote is a master (ie. this is the master link connection)
00378             scopira::tool::count_ptr<net_link> pm_link;
00379 
00380           public:
                  // Starts with the remote considered tmp and not a master.
00381             queue_area(void) : pm_remotetmp(true), pm_remotemaster(false) { }
00382         };
00383 
00384         scopira::tool::event_area<queue_area> dm_queue;
00385 
00386       public:
              // initterurl defaults to a placeholder url ("dummy://hostname/").
00388         link(cluster_agent *a, scopira::tool::uuid remoteuuid,
00389           bool tmpaddress, const scopira::tool::url &initterurl = scopira::tool::url("dummy://hostname/"));
00390 
00391         bool is_tmpaddress(void) const { return dm_tmpaddress; }
00392 
              // Returns the peer uuid; asserts that the link is not in tmp-address mode.
00393         scopira::tool::uuid get_uuid(void) const { assert(!dm_tmpaddress); return dm_peeruuid; }
00394 
00395         // does locking!
00396         bool has_link(void);
              // NOTE(review): presumably attaches nl as this link's net_link -- confirm.
00398         void accept_net_link(net_link *nl);
00399 
              // NOTE(review): presumably appends msg to the send queue -- confirm.
00401         void enqueue_msg(network_msg *msg);
00402 
00404         void print_status(void);
00405 
00407         //bool is_connected(void);
00408 
00409         // handlers from net_link
00410         void on_sent(network_msg *msg);
00411         void on_recv(network_msg *msg);
00412         // called only when the connection is *dropped* (ie. not on natural exits)
00413         void on_close_net_link(void);
00414       private:
00415         // grab the next message from the queue. if putnewmsghere is non-null, it will be placed there, otherwise
00416         // it will be enqueued to the actual link
00417         void unleash_a_msg(scopira::tool::event_ptr<queue_area> &L, scopira::tool::count_ptr<network_msg> *putnewmsghere);
00418     };
00419 
00420   private:
          // Failure/initialisation state codes; admin_area::pm_failedstate starts at
          // failed_initting_c.
          // NOTE(review): failed() presumably reports on this state -- confirm.
00421     enum {
00422       failed_initting_c,
00423       failed_cantinit_c,
00424       failed_opening_c,
00425       failed_cantopen_c,
00426       failed_ok_c,
00427       failed_lostmaster_c,
00428     };
          // Peer links keyed by uuid.
00429     typedef std::map<scopira::tool::uuid, scopira::tool::count_ptr<link> > peers_t;
00430 
          // State wrapped by dm_peerarea: the link to the master, the peer links,
          // and an alive flag (initially false).
00435     struct peer_area
00436     {
00437       public:
00439         scopira::tool::count_ptr<link> pm_masterlink;
00441         peers_t pm_peers;
00443         bool pm_alive;
00444       public:
00445         peer_area(void) : pm_alive(false) { }
00446     };
00447 
00448     typedef std::list<scopira::tool::count_ptr<admin_msg> > msgqueue_t;
          // An admin message paired with the time at which it should trigger.
00449     class cron_item {
00450       public:
00451         double pm_triggertime;
00452         scopira::tool::count_ptr<admin_msg> pm_msg;
00453       public:
00454         cron_item(void) : pm_triggertime(0) { }
00455         cron_item(double triggertime, admin_msg *msg) : pm_triggertime(triggertime), pm_msg(msg) { }
              // Deliberately inverted comparison: std::priority_queue keeps the
              // "largest" element on top, so with this ordering the item with the
              // smallest trigger time is the one at top().
00457         bool operator<(const cron_item &rhs) const { return pm_triggertime > rhs.pm_triggertime; }
00458     };
00459     typedef std::priority_queue<cron_item> cronqueue_t;
00460 
          // State wrapped by dm_adminarea: the main message queue, the timed (cron)
          // queue, an alive flag (initially false), the failed-state code and an
          // optional error reactor (initially null).
00466     struct admin_area
00467     {
00468       public:
00469         msgqueue_t pm_main_queue;
00470         cronqueue_t pm_cron_queue;
00472         bool pm_alive;
              // Holds one of the failed_*_c constants.
00474         char pm_failedstate;
00476         agenterror_reactor_i *pm_agenterror;
00477       public:
00478         admin_area(void) : pm_alive(false), pm_failedstate(failed_initting_c), pm_agenterror(0) { }
00479     };
00480 
          // Record for one node: its machine_spec and node_spec plus a
          // lock-protected list of task uuids.
00483     class meta_node : public scopira::tool::object
00484     {
00485       public:
00486         // all read only
00487         machine_spec pm_machinespec;
00488         node_spec pm_nodespec;
00489         // except for this
00490         typedef std::vector<scopira::tool::uuid> taskslist_t;
00491         struct load_area
00492         {
00493           taskslist_t pm_tasks;
00494         };
00495         scopira::tool::event_area<load_area> pm_loadarea;
00496       public:
00497         meta_node(const machine_spec &mspec, const node_spec &nspec);
00498     };
          // meta_node records keyed by uuid.
00499     typedef std::map<scopira::tool::uuid, scopira::tool::count_ptr<meta_node> > nodemap_t;
          // uuid -> uuid map.
          // NOTE(review): presumably task id -> id of the node hosting it -- confirm.
00500     typedef std::map<scopira::tool::uuid, scopira::tool::uuid> taskmap_t;
00501 
          // State wrapped by dm_metarea: the node map and the task map.
00509     struct meta_area
00510     {
00511       public:
00513         nodemap_t pm_nodes;
00515         taskmap_t pm_tasks;
00516     };
00517 
          // Messages keyed by int.
          // NOTE(review): presumably the key is a transaction id (xtion_t::pm_id) -- confirm.
00518     typedef std::map<int, scopira::tool::count_ptr<network_msg> > netmsgmap_t;
00519     typedef std::map<int, scopira::tool::count_ptr<reply_msg> > repmsgmap_t;
00520 
          // State wrapped by dm_xtionarea: reply and handler maps, the next id
          // counter (starts at 3000) and an alive flag (starts true).
00521     struct xtion_area
00522     {
00523       public:
00525         repmsgmap_t pm_replies;
00528         netmsgmap_t pm_handlers;
00530         int pm_nextid;
00532         bool pm_alive;
00533 
00534         xtion_area(void) : pm_nextid(3000), pm_alive(true) { }
00535     };
00536 
00537     typedef std::map<scopira::tool::uuid, scopira::tool::count_ptr<reply_resolve_agent_msg> > resolvemap_t;
00538 
00539     // my "dns" cache
00540     struct resolve_area
00541     {
00542       public:
00543         resolvemap_t pm_resolves;
00544     };
00545 
          // --- instance state ---
00547     scopira::tool::uuid_generator dm_uuidmaker;
00549     scopira::tool::chrono dm_age;
00550 
          // Returned by spec().
00552     node_spec dm_nodespec;
00553 
00555     std::string dm_password;
00557     int dm_listenport;
00560     int dm_udpport;
00561 
          // Shared state; each struct is wrapped in an event_area (rw_area for the
          // resolve cache).
          // NOTE(review): these wrappers presumably provide the locking -- see the
          // scopira tool headers.
00563     scopira::tool::event_area<peer_area> dm_peerarea;
00565     scopira::tool::event_area<admin_area> dm_adminarea;
00567     scopira::tool::event_area<meta_area> dm_metarea;
00569     scopira::tool::event_area<xtion_area> dm_xtionarea;
00571     scopira::tool::rw_area<resolve_area> dm_resolvearea;
00572 
00574     scopira::tool::netflow dm_listensocket;
00576     scopira::tool::udpflow dm_udpsocket;
00577 
          // NOTE(review): presumably run listen_thread_func, udp_thread_func and
          // admin_thread_func respectively -- confirm in the implementation.
00579     scopira::tool::thread dm_listenthread;
00581     scopira::tool::thread dm_udpthread;
00583     scopira::tool::thread dm_adminthread;
00584 
00585   private:
          // Message submission helpers. Overloads: an admin message with no target,
          // and a network message addressed to a target uuid.
00590     void enqueue_msg(admin_msg *msg);
00596     void enqueue_msg(scopira::tool::uuid targetuuid, network_msg *msg);
          // Convenience overload: replies using the transaction tag taken from the
          // original message `theq`.
00601     void enqueue_reply_msg(network_msg *theq, reply_msg *thereply)
00602       { enqueue_reply_msg(theq->get_xtion(), thereply); }
00607     void enqueue_reply_msg(const network_msg::xtion_t &x, reply_msg *thereply);
          // NOTE(review): w is presumably the trigger time stored in a cron_item;
          // whether it is absolute or relative is not visible here -- confirm.
00613     void enqueue_cron_msg(double w, admin_msg *msg);
00614 
          // Sends msg to targetuuid; the reply is delivered through replye.
          // NOTE(review): presumably blocks until the reply arrives -- confirm.
00621     void do_xtion(scopira::tool::uuid targetuuid,
00622         network_msg *msg, scopira::tool::count_ptr<reply_msg> &replye);
          // Sends msg to targetuuid and registers `handler` for the reply.
          // NOTE(review): presumably non-blocking, with the handler kept in
          // xtion_area::pm_handlers -- confirm.
00630     void enqueue_xtion(scopira::tool::uuid targetuuid,
00631         network_msg *msg, network_msg *handler);
00632 
          // Resolves `id`; the result is delivered through out.
          // NOTE(review): presumably consults dm_resolvearea first -- confirm.
00638     void do_resolve_xtion(scopira::tool::uuid id, bool dotaskresolve,
00639         scopira::tool::count_ptr<reply_resolve_agent_msg> &out);
00640 
00642     void print_status(void);
00643 
00644   private:
          // Thread entry points; herep is an opaque pointer.
          // NOTE(review): presumably the cluster_agent instance -- confirm.
00645     static void* listen_thread_func(void *herep);
00646     static void* udp_thread_func(void *herep);
00647     static void* admin_thread_func(void *herep);
00648 
00649     // this is called periodically by admin_thread_func
00650     // for debug/output/status purposes
00651     void print_status(admin_area *area);
00652 };
00653 
00654 #endif
00655