#include "config.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <ctype.h>
#include <errno.h>
#include <signal.h>

#include <iostream>
#include <string>
#include <vector>

#include "asserts.h"
#include "error.h"
#include "estring.h"
#include "fs.h"
#include "rconfig.h"
#include "timer.h"
#include "logger.h"

#include "archiver.h"
00022
00023
00024
00025
00026 rstat::rstat()
00027 {
00028 TRY_nomem(m_exit_str[0] = "Success");
00029 TRY_nomem(m_exit_str[1] = "Syntax or usage error");
00030 TRY_nomem(m_exit_str[2] = "Protocol incompatability error");
00031 TRY_nomem(m_exit_str[3] = "Errors selecting I/O files or directories");
00032 TRY_nomem(m_exit_str[4] = "Requested action not supported");
00033 TRY_nomem(m_exit_str[10] = "Error in socket I/O");
00034 TRY_nomem(m_exit_str[11] = "Error in file I/O");
00035 TRY_nomem(m_exit_str[12] = "Error in rsync protocol data stream");
00036 TRY_nomem(m_exit_str[13] = "Errors with program diagnostics");
00037 TRY_nomem(m_exit_str[14] = "Error in IPC code");
00038 TRY_nomem(m_exit_str[20] = "Received SIGUSR1 or SIGINT");
00039 TRY_nomem(m_exit_str[21] = "Some error returned by waitpid()");
00040 TRY_nomem(m_exit_str[22] = "Error allocating core memory buffers");
00041 TRY_nomem(m_exit_str[23] = "Partial transfer");
00042 TRY_nomem(m_exit_str[24] = "Some files vanished before they could be transferred");
00043 TRY_nomem(m_exit_str[25] = "The --max-delete limit stopped deletions");
00044 TRY_nomem(m_exit_str[30] = "Timeout in data send/receive");
00045 TRY_nomem(m_exit_str[124] = "The command executed by SSH exited with status 255");
00046 TRY_nomem(m_exit_str[125] = "The command executed by SSH was killed by a signal");
00047 TRY_nomem(m_exit_str[126] = "The command given to SSH cannot be run");
00048 TRY_nomem(m_exit_str[127] = "The command given to SSH cannot be found");
00049
00050 TRY_nomem(m_signal_str[1] = "[HUP]: Hangup");
00051 TRY_nomem(m_signal_str[2] = "[INT]: Interrupt ");
00052 TRY_nomem(m_signal_str[3] = "[QUIT]: Quit");
00053 TRY_nomem(m_signal_str[4] = "[ILL]: Illegal instruction");
00054 TRY_nomem(m_signal_str[5] = "[TRAP]: Trace trap");
00055 TRY_nomem(m_signal_str[6] = "[IOT]: IOT instruction or hardware fault");
00056 TRY_nomem(m_signal_str[7] = "[ABRT]: Abnormal termination");
00057 TRY_nomem(m_signal_str[8] = "[EMT]: EMT instruction or hardware fault");
00058 TRY_nomem(m_signal_str[9] = "[FPE]: Floating point exception");
00059 TRY_nomem(m_signal_str[10] = "[KILL]: Killed");
00060 TRY_nomem(m_signal_str[11] = "[BUS]: Bus error");
00061 TRY_nomem(m_signal_str[12] = "[SEGV]: Segmentation fault");
00062 TRY_nomem(m_signal_str[13] = "[SYS]: Invalid system call or invalid argument to system call");
00063 TRY_nomem(m_signal_str[14] = "[PIPE]: Write to pipe with no readers");
00064 TRY_nomem(m_signal_str[15] = "[ALRM]: Alarm");
00065 TRY_nomem(m_signal_str[16] = "[TERM]: Software termination");
00066 TRY_nomem(m_signal_str[17] = "[USR1]: User-defined signal 1");
00067 TRY_nomem(m_signal_str[18] = "[USR2]: User-defined signal 2");
00068 TRY_nomem(m_signal_str[19] = "[CLD]: Child status change");
00069 TRY_nomem(m_signal_str[20] = "[PWR]: Power fail/restart");
00070 TRY_nomem(m_signal_str[21] = "[WINCH]: Terminal window size change");
00071 TRY_nomem(m_signal_str[22] = "[URG]: Urgent condition");
00072 TRY_nomem(m_signal_str[23] = "[POLL]: Pollable event or socket I/O");
00073 TRY_nomem(m_signal_str[24] = "[STOP]: Stop");
00074 TRY_nomem(m_signal_str[25] = "[TSTP]: Terminal stop character");
00075 TRY_nomem(m_signal_str[26] = "[CONT]: Continue stopped process");
00076 TRY_nomem(m_signal_str[27] = "[TTIN]: Background tty read");
00077 TRY_nomem(m_signal_str[28] = "[TTOU]: Background tty write");
00078 TRY_nomem(m_signal_str[29] = "[VTALRM]: Virtual timer expired");
00079 TRY_nomem(m_signal_str[30] = "[PROF]: Profiling timer expired");
00080 TRY_nomem(m_signal_str[31] = "[XCPU]: Exceeded CPU limit");
00081 TRY_nomem(m_signal_str[32] = "[XFSZ]: Exceeded file size limit");
00082 TRY_nomem(m_signal_str[33] = "[WAITING]: Process' LWPs are blocked");
00083 TRY_nomem(m_signal_str[34] = "[LWP]: Special thread library signal");
00084 TRY_nomem(m_signal_str[35] = "[FREEZE]: Special signal used by CPR");
00085 TRY_nomem(m_signal_str[36] = "[THAW]: Special signal used by CPR");
00086 TRY_nomem(m_signal_str[37] = "[CANCEL]: Thread cancellation");
00087 TRY_nomem(m_signal_str[38] = "[LOST]: Resource lost");
00088 TRY_nomem(m_signal_str[39] = "[RTMIN]: Highest priority real-time signal");
00089 TRY_nomem(m_signal_str[46] = "[RTMAX]: Lowest priority real-time signal");
00090
00091 TRY_nomem(m_unknown_exit = "(Unknown exit code)");
00092 TRY_nomem(m_unknown_signal = "(Unknown signal)");
00093 }
00094
00095
00096 const std::string& rstat::exit(const int a_int) const
00097 {
00098 if (m_exit_str.find(a_int) != m_exit_str.end()) {
00099 return(m_exit_str.find(a_int)->second);
00100 }
00101 return(m_unknown_exit);
00102 }
00103
00104
00105 const std::string& rstat::signal(const int a_int) const
00106 {
00107 if (m_signal_str.find(a_int) != m_signal_str.end()) {
00108 return(m_signal_str.find(a_int)->second);
00109 }
00110 return(m_unknown_signal);
00111 }
00112
00113 class rstat rsync_estat_str;
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123 job_archiver::job_archiver(const job * a_job)
00124 {
00125 clear();
00126 m_job = a_job;
00127 m_status = status_pending;
00128 }
00129
00130
00131
00132
00133
00134
00135
00136 const std::string job_archiver::prefix(void)
00137 {
00138 estring lstr;
00139
00140 lstr = "[Job:";
00141 lstr += estring((void*)m_job);
00142 lstr += "] ";
00143
00144 return(lstr);
00145 }
00146
00147
00148 const std::string job_archiver::id(void)
00149 {
00150 estring lstr;
00151
00152 lstr = prefix();
00153 lstr += " ";
00154 lstr += m_job->generate_job_id();
00155
00156 return(lstr);
00157 }
00158
00159
00160
00161
00162
00163
00164
00165 void job_archiver::clear(void)
00166 {
00167 end();
00168 m_child_pid = m_exec.my_pid();
00169 m_status = status_pending;
00170 m_success = true;
00171 m_jr.clear();
00172 m_jpr.clear();
00173 m_error_msg.erase();
00174 m_rsync_timeout_flag = false;
00175 }
00176
00177
00178
00179
00180
00181
00182
00183 void job_archiver::end(void)
00184 {
00185 estring lstr;
00186
00187 m_timer.stop();
00188 m_io_timer.stop();
00189 if (m_exec.child_running()) {
00190 lstr = prefix();
00191 lstr += "Terminating child process!\n";
00192 logger.write(lstr,0,configuration_manager::logging_manager,m_child_pid);
00193 m_exec.kill_child();
00194 }
00195 m_exec.clear();
00196 m_io_out.erase();
00197 m_io_err.erase();
00198 m_status = status_done;
00199 }
00200
00201
00202 const job_archiver::archiving_status job_archiver::status(void)
00203 {
00204 return(m_status);
00205 }
00206
00207
00208
00209
00210
00211
00212
00213
00214
00215
00216 void job_archiver::start(void)
00217 {
00218 estring lstr;
00219 estring path;
00220 estring archive_dir;
00221
00222 m_jr.id(m_job->generate_job_id());
00223
00224
00225 job::paths_type::const_iterator pi;
00226 for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
00227 archive_dir = m_job->generate_archive_path(*pi);
00228 path = archiver.working_archive_path();
00229 path += "/";
00230 path += archive_dir;
00231 path = reform_path(path);
00232 if (!exists(path)) {
00233 lstr = prefix();
00234 lstr += "Creating job archive path: ";
00235 lstr += archive_dir;
00236 lstr += "\n";
00237 logger.write(lstr);
00238 mk_dirhier(path);
00239 }
00240 }
00241
00242 try {
00243 m_exec.fork();
00244 }
00245 catch(error e) {
00246 lstr = prefix();
00247 lstr += "Could not fork:\n";
00248 logger.write(lstr);
00249
00250 lstr = e.str(prefix());
00251 logger.write(lstr);
00252
00253 lstr = prefix();
00254 lstr += "Will retry job later\n";
00255 logger.write(lstr);
00256
00257 m_status = status_reschedule;
00258 }
00259 catch(...) {
00260 error e = err_unknown;
00261
00262 lstr = prefix();
00263 lstr += "Could not fork:\n";
00264 logger.write(lstr);
00265
00266 lstr = e.str(prefix());
00267 logger.write(lstr);
00268
00269 lstr = prefix();
00270 lstr += "Will retry job later\n";
00271 logger.write(lstr);
00272
00273 m_status = status_reschedule;
00274 }
00275
00276 if (m_exec.is_child()) {
00277
00278
00279
00280
00281 m_exec.reroute_stdio();
00282 try {
00283 mf_do_chores();
00284 }
00285 catch(error e) {
00286 std::cerr << e;
00287 m_success = false;
00288 }
00289 catch(...) {
00290 std::cerr << err_unknown;
00291 m_success = false;
00292 }
00293 if (m_success)
00294 m_exec.exit(0);
00295 else
00296 m_exec.exit(1);
00297 }
00298
00299 m_child_pid = m_exec.child_pid();
00300
00301 lstr = prefix();
00302 lstr += "Spawning child process: PID ";
00303 lstr += estring(static_cast<unsigned long>(m_exec.child_pid()));
00304 lstr += "\n";
00305 logger.write(lstr);
00306
00307 m_status = status_processing;
00308 m_timer.start();
00309 m_io_timer.start();
00310 m_rsync_timeout_flag = false;
00311 }
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326 void job_archiver::process(void)
00327 {
00328 estring lstr;
00329
00330 if (m_exec.child_running()) {
00331
00332 mf_process_child_io(false);
00333 }
00334 else {
00335
00336 mf_process_child_io(true);
00337
00338
00339
00340 lstr = prefix();
00341 if (m_exec.child_signaled()) {
00342 lstr += "Child exited from signal: ";
00343 lstr += estring(m_exec.child_signal_no());
00344 lstr += "\n";
00345 }
00346 else if (m_exec.child_exit_code() != 0) {
00347 lstr += "Child exited abnormally with code: ";
00348 lstr += estring(m_exec.child_exit_code());
00349 lstr += "\n";
00350 }
00351 else {
00352 lstr += "Child exited successfully\n";
00353 m_status = status_completed;
00354 }
00355 logger.write(lstr);
00356
00357 if (m_exec.child_signaled() || !m_exec.child_exited_normally()) {
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369 m_status = status_error;
00370 }
00371 else {
00372 m_status = status_completed;
00373 }
00374
00375 m_timer.stop();
00376 m_io_timer.stop();
00377 lstr = prefix();
00378 lstr += "Finished, duration: ";
00379 lstr += m_timer.duration();
00380 lstr += "\n";
00381 logger.write(lstr);
00382
00383 }
00384 }
00385
00386
00387 single_job_report job_archiver::report(void) const
00388 {
00389 return(m_jr);
00390 }
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405 void job_archiver::mf_do_chores(void)
00406 {
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417 job::paths_type::const_iterator pi;
00418
00419 for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
00420 estring archive_dir;
00421 estring path;
00422
00423 std::string binary;
00424 std::vector<std::string> argv;
00425 estring opt;
00426 bool hardlink = false;
00427 bool multi_hardlink = false;
00428 int num_retries = 0;
00429 bool done = false;
00430 int exit_code = 0;
00431 int signal_num = 0;
00432 timer t;
00433 uint64 files_total = 0;
00434 uint64 files_xferd = 0;
00435 uint64 size_total = 0;
00436 uint64 size_xferd = 0;
00437 bool overflow_detected = 0;
00438 estring error_msg;
00439
00440 archive_dir = m_job->generate_archive_path(*pi);
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451 path = archiver.working_archive_path();
00452 path += "/";
00453 path += archive_dir;
00454 path = reform_path(path);
00455
00456
00457
00458
00459
00460
00461
00462
00463
00464 if (!exists(path)) {
00465 std::string es;
00466
00467 TRY_nomem(es = "No such directory: \"");
00468 TRY_nomem(es += archive_dir);
00469 TRY_nomem(es += "\"");
00470
00471 throw(ERROR(EEXIST,es));
00472 }
00473
00474 if (!writable(path)) {
00475 std::string es;
00476
00477 TRY_nomem(es = "Cannot write to archive directory: \"");
00478 TRY_nomem(es += archive_dir);
00479 TRY_nomem(es += "\"");
00480
00481 throw(ERROR(EACCES,es));
00482 }
00483
00484 logger.set_error_logging(false);
00485 hardlink = m_job->rsync_hardlink;
00486 multi_hardlink = m_job->rsync_multi_hardlink;
00487 while ((num_retries <= m_job->rsync_retry_count) && !done) {
00488 execute exec;
00489 job::excludes_type::const_iterator ei;
00490 job::includes_type::const_iterator ii;
00491 std::vector<std::string>::const_iterator oi;
00492
00493 exit_code = 0;
00494 signal_num = 0;
00495
00496
00497
00498
00499
00500
00501
00502 binary = config.rsync_local_path();
00503 argv = m_job->generate_rsync_options_vector();
00504
00505
00506 if (m_job->rsync_connection != job::connection_local) {
00507
00508
00509
00510
00511
00512
00513
00514
00515
00516
00517
00518 opt = "--rsync-path=";
00519 if (m_job->rsync_remote_path.size() != 0)
00520 opt += m_job->rsync_remote_path;
00521 else
00522 opt += config.rsync_local_path();
00523 argv.push_back(opt);
00524
00525 }
00526
00527 if (hardlink) {
00528 subdirectory subdir;
00529 std::string youngest;
00530 std::string relative_path;
00531 bool hardlink_opt = false;
00532 bool first_hardlink = false;
00533 int linkdest_count = 0;
00534
00535 subdir.path(vaulter.vault());
00536 if (subdir.size() > 0) {
00537 subdirectory::const_iterator si;
00538
00539 sort(subdir.begin(), subdir.end());
00540 reverse(subdir.begin(), subdir.end());
00541 for (si = subdir.begin(); si != subdir.end(); ++si) {
00542 estring ypath;
00543
00544 if (first_hardlink && !multi_hardlink) {
00545 continue;
00546 }
00547 if (linkdest_count >= m_job->rsync_multi_hardlink_max) {
00548 continue;
00549 }
00550 if (!is_timestamp(*si))
00551 continue;
00552 if (*si == config.timestamp().str())
00553 continue;
00554 std::cout
00555 << "Considering potential hardlink source: "
00556 << *si
00557 << std::endl;
00558 ypath = vaulter.vault();
00559 ypath += "/";
00560 ypath += *si;
00561 ypath += "/";
00562 ypath += archive_dir;
00563 if (exists(ypath)) {
00564 std::cout
00565 << "Using archive as hardlink source: "
00566 << *si
00567 << std::endl;
00568
00569 if (!hardlink_opt) {
00570
00571 argv.push_back(std::string("--hard-links"));
00572
00573
00574
00575
00576 hardlink_opt = true;
00577 }
00578
00579 relative_path = mk_relative_path(ypath,path);
00580
00581
00582
00583
00584
00585 opt="--link-dest=";
00586 opt += relative_path;
00587 argv.push_back(opt);
00588
00589 first_hardlink = true;
00590 linkdest_count++;
00591 }
00592 else {
00593 std::cout
00594 << "- No such path: "
00595 << ypath
00596 << std::endl;
00597 }
00598 }
00599 }
00600
00601
00602
00603
00604
00605
00606
00607 }
00608
00609 for (
00610 ei = m_job->excludes.begin();
00611 ei != m_job->excludes.end();
00612 ++ei
00613 )
00614 {
00615
00616
00617
00618
00619
00620 opt = "--exclude-from=";
00621 opt += *ei;
00622 argv.push_back(opt);
00623
00624 }
00625
00626 for (
00627 ii = m_job->includes.begin();
00628 ii != m_job->includes.end();
00629 ++ii
00630 )
00631 {
00632
00633
00634
00635
00636
00637 opt = "--include-from=";
00638 opt += *ei;
00639 argv.push_back(opt);
00640
00641 }
00642
00643
00644
00645
00646
00647
00648 argv.push_back(m_job->generate_source_path(*pi));
00649
00650
00651
00652
00653
00654
00655
00656 argv.push_back(path);
00657
00658
00659
00660
00661
00662
00663 std::cout << "Command Line:" << std::endl;
00664 {
00665 uint16 c;
00666 std::cout << " Binary: " << binary << std::endl;
00667 for (c = 0; c < argv.size(); c++) {
00668 std::cout << " Argv[" << c << "] = " << argv[c] << std::endl;
00669 }
00670 }
00671
00672
00673 m_error_msg.erase();
00674
00675 t.start();
00676
00677
00678
00679
00680 m_io_timer.start();
00681 m_rsync_timeout_flag = false;
00682 exec.exec(binary, argv);
00683
00684 mf_process_rsync_io(
00685 exec,
00686 m_job->rsync_timeout,
00687 files_total,
00688 files_xferd,
00689 size_total,
00690 size_xferd,
00691 overflow_detected
00692 );
00693 t.stop();
00694
00695 signal_num = 0;
00696 if (exec.child_signaled()) {
00697 std::cout
00698 << "Rsync caught signal: ["
00699 << exec.child_signal_no()
00700 << "] "
00701 << rsync_estat_str.signal(exec.child_signal_no())
00702 << std::endl;
00703 signal_num = exec.child_signal_no();
00704 }
00705 std::cout
00706 << "Rsync exit code: ["
00707 << exec.child_exit_code()
00708 << "] "
00709 << rsync_estat_str.exit(exec.child_exit_code())
00710 << std::endl;
00711
00712 exit_code = exec.child_exit_code();
00713 if (!m_rsync_timeout_flag && exec.child_exited_normally() && (exit_code == 0)) {
00714 done = true;
00715 }
00716 else if (!m_rsync_timeout_flag && (m_job->rsync_behavior[exit_code] == rsync_behavior::ok)) {
00717 std::cout << "Ignoring rsync failure" << std::endl;
00718 done = true;
00719 }
00720 else if (overflow_detected) {
00721 std::cout
00722 << "Vault overflow detected"
00723 << std::endl;
00724 break;
00725 }
00726 else {
00727 ++num_retries;
00728 logger.set_error_logging(true);
00729 }
00730 if (m_rsync_timeout_flag) {
00731 TRY_nomem(m_error_msg = "Rsync inactivity timeout");
00732 }
00733
00734 if (m_job->rsync_behavior[exit_code] == rsync_behavior::fail)
00735 {
00736 std::cout << "Failing, will not attempt to retry" << std::endl;
00737 break;
00738 }
00739 if (m_job->rsync_behavior[exit_code]
00740 == rsync_behavior::retry_without_hardlinks)
00741 {
00742 std::cout << "Retrying without hardlinks..." << std::endl;
00743 hardlink = false;
00744 }
00745
00746 if ((!done) && (m_job->rsync_retry_delay > 0)) {
00747 std::cout << "Retries left: " << (m_job->rsync_retry_count - num_retries + 1) << std::endl;
00748 if (num_retries <= m_job->rsync_retry_count) {
00749 std::cout << "Waiting " << m_job->rsync_retry_delay
00750 << " minutes before retrying... " << std::endl;
00751 sleep( (m_job->rsync_retry_delay) * 60);
00752 }
00753 }
00754 }
00755 if (!done) {
00756 if (num_retries >= m_job->rsync_retry_count) {
00757 std::cout << "Retry count exceeded" << std::endl;
00758 m_success = false;
00759 }
00760 else {
00761 std::cout << "Giving up on this path" << std::endl;
00762 m_success = false;
00763 }
00764 }
00765 reportio().write_report(
00766 m_job->generate_source_path(*pi),
00767 t,
00768 exit_code,
00769 signal_num,
00770 m_job->rsync_behavior[exit_code],
00771 m_error_msg
00772 );
00773 }
00774 }
00775
00776 void job_archiver::mf_process_report(const std::string& a_str)
00777 {
00778 if (reportio().is_report(a_str)) {
00779 m_jpr = reportio().parse(a_str);
00780 m_jr.add_report(m_jpr);
00781 }
00782 }
00783
00784
00785
00786
00787
00788
00789
00790
00791 void job_archiver::mf_process_child_io(bool a_finalize)
00792 {
00793 estring lstr;
00794 bool io_flag = false;
00795
00796 while (!m_exec.out_eof() && (a_finalize || m_exec.out_ready())) {
00797 int r;
00798 const int buffer_size = 128;
00799 char buffer[buffer_size] = { 0 };
00800
00801 r = m_exec.out_read(buffer, buffer_size);
00802 if (r > 0) {
00803 int c;
00804
00805 io_flag = true;
00806 for (c = 0; c < r; ++c) {
00807 if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
00808 lstr = prefix();
00809 lstr += m_io_out;
00810 lstr += "\n";
00811 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00812 mf_process_report(lstr);
00813 m_io_out.erase();
00814 }
00815 else {
00816 m_io_out += buffer[c];
00817 }
00818 }
00819 }
00820 }
00821 if (a_finalize && (m_io_out.size() > 0)) {
00822 lstr = prefix();
00823 lstr += m_io_out;
00824 lstr += "\n";
00825 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00826 mf_process_report(lstr);
00827 m_io_out.erase();
00828 }
00829
00830 while (!m_exec.err_eof() && (a_finalize || m_exec.err_ready())) {
00831 int r;
00832 const int buffer_size = 128;
00833 char buffer[buffer_size] = { 0 };
00834
00835 r = m_exec.err_read(buffer, buffer_size);
00836 if (r > 0) {
00837 int c;
00838
00839 io_flag = true;
00840 for (c = 0; c < r; ++c) {
00841 if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
00842 lstr = prefix();
00843 lstr += m_io_err;
00844 lstr += "\n";
00845 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00846 mf_process_report(lstr);
00847 m_io_err.erase();
00848 }
00849 else {
00850 m_io_err += buffer[c];
00851 }
00852 }
00853 }
00854 }
00855 if (a_finalize && (m_io_err.size() > 0)) {
00856 lstr = prefix();
00857 lstr += m_io_err;
00858 lstr += "\n";
00859 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00860 mf_process_report(lstr);
00861 m_io_err.erase();
00862 }
00863 if (!io_flag)
00864 sleep(config.io_poll_interval());
00865 }
00866
00867
00868 void job_archiver::mf_trim_string(std::string& a_str)
00869 {
00870 while ((a_str.size() > 0) && (!isdigit(a_str[0])))
00871 a_str.erase(0,1);
00872 while ((a_str.size() > 0) && (!isdigit(a_str[a_str.size()-1])))
00873 a_str.erase(a_str.size()-1,1);
00874 }
00875
00876
00877
00878
00879
00880
00881
00882 void job_archiver::mf_parse_rsync_io(
00883 const std::string a_str,
00884 uint64& a_files_total,
00885 uint64& a_files_xferd,
00886 uint64& a_size_total,
00887 uint64& a_size_xferd
00888 )
00889 {
00890 estring estr;
00891
00892 if (a_str.find("Number of files: ") == 0) {
00893 estr = a_str;
00894 mf_trim_string(estr);
00895 try {
00896 a_files_total = estr;
00897 }
00898 catch(error e) {
00899 estring es;
00900
00901 es = "Could not parse total number of files processed by rsync";
00902 e.push_back(ERROR_INSTANCE(es));
00903
00904
00905 std::cerr << e;
00906 }
00907 catch(...) {
00908 error e = err_unknown;
00909 estring es;
00910
00911 es = "Could not parse total number of files processed by rsync";
00912 e.push_back(ERROR_INSTANCE(es));
00913
00914
00915 std::cerr << e;
00916 }
00917 }
00918 else if (a_str.find("Number of files transferred: ") == 0) {
00919 estr = a_str;
00920 mf_trim_string(estr);
00921 try {
00922 a_files_xferd = estr;
00923 }
00924 catch(error e) {
00925 estring es;
00926
00927 es = "Could not parse total number of files transferred by rsync";
00928 e.push_back(ERROR_INSTANCE(es));
00929
00930
00931 std::cerr << e;
00932 }
00933 catch(...) {
00934 error e = err_unknown;
00935 estring es;
00936
00937 es = "Could not parse total number of files transferred by rsync";
00938 e.push_back(ERROR_INSTANCE(es));
00939
00940
00941 std::cerr << e;
00942 }
00943 }
00944 else if (a_str.find("Total file size: ") == 0) {
00945 estr = a_str;
00946 mf_trim_string(estr);
00947 try {
00948 a_size_total = estr;
00949 }
00950 catch(error e) {
00951 estring es;
00952
00953 es = "Could not parse total size of files processed by rsync";
00954 e.push_back(ERROR_INSTANCE(es));
00955
00956
00957 std::cerr << e;
00958 }
00959 catch(...) {
00960 error e = err_unknown;
00961 estring es;
00962
00963 es = "Could not parse total size of files processed by rsync";
00964 e.push_back(ERROR_INSTANCE(es));
00965
00966
00967 std::cerr << e;
00968 }
00969 }
00970 else if (a_str.find("Total transferred file size: ") == 0) {
00971 estr = a_str;
00972 mf_trim_string(estr);
00973 try {
00974 a_size_xferd = estr;
00975 }
00976 catch(error e) {
00977 estring es;
00978
00979 es = "Could not parse total size of files transferred by rsync";
00980 e.push_back(ERROR_INSTANCE(es));
00981
00982
00983 std::cerr << e;
00984 }
00985 catch(...) {
00986 error e = err_unknown;
00987 estring es;
00988
00989 es = "Could not parse total size of files transferred by rsync";
00990 e.push_back(ERROR_INSTANCE(es));
00991
00992
00993 std::cerr << e;
00994 }
00995 }
00996 }
00997
00998
00999
01000
01001
01002
01003
01004 void job_archiver::mf_process_rsync_io(
01005 execute& a_exec,
01006 uint16 a_timeout,
01007 uint64& a_files_total,
01008 uint64& a_files_xferd,
01009 uint64& a_size_total,
01010 uint64& a_size_xferd,
01011 bool& a_overflow_detected
01012 )
01013 {
01014 size_t ro;
01015 size_t re;
01016 estring out;
01017 estring err;
01018 bool io_flag;
01019 char buffer[1024] = { 0 };
01020 char *ptr;
01021
01022 ro = 1;
01023 re = 1;
01024 while ((ro != 0) || (re != 0) || a_exec.child_running()) {
01025 io_flag = false;
01026 ro = 0;
01027 re = 0;
01028
01029 if (!a_overflow_detected) {
01030 a_overflow_detected = vaulter.overflow();
01031 }
01032
01033 m_error_msg.erase();
01034
01035 if (a_exec.out_ready()) {
01036 ro = read(a_exec.out_fd(), buffer, 1024);
01037 if (ro > 0) {
01038 io_flag = true;
01039 for (ptr = buffer; ptr != buffer+ro; ++ptr) {
01040 if ((*ptr != '\r') && (*ptr != '\n')) {
01041 out += *ptr;
01042 }
01043 else {
01044 reportio().write_rsync_out(out);
01045 out.erase();
01046 }
01047 }
01048 }
01049 }
01050
01051 if (a_exec.err_ready()) {
01052 re = read(a_exec.err_fd(), buffer, 1024);
01053 if (re > 0) {
01054 io_flag = true;
01055 for (ptr = buffer; ptr != buffer+re; ++ptr) {
01056 if ((*ptr != '\r') && (*ptr != '\n')) {
01057 err += *ptr;
01058 }
01059 else {
01060 reportio().write_rsync_err(err);
01061 err.erase();
01062 }
01063 }
01064 }
01065 }
01066
01067 m_io_timer.stop();
01068
01069
01070
01071
01072
01073
01074
01075
01076
01077 if (io_flag) {
01078 m_io_timer.stop();
01079 m_io_timer.start();
01080 m_rsync_timeout_flag = false;
01081 }
01082 if (!m_rsync_timeout_flag && (m_io_timer.duration_secs() > a_timeout)) {
01083 std::cerr << "*** Rsync program inactivity timeout" << std::endl;
01084 a_exec.kill_child();
01085 m_rsync_timeout_flag = true;
01086 }
01087
01088 if (!io_flag)
01089 sleep(config.io_poll_interval());
01090
01091 }
01092 if (out.size() > 0) {
01093 std::cerr << out << std::endl;
01094 mf_parse_rsync_io(
01095 out,
01096 a_files_total,
01097 a_files_xferd,
01098 a_size_total,
01099 a_size_xferd
01100 );
01101 out.erase();
01102 }
01103 if (err.size() > 0) {
01104 std::cerr << err << std::endl;
01105 mf_parse_rsync_io(
01106 out,
01107 a_files_total,
01108 a_files_xferd,
01109 a_size_total,
01110 a_size_xferd
01111 );
01112 err.erase();
01113 }
01114 }
01115
01116
01117
01118
01119 archive_manager::archive_manager()
01120 {
01121 if (this != &archiver)
01122 throw(INTERNAL_ERROR(0,"Attempt to alocate multiple archive managers"));
01123
01124 clear();
01125 }
01126
01127
01128 void archive_manager::clear(void)
01129 {
01130 m_jobs.clear();
01131 m_initialized = false;
01132 }
01133
01134
01135
01136
01137
01138
01139 void archive_manager::init(void)
01140 {
01141 timer t;
01142 estring lstr;
01143
01144 lstr = "Archive Manager - Beginning initialization\n";
01145 logger.write(lstr);
01146 t.start();
01147
01148 lstr = "Timestamp: ";
01149 lstr += config.timestamp().str();
01150 lstr += "\n";
01151 logger.write(lstr);
01152
01153
01154 vaulter.select();
01155 lstr = "Vault selected: ";
01156 lstr += vaulter.vault();
01157 lstr += "\n";
01158 logger.write(lstr);
01159
01160 reporter.vault().add_report(
01161 vault_stats_report(estring("Initial Stats:"),filesystem(vaulter.vault()))
01162 );
01163
01164
01165 vaulter.prepare();
01166
01167 t.stop();
01168 lstr = "Archive Manager - Finished initialization";
01169 lstr += ", duration: ";
01170 lstr += t.duration();
01171 lstr += "\n";
01172 logger.write(lstr);
01173
01174 m_initialized = true;
01175 }
01176
01177
01178 const bool archive_manager::initialized(void) const
01179 {
01180 return(m_initialized);
01181 }
01182
01183
01184
01185
01186
01187
01188
01189 void archive_manager::mf_log_status(void)
01190 {
01191 static timer t;
01192 const timer::value_type timeout = 30;
01193 estring lstr;
01194 std::vector<job_archiver*>::const_iterator ji;
01195 uint64 jobs_pending = 0;
01196 uint64 jobs_processing = 0;
01197 uint64 jobs_completed =0;
01198 uint64 jobs_remaining = 0;
01199
01200 if (!t.is_started())
01201 t.start();
01202
01203 t.stop();
01204 if (t.duration_mins() < timeout)
01205 return;
01206
01207 lstr = "\n";
01208 lstr += "STATUS REPORT:\n";
01209 lstr += "================================================================\n";
01210 logger.write(lstr);
01211 for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji) {
01212 lstr = "[";
01213 switch ((*ji)->status()) {
01214 case job_archiver::status_pending:
01215 lstr += "Pending ";
01216 ++jobs_pending;
01217 break;
01218 case job_archiver::status_processing:
01219 lstr += "Processing ";
01220 ++jobs_processing;
01221 break;
01222 case job_archiver::status_reschedule:
01223 lstr += "Reschedule ";
01224 ++jobs_pending;
01225 break;
01226 case job_archiver::status_fatal_error:
01227 lstr += "Fatal Error";
01228 ++jobs_completed;
01229 break;
01230 case job_archiver::status_error:
01231 lstr += "Error ";
01232 ++jobs_completed;
01233 break;
01234 case job_archiver::status_completed:
01235 lstr += "Completed ";
01236 ++jobs_completed;
01237 break;
01238 case job_archiver::status_done:
01239 lstr += "Done ";
01240 ++jobs_completed;
01241 break;
01242 default:
01243 lstr += "<unknown> ";
01244 break;
01245 }
01246 lstr += "]: ";
01247 lstr += (*ji)->id();
01248 lstr += "\n";
01249 logger.write(lstr);
01250 }
01251
01252 lstr = "---------------------------------------------------------------\n";
01253 lstr += " Jobs Pending: ";
01254 lstr += estring(jobs_pending);
01255 lstr += "\n";
01256
01257 lstr += " Jobs Processing: ";
01258 lstr += estring(jobs_processing);
01259 lstr += "\n";
01260
01261 lstr += " Jobs Completed: ";
01262 lstr += estring(jobs_completed);
01263 lstr += "\n";
01264
01265 lstr += " Total: ";
01266 lstr += estring(jobs_pending+jobs_processing+jobs_completed+jobs_remaining);
01267 lstr += "\n";
01268
01269 lstr += "Overflow Detected: ";
01270 if (vaulter.overflow()) {
01271 lstr += "TRUE";
01272 }
01273 else {
01274 lstr += "false";
01275 }
01276 lstr += "\n";
01277
01278 logger.write(lstr);
01279 logger.write("\n");
01280 t.start();
01281 }
01282
01283
01284
01285
01286
01287
01288
01289
01290
01291
01292
01293
01294
01295
01296 void archive_manager::archive(void)
01297 {
01298 timer t;
01299 estring lstr;
01300 configuration_manager::jobs_type::const_iterator cji;
01301 int num_processing = 0;
01302 std::vector<job_archiver*>::iterator ji;
01303 uint64 num_completed = 0;
01304 bool overflow_detected = false;
01305 estring debug_estr;
01306
01307 if (!initialized())
01308 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01309
01310 lstr = "Archive Manager - Begin archiving\n";
01311 logger.write(lstr);
01312 t.start();
01313
01314
01315 try {
01316 if (exists(archive_path())) {
01317 lstr = "Found existing archive directory: ";
01318 lstr += archive_path();
01319 lstr += "\n";
01320 logger.write(lstr);
01321 rename_file(archive_path(), working_archive_path());
01322 }
01323 else if (exists(working_archive_path())) {
01324 lstr = "Found existing archive directory: ";
01325 lstr += working_archive_path();
01326 lstr += "\n";
01327 logger.write(lstr);
01328 }
01329 else {
01330 lstr = "Creating archive directory: ";
01331 lstr += working_archive_path();
01332 lstr += "\n";
01333 logger.write(lstr);
01334 mk_dir(working_archive_path());
01335 }
01336 }
01337 catch(error e) {
01338 logger.write("An error has occured: ");
01339 logger.write(e[0].what());
01340 logger.write("\n");
01341 throw(e);
01342 }
01343 catch(...) {
01344 error e = err_unknown;
01345
01346 logger.write("An error has occured: ");
01347 logger.write(e[0].what());
01348 logger.write("\n");
01349 throw(e);
01350 }
01351
01352
01353 logger.write("Creating to-do list\n");
01354 for (
01355 cji = config.jobs().begin();
01356 cji != config.jobs().end();
01357 ++cji
01358 )
01359 {
01360 job_archiver* ptr;
01361
01362 ptr = new job_archiver(&(*cji));
01363 if (ptr == 0)
01364 throw(err_nomem);
01365 TRY_nomem(m_jobs.push_back(ptr));
01366 }
01367
01368
01369 logger.write("Processing jobs...\n");
01370 while (m_jobs.size() > num_completed) {
01371
01372
01373
01374
01375
01376
01377
01378
01379
01380
01381
01382
01383
01384
01385
01386 if (!overflow_detected) {
01387 overflow_detected = vaulter.overflow(true);
01388
01389
01390
01391
01392
01393
01394
01395 }
01396
01397
01398
01399
01400 if (overflow_detected && (num_processing == 0)) {
01401 TRY(vaulter.prepare(true),"Cannot complete archive");
01402 overflow_detected = vaulter.overflow();
01403
01404
01405
01406
01407
01408
01409
01410 }
01411
01412
01413 for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji)
01414 {
01415
01416 if (
01417 !overflow_detected
01418 && (num_processing < config.rsync_parallel())
01419 && ((*ji)->status() == job_archiver::status_pending)
01420 )
01421 {
01422 try {
01423 (*ji)->start();
01424 }
01425 catch(error e) {
01426 if (e.num() == 12) {
01427 lstr = "Error starting job: Out of memory, will retry later\n";
01428 (*ji)->clear();
01429 }
01430 else {
01431 (*ji)->end();
01432 lstr = "Error starting job, aborting\n";
01433
01434 num_processing--;
01435 reporter.jobs().add_report((*ji)->report());
01436 }
01437 logger.write(lstr);
01438 }
01439 catch(...) {
01440 (*ji)->end();
01441 lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE STARTING JOB ";
01442 lstr += "-- JOB TERMINATED\n";
01443 logger.write(lstr);
01444
01445 num_processing--;
01446 reporter.jobs().add_report((*ji)->report());
01447 }
01448
01449 num_processing++;
01450 }
01451
01452
01453 if ((*ji)->status() == job_archiver::status_processing) {
01454 try {
01455 (*ji)->process();
01456 }
01457 catch(error e) {
01458
01459 if (e.num() == 12) {
01460 lstr = "Error starting job: Out of memory, will retry later\n";
01461 (*ji)->clear();
01462 }
01463 else {
01464 (*ji)->end();
01465 lstr = "Error starting job, aborting\n";
01466
01467 num_processing--;
01468 reporter.jobs().add_report((*ji)->report());
01469 }
01470 logger.write(lstr);
01471 }
01472 catch(...) {
01473 (*ji)->end();
01474 lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE PROCESSING JOB ";
01475 lstr += "-- JOB TERMINATED\n";
01476 logger.write(lstr);
01477
01478 num_processing--;
01479 reporter.jobs().add_report((*ji)->report());
01480 }
01481 }
01482
01483
01484 if ((*ji)->status() == job_archiver::status_reschedule) {
01485 (*ji)->clear();
01486
01487 num_processing--;
01488 }
01489
01490
01491
01492 if (
01493 ((*ji)->status() == job_archiver::status_error)
01494 &&
01495 overflow_detected
01496 )
01497 {
01498 lstr = "Vault overflow detected, will retry job later\n";
01499 logger.write(lstr);
01500 (*ji)->clear();
01501 num_processing--;
01502 }
01503
01504
01505 if (
01506 ((*ji)->status() == job_archiver::status_completed)
01507 || ((*ji)->status() == job_archiver::status_fatal_error)
01508 || ((*ji)->status() == job_archiver::status_error)
01509 ) {
01510 (*ji)->end();
01511
01512 num_processing--;
01513 num_completed++;
01514
01515
01516 reporter.jobs().add_report((*ji)->report());
01517 }
01518 }
01519
01520 mf_log_status();
01521 sleep(1);
01522
01523
01524 }
01525
01526 t.stop();
01527 lstr = "Archive Manager - Finished archiving, duration: ";
01528 lstr += t.duration();
01529 lstr += "\n";
01530 logger.write(lstr);
01531
01532 lstr = "Archive Manager - Finalizing archive path\n";
01533 logger.write(lstr);
01534 TRY(
01535 rename_file(working_archive_path(), archive_path()),
01536 "Cannot finalize archive"
01537 );
01538
01539 reporter.vault().add_report(
01540 vault_stats_report(estring("Final Stats:"),filesystem(vaulter.vault()))
01541 );
01542 }
01543
01544
01545 const std::string archive_manager::archive_path(void) const
01546 {
01547 estring path;
01548
01549 if (!initialized())
01550 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01551
01552 path = vaulter.vault();
01553 path += "/";
01554 path += config.timestamp().str();
01555
01556 return(path);
01557 }
01558
01559
01560 const std::string archive_manager::working_archive_path(void) const
01561 {
01562 estring path;
01563
01564 if (!initialized())
01565 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01566
01567 path = archive_path();
01568 path += ".incomplete";
01569
01570 return(path);
01571 }
01572
01573
01574
01575
01576 archive_manager archiver;
01577