Scopira 20080306

localagent.h

00001 
00002 /*
00003  *  Copyright (c) 2005    National Research Council
00004  *
00005  *  All rights reserved.
00006  *
00007  *  This material is confidential and proprietary information of
00008  *  National Research Council Canada ("Confidential Information").
00009  *  This Confidential Information may only be used and reproduced
00010  *  in accordance with the terms of the license agreement.
00011  *
00012  */
00013 
00014 #ifndef __INCLUDED_SCOPIRA_AGENT_LOCALAGENT_H__
00015 #define __INCLUDED_SCOPIRA_AGENT_LOCALAGENT_H__
00016 
00017 #include <map>
00018 #include <list>
00019 #include <set>
00020 #include <vector>
00021 
00022 #include <scopira/tool/thread.h>
00023 #include <scopira/tool/output.h>
00024 #include <scopira/tool/time.h>
00025 #include <scopira/agent/context.h>
00026 
00027 namespace scopira
00028 {
00029   namespace agent
00030   {
00031     class machine_spec;
00032     class local_agent;
00033 
00034     class uptime_task;//forward
00035   }
00036 }
00037 
00043 class scopira::agent::machine_spec
00044 {
00045   public:
00046     short pm_numcpu;
00047     bool pm_intelorder;
00048     bool pm_64bit;
00049     int pm_speed;   // unkown units, TODO
00050     int pm_ram;     // in MEGS (gig=1024)
00051   public:
00053     machine_spec(void);
00054 
00055     bool load(scopira::tool::itflow_i& in);
00056     void save(scopira::tool::otflow_i& out) const;
00057 
00059     void set_auto_detect(void);
00060 };
00061 
00062 scopira::tool::oflow_i& operator<<(scopira::tool::oflow_i& o, const scopira::agent::machine_spec &spec);
00063 
00070 class scopira::agent::local_agent : public scopira::agent::agent_i
00071 {
00072   private:
00073     friend class scopira::agent::uptime_task;
00074 
00075   public:
00077     local_agent(void);
00079     virtual ~local_agent();
00080 
00081     virtual void notify_stop(void);
00082     virtual void wait_stop(void);
00083 
00084     virtual void enqueue_network_quit(void) { }
00085 
00088     virtual bool failed(void) const { return false; }
00089     virtual void set_agenterror_reactor(agenterror_reactor_i *r) { } //im never goin to call this, so ill just ignore this
00090 
00091     virtual void reg_context(scopira::tool::uuid &ctxid, taskmsg_reactor_i *reac);
00092     virtual void unreg_context(scopira::tool::uuid ctxid);
00093 
00094     virtual int find_services(scopira::tool::uuid &serviceid, scopira::basekit::narray<scopira::tool::uuid> &out);
00095 
00096     virtual int universe_size(void);
00097     virtual scopira::tool::uuid get_agent_id(void);
00098 
00099     virtual scopira::tool::uuid launch_task(const std::type_info &t, scopira::tool::uuid where);
00100     virtual scopira::tool::uuid launch_group(int numps, const std::type_info &t);
00101     virtual void launch_slaves(scopira::tool::uuid masterid, int numtotalps, const std::type_info &t,
00102       scopira::basekit::narray<scopira::tool::uuid> &peers);
00103     virtual void kill_task(scopira::tool::uuid ps);
00104     virtual bool wait_task(scopira::tool::uuid ps, int msec);
00105     virtual bool is_alive_task(scopira::tool::uuid ps);
00106     virtual bool is_killed_task(scopira::tool::uuid ps);
00107 
00108     virtual bool wait_msg(const uuid_query &srcq, scopira::tool::uuid &foundsrc, scopira::tool::uuid dest, int timeout);
00109     virtual void send_msg(scopira::tool::uuid src, scopira::tool::uuid dest, scopira::tool::bufferflow *buf);
00110     // this implementation does local deliver
00111     // decendants override this
00112     virtual void send_msg_bcast(scopira::tool::uuid src, scopira::tool::uuid destserviceid, scopira::tool::bufferflow *buf);
00113     virtual void recv_msg(const uuid_query &srcq, scopira::tool::uuid &foundsrc, scopira::tool::uuid dest, scopira::tool::count_ptr<scopira::tool::bufferflow> &buf);
00114 
00115   protected:
00116     // all these functions are for itneraction with inherited classes, if any
00117     // la_ means "local_agent" API
00118 
00120     machine_spec & la_get_spec(void) { return dm_spec; }
00122     scopira::tool::uuid_generator & la_get_generator(void) { return dm_uugen; }
00125     bool la_launch_task(int myindex, const scopira::basekit::narray<scopira::tool::uuid> &taskids, const std::string &typestring, short phase);
00127     scopira::tool::uuid la_launch_proxy(agent_task_i *at);
00128     void la_update_slave_master(scopira::tool::uuid master, scopira::basekit::narray<scopira::tool::uuid> &peers);
00130     virtual void la_send_msg(scopira::tool::uuid src, scopira::tool::uuid dest, scopira::tool::bufferflow *buf) { }
00132     virtual void la_dead_task(scopira::tool::uuid taskid) { }
00134     
00136     void la_print_status(void);
00137 
00138   private:
00139     static void* worker_func(void *data);
00140 
00141   private:
00146     struct msg_t
00147     {
00148       scopira::tool::uuid pm_src;
00149       scopira::tool::count_ptr<scopira::tool::bufferflow> pm_buf;
00150 
00151       public:
00152       msg_t(scopira::tool::uuid src, scopira::tool::bufferflow *buf)
00153         : pm_src(src), pm_buf(buf) { }
00154     };
00155 
00156     typedef std::list<msg_t> msglist_t;
00157 
00158     class process_t : public scopira::tool::object
00159     {
00160       public:
00161         // process "modes" (pm_mode)
00162         enum {
00163           ps_empty_c = 0,     // initial state, should never be in this state though
00164           ps_born_c,      // a process in phase 1 of a 2 phase creation sequence. this process is not yet runnable
00165           ps_ready_c,     // ready to run
00166           ps_running_c,   // currently being run by a worker thread
00167           ps_sleep_c,     // waiting for a MSG or for wake up time
00168           ps_done_c,      // process should die/is dieing
00169 
00170           // spcial ones, but ARE schedulable
00171           // NO SPECIALI maker, as per normal process?
00172           // ps_remote_c
00173 
00174           // "special" types (not schedulable):
00175           ps_context_c,   // this is a marker process for a context's msg queue
00176           //ps_proxy_c,    // this process is "remote" (not on this machine). all msgs sent to it should be done so via enqueue_remote_ps_msg
00177         };
00178 
00179         scopira::tool::uuid pm_id;
00180         bool pm_special;        // if true, this isnt a normal, process
00181         scopira::tool::count_ptr<scopira::agent::agent_task_i> pm_task;   // only non-null on local, normal processes
00182 
00183         // GROUP STUFF (TODO should this be in the state_area?)
00184         int pm_index;   // my index in the group
00185         scopira::basekit::narray<scopira::tool::uuid> pm_peers;    // this list should never be empty (ie. it should always contain atleast myself)
00186 
00187         // the services this task has. it is const after its initial creation
00188         std::set<scopira::tool::uuid> pm_services;
00189 
00190         // internal class
00191         struct state_area {
00192           short pm_mode;
00193           bool pm_killreq;      // this a kill requested been made of this task
00194           bool pm_canmove;      // can this process be migrated
00195           bool pm_onmsg;       // should this proces be awoken on a msg
00196           double pm_ontime;     // the agent-clock time (or any time after) this process should run. 0 means, not by time
00197 
00198           msglist_t pm_msgqueue;
00199 
00200           // a reactor, if any
00201           taskmsg_reactor_i *pm_reactor;
00202 
00203           // ctor
00204           state_area(void) : pm_mode(ps_empty_c), pm_killreq(false), pm_canmove(false),
00205             pm_onmsg(false), pm_ontime(0), pm_reactor(0) { }
00206         };
00207 
00208         scopira::tool::event_area<state_area> pm_state;
00209 
00210         process_t(scopira::tool::uuid id, short specialstate = ps_empty_c)
00211           : pm_id(id), pm_special(specialstate?ps_empty_c:false),
00212           pm_index(0) { pm_peers.resize(1); pm_peers[0]=id; pm_state.pm_data.pm_mode = specialstate; }
00213         process_t(int myindex, const scopira::basekit::narray<scopira::tool::uuid> &peers)
00214           : pm_id(peers[myindex]), pm_index(myindex), pm_peers(peers) { }
00215 
00216         // this will populate the pm_services vector from the global registry
00217         void load_services(const std::type_info &nfo);
00218     };
00219 
00220     typedef std::map<scopira::tool::uuid, scopira::tool::count_ptr<process_t> > psmap_t;
00221     typedef std::vector<scopira::tool::count_ptr<scopira::tool::thread> > threadlist_t;
00222 
00223     // always lock this before locking the ps_area, if youre gonna lock both!
00224     struct kernel_area {
00225       bool pm_alive;    // if this is false, Ive been sent the kill signal.
00226 
00227       threadlist_t pm_threads;
00228 
00229       scopira::tool::uuid pm_nextps; // the next ps to "look at", zero if do first
00230 
00231       kernel_area(void) : pm_alive(true) { } // ctor
00232     };
00233     struct ps_area {
00234       psmap_t pm_ps;    // the processes
00235     };
00236 
00237     scopira::tool::uuid_generator dm_uugen;
00238 
00239     scopira::tool::chrono dm_agentclock;
00240 
00241     machine_spec dm_spec;
00242 
00243     // never try to lock the kernel if youre holdin any process locks!
00244     scopira::tool::event_area<kernel_area> dm_kernel; // my "OS kernel"
00245     scopira::tool::rw_area<ps_area> dm_psarea;
00246 
00247   private:
00248     void get_ps(scopira::tool::uuid id, scopira::tool::count_ptr<process_t> &foundps) const;
00251     void set_min_threads(void);
00253     void kill_all_local_tasks(void);
00254 };
00255 
00256 #endif
00257 
00258