#include "config.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <ctype.h>
#include <signal.h>

#include <iostream>
#include <string>
#include <vector>

#include "asserts.h"
#include "error.h"
#include "estring.h"
#include "fs.h"
#include "rconfig.h"
#include "timer.h"
#include "logger.h"

#include "archiver.h"
00022
00023
00024
00025
00026 rstat::rstat()
00027 {
00028 TRY_nomem(m_exit_str[0] = "Success");
00029 TRY_nomem(m_exit_str[1] = "Syntax or usage error");
00030 TRY_nomem(m_exit_str[2] = "Protocol incompatability error");
00031 TRY_nomem(m_exit_str[3] = "Errors selecting I/O files or directories");
00032 TRY_nomem(m_exit_str[4] = "Requested action not supported");
00033 TRY_nomem(m_exit_str[10] = "Error in socket I/O");
00034 TRY_nomem(m_exit_str[11] = "Error in file I/O");
00035 TRY_nomem(m_exit_str[12] = "Error in rsync protocol data stream");
00036 TRY_nomem(m_exit_str[13] = "Errors with program diagnostics");
00037 TRY_nomem(m_exit_str[14] = "Error in IPC code");
00038 TRY_nomem(m_exit_str[20] = "Received SIGUSR1 or SIGINT");
00039 TRY_nomem(m_exit_str[21] = "Some error returned by waitpid()");
00040 TRY_nomem(m_exit_str[22] = "Error allocating core memory buffers");
00041 TRY_nomem(m_exit_str[23] = "Partial transfer");
00042 TRY_nomem(m_exit_str[30] = "Timeout in data send/receive");
00043 TRY_nomem(m_exit_str[124] = "The command executed by SSH exited with status 255");
00044 TRY_nomem(m_exit_str[125] = "The command executed by SSH was killed by a signal");
00045 TRY_nomem(m_exit_str[126] = "The command given to SSH cannot be run");
00046 TRY_nomem(m_exit_str[127] = "The command given to SSH cannot be found");
00047
00048 TRY_nomem(m_signal_str[1] = "[HUP]: Hangup");
00049 TRY_nomem(m_signal_str[2] = "[INT]: Interrupt ");
00050 TRY_nomem(m_signal_str[3] = "[QUIT]: Quit");
00051 TRY_nomem(m_signal_str[4] = "[ILL]: Illegal instruction");
00052 TRY_nomem(m_signal_str[5] = "[TRAP]: Trace trap");
00053 TRY_nomem(m_signal_str[6] = "[IOT]: IOT instruction or hardware fault");
00054 TRY_nomem(m_signal_str[7] = "[ABRT]: Abnormal termination");
00055 TRY_nomem(m_signal_str[8] = "[EMT]: EMT instruction or hardware fault");
00056 TRY_nomem(m_signal_str[9] = "[FPE]: Floating point exception");
00057 TRY_nomem(m_signal_str[10] = "[KILL]: Killed");
00058 TRY_nomem(m_signal_str[11] = "[BUS]: Bus error");
00059 TRY_nomem(m_signal_str[12] = "[SEGV]: Segmentation fault");
00060 TRY_nomem(m_signal_str[13] = "[SYS]: Invalid system call or invalid argument to system call");
00061 TRY_nomem(m_signal_str[14] = "[PIPE]: Write to pipe with no readers");
00062 TRY_nomem(m_signal_str[15] = "[ALRM]: Alarm");
00063 TRY_nomem(m_signal_str[16] = "[TERM]: Software termination");
00064 TRY_nomem(m_signal_str[17] = "[USR1]: User-defined signal 1");
00065 TRY_nomem(m_signal_str[18] = "[USR2]: User-defined signal 2");
00066 TRY_nomem(m_signal_str[19] = "[CLD]: Child status change");
00067 TRY_nomem(m_signal_str[20] = "[PWR]: Power fail/restart");
00068 TRY_nomem(m_signal_str[21] = "[WINCH]: Terminal window size change");
00069 TRY_nomem(m_signal_str[22] = "[URG]: Urgent condition");
00070 TRY_nomem(m_signal_str[23] = "[POLL]: Pollable event or socket I/O");
00071 TRY_nomem(m_signal_str[24] = "[STOP]: Stop");
00072 TRY_nomem(m_signal_str[25] = "[TSTP]: Terminal stop character");
00073 TRY_nomem(m_signal_str[26] = "[CONT]: Continue stopped process");
00074 TRY_nomem(m_signal_str[27] = "[TTIN]: Background tty read");
00075 TRY_nomem(m_signal_str[28] = "[TTOU]: Background tty write");
00076 TRY_nomem(m_signal_str[29] = "[VTALRM]: Virtual timer expired");
00077 TRY_nomem(m_signal_str[30] = "[PROF]: Profiling timer expired");
00078 TRY_nomem(m_signal_str[31] = "[XCPU]: Exceeded CPU limit");
00079 TRY_nomem(m_signal_str[32] = "[XFSZ]: Exceeded file size limit");
00080 TRY_nomem(m_signal_str[33] = "[WAITING]: Process' LWPs are blocked");
00081 TRY_nomem(m_signal_str[34] = "[LWP]: Special thread library signal");
00082 TRY_nomem(m_signal_str[35] = "[FREEZE]: Special signal used by CPR");
00083 TRY_nomem(m_signal_str[36] = "[THAW]: Special signal used by CPR");
00084 TRY_nomem(m_signal_str[37] = "[CANCEL]: Thread cancellation");
00085 TRY_nomem(m_signal_str[38] = "[LOST]: Resource lost");
00086 TRY_nomem(m_signal_str[39] = "[RTMIN]: Highest priority real-time signal");
00087 TRY_nomem(m_signal_str[46] = "[RTMAX]: Lowest priority real-time signal");
00088
00089 TRY_nomem(m_unknown_exit = "(Unknown exit code)");
00090 TRY_nomem(m_unknown_signal = "(Unknown signal)");
00091 }
00092
00093
00094 const std::string& rstat::exit(const int a_int) const
00095 {
00096 if (m_exit_str.find(a_int) != m_exit_str.end()) {
00097 return(m_exit_str.find(a_int)->second);
00098 }
00099 return(m_unknown_exit);
00100 }
00101
00102
00103 const std::string& rstat::signal(const int a_int) const
00104 {
00105 if (m_signal_str.find(a_int) != m_signal_str.end()) {
00106 return(m_signal_str.find(a_int)->second);
00107 }
00108 return(m_unknown_signal);
00109 }
00110
// Global lookup table that translates rsync exit codes and signal numbers
// into readable text; job_archiver::mf_do_chores uses it when printing the
// result of each rsync run.
class rstat rsync_estat_str;
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121 job_archiver::job_archiver(const job * a_job)
00122 {
00123 clear();
00124 m_job = a_job;
00125 m_status = status_pending;
00126 }
00127
00128
00129
00130
00131
00132
00133
00134 const std::string job_archiver::prefix(void)
00135 {
00136 estring lstr;
00137
00138 lstr = "[Job:";
00139 lstr += estring((void*)m_job);
00140 lstr += "] ";
00141
00142 return(lstr);
00143 }
00144
00145
00146 const std::string job_archiver::id(void)
00147 {
00148 estring lstr;
00149
00150 lstr = prefix();
00151 lstr += " ";
00152 lstr += m_job->generate_job_id();
00153
00154 return(lstr);
00155 }
00156
00157
00158
00159
00160
00161
00162
00163 void job_archiver::clear(void)
00164 {
00165 end();
00166 m_child_pid = m_exec.my_pid();
00167 m_status = status_pending;
00168 m_success = true;
00169 m_jr.clear();
00170 m_jpr.clear();
00171 m_error_msg.erase();
00172 }
00173
00174
00175
00176
00177
00178
00179
00180 void job_archiver::end(void)
00181 {
00182 estring lstr;
00183
00184 m_timer.stop();
00185 if (m_exec.child_running()) {
00186 lstr = prefix();
00187 lstr += "Terminating child process!\n";
00188 logger.write(lstr,0,configuration_manager::logging_manager,m_child_pid);
00189 m_exec.kill_child();
00190 }
00191 m_exec.clear();
00192 m_io_out.erase();
00193 m_io_err.erase();
00194 m_status = status_done;
00195 }
00196
00197
00198 const job_archiver::archiving_status job_archiver::status(void)
00199 {
00200 return(m_status);
00201 }
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212 void job_archiver::start(void)
00213 {
00214 estring lstr;
00215
00216 m_jr.id(m_job->generate_job_id());
00217
00218 try {
00219 m_exec.fork();
00220 }
00221 catch(error e) {
00222 lstr = prefix();
00223 lstr += "Could not fork:\n";
00224 logger.write(lstr);
00225
00226 lstr = e.str(prefix());
00227 logger.write(lstr);
00228
00229 lstr = prefix();
00230 lstr += "Will retry job later\n";
00231 logger.write(lstr);
00232
00233 m_status = status_reschedule;
00234 }
00235 catch(...) {
00236 error e = err_unknown;
00237
00238 lstr = prefix();
00239 lstr += "Could not fork:\n";
00240 logger.write(lstr);
00241
00242 lstr = e.str(prefix());
00243 logger.write(lstr);
00244
00245 lstr = prefix();
00246 lstr += "Will retry job later\n";
00247 logger.write(lstr);
00248
00249 m_status = status_reschedule;
00250 }
00251
00252 if (m_exec.is_child()) {
00253
00254
00255
00256
00257 m_exec.reroute_stdio();
00258 try {
00259 mf_do_chores();
00260 }
00261 catch(error e) {
00262 std::cerr << e;
00263 m_success = false;
00264 }
00265 catch(...) {
00266 std::cerr << err_unknown;
00267 m_success = false;
00268 }
00269 if (m_success)
00270 m_exec.exit(0);
00271 else
00272 m_exec.exit(1);
00273 }
00274
00275 m_child_pid = m_exec.child_pid();
00276
00277 lstr = prefix();
00278 lstr += "Spawning child process: PID ";
00279 lstr += estring(static_cast<unsigned long>(m_exec.child_pid()));
00280 lstr += "\n";
00281 logger.write(lstr);
00282
00283 m_status = status_processing;
00284 m_timer.start();
00285 }
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300 void job_archiver::process(void)
00301 {
00302 estring lstr;
00303
00304 if (m_exec.child_running()) {
00305
00306 mf_process_child_io(false);
00307 }
00308 else {
00309
00310 mf_process_child_io(true);
00311
00312
00313
00314 lstr = prefix();
00315 if (m_exec.child_signaled()) {
00316 lstr += "Child exited from signal: ";
00317 lstr += estring(m_exec.child_signal_no());
00318 lstr += "\n";
00319 }
00320 else if (m_exec.child_exit_code() != 0) {
00321 lstr += "Child exited abnormally with code: ";
00322 lstr += estring(m_exec.child_exit_code());
00323 lstr += "\n";
00324 }
00325 else {
00326 lstr += "Child exited successfully\n";
00327 m_status = status_completed;
00328 }
00329 logger.write(lstr);
00330
00331 if (m_exec.child_signaled() || !m_exec.child_exited_normally()) {
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343 m_status = status_error;
00344 }
00345 else {
00346 m_status = status_completed;
00347 }
00348
00349 m_timer.stop();
00350 lstr = prefix();
00351 lstr += "Finished, duration: ";
00352 lstr += m_timer.duration();
00353 lstr += "\n";
00354 logger.write(lstr);
00355
00356 }
00357 }
00358
00359
00360 single_job_report job_archiver::report(void) const
00361 {
00362 return(m_jr);
00363 }
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378 void job_archiver::mf_do_chores(void)
00379 {
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390 job::paths_type::const_iterator pi;
00391
00392 for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
00393 estring archive_dir;
00394 estring path;
00395
00396 std::string binary;
00397 std::vector<std::string> argv;
00398 estring opt;
00399 bool hardlink = false;
00400 bool multi_hardlink = false;
00401 int num_retries = 0;
00402 bool done = false;
00403 int exit_code = 0;
00404 int signal_num = 0;
00405 timer t;
00406 uint64 files_total = 0;
00407 uint64 files_xferd = 0;
00408 uint64 size_total = 0;
00409 uint64 size_xferd = 0;
00410 bool overflow_detected = 0;
00411 estring error_msg;
00412
00413 archive_dir = m_job->generate_archive_path(*pi);
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424 path = archiver.working_archive_path();
00425 path += "/";
00426 path += archive_dir;
00427 path = reform_path(path);
00428 if (!exists(path)) {
00429 std::cout << "Creating job archive path: " << archive_dir << std::endl;
00430 mk_dirhier(path);
00431 }
00432 else
00433 std::cout << "Archiving to existing path: " << archive_dir << std::endl;
00434
00435 if (!writable(path)) {
00436 std::string es;
00437
00438 TRY_nomem(es = "Cannot write to archive directory: \"");
00439 TRY_nomem(es += archive_dir);
00440 TRY_nomem(es += "\"");
00441
00442 throw(ERROR(EACCES,es));
00443 }
00444
00445 logger.set_error_logging(false);
00446 hardlink = m_job->rsync_hardlink;
00447 multi_hardlink = m_job->rsync_multi_hardlink;
00448 while ((num_retries <= m_job->rsync_retry_count) && !done) {
00449 execute exec;
00450 job::excludes_type::const_iterator ei;
00451 job::includes_type::const_iterator ii;
00452 std::vector<std::string>::const_iterator oi;
00453
00454 exit_code = 0;
00455 signal_num = 0;
00456
00457
00458
00459
00460
00461
00462
00463 binary = config.rsync_local_path();
00464 argv = m_job->generate_rsync_options_vector();
00465
00466
00467 if (m_job->rsync_connection != job::connection_local) {
00468
00469
00470
00471
00472
00473
00474
00475
00476
00477
00478
00479 opt = "--rsync-path=";
00480 if (m_job->rsync_remote_path.size() != 0)
00481 opt += m_job->rsync_remote_path;
00482 else
00483 opt += config.rsync_local_path();
00484 argv.push_back(opt);
00485
00486 }
00487
00488 if (hardlink) {
00489 subdirectory subdir;
00490 std::string youngest;
00491 std::string relative_path;
00492 bool hardlink_opt = false;
00493 bool first_hardlink = false;
00494 int linkdest_count = 0;
00495
00496 subdir.path(vaulter.vault());
00497 if (subdir.size() > 0) {
00498 subdirectory::const_iterator si;
00499
00500 sort(subdir.begin(), subdir.end());
00501 reverse(subdir.begin(), subdir.end());
00502 for (si = subdir.begin(); si != subdir.end(); ++si) {
00503 estring ypath;
00504
00505 if (first_hardlink && !multi_hardlink) {
00506 continue;
00507 }
00508 if (linkdest_count >= m_job->rsync_multi_hardlink_max) {
00509 continue;
00510 }
00511 if (!is_timestamp(*si))
00512 continue;
00513 if (*si == config.timestamp().str())
00514 continue;
00515 std::cout
00516 << "Considering potential hardlink source: "
00517 << *si
00518 << std::endl;
00519 ypath = vaulter.vault();
00520 ypath += "/";
00521 ypath += *si;
00522 ypath += "/";
00523 ypath += archive_dir;
00524 if (exists(ypath)) {
00525 std::cout
00526 << "Using archive as hardlink source: "
00527 << *si
00528 << std::endl;
00529
00530 if (!hardlink_opt) {
00531
00532 argv.push_back(std::string("--hard-links"));
00533
00534
00535
00536
00537 hardlink_opt = true;
00538 }
00539
00540 relative_path = mk_relative_path(ypath,path);
00541
00542
00543
00544
00545
00546 opt="--link-dest=";
00547 opt += relative_path;
00548 argv.push_back(opt);
00549
00550 first_hardlink = true;
00551 linkdest_count++;
00552 }
00553 else {
00554 std::cout
00555 << "- No such path: "
00556 << ypath
00557 << std::endl;
00558 }
00559 }
00560 }
00561
00562
00563
00564
00565
00566
00567
00568 }
00569
00570 for (
00571 ei = m_job->excludes.begin();
00572 ei != m_job->excludes.end();
00573 ++ei
00574 )
00575 {
00576
00577
00578
00579
00580
00581 opt = "--exclude-from=";
00582 opt += *ei;
00583 argv.push_back(opt);
00584
00585 }
00586
00587 for (
00588 ii = m_job->includes.begin();
00589 ii != m_job->includes.end();
00590 ++ii
00591 )
00592 {
00593
00594
00595
00596
00597
00598 opt = "--include-from=";
00599 opt += *ei;
00600 argv.push_back(opt);
00601
00602 }
00603
00604
00605
00606
00607
00608
00609 argv.push_back(m_job->generate_source_path(*pi));
00610
00611
00612
00613
00614
00615
00616
00617 argv.push_back(path);
00618
00619
00620
00621
00622
00623
00624 std::cout << "Command Line:" << std::endl;
00625 {
00626 uint16 c;
00627 std::cout << " Binary: " << binary << std::endl;
00628 for (c = 0; c < argv.size(); c++) {
00629 std::cout << " Argv[" << c << "] = " << argv[c] << std::endl;
00630 }
00631 }
00632
00633
00634 m_error_msg.erase();
00635
00636 t.start();
00637
00638
00639
00640
00641 exec.exec(binary, argv);
00642
00643 mf_process_rsync_io(
00644 exec,
00645 m_job->rsync_timeout,
00646 files_total,
00647 files_xferd,
00648 size_total,
00649 size_xferd,
00650 overflow_detected
00651 );
00652 t.stop();
00653
00654 signal_num = 0;
00655 if (exec.child_signaled()) {
00656 std::cout
00657 << "Rsync caught signal: ["
00658 << exec.child_signal_no()
00659 << "] "
00660 << rsync_estat_str.signal(exec.child_signal_no())
00661 << std::endl;
00662 signal_num = exec.child_signal_no();
00663 }
00664 std::cout
00665 << "Rsync exit code: ["
00666 << exec.child_exit_code()
00667 << "] "
00668 << rsync_estat_str.exit(exec.child_exit_code())
00669 << std::endl;
00670
00671 exit_code = exec.child_exit_code();
00672 if (exec.child_exited_normally() && (exit_code == 0))
00673 done = true;
00674 else if (overflow_detected) {
00675 std::cout
00676 << "Vault overflow detected"
00677 << std::endl;
00678 break;
00679 }
00680 else {
00681 ++num_retries;
00682 logger.set_error_logging(true);
00683 }
00684
00685 if (m_job->rsync_behavior[exit_code] == rsync_behavior::fail)
00686 {
00687 std::cout << "Failing, will not attempt to retry" << std::endl;
00688 break;
00689 }
00690 if (m_job->rsync_behavior[exit_code]
00691 == rsync_behavior::retry_without_hardlinks)
00692 {
00693 std::cout << "Retrying without hardlinks..." << std::endl;
00694 hardlink = false;
00695 }
00696
00697 if ((!done) && (m_job->rsync_retry_delay > 0)) {
00698 std::cout << "Waiting " << m_job->rsync_retry_delay
00699 << " minutes before retrying..." << std::endl;
00700 sleep( (m_job->rsync_retry_delay) * 60);
00701 }
00702 }
00703 if (!done) {
00704 if (num_retries >= m_job->rsync_retry_count) {
00705 std::cout << "Retry count exceeded" << std::endl;
00706 }
00707 if (m_job->rsync_behavior[exit_code] == rsync_behavior::ok)
00708 std::cout << "Ignoring rsync failure" << std::endl;
00709 else {
00710 std::cout << "Giving up on this path" << std::endl;
00711 m_success = false;
00712 }
00713 }
00714 reportio().write_report(
00715 m_job->generate_source_path(*pi),
00716 t,
00717 exit_code,
00718 signal_num,
00719 m_error_msg
00720 );
00721 }
00722 }
00723
00724 void job_archiver::mf_process_report(const std::string& a_str)
00725 {
00726 if (reportio().is_report(a_str)) {
00727 m_jpr = reportio().parse(a_str);
00728 m_jr.add_report(m_jpr);
00729 }
00730 }
00731
00732
00733
00734
00735
00736
00737
00738
// Forward the job child's stdout and stderr to the log, one line at a time.
//
// Each stream is read in chunks of up to 128 bytes. Reading continues while
// the stream is not at EOF and either a_finalize is set or the stream
// reports ready. Characters accumulate in m_io_out / m_io_err until a '\r'
// or '\n' is seen. The completed line is then prefixed with prefix(), logged
// on the rsync logging channel tagged with m_child_pid, and passed to
// mf_process_report() (prefix included).
//
// a_finalize: when true, keep reading until EOF and then flush any partial
// line still buffered. When false, read only what is ready.
// If no bytes were read from either stream, sleep for
// config.io_poll_interval() before returning.
//
// NOTE(review): when finalizing, this loops until out_eof()/err_eof() become
// true; if out_read/err_read can return <= 0 without setting EOF, this could
// spin — confirm against execute's implementation.
void job_archiver::mf_process_child_io(bool a_finalize)
{
	estring lstr;
	bool io_flag = false;

	// Drain the child's stdout.
	while (!m_exec.out_eof() && (a_finalize || m_exec.out_ready())) {
		int r;
		const int buffer_size = 128;
		char buffer[buffer_size] = { 0 };

		r = m_exec.out_read(buffer, buffer_size);
		if (r > 0) {
			int c;

			io_flag = true;
			for (c = 0; c < r; ++c) {
				if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
					// End of line: emit the buffered text and start over.
					lstr = prefix();
					lstr += m_io_out;
					lstr += "\n";
					logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
					mf_process_report(lstr);
					m_io_out.erase();
				}
				else {
					m_io_out += buffer[c];
				}
			}
		}
	}
	// On finalize, emit a trailing stdout line that had no terminator.
	if (a_finalize && (m_io_out.size() > 0)) {
		lstr = prefix();
		lstr += m_io_out;
		lstr += "\n";
		logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
		mf_process_report(lstr);
		m_io_out.erase();
	}

	// Drain the child's stderr (same handling as stdout).
	while (!m_exec.err_eof() && (a_finalize || m_exec.err_ready())) {
		int r;
		const int buffer_size = 128;
		char buffer[buffer_size] = { 0 };

		r = m_exec.err_read(buffer, buffer_size);
		if (r > 0) {
			int c;

			io_flag = true;
			for (c = 0; c < r; ++c) {
				if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
					lstr = prefix();
					lstr += m_io_err;
					lstr += "\n";
					logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
					mf_process_report(lstr);
					m_io_err.erase();
				}
				else {
					m_io_err += buffer[c];
				}
			}
		}
	}
	// On finalize, emit a trailing stderr line that had no terminator.
	if (a_finalize && (m_io_err.size() > 0)) {
		lstr = prefix();
		lstr += m_io_err;
		lstr += "\n";
		logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
		mf_process_report(lstr);
		m_io_err.erase();
	}
	// Nothing arrived on either stream: back off before the next poll.
	if (!io_flag)
		sleep(config.io_poll_interval());
}
00814
00815
00816 void job_archiver::mf_trim_string(std::string& a_str)
00817 {
00818 while ((a_str.size() > 0) && (!isdigit(a_str[0])))
00819 a_str.erase(0,1);
00820 while ((a_str.size() > 0) && (!isdigit(a_str[a_str.size()-1])))
00821 a_str.erase(a_str.size()-1,1);
00822 }
00823
00824
00825
00826
00827
00828
00829
00830 void job_archiver::mf_parse_rsync_io(
00831 const std::string a_str,
00832 uint64& a_files_total,
00833 uint64& a_files_xferd,
00834 uint64& a_size_total,
00835 uint64& a_size_xferd
00836 )
00837 {
00838 estring estr;
00839
00840 if (a_str.find("Number of files: ") == 0) {
00841 estr = a_str;
00842 mf_trim_string(estr);
00843 try {
00844 a_files_total = estr;
00845 }
00846 catch(error e) {
00847 estring es;
00848
00849 es = "Could not parse total number of files processed by rsync";
00850 e.push_back(ERROR_INSTANCE(es));
00851
00852
00853 std::cerr << e;
00854 }
00855 catch(...) {
00856 error e = err_unknown;
00857 estring es;
00858
00859 es = "Could not parse total number of files processed by rsync";
00860 e.push_back(ERROR_INSTANCE(es));
00861
00862
00863 std::cerr << e;
00864 }
00865 }
00866 else if (a_str.find("Number of files transferred: ") == 0) {
00867 estr = a_str;
00868 mf_trim_string(estr);
00869 try {
00870 a_files_xferd = estr;
00871 }
00872 catch(error e) {
00873 estring es;
00874
00875 es = "Could not parse total number of files transferred by rsync";
00876 e.push_back(ERROR_INSTANCE(es));
00877
00878
00879 std::cerr << e;
00880 }
00881 catch(...) {
00882 error e = err_unknown;
00883 estring es;
00884
00885 es = "Could not parse total number of files transferred by rsync";
00886 e.push_back(ERROR_INSTANCE(es));
00887
00888
00889 std::cerr << e;
00890 }
00891 }
00892 else if (a_str.find("Total file size: ") == 0) {
00893 estr = a_str;
00894 mf_trim_string(estr);
00895 try {
00896 a_size_total = estr;
00897 }
00898 catch(error e) {
00899 estring es;
00900
00901 es = "Could not parse total size of files processed by rsync";
00902 e.push_back(ERROR_INSTANCE(es));
00903
00904
00905 std::cerr << e;
00906 }
00907 catch(...) {
00908 error e = err_unknown;
00909 estring es;
00910
00911 es = "Could not parse total size of files processed by rsync";
00912 e.push_back(ERROR_INSTANCE(es));
00913
00914
00915 std::cerr << e;
00916 }
00917 }
00918 else if (a_str.find("Total transferred file size: ") == 0) {
00919 estr = a_str;
00920 mf_trim_string(estr);
00921 try {
00922 a_size_xferd = estr;
00923 }
00924 catch(error e) {
00925 estring es;
00926
00927 es = "Could not parse total size of files transferred by rsync";
00928 e.push_back(ERROR_INSTANCE(es));
00929
00930
00931 std::cerr << e;
00932 }
00933 catch(...) {
00934 error e = err_unknown;
00935 estring es;
00936
00937 es = "Could not parse total size of files transferred by rsync";
00938 e.push_back(ERROR_INSTANCE(es));
00939
00940
00941 std::cerr << e;
00942 }
00943 }
00944 }
00945
00946
00947
00948
00949
00950
00951
00952 void job_archiver::mf_process_rsync_io(
00953 execute& a_exec,
00954 uint16 a_timeout,
00955 uint64& a_files_total,
00956 uint64& a_files_xferd,
00957 uint64& a_size_total,
00958 uint64& a_size_xferd,
00959 bool& a_overflow_detected
00960 )
00961 {
00962 size_t ro;
00963 size_t re;
00964 estring out;
00965 estring err;
00966 timer t;
00967 bool io_flag;
00968 char buffer[1024] = { 0 };
00969 char *ptr;
00970
00971 ro = 1;
00972 re = 1;
00973 t.start();
00974 while ((ro != 0) || (re != 0) || a_exec.child_running()) {
00975 io_flag = false;
00976 ro = 0;
00977 re = 0;
00978
00979 if (!a_overflow_detected) {
00980 a_overflow_detected = vaulter.overflow();
00981 }
00982
00983 m_error_msg.erase();
00984
00985 if (a_exec.out_ready()) {
00986 ro = read(a_exec.out_fd(), buffer, 1024);
00987 if (ro > 0) {
00988 io_flag = true;
00989 t.start();
00990 for (ptr = buffer; ptr != buffer+ro; ++ptr) {
00991 if ((*ptr != '\r') && (*ptr != '\n')) {
00992 out += *ptr;
00993 }
00994 else {
00995 reportio().write_rsync_out(out);
00996 out.erase();
00997 }
00998 }
00999 }
01000 }
01001
01002 if (a_exec.err_ready()) {
01003 re = read(a_exec.err_fd(), buffer, 1024);
01004 if (re > 0) {
01005 io_flag = true;
01006 t.start();
01007 for (ptr = buffer; ptr != buffer+re; ++ptr) {
01008 if ((*ptr != '\r') && (*ptr != '\n')) {
01009 err += *ptr;
01010 }
01011 else {
01012 reportio().write_rsync_err(err);
01013 err.erase();
01014 }
01015 }
01016 }
01017 }
01018
01019 t.stop();
01020 if (t.duration_secs() > a_timeout) {
01021 std::cerr << "*** Rsync program inactivity timeout" << std::endl;
01022 a_exec.kill_child();
01023 TRY_nomem(m_error_msg = "Rsync inactivity timeout");
01024 }
01025
01026 if (!io_flag)
01027 sleep(config.io_poll_interval());
01028
01029 }
01030 if (out.size() > 0) {
01031 std::cerr << out << std::endl;
01032 mf_parse_rsync_io(
01033 out,
01034 a_files_total,
01035 a_files_xferd,
01036 a_size_total,
01037 a_size_xferd
01038 );
01039 out.erase();
01040 }
01041 if (err.size() > 0) {
01042 std::cerr << err << std::endl;
01043 mf_parse_rsync_io(
01044 out,
01045 a_files_total,
01046 a_files_xferd,
01047 a_size_total,
01048 a_size_xferd
01049 );
01050 err.erase();
01051 }
01052 }
01053
01054
01055
01056
01057 archive_manager::archive_manager()
01058 {
01059 if (this != &archiver)
01060 throw(INTERNAL_ERROR(0,"Attempt to alocate multiple archive managers"));
01061
01062 clear();
01063 }
01064
01065
01066 void archive_manager::clear(void)
01067 {
01068 m_jobs.clear();
01069 m_initialized = false;
01070 }
01071
01072
01073
01074
01075
01076
01077 void archive_manager::init(void)
01078 {
01079 timer t;
01080 estring lstr;
01081
01082 lstr = "Archive Manager - Beginning initialization\n";
01083 logger.write(lstr);
01084 t.start();
01085
01086 lstr = "Timestamp: ";
01087 lstr += config.timestamp().str();
01088 lstr += "\n";
01089 logger.write(lstr);
01090
01091
01092 vaulter.select();
01093 lstr = "Vault selected: ";
01094 lstr += vaulter.vault();
01095 lstr += "\n";
01096 logger.write(lstr);
01097
01098 reporter.vault().add_report(
01099 vault_stats_report(estring("Initial Stats:"),filesystem(vaulter.vault()))
01100 );
01101
01102
01103 vaulter.prepare();
01104
01105 t.stop();
01106 lstr = "Archive Manager - Finished initialization";
01107 lstr += ", duration: ";
01108 lstr += t.duration();
01109 lstr += "\n";
01110 logger.write(lstr);
01111
01112 m_initialized = true;
01113 }
01114
01115
01116 const bool archive_manager::initialized(void) const
01117 {
01118 return(m_initialized);
01119 }
01120
01121
01122
01123
01124
01125
01126
01127 void archive_manager::mf_log_status(void)
01128 {
01129 static timer t;
01130 const timer::value_type timeout = 30;
01131 estring lstr;
01132 std::vector<job_archiver*>::const_iterator ji;
01133 uint64 jobs_pending = 0;
01134 uint64 jobs_processing = 0;
01135 uint64 jobs_completed =0;
01136 uint64 jobs_remaining = 0;
01137
01138 if (!t.is_started())
01139 t.start();
01140
01141 t.stop();
01142 if (t.duration_mins() < timeout)
01143 return;
01144
01145 lstr = "\n";
01146 lstr += "STATUS REPORT:\n";
01147 lstr += "================================================================\n";
01148 logger.write(lstr);
01149 for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji) {
01150 lstr = "[";
01151 switch ((*ji)->status()) {
01152 case job_archiver::status_pending:
01153 lstr += "Pending ";
01154 ++jobs_pending;
01155 break;
01156 case job_archiver::status_processing:
01157 lstr += "Processing ";
01158 ++jobs_processing;
01159 break;
01160 case job_archiver::status_reschedule:
01161 lstr += "Reschedule ";
01162 ++jobs_pending;
01163 break;
01164 case job_archiver::status_fatal_error:
01165 lstr += "Fatal Error";
01166 ++jobs_completed;
01167 break;
01168 case job_archiver::status_error:
01169 lstr += "Error ";
01170 ++jobs_completed;
01171 break;
01172 case job_archiver::status_completed:
01173 lstr += "Completed ";
01174 ++jobs_completed;
01175 break;
01176 case job_archiver::status_done:
01177 lstr += "Done ";
01178 ++jobs_completed;
01179 break;
01180 default:
01181 lstr += "<unknown> ";
01182 break;
01183 }
01184 lstr += "]: ";
01185 lstr += (*ji)->id();
01186 lstr += "\n";
01187 logger.write(lstr);
01188 }
01189
01190 lstr = "---------------------------------------------------------------\n";
01191 lstr += " Jobs Pending: ";
01192 lstr += estring(jobs_pending);
01193 lstr += "\n";
01194
01195 lstr += " Jobs Processing: ";
01196 lstr += estring(jobs_processing);
01197 lstr += "\n";
01198
01199 lstr += " Jobs Completed: ";
01200 lstr += estring(jobs_completed);
01201 lstr += "\n";
01202
01203 lstr += " Total: ";
01204 lstr += estring(jobs_pending+jobs_processing+jobs_completed+jobs_remaining);
01205 lstr += "\n";
01206
01207 lstr += "Overflow Detected: ";
01208 if (vaulter.overflow()) {
01209 lstr += "TRUE";
01210 }
01211 else {
01212 lstr += "false";
01213 }
01214 lstr += "\n";
01215
01216 logger.write(lstr);
01217 logger.write("\n");
01218 t.start();
01219 }
01220
01221
01222
01223
01224
01225
01226
01227
01228
01229
01230
01231
01232
01233
01234 void archive_manager::archive(void)
01235 {
01236 timer t;
01237 estring lstr;
01238 configuration_manager::jobs_type::const_iterator cji;
01239 int num_processing = 0;
01240 std::vector<job_archiver*>::iterator ji;
01241 uint64 num_completed = 0;
01242 bool overflow_detected = false;
01243 estring debug_estr;
01244
01245 if (!initialized())
01246 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01247
01248 lstr = "Archive Manager - Begin archiving\n";
01249 logger.write(lstr);
01250 t.start();
01251
01252
01253 try {
01254 if (exists(archive_path())) {
01255 lstr = "Found existing archive directory: ";
01256 lstr += archive_path();
01257 lstr += "\n";
01258 logger.write(lstr);
01259 rename_file(archive_path(), working_archive_path());
01260 }
01261 else if (exists(working_archive_path())) {
01262 lstr = "Found existing archive directory: ";
01263 lstr += working_archive_path();
01264 lstr += "\n";
01265 logger.write(lstr);
01266 }
01267 else {
01268 lstr = "Creating archive directory: ";
01269 lstr += working_archive_path();
01270 lstr += "\n";
01271 logger.write(lstr);
01272 mk_dir(working_archive_path());
01273 }
01274 }
01275 catch(error e) {
01276 logger.write("An error has occured: ");
01277 logger.write(e[0].what());
01278 logger.write("\n");
01279 throw(e);
01280 }
01281 catch(...) {
01282 error e = err_unknown;
01283
01284 logger.write("An error has occured: ");
01285 logger.write(e[0].what());
01286 logger.write("\n");
01287 throw(e);
01288 }
01289
01290
01291 logger.write("Creating to-do list\n");
01292 for (
01293 cji = config.jobs().begin();
01294 cji != config.jobs().end();
01295 ++cji
01296 )
01297 {
01298 job_archiver* ptr;
01299
01300 ptr = new job_archiver(&(*cji));
01301 if (ptr == 0)
01302 throw(err_nomem);
01303 TRY_nomem(m_jobs.push_back(ptr));
01304 }
01305
01306
01307 logger.write("Processing jobs...\n");
01308 while (m_jobs.size() > num_completed) {
01309
01310
01311
01312
01313
01314
01315
01316
01317
01318
01319
01320
01321
01322
01323
01324 if (!overflow_detected) {
01325 overflow_detected = vaulter.overflow(true);
01326
01327
01328
01329
01330
01331
01332
01333 }
01334
01335
01336
01337
01338 if (overflow_detected && (num_processing == 0)) {
01339 TRY(vaulter.prepare(true),"Cannot complete archive");
01340 overflow_detected = vaulter.overflow();
01341
01342
01343
01344
01345
01346
01347
01348 }
01349
01350
01351 for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji)
01352 {
01353
01354 if (
01355 !overflow_detected
01356 && (num_processing < config.rsync_parallel())
01357 && ((*ji)->status() == job_archiver::status_pending)
01358 )
01359 {
01360 try {
01361 (*ji)->start();
01362 }
01363 catch(error e) {
01364 if (e.num() == 12) {
01365 lstr = "Error starting job: Out of memory, will retry later\n";
01366 (*ji)->clear();
01367 }
01368 else {
01369 (*ji)->end();
01370 lstr = "Error starting job, aborting\n";
01371
01372 num_processing--;
01373 reporter.jobs().add_report((*ji)->report());
01374 }
01375 logger.write(lstr);
01376 }
01377 catch(...) {
01378 (*ji)->end();
01379 lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE STARTING JOB ";
01380 lstr += "-- JOB TERMINATED\n";
01381 logger.write(lstr);
01382
01383 num_processing--;
01384 reporter.jobs().add_report((*ji)->report());
01385 }
01386
01387 num_processing++;
01388 }
01389
01390
01391 if ((*ji)->status() == job_archiver::status_processing) {
01392 try {
01393 (*ji)->process();
01394 }
01395 catch(error e) {
01396
01397 if (e.num() == 12) {
01398 lstr = "Error starting job: Out of memory, will retry later\n";
01399 (*ji)->clear();
01400 }
01401 else {
01402 (*ji)->end();
01403 lstr = "Error starting job, aborting\n";
01404
01405 num_processing--;
01406 reporter.jobs().add_report((*ji)->report());
01407 }
01408 logger.write(lstr);
01409 }
01410 catch(...) {
01411 (*ji)->end();
01412 lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE PROCESSING JOB ";
01413 lstr += "-- JOB TERMINATED\n";
01414 logger.write(lstr);
01415
01416 num_processing--;
01417 reporter.jobs().add_report((*ji)->report());
01418 }
01419 }
01420
01421
01422 if ((*ji)->status() == job_archiver::status_reschedule) {
01423 (*ji)->clear();
01424
01425 num_processing--;
01426 }
01427
01428
01429
01430 if (
01431 ((*ji)->status() == job_archiver::status_error)
01432 &&
01433 overflow_detected
01434 )
01435 {
01436 lstr = "Vault overflow detected, will retry job later\n";
01437 logger.write(lstr);
01438 (*ji)->clear();
01439 num_processing--;
01440 }
01441
01442
01443 if (
01444 ((*ji)->status() == job_archiver::status_completed)
01445 || ((*ji)->status() == job_archiver::status_fatal_error)
01446 || ((*ji)->status() == job_archiver::status_error)
01447 ) {
01448 (*ji)->end();
01449
01450 num_processing--;
01451 num_completed++;
01452
01453
01454 reporter.jobs().add_report((*ji)->report());
01455 }
01456 }
01457
01458 mf_log_status();
01459 sleep(1);
01460
01461
01462 }
01463
01464 t.stop();
01465 lstr = "Archive Manager - Finished archiving, duration: ";
01466 lstr += t.duration();
01467 lstr += "\n";
01468 logger.write(lstr);
01469
01470 lstr = "Archive Manager - Finalizing archive path\n";
01471 logger.write(lstr);
01472 TRY(
01473 rename_file(working_archive_path(), archive_path()),
01474 "Cannot finalize archive"
01475 );
01476
01477 reporter.vault().add_report(
01478 vault_stats_report(estring("Final Stats:"),filesystem(vaulter.vault()))
01479 );
01480 }
01481
01482
01483 const std::string archive_manager::archive_path(void) const
01484 {
01485 estring path;
01486
01487 if (!initialized())
01488 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01489
01490 path = vaulter.vault();
01491 path += "/";
01492 path += config.timestamp().str();
01493
01494 return(path);
01495 }
01496
01497
01498 const std::string archive_manager::working_archive_path(void) const
01499 {
01500 estring path;
01501
01502 if (!initialized())
01503 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01504
01505 path = archive_path();
01506 path += ".incomplete";
01507
01508 return(path);
01509 }
01510
01511
01512
01513
// The one global archive manager. archive_manager's constructor throws if
// any instance other than this one is created.
archive_manager archiver;
01515