rvm  1.11
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
archiver.cc
Go to the documentation of this file.
1 #include "config.h"
2 
3 #ifdef HAVE_UNISTD_H
4 #include <unistd.h>
5 #endif
6 
7 #include <ctype.h>
8 
9 #include <iostream>
10 #include <string>
11 #include <vector>
12 
13 #include "asserts.h"
14 #include "error.h"
15 #include "estring.h"
16 #include "fs.h"
17 #include "rconfig.h"
18 #include "timer.h"
19 #include "logger.h"
20 
21 #include "archiver.h"
22 
23 //-----------------------------------------------------------------------------
24 
25 /** C'tor */
27 {
28  TRY_nomem(m_exit_str[0] = "Success");
29  TRY_nomem(m_exit_str[1] = "Syntax or usage error");
30  TRY_nomem(m_exit_str[2] = "Protocol incompatability error");
31  TRY_nomem(m_exit_str[3] = "Errors selecting I/O files or directories");
32  TRY_nomem(m_exit_str[4] = "Requested action not supported");
33  TRY_nomem(m_exit_str[10] = "Error in socket I/O");
34  TRY_nomem(m_exit_str[11] = "Error in file I/O");
35  TRY_nomem(m_exit_str[12] = "Error in rsync protocol data stream");
36  TRY_nomem(m_exit_str[13] = "Errors with program diagnostics");
37  TRY_nomem(m_exit_str[14] = "Error in IPC code");
38  TRY_nomem(m_exit_str[20] = "Received SIGUSR1 or SIGINT");
39  TRY_nomem(m_exit_str[21] = "Some error returned by waitpid()");
40  TRY_nomem(m_exit_str[22] = "Error allocating core memory buffers");
41  TRY_nomem(m_exit_str[23] = "Partial transfer");
42  TRY_nomem(m_exit_str[24] = "Some files vanished before they could be transferred");
43  TRY_nomem(m_exit_str[25] = "The --max-delete limit stopped deletions");
44  TRY_nomem(m_exit_str[30] = "Timeout in data send/receive");
45  TRY_nomem(m_exit_str[124] = "The command executed by SSH exited with status 255");
46  TRY_nomem(m_exit_str[125] = "The command executed by SSH was killed by a signal");
47  TRY_nomem(m_exit_str[126] = "The command given to SSH cannot be run");
48  TRY_nomem(m_exit_str[127] = "The command given to SSH cannot be found");
49  TRY_nomem(m_exit_str[255] = "An SSH error occured");
50 
51  TRY_nomem(m_signal_str[1] = "[HUP]: Hangup");
52  TRY_nomem(m_signal_str[2] = "[INT]: Interrupt ");
53  TRY_nomem(m_signal_str[3] = "[QUIT]: Quit");
54  TRY_nomem(m_signal_str[4] = "[ILL]: Illegal instruction");
55  TRY_nomem(m_signal_str[5] = "[TRAP]: Trace trap");
56  TRY_nomem(m_signal_str[6] = "[IOT]: IOT instruction or hardware fault");
57  TRY_nomem(m_signal_str[7] = "[ABRT]: Abnormal termination");
58  TRY_nomem(m_signal_str[8] = "[EMT]: EMT instruction or hardware fault");
59  TRY_nomem(m_signal_str[9] = "[FPE]: Floating point exception");
60  TRY_nomem(m_signal_str[10] = "[KILL]: Killed");
61  TRY_nomem(m_signal_str[11] = "[BUS]: Bus error");
62  TRY_nomem(m_signal_str[12] = "[SEGV]: Segmentation fault");
63  TRY_nomem(m_signal_str[13] = "[SYS]: Invalid system call or invalid argument to system call");
64  TRY_nomem(m_signal_str[14] = "[PIPE]: Write to pipe with no readers");
65  TRY_nomem(m_signal_str[15] = "[ALRM]: Alarm");
66  TRY_nomem(m_signal_str[16] = "[TERM]: Software termination");
67  TRY_nomem(m_signal_str[17] = "[USR1]: User-defined signal 1");
68  TRY_nomem(m_signal_str[18] = "[USR2]: User-defined signal 2");
69  TRY_nomem(m_signal_str[19] = "[CLD]: Child status change");
70  TRY_nomem(m_signal_str[20] = "[PWR]: Power fail/restart");
71  TRY_nomem(m_signal_str[21] = "[WINCH]: Terminal window size change");
72  TRY_nomem(m_signal_str[22] = "[URG]: Urgent condition");
73  TRY_nomem(m_signal_str[23] = "[POLL]: Pollable event or socket I/O");
74  TRY_nomem(m_signal_str[24] = "[STOP]: Stop");
75  TRY_nomem(m_signal_str[25] = "[TSTP]: Terminal stop character");
76  TRY_nomem(m_signal_str[26] = "[CONT]: Continue stopped process");
77  TRY_nomem(m_signal_str[27] = "[TTIN]: Background tty read");
78  TRY_nomem(m_signal_str[28] = "[TTOU]: Background tty write");
79  TRY_nomem(m_signal_str[29] = "[VTALRM]: Virtual timer expired");
80  TRY_nomem(m_signal_str[30] = "[PROF]: Profiling timer expired");
81  TRY_nomem(m_signal_str[31] = "[XCPU]: Exceeded CPU limit");
82  TRY_nomem(m_signal_str[32] = "[XFSZ]: Exceeded file size limit");
83  TRY_nomem(m_signal_str[33] = "[WAITING]: Process' LWPs are blocked");
84  TRY_nomem(m_signal_str[34] = "[LWP]: Special thread library signal");
85  TRY_nomem(m_signal_str[35] = "[FREEZE]: Special signal used by CPR");
86  TRY_nomem(m_signal_str[36] = "[THAW]: Special signal used by CPR");
87  TRY_nomem(m_signal_str[37] = "[CANCEL]: Thread cancellation");
88  TRY_nomem(m_signal_str[38] = "[LOST]: Resource lost");
89  TRY_nomem(m_signal_str[39] = "[RTMIN]: Highest priority real-time signal");
90  TRY_nomem(m_signal_str[46] = "[RTMAX]: Lowest priority real-time signal");
91 
92  TRY_nomem(m_unknown_exit = "(Unknown exit code)");
93  TRY_nomem(m_unknown_signal = "(Unknown signal)");
94 }
95 
96 /** Get a verbose string for an exit code */
97 const std::string& rstat::exit(const int a_int) const
98 {
99  if (m_exit_str.find(a_int) != m_exit_str.end()) {
100  return(m_exit_str.find(a_int)->second);
101  }
102  return(m_unknown_exit);
103 }
104 
105 /** Get a verbose string for a signal number */
106 const std::string& rstat::signal(const int a_int) const
107 {
108  if (m_signal_str.find(a_int) != m_signal_str.end()) {
109  return(m_signal_str.find(a_int)->second);
110  }
111  return(m_unknown_signal);
112 }
113 
115 
116 //-----------------------------------------------------------------------------
117 
/** C'tor

 Set a job to be associated with this job archiver and initialize its
 processing status to "pending".

 */
125 {
126  clear();
127  m_job = a_job;
129 }
130 
131 /** Generate a job prefix string
132 
133  Create a string to uniquely identify this job to be used in the log file to
134  uniquely identify this job
135 
136  */
137 const std::string job_archiver::prefix(void)
138 {
139  estring lstr;
140 
141  lstr = "[Job:";
142  lstr += estring((void*)m_job);
143  lstr += "] ";
144 
145  return(lstr);
146 }
147 
148 /** Generate a job id string */
149 const std::string job_archiver::id(void)
150 {
151  estring lstr;
152 
153  lstr = prefix();
154  lstr += " ";
155  lstr += m_job->generate_job_id();
156 
157  return(lstr);
158 }
159 
/** Clear the job archiver and return it to its initial state

 End any processes handling this job and return the job archiver to its
 "pending" state.

 */
