Scopira  20080306
clusteragent.h
1 
2 /*
3  * Copyright (c) 2005 National Research Council
4  *
5  * All rights reserved.
6  *
7  * This material is confidential and proprietary information of
8  * National Research Council Canada ("Confidential Information").
9  * This Confidential Information may only be used and reproduced
10  * in accordance with the terms of the license agreement.
11  *
12  */
13 
14 #ifndef __INCLUDED_SCOPIRA_AGENT_CLUSTERAGENT_H__
15 #define __INCLUDED_SCOPIRA_AGENT_CLUSTERAGENT_H__
16 
17 #include <queue>
18 
19 #include <scopira/tool/netflow.h>
20 #include <scopira/tool/url.h>
21 #include <scopira/tool/uuid.h>
22 #include <scopira/tool/time.h>
23 #include <scopira/core/register.h>
24 #include <scopira/agent/localagent.h>
25 
namespace scopira {
namespace agent {

// Types defined later in this header, declared up front so they can be
// referenced before their definitions.
class cluster_agent;
class node_spec;

// Only a forward declaration here: cluster_agent names it as a friend class.
class uptime_task;

} // namespace agent
} // namespace scopira
36 
45 {
46  public:
47  enum {
48  all_jobs_c,
49  my_jobs_c,
50  no_jobs_c,
51  };
52 
53  scopira::tool::uuid pm_uuid;
54  bool pm_ismaster; // is this a master? (default, no)
55  scopira::tool::url pm_url; // url, DEDUCED (to make it NATable?) (only really valid if pm_listening==true) (only really valid if pm_listening==true)
56  bool pm_isdirectroute; //can direct peer-links be made to this node? (if not, all messages will be routed via the master node) (default, yes)
57  short pm_jobfilter; // what kind of jobs will this thing accept (default all)
58  public:
60  node_spec(void);
61 
62  bool load(scopira::tool::itflow_i& in);
63  void save(scopira::tool::otflow_i& out) const;
64 
65  void parse_options(const std::string &options);
66 };
67 
69 
76 {
77  public:
79  enum { default_port_c = 5555 };
80 
81  private:
82  typedef local_agent parent_type;
83 
84  friend class scopira::agent::uptime_task;
85 
86  public:
88  cluster_agent(void);
90  virtual ~cluster_agent();
91 
92  virtual void notify_stop(void);
95  //virtual void wait_stop(void);
96 
97  virtual void enqueue_network_quit(void);
98 
101  virtual bool failed(void) const;
102  virtual void set_agenterror_reactor(agenterror_reactor_i *r);
103 
104  virtual int find_services(scopira::tool::uuid &serviceid, scopira::basekit::narray<scopira::tool::uuid> &out);
105 
106  virtual void reg_context(scopira::tool::uuid &ctxid, taskmsg_reactor_i *reac);
107  virtual void unreg_context(scopira::tool::uuid ctxid);
108 
109  virtual int universe_size(void);
110  virtual scopira::tool::uuid get_agent_id(void);
111 
112  virtual scopira::tool::uuid launch_task(const std::type_info &t, scopira::tool::uuid where);
113  virtual scopira::tool::uuid launch_group(int numps, const std::type_info &t);
114  virtual void launch_slaves(scopira::tool::uuid masterid, int numtotalps, const std::type_info &t,
116  virtual void kill_task(scopira::tool::uuid ps);
117  virtual bool wait_task(scopira::tool::uuid ps, int msec);
118  virtual bool is_alive_task(scopira::tool::uuid ps);
119  //SCOPIRA_EXPORT virtual bool is_killed_task(scopira::tool::uuid ps) = 0; // using the local agent one directly!
120 
121  /* using local_agent's version for all of these:
122  virtual bool wait_msg(scopira::tool::uuid src, scopira::tool::uuid dest, int timeout);
123  virtual void send_msg(scopira::tool::uuid src, scopira::tool::uuid dest, scopira::tool::bufferflow *buf);
124  virtual void recv_msg(scopira::tool::uuid &src, scopira::tool::uuid dest, scopira::tool::count_ptr<scopira::tool::bufferflow> &buf);*/
125  virtual void send_msg_bcast(scopira::tool::uuid src, scopira::tool::uuid destserviceid, scopira::tool::bufferflow *buf);
126 
127  // needed by agent_i::get_cluster_server_url
128  const node_spec & spec(void) const { return dm_nodespec; }
129 
130  protected:
131  virtual void la_send_msg(scopira::tool::uuid src, scopira::tool::uuid dest, scopira::tool::bufferflow *buf);
132  virtual void la_dead_task(scopira::tool::uuid taskid);
133 
134  private:
135  class link; //fwd
136  class net_link; //fwd
137 
142  class admin_msg : public virtual scopira::tool::object
143  {
144  public:
145  enum {
146  type_normal_c,
147  type_instant_c, // use with care, executed in caller-thread and/or receive thread
148  type_tmp_c, // a speciail control msg that can be used while the links are in tmp mode
149  };
150  public:
151  virtual const char * get_debug_name(void) const = 0;
152  virtual short get_type(void) const { return type_normal_c; }
153  virtual void execute_agent(cluster_agent &e) = 0;
154  };
159  class network_msg : public admin_msg
160  {
161  public:
162  // internal class
163  struct xtion_t {
164  scopira::tool::uuid pm_src;
165  int pm_id;
166  xtion_t(void) : pm_id(0) { }
167  xtion_t(scopira::tool::uuid src, int id)
168  : pm_src(src), pm_id(id) { }
169  bool load(scopira::tool::itflow_i& in) { return pm_src.load(in) && in.read_int(pm_id); }
170  void save(scopira::tool::otflow_i& out) const { pm_src.save(out); out.write_int(pm_id); }
171  };
172  public:
175  virtual bool load(scopira::tool::iobjflow_i& in);
176  virtual void save(scopira::tool::oobjflow_i& out) const;
177 
178  virtual void execute_agent(cluster_agent &e);
179  virtual void execute_agent_link(cluster_agent &e, link *lk) = 0;
180 
181  void set_net_link(net_link *l);
182 
183  void set_xtion(const xtion_t &x) { dm_xtion = x; }
184  const xtion_t & get_xtion(void) const { return dm_xtion; }
185 
186  protected:
187  network_msg(void);
188  private:
190 
191  protected: // on so reply msg can reach em
192  // these are used by both network_msg (for sending) and reply_msg (for replies)
193  xtion_t dm_xtion;
194  };
205  class reply_msg : public network_msg
206  {
207  public:
209  reply_msg(int errorcode = 0);
210 
211  virtual bool load(scopira::tool::iobjflow_i& in);
212  virtual void save(scopira::tool::oobjflow_i& out) const;
213 
215  virtual const char * get_debug_name(void) const { return "reply_msg"; }
216  virtual short get_type(void) const { return type_instant_c; }
217 
218  virtual void execute_agent_link(cluster_agent &e, link *lk);
219 
220  int get_code(void) const { return dm_errorcode; }
221  private:
222  int dm_errorcode;
223  };
224 
226  class make_link_msg;
227  class quit_msg;
228  class remove_link_msg;
229  // network_msg - announces remote's uuid
230  class peer_hello_msg;
231  class nottmp_hello_msg;
232  class make_back_link_msg;
233  class all_quit_msg;
234  class resolve_agent_msg;
235  class reply_resolve_agent_msg;
236  class routed_msg;
237  class master_launch_task_msg;
238  class reply_master_launch_task_msg;
239  class make_task_msg;
240  class client_wait_msg;
241  class client_wait_task;
242  class client_kill_msg;
243  class client_is_alive_msg;
244  class send_data_msg;
245  class bcast_data_msg;
246  class find_services_msg;
247  class reply_find_services_msg;
248  class reg_context_msg;
249  class dead_task_msg;
250  class default_group_size_msg;
272 
278  class net_link : public virtual scopira::tool::object
279  {
280  private:
281  friend class network_msg;
282 
283  enum {
284  blocksize_c = 1024*8,
285  magic_c = 0xE23A34F7,
286  };
287 
288  struct queue_area
289  {
290  link *pm_link;
291 
292  // The next message the sending thread should send.
293  // Upon sending the message, the sending thread will signal the host-link
294  // queue to remove the sent message from its queue, hense we dont
295  // need a full queue here
296  // This is like a queue of size 1
298 
299  bool pm_alive; // should this link die asap?
300 
301  queue_area(void) : pm_link(0), pm_alive(true) { }
302  };
303 
305 
307 
309  scopira::tool::thread dm_recv_thread;
311  scopira::tool::thread dm_send_thread;
312 
313  public:
322  static bool make_link(const scopira::tool::url &remote, scopira::tool::count_ptr<net_link> &nl);
323 
325  net_link(scopira::tool::netflow *fl);
327  virtual ~net_link();
328 
331  void set_link(link *l);
332 
334  scopira::tool::netflow * get_sock(void) const { return dm_sock.get(); }
335 
340  void set_sendmsg(network_msg *msg);
341 
342  private:
343  static void* recv_thread_func(void *herep);
344  static void* send_thread_func(void *herep);
345  };
346 
351  class link : public virtual scopira::tool::object
352  {
353  private:
354  typedef std::list<scopira::tool::count_ptr<network_msg> > msgqueue_t;
355  // type_tmp_c type msgs are my friends
356  friend class peer_hello_msg;
357  friend class nottmp_hello_msg;
358 
359  cluster_agent *dm_agent;
360 
362  bool dm_initter;
364  scopira::tool::url dm_peerurl;
366  bool dm_tmpaddress;
367 
370  scopira::tool::uuid dm_peeruuid;
371 
372  struct queue_area
373  {
374  public:
375  msgqueue_t pm_sendq; // pop off the front, append to the back
376  bool pm_remotetmp; // true if the REMOTE end IS tmp
377  bool pm_remotemaster; // true if the remote is a master (ie. this is the master link connection)
379 
380  public:
381  queue_area(void) : pm_remotetmp(true), pm_remotemaster(false) { }
382  };
383 
385 
386  public:
388  link(cluster_agent *a, scopira::tool::uuid remoteuuid,
389  bool tmpaddress, const scopira::tool::url &initterurl = scopira::tool::url("dummy://hostname/"));
390 
391  bool is_tmpaddress(void) const { return dm_tmpaddress; }
392 
393  scopira::tool::uuid get_uuid(void) const { assert(!dm_tmpaddress); return dm_peeruuid; }
394 
395  // does locking!
396  bool has_link(void);
398  void accept_net_link(net_link *nl);
399 
401  void enqueue_msg(network_msg *msg);
402 
404  void print_status(void);
405 
407  //bool is_connected(void);
408 
409  // handlers from net_link
410  void on_sent(network_msg *msg);
411  void on_recv(network_msg *msg);
412  // called only when the connection is *dropped* (ie. not on natural exits)
413  void on_close_net_link(void);
414  private:
415  // grab the next message from the queue. if putnewmsghere is non-null, it will be placed there, otherwise
416  // it will be enqueued to the actually link
418  };
419 
420  private:
421  enum {
422  failed_initting_c,
423  failed_cantinit_c,
424  failed_opening_c,
425  failed_cantopen_c,
426  failed_ok_c,
427  failed_lostmaster_c,
428  };
429  typedef std::map<scopira::tool::uuid, scopira::tool::count_ptr<link> > peers_t;
430 
435  struct peer_area
436  {
437  public:
439  scopira::tool::count_ptr<link> pm_masterlink;
441  peers_t pm_peers;
443  bool pm_alive;
444  public:
445  peer_area(void) : pm_alive(false) { }
446  };
447 
448  typedef std::list<scopira::tool::count_ptr<admin_msg> > msgqueue_t;
449  class cron_item {
450  public:
451  double pm_triggertime;
453  public:
454  cron_item(void) : pm_triggertime(0) { }
455  cron_item(double triggertime, admin_msg *msg) : pm_triggertime(triggertime), pm_msg(msg) { }
457  bool operator<(const cron_item &rhs) const { return pm_triggertime > rhs.pm_triggertime; }
458  };
459  typedef std::priority_queue<cron_item> cronqueue_t;
460 
466  struct admin_area
467  {
468  public:
469  msgqueue_t pm_main_queue;
470  cronqueue_t pm_cron_queue;
472  bool pm_alive;
474  char pm_failedstate;
476  agenterror_reactor_i *pm_agenterror;
477  public:
478  admin_area(void) : pm_alive(false), pm_failedstate(failed_initting_c), pm_agenterror(0) { }
479  };
480 
483  class meta_node : public scopira::tool::object
484  {
485  public:
486  // all read only
487  machine_spec pm_machinespec;
488  node_spec pm_nodespec;
489  // except for this
490  typedef std::vector<scopira::tool::uuid> taskslist_t;
491  struct load_area
492  {
493  taskslist_t pm_tasks;
494  };
496  public:
497  meta_node(const machine_spec &mspec, const node_spec &nspec);
498  };
499  typedef std::map<scopira::tool::uuid, scopira::tool::count_ptr<meta_node> > nodemap_t;
500  typedef std::map<scopira::tool::uuid, scopira::tool::uuid> taskmap_t;
501 
509  struct meta_area
510  {
511  public:
513  nodemap_t pm_nodes;
515  taskmap_t pm_tasks;
516  };
517 
518  typedef std::map<int, scopira::tool::count_ptr<network_msg> > netmsgmap_t;
519  typedef std::map<int, scopira::tool::count_ptr<reply_msg> > repmsgmap_t;
520 
521  struct xtion_area
522  {
523  public:
525  repmsgmap_t pm_replies;
528  netmsgmap_t pm_handlers;
530  int pm_nextid;
532  bool pm_alive;
533 
534  xtion_area(void) : pm_nextid(3000), pm_alive(true) { }
535  };
536 
537  typedef std::map<scopira::tool::uuid, scopira::tool::count_ptr<reply_resolve_agent_msg> > resolvemap_t;
538 
539  // my "dns" cache
540  struct resolve_area
541  {
542  public:
543  resolvemap_t pm_resolves;
544  };
545 
547  scopira::tool::uuid_generator dm_uuidmaker;
549  scopira::tool::chrono dm_age;
550 
552  node_spec dm_nodespec;
553 
555  std::string dm_password;
557  int dm_listenport;
560  int dm_udpport;
561 
572 
574  scopira::tool::netflow dm_listensocket;
576  scopira::tool::udpflow dm_udpsocket;
577 
579  scopira::tool::thread dm_listenthread;
581  scopira::tool::thread dm_udpthread;
583  scopira::tool::thread dm_adminthread;
584 
585  private:
590  void enqueue_msg(admin_msg *msg);
596  void enqueue_msg(scopira::tool::uuid targetuuid, network_msg *msg);
601  void enqueue_reply_msg(network_msg *theq, reply_msg *thereply)
602  { enqueue_reply_msg(theq->get_xtion(), thereply); }
607  void enqueue_reply_msg(const network_msg::xtion_t &x, reply_msg *thereply);
613  void enqueue_cron_msg(double w, admin_msg *msg);
614 
621  void do_xtion(scopira::tool::uuid targetuuid,
622  network_msg *msg, scopira::tool::count_ptr<reply_msg> &replye);
630  void enqueue_xtion(scopira::tool::uuid targetuuid,
631  network_msg *msg, network_msg *handler);
632 
638  void do_resolve_xtion(scopira::tool::uuid id, bool dotaskresolve,
640 
642  void print_status(void);
643 
644  private:
645  static void* listen_thread_func(void *herep);
646  static void* udp_thread_func(void *herep);
647  static void* admin_thread_func(void *herep);
648 
649  // this is called periodically by admin_thread_func
650  // for debug/output/status purposes
651  void print_status(admin_area *area);
652 };
653 
654 #endif
655 
Definition: clusteragent.h:75
node_spec(void)
zero initing
Definition: clusteragent.h:44
Definition: archiveflow.h:20
bool load(scopira::tool::itflow_i &in)
Serialization - yes, non-virtual.
Definition: flow.h:212
Definition: flow.h:281
virtual void write_int(int)=0
Definition: flow.h:352
T * get(void) const
Definition: object.h:378
Definition: bufferflow.h:40
Definition: object.h:71
Definition: uuid.h:183
Definition: url.h:45
Definition: agent.h:160
Definition: thread_pthreads.h:108
Definition: thread_pthreads.h:58
Definition: netflow.h:249
void save(scopira::tool::otflow_i &out) const
Serialization - yes, non-virtual.
Definition: netflow.h:123
Definition: flow.h:421
Definition: localagent.h:70
virtual bool read_int(int &)=0
Definition: localagent.h:43
Definition: uuid.h:242
Definition: time.h:241
Definition: flow.h:159