#include "config.h"

#include <sys/types.h>

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <ctype.h>
#include <errno.h>
#include <signal.h>

#include <iostream>
#include <string>
#include <vector>

#include "asserts.h"
#include "error.h"
#include "estring.h"
#include "fs.h"
#include "rconfig.h"
#include "timer.h"
#include "logger.h"

#include "archiver.h"
00022
00023
00024
00025
00026 rstat::rstat()
00027 {
00028 TRY_nomem(m_exit_str[0] = "Success");
00029 TRY_nomem(m_exit_str[1] = "Syntax or usage error");
00030 TRY_nomem(m_exit_str[2] = "Protocol incompatability error");
00031 TRY_nomem(m_exit_str[3] = "Errors selecting I/O files or directories");
00032 TRY_nomem(m_exit_str[4] = "Requested action not supported");
00033 TRY_nomem(m_exit_str[10] = "Error in socket I/O");
00034 TRY_nomem(m_exit_str[11] = "Error in file I/O");
00035 TRY_nomem(m_exit_str[12] = "Error in rsync protocol data stream");
00036 TRY_nomem(m_exit_str[13] = "Errors with program diagnostics");
00037 TRY_nomem(m_exit_str[14] = "Error in IPC code");
00038 TRY_nomem(m_exit_str[20] = "Received SIGUSR1 or SIGINT");
00039 TRY_nomem(m_exit_str[21] = "Some error returned by waitpid()");
00040 TRY_nomem(m_exit_str[22] = "Error allocating core memory buffers");
00041 TRY_nomem(m_exit_str[23] = "Partial transfer");
00042 TRY_nomem(m_exit_str[30] = "Timeout in data send/receive");
00043 TRY_nomem(m_exit_str[124] = "The command executed by SSH exited with status 255");
00044 TRY_nomem(m_exit_str[125] = "The command executed by SSH was killed by a signal");
00045 TRY_nomem(m_exit_str[126] = "The command given to SSH cannot be run");
00046 TRY_nomem(m_exit_str[127] = "The command given to SSH cannot be found");
00047
00048 TRY_nomem(m_signal_str[1] = "[HUP]: Hangup");
00049 TRY_nomem(m_signal_str[2] = "[INT]: Interrupt ");
00050 TRY_nomem(m_signal_str[3] = "[QUIT]: Quit");
00051 TRY_nomem(m_signal_str[4] = "[ILL]: Illegal instruction");
00052 TRY_nomem(m_signal_str[5] = "[TRAP]: Trace trap");
00053 TRY_nomem(m_signal_str[6] = "[IOT]: IOT instruction or hardware fault");
00054 TRY_nomem(m_signal_str[7] = "[ABRT]: Abnormal termination");
00055 TRY_nomem(m_signal_str[8] = "[EMT]: EMT instruction or hardware fault");
00056 TRY_nomem(m_signal_str[9] = "[FPE]: Floating point exception");
00057 TRY_nomem(m_signal_str[10] = "[KILL]: Killed");
00058 TRY_nomem(m_signal_str[11] = "[BUS]: Bus error");
00059 TRY_nomem(m_signal_str[12] = "[SEGV]: Segmentation fault");
00060 TRY_nomem(m_signal_str[13] = "[SYS]: Invalid system call or invalid argument to system call");
00061 TRY_nomem(m_signal_str[14] = "[PIPE]: Write to pipe with no readers");
00062 TRY_nomem(m_signal_str[15] = "[ALRM]: Alarm");
00063 TRY_nomem(m_signal_str[16] = "[TERM]: Software termination");
00064 TRY_nomem(m_signal_str[17] = "[USR1]: User-defined signal 1");
00065 TRY_nomem(m_signal_str[18] = "[USR2]: User-defined signal 2");
00066 TRY_nomem(m_signal_str[19] = "[CLD]: Child status change");
00067 TRY_nomem(m_signal_str[20] = "[PWR]: Power fail/restart");
00068 TRY_nomem(m_signal_str[21] = "[WINCH]: Terminal window size change");
00069 TRY_nomem(m_signal_str[22] = "[URG]: Urgent condition");
00070 TRY_nomem(m_signal_str[23] = "[POLL]: Pollable event or socket I/O");
00071 TRY_nomem(m_signal_str[24] = "[STOP]: Stop");
00072 TRY_nomem(m_signal_str[25] = "[TSTP]: Terminal stop character");
00073 TRY_nomem(m_signal_str[26] = "[CONT]: Continue stopped process");
00074 TRY_nomem(m_signal_str[27] = "[TTIN]: Background tty read");
00075 TRY_nomem(m_signal_str[28] = "[TTOU]: Background tty write");
00076 TRY_nomem(m_signal_str[29] = "[VTALRM]: Virtual timer expired");
00077 TRY_nomem(m_signal_str[30] = "[PROF]: Profiling timer expired");
00078 TRY_nomem(m_signal_str[31] = "[XCPU]: Exceeded CPU limit");
00079 TRY_nomem(m_signal_str[32] = "[XFSZ]: Exceeded file size limit");
00080 TRY_nomem(m_signal_str[33] = "[WAITING]: Process' LWPs are blocked");
00081 TRY_nomem(m_signal_str[34] = "[LWP]: Special thread library signal");
00082 TRY_nomem(m_signal_str[35] = "[FREEZE]: Special signal used by CPR");
00083 TRY_nomem(m_signal_str[36] = "[THAW]: Special signal used by CPR");
00084 TRY_nomem(m_signal_str[37] = "[CANCEL]: Thread cancellation");
00085 TRY_nomem(m_signal_str[38] = "[LOST]: Resource lost");
00086 TRY_nomem(m_signal_str[39] = "[RTMIN]: Highest priority real-time signal");
00087 TRY_nomem(m_signal_str[46] = "[RTMAX]: Lowest priority real-time signal");
00088
00089 TRY_nomem(m_unknown_exit = "(Unknown exit code)");
00090 TRY_nomem(m_unknown_signal = "(Unknown signal)");
00091 }
00092
00093
00094 const std::string& rstat::exit(const int a_int) const
00095 {
00096 if (m_exit_str.find(a_int) != m_exit_str.end()) {
00097 return(m_exit_str.find(a_int)->second);
00098 }
00099 return(m_unknown_exit);
00100 }
00101
00102
00103 const std::string& rstat::signal(const int a_int) const
00104 {
00105 if (m_signal_str.find(a_int) != m_signal_str.end()) {
00106 return(m_signal_str.find(a_int)->second);
00107 }
00108 return(m_unknown_signal);
00109 }
00110
// Shared lookup object translating rsync exit codes and signal numbers into
// descriptive text via rstat::exit() and rstat::signal().
class rstat rsync_estat_str;
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121 job_archiver::job_archiver(const job * a_job)
00122 {
00123 clear();
00124 m_job = a_job;
00125 m_status = status_pending;
00126 }
00127
00128
00129
00130
00131
00132
00133
00134 const std::string job_archiver::prefix(void)
00135 {
00136 estring lstr;
00137
00138 lstr = "[Job:";
00139 lstr += estring((void*)m_job);
00140 lstr += "] ";
00141
00142 return(lstr);
00143 }
00144
00145
00146 const std::string job_archiver::id(void)
00147 {
00148 estring lstr;
00149
00150 lstr = prefix();
00151 lstr += " ";
00152 lstr += m_job->generate_job_id();
00153
00154 return(lstr);
00155 }
00156
00157
00158
00159
00160
00161
00162
00163 void job_archiver::clear(void)
00164 {
00165 end();
00166 m_child_pid = m_exec.my_pid();
00167 m_status = status_pending;
00168 m_success = true;
00169 m_jr.clear();
00170 m_jpr.clear();
00171 m_error_msg.erase();
00172 }
00173
00174
00175
00176
00177
00178
00179
00180 void job_archiver::end(void)
00181 {
00182 estring lstr;
00183
00184 m_timer.stop();
00185 if (m_exec.child_running()) {
00186 lstr = prefix();
00187 lstr += "Terminating child process!\n";
00188 logger.write(lstr,0,configuration_manager::logging_manager,m_child_pid);
00189 m_exec.kill_child();
00190 }
00191 m_exec.clear();
00192 m_io_out.erase();
00193 m_io_err.erase();
00194 m_status = status_done;
00195 }
00196
00197
00198 const job_archiver::archiving_status job_archiver::status(void)
00199 {
00200 return(m_status);
00201 }
00202
00203
00204
00205
00206
00207
00208
00209
00210
00211
00212 void job_archiver::start(void)
00213 {
00214 estring lstr;
00215
00216 m_jr.id(m_job->generate_job_id());
00217
00218 try {
00219 m_exec.fork();
00220 }
00221 catch(error e) {
00222 lstr = prefix();
00223 lstr += "Could not fork:\n";
00224 logger.write(lstr);
00225
00226 lstr = e.str(prefix());
00227 logger.write(lstr);
00228
00229 lstr = prefix();
00230 lstr += "Will retry job later\n";
00231 logger.write(lstr);
00232
00233 m_status = status_retry_later;
00234 }
00235 catch(...) {
00236 error e = err_unknown;
00237
00238 lstr = prefix();
00239 lstr += "Could not fork:\n";
00240 logger.write(lstr);
00241
00242 lstr = e.str(prefix());
00243 logger.write(lstr);
00244
00245 lstr = prefix();
00246 lstr += "Will retry job later\n";
00247 logger.write(lstr);
00248
00249 m_status = status_retry_later;
00250 }
00251
00252 if (m_exec.is_child()) {
00253
00254
00255
00256
00257 m_exec.reroute_stdio();
00258 try {
00259 mf_do_chores();
00260 }
00261 catch(error e) {
00262 std::cerr << e;
00263 m_success = false;
00264 }
00265 catch(...) {
00266 std::cerr << err_unknown;
00267 m_success = false;
00268 }
00269 if (m_success)
00270 m_exec.exit(0);
00271 else
00272 m_exec.exit(1);
00273 }
00274
00275 m_child_pid = m_exec.child_pid();
00276
00277 lstr = prefix();
00278 lstr += "Spawning child process: PID ";
00279 lstr += estring(static_cast<unsigned long>(m_exec.child_pid()));
00280 lstr += "\n";
00281 logger.write(lstr);
00282
00283 m_status = status_processing;
00284 m_timer.start();
00285 }
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300 void job_archiver::process(void)
00301 {
00302 estring lstr;
00303
00304 if (m_exec.child_running()) {
00305
00306 mf_process_child_io(false);
00307 }
00308 else {
00309
00310 mf_process_child_io(true);
00311
00312
00313
00314 lstr = prefix();
00315 if (m_exec.child_signaled()) {
00316 lstr += "Child exited from signal: ";
00317 lstr += estring(m_exec.child_signal_no());
00318 lstr += "\n";
00319 }
00320 else if (m_exec.child_exit_code() != 0) {
00321 lstr += "Child exited abnormally with code: ";
00322 lstr += estring(m_exec.child_exit_code());
00323 lstr += "\n";
00324 }
00325 else {
00326 lstr += "Child exited successfully\n";
00327 m_status = status_completed;
00328 }
00329 logger.write(lstr);
00330
00331 if (m_exec.child_signaled() || !m_exec.child_exited_normally()) {
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343 m_status = status_error;
00344 }
00345 else {
00346 m_status = status_completed;
00347 }
00348
00349 m_timer.stop();
00350 lstr = prefix();
00351 lstr += "Finished, duration: ";
00352 lstr += m_timer.duration();
00353 lstr += "\n";
00354 logger.write(lstr);
00355
00356 }
00357 }
00358
00359
00360 single_job_report job_archiver::report(void) const
00361 {
00362 return(m_jr);
00363 }
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
// Worker run inside the forked child (start() calls it only when
// m_exec.is_child()): archive every path of m_job, one rsync run per path.
//
// For each path:
//   - the per-path directory under archiver.working_archive_path() is
//     created if missing, and must be writable;
//   - rsync is run at most m_job->rsync_retry_count times.  Each attempt's
//     command line is built from the configured rsync binary and options,
//     an optional --rsync-path for non-local connections, an optional
//     --link-dest to an earlier archive holding the same path (when hard
//     linking is enabled), and the job's exclude/include files;
//   - m_job->rsync_behavior, indexed by rsync's exit code, decides whether
//     to stop retrying, to retry without hard links, or (after giving up)
//     to ignore the failure;
//   - m_success is cleared if the path is given up on, and a report line
//     is emitted through reportio().write_report().
//
// Throws ERROR(EACCES, ...) if an archive directory is not writable.
void job_archiver::mf_do_chores(void)
{
	job::paths_type::const_iterator pi;

	for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
		estring archive_dir;
		estring path;
		estring command_line;
		bool hardlink = false;
		int num_retries = 0;
		bool done = false;
		int exit_code = 0;
		int signal_num = 0;
		timer t;
		// Filled in by mf_process_rsync_io(); not used further in this block.
		uint64 files_total = 0;
		uint64 files_xferd = 0;
		uint64 size_total = 0;
		uint64 size_xferd = 0;
		bool overflow_detected = 0;
		estring error_msg;

		archive_dir = m_job->generate_archive_path(*pi);

		// Destination: <working archive path>/<archive_dir>.
		path = archiver.working_archive_path();
		path += "/";
		path += archive_dir;
		path = reform_path(path);
		if (!exists(path)) {
			std::cout << "Creating job archive path: " << archive_dir << std::endl;
			mk_dirhier(path);
		}
		else
			std::cout << "Archiving to existing path: " << archive_dir << std::endl;

		if (!writable(path)) {
			std::string es;

			TRY_nomem(es = "Cannot write to archive directory: \"");
			TRY_nomem(es += archive_dir);
			TRY_nomem(es += "\"");

			throw(ERROR(EACCES,es));
		}

		// Error logging starts off for each path and is switched back on
		// after the first failed attempt.
		logger.set_error_logging(false);
		hardlink = m_job->rsync_hardlink;
		while ((num_retries < m_job->rsync_retry_count) && !done) {
			execute exec;
			job::excludes_type::const_iterator ei;
			job::includes_type::const_iterator ii;

			exit_code = 0;
			signal_num = 0;

			command_line = config.rsync_local_path();
			command_line += " ";
			command_line += m_job->rsync_options;

			// Remote connections: tell rsync which binary to run on the far
			// side (the job's remote path, else the local rsync path).
			if (m_job->rsync_connection != job::connection_local) {
				command_line += " ";
				command_line += " --rsync-path=";
				if (m_job->rsync_remote_path.size() != 0)
					command_line += m_job->rsync_remote_path;
				else
					command_line += config.rsync_local_path();
			}

			// Look for an earlier archive to use as --link-dest: walk the
			// vault's entries in reverse sorted order (presumably newest
			// timestamp first — TODO confirm timestamp format sorts
			// chronologically), skip non-timestamp names and the current
			// run, and take the first one that contains archive_dir.
			if (hardlink) {
				subdirectory subdir;
				std::string youngest;
				std::string relative_path;

				subdir.path(vaulter.vault());
				if (subdir.size() > 0) {
					subdirectory::const_iterator si;

					sort(subdir.begin(), subdir.end());
					reverse(subdir.begin(), subdir.end());
					for (si = subdir.begin(); si != subdir.end(); ++si) {
						estring ypath;

						if (!is_timestamp(*si))
							continue;
						if (*si == config.timestamp().str())
							continue;
						std::cout
							<< "Considering potential hardlink source: "
							<< *si
							<< std::endl;
						ypath = vaulter.vault();
						ypath += "/";
						ypath += *si;
						ypath += "/";
						ypath += archive_dir;
						if (exists(ypath)) {
							std::cout
								<< "Using archive as hardlink source: "
								<< *si
								<< std::endl;
							youngest = ypath;
							break;
						}
						else {
							std::cout
								<< "- No such path: "
								<< ypath
								<< std::endl;
						}
					}
				}
				// --link-dest is passed relative to the destination path.
				if (youngest.size() > 0) {
					relative_path = mk_relative_path(youngest,path);
					command_line += " --hard-links --link-dest=";
					command_line += relative_path;
				}
			}

			for (
				ei = m_job->excludes.begin();
				ei != m_job->excludes.end();
				++ei
				)
			{
				command_line += " --exclude-from=";
				command_line += *ei;
			}

			for (
				ii = m_job->includes.begin();
				ii != m_job->includes.end();
				++ii
				)
			{
				command_line += " --include-from=";
				command_line += *ii;
			}

			command_line += " ";
			command_line += m_job->generate_source_path(*pi);

			command_line += " ";
			command_line += path;

			std::cout << "Command Line: " << command_line << std::endl;

			m_error_msg.erase();

			// Run rsync and relay its output until it exits (or is killed
			// for inactivity); the duration goes into the report.
			t.start();
			exec.exec(command_line);
			mf_process_rsync_io(
				exec,
				m_job->rsync_timeout,
				files_total,
				files_xferd,
				size_total,
				size_xferd,
				overflow_detected
				);
			t.stop();

			signal_num = 0;
			if (exec.child_signaled()) {
				std::cout
					<< "Rsync caught signal: ["
					<< exec.child_signal_no()
					<< "] "
					<< rsync_estat_str.signal(exec.child_signal_no())
					<< std::endl;
				signal_num = exec.child_signal_no();
			}
			std::cout
				<< "Rsync exit code: ["
				<< exec.child_exit_code()
				<< "] "
				<< rsync_estat_str.exit(exec.child_exit_code())
				<< std::endl;

			// Success ends the retry loop; a vault overflow abandons this
			// path without counting a retry; anything else counts a retry.
			exit_code = exec.child_exit_code();
			if (exec.child_exited_normally() && (exit_code == 0))
				done = true;
			else if (overflow_detected) {
				std::cout
					<< "Vault overflow detected"
					<< std::endl;
				break;
			}
			else {
				++num_retries;
				logger.set_error_logging(true);
			}

			// Per-exit-code policy configured for the job.
			if (m_job->rsync_behavior[exit_code] == rsync_behavior::fail)
			{
				std::cout << "Failing, will not attempt to retry" << std::endl;
				break;
			}
			if (m_job->rsync_behavior[exit_code]
				== rsync_behavior::retry_without_hardlinks)
			{
				std::cout << "Retrying without hardlinks..." << std::endl;
				hardlink = false;
			}
		}
		// Gave up on this path: only an "ok" behavior for the last exit
		// code keeps the job counted as successful.
		if (!done) {
			if (num_retries >= m_job->rsync_retry_count) {
				std::cout << "Retry count exceeded" << std::endl;
			}
			if (m_job->rsync_behavior[exit_code] == rsync_behavior::ok)
				std::cout << "Ignoring rsync failure" << std::endl;
			else {
				std::cout << "Giving up on this path" << std::endl;
				m_success = false;
			}
		}
		reportio().write_report(
			m_job->generate_source_path(*pi),
			t,
			exit_code,
			signal_num,
			m_error_msg
			);
	}
}
00620
00621 void job_archiver::mf_process_report(const std::string& a_str)
00622 {
00623 if (reportio().is_report(a_str)) {
00624 m_jpr = reportio().parse(a_str);
00625 m_jr.add_report(m_jpr);
00626 }
00627 }
00628
00629
00630
00631
00632
00633
00634
00635
// Relay the forked child's stdout and stderr into the log, line by line.
//
// Bytes accumulate in m_io_out / m_io_err until a '\r' or '\n' ends a
// line.  Each completed line is prefixed with prefix(), logged under
// logging_rsync, and that prefixed line is handed to mf_process_report().
//
// a_finalize == false: a stream is read only while out_ready()/err_ready()
//   reports it ready.
// a_finalize == true: each stream is read until out_eof()/err_eof(), and any
//   unterminated trailing text is flushed as a final line.
// If nothing was read from either stream, sleeps for
// config.io_poll_interval() before returning.
void job_archiver::mf_process_child_io(bool a_finalize)
{
	estring lstr;
	bool io_flag = false;

	// --- child stdout ---
	while (!m_exec.out_eof() && (a_finalize || m_exec.out_ready())) {
		int r;
		const int buffer_size = 128;
		char buffer[buffer_size] = { 0 };

		r = m_exec.out_read(buffer, buffer_size);
		if (r > 0) {
			int c;

			io_flag = true;
			for (c = 0; c < r; ++c) {
				if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
					lstr = prefix();
					lstr += m_io_out;
					lstr += "\n";
					logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
					mf_process_report(lstr);
					m_io_out.erase();
				}
				else {
					m_io_out += buffer[c];
				}
			}
		}
	}
	// Flush an unterminated last stdout line once the child is finished.
	if (a_finalize && (m_io_out.size() > 0)) {
		lstr = prefix();
		lstr += m_io_out;
		lstr += "\n";
		logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
		mf_process_report(lstr);
		m_io_out.erase();
	}

	// --- child stderr (same handling as stdout) ---
	while (!m_exec.err_eof() && (a_finalize || m_exec.err_ready())) {
		int r;
		const int buffer_size = 128;
		char buffer[buffer_size] = { 0 };

		r = m_exec.err_read(buffer, buffer_size);
		if (r > 0) {
			int c;

			io_flag = true;
			for (c = 0; c < r; ++c) {
				if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
					lstr = prefix();
					lstr += m_io_err;
					lstr += "\n";
					logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
					mf_process_report(lstr);
					m_io_err.erase();
				}
				else {
					m_io_err += buffer[c];
				}
			}
		}
	}
	// Flush an unterminated last stderr line once the child is finished.
	if (a_finalize && (m_io_err.size() > 0)) {
		lstr = prefix();
		lstr += m_io_err;
		lstr += "\n";
		logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
		mf_process_report(lstr);
		m_io_err.erase();
	}
	// Back off when there was nothing to read.
	if (!io_flag)
		sleep(config.io_poll_interval());
}
00711
00712
00713 void job_archiver::mf_trim_string(std::string& a_str)
00714 {
00715 while ((a_str.size() > 0) && (!isdigit(a_str[0])))
00716 a_str.erase(0,1);
00717 while ((a_str.size() > 0) && (!isdigit(a_str[a_str.size()-1])))
00718 a_str.erase(a_str.size()-1,1);
00719 }
00720
00721
00722
00723
00724
00725
00726
00727 void job_archiver::mf_parse_rsync_io(
00728 const std::string a_str,
00729 uint64& a_files_total,
00730 uint64& a_files_xferd,
00731 uint64& a_size_total,
00732 uint64& a_size_xferd
00733 )
00734 {
00735 estring estr;
00736
00737 if (a_str.find("Number of files: ") == 0) {
00738 estr = a_str;
00739 mf_trim_string(estr);
00740 try {
00741 a_files_total = estr;
00742 }
00743 catch(error e) {
00744 estring es;
00745
00746 es = "Could not parse total number of files processed by rsync";
00747 e.push_back(ERROR_INSTANCE(es));
00748
00749
00750 std::cerr << e;
00751 }
00752 catch(...) {
00753 error e = err_unknown;
00754 estring es;
00755
00756 es = "Could not parse total number of files processed by rsync";
00757 e.push_back(ERROR_INSTANCE(es));
00758
00759
00760 std::cerr << e;
00761 }
00762 }
00763 else if (a_str.find("Number of files transferred: ") == 0) {
00764 estr = a_str;
00765 mf_trim_string(estr);
00766 try {
00767 a_files_xferd = estr;
00768 }
00769 catch(error e) {
00770 estring es;
00771
00772 es = "Could not parse total number of files transferred by rsync";
00773 e.push_back(ERROR_INSTANCE(es));
00774
00775
00776 std::cerr << e;
00777 }
00778 catch(...) {
00779 error e = err_unknown;
00780 estring es;
00781
00782 es = "Could not parse total number of files transferred by rsync";
00783 e.push_back(ERROR_INSTANCE(es));
00784
00785
00786 std::cerr << e;
00787 }
00788 }
00789 else if (a_str.find("Total file size: ") == 0) {
00790 estr = a_str;
00791 mf_trim_string(estr);
00792 try {
00793 a_size_total = estr;
00794 }
00795 catch(error e) {
00796 estring es;
00797
00798 es = "Could not parse total size of files processed by rsync";
00799 e.push_back(ERROR_INSTANCE(es));
00800
00801
00802 std::cerr << e;
00803 }
00804 catch(...) {
00805 error e = err_unknown;
00806 estring es;
00807
00808 es = "Could not parse total size of files processed by rsync";
00809 e.push_back(ERROR_INSTANCE(es));
00810
00811
00812 std::cerr << e;
00813 }
00814 }
00815 else if (a_str.find("Total transferred file size: ") == 0) {
00816 estr = a_str;
00817 mf_trim_string(estr);
00818 try {
00819 a_size_xferd = estr;
00820 }
00821 catch(error e) {
00822 estring es;
00823
00824 es = "Could not parse total size of files transferred by rsync";
00825 e.push_back(ERROR_INSTANCE(es));
00826
00827
00828 std::cerr << e;
00829 }
00830 catch(...) {
00831 error e = err_unknown;
00832 estring es;
00833
00834 es = "Could not parse total size of files transferred by rsync";
00835 e.push_back(ERROR_INSTANCE(es));
00836
00837
00838 std::cerr << e;
00839 }
00840 }
00841 }
00842
00843
00844
00845
00846
00847
00848
00849 void job_archiver::mf_process_rsync_io(
00850 execute& a_exec,
00851 uint16 a_timeout,
00852 uint64& a_files_total,
00853 uint64& a_files_xferd,
00854 uint64& a_size_total,
00855 uint64& a_size_xferd,
00856 bool& a_overflow_detected
00857 )
00858 {
00859 size_t ro;
00860 size_t re;
00861 estring out;
00862 estring err;
00863 timer t;
00864 bool io_flag;
00865 char buffer[1024] = { 0 };
00866 char *ptr;
00867
00868 ro = 1;
00869 re = 1;
00870 t.start();
00871 while ((ro != 0) || (re != 0) || a_exec.child_running()) {
00872 io_flag = false;
00873 ro = 0;
00874 re = 0;
00875
00876 if (!a_overflow_detected) {
00877 a_overflow_detected = vaulter.overflow();
00878 }
00879
00880 m_error_msg.erase();
00881
00882 if (a_exec.out_ready()) {
00883 ro = read(a_exec.out_fd(), buffer, 1024);
00884 if (ro > 0) {
00885 io_flag = true;
00886 t.start();
00887 for (ptr = buffer; ptr != buffer+ro; ++ptr) {
00888 if ((*ptr != '\r') && (*ptr != '\n')) {
00889 out += *ptr;
00890 }
00891 else {
00892 reportio().write_rsync_out(out);
00893 out.erase();
00894 }
00895 }
00896 }
00897 }
00898
00899 if (a_exec.err_ready()) {
00900 re = read(a_exec.err_fd(), buffer, 1024);
00901 if (re > 0) {
00902 io_flag = true;
00903 t.start();
00904 for (ptr = buffer; ptr != buffer+re; ++ptr) {
00905 if ((*ptr != '\r') && (*ptr != '\n')) {
00906 err += *ptr;
00907 }
00908 else {
00909 reportio().write_rsync_err(err);
00910 err.erase();
00911 }
00912 }
00913 }
00914 }
00915
00916 t.stop();
00917 if (t.duration_secs() > a_timeout) {
00918 std::cerr << "*** Rsync program inactivity timeout" << std::endl;
00919 a_exec.kill_child();
00920 TRY_nomem(m_error_msg = "Rsync inactivity timeout");
00921 }
00922
00923 if (!io_flag)
00924 sleep(config.io_poll_interval());
00925
00926 }
00927 if (out.size() > 0) {
00928 std::cerr << out << std::endl;
00929 mf_parse_rsync_io(
00930 out,
00931 a_files_total,
00932 a_files_xferd,
00933 a_size_total,
00934 a_size_xferd
00935 );
00936 out.erase();
00937 }
00938 if (err.size() > 0) {
00939 std::cerr << err << std::endl;
00940 mf_parse_rsync_io(
00941 out,
00942 a_files_total,
00943 a_files_xferd,
00944 a_size_total,
00945 a_size_xferd
00946 );
00947 err.erase();
00948 }
00949 }
00950
00951
00952
00953
00954 archive_manager::archive_manager()
00955 {
00956 if (this != &archiver)
00957 throw(INTERNAL_ERROR(0,"Attempt to alocate multiple archive managers"));
00958
00959 clear();
00960 }
00961
00962
00963 void archive_manager::clear(void)
00964 {
00965 m_jobs.clear();
00966 m_initialized = false;
00967 }
00968
00969
00970
00971
00972
00973
00974 void archive_manager::init(void)
00975 {
00976 timer t;
00977 estring lstr;
00978
00979 lstr = "Archive Manager - Beginning initialization\n";
00980 logger.write(lstr);
00981 t.start();
00982
00983 lstr = "Timestamp: ";
00984 lstr += config.timestamp().str();
00985 lstr += "\n";
00986 logger.write(lstr);
00987
00988
00989 vaulter.select();
00990 lstr = "Vault selected: ";
00991 lstr += vaulter.vault();
00992 lstr += "\n";
00993 logger.write(lstr);
00994
00995 reporter.vault().add_report(
00996 vault_stats_report(estring("Initial Stats:"),filesystem(vaulter.vault()))
00997 );
00998
00999
01000 vaulter.prepare();
01001
01002 t.stop();
01003 lstr = "Archive Manager - Finished initialization";
01004 lstr += ", duration: ";
01005 lstr += t.duration();
01006 lstr += "\n";
01007 logger.write(lstr);
01008
01009 m_initialized = true;
01010 }
01011
01012
01013 const bool archive_manager::initialized(void) const
01014 {
01015 return(m_initialized);
01016 }
01017
01018
01019
01020
01021
01022
01023
01024 void archive_manager::mf_log_status(void)
01025 {
01026 static timer t;
01027 const timer::value_type timeout = 30;
01028 estring lstr;
01029 std::vector<job_archiver*>::const_iterator ji;
01030 uint64 jobs_pending = 0;
01031 uint64 jobs_processing = 0;
01032 uint64 jobs_completed =0;
01033 uint64 jobs_remaining = 0;
01034
01035 if (!t.is_started())
01036 t.start();
01037
01038 t.stop();
01039 if (t.duration_mins() < timeout)
01040 return;
01041
01042 lstr = "\n";
01043 lstr += "STATUS REPORT:\n";
01044 lstr += "================================================================\n";
01045 logger.write(lstr);
01046 for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji) {
01047 lstr = "[";
01048 switch ((*ji)->status()) {
01049 case job_archiver::status_pending:
01050 lstr += "Pending ";
01051 ++jobs_pending;
01052 break;
01053 case job_archiver::status_processing:
01054 lstr += "Processing ";
01055 ++jobs_processing;
01056 break;
01057 case job_archiver::status_retry_later:
01058 lstr += "Reschedule ";
01059 ++jobs_pending;
01060 break;
01061 case job_archiver::status_fatal_error:
01062 lstr += "Fatal Error";
01063 ++jobs_completed;
01064 break;
01065 case job_archiver::status_error:
01066 lstr += "Error ";
01067 ++jobs_completed;
01068 break;
01069 case job_archiver::status_completed:
01070 lstr += "Completed ";
01071 ++jobs_completed;
01072 break;
01073 case job_archiver::status_done:
01074 lstr += "Done ";
01075 ++jobs_completed;
01076 break;
01077 default:
01078 lstr += "<unknown> ";
01079 break;
01080 }
01081 lstr += "]: ";
01082 lstr += (*ji)->id();
01083 lstr += "\n";
01084 logger.write(lstr);
01085 }
01086
01087 lstr = "---------------------------------------------------------------\n";
01088 lstr += " Jobs Pending: ";
01089 lstr += estring(jobs_pending);
01090 lstr += "\n";
01091
01092 lstr += " Jobs Processing: ";
01093 lstr += estring(jobs_processing);
01094 lstr += "\n";
01095
01096 lstr += " Jobs Completed: ";
01097 lstr += estring(jobs_completed);
01098 lstr += "\n";
01099
01100 lstr += " Total: ";
01101 lstr += estring(jobs_pending+jobs_processing+jobs_completed+jobs_remaining);
01102 lstr += "\n";
01103
01104 lstr += "Overflow Detected: ";
01105 if (vaulter.overflow()) {
01106 lstr += "TRUE";
01107 }
01108 else {
01109 lstr += "false";
01110 }
01111 lstr += "\n";
01112
01113 logger.write(lstr);
01114 logger.write("\n");
01115 t.start();
01116 }
01117
01118
01119
01120
01121
01122
01123
01124
01125
01126
01127
01128
01129
01130
01131 void archive_manager::archive(void)
01132 {
01133 timer t;
01134 estring lstr;
01135 configuration_manager::jobs_type::const_iterator cji;
01136 int num_processing = 0;
01137 std::vector<job_archiver*>::iterator ji;
01138 uint64 num_completed = 0;
01139 bool overflow_detected = false;
01140 estring debug_estr;
01141
01142 if (!initialized())
01143 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01144
01145 lstr = "Archive Manager - Begin archiving\n";
01146 logger.write(lstr);
01147 t.start();
01148
01149
01150 try {
01151 if (exists(archive_path())) {
01152 lstr = "Found existing archive directory: ";
01153 lstr += archive_path();
01154 lstr += "\n";
01155 logger.write(lstr);
01156 rename_file(archive_path(), working_archive_path());
01157 }
01158 else if (exists(working_archive_path())) {
01159 lstr = "Found existing archive directory: ";
01160 lstr += working_archive_path();
01161 lstr += "\n";
01162 logger.write(lstr);
01163 }
01164 else {
01165 lstr = "Creating archive directory: ";
01166 lstr += working_archive_path();
01167 lstr += "\n";
01168 logger.write(lstr);
01169 mk_dir(working_archive_path());
01170 }
01171 }
01172 catch(error e) {
01173 logger.write("An error has occured: ");
01174 logger.write(e[0].what());
01175 logger.write("\n");
01176 throw(e);
01177 }
01178 catch(...) {
01179 error e = err_unknown;
01180
01181 logger.write("An error has occured: ");
01182 logger.write(e[0].what());
01183 logger.write("\n");
01184 throw(e);
01185 }
01186
01187
01188 logger.write("Creating to-do list\n");
01189 for (
01190 cji = config.jobs().begin();
01191 cji != config.jobs().end();
01192 ++cji
01193 )
01194 {
01195 job_archiver* ptr;
01196
01197 ptr = new job_archiver(&(*cji));
01198 if (ptr == 0)
01199 throw(err_nomem);
01200 TRY_nomem(m_jobs.push_back(ptr));
01201 }
01202
01203
01204 logger.write("Processing jobs...\n");
01205 while (m_jobs.size() > num_completed) {
01206
01207
01208
01209
01210
01211
01212
01213
01214
01215
01216
01217
01218
01219
01220
01221 if (!overflow_detected) {
01222 overflow_detected = vaulter.overflow(true);
01223
01224
01225
01226
01227
01228
01229
01230 }
01231
01232
01233
01234
01235 if (overflow_detected && (num_processing == 0)) {
01236 TRY(vaulter.prepare(true),"Cannot complete archive");
01237 overflow_detected = vaulter.overflow();
01238
01239
01240
01241
01242
01243
01244
01245 }
01246
01247
01248 for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji)
01249 {
01250
01251 if (
01252 !overflow_detected
01253 && (num_processing < config.rsync_parallel())
01254 && ((*ji)->status() == job_archiver::status_pending)
01255 )
01256 {
01257 try {
01258 (*ji)->start();
01259 }
01260 catch(error e) {
01261 if (e.num() == 12) {
01262 lstr = "Error starting job: Out of memory, will retry later\n";
01263 (*ji)->clear();
01264 }
01265 else {
01266 (*ji)->end();
01267 lstr = "Error starting job, aborting\n";
01268
01269 num_processing--;
01270 reporter.jobs().add_report((*ji)->report());
01271 }
01272 logger.write(lstr);
01273 }
01274 catch(...) {
01275 (*ji)->end();
01276 lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE STARTING JOB ";
01277 lstr += "-- JOB TERMINATED\n";
01278 logger.write(lstr);
01279
01280 num_processing--;
01281 reporter.jobs().add_report((*ji)->report());
01282 }
01283
01284 num_processing++;
01285 }
01286
01287
01288 if ((*ji)->status() == job_archiver::status_processing) {
01289 try {
01290 (*ji)->process();
01291 }
01292 catch(error e) {
01293
01294 if (e.num() == 12) {
01295 lstr = "Error starting job: Out of memory, will retry later\n";
01296 (*ji)->clear();
01297 }
01298 else {
01299 (*ji)->end();
01300 lstr = "Error starting job, aborting\n";
01301
01302 num_processing--;
01303 reporter.jobs().add_report((*ji)->report());
01304 }
01305 logger.write(lstr);
01306 }
01307 catch(...) {
01308 (*ji)->end();
01309 lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE PROCESSING JOB ";
01310 lstr += "-- JOB TERMINATED\n";
01311 logger.write(lstr);
01312
01313 num_processing--;
01314 reporter.jobs().add_report((*ji)->report());
01315 }
01316 }
01317
01318
01319 if ((*ji)->status() == job_archiver::status_retry_later) {
01320 (*ji)->clear();
01321
01322 num_processing--;
01323 }
01324
01325
01326
01327 if (
01328 ((*ji)->status() == job_archiver::status_error)
01329 &&
01330 overflow_detected
01331 )
01332 {
01333 lstr = "Vault overflow detected, will retry job later\n";
01334 logger.write(lstr);
01335 (*ji)->clear();
01336 num_processing--;
01337 }
01338
01339
01340 if (
01341 ((*ji)->status() == job_archiver::status_completed)
01342 || ((*ji)->status() == job_archiver::status_fatal_error)
01343 || ((*ji)->status() == job_archiver::status_error)
01344 ) {
01345 (*ji)->end();
01346
01347 num_processing--;
01348 num_completed++;
01349
01350
01351 reporter.jobs().add_report((*ji)->report());
01352 }
01353 }
01354
01355 mf_log_status();
01356 sleep(1);
01357
01358
01359 }
01360
01361 t.stop();
01362 lstr = "Archive Manager - Finished archiving, duration: ";
01363 lstr += t.duration();
01364 lstr += "\n";
01365 logger.write(lstr);
01366
01367 lstr = "Archive Manager - Finalizing archive path\n";
01368 logger.write(lstr);
01369 TRY(
01370 rename_file(working_archive_path(), archive_path()),
01371 "Cannot finalize archive"
01372 );
01373
01374 reporter.vault().add_report(
01375 vault_stats_report(estring("Final Stats:"),filesystem(vaulter.vault()))
01376 );
01377 }
01378
01379
01380 const std::string archive_manager::archive_path(void) const
01381 {
01382 estring path;
01383
01384 if (!initialized())
01385 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01386
01387 path = vaulter.vault();
01388 path += "/";
01389 path += config.timestamp().str();
01390
01391 return(path);
01392 }
01393
01394
01395 const std::string archive_manager::working_archive_path(void) const
01396 {
01397 estring path;
01398
01399 if (!initialized())
01400 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01401
01402 path = archive_path();
01403 path += ".incomplete";
01404
01405 return(path);
01406 }
01407
01408
01409
01410
// The single archive manager instance; archive_manager's constructor throws
// INTERNAL_ERROR if any other instance is created.
archive_manager archiver;
01412