167 {
168  end();
171  m_success = true;
172  m_jr.clear();
173  m_jpr.clear();
174  m_error_msg.erase();
175  m_rsync_timeout_flag = false;
176 }
177 
178 /** End any processes handling this job
179 
180  If any child processes are handling this job, terminate them. Erase any
181  pending I/O for the now defunct child. Set our processing status to "done".
182 
183 */
185 {
186  estring lstr;
187 
188  m_timer.stop();
189  m_io_timer.stop();
190  if (m_exec.child_running()) {
191  lstr = prefix();
192  lstr += "Terminating child process!\n";
194  m_exec.kill_child();
195  }
196  m_exec.clear();
197  m_io_out.erase();
198  m_io_err.erase();
200 }
201 
202 /** Return the processing status of this job archiver */
204 {
205  return(m_status);
206 }
207 
208 /** Begin processing
209 
210  Attempt to fork a child process to handle this job. If unsuccessful then
211  retry again later. The child then calls mf_do_chores() to handle the actual
212  processing, while the parent updates the job archiver's status from
213  "pending" to "processing" and begins a timer to measure the duration of the
214  job process.
215 
216 */
218 {
219  estring lstr;
220  estring path;
221  estring archive_dir;
222 
224 
225  // Create directories before forking
226  job::paths_type::const_iterator pi;
227  for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
228  archive_dir = m_job->generate_archive_path(*pi);
230  path += "/";
231  path += archive_dir;
232  path = reform_path(path);
233  if (!exists(path)) {
234  lstr = prefix();
235  lstr += "Creating job archive path: ";
236  lstr += archive_dir;
237  lstr += "\n";
238  logger.write(lstr);
239  mk_dirhier(path);
240  }
241  }
242 
243  try {
244  m_exec.fork();
245  }
246  catch(error e) {
247  lstr = prefix();
248  lstr += "Could not fork:\n";
249  logger.write(lstr);
250 
251  lstr = e.str(prefix());
252  logger.write(lstr);
253 
254  lstr = prefix();
255  lstr += "Will retry job later\n";
256  logger.write(lstr);
257 
259  }
260  catch(...) {
261  error e = err_unknown;
262 
263  lstr = prefix();
264  lstr += "Could not fork:\n";
265  logger.write(lstr);
266 
267  lstr = e.str(prefix());
268  logger.write(lstr);
269 
270  lstr = prefix();
271  lstr += "Will retry job later\n";
272  logger.write(lstr);
273 
275  }
276 
277  if (m_exec.is_child()) {
278  // wait_for_debugger = true;
279 
280  // while (wait_for_debugger);
281 
283  try {
284  mf_do_chores();
285  }
286  catch(error e) {
287  std::cerr << e;
288  m_success = false;
289  }
290  catch(...) {
291  std::cerr << err_unknown;
292  m_success = false;
293  }
294  if (m_success)
295  m_exec.exit(0);
296  else
297  m_exec.exit(1);
298  }
299 
301 
302  lstr = prefix();
303  lstr += "Spawning child process: PID ";
304  lstr += estring(static_cast<unsigned long>(m_exec.child_pid()));
305  lstr += "\n";
306  logger.write(lstr);
307 
309  m_timer.start();
310  m_io_timer.start();
311  m_rsync_timeout_flag = false;
312 }
313 
/** Parent processor for a job

 Check for I/O from the child. Check the child's status to see if it's still
 running, has exited with an exit code, or has exited from a signal. If the
 child did not exit normally (i.e. exit from a signal or exit with a non-zero
 exit code) then check the vault for overflow. If the vault has exceeded
 its overflow threshold then that could be the cause for the child's
 failure, in which case we reschedule the child to be processed again later.

 If the job is finished (whether successful or not), update the job
 archiver's status to "completed".

 */
328 {
329  estring lstr;
330 
331  if (m_exec.child_running()) {
332  // Process child I/O
333  mf_process_child_io(false);
334  }
335  else {
336  // Process remaining child I/O
337  mf_process_child_io(true);
338 
339  // If child exited with an error, check vault overflow. If the vault is
340  // filling up, then reschedule the job for later retry.
341  lstr = prefix();
342  if (m_exec.child_signaled()) {
343  lstr += "Child exited from signal: ";
344  lstr += estring(m_exec.child_signal_no());
345  lstr += "\n";
346  }
347  else if (m_exec.child_exit_code() != 0) {
348  lstr += "Child exited abnormally with code: ";
349  lstr += estring(m_exec.child_exit_code());
350  lstr += "\n";
351  }
352  else {
353  lstr += "Child exited successfully\n";
355  }
356  logger.write(lstr);
357 
359  /*
360  if (vaulter.overflow()) {
361  lstr = prefix();
362  lstr += "Vault overflow detected, will retry later\n";
363  logger.write(lstr);
364  m_status = status_reschedule;
365  }
366  else {
367  m_status = status_error;
368  }
369  */
371  }
372  else {
374  }
375 
376  m_timer.stop();
377  m_io_timer.stop();
378  lstr = prefix();
379  lstr += "Finished, duration: ";
380  lstr += m_timer.duration();
381  lstr += "\n";
382  logger.write(lstr);
383  // m_status = status_completed;
384  }
385 }
386 
387 /** Return the job report for this job */
389 {
390  return(m_jr);
391 }
392 
/** Child processor for a job

 For each path in this job:
 - Create the directory hierarchy for this job in the archive
 - Until done or until out of retries:
 - Choose a hardlink source, if applicable and available
 - Construct the command line to pass to rsync
 - Spawn rsync
 - Process I/O sent back from rsync
 - Process exit code or signal number returned from rsync
 - Generate and submit a report to the report manager

 */
