rvm 1.08
00001 #include "config.h" 00002 00003 #ifdef HAVE_UNISTD_H 00004 #include <unistd.h> 00005 #endif 00006 00007 #include <ctype.h> 00008 00009 #include <iostream> 00010 #include <string> 00011 #include <vector> 00012 00013 #include "asserts.h" 00014 #include "error.h" 00015 #include "estring.h" 00016 #include "fs.h" 00017 #include "rconfig.h" 00018 #include "timer.h" 00019 #include "logger.h" 00020 00021 #include "archiver.h" 00022 00023 //----------------------------------------------------------------------------- 00024 00025 /** C'tor */ 00026 rstat::rstat() 00027 { 00028 TRY_nomem(m_exit_str[0] = "Success"); 00029 TRY_nomem(m_exit_str[1] = "Syntax or usage error"); 00030 TRY_nomem(m_exit_str[2] = "Protocol incompatability error"); 00031 TRY_nomem(m_exit_str[3] = "Errors selecting I/O files or directories"); 00032 TRY_nomem(m_exit_str[4] = "Requested action not supported"); 00033 TRY_nomem(m_exit_str[10] = "Error in socket I/O"); 00034 TRY_nomem(m_exit_str[11] = "Error in file I/O"); 00035 TRY_nomem(m_exit_str[12] = "Error in rsync protocol data stream"); 00036 TRY_nomem(m_exit_str[13] = "Errors with program diagnostics"); 00037 TRY_nomem(m_exit_str[14] = "Error in IPC code"); 00038 TRY_nomem(m_exit_str[20] = "Received SIGUSR1 or SIGINT"); 00039 TRY_nomem(m_exit_str[21] = "Some error returned by waitpid()"); 00040 TRY_nomem(m_exit_str[22] = "Error allocating core memory buffers"); 00041 TRY_nomem(m_exit_str[23] = "Partial transfer"); 00042 TRY_nomem(m_exit_str[24] = "Some files vanished before they could be transferred"); 00043 TRY_nomem(m_exit_str[25] = "The --max-delete limit stopped deletions"); 00044 TRY_nomem(m_exit_str[30] = "Timeout in data send/receive"); 00045 TRY_nomem(m_exit_str[124] = "The command executed by SSH exited with status 255"); 00046 TRY_nomem(m_exit_str[125] = "The command executed by SSH was killed by a signal"); 00047 TRY_nomem(m_exit_str[126] = "The command given to SSH cannot be run"); 00048 TRY_nomem(m_exit_str[127] = "The command given to SSH cannot be found"); 00049 00050 TRY_nomem(m_signal_str[1] = "[HUP]: Hangup"); 00051 TRY_nomem(m_signal_str[2] = "[INT]: Interrupt "); 00052 TRY_nomem(m_signal_str[3] = "[QUIT]: Quit"); 00053 TRY_nomem(m_signal_str[4] = "[ILL]: Illegal instruction"); 00054 TRY_nomem(m_signal_str[5] = "[TRAP]: Trace trap"); 00055 TRY_nomem(m_signal_str[6] = "[IOT]: IOT instruction or hardware fault"); 00056 TRY_nomem(m_signal_str[7] = "[ABRT]: Abnormal termination"); 00057 TRY_nomem(m_signal_str[8] = "[EMT]: EMT instruction or hardware fault"); 00058 TRY_nomem(m_signal_str[9] = "[FPE]: Floating point exception"); 00059 TRY_nomem(m_signal_str[10] = "[KILL]: Killed"); 00060 TRY_nomem(m_signal_str[11] = "[BUS]: Bus error"); 00061 TRY_nomem(m_signal_str[12] = "[SEGV]: Segmentation fault"); 00062 TRY_nomem(m_signal_str[13] = "[SYS]: Invalid system call or invalid argument to system call"); 00063 TRY_nomem(m_signal_str[14] = "[PIPE]: Write to pipe with no readers"); 00064 TRY_nomem(m_signal_str[15] = "[ALRM]: Alarm"); 00065 TRY_nomem(m_signal_str[16] = "[TERM]: Software termination"); 00066 TRY_nomem(m_signal_str[17] = "[USR1]: User-defined signal 1"); 00067 TRY_nomem(m_signal_str[18] = "[USR2]: User-defined signal 2"); 00068 TRY_nomem(m_signal_str[19] = "[CLD]: Child status change"); 00069 TRY_nomem(m_signal_str[20] = "[PWR]: Power fail/restart"); 00070 TRY_nomem(m_signal_str[21] = "[WINCH]: Terminal window size change"); 00071 TRY_nomem(m_signal_str[22] = "[URG]: Urgent condition"); 00072 TRY_nomem(m_signal_str[23] = "[POLL]: Pollable event or 
socket I/O"); 00073 TRY_nomem(m_signal_str[24] = "[STOP]: Stop"); 00074 TRY_nomem(m_signal_str[25] = "[TSTP]: Terminal stop character"); 00075 TRY_nomem(m_signal_str[26] = "[CONT]: Continue stopped process"); 00076 TRY_nomem(m_signal_str[27] = "[TTIN]: Background tty read"); 00077 TRY_nomem(m_signal_str[28] = "[TTOU]: Background tty write"); 00078 TRY_nomem(m_signal_str[29] = "[VTALRM]: Virtual timer expired"); 00079 TRY_nomem(m_signal_str[30] = "[PROF]: Profiling timer expired"); 00080 TRY_nomem(m_signal_str[31] = "[XCPU]: Exceeded CPU limit"); 00081 TRY_nomem(m_signal_str[32] = "[XFSZ]: Exceeded file size limit"); 00082 TRY_nomem(m_signal_str[33] = "[WAITING]: Process' LWPs are blocked"); 00083 TRY_nomem(m_signal_str[34] = "[LWP]: Special thread library signal"); 00084 TRY_nomem(m_signal_str[35] = "[FREEZE]: Special signal used by CPR"); 00085 TRY_nomem(m_signal_str[36] = "[THAW]: Special signal used by CPR"); 00086 TRY_nomem(m_signal_str[37] = "[CANCEL]: Thread cancellation"); 00087 TRY_nomem(m_signal_str[38] = "[LOST]: Resource lost"); 00088 TRY_nomem(m_signal_str[39] = "[RTMIN]: Highest priority real-time signal"); 00089 TRY_nomem(m_signal_str[46] = "[RTMAX]: Lowest priority real-time signal"); 00090 00091 TRY_nomem(m_unknown_exit = "(Unknown exit code)"); 00092 TRY_nomem(m_unknown_signal = "(Unknown signal)"); 00093 } 00094 00095 /** Get a verbose string for an exit code */ 00096 const std::string& rstat::exit(const int a_int) const 00097 { 00098 if (m_exit_str.find(a_int) != m_exit_str.end()) { 00099 return(m_exit_str.find(a_int)->second); 00100 } 00101 return(m_unknown_exit); 00102 } 00103 00104 /** Get a verbose string for a signal number */ 00105 const std::string& rstat::signal(const int a_int) const 00106 { 00107 if (m_signal_str.find(a_int) != m_signal_str.end()) { 00108 return(m_signal_str.find(a_int)->second); 00109 } 00110 return(m_unknown_signal); 00111 } 00112 00113 class rstat rsync_estat_str; 00114 00115 //----------------------------------------------------------------------------- 00116 00117 /** C'tor 00118 00119 Set a job to be assiciated with this job archiver and initialize it's 00120 processing status to "pending". 00121 00122 */ 00123 job_archiver::job_archiver(const job * a_job) 00124 { 00125 clear(); 00126 m_job = a_job; 00127 m_status = status_pending; 00128 } 00129 00130 /** Generate a job prefix string 00131 00132 Create a string to uniquely identify this job to be used in the log file to 00133 uniquely identify this job 00134 00135 */ 00136 const std::string job_archiver::prefix(void) 00137 { 00138 estring lstr; 00139 00140 lstr = "[Job:"; 00141 lstr += estring((void*)m_job); 00142 lstr += "] "; 00143 00144 return(lstr); 00145 } 00146 00147 /** Generate a job id string */ 00148 const std::string job_archiver::id(void) 00149 { 00150 estring lstr; 00151 00152 lstr = prefix(); 00153 lstr += " "; 00154 lstr += m_job->generate_job_id(); 00155 00156 return(lstr); 00157 } 00158 00159 /** Clear the job archiver and return it to it's initial state 00160 00161 End any processes handling this job and return the job archiver to it's 00162 "pending" state. 00163 00164 */ 00165 void job_archiver::clear(void) 00166 { 00167 end(); 00168 m_child_pid = m_exec.my_pid(); 00169 m_status = status_pending; 00170 m_success = true; 00171 m_jr.clear(); 00172 m_jpr.clear(); 00173 m_error_msg.erase(); 00174 m_rsync_timeout_flag = false; 00175 } 00176 00177 /** End any processes handling this job 00178 00179 If any child processes are handling this job, terminate them. 
/** End any processes handling this job

	If any child processes are handling this job, terminate them. Erase any
	pending I/O for the now-defunct child. Set our processing status to
	"done".

*/
void job_archiver::end(void)
{
	estring lstr;

	m_timer.stop();
	m_io_timer.stop();
	if (m_exec.child_running()) {
		lstr = prefix();
		lstr += "Terminating child process!\n";
		logger.write(lstr,0,configuration_manager::logging_manager,m_child_pid);
		m_exec.kill_child();
	}
	m_exec.clear();
	m_io_out.erase();
	m_io_err.erase();
	m_status = status_done;
}

/** Return the processing status of this job archiver */
const job_archiver::archiving_status job_archiver::status(void)
{
	return(m_status);
}

/** Begin processing

	Attempt to fork a child process to handle this job. If unsuccessful,
	retry again later. The child calls mf_do_chores() to handle the actual
	processing, while the parent updates the job archiver's status from
	"pending" to "processing" and starts a timer to measure the duration of
	the job.

*/
void job_archiver::start(void)
{
	estring lstr;
	estring path;
	estring archive_dir;

	m_jr.id(m_job->generate_job_id());

	// Create directories before forking
	job::paths_type::const_iterator pi;
	for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
		archive_dir = m_job->generate_archive_path(*pi);
		path = archiver.working_archive_path();
		path += "/";
		path += archive_dir;
		path = reform_path(path);
		if (!exists(path)) {
			lstr = prefix();
			lstr += "Creating job archive path: ";
			lstr += archive_dir;
			lstr += "\n";
			logger.write(lstr);
			mk_dirhier(path);
		}
	}

	try {
		m_exec.fork();
	}
	catch(error e) {
		lstr = prefix();
		lstr += "Could not fork:\n";
		logger.write(lstr);

		lstr = e.str(prefix());
		logger.write(lstr);

		lstr = prefix();
		lstr += "Will retry job later\n";
		logger.write(lstr);

		m_status = status_reschedule;
	}
	catch(...) {
		error e = err_unknown;

		lstr = prefix();
		lstr += "Could not fork:\n";
		logger.write(lstr);

		lstr = e.str(prefix());
		logger.write(lstr);

		lstr = prefix();
		lstr += "Will retry job later\n";
		logger.write(lstr);

		m_status = status_reschedule;
	}

	if (m_exec.is_child()) {
		// wait_for_debugger = true;

		// while (wait_for_debugger);

		m_exec.reroute_stdio();
		try {
			mf_do_chores();
		}
		catch(error e) {
			std::cerr << e;
			m_success = false;
		}
		catch(...) {
			std::cerr << err_unknown;
			m_success = false;
		}
		if (m_success)
			m_exec.exit(0);
		else
			m_exec.exit(1);
	}

	m_child_pid = m_exec.child_pid();

	lstr = prefix();
	lstr += "Spawning child process: PID ";
	lstr += estring(static_cast<unsigned long>(m_exec.child_pid()));
	lstr += "\n";
	logger.write(lstr);

	m_status = status_processing;
	m_timer.start();
	m_io_timer.start();
	m_rsync_timeout_flag = false;
}
/** Parent processor for a job

	Check for I/O from the child. Check the child's status to see if it's
	still running, has exited with an exit code, or has exited from a signal.
	If the child did not exit normally (i.e., exited from a signal or exited
	with a non-zero exit code) then check the vault for overflow. If the
	vault has exceeded its overflow threshold then that could be the cause of
	the child's failure, in which case we reschedule the child to be
	processed again later.

	If the job is finished (whether successful or not), update the job
	archiver's status to "completed".

*/
void job_archiver::process(void)
{
	estring lstr;

	if (m_exec.child_running()) {
		// Process child I/O
		mf_process_child_io(false);
	}
	else {
		// Process remaining child I/O
		mf_process_child_io(true);

		// If child exited with an error, check vault overflow. If the vault is
		// filling up, then reschedule the job for later retry.
		lstr = prefix();
		if (m_exec.child_signaled()) {
			lstr += "Child exited from signal: ";
			lstr += estring(m_exec.child_signal_no());
			lstr += "\n";
		}
		else if (m_exec.child_exit_code() != 0) {
			lstr += "Child exited abnormally with code: ";
			lstr += estring(m_exec.child_exit_code());
			lstr += "\n";
		}
		else {
			lstr += "Child exited successfully\n";
			m_status = status_completed;
		}
		logger.write(lstr);

		if (m_exec.child_signaled() || !m_exec.child_exited_normally()) {
			/*
			if (vaulter.overflow()) {
				lstr = prefix();
				lstr += "Vault overflow detected, will retry later\n";
				logger.write(lstr);
				m_status = status_reschedule;
			}
			else {
				m_status = status_error;
			}
			*/
			m_status = status_error;
		}
		else {
			m_status = status_completed;
		}

		m_timer.stop();
		m_io_timer.stop();
		lstr = prefix();
		lstr += "Finished, duration: ";
		lstr += m_timer.duration();
		lstr += "\n";
		logger.write(lstr);
		// m_status = status_completed;
	}
}

/** Return the job report for this job */
single_job_report job_archiver::report(void) const
{
	return(m_jr);
}
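// Editor's illustrative summary (not part of the original source): the
// archiving_status values observed in this file move roughly as follows:
//
//     status_pending     set by the c'tor and clear()
//     status_processing  set by start() once the child is forked
//     status_reschedule  set by start() when the fork fails
//     status_completed   set by process() on a clean child exit
//     status_error       set by process() on a signal or abnormal exit
//     status_done        set by end() once the job is torn down
//
// (status_fatal_error is reported by the archive manager below but is not
// assigned anywhere in this file.)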
/** Child processor for a job

	For each path in this job:
	- Create the directory hierarchy for this job in the archive
	- Until done or until out of retries:
	  - Choose a hardlink source, if applicable and available
	  - Construct the command line to pass to rsync
	  - Spawn rsync
	  - Process I/O sent back from rsync
	  - Process exit code or signal number returned from rsync
	- Generate and submit a report to the report manager

*/
void job_archiver::mf_do_chores(void)
{
/*
	{
		bool wait_for_debugger = true;

		std::cerr << "Waiting for debugger to attach..." << std::endl;
		while (wait_for_debugger);
		std::cerr << "Debugger attached." << std::endl;
	}
*/

	job::paths_type::const_iterator pi;

	for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
		estring archive_dir;
		estring path;
		// estring command_line;
		std::string binary;
		std::vector<std::string> argv;
		estring opt;
		bool hardlink = false;
		bool multi_hardlink = false;
		int num_retries = 0;
		bool done = false;
		int exit_code = 0;
		int signal_num = 0;
		timer t;
		uint64 files_total = 0;
		uint64 files_xferd = 0;
		uint64 size_total = 0;
		uint64 size_xferd = 0;
		bool overflow_detected = false;
		estring error_msg;

		archive_dir = m_job->generate_archive_path(*pi);
		// If archive_dir does not end with a '/', strip off characters until it
		// does.
		/*
		while ((archive_dir.size() > 0)
			&& (archive_dir[archive_dir.size()-1] != '/'))
		{
			archive_dir.erase(archive_dir.size()-1);
		}
		*/

		path = archiver.working_archive_path();
		path += "/";
		path += archive_dir;
		path = reform_path(path);
		/*
		if (!exists(path)) {
			std::cout << "Creating job archive path: " << archive_dir << std::endl;
			mk_dirhier(path);
		}
		else
			std::cout << "Archiving to existing path: " << archive_dir << std::endl;
		*/

		if (!exists(path)) {
			std::string es;

			TRY_nomem(es = "No such directory: \"");
			TRY_nomem(es += archive_dir);
			TRY_nomem(es += "\"");

			throw(ERROR(EEXIST,es));
		}

		if (!writable(path)) {
			std::string es;

			TRY_nomem(es = "Cannot write to archive directory: \"");
			TRY_nomem(es += archive_dir);
			TRY_nomem(es += "\"");

			throw(ERROR(EACCES,es));
		}

		logger.set_error_logging(false);
		hardlink = m_job->rsync_hardlink;
		multi_hardlink = m_job->rsync_multi_hardlink;
		while ((num_retries <= m_job->rsync_retry_count) && !done) {
			execute exec;
			job::excludes_type::const_iterator ei;
			job::includes_type::const_iterator ii;
			std::vector<std::string>::const_iterator oi;

			exit_code = 0;
			signal_num = 0;

			/*
			command_line = config.rsync_local_path();
			command_line += " ";
			command_line += m_job->rsync_options;
			*/
			/**/
			binary = config.rsync_local_path();
			argv = m_job->generate_rsync_options_vector();
			/**/

			if (m_job->rsync_connection != job::connection_local) {
				/*
				command_line += " ";
				command_line += " --rsync-path=";
				if (m_job->rsync_remote_path.size()) {
					command_line += m_job->rsync_remote_path;
				}
				else {
					command_line += config.rsync_local_path();
				}
				*/
				/**/
				opt = "--rsync-path=";
				if (m_job->rsync_remote_path.size() != 0)
					opt += m_job->rsync_remote_path;
				else
					opt += config.rsync_local_path();
				argv.push_back(opt);
				/**/
			}

			if (hardlink) {
				subdirectory subdir;
				std::string youngest;
				std::string relative_path;
				bool hardlink_opt = false;
				bool first_hardlink = false;
				int linkdest_count = 0;

				subdir.path(vaulter.vault());
				if (subdir.size() > 0) {
					subdirectory::const_iterator si;

					sort(subdir.begin(), subdir.end());
					reverse(subdir.begin(), subdir.end());
					for (si = subdir.begin(); si != subdir.end(); ++si) {
						estring ypath;

						if (first_hardlink && !multi_hardlink) {
							continue;
						}
						if (linkdest_count >= m_job->rsync_multi_hardlink_max) {
							continue;
						}
						if (!is_timestamp(*si))
							continue;
						if (*si == config.timestamp().str())
							continue;
						std::cout
							<< "Considering potential hardlink source: "
							<< *si
							<< std::endl;
						ypath = vaulter.vault();
						ypath += "/";
						ypath += *si;
						ypath += "/";
						ypath += archive_dir;
						if (exists(ypath)) {
							std::cout
								<< "Using archive as hardlink source: "
								<< *si
								<< std::endl;
							// youngest = ypath;
							if (!hardlink_opt) {
								/**/
								argv.push_back(std::string("--hard-links"));
								/**/
								/*
								command_line += " --hard-links";
								*/
								hardlink_opt = true;
							}

							relative_path = mk_relative_path(ypath,path);
							/*
							command_line += " --link-dest=";
							command_line += relative_path;
							*/
							/**/
							opt = "--link-dest=";
							opt += relative_path;
							argv.push_back(opt);
							/**/
							first_hardlink = true;
							linkdest_count++; // At most 20 link-dest options can be used w/ rsync
						}
						else {
							std::cout
								<< "- No such path: "
								<< ypath
								<< std::endl;
						}
					}
				}
				/*
				if (youngest.size() > 0) {
					relative_path = mk_relative_path(youngest,path);
					command_line += " --hard-links --link-dest=";
					command_line += relative_path;
				}
				*/
			}

			for (
				ei = m_job->excludes.begin();
				ei != m_job->excludes.end();
				++ei
				)
			{
				/*
				command_line += " --exclude-from=";
				command_line += *ei;
				*/
				/**/
				opt = "--exclude-from=";
				opt += *ei;
				argv.push_back(opt);
				/**/
			}

			for (
				ii = m_job->includes.begin();
				ii != m_job->includes.end();
				++ii
				)
			{
				/*
				command_line += " --include-from=";
				command_line += *ii;
				*/
				/**/
				opt = "--include-from=";
				opt += *ii;
				argv.push_back(opt);
				/**/
			}

			/*
			command_line += " ";
			command_line += m_job->generate_source_path(*pi);
			*/
			/**/
			argv.push_back(m_job->generate_source_path(*pi));
			/**/

			/*
			command_line += " ";
			command_line += path;
			*/
			/**/
			argv.push_back(path);
			/**/

			/*
			std::cout << "Command Line: " << command_line << std::endl;
			*/
			/**/
			std::cout << "Command Line:" << std::endl;
			{
				uint16 c;
				std::cout << " Binary: " << binary << std::endl;
				for (c = 0; c < argv.size(); c++) {
					std::cout << " Argv[" << c << "] = " << argv[c] << std::endl;
				}
			}
			/**/

			m_error_msg.erase();

			t.start();
			/*
			exec.exec(command_line);
			*/
			/**/
			m_io_timer.start();
			m_rsync_timeout_flag = false;
			exec.exec(binary, argv);
			/**/
			mf_process_rsync_io(
				exec,
				m_job->rsync_timeout,
				files_total,
				files_xferd,
				size_total,
				size_xferd,
				overflow_detected
				);
			t.stop();

			signal_num = 0;
			if (exec.child_signaled()) {
				std::cout
					<< "Rsync caught signal: ["
					<< exec.child_signal_no()
					<< "] "
					<< rsync_estat_str.signal(exec.child_signal_no())
					<< std::endl;
				signal_num = exec.child_signal_no();
			}
			std::cout
				<< "Rsync exit code: ["
				<< exec.child_exit_code()
				<< "] "
				<< rsync_estat_str.exit(exec.child_exit_code())
				<< std::endl;

			exit_code = exec.child_exit_code();
			if (!m_rsync_timeout_flag && exec.child_exited_normally() && (exit_code == 0)) {
				done = true;
			}
			else if (!m_rsync_timeout_flag && (m_job->rsync_behavior[exit_code] == rsync_behavior::ok)) {
				std::cout << "Ignoring rsync failure" << std::endl;
				done = true;
			}
			else if (overflow_detected) {
				std::cout
					<< "Vault overflow detected"
					<< std::endl;
				break;
			}
			else {
				++num_retries;
				logger.set_error_logging(true);
			}
			if (m_rsync_timeout_flag) {
				TRY_nomem(m_error_msg = "Rsync inactivity timeout");
			}

			if (m_job->rsync_behavior[exit_code] == rsync_behavior::fail)
			{
				std::cout << "Failing, will not attempt to retry" << std::endl;
				break;
			}
			if (m_job->rsync_behavior[exit_code]
				== rsync_behavior::retry_without_hardlinks)
			{
				std::cout << "Retrying without hardlinks..." << std::endl;
				hardlink = false;
			}

			if ((!done) && (m_job->rsync_retry_delay > 0)) {
				std::cout << "Retries left: " << (m_job->rsync_retry_count - num_retries + 1) << std::endl;
				if (num_retries <= m_job->rsync_retry_count) {
					std::cout << "Waiting " << m_job->rsync_retry_delay
						<< " minutes before retrying... " << std::endl;
					sleep( (m_job->rsync_retry_delay) * 60);
				}
			}
		}
		if (!done) {
			if (num_retries >= m_job->rsync_retry_count) {
				std::cout << "Retry count exceeded" << std::endl;
				m_success = false;
			}
			else {
				std::cout << "Giving up on this path" << std::endl;
				m_success = false;
			}
		}
		reportio().write_report(
			m_job->generate_source_path(*pi),
			t,
			exit_code,
			signal_num,
			m_job->rsync_behavior[exit_code],
			m_error_msg
			);
	}
}
void job_archiver::mf_process_report(const std::string& a_str)
{
	if (reportio().is_report(a_str)) {
		m_jpr = reportio().parse(a_str);
		m_jr.add_report(m_jpr);
	}
}

/** Process I/O from the child

	While there is I/O to be read, read and parse it. When the end of a line
	is reached, write that line to the log file. If a_finalize is true, then
	flush the child I/O buffer string.

*/
void job_archiver::mf_process_child_io(bool a_finalize)
{
	estring lstr;
	bool io_flag = false;

	while (!m_exec.out_eof() && (a_finalize || m_exec.out_ready())) {
		int r;
		const int buffer_size = 128;
		char buffer[buffer_size] = { 0 };

		r = m_exec.out_read(buffer, buffer_size);
		if (r > 0) {
			int c;

			io_flag = true;
			for (c = 0; c < r; ++c) {
				if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
					lstr = prefix();
					lstr += m_io_out;
					lstr += "\n";
					logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
					mf_process_report(lstr);
					m_io_out.erase();
				}
				else {
					m_io_out += buffer[c];
				}
			}
		}
	}
	if (a_finalize && (m_io_out.size() > 0)) {
		lstr = prefix();
		lstr += m_io_out;
		lstr += "\n";
		logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
		mf_process_report(lstr);
		m_io_out.erase();
	}

	while (!m_exec.err_eof() && (a_finalize || m_exec.err_ready())) {
		int r;
		const int buffer_size = 128;
		char buffer[buffer_size] = { 0 };

		r = m_exec.err_read(buffer, buffer_size);
		if (r > 0) {
			int c;

			io_flag = true;
			for (c = 0; c < r; ++c) {
				if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
					lstr = prefix();
					lstr += m_io_err;
					lstr += "\n";
					logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
					mf_process_report(lstr);
					m_io_err.erase();
				}
				else {
					m_io_err += buffer[c];
				}
			}
		}
	}
	if (a_finalize && (m_io_err.size() > 0)) {
		lstr = prefix();
		lstr += m_io_err;
		lstr += "\n";
		logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
		mf_process_report(lstr);
		m_io_err.erase();
	}
	if (!io_flag)
		sleep(config.io_poll_interval());
}

/** Trim off all non-digit leading and trailing characters from a string */
void job_archiver::mf_trim_string(std::string& a_str)
{
	while ((a_str.size() > 0) && (!isdigit(a_str[0])))
		a_str.erase(0,1);
	while ((a_str.size() > 0) && (!isdigit(a_str[a_str.size()-1])))
		a_str.erase(a_str.size()-1,1);
}

/** Parse I/O from rsync

	Search for special output from rsync that tells us the number and size of
	files scanned and transferred.

*/
void job_archiver::mf_parse_rsync_io(
	const std::string a_str,
	uint64& a_files_total,
	uint64& a_files_xferd,
	uint64& a_size_total,
	uint64& a_size_xferd
	)
{
	estring estr;

	if (a_str.find("Number of files: ") == 0) {
		estr = a_str;
		mf_trim_string(estr);
		try {
			a_files_total = estr;
		}
		catch(error e) {
			estring es;

			es = "Could not parse total number of files processed by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
		catch(...) {
			error e = err_unknown;
			estring es;

			es = "Could not parse total number of files processed by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
	}
	else if (a_str.find("Number of files transferred: ") == 0) {
		estr = a_str;
		mf_trim_string(estr);
		try {
			a_files_xferd = estr;
		}
		catch(error e) {
			estring es;

			es = "Could not parse total number of files transferred by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
		catch(...) {
			error e = err_unknown;
			estring es;

			es = "Could not parse total number of files transferred by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
	}
	else if (a_str.find("Total file size: ") == 0) {
		estr = a_str;
		mf_trim_string(estr);
		try {
			a_size_total = estr;
		}
		catch(error e) {
			estring es;

			es = "Could not parse total size of files processed by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
		catch(...) {
			error e = err_unknown;
			estring es;

			es = "Could not parse total size of files processed by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
	}
	else if (a_str.find("Total transferred file size: ") == 0) {
		estr = a_str;
		mf_trim_string(estr);
		try {
			a_size_xferd = estr;
		}
		catch(error e) {
			estring es;

			es = "Could not parse total size of files transferred by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
		catch(...) {
			error e = err_unknown;
			estring es;

			es = "Could not parse total size of files transferred by rsync";
			e.push_back(ERROR_INSTANCE(es));

			// Not sure this is the best way to handle this...
			std::cerr << e;
		}
	}
}
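// Editor's illustrative note (not part of the original source): the prefixes
// matched above correspond to rsync's end-of-run summary lines, e.g.
//
//     Number of files: 1234
//     Number of files transferred: 56
//     Total file size: 789012 bytes
//     Total transferred file size: 3456 bytes
//
// mf_trim_string() strips the leading and trailing non-digit text so only the
// digits remain, and estring's conversion then assigns the numeric value to
// the corresponding uint64 counter (the sample figures above are made up).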
/** Process I/O from rsync

	If there is I/O from rsync to be read, read it and then send it through
	the parser.

*/
void job_archiver::mf_process_rsync_io(
	execute& a_exec,
	uint16 a_timeout,
	uint64& a_files_total,
	uint64& a_files_xferd,
	uint64& a_size_total,
	uint64& a_size_xferd,
	bool& a_overflow_detected
	)
{
	ssize_t ro;
	ssize_t re;
	estring out;
	estring err;
	bool io_flag;
	char buffer[1024] = { 0 };
	char *ptr;

	ro = 1;
	re = 1;
	while ((ro != 0) || (re != 0) || a_exec.child_running()) {
		io_flag = false;
		ro = 0;
		re = 0;

		if (!a_overflow_detected) {
			a_overflow_detected = vaulter.overflow();
		}

		m_error_msg.erase();

		if (a_exec.out_ready()) {
			ro = read(a_exec.out_fd(), buffer, 1024);
			if (ro > 0) {
				io_flag = true;
				for (ptr = buffer; ptr != buffer+ro; ++ptr) {
					if ((*ptr != '\r') && (*ptr != '\n')) {
						out += *ptr;
					}
					else {
						reportio().write_rsync_out(out);
						out.erase();
					}
				}
			}
		}

		if (a_exec.err_ready()) {
			re = read(a_exec.err_fd(), buffer, 1024);
			if (re > 0) {
				io_flag = true;
				for (ptr = buffer; ptr != buffer+re; ++ptr) {
					if ((*ptr != '\r') && (*ptr != '\n')) {
						err += *ptr;
					}
					else {
						reportio().write_rsync_err(err);
						err.erase();
					}
				}
			}
		}

		m_io_timer.stop();
		// std::cerr << "[DEBUG]: --------------------------" << std::endl;
		// std::cerr << "[DEBUG]: m_io_timer.start_value() = " << m_io_timer.start_value() << std::endl;
		// std::cerr << "[DEBUG]: m_io_timer.stop_value() = " << m_io_timer.stop_value() << std::endl;
		// std::cerr << "[DEBUG]: time(0) = " << time(0) << std::endl;
		// std::cerr << "[DEBUG]: m_io_timer.duration() = " << m_io_timer.duration_secs() << std::endl;
		// std::cerr << "[DEBUG]: difference = " << (time(0) - m_io_timer.start_value()) << std::endl;
		// std::cerr << "[DEBUG]: timeout = " << a_timeout << std::endl;
		// std::cerr << "[DEBUG]: io_flag = " << io_flag << std::endl;
		// std::cerr << "[DEBUG]: m_rsync_timeout_flag = " << m_rsync_timeout_flag << std::endl;
		if (io_flag) {
			m_io_timer.stop();
			m_io_timer.start();
			m_rsync_timeout_flag = false;
		}
		if (!m_rsync_timeout_flag && (m_io_timer.duration_secs() > a_timeout)) {
			std::cerr << "*** Rsync program inactivity timeout" << std::endl;
			a_exec.kill_child();
			m_rsync_timeout_flag = true;
		}

		if (!io_flag)
			sleep(config.io_poll_interval());

	}
	if (out.size() > 0) {
		std::cerr << out << std::endl;
		mf_parse_rsync_io(
			out,
			a_files_total,
			a_files_xferd,
			a_size_total,
			a_size_xferd
			);
		out.erase();
	}
	if (err.size() > 0) {
		std::cerr << err << std::endl;
		mf_parse_rsync_io(
			err,
			a_files_total,
			a_files_xferd,
			a_size_total,
			a_size_xferd
			);
		err.erase();
	}
}
//-----------------------------------------------------------------------------

/** C'tor */
archive_manager::archive_manager()
{
	if (this != &archiver)
		throw(INTERNAL_ERROR(0,"Attempt to allocate multiple archive managers"));

	clear();
}

/** Clear the archive manager and clear the job list */
void archive_manager::clear(void)
{
	m_jobs.clear();
	m_initialized = false;
}

/** Initialize the archive manager

	Log the archive timestamp, select and prepare a vault.

*/
void archive_manager::init(void)
{
	timer t;
	estring lstr;

	lstr = "Archive Manager - Beginning initialization\n";
	logger.write(lstr);
	t.start();

	lstr = "Timestamp: ";
	lstr += config.timestamp().str();
	lstr += "\n";
	logger.write(lstr);

	// Select a vault
	vaulter.select();
	lstr = "Vault selected: ";
	lstr += vaulter.vault();
	lstr += "\n";
	logger.write(lstr);

	reporter.vault().add_report(
		vault_stats_report(estring("Initial Stats:"),filesystem(vaulter.vault()))
		);

	// Prepare the vault
	vaulter.prepare();

	t.stop();
	lstr = "Archive Manager - Finished initialization";
	lstr += ", duration: ";
	lstr += t.duration();
	lstr += "\n";
	logger.write(lstr);

	m_initialized = true;
}

/** Return the initialized status of the archive manager */
const bool archive_manager::initialized(void) const
{
	return(m_initialized);
}

/** Give a status report

	After so many minutes of inactivity, write a report of our current status
	to the log file.

*/
void archive_manager::mf_log_status(void)
{
	static timer t;
	const timer::value_type timeout = 30;
	estring lstr;
	std::vector<job_archiver*>::const_iterator ji;
	uint64 jobs_pending = 0;
	uint64 jobs_processing = 0;
	uint64 jobs_completed = 0;
	uint64 jobs_remaining = 0;

	if (!t.is_started())
		t.start();

	t.stop();
	if (t.duration_mins() < timeout)
		return;

	lstr = "\n";
	lstr += "STATUS REPORT:\n";
	lstr += "================================================================\n";
	logger.write(lstr);
	for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji) {
		lstr = "[";
		switch ((*ji)->status()) {
			case job_archiver::status_pending:
				lstr += "Pending ";
				++jobs_pending;
				break;
			case job_archiver::status_processing:
				lstr += "Processing ";
				++jobs_processing;
				break;
			case job_archiver::status_reschedule:
				lstr += "Reschedule ";
				++jobs_pending;
				break;
			case job_archiver::status_fatal_error:
				lstr += "Fatal Error";
				++jobs_completed;
				break;
			case job_archiver::status_error:
				lstr += "Error ";
				++jobs_completed;
				break;
			case job_archiver::status_completed:
				lstr += "Completed ";
				++jobs_completed;
				break;
			case job_archiver::status_done:
				lstr += "Done ";
				++jobs_completed;
				break;
			default:
				lstr += "<unknown> ";
				break;
		}
		lstr += "]: ";
		lstr += (*ji)->id();
		lstr += "\n";
		logger.write(lstr);
	}

	lstr = "---------------------------------------------------------------\n";
	lstr += " Jobs Pending: ";
	lstr += estring(jobs_pending);
	lstr += "\n";

	lstr += " Jobs Processing: ";
	lstr += estring(jobs_processing);
	lstr += "\n";

	lstr += " Jobs Completed: ";
	lstr += estring(jobs_completed);
	lstr += "\n";

	lstr += " Total: ";
	lstr += estring(jobs_pending+jobs_processing+jobs_completed+jobs_remaining);
	lstr += "\n";

	lstr += "Overflow Detected: ";
	if (vaulter.overflow()) {
		lstr += "TRUE";
	}
	else {
		lstr += "false";
	}
	lstr += "\n";

	logger.write(lstr);
	logger.write("\n");
	t.start();
}
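// Editor's illustrative note (not part of the original source): the block
// written above looks roughly like this in the log; the job ids, paths and
// counts here are made up:
//
//     STATUS REPORT:
//     ================================================================
//     [Processing ]: [Job:0x8051234]  host1:/home
//     [Completed ]: [Job:0x8051300]  host2:/etc
//     ---------------------------------------------------------------
//      Jobs Pending: 0
//      Jobs Processing: 1
//      Jobs Completed: 1
//      Total: 2
//     Overflow Detected: false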
Detected: "; 01270 if (vaulter.overflow()) { 01271 lstr += "TRUE"; 01272 } 01273 else { 01274 lstr += "false"; 01275 } 01276 lstr += "\n"; 01277 01278 logger.write(lstr); 01279 logger.write("\n"); 01280 t.start(); 01281 } 01282 01283 /** Archive jobs 01284 01285 Create an archive directory. Generate a to-do list of job archiver objects. 01286 Process the job archiver objects: 01287 - While there are less than rsync-parallel job archivers processing, start 01288 the first available job archiver. 01289 - Check the status of each job and process I/O from jobs underway. 01290 - Remove jobs that failed to start. 01291 - Possibly reschedule failed jobs. 01292 - Remove completed jobs from active processing. 01293 - Call mf_log_status(). 01294 01295 */ 01296 void archive_manager::archive(void) 01297 { 01298 timer t; 01299 estring lstr; 01300 configuration_manager::jobs_type::const_iterator cji; 01301 int num_processing = 0; 01302 std::vector<job_archiver*>::iterator ji; 01303 uint64 num_completed = 0; 01304 bool overflow_detected = false; 01305 estring debug_estr; 01306 01307 if (!initialized()) 01308 throw(INTERNAL_ERROR(0,"Archive manager is not initialized")); 01309 01310 lstr = "Archive Manager - Begin archiving\n"; 01311 logger.write(lstr); 01312 t.start(); 01313 01314 // Create archive directory 01315 try { 01316 if (exists(archive_path())) { 01317 lstr = "Found existing archive directory: "; 01318 lstr += archive_path(); 01319 lstr += "\n"; 01320 logger.write(lstr); 01321 01322 if (!writable(archive_path())) { 01323 std::string es; 01324 01325 TRY_nomem(es = "Cannot write to archive directory: \""); 01326 TRY_nomem(es += archive_path()); 01327 TRY_nomem(es += "\""); 01328 01329 throw(ERROR(EACCES,es)); 01330 } 01331 01332 rename_file(archive_path(), working_archive_path()); 01333 } 01334 else if (exists(working_archive_path())) { 01335 lstr = "Found existing archive directory: "; 01336 lstr += working_archive_path(); 01337 lstr += "\n"; 01338 logger.write(lstr); 01339 01340 if (!writable(working_archive_path())) { 01341 std::string es; 01342 01343 TRY_nomem(es = "Cannot write to working archive directory: \""); 01344 TRY_nomem(es += working_archive_path()); 01345 TRY_nomem(es += "\""); 01346 01347 throw(ERROR(EACCES,es)); 01348 } 01349 } 01350 else { 01351 lstr = "Creating archive directory: "; 01352 lstr += working_archive_path(); 01353 lstr += "\n"; 01354 logger.write(lstr); 01355 mk_dir(working_archive_path()); 01356 } 01357 } 01358 catch(error e) { 01359 logger.write("An error has occured: "); 01360 logger.write(e[0].what()); 01361 logger.write("\n"); 01362 throw(e); 01363 } 01364 catch(...) 
{ 01365 error e = err_unknown; 01366 01367 logger.write("An error has occured: "); 01368 logger.write(e[0].what()); 01369 logger.write("\n"); 01370 throw(e); 01371 } 01372 01373 // Create a todo list 01374 logger.write("Creating to-do list\n"); 01375 for ( 01376 cji = config.jobs().begin(); 01377 cji != config.jobs().end(); 01378 ++cji 01379 ) 01380 { 01381 job_archiver* ptr; 01382 01383 ptr = new job_archiver(&(*cji)); 01384 if (ptr == 0) 01385 throw(err_nomem); 01386 TRY_nomem(m_jobs.push_back(ptr)); 01387 } 01388 01389 // Backup clients 01390 logger.write("Processing jobs...\n"); 01391 while (m_jobs.size() > num_completed) { 01392 01393 /* 01394 logger.write("[DEBUG]: ---[ TOP OF LOOP ]---\n"); 01395 01396 debug_estr = "[DEBUG]: overflow_detected = "; 01397 debug_estr += estring(overflow_detected); 01398 debug_estr += "\n"; 01399 logger.write(debug_estr); 01400 01401 debug_estr = "[DEBUG]: num_processing = "; 01402 debug_estr += estring(num_processing); 01403 debug_estr += "\n"; 01404 logger.write(debug_estr); 01405 */ 01406 01407 if (!overflow_detected) { 01408 overflow_detected = vaulter.overflow(true); 01409 /* 01410 if (overflow_detected) { 01411 logger.write("[DEBUG]: Variable Change :: "); 01412 logger.write("overflow_detected = true"); 01413 logger.write("\n"); 01414 } 01415 */ 01416 } 01417 01418 // If the vault has exceeded it's highwater mark, wait for the 01419 // currently-processing jobs to terminate, and then attempt to prepare the 01420 // vault. 01421 if (overflow_detected && (num_processing == 0)) { 01422 TRY(vaulter.prepare(true),"Cannot complete archive"); 01423 overflow_detected = vaulter.overflow(); 01424 /* 01425 if (!overflow_detected) { 01426 logger.write("[DEBUG]: Variable Change :: "); 01427 logger.write("overflow_detected = false"); 01428 logger.write("\n"); 01429 } 01430 */ 01431 } 01432 01433 // For each job in the list... 01434 for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji) 01435 { 01436 // While we're running less than rsync-parallel jobs, start new ones 01437 if ( 01438 !overflow_detected 01439 && (num_processing < config.rsync_parallel()) 01440 && ((*ji)->status() == job_archiver::status_pending) 01441 ) 01442 { 01443 try { 01444 (*ji)->start(); 01445 } 01446 catch(error e) { 01447 if (e.num() == 12) { 01448 lstr = "Error starting job: Out of memory, will retry later\n"; 01449 (*ji)->clear(); 01450 } 01451 else { 01452 (*ji)->end(); 01453 lstr = "Error starting job, aborting\n"; 01454 // logger.write("[DEBUG]: Variable Change :: num_processing--\n"); 01455 num_processing--; 01456 reporter.jobs().add_report((*ji)->report()); 01457 } 01458 logger.write(lstr); 01459 } 01460 catch(...) { 01461 (*ji)->end(); 01462 lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE STARTING JOB "; 01463 lstr += "-- JOB TERMINATED\n"; 01464 logger.write(lstr); 01465 // logger.write("[DEBUG]: Variable Change :: num_processing--\n"); 01466 num_processing--; 01467 reporter.jobs().add_report((*ji)->report()); 01468 } 01469 // logger.write("[DEBUG]: Variable Change :: num_processing++\n"); 01470 num_processing++; 01471 } 01472 01473 // Process jobs 01474 if ((*ji)->status() == job_archiver::status_processing) { 01475 try { 01476 (*ji)->process(); 01477 } 01478 catch(error e) { 01479 // TODO: Change 12 to ENOMEM? 
01480 if (e.num() == 12) { 01481 lstr = "Error starting job: Out of memory, will retry later\n"; 01482 (*ji)->clear(); 01483 } 01484 else { 01485 (*ji)->end(); 01486 lstr = "Error starting job, aborting\n"; 01487 // logger.write("[DEBUG]: Variable Change :: num_processing--\n"); 01488 num_processing--; 01489 reporter.jobs().add_report((*ji)->report()); 01490 } 01491 logger.write(lstr); 01492 } 01493 catch(...) { 01494 (*ji)->end(); 01495 lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE PROCESSING JOB "; 01496 lstr += "-- JOB TERMINATED\n"; 01497 logger.write(lstr); 01498 // logger.write("[DEBUG]: Variable Change :: num_processing--\n"); 01499 num_processing--; 01500 reporter.jobs().add_report((*ji)->report()); 01501 } 01502 } 01503 01504 // Remove jobs that could not start from active duty 01505 if ((*ji)->status() == job_archiver::status_reschedule) { 01506 (*ji)->clear(); 01507 // logger.write("[DEBUG]: Variable Change :: num_processing--\n"); 01508 num_processing--; 01509 } 01510 01511 // If a job exited with an error, and the vault is full, then reschedule 01512 // the job for later 01513 if ( 01514 ((*ji)->status() == job_archiver::status_error) 01515 && 01516 overflow_detected 01517 ) 01518 { 01519 lstr = "Vault overflow detected, will retry job later\n"; 01520 logger.write(lstr); 01521 (*ji)->clear(); 01522 num_processing--; 01523 } 01524 01525 // Remove completed jobs from the processing list 01526 if ( 01527 ((*ji)->status() == job_archiver::status_completed) 01528 || ((*ji)->status() == job_archiver::status_fatal_error) 01529 || ((*ji)->status() == job_archiver::status_error) 01530 ) { 01531 (*ji)->end(); 01532 // logger.write("[DEBUG]: Variable Change :: num_processing--\n"); 01533 num_processing--; 01534 num_completed++; 01535 01536 // logger.write("Adding job report to report manager\n"); 01537 reporter.jobs().add_report((*ji)->report()); 01538 } 01539 } 01540 01541 mf_log_status(); 01542 sleep(1); 01543 01544 // logger.write("[DEBUG]: ---[ BOTTOM OF LOOP ]---\n"); 01545 } 01546 01547 t.stop(); 01548 lstr = "Archive Manager - Finished archiving, duration: "; 01549 lstr += t.duration(); 01550 lstr += "\n"; 01551 logger.write(lstr); 01552 01553 lstr = "Archive Manager - Finalizing archive path\n"; 01554 logger.write(lstr); 01555 TRY( 01556 rename_file(working_archive_path(), archive_path()), 01557 "Cannot finalize archive" 01558 ); 01559 01560 reporter.vault().add_report( 01561 vault_stats_report(estring("Final Stats:"),filesystem(vaulter.vault())) 01562 ); 01563 } 01564 01565 /** Return an absolute path to the finished archive directory */ 01566 const std::string archive_manager::archive_path(void) const 01567 { 01568 estring path; 01569 01570 if (!initialized()) 01571 throw(INTERNAL_ERROR(0,"Archive manager is not initialized")); 01572 01573 path = vaulter.vault(); 01574 path += "/"; 01575 path += config.timestamp().str(); 01576 01577 return(path); 01578 } 01579 01580 /** Return the absolute path to the unfinished working archive directory */ 01581 const std::string archive_manager::working_archive_path(void) const 01582 { 01583 estring path; 01584 01585 if (!initialized()) 01586 throw(INTERNAL_ERROR(0,"Archive manager is not initialized")); 01587 01588 path = archive_path(); 01589 path += ".incomplete"; 01590 01591 return(path); 01592 } 01593 01594 //----------------------------------------------------------------------------- 01595 01596 /** The global archive manager */ 01597 archive_manager archiver; 01598
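// Editor's illustrative sketch (not part of the original source): a driver is
// expected to initialize the global archive manager before archiving, since
// archive() throws if the manager is not initialized, e.g.:
//
//     archiver.init();     // logs the timestamp, selects and prepares a vault
//     archiver.archive();  // runs the jobs and finalizes the archive path
//
// The finished archive ends up at <vault>/<timestamp>, which archive() renames
// from the working path <vault>/<timestamp>.incomplete once all jobs are done.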