Scopira  20080306
localagent.h
1 
2 /*
3  * Copyright (c) 2005 National Research Council
4  *
5  * All rights reserved.
6  *
7  * This material is confidential and proprietary information of
8  * National Research Council Canada ("Confidential Information").
9  * This Confidential Information may only be used and reproduced
10  * in accordance with the terms of the license agreement.
11  *
12  */
13 
14 #ifndef __INCLUDED_SCOPIRA_AGENT_LOCALAGENT_H__
15 #define __INCLUDED_SCOPIRA_AGENT_LOCALAGENT_H__
16 
17 #include <map>
18 #include <list>
19 #include <set>
20 #include <vector>
21 
22 #include <scopira/tool/thread.h>
23 #include <scopira/tool/output.h>
24 #include <scopira/tool/time.h>
25 #include <scopira/agent/context.h>
26 
// Forward declarations of the scopira::agent types this header refers to.
27 namespace scopira
28 {
29  namespace agent
30  {
31  class machine_spec;
32  class local_agent;
33 
34  class uptime_task; // forward declaration only; named as a friend of local_agent further down
35  }
36 }
37 
44 {
// Body of machine_spec: a plain record of machine properties with public fields.
// NOTE(review): the `class machine_spec` header line is missing from this listing.
45  public:
46  short pm_numcpu; // presumably the number of CPUs (going by the name) - confirm
47  bool pm_intelorder; // presumably true for Intel byte order (going by the name) - confirm
48  bool pm_64bit; // presumably true on a 64-bit machine (going by the name) - confirm
49  int pm_speed; // unknown units, TODO
50  int pm_ram; // in megabytes (1 gig = 1024)
51  public:
// ctor, zero-initializes the fields
53  machine_spec(void);
54 
// reads this spec from the given input flow; the bool result presumably signals success - confirm
55  bool load(scopira::tool::itflow_i& in);
// writes this spec to the given output flow
56  void save(scopira::tool::otflow_i& out) const;
57 
// sets all the values by auto-detecting them
59  void set_auto_detect(void);
60 };
61 
63 
71 {
72  private:
73  friend class scopira::agent::uptime_task;
74 
75  public:
77  local_agent(void);
79  virtual ~local_agent();
80 
81  virtual void notify_stop(void);
82  virtual void wait_stop(void);
83 
// No-op in this class (empty body).
84  virtual void enqueue_network_quit(void) { }
85 
// Always returns false in this class.
88  virtual bool failed(void) const { return false; }
// Sets the agenterror handler (there can only be one). This implementation has an empty body, so r is dropped.
89  virtual void set_agenterror_reactor(agenterror_reactor_i *r) { } // never going to call this, so it is simply ignored
90 
91  virtual void reg_context(scopira::tool::uuid &ctxid, taskmsg_reactor_i *reac);
92  virtual void unreg_context(scopira::tool::uuid ctxid);
93 
94  virtual int find_services(scopira::tool::uuid &serviceid, scopira::basekit::narray<scopira::tool::uuid> &out);
95 
96  virtual int universe_size(void);
97  virtual scopira::tool::uuid get_agent_id(void);
98 
99  virtual scopira::tool::uuid launch_task(const std::type_info &t, scopira::tool::uuid where);
100  virtual scopira::tool::uuid launch_group(int numps, const std::type_info &t);
101  virtual void launch_slaves(scopira::tool::uuid masterid, int numtotalps, const std::type_info &t,
103  virtual void kill_task(scopira::tool::uuid ps);
104  virtual bool wait_task(scopira::tool::uuid ps, int msec);
105  virtual bool is_alive_task(scopira::tool::uuid ps);
106  virtual bool is_killed_task(scopira::tool::uuid ps);
107 
108  virtual bool wait_msg(const uuid_query &srcq, scopira::tool::uuid &foundsrc, scopira::tool::uuid dest, int timeout);
110  // this implementation does local delivery
111  // descendants override this
112  virtual void send_msg_bcast(scopira::tool::uuid src, scopira::tool::uuid destserviceid, scopira::tool::bufferflow *buf);
114 
115  protected:
116  // all these functions are for interaction with derived classes, if any
117  // la_ means "local_agent" API
118 
// Read/write access to the machine spec: returns dm_spec by non-const reference.
120  machine_spec & la_get_spec(void) { return dm_spec; }
// Gets the uuid generator: returns dm_uugen by non-const reference.
122  scopira::tool::uuid_generator & la_get_generator(void) { return dm_uugen; }
125  bool la_launch_task(int myindex, const scopira::basekit::narray<scopira::tool::uuid> &taskids, const std::string &typestring, short phase);
127  scopira::tool::uuid la_launch_proxy(agent_task_i *at);
128  void la_update_slave_master(scopira::tool::uuid master, scopira::basekit::narray<scopira::tool::uuid> &peers);
// Hook: la calls this after a local task is terminated. The default does nothing;
// it is virtual so that derived classes can override it.
132  virtual void la_dead_task(scopira::tool::uuid taskid) { }
134 
136  void la_print_status(void);
137 
138  private:
139  static void* worker_func(void *data);
140 
141  private:
146  struct msg_t
147  {
148  scopira::tool::uuid pm_src;
150 
151  public:
153  : pm_src(src), pm_buf(buf) { }
154  };
155 
156  typedef std::list<msg_t> msglist_t;
157 
158  class process_t : public scopira::tool::object
159  {
160  public:
161  // process "modes" (the values stored in state_area::pm_mode)
162  enum {
163  ps_empty_c = 0, // initial state; a process should never actually stay in this state
164  ps_born_c, // a process in phase 1 of a 2-phase creation sequence; this process is not yet runnable
165  ps_ready_c, // ready to run
166  ps_running_c, // currently being run by a worker thread
167  ps_sleep_c, // waiting for a message or for its wake-up time
168  ps_done_c, // process should die / is dying
169 
170  // special ones, but ARE schedulable
171  // NOTE(review): the original note read "NO SPECIALI maker, as per normal process?" - presumably asking whether these need no special marker
172  // ps_remote_c
173 
174  // "special" types (not schedulable):
175  ps_context_c, // this is a marker process for a context's msg queue
176  //ps_proxy_c, // this process is "remote" (not on this machine). all msgs sent to it should be done so via enqueue_remote_ps_msg
177  };
178 
179  scopira::tool::uuid pm_id;
180  bool pm_special; // if true, this isnt a normal, process
181  scopira::tool::count_ptr<scopira::agent::agent_task_i> pm_task; // only non-null on local, normal processes
182 
183  // GROUP STUFF (TODO should this be in the state_area?)
184  int pm_index; // my index in the group
185  scopira::basekit::narray<scopira::tool::uuid> pm_peers; // this list should never be empty (ie. it should always contain atleast myself)
186 
187  // the services this task has. it is const after its initial creation
188  std::set<scopira::tool::uuid> pm_services;
189 
190  // internal class
191  struct state_area {
// Mutable per-process state: current mode, wake-up conditions, pending messages and optional reactor.
192  short pm_mode; // one of the ps_*_c mode values; starts as ps_empty_c
193  bool pm_killreq; // true once a kill has been requested for this task
194  bool pm_canmove; // can this process be migrated
195  bool pm_onmsg; // should this process be awoken on a message
196  double pm_ontime; // the agent-clock time (or any time after) at which this process should run; 0 means "not by time"
197 
// queued messages (msglist_t is a std::list of msg_t)
198  msglist_t pm_msgqueue;
199 
200  // a reactor, if any (null when there is none)
201  taskmsg_reactor_i *pm_reactor;
202 
203  // ctor: mode ps_empty_c, all flags false, no wake-up time, no reactor
204  state_area(void) : pm_mode(ps_empty_c), pm_killreq(false), pm_canmove(false),
205  pm_onmsg(false), pm_ontime(0), pm_reactor(0) { }
206  };
207 
209 
// Builds a stand-alone process record for `id`.
//  id           - the uuid of this process; it also becomes the only entry of pm_peers (index 0)
//  specialstate - initial pm_mode; any value other than ps_empty_c marks the process as
//                 "special" (i.e. not a normal process)
// Fix: pm_special used to be initialized with `specialstate?ps_empty_c:false`, which is always
// false because ps_empty_c is 0. It now reflects whether a special state was requested.
210  process_t(scopira::tool::uuid id, short specialstate = ps_empty_c)
211  : pm_id(id), pm_special(specialstate != ps_empty_c),
212  pm_index(0) { pm_peers.resize(1); pm_peers[0]=id; pm_state.pm_data.pm_mode = specialstate; }
// Builds a process record for one member of a group.
//  myindex - this process's index within peers; pm_id is taken from peers[myindex]
//  peers   - the uuids of all group members (copied into pm_peers)
// Fix: pm_special was left uninitialized by this constructor; it is now explicitly false.
// NOTE(review): assumes group members are always normal (non-special) processes - confirm.
213  process_t(int myindex, const scopira::basekit::narray<scopira::tool::uuid> &peers)
214  : pm_id(peers[myindex]), pm_special(false), pm_index(myindex), pm_peers(peers) { }
215 
216  // this will populate the pm_services set from the global registry
217  void load_services(const std::type_info &nfo);
218  };
219 
220  typedef std::map<scopira::tool::uuid, scopira::tool::count_ptr<process_t> > psmap_t;
221  typedef std::vector<scopira::tool::count_ptr<scopira::tool::thread> > threadlist_t;
222 
223  // lock ordering: always lock this before locking the ps_area, if you are going to lock both!
224  struct kernel_area {
225  bool pm_alive; // if this is false, the agent has been sent the kill signal
226 
// thread handles (threadlist_t is a vector of count_ptr<thread>); presumably the worker threads - confirm
227  threadlist_t pm_threads;
228 
229  scopira::tool::uuid pm_nextps; // the next process to "look at"; zero means start with the first one
230 
231  kernel_area(void) : pm_alive(true) { } // ctor: starts out alive
232  };
// Holder for the process table.
233  struct ps_area {
234  psmap_t pm_ps; // the processes, keyed by uuid (psmap_t maps uuid -> count_ptr<process_t>)
235  };
236 
238 
239  scopira::tool::chrono dm_agentclock;
240 
241  machine_spec dm_spec;
242 
243  // never try to lock the kernel if you're holding any process locks!
244  scopira::tool::event_area<kernel_area> dm_kernel; // my "OS kernel"
246 
247  private:
248  void get_ps(scopira::tool::uuid id, scopira::tool::count_ptr<process_t> &foundps) const;
251  void set_min_threads(void);
253  void kill_all_local_tasks(void);
254 };
255 
256 #endif
257 
258 
machine_spec & la_get_spec(void)
yes, RW access
Definition: localagent.h:120
virtual void la_dead_task(scopira::tool::uuid taskid)
la calls this after a local task is terminated
Definition: localagent.h:132
virtual bool failed(void) const
Definition: localagent.h:88
Definition: archiveflow.h:20
Definition: flow.h:212
Definition: flow.h:281
Definition: bufferflow.h:40
Definition: object.h:71
Definition: task.h:45
virtual void la_send_msg(scopira::tool::uuid src, scopira::tool::uuid dest, scopira::tool::bufferflow *buf)
la calls this as a last resort (after trying local delivery)
Definition: localagent.h:130
Definition: uuid.h:183
scopira::tool::uuid_generator & la_get_generator(void)
get the generator
Definition: localagent.h:122
Definition: context.h:82
void set_auto_detect(void)
set all the values by autodetecting
Definition: agent.h:160
virtual void set_agenterror_reactor(agenterror_reactor_i *r)
sets the agenterror handler... there can only be one, eh
Definition: localagent.h:89
machine_spec(void)
zero initing
virtual void enqueue_network_quit(void)
Definition: localagent.h:84
Definition: agent.h:49
Definition: agent.h:179
Definition: localagent.h:70
Definition: thread_pthreads.h:52
Definition: localagent.h:43
Definition: uuid.h:242
Definition: context.h:132
void resize(size_t len)
Definition: narray.h:877
Definition: time.h:241
Definition: flow.h:159