407 {
408  /*
409  {
410  bool wait_for_debugger = true;
411 
412  std::cerr << "Waiting for debugger to attach..." << std::endl;
413  while (wait_for_debugger);
414  std::cerr << "Debugger attached." << std::endl;
415  }
416  */
417 
418  job::paths_type::const_iterator pi;
419 
420  for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
421  estring archive_dir;
422  estring path;
423  std::string binary;
424  std::vector<std::string> argv;
425  std::vector<std::string> extra_argv;
426  estring opt;
427  bool hardlink = false;
428  bool multi_hardlink = false;
429  int num_retries = 0;
430  bool done = false;
431  int exit_code = 0;
432  int signal_num = 0;
433  timer t;
434  uint64 files_total = 0;
435  uint64 files_xferd = 0;
436  uint64 size_total = 0;
437  uint64 size_xferd = 0;
438  bool overflow_detected = 0;
439  estring error_msg;
440 
441  archive_dir = m_job->generate_archive_path(*pi);
442 
444  path += "/";
445  path += archive_dir;
446  path = reform_path(path);
447 
448  if (!exists(path)) {
449  std::string es;
450 
451  TRY_nomem(es = "No such directory: \"");
452  TRY_nomem(es += archive_dir);
453  TRY_nomem(es += "\"");
454 
455  throw(ERROR(EEXIST,es));
456  }
457 
458  if (!writable(path)) {
459  std::string es;
460 
461  TRY_nomem(es = "Cannot write to archive directory: \"");
462  TRY_nomem(es += archive_dir);
463  TRY_nomem(es += "\"");
464 
465  throw(ERROR(EACCES,es));
466  }
467 
468  logger.set_error_logging(false);
469  hardlink = m_job->rsync_hardlink;
470  multi_hardlink = m_job->rsync_multi_hardlink;
471  while ((num_retries <= m_job->rsync_retry_count) && !done) {
472  execute exec;
473  job::excludes_type::const_iterator ei;
474  job::includes_type::const_iterator ii;
475  std::vector<std::string>::const_iterator oi;
476 
477  exit_code = 0;
478  signal_num = 0;
479 
481  binary = config.ssh_local_path();
482  }
483  else {
484  binary = config.rsync_local_path();
485  }
486 
487  argv.clear();
489  argv.push_back("-o");
490  argv.push_back("BatchMode=yes");
491  extra_argv = m_job->generate_ssh_options_vector();
492  argv.insert(argv.end(), extra_argv.begin(), extra_argv.end());
493  argv.push_back("-l");
494  argv.push_back(m_job->rsync_remote_user);
495  argv.push_back(m_job->hostname);
496  if (m_job->rsync_remote_path.size() != 0)
497  argv.push_back(m_job->rsync_remote_path);
498  else
499  argv.push_back(config.rsync_local_path());
500  }
501 
502  extra_argv = m_job->generate_rsync_options_vector();
503  argv.insert(argv.end(), extra_argv.begin(), extra_argv.end());
504 
505 
507  opt = "--rsync-path=";
508  if (m_job->rsync_remote_path.size() != 0)
509  opt += m_job->rsync_remote_path;
510  else
511  opt += config.rsync_local_path();
512  argv.push_back(opt);
513  }
514 
515  if (hardlink) {
516  subdirectory subdir;
517  std::string youngest;
518  std::string relative_path;
519  bool hardlink_opt = false;
520  bool first_hardlink = false;
521  int linkdest_count = 0;
522 
523  subdir.path(vaulter.vault());
524  if (subdir.size() > 0) {
525  subdirectory::const_iterator si;
526 
527  sort(subdir.begin(), subdir.end());
528  reverse(subdir.begin(), subdir.end());
529  for (si = subdir.begin(); si != subdir.end(); ++si) {
530  estring ypath;
531 
532  if (first_hardlink && !multi_hardlink) {
533  continue;
534  }
535  if (linkdest_count >= m_job->rsync_multi_hardlink_max) {
536  continue;
537  }
538  if (!is_timestamp(*si))
539  continue;
540  if (*si == config.timestamp().str())
541  continue;
542  std::cout
543  << "Considering potential hardlink source: "
544  << *si
545  << std::endl;
546  ypath = vaulter.vault();
547  ypath += "/";
548  ypath += *si;
549  ypath += "/";
550  ypath += archive_dir;
551  if (exists(ypath)) {
552  std::cout
553  << "Using archive as hardlink source: "
554  << *si
555  << std::endl;
556  if (!hardlink_opt) {
557  argv.push_back(std::string("--hard-links"));
558  hardlink_opt = true;
559  }
560 
561  relative_path = mk_relative_path(ypath,path);
562 
563  opt="--link-dest=";
564  opt += relative_path;
565  argv.push_back(opt);
566 
567  first_hardlink = true;
568  linkdest_count++; // At most 20 link-dest options can be used w/ rsync
569  }
570  else {
571  std::cout
572  << "- No such path: "
573  << ypath
574  << std::endl;
575  }
576  }
577  }
578  }
579 
580  for (
581  ei = m_job->excludes.begin();
582  ei != m_job->excludes.end();
583  ++ei
584  )
585  {
586  opt = "--exclude-from=";
587  opt += *ei;
588  argv.push_back(opt);
589  }
590 
591  for (
592  ii = m_job->includes.begin();
593  ii != m_job->includes.end();
594  ++ii
595  )
596  {
597  opt = "--include-from=";
598  opt += *ei;
599  argv.push_back(opt);
600  }
601 
602  argv.push_back(m_job->generate_source_path(*pi));
603 
604  argv.push_back(path);
605 
606  std::cout << "Command Line:" << std::endl;
607  {
608  uint16 c;
609  std::cout << " Binary: " << binary << std::endl;
610  for (c = 0; c < argv.size(); c++) {
611  std::cout << " Argv[" << c << "] = " << argv[c] << std::endl;
612  }
613  }
614 
615  m_error_msg.erase();
616 
617  t.start();
618 
619  m_io_timer.start();
620  m_rsync_timeout_flag = false;
621  exec.exec(binary, argv);
622 
624  exec,
626  files_total,
627  files_xferd,
628  size_total,
629  size_xferd,
630  overflow_detected
631  );
632  t.stop();
633 
634  signal_num = 0;
635  if (exec.child_signaled()) {
636  std::cout
637  << "Rsync caught signal: ["
638  << exec.child_signal_no()
639  << "] "
641  << std::endl;
642  signal_num = exec.child_signal_no();
643  }
644  std::cout
645  << "Rsync exit code: ["
646  << exec.child_exit_code()
647  << "] "
649  << std::endl;
650 
651  exit_code = exec.child_exit_code();
652  if (!m_rsync_timeout_flag && exec.child_exited_normally() && (exit_code == 0)) {
653  done = true;
654  }
655  else if (!m_rsync_timeout_flag && (m_job->rsync_behavior[exit_code] == rsync_behavior::ok)) {
656  std::cout << "Ignoring rsync failure" << std::endl;
657  done = true;
658  }
659  else if (overflow_detected) {
660  std::cout
661  << "Vault overflow detected"
662  << std::endl;
663  break;
664  }
665  else {
666  ++num_retries;
668  }
669  if (m_rsync_timeout_flag) {
670  TRY_nomem(m_error_msg = "Rsync inactivity timeout");
671  }
672 
673  if (m_job->rsync_behavior[exit_code] == rsync_behavior::fail)
674  {
675  std::cout << "Failing, will not attempt to retry" << std::endl;
676  break;
677  }
678  if (m_job->rsync_behavior[exit_code]
680  {
681  std::cout << "Retrying without hardlinks..." << std::endl;
682  hardlink = false;
683  }
684 
685  if ((!done) && (m_job->rsync_retry_delay > 0)) {
686  std::cout << "Retries left: " << (m_job->rsync_retry_count - num_retries + 1) << std::endl;
687  if (num_retries <= m_job->rsync_retry_count) {
688  std::cout << "Waiting " << m_job->rsync_retry_delay
689  << " minutes before retrying... " << std::endl;
690  sleep( (m_job->rsync_retry_delay) * 60);
691  }
692  }
693  }
694  if (!done) {
695  if (num_retries >= m_job->rsync_retry_count) {
696  std::cout << "Retry count exceeded" << std::endl;
697  m_success = false;
698  }
699  else {
700  std::cout << "Giving up on this path" << std::endl;
701  m_success = false;
702  }
703  }
706  t,
707  exit_code,
708  signal_num,
709  m_job->rsync_behavior[exit_code],
711  );
712  }
713 }
714 
715 void job_archiver::mf_process_report(const std::string& a_str)
716 {
717  if (reportio().is_report(a_str)) {
718  m_jpr = reportio().parse(a_str);
720  }
721 }
722 
/** Process I/O from the child

 While there is I/O to be read, read and parse it. When the end of a line is
 reached write that line to the log file. If a_finalize is true, then flush
 the child I/O buffer string.

 */
