#include "config.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <ctype.h>
#include <signal.h>
#include <sys/types.h>

#include <iostream>
#include <string>
#include <vector>
00012
00013 #include "asserts.h"
00014 #include "error.h"
00015 #include "estring.h"
00016 #include "fs.h"
00017 #include "rconfig.h"
00018 #include "timer.h"
00019 #include "logger.h"
00020
00021 #include "archiver.h"
00022
00023
00024
00025
00026 rstat::rstat()
00027 {
00028 TRY_nomem(m_exit_str[0] = "Success");
00029 TRY_nomem(m_exit_str[1] = "Syntax or usage error");
00030 TRY_nomem(m_exit_str[2] = "Protocol incompatability error");
00031 TRY_nomem(m_exit_str[3] = "Errors selecting I/O files or directories");
00032 TRY_nomem(m_exit_str[4] = "Requested action not supported");
00033 TRY_nomem(m_exit_str[10] = "Error in socket I/O");
00034 TRY_nomem(m_exit_str[11] = "Error in file I/O");
00035 TRY_nomem(m_exit_str[12] = "Error in rsync protocol data stream");
00036 TRY_nomem(m_exit_str[13] = "Errors with program diagnostics");
00037 TRY_nomem(m_exit_str[14] = "Error in IPC code");
00038 TRY_nomem(m_exit_str[20] = "Received SIGUSR1 or SIGINT");
00039 TRY_nomem(m_exit_str[21] = "Some error returned by waitpid()");
00040 TRY_nomem(m_exit_str[22] = "Error allocating core memory buffers");
00041 TRY_nomem(m_exit_str[23] = "Partial transfer");
00042 TRY_nomem(m_exit_str[30] = "Timeout in data send/receive");
00043 TRY_nomem(m_exit_str[124] = "The command executed by SSH exited with status 255");
00044 TRY_nomem(m_exit_str[125] = "The command executed by SSH was killed by a signal");
00045 TRY_nomem(m_exit_str[126] = "The command given to SSH cannot be run");
00046 TRY_nomem(m_exit_str[127] = "The command given to SSH cannot be found");
00047
00048 TRY_nomem(m_signal_str[1] = "[HUP]: Hangup");
00049 TRY_nomem(m_signal_str[2] = "[INT]: Interrupt ");
00050 TRY_nomem(m_signal_str[3] = "[QUIT]: Quit");
00051 TRY_nomem(m_signal_str[4] = "[ILL]: Illegal instruction");
00052 TRY_nomem(m_signal_str[5] = "[TRAP]: Trace trap");
00053 TRY_nomem(m_signal_str[6] = "[IOT]: IOT instruction or hardware fault");
00054 TRY_nomem(m_signal_str[7] = "[ABRT]: Abnormal termination");
00055 TRY_nomem(m_signal_str[8] = "[EMT]: EMT instruction or hardware fault");
00056 TRY_nomem(m_signal_str[9] = "[FPE]: Floating point exception");
00057 TRY_nomem(m_signal_str[10] = "[KILL]: Killed");
00058 TRY_nomem(m_signal_str[11] = "[BUS]: Bus error");
00059 TRY_nomem(m_signal_str[12] = "[SEGV]: Segmentation fault");
00060 TRY_nomem(m_signal_str[13] = "[SYS]: Invalid system call or invalid argument to system call");
00061 TRY_nomem(m_signal_str[14] = "[PIPE]: Write to pipe with no readers");
00062 TRY_nomem(m_signal_str[15] = "[ALRM]: Alarm");
00063 TRY_nomem(m_signal_str[16] = "[TERM]: Software termination");
00064 TRY_nomem(m_signal_str[17] = "[USR1]: User-defined signal 1");
00065 TRY_nomem(m_signal_str[18] = "[USR2]: User-defined signal 2");
00066 TRY_nomem(m_signal_str[19] = "[CLD]: Child status change");
00067 TRY_nomem(m_signal_str[20] = "[PWR]: Power fail/restart");
00068 TRY_nomem(m_signal_str[21] = "[WINCH]: Terminal window size change");
00069 TRY_nomem(m_signal_str[22] = "[URG]: Urgent condition");
00070 TRY_nomem(m_signal_str[23] = "[POLL]: Pollable event or socket I/O");
00071 TRY_nomem(m_signal_str[24] = "[STOP]: Stop");
00072 TRY_nomem(m_signal_str[25] = "[TSTP]: Terminal stop character");
00073 TRY_nomem(m_signal_str[26] = "[CONT]: Continue stopped process");
00074 TRY_nomem(m_signal_str[27] = "[TTIN]: Background tty read");
00075 TRY_nomem(m_signal_str[28] = "[TTOU]: Background tty write");
00076 TRY_nomem(m_signal_str[29] = "[VTALRM]: Virtual timer expired");
00077 TRY_nomem(m_signal_str[30] = "[PROF]: Profiling timer expired");
00078 TRY_nomem(m_signal_str[31] = "[XCPU]: Exceeded CPU limit");
00079 TRY_nomem(m_signal_str[32] = "[XFSZ]: Exceeded file size limit");
00080 TRY_nomem(m_signal_str[33] = "[WAITING]: Process' LWPs are blocked");
00081 TRY_nomem(m_signal_str[34] = "[LWP]: Special thread library signal");
00082 TRY_nomem(m_signal_str[35] = "[FREEZE]: Special signal used by CPR");
00083 TRY_nomem(m_signal_str[36] = "[THAW]: Special signal used by CPR");
00084 TRY_nomem(m_signal_str[37] = "[CANCEL]: Thread cancellation");
00085 TRY_nomem(m_signal_str[38] = "[LOST]: Resource lost");
00086 TRY_nomem(m_signal_str[39] = "[RTMIN]: Highest priority real-time signal");
00087 TRY_nomem(m_signal_str[46] = "[RTMAX]: Lowest priority real-time signal");
00088
00089 TRY_nomem(m_unknown_exit = "(Unknown exit code)");
00090 TRY_nomem(m_unknown_signal = "(Unknown signal)");
00091 }
00092
00093
00094 const std::string& rstat::exit(const int a_int) const
00095 {
00096 if (m_exit_str.find(a_int) != m_exit_str.end()) {
00097 return(m_exit_str.find(a_int)->second);
00098 }
00099 return(m_unknown_exit);
00100 }
00101
00102
00103 const std::string& rstat::signal(const int a_int) const
00104 {
00105 if (m_signal_str.find(a_int) != m_signal_str.end()) {
00106 return(m_signal_str.find(a_int)->second);
00107 }
00108 return(m_unknown_signal);
00109 }
00110
00111 class rstat rsync_estat_str;
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121 job_archiver::job_archiver(const job * a_job)
00122 {
00123 clear();
00124 m_job = a_job;
00125 m_status = status_pending;
00126 }
00127
00128
00129
00130
00131
00132
00133
00134 const std::string job_archiver::prefix(void)
00135 {
00136 estring lstr;
00137
00138 lstr = "[Job:";
00139 lstr += estring((void*)m_job);
00140 lstr += "] ";
00141
00142 return(lstr);
00143 }
00144
00145
00146 const std::string job_archiver::id(void)
00147 {
00148 estring lstr;
00149
00150 lstr = prefix();
00151 lstr += " ";
00152 lstr += m_job->generate_job_id();
00153
00154 return(lstr);
00155 }
00156
00157
00158
00159
00160
00161
00162
00163 void job_archiver::clear(void)
00164 {
00165 end();
00166 m_child_pid = m_exec.my_pid();
00167 m_status = status_pending;
00168 m_success = true;
00169 m_jr.clear();
00170 m_jpr.clear();
00171 m_error_msg.erase();
00172 }
00173
00174
00175
00176
00177
00178
00179
00180 void job_archiver::end(void)
00181 {
00182 estring lstr;
00183
00184 m_timer.stop();
00185 if (m_exec.child_running()) {
00186 lstr = prefix();
00187 lstr += "Terminating child process!\n";
00188 logger.write(lstr,0,configuration_manager::logging_manager,m_child_pid);
00189 m_exec.kill_child();
00190 }
00191 m_exec.clear();
00192 m_io_out.erase();
00193 m_io_err.erase();
00194 m_status = status_done;
00195 }
00196
00197
00198 const job_archiver::archiving_status job_archiver::status(void)
00199 {
00200 return(m_status);
00201 }
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212 void job_archiver::start(void)
00213 {
00214 estring lstr;
00215
00216 m_jr.id(m_job->generate_job_id());
00217
00218 try {
00219 m_exec.fork();
00220 }
00221 catch(error e) {
00222 lstr = prefix();
00223 lstr += "Could not fork:\n";
00224 logger.write(lstr);
00225
00226 lstr = e.str(prefix());
00227 logger.write(lstr);
00228
00229 lstr = prefix();
00230 lstr += "Will retry job later\n";
00231 logger.write(lstr);
00232
00233 m_status = status_retry_later;
00234 }
00235 catch(...) {
00236 error e = err_unknown;
00237
00238 lstr = prefix();
00239 lstr += "Could not fork:\n";
00240 logger.write(lstr);
00241
00242 lstr = e.str(prefix());
00243 logger.write(lstr);
00244
00245 lstr = prefix();
00246 lstr += "Will retry job later\n";
00247 logger.write(lstr);
00248
00249 m_status = status_retry_later;
00250 }
00251
00252 if (m_exec.is_child()) {
00253
00254
00255
00256
00257 m_exec.reroute_stdio();
00258 try {
00259 mf_do_chores();
00260 }
00261 catch(error e) {
00262 std::cerr << e;
00263 m_success = false;
00264 }
00265 catch(...) {
00266 std::cerr << err_unknown;
00267 m_success = false;
00268 }
00269 if (m_success)
00270 m_exec.exit(0);
00271 else
00272 m_exec.exit(1);
00273 }
00274
00275 m_child_pid = m_exec.child_pid();
00276
00277 lstr = prefix();
00278 lstr += "Spawning child process: PID ";
00279 lstr += estring(static_cast<unsigned long>(m_exec.child_pid()));
00280 lstr += "\n";
00281 logger.write(lstr);
00282
00283 m_status = status_processing;
00284 m_timer.start();
00285 }
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300 void job_archiver::process(void)
00301 {
00302 estring lstr;
00303
00304 if (m_exec.child_running()) {
00305
00306 mf_process_child_io(false);
00307 }
00308 else {
00309
00310 mf_process_child_io(true);
00311
00312
00313
00314 lstr = prefix();
00315 if (m_exec.child_signaled()) {
00316 lstr += "Child exited from signal: ";
00317 lstr += estring(m_exec.child_signal_no());
00318 lstr += "\n";
00319 }
00320 else if (m_exec.child_exit_code() != 0) {
00321 lstr += "Child exited abnormally with code: ";
00322 lstr += estring(m_exec.child_exit_code());
00323 lstr += "\n";
00324 }
00325 else {
00326 lstr += "Child exited successfully\n";
00327 m_status = status_completed;
00328 }
00329 logger.write(lstr);
00330
00331 if (m_exec.child_signaled() || !m_exec.child_exited_normally()) {
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343 m_status = status_error;
00344 }
00345 else {
00346 m_status = status_completed;
00347 }
00348
00349 m_timer.stop();
00350 lstr = prefix();
00351 lstr += "Finished, duration: ";
00352 lstr += m_timer.duration();
00353 lstr += "\n";
00354 logger.write(lstr);
00355
00356 }
00357 }
00358
00359
00360 single_job_report job_archiver::report(void) const
00361 {
00362 return(m_jr);
00363 }
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378 void job_archiver::mf_do_chores(void)
00379 {
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390 job::paths_type::const_iterator pi;
00391
00392 for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
00393 estring archive_dir;
00394 estring path;
00395 estring command_line;
00396 bool hardlink = false;
00397 int num_retries = 0;
00398 bool done = false;
00399 int exit_code = 0;
00400 int signal_num = 0;
00401 timer t;
00402 uint64 files_total = 0;
00403 uint64 files_xferd = 0;
00404 uint64 size_total = 0;
00405 uint64 size_xferd = 0;
00406 bool overflow_detected = 0;
00407 estring error_msg;
00408
00409 archive_dir = m_job->generate_archive_path(*pi);
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420 path = archiver.working_archive_path();
00421 path += "/";
00422 path += archive_dir;
00423 path = reform_path(path);
00424 if (!exists(path)) {
00425 std::cout << "Creating job archive path: " << archive_dir << std::endl;
00426 mk_dirhier(path);
00427 }
00428 else
00429 std::cout << "Archiving to existing path: " << archive_dir << std::endl;
00430
00431 hardlink = m_job->rsync_hardlink;
00432 while ((num_retries < m_job->rsync_retry_count) && !done) {
00433 execute exec;
00434 job::excludes_type::const_iterator ei;
00435 job::includes_type::const_iterator ii;
00436
00437 exit_code = 0;
00438 signal_num = 0;
00439
00440 command_line = config.rsync_local_path();
00441 command_line += " ";
00442 command_line += m_job->rsync_options;
00443
00444 if (m_job->rsync_connection != job::connection_local) {
00445 command_line += " ";
00446 command_line += " --rsync-path=";
00447 if (m_job->rsync_remote_path.size() != 0)
00448 command_line += m_job->rsync_remote_path;
00449 else
00450 command_line += config.rsync_local_path();
00451 }
00452
00453 if (hardlink) {
00454 subdirectory subdir;
00455 std::string youngest;
00456 std::string relative_path;
00457
00458 subdir.path(vaulter.vault());
00459 if (subdir.size() > 0) {
00460 subdirectory::const_iterator si;
00461
00462 sort(subdir.begin(), subdir.end());
00463 reverse(subdir.begin(), subdir.end());
00464 for (si = subdir.begin(); si != subdir.end(); ++si) {
00465 estring ypath;
00466
00467 if (!is_timestamp(*si))
00468 continue;
00469 if (*si == config.timestamp().str())
00470 continue;
00471 std::cout
00472 << "Considering potential hardlink source: "
00473 << *si
00474 << std::endl;
00475 ypath = vaulter.vault();
00476 ypath += "/";
00477 ypath += *si;
00478 ypath += "/";
00479 ypath += archive_dir;
00480 if (exists(ypath)) {
00481 std::cout
00482 << "Using archive as hardlink source: "
00483 << *si
00484 << std::endl;
00485 youngest = ypath;
00486 break;
00487 }
00488 else {
00489 std::cout
00490 << "- No such path: "
00491 << ypath
00492 << std::endl;
00493 }
00494 }
00495 }
00496 if (youngest.size() > 0) {
00497 relative_path = mk_relative_path(youngest,path);
00498 command_line += " --hard-links --link-dest=";
00499 command_line += relative_path;
00500 }
00501 }
00502
00503 for (
00504 ei = m_job->excludes.begin();
00505 ei != m_job->excludes.end();
00506 ++ei
00507 )
00508 {
00509 command_line += " --exclude-from=";
00510 command_line += *ei;
00511 }
00512
00513 for (
00514 ii = m_job->includes.begin();
00515 ii != m_job->includes.end();
00516 ++ii
00517 )
00518 {
00519 command_line += " --include-from=";
00520 command_line += *ii;
00521 }
00522
00523 command_line += " ";
00524 command_line += m_job->generate_source_path(*pi);
00525
00526 command_line += " ";
00527 command_line += path;
00528
00529 std::cout << "Command Line: " << command_line << std::endl;
00530
00531 m_error_msg.erase();
00532
00533 t.start();
00534 exec.exec(command_line);
00535 mf_process_rsync_io(
00536 exec,
00537 m_job->rsync_timeout,
00538 files_total,
00539 files_xferd,
00540 size_total,
00541 size_xferd,
00542 overflow_detected
00543 );
00544 t.stop();
00545
00546 signal_num = 0;
00547 if (exec.child_signaled()) {
00548 std::cout
00549 << "Rsync caught signal: ["
00550 << exec.child_signal_no()
00551 << "] "
00552 << rsync_estat_str.signal(exec.child_signal_no())
00553 << std::endl;
00554 signal_num = exec.child_signal_no();
00555 }
00556 std::cout
00557 << "Rsync exit code: ["
00558 << exec.child_exit_code()
00559 << "] "
00560 << rsync_estat_str.exit(exec.child_exit_code())
00561 << std::endl;
00562
00563 exit_code = exec.child_exit_code();
00564 if (exec.child_exited_normally() && (exit_code == 0))
00565 done = true;
00566 else if (overflow_detected) {
00567 std::cout
00568 << "Vault overflow detected"
00569 << std::endl;
00570 break;
00571 }
00572 else
00573 ++num_retries;
00574
00575 if (m_job->rsync_behavior[exit_code] == rsync_behavior::fail)
00576 {
00577 std::cout << "Failing, will not attempt to retry" << std::endl;
00578 break;
00579 }
00580 if (m_job->rsync_behavior[exit_code]
00581 == rsync_behavior::retry_without_hardlinks)
00582 {
00583 std::cout << "Retrying without hardlinks..." << std::endl;
00584 hardlink = false;
00585 }
00586 }
00587 if (!done) {
00588 if (num_retries >= m_job->rsync_retry_count) {
00589 std::cout << "Retry count exceeded" << std::endl;
00590 }
00591 if (m_job->rsync_behavior[exit_code] == rsync_behavior::ok)
00592 std::cout << "Ignoring rsync failure" << std::endl;
00593 else {
00594 std::cout << "Giving up on this path" << std::endl;
00595 m_success = false;
00596 }
00597 }
00598 reportio().write_report(
00599 m_job->generate_source_path(*pi),
00600 t,
00601 exit_code,
00602 signal_num,
00603 m_error_msg
00604 );
00605 }
00606 }
00607
00608 void job_archiver::mf_process_report(const std::string& a_str)
00609 {
00610 if (reportio().is_report(a_str)) {
00611 m_jpr = reportio().parse(a_str);
00612 m_jr.add_report(m_jpr);
00613 }
00614 }
00615
00616
00617
00618
00619
00620
00621
00622
00623 void job_archiver::mf_process_child_io(bool a_finalize)
00624 {
00625 estring lstr;
00626 bool io_flag = false;
00627
00628 while (!m_exec.out_eof() && (a_finalize || m_exec.out_ready())) {
00629 int r;
00630 const int buffer_size = 128;
00631 char buffer[buffer_size] = { 0 };
00632
00633 r = m_exec.out_read(buffer, buffer_size);
00634 if (r > 0) {
00635 int c;
00636
00637 io_flag = true;
00638 for (c = 0; c < r; ++c) {
00639 if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
00640 lstr = prefix();
00641 lstr += m_io_out;
00642 lstr += "\n";
00643 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00644 mf_process_report(lstr);
00645 m_io_out.erase();
00646 }
00647 else {
00648 m_io_out += buffer[c];
00649 }
00650 }
00651 }
00652 }
00653 if (a_finalize && (m_io_out.size() > 0)) {
00654 lstr = prefix();
00655 lstr += m_io_out;
00656 lstr += "\n";
00657 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00658 mf_process_report(lstr);
00659 m_io_out.erase();
00660 }
00661
00662 while (!m_exec.err_eof() && (a_finalize || m_exec.err_ready())) {
00663 int r;
00664 const int buffer_size = 128;
00665 char buffer[buffer_size] = { 0 };
00666
00667 r = m_exec.err_read(buffer, buffer_size);
00668 if (r > 0) {
00669 int c;
00670
00671 io_flag = true;
00672 for (c = 0; c < r; ++c) {
00673 if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
00674 lstr = prefix();
00675 lstr += m_io_err;
00676 lstr += "\n";
00677 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00678 mf_process_report(lstr);
00679 m_io_err.erase();
00680 }
00681 else {
00682 m_io_err += buffer[c];
00683 }
00684 }
00685 }
00686 }
00687 if (a_finalize && (m_io_err.size() > 0)) {
00688 lstr = prefix();
00689 lstr += m_io_err;
00690 lstr += "\n";
00691 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00692 mf_process_report(lstr);
00693 m_io_err.erase();
00694 }
00695 if (!io_flag)
00696 sleep(config.io_poll_interval());
00697 }
00698
00699
00700 void job_archiver::mf_trim_string(std::string& a_str)
00701 {
00702 while ((a_str.size() > 0) && (!isdigit(a_str[0])))
00703 a_str.erase(0,1);
00704 while ((a_str.size() > 0) && (!isdigit(a_str[a_str.size()-1])))
00705 a_str.erase(a_str.size()-1,1);
00706 }
00707
00708
00709
00710
00711
00712
00713
00714 void job_archiver::mf_parse_rsync_io(
00715 const std::string a_str,
00716 uint64& a_files_total,
00717 uint64& a_files_xferd,
00718 uint64& a_size_total,
00719 uint64& a_size_xferd
00720 )
00721 {
00722 estring estr;
00723
00724 if (a_str.find("Number of files: ") == 0) {
00725 estr = a_str;
00726 mf_trim_string(estr);
00727 try {
00728 a_files_total = estr;
00729 }
00730 catch(error e) {
00731 estring es;
00732
00733 es = "Could not parse total number of files processed by rsync";
00734 e.push_back(ERROR_INSTANCE(es));
00735
00736
00737 std::cerr << e;
00738 }
00739 catch(...) {
00740 error e = err_unknown;
00741 estring es;
00742
00743 es = "Could not parse total number of files processed by rsync";
00744 e.push_back(ERROR_INSTANCE(es));
00745
00746
00747 std::cerr << e;
00748 }
00749 }
00750 else if (a_str.find("Number of files transferred: ") == 0) {
00751 estr = a_str;
00752 mf_trim_string(estr);
00753 try {
00754 a_files_xferd = estr;
00755 }
00756 catch(error e) {
00757 estring es;
00758
00759 es = "Could not parse total number of files transferred by rsync";
00760 e.push_back(ERROR_INSTANCE(es));
00761
00762
00763 std::cerr << e;
00764 }
00765 catch(...) {
00766 error e = err_unknown;
00767 estring es;
00768
00769 es = "Could not parse total number of files transferred by rsync";
00770 e.push_back(ERROR_INSTANCE(es));
00771
00772
00773 std::cerr << e;
00774 }
00775 }
00776 else if (a_str.find("Total file size: ") == 0) {
00777 estr = a_str;
00778 mf_trim_string(estr);
00779 try {
00780 a_size_total = estr;
00781 }
00782 catch(error e) {
00783 estring es;
00784
00785 es = "Could not parse total size of files processed by rsync";
00786 e.push_back(ERROR_INSTANCE(es));
00787
00788
00789 std::cerr << e;
00790 }
00791 catch(...) {
00792 error e = err_unknown;
00793 estring es;
00794
00795 es = "Could not parse total size of files processed by rsync";
00796 e.push_back(ERROR_INSTANCE(es));
00797
00798
00799 std::cerr << e;
00800 }
00801 }
00802 else if (a_str.find("Total transferred file size: ") == 0) {
00803 estr = a_str;
00804 mf_trim_string(estr);
00805 try {
00806 a_size_xferd = estr;
00807 }
00808 catch(error e) {
00809 estring es;
00810
00811 es = "Could not parse total size of files transferred by rsync";
00812 e.push_back(ERROR_INSTANCE(es));
00813
00814
00815 std::cerr << e;
00816 }
00817 catch(...) {
00818 error e = err_unknown;
00819 estring es;
00820
00821 es = "Could not parse total size of files transferred by rsync";
00822 e.push_back(ERROR_INSTANCE(es));
00823
00824
00825 std::cerr << e;
00826 }
00827 }
00828 }
00829
00830
00831
00832
00833
00834
00835
00836 void job_archiver::mf_process_rsync_io(
00837 execute& a_exec,
00838 uint16 a_timeout,
00839 uint64& a_files_total,
00840 uint64& a_files_xferd,
00841 uint64& a_size_total,
00842 uint64& a_size_xferd,
00843 bool& a_overflow_detected
00844 )
00845 {
00846 size_t ro;
00847 size_t re;
00848 estring out;
00849 estring err;
00850 timer t;
00851 bool io_flag;
00852 char buffer[1024] = { 0 };
00853 char *ptr;
00854
00855 ro = 1;
00856 re = 1;
00857 t.start();
00858 while ((ro != 0) || (re != 0) || a_exec.child_running()) {
00859 io_flag = false;
00860 ro = 0;
00861 re = 0;
00862
00863 if (!a_overflow_detected) {
00864 a_overflow_detected = vaulter.overflow();
00865 }
00866
00867 m_error_msg.erase();
00868
00869 if (a_exec.out_ready()) {
00870 ro = read(a_exec.out_fd(), buffer, 1024);
00871 if (ro > 0) {
00872 io_flag = true;
00873 t.start();
00874 for (ptr = buffer; ptr != buffer+ro; ++ptr) {
00875 if ((*ptr != '\r') && (*ptr != '\n')) {
00876 out += *ptr;
00877 }
00878 else {
00879 reportio().write_rsync_out(out);
00880 out.erase();
00881 }
00882 }
00883 }
00884 }
00885
00886 if (a_exec.err_ready()) {
00887 re = read(a_exec.err_fd(), buffer, 1024);
00888 if (re > 0) {
00889 io_flag = true;
00890 t.start();
00891 for (ptr = buffer; ptr != buffer+re; ++ptr) {
00892 if ((*ptr != '\r') && (*ptr != '\n')) {
00893 err += *ptr;
00894 }
00895 else {
00896 reportio().write_rsync_err(err);
00897 err.erase();
00898 }
00899 }
00900 }
00901 }
00902
00903 t.stop();
00904 if (t.duration_secs() > a_timeout) {
00905 std::cerr << "*** Rsync program inactivity timeout" << std::endl;
00906 a_exec.kill_child();
00907 TRY_nomem(m_error_msg = "Rsync inactivity timeout");
00908 }
00909
00910 if (!io_flag)
00911 sleep(config.io_poll_interval());
00912
00913 }
00914 if (out.size() > 0) {
00915 std::cerr << out << std::endl;
00916 mf_parse_rsync_io(
00917 out,
00918 a_files_total,
00919 a_files_xferd,
00920 a_size_total,
00921 a_size_xferd
00922 );
00923 out.erase();
00924 }
00925 if (err.size() > 0) {
00926 std::cerr << err << std::endl;
00927 mf_parse_rsync_io(
00928 out,
00929 a_files_total,
00930 a_files_xferd,
00931 a_size_total,
00932 a_size_xferd
00933 );
00934 err.erase();
00935 }
00936 }
00937
00938
00939
00940
00941 archive_manager::archive_manager()
00942 {
00943 if (this != &archiver)
00944 throw(INTERNAL_ERROR(0,"Attempt to alocate multiple archive managers"));
00945
00946 clear();
00947 }
00948
00949
00950 void archive_manager::clear(void)
00951 {
00952 m_jobs.clear();
00953 m_initialized = false;
00954 }
00955
00956
00957
00958
00959
00960
00961 void archive_manager::init(void)
00962 {
00963 timer t;
00964 estring lstr;
00965
00966 lstr = "Archive Manager - Beginning initialization\n";
00967 logger.write(lstr);
00968 t.start();
00969
00970 lstr = "Timestamp: ";
00971 lstr += config.timestamp().str();
00972 lstr += "\n";
00973 logger.write(lstr);
00974
00975
00976 vaulter.select();
00977 lstr = "Vault selected: ";
00978 lstr += vaulter.vault();
00979 lstr += "\n";
00980 logger.write(lstr);
00981
00982 reporter.vault().add_report(
00983 vault_stats_report(estring("Initial Stats:"),filesystem(vaulter.vault()))
00984 );
00985
00986
00987 vaulter.prepare();
00988
00989 t.stop();
00990 lstr = "Archive Manager - Finished initialization";
00991 lstr += ", duration: ";
00992 lstr += t.duration();
00993 lstr += "\n";
00994 logger.write(lstr);
00995
00996 m_initialized = true;
00997 }
00998
00999
01000 const bool archive_manager::initialized(void) const
01001 {
01002 return(m_initialized);
01003 }
01004
01005
01006
01007
01008
01009
01010
01011 void archive_manager::mf_log_status(void)
01012 {
01013 static timer t;
01014 const timer::value_type timeout = 30;
01015 estring lstr;
01016 std::vector<job_archiver*>::const_iterator ji;
01017 uint64 jobs_pending = 0;
01018 uint64 jobs_processing = 0;
01019 uint64 jobs_completed =0;
01020 uint64 jobs_remaining = 0;
01021
01022 if (!t.is_started())
01023 t.start();
01024
01025 t.stop();
01026 if (t.duration_mins() < timeout)
01027 return;
01028
01029 lstr = "\n";
01030 lstr += "STATUS REPORT:\n";
01031 lstr += "================================================================\n";
01032 logger.write(lstr);
01033 for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji) {
01034 lstr = "[";
01035 switch ((*ji)->status()) {
01036 case job_archiver::status_pending:
01037 lstr += "Pending ";
01038 ++jobs_pending;
01039 break;
01040 case job_archiver::status_processing:
01041 lstr += "Processing ";
01042 ++jobs_processing;
01043 break;
01044 case job_archiver::status_retry_later:
01045 lstr += "Reschedule ";
01046 ++jobs_pending;
01047 break;
01048 case job_archiver::status_fatal_error:
01049 lstr += "Fatal Error";
01050 ++jobs_completed;
01051 break;
01052 case job_archiver::status_error:
01053 lstr += "Error ";
01054 ++jobs_completed;
01055 break;
01056 case job_archiver::status_completed:
01057 lstr += "Completed ";
01058 ++jobs_completed;
01059 break;
01060 case job_archiver::status_done:
01061 lstr += "Done ";
01062 ++jobs_completed;
01063 break;
01064 default:
01065 lstr += "<unknown> ";
01066 break;
01067 }
01068 lstr += "]: ";
01069 lstr += (*ji)->id();
01070 lstr += "\n";
01071 logger.write(lstr);
01072 }
01073
01074 lstr = "---------------------------------------------------------------\n";
01075 lstr += " Jobs Pending: ";
01076 lstr += estring(jobs_pending);
01077 lstr += "\n";
01078
01079 lstr += " Jobs Processing: ";
01080 lstr += estring(jobs_processing);
01081 lstr += "\n";
01082
01083 lstr += " Jobs Completed: ";
01084 lstr += estring(jobs_completed);
01085 lstr += "\n";
01086
01087 lstr += " Total: ";
01088 lstr += estring(jobs_pending+jobs_processing+jobs_completed+jobs_remaining);
01089 lstr += "\n";
01090
01091 logger.write(lstr);
01092 logger.write("\n");
01093 t.start();
01094 }
01095
01096
01097
01098
01099
01100
01101
01102
01103
01104
01105
01106
01107
01108
01109 void archive_manager::archive(void)
01110 {
01111 timer t;
01112 estring lstr;
01113 configuration_manager::jobs_type::const_iterator cji;
01114 int num_processing = 0;
01115 std::vector<job_archiver*>::iterator ji;
01116 uint64 num_completed = 0;
01117 bool overflow_detected = false;
01118 estring debug_estr;
01119
01120 if (!initialized())
01121 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01122
01123 lstr = "Archive Manager - Begin archiving\n";
01124 logger.write(lstr);
01125 t.start();
01126
01127
01128 try {
01129 if (exists(archive_path())) {
01130 lstr = "Found existing archive directory: ";
01131 lstr += archive_path();
01132 lstr += "\n";
01133 logger.write(lstr);
01134 rename_file(archive_path(), working_archive_path());
01135 }
01136 else if (exists(working_archive_path())) {
01137 lstr = "Found existing archive directory: ";
01138 lstr += working_archive_path();
01139 lstr += "\n";
01140 logger.write(lstr);
01141 }
01142 else {
01143 lstr = "Creating archive directory: ";
01144 lstr += working_archive_path();
01145 lstr += "\n";
01146 logger.write(lstr);
01147 mk_dir(working_archive_path());
01148 }
01149 }
01150 catch(error e) {
01151 logger.write("An error has occured: ");
01152 logger.write(e[0].what());
01153 logger.write("\n");
01154 throw(e);
01155 }
01156 catch(...) {
01157 error e = err_unknown;
01158
01159 logger.write("An error has occured: ");
01160 logger.write(e[0].what());
01161 logger.write("\n");
01162 throw(e);
01163 }
01164
01165
01166 logger.write("Creating to-do list\n");
01167 for (
01168 cji = config.jobs().begin();
01169 cji != config.jobs().end();
01170 ++cji
01171 )
01172 {
01173 job_archiver* ptr;
01174
01175 ptr = new job_archiver(&(*cji));
01176 if (ptr == 0)
01177 throw(err_nomem);
01178 TRY_nomem(m_jobs.push_back(ptr));
01179 }
01180
01181
01182 logger.write("Processing jobs...\n");
01183 while (m_jobs.size() > num_completed) {
01184
01185
01186
01187
01188
01189
01190
01191
01192
01193
01194
01195
01196
01197
01198
01199 if (!overflow_detected) {
01200 overflow_detected = vaulter.overflow(true);
01201
01202
01203
01204
01205
01206
01207
01208 }
01209
01210
01211
01212
01213 if (overflow_detected && (num_processing == 0)) {
01214 TRY(vaulter.prepare(true),"Cannot complete archive");
01215 overflow_detected = vaulter.overflow();
01216
01217
01218
01219
01220
01221
01222
01223 }
01224
01225
01226 for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji)
01227 {
01228
01229 if (
01230 !overflow_detected
01231 && (num_processing < config.rsync_parallel())
01232 && ((*ji)->status() == job_archiver::status_pending)
01233 )
01234 {
01235 try {
01236 (*ji)->start();
01237 }
01238 catch(error e) {
01239 if (e.num() == 12) {
01240 lstr = "Error starting job: Out of memory, will retry later\n";
01241 (*ji)->clear();
01242 }
01243 else {
01244 (*ji)->end();
01245 lstr = "Error starting job, aborting\n";
01246
01247 num_processing--;
01248 reporter.jobs().add_report((*ji)->report());
01249 }
01250 logger.write(lstr);
01251 }
01252 catch(...) {
01253 (*ji)->end();
01254 lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE STARTING JOB ";
01255 lstr += "-- JOB TERMINATED\n";
01256 logger.write(lstr);
01257
01258 num_processing--;
01259 reporter.jobs().add_report((*ji)->report());
01260 }
01261
01262 num_processing++;
01263 }
01264
01265
01266 if ((*ji)->status() == job_archiver::status_processing) {
01267 try {
01268 (*ji)->process();
01269 }
01270 catch(error e) {
01271
01272 if (e.num() == 12) {
01273 lstr = "Error starting job: Out of memory, will retry later\n";
01274 (*ji)->clear();
01275 }
01276 else {
01277 (*ji)->end();
01278 lstr = "Error starting job, aborting\n";
01279
01280 num_processing--;
01281 reporter.jobs().add_report((*ji)->report());
01282 }
01283 logger.write(lstr);
01284 }
01285 catch(...) {
01286 (*ji)->end();
01287 lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE PROCESSING JOB ";
01288 lstr += "-- JOB TERMINATED\n";
01289 logger.write(lstr);
01290
01291 num_processing--;
01292 reporter.jobs().add_report((*ji)->report());
01293 }
01294 }
01295
01296
01297 if ((*ji)->status() == job_archiver::status_retry_later) {
01298 (*ji)->clear();
01299
01300 num_processing--;
01301 }
01302
01303
01304
01305 if (
01306 ((*ji)->status() == job_archiver::status_error)
01307 &&
01308 overflow_detected
01309 )
01310 {
01311 lstr = "Vault overflow detected, will retry job later\n";
01312 logger.write(lstr);
01313 (*ji)->clear();
01314 num_processing--;
01315 }
01316
01317
01318 if (
01319 ((*ji)->status() == job_archiver::status_completed)
01320 || ((*ji)->status() == job_archiver::status_fatal_error)
01321 || ((*ji)->status() == job_archiver::status_error)
01322 ) {
01323 (*ji)->end();
01324
01325 num_processing--;
01326 num_completed++;
01327
01328
01329 reporter.jobs().add_report((*ji)->report());
01330 }
01331 }
01332
01333 mf_log_status();
01334 sleep(1);
01335
01336
01337 }
01338
01339 t.stop();
01340 lstr = "Archive Manager - Finished archiving, duration: ";
01341 lstr += t.duration();
01342 lstr += "\n";
01343 logger.write(lstr);
01344
01345 lstr = "Archive Manager - Finalizing archive path\n";
01346 logger.write(lstr);
01347 TRY(
01348 rename_file(working_archive_path(), archive_path()),
01349 "Cannot finalize archive"
01350 );
01351
01352 reporter.vault().add_report(
01353 vault_stats_report(estring("Final Stats:"),filesystem(vaulter.vault()))
01354 );
01355 }
01356
01357
01358 const std::string archive_manager::archive_path(void) const
01359 {
01360 estring path;
01361
01362 if (!initialized())
01363 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01364
01365 path = vaulter.vault();
01366 path += "/";
01367 path += config.timestamp().str();
01368
01369 return(path);
01370 }
01371
01372
01373 const std::string archive_manager::working_archive_path(void) const
01374 {
01375 estring path;
01376
01377 if (!initialized())
01378 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01379
01380 path = archive_path();
01381 path += ".incomplete";
01382
01383 return(path);
01384 }
01385
01386
01387
01388
01389 archive_manager archiver;
01390