731 {
732  estring lstr;
733  bool io_flag = false;
734 
735  while (!m_exec.out_eof() && (a_finalize || m_exec.out_ready())) {
736  int r;
737  const int buffer_size = 128;
738  char buffer[buffer_size] = { 0 };
739 
740  r = m_exec.out_read(buffer, buffer_size);
741  if (r > 0) {
742  int c;
743 
744  io_flag = true;
745  for (c = 0; c < r; ++c) {
746  if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
747  lstr = prefix();
748  lstr += m_io_out;
749  lstr += "\n";
751  mf_process_report(lstr);
752  m_io_out.erase();
753  }
754  else {
755  m_io_out += buffer[c];
756  }
757  }
758  }
759  }
760  if (a_finalize && (m_io_out.size() > 0)) {
761  lstr = prefix();
762  lstr += m_io_out;
763  lstr += "\n";
765  mf_process_report(lstr);
766  m_io_out.erase();
767  }
768 
769  while (!m_exec.err_eof() && (a_finalize || m_exec.err_ready())) {
770  int r;
771  const int buffer_size = 128;
772  char buffer[buffer_size] = { 0 };
773 
774  r = m_exec.err_read(buffer, buffer_size);
775  if (r > 0) {
776  int c;
777 
778  io_flag = true;
779  for (c = 0; c < r; ++c) {
780  if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
781  lstr = prefix();
782  lstr += m_io_err;
783  lstr += "\n";
785  mf_process_report(lstr);
786  m_io_err.erase();
787  }
788  else {
789  m_io_err += buffer[c];
790  }
791  }
792  }
793  }
794  if (a_finalize && (m_io_err.size() > 0)) {
795  lstr = prefix();
796  lstr += m_io_err;
797  lstr += "\n";
799  mf_process_report(lstr);
800  m_io_err.erase();
801  }
802  if (!io_flag)
803  sleep(config.io_poll_interval());
804 }
805 
806 /** Trim off all non-digit leading and trailing characters from a string */
807 void job_archiver::mf_trim_string(std::string& a_str)
808 {
809  while ((a_str.size() > 0) && (!isdigit(a_str[0])))
810  a_str.erase(0,1);
811  while ((a_str.size() > 0) && (!isdigit(a_str[a_str.size()-1])))
812  a_str.erase(a_str.size()-1,1);
813 }
814 
/** Parse I/O from rsync

 Search for special output from rsync to tell us something about the number
 and size of files and files transferred.

 */
822  const std::string a_str,
823  uint64& a_files_total,
824  uint64& a_files_xferd,
825  uint64& a_size_total,
826  uint64& a_size_xferd
827  )
828 {
829  estring estr;
830 
831  if (a_str.find("Number of files: ") == 0) {
832  estr = a_str;
833  mf_trim_string(estr);
834  try {
835  a_files_total = estr;
836  }
837  catch(error e) {
838  estring es;
839 
840  es = "Could not parse total number of files processed by rsync";
841  e.push_back(ERROR_INSTANCE(es));
842 
843  // Not sure this is the best way to handle this...
844  std::cerr << e;
845  }
846  catch(...) {
847  error e = err_unknown;
848  estring es;
849 
850  es = "Could not parse total number of files processed by rsync";
851  e.push_back(ERROR_INSTANCE(es));
852 
853  // Not sure this is the best way to handle this...
854  std::cerr << e;
855  }
856  }
857  else if (a_str.find("Number of files transferred: ") == 0) {
858  estr = a_str;
859  mf_trim_string(estr);
860  try {
861  a_files_xferd = estr;
862  }
863  catch(error e) {
864  estring es;
865 
866  es = "Could not parse total number of files transferred by rsync";
867  e.push_back(ERROR_INSTANCE(es));
868 
869  // Not sure this is the best way to handle this...
870  std::cerr << e;
871  }
872  catch(...) {
873  error e = err_unknown;
874  estring es;
875 
876  es = "Could not parse total number of files transferred by rsync";
877  e.push_back(ERROR_INSTANCE(es));
878 
879  // Not sure this is the best way to handle this...
880  std::cerr << e;
881  }
882  }
883  else if (a_str.find("Total file size: ") == 0) {
884  estr = a_str;
885  mf_trim_string(estr);
886  try {
887  a_size_total = estr;
888  }
889  catch(error e) {
890  estring es;
891 
892  es = "Could not parse total size of files processed by rsync";
893  e.push_back(ERROR_INSTANCE(es));
894 
895  // Not sure this is the best way to handle this...
896  std::cerr << e;
897  }
898  catch(...) {
899  error e = err_unknown;
900  estring es;
901 
902  es = "Could not parse total size of files processed by rsync";
903  e.push_back(ERROR_INSTANCE(es));
904 
905  // Not sure this is the best way to handle this...
906  std::cerr << e;
907  }
908  }
909  else if (a_str.find("Total transferred file size: ") == 0) {
910  estr = a_str;
911  mf_trim_string(estr);
912  try {
913  a_size_xferd = estr;
914  }
915  catch(error e) {
916  estring es;
917 
918  es = "Could not parse total size of files transferred by rsync";
919  e.push_back(ERROR_INSTANCE(es));
920 
921  // Not sure this is the best way to handle this...
922  std::cerr << e;
923  }
924  catch(...) {
925  error e = err_unknown;
926  estring es;
927 
928  es = "Could not parse total size of files transferred by rsync";
929  e.push_back(ERROR_INSTANCE(es));
930 
931  // Not sure this is the best way to handle this...
932  std::cerr << e;
933  }
934  }
935 }
936 
937 /** Process I/O from rsync
938 
939  If there is I/O from rsync to be read, read it and then send it through the
940  parser.
941 
942  */
944  execute& a_exec,
945  uint16 a_timeout,
946  uint64& a_files_total,
947  uint64& a_files_xferd,
948  uint64& a_size_total,
949  uint64& a_size_xferd,
950  bool& a_overflow_detected
951  )
952 {
953  size_t ro;
954  size_t re;
955  estring out;
956  estring err;
957  bool io_flag;
958  char buffer[1024] = { 0 };
959  char *ptr;
960 
961  ro = 1;
962  re = 1;
963  while ((ro != 0) || (re != 0) || a_exec.child_running()) {
964  io_flag = false;
965  ro = 0;
966  re = 0;
967 
968  if (!a_overflow_detected) {
969  a_overflow_detected = vaulter.overflow();
970  }
971 
972  m_error_msg.erase();
973 
974  if (a_exec.out_ready()) {
975  ro = read(a_exec.out_fd(), buffer, 1024);
976  if (ro > 0) {
977  io_flag = true;
978  for (ptr = buffer; ptr != buffer+ro; ++ptr) {
979  if ((*ptr != '\r') && (*ptr != '\n')) {
980  out += *ptr;
981  }
982  else {
983  reportio().write_rsync_out(out);
984  out.erase();
985  }
986  }
987  }
988  }
989 
990  if (a_exec.err_ready()) {
991  re = read(a_exec.err_fd(), buffer, 1024);
992  if (re > 0) {
993  io_flag = true;
994  for (ptr = buffer; ptr != buffer+re; ++ptr) {
995  if ((*ptr != '\r') && (*ptr != '\n')) {
996  err += *ptr;
997  }
998  else {
999  reportio().write_rsync_err(err);
1000  err.erase();
1001  }
1002  }
1003  }
1004  }
1005 
1006  m_io_timer.stop();
1007 // std::cerr << "[DEBUG]: --------------------------" << std::endl;
1008 // std::cerr << "[DEBUG]: m_io_timer.start_value() = " << m_io_timer.start_value() << std::endl;
1009 // std::cerr << "[DEBUG]: m_io_timer.stop_value() = " << m_io_timer.stop_value() << std::endl;
1010 // std::cerr << "[DEBUG]: time(0) = " << time(0) << std::endl;
1011 // std::cerr << "[DEBUG]: m_io_timer.duration() = " << m_io_timer.duration_secs() << std::endl;
1012 // std::cerr << "[DEBUG]: difference = " << (time(0) - m_io_timer.start_value()) << std::endl;
1013 // std::cerr << "[DEBUG]: timeout = " << a_timeout << std::endl;
1014 // std::cerr << "[DEBUG]: io_flag = " << io_flag << std::endl;
1015 // std::cerr << "[DEBUG]: m_rsync_timeout_flag = " << m_rsync_timeout_flag << std::endl;
1016  if (io_flag) {
1017  m_io_timer.stop();
1018  m_io_timer.start();
1019  m_rsync_timeout_flag = false;
1020  }
1021  if (!m_rsync_timeout_flag && (m_io_timer.duration_secs() > a_timeout)) {
1022  std::cerr << "*** Rsync program inactivity timeout" << std::endl;
1023  a_exec.kill_child();
1024  m_rsync_timeout_flag = true;
1025  }
1026 
1027  if (!io_flag)
1028  sleep(config.io_poll_interval());
1029 
1030  }
1031  if (out.size() > 0) {
1032  std::cerr << out << std::endl;
1034  out,
1035  a_files_total,
1036  a_files_xferd,
1037  a_size_total,
1038  a_size_xferd
1039  );
1040  out.erase();
1041  }
1042  if (err.size() > 0) {
1043  std::cerr << err << std::endl;
1045  out,
1046  a_files_total,
1047  a_files_xferd,
1048  a_size_total,
1049  a_size_xferd
1050  );
1051  err.erase();
1052  }
1053 }
1054 
1055 //-----------------------------------------------------------------------------
1056 
1057 /** C'tor */
1059 {
1060  if (this != &archiver)
1061  throw(INTERNAL_ERROR(0,"Attempt to alocate multiple archive managers"));
1062 
1063  clear();
1064 }
1065 
1066 /** Clear the archive manager and clear the job list */
1068 {
1069  m_jobs.clear();
1070  m_initialized = false;
1071 }
1072 
1073 /** Initialize the archive manager
1074 
1075  Log the archive timestamp, select and prepare a vault.
1076 
1077  */
1079 {
1080  timer t;
1081  estring lstr;
1082 
1083  lstr = "Archive Manager - Beginning initialization\n";
1084  logger.write(lstr);
1085  t.start();
1086 
1087  lstr = "Timestamp: ";
1088  lstr += config.timestamp().str();
1089  lstr += "\n";
1090  logger.write(lstr);
1091 
1092  // Select a vault?
1093  vaulter.select();
1094  lstr = "Vault selected: ";
1095  lstr += vaulter.vault();
1096  lstr += "\n";
1097  logger.write(lstr);
1098 
1100  vault_stats_report(estring("Initial Stats:"),filesystem(vaulter.vault()))
1101  );
1102 
1103  // Prepare the vault?
1104  vaulter.prepare();
1105 
1106  t.stop();
1107  lstr = "Archive Manager - Finished initialization";
1108  lstr += ", duration: ";
1109  lstr += t.duration();
1110  lstr += "\n";
1111  logger.write(lstr);
1112 
1113  m_initialized = true;
1114 }
1115 
1116 /** Return the initialized status of the archive manager */
1117 const bool archive_manager::initialized(void) const
1118 {
1119  return(m_initialized);
1120 }
1121 
1122 /** Give a status report
1123 
1124  After so many minutes of inactivity write a report to the log file of our
1125  current status of affairs.
1126 
1127  */
1129 {
1130  static timer t;
1131  const timer::value_type timeout = 30;
1132  estring lstr;
1133  std::vector<job_archiver*>::const_iterator ji;
1134  uint64 jobs_pending = 0;
1135  uint64 jobs_processing = 0;
1136  uint64 jobs_completed =0;
1137  uint64 jobs_remaining = 0;
1138 
1139  if (!t.is_started())
1140  t.start();
1141 
1142  t.stop();
1143  if (t.duration_mins() < timeout)
1144  return;
1145 
1146  lstr = "\n";
1147  lstr += "STATUS REPORT:\n";
1148  lstr += "================================================================\n";
1149  logger.write(lstr);
1150  for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji) {
1151  lstr = "[";
1152  switch ((*ji)->status()) {
1154  lstr += "Pending ";
1155  ++jobs_pending;
1156  break;
1158  lstr += "Processing ";
1159  ++jobs_processing;
1160  break;
1162  lstr += "Reschedule ";
1163  ++jobs_pending;
1164  break;
1166  lstr += "Fatal Error";
1167  ++jobs_completed;
1168  break;
1170  lstr += "Error ";
1171  ++jobs_completed;
1172  break;
1174  lstr += "Completed ";
1175  ++jobs_completed;
1176  break;
1178  lstr += "Done ";
1179  ++jobs_completed;
1180  break;
1181  default:
1182  lstr += "<unknown> ";
1183  break;
1184  }
1185  lstr += "]: ";
1186  lstr += (*ji)->id();
1187  lstr += "\n";
1188  logger.write(lstr);
1189  }
1190 
1191  lstr = "---------------------------------------------------------------\n";
1192  lstr += " Jobs Pending: ";
1193  lstr += estring(jobs_pending);
1194  lstr += "\n";
1195 
1196  lstr += " Jobs Processing: ";
1197  lstr += estring(jobs_processing);
1198  lstr += "\n";
1199 
1200  lstr += " Jobs Completed: ";
1201  lstr += estring(jobs_completed);
1202  lstr += "\n";
1203 
1204  lstr += " Total: ";
1205  lstr += estring(jobs_pending+jobs_processing+jobs_completed+jobs_remaining);
1206  lstr += "\n";
1207 
1208  lstr += "Overflow Detected: ";
1209  if (vaulter.overflow()) {
1210  lstr += "TRUE";
1211  }
1212  else {
1213  lstr += "false";
1214  }
1215  lstr += "\n";
1216 
1217  logger.write(lstr);
1218  logger.write("\n");
1219  t.start();
1220 }
1221 
1222 /** Archive jobs
1223 
1224  Create an archive directory. Generate a to-do list of job archiver objects.
1225  Process the job archiver objects:
1226  - While there are fewer than rsync-parallel job archivers processing, start
1227  the first available job archiver.
1228  - Check the status of each job and process I/O from jobs underway.
1229  - Remove jobs that failed to start.
1230  - Possibly reschedule failed jobs.
1231  - Remove completed jobs from active processing.
1232  - Call mf_log_status().
1233 
      The job list is polled in a loop that sleeps one second per pass and ends
      once num_completed reaches m_jobs.size().

      Throws INTERNAL_ERROR if the archive manager is not initialized. Errors
      raised while setting up the archive directory are logged and re-thrown.
      Failures wrapped in TRY()/TRY_nomem() propagate to the caller.

      NOTE(review): this listing is an extract in which several statements are
      absent (listing lines 1235, 1271, 1283, 1294, 1495 and 1499). Those gaps
      are marked below; consult the original archiver.cc before changing code
      near them.
1234  */
1236 {
1237  timer t; // times the whole archiving run for the final log line
1238  estring lstr; // scratch buffer for log messages
1239  configuration_manager::jobs_type::const_iterator cji; // walks config.jobs()
1240  int num_processing = 0; // jobs this loop currently counts as in flight
1241  std::vector<job_archiver*>::iterator ji; // walks m_jobs
1242  uint64 num_completed = 0; // jobs retired for good; terminates the main loop
1243  bool overflow_detected = false; // stays true until prepare() ran and overflow() is re-checked
1244  estring debug_estr; // only used by the commented-out debug logging below
1245 
1246  if (!initialized())
1247  throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
1248 
1249  lstr = "Archive Manager - Begin archiving\n";
1250  logger.write(lstr);
1251  t.start();
1252 
1253  // Create archive directory
      // Three cases are distinguished: the finished archive path already
      // exists, the working (".incomplete") path already exists, or neither
      // exists. In the first two cases the directory must be writable,
      // otherwise an EACCES error is thrown.
1254  try {
1255  if (exists(archive_path())) {
1256  lstr = "Found existing archive directory: ";
1257  lstr += archive_path();
1258  lstr += "\n";
1259  logger.write(lstr);
1260 
1261  if (!writable(archive_path())) {
1262  std::string es;
1263 
1264  TRY_nomem(es = "Cannot write to archive directory: \"");
1265  TRY_nomem(es += archive_path());
1266  TRY_nomem(es += "\"");
1267 
1268  throw(ERROR(EACCES,es));
1269  }
1270 
      // NOTE(review): listing line 1271 is missing from this extract.
      // Presumably the finished archive is moved back to the working path
      // here -- confirm against the original file.
1272  }
1273  else if (exists(working_archive_path())) {
1274  lstr = "Found existing archive directory: ";
1275  lstr += working_archive_path();
1276  lstr += "\n";
1277  logger.write(lstr);
1278 
1279  if (!writable(working_archive_path())) {
1280  std::string es;
1281 
1282  TRY_nomem(es = "Cannot write to working archive directory: \"");
      // NOTE(review): listing line 1283 is missing from this extract.
      // Presumably it appends working_archive_path() to es -- confirm.
1284  TRY_nomem(es += "\"");
1285 
1286  throw(ERROR(EACCES,es));
1287  }
1288  }
1289  else {
1290  lstr = "Creating archive directory: ";
1291  lstr += working_archive_path();
1292  lstr += "\n";
1293  logger.write(lstr);
      // NOTE(review): listing line 1294 is missing from this extract.
      // Presumably the working archive directory is created here -- confirm.
1295  }
1296  }
1297  catch(error e) { // log the first error entry, then re-throw to the caller
1298  logger.write("An error has occured: ");
1299  logger.write(e[0].what());
1300  logger.write("\n");
1301  throw(e);
1302  }
1303  catch(...) { // anything that is not an `error` is reported and re-thrown as err_unknown
1304  error e = err_unknown;
1305 
1306  logger.write("An error has occured: ");
1307  logger.write(e[0].what());
1308  logger.write("\n");
1309  throw(e);
1310  }
1311 
1312  // Create a todo list
      // One job_archiver is allocated per configured job and handed a pointer
      // to that job's entry in config.jobs(). The raw pointers are stored in
      // m_jobs and are not released in this function.
1313  logger.write("Creating to-do list\n");
1314  for (
1315  cji = config.jobs().begin();
1316  cji != config.jobs().end();
1317  ++cji
1318  )
1319  {
1320  job_archiver* ptr;
1321 
1322  ptr = new job_archiver(&(*cji));
      // NOTE(review): an ordinary (throwing) new-expression signals failure
      // with std::bad_alloc instead of returning null, so this check looks
      // unreachable unless operator new is replaced -- confirm.
1323  if (ptr == 0)
1324  throw(err_nomem);
1325  TRY_nomem(m_jobs.push_back(ptr));
1326  }
1327 
1328  // Backup clients
      // Poll every job once per pass until all of them have been retired.
1329  logger.write("Processing jobs...\n");
1330  while (m_jobs.size() > num_completed) {
1331 
1332  /*
1333  logger.write("[DEBUG]: ---[ TOP OF LOOP ]---\n");
1334 
1335  debug_estr = "[DEBUG]: overflow_detected = ";
1336  debug_estr += estring(overflow_detected);
1337  debug_estr += "\n";
1338  logger.write(debug_estr);
1339 
1340  debug_estr = "[DEBUG]: num_processing = ";
1341  debug_estr += estring(num_processing);
1342  debug_estr += "\n";
1343  logger.write(debug_estr);
1344  */
1345 
      // Only query the vault while no overflow is pending; once set, the flag
      // is cleared solely by the prepare()/overflow() sequence below.
1346  if (!overflow_detected) {
1347  overflow_detected = vaulter.overflow(true);
1348  /*
1349  if (overflow_detected) {
1350  logger.write("[DEBUG]: Variable Change :: ");
1351  logger.write("overflow_detected = true");
1352  logger.write("\n");
1353  }
1354  */
1355  }
1356 
1357  // If the vault has exceeded its highwater mark, wait for the
1358  // currently-processing jobs to terminate, and then attempt to prepare the
1359  // vault.
1360  if (overflow_detected && (num_processing == 0)) {
1361  TRY(vaulter.prepare(true),"Cannot complete archive");
1362  overflow_detected = vaulter.overflow();
1363  /*
1364  if (!overflow_detected) {
1365  logger.write("[DEBUG]: Variable Change :: ");
1366  logger.write("overflow_detected = false");
1367  logger.write("\n");
1368  }
1369  */
1370  }
1371 
1372  // For each job in the list...
1373  for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji)
1374  {
1375  // While we're running less than rsync-parallel jobs, start new ones
      // (new jobs are held back entirely while an overflow is pending)
1376  if (
1377  !overflow_detected
1378  && (num_processing < config.rsync_parallel())
1379  && ((*ji)->status() == job_archiver::status_pending)
1380  )
1381  {
1382  try {
1383  (*ji)->start();
1384  }
1385  catch(error e) {
      // 12 is compared literally; the TODO in the process() handler
      // below suggests it stands for ENOMEM.
1386  if (e.num() == 12) {
1387  lstr = "Error starting job: Out of memory, will retry later\n";
1388  (*ji)->clear();
1389  }
1390  else {
1391  (*ji)->end();
1392  lstr = "Error starting job, aborting\n";
1393  // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1394  num_processing--; // cancels the unconditional increment after the try/catch
1395  reporter.jobs().add_report((*ji)->report());
1396  }
1397  logger.write(lstr);
1398  }
1399  catch(...) {
1400  (*ji)->end();
1401  lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE STARTING JOB ";
1402  lstr += "-- JOB TERMINATED\n";
1403  logger.write(lstr);
1404  // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1405  num_processing--; // cancels the unconditional increment after the try/catch
1406  reporter.jobs().add_report((*ji)->report());
1407  }
1408  // logger.write("[DEBUG]: Variable Change :: num_processing++\n");
      // Executed whether or not start() threw. The two abort paths above
      // pre-decrement, so their net change is zero; the out-of-memory
      // retry path does not.
      // NOTE(review): confirm that the retry path's net +1 is intended.
1409  num_processing++;
1410  }
1411 
1412  // Process jobs
1413  if ((*ji)->status() == job_archiver::status_processing) {
1414  try {
1415  (*ji)->process();
1416  }
1417  catch(error e) {
1418  // TODO: Change 12 to ENOMEM?
      // NOTE(review): both messages in this handler say "starting"
      // although it guards process(), not start().
1419  if (e.num() == 12) {
1420  lstr = "Error starting job: Out of memory, will retry later\n";
1421  (*ji)->clear();
1422  }
1423  else {
1424  (*ji)->end();
1425  lstr = "Error starting job, aborting\n";
1426  // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1427  num_processing--;
1428  reporter.jobs().add_report((*ji)->report());
1429  }
1430  logger.write(lstr);
1431  }
1432  catch(...) {
1433  (*ji)->end();
1434  lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE PROCESSING JOB ";
1435  lstr += "-- JOB TERMINATED\n";
1436  logger.write(lstr);
1437  // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1438  num_processing--;
1439  reporter.jobs().add_report((*ji)->report());
1440  }
      // NOTE(review): the handlers above already call end(), decrement
      // num_processing and add a report. If end() leaves the job in an
      // error/completed status, the "Remove completed jobs" step below
      // would do all three again in the same pass. The status after
      // end() is not visible here -- confirm.
1441  }
1442 
1443  // Remove jobs that could not start from active duty
1444  if ((*ji)->status() == job_archiver::status_reschedule) {
1445  (*ji)->clear();
1446  // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1447  num_processing--;
1448  }
1449 
1450  // If a job exited with an error, and the vault is full, then reschedule
1451  // the job for later
      // This check must precede the completed/error step below; otherwise an
      // errored job would be retired there instead of being cleared here.
1452  if (
1453  ((*ji)->status() == job_archiver::status_error)
1454  &&
1455  overflow_detected
1456  )
1457  {
1458  lstr = "Vault overflow detected, will retry job later\n";
1459  logger.write(lstr);
1460  (*ji)->clear();
1461  num_processing--;
1462  }
1463 
1464  // Remove completed jobs from the processing list
      // Completed, fatally failed and errored jobs are ended, counted as
      // completed and their report is handed to the report manager.
1465  if (
1466  ((*ji)->status() == job_archiver::status_completed)
1467  || ((*ji)->status() == job_archiver::status_fatal_error)
1468  || ((*ji)->status() == job_archiver::status_error)
1469  ) {
1470  (*ji)->end();
1471  // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
1472  num_processing--;
1473  num_completed++;
1474 
1475  // logger.write("Adding job report to report manager\n");
1476  reporter.jobs().add_report((*ji)->report());
1477  }
1478  }
1479 
1480  mf_log_status();
1481  sleep(1); // fixed one-second pause between polling passes
1482 
1483  // logger.write("[DEBUG]: ---[ BOTTOM OF LOOP ]---\n");
1484  }
1485 
1486  t.stop();
1487  lstr = "Archive Manager - Finished archiving, duration: ";
1488  lstr += t.duration();
1489  lstr += "\n";
1490  logger.write(lstr);
1491 
1492  lstr = "Archive Manager - Finalizing archive path\n";
1493  logger.write(lstr);
      // NOTE(review): listing line 1495 (the expression wrapped by TRY) is
      // missing from this extract. Presumably it renames the working archive
      // directory to its final name -- confirm against the original file.
1494  TRY(
1496  "Cannot finalize archive"
1497  );
1498 
      // NOTE(review): listing line 1499 (the start of this statement) is
      // missing from this extract. Presumably the final vault statistics are
      // handed to the report manager -- confirm against the original file.
1500  vault_stats_report(estring("Final Stats:"),filesystem(vaulter.vault()))
1501  );
1502 }
1503 
1504 /** Return an absolute path to the finished archive directory */
1505 const std::string archive_manager::archive_path(void) const
1506 {
1507  estring path;
1508 
1509  if (!initialized())
1510  throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
1511 
1512  path = vaulter.vault();
1513  path += "/";
1514  path += config.timestamp().str();
1515 
1516  return(path);
1517 }
1518 
1519 /** Return the absolute path to the unfinished working archive directory */
1520 const std::string archive_manager::working_archive_path(void) const
1521 {
1522  estring path;
1523 
1524  if (!initialized())
1525  throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
1526 
1527  path = archive_path();
1528  path += ".incomplete";
1529 
1530  return(path);
1531 }
1532 
1533 //-----------------------------------------------------------------------------
1534 
1535 /** The global archive manager */
1537 
uint16 rsync_multi_hardlink_max
Definition: rconfig.h:178
void reroute_stdio(void)
Called by the child to reroute the child's stdin, stdout, and stderr to the parent.
Definition: exec.cc:138
std::string reform_path(const std::string &a_path)
Reformat a path to remove double slashes.
Definition: fs.cc:205
includes_type includes
Definition: rconfig.h:169
rsync_connection_type rsync_connection
Definition: rconfig.h:175
std::string m_unknown_exit
Definition: archiver.h:27
Retrieve information about a filesystem.
Definition: fs.h:316
void mk_dir(const std::string &a_path)
Create a directory.
Definition: fs.cc:599
log_manager logger
The global log manager.
Definition: logger.cc:138
uint16 rsync_retry_delay
Definition: rconfig.h:186
job_path_report parse(const std::string &a_str)
Parse a received report from a child process and return a job_path_report.
Definition: reporter.cc:571
Archive the paths associated with a single job.
Definition: archiver.h:36
const duration_type duration_secs(void) const
Return the duration in seconds.
Definition: timer.cc:411
int child_signal_no(void)
If the child was signaled, return the signal number.
Definition: exec.cc:284
void clear(void)
Clear all values.
Definition: reporter.cc:683
const std::string & rsync_local_path(void) const
Return the rsync-local-path.
Definition: rconfig.cc:1550
void clear(void)
Erase the string value.
Definition: estring.cc:414
const std::string generate_job_id(void) const
Generate a unique ID string for this job.
Definition: rconfig.cc:800
pid_t child_pid(void)
Returns the child's PID.
Definition: exec.cc:170
timer m_timer
Definition: archiver.h:62
An extended string class.
Definition: estring.h:52
void add_report(const single_job_report &a_class)
Add a job report to the list.
Definition: reporter.cc:748
Submit or parse a job path report.
Definition: reporter.h:138
bool err_eof(void)
Check for err EOF.
Definition: exec.cc:533
#define err_nomem
Definition: error.h:116
void set_error_logging(bool a_b)
Use error-logging-level instead of logging-level.
Definition: logger.cc:130
void clear(void)
Clear the job archiver and return it to its initial state.
Definition: archiver.cc:166
bool out_eof(void)
Check for output EOF.
Definition: exec.cc:527
void fork(void)
Fork a child process.
Definition: exec.cc:72
single_job_report report(void) const
Return the job report for this job.
Definition: archiver.cc:388
Hold configuration data for a single job.
Definition: rconfig.h:136
const std::string prefix(void)
Generate a job prefix string.
Definition: archiver.cc:137
void clear(void)
Clear the archive manager and clear the job list.
Definition: archiver.cc:1067
void mf_log_status(void)
Give a status report.
Definition: archiver.cc:1128
void kill_child(void)
Send a KILL signal to the child.
Definition: exec.cc:188
bool m_success
Definition: archiver.h:66
const bool is_started(void) const
Return whether or not the timer has been started.
Definition: timer.cc:399
A single job report.
Definition: reporter.h:164
void write_rsync_err(const std::string &a_str)
Write a report line for output from rsync to parent on child's std::cerr.
Definition: reporter.cc:525
int err_read(char *buf, const int len)
Allow parent to read from err_fd()
Definition: exec.cc:607
void push_back(const error_instance &a_e)
Definition: error.cc:215
void mk_dirhier(const std::string a_path)
Recursively create a directory hierarchy.
Definition: fs.cc:683
void exit(int code=0)
Called by the child to exit with a particular code.
Definition: exec.cc:130
const std::string generate_source_path(const std::string &a_path) const
Generate the source path to be passed to rsync on the command line.
Definition: rconfig.cc:728
excludes_type excludes
Definition: rconfig.h:168
void num(int a_i)
Definition: error.cc:205
class rsync_behavior rsync_behavior
Definition: rconfig.h:174
const uint16 & io_poll_interval(void) const
Return the number of seconds to sleep between polling for I/O.
Definition: rconfig.cc:1586
void write(const std::string &a_str, const uint16 a_indention=0, const configuration_manager::logging_type a_logging_level=configuration_manager::logging_manager, const pid_t a_pid=pid())
Write a string to the log file.
Definition: logger.cc:96
bool writable(const std::string &a_path)
Return true if the file or directory exists and is writable.
Definition: fs.cc:427
const class timestamp & timestamp(void) const
Return the timestamp of this instance of rvm.
Definition: rconfig.cc:1505
const uint16 & rsync_parallel(void) const
Return the rsync-parallel.
Definition: rconfig.cc:1577
bool err_ready(void)
Check I/O for output.
Definition: exec.cc:508
bool out_ready(void)
Check I/O for output.
Definition: exec.cc:488
bool m_initialized
Definition: archiver.h:113
class rstat rsync_estat_str
Definition: archiver.cc:114
int out_fd(void)
Return a file descriptor for I/O between parent and child.
Definition: exec.cc:327
void process(void)
Parent processor for a job.
Definition: archiver.cc:327
archive_manager archiver
The global archive manager.
Definition: archiver.cc:1536
void id(const std::string &a_str)
Set a descriptive ID for this job report.
Definition: reporter.cc:702
void mf_process_child_io(bool a_finalize)
Process I/O from the child.
Definition: archiver.cc:730
uint16 rsync_retry_count
Definition: rconfig.h:185
bool child_running(void)
Returns true if the child is running.
Definition: exec.cc:220
#define err_unknown
Definition: error.h:114
pid_t m_child_pid
Definition: archiver.h:69
void clear(void)
Reset the execute class to default values, kill the child process if one is running.
Definition: exec.cc:46
single_job_report m_jr
Definition: archiver.h:71
const job * m_job
Definition: archiver.h:60
std::map< int, std::string > m_exit_str
Definition: archiver.h:25
void start(void)
Begin processing.
Definition: archiver.cc:217
std::string rsync_remote_user
Definition: rconfig.h:181
Vault stats report.
Definition: reporter.h:13
report_manager reporter
The global report manager.
Definition: reporter.cc:1124
jobs_report & jobs(void)
Return the jobs reporter object.
Definition: reporter.cc:975
const std::string & exit(const int a_int) const
Get a verbose string for an exit code.
Definition: archiver.cc:97
bool relative_path(const std::string &a_path)
Return true if the string looks like a relative path.
Definition: fs.cc:195
void exec(const std::string command)
Execute a command, rerouting stdin, stdout, and stderr to parent.
Definition: exec.cc:390
bool exists(const std::string &a_path)
Return true if the file or directory exists.
Definition: fs.cc:385
void end(void)
End any processes handling this job.
Definition: archiver.cc:184
void mf_trim_string(std::string &a_str)
Trim off all non-digit leading and trailing characters from a string.
Definition: archiver.cc:807
int out_read(char *buf, const int len)
Allow parent to read out_fd()
Definition: exec.cc:573
void select(void)
Select a vault.
Definition: vaulter.cc:59
#define TRY_nomem(code)
Definition: error.h:144
estring m_io_out
Definition: archiver.h:67
bool child_signaled(void)
Returns true if the child was signaled.
Definition: exec.cc:252
std::vector< job_archiver * > m_jobs
Definition: archiver.h:112
uint16 rsync_timeout
Definition: rconfig.h:187
const std::string generate_archive_path(const std::string &a_path) const
Generate the archive-path subdirectory for this job.
Definition: rconfig.cc:641
void start(void)
Start (or restart) the timer.
Definition: timer.cc:137
void mf_do_chores(void)
Child processor for a job.
Definition: archiver.cc:406
void mf_process_rsync_io(execute &a_exec, uint16 a_timeout, uint64 &a_files_total, uint64 &a_files_xferd, uint64 &a_size_total, uint64 &a_size_xferd, bool &a_overflow_detected)
Process I/O from rsync.
Definition: archiver.cc:943
An error class.
Definition: error.h:72
int child_exit_code(void)
Return the child's exit code.
Definition: exec.cc:273
std::map< int, std::string > m_signal_str
Definition: archiver.h:26
bool rsync_hardlink
Definition: rconfig.h:176
archiving_status m_status
Definition: archiver.h:61
const type & path(const std::string a_path, const std::string a_filter="*")
Return a vector of strings of a list of files in a subdirectory.
Definition: fs.cc:1361
const std::string vault(void) const
Return the path to the selected vault.
Definition: vaulter.cc:291
void add_report(const job_path_report &a_class)
Add a path report for this job.
Definition: reporter.cc:690
bool child_exited_normally(void)
Returns true if the child has exited normally.
Definition: exec.cc:242
void clear(void)
Clear all values.
Definition: reporter.cc:372
std::string hostname
Definition: rconfig.h:171
#define TRY(code, es)
Definition: error.h:126
void prepare(bool a_assume_overflow=false)
Prepare the selected vault.
Definition: vaulter.cc:686
job_path_report m_jpr
Definition: archiver.h:70
const std::string & signal(const int a_int) const
Get a verbose string for a signal number.
Definition: archiver.cc:106
#define INTERNAL_ERROR(e, s)
Definition: error.h:123
std::string rsync_remote_path
Definition: rconfig.h:182
void mf_process_report(const std::string &a_str)
Definition: archiver.cc:715
estring m_io_err
Definition: archiver.h:68
configuration_manager config
The global configuration manager instance.
Definition: rconfig.cc:3364
time_t value_type
Definition: timer.h:32
std::string m_error_msg
Definition: archiver.h:72
Map exit codes and signal numbers to verbose strings.
Definition: archiver.h:17
bool is_timestamp(const std::string &a_s)
Return true if the string is a valid timestamp.
Definition: tstamp.cc:502
Fork a child process or execute an external program.
Definition: exec.h:21
void init(void)
Initialize the archive manager.
Definition: archiver.cc:1078
vault_report & vault(void)
Return the vault reporter object.
Definition: reporter.cc:969
archive_manager()
C'tor.
Definition: archiver.cc:1058
std::string mk_relative_path(const std::string a_path_to, const std::string a_path_from)
Make the path a_path_to relative from a_path_from, where a_path_to and a_path_from are directory name...
Definition: fs.cc:314
job_archiver(const job *a_job)
C'tor.
Definition: archiver.cc:124
void write_report(const std::string a_source, const timer &a_timer, const int a_exit_code, const int a_signal_num, const rsync_behavior::value_type &a_behavior, const std::string &a_error_msg)
Generate and submit a report to the parent process on child's std::cout.
Definition: reporter.cc:534
bool m_rsync_timeout_flag
Definition: archiver.h:64
const archiving_status status(void)
Return the processing status of this job archiver.
Definition: archiver.cc:203
pid_t my_pid(void)
Returns the PID.
Definition: exec.cc:120
const std::string duration(void) const
Generate a duration string.
Definition: timer.cc:385
int err_fd(void)
Return a file descriptor for I/O between parent and child.
Definition: exec.cc:347
void add_report(const vault_stats_report &a_class)
Add a vault report to the list.
Definition: reporter.cc:176
#define ERROR_INSTANCE(s)
Definition: error.h:67
Create (or update an existing) archive in the selected vault.
Definition: archiver.h:98
const std::vector< std::string > generate_ssh_options_vector(void) const
Generate ssh command line options.
Definition: rconfig.cc:944
const jobs_type & jobs(void) const
Return a list of jobs.
Definition: rconfig.cc:1667
void mf_parse_rsync_io(std::string a_str, uint64 &a_files_total, uint64 &a_files_xferd, uint64 &a_size_total, uint64 &a_size_xferd)
Parse I/O from rsync.
Definition: archiver.cc:821
execute m_exec
Definition: archiver.h:65
const std::string str(void) const
Generate a string.
Definition: tstamp.cc:387
#define ERROR(e, s)
Definition: error.h:120
const std::vector< std::string > generate_rsync_options_vector(void) const
Generate rsync command line options.
Definition: rconfig.cc:872
const duration_type duration_mins(void) const
Return the duration in minutes.
Definition: timer.cc:422
const bool overflow(bool a_report=false)
Test to see if a vault has exceeded its overflow threshold.
Definition: vaulter.cc:378
void archive(void)
Archive jobs.
Definition: archiver.cc:1235
void rename_file(const std::string a_from, const std::string a_to)
Rename a file or directory.
Definition: fs.cc:709
const std::string working_archive_path(void) const
Return the absolute path to the unfinished working archive directory.
Definition: archiver.cc:1520
timer m_io_timer
Definition: archiver.h:63
void stop(void)
Stop the timer.
Definition: timer.cc:143
Retrieve a list of files in a subdirectory that match a given wildcard filename.
Definition: fs.h:273
rstat()
C'tor.
Definition: archiver.cc:26
const std::string str(const std::string a_prefix="") const
Definition: error.cc:304
const bool initialized(void) const
Return the initialized status of the archive manager.
Definition: archiver.cc:1117
bool rsync_multi_hardlink
Definition: rconfig.h:177
std::string m_unknown_signal
Definition: archiver.h:28
void write_rsync_out(const std::string &a_str)
Write a report line for output from rsync to parent on child's std::cout.
Definition: reporter.cc:516
bool is_child(void)
Returns true if called by the child.
Definition: exec.cc:101
const std::string & ssh_local_path(void) const
Return the ssh-local-path.
Definition: rconfig.cc:1568
const std::string id(void)
Generate a job id string.
Definition: archiver.cc:149
paths_type paths
Definition: rconfig.h:173
Used as a stopwatch.
Definition: timer.h:29
const std::string archive_path(void) const
Return an absolute path to the finished archive directory.
Definition: archiver.cc:1505
vault_manager vaulter
The global vault manager.
Definition: vaulter.cc:772