archiver.cc

Go to the documentation of this file.
#include "config.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <ctype.h>
#include <signal.h>

#include <iostream>
#include <string>
#include <vector>

#include "asserts.h"
#include "error.h"
#include "estring.h"
#include "fs.h"
#include "rconfig.h"
#include "timer.h"
#include "logger.h"

#include "archiver.h"
00020 
00021 #include "archiver.h"
00022 
00023 //-----------------------------------------------------------------------------
00024 
00025 /** C'tor */
00026 rstat::rstat()
00027 {
00028         TRY_nomem(m_exit_str[0] = "Success");
00029         TRY_nomem(m_exit_str[1] = "Syntax or usage error");
00030         TRY_nomem(m_exit_str[2] = "Protocol incompatability error");
00031         TRY_nomem(m_exit_str[3] = "Errors selecting I/O files or directories");
00032         TRY_nomem(m_exit_str[4] = "Requested action not supported");
00033         TRY_nomem(m_exit_str[10] = "Error in socket I/O");
00034         TRY_nomem(m_exit_str[11] = "Error in file I/O");
00035         TRY_nomem(m_exit_str[12] = "Error in rsync protocol data stream");
00036         TRY_nomem(m_exit_str[13] = "Errors with program diagnostics");
00037         TRY_nomem(m_exit_str[14] = "Error in IPC code");
00038         TRY_nomem(m_exit_str[20] = "Received SIGUSR1 or SIGINT");
00039         TRY_nomem(m_exit_str[21] = "Some error returned by waitpid()");
00040         TRY_nomem(m_exit_str[22] = "Error allocating core memory buffers");
00041         TRY_nomem(m_exit_str[23] = "Partial transfer");
00042         TRY_nomem(m_exit_str[30] = "Timeout in data send/receive");
00043         TRY_nomem(m_exit_str[124] = "The command executed by SSH exited with status 255");
00044         TRY_nomem(m_exit_str[125] = "The command executed by SSH was killed by a signal");
00045         TRY_nomem(m_exit_str[126] = "The command given to SSH cannot be run");
00046         TRY_nomem(m_exit_str[127] = "The command given to SSH cannot be found");
00047 
00048         TRY_nomem(m_signal_str[1] = "[HUP]: Hangup");
00049         TRY_nomem(m_signal_str[2] = "[INT]: Interrupt ");
00050         TRY_nomem(m_signal_str[3] = "[QUIT]: Quit");
00051         TRY_nomem(m_signal_str[4] = "[ILL]: Illegal instruction");
00052         TRY_nomem(m_signal_str[5] = "[TRAP]: Trace trap");
00053         TRY_nomem(m_signal_str[6] = "[IOT]: IOT instruction or hardware fault");
00054         TRY_nomem(m_signal_str[7] = "[ABRT]: Abnormal termination");
00055         TRY_nomem(m_signal_str[8] = "[EMT]: EMT instruction or hardware fault");
00056         TRY_nomem(m_signal_str[9] = "[FPE]: Floating point exception");
00057         TRY_nomem(m_signal_str[10] = "[KILL]: Killed");
00058         TRY_nomem(m_signal_str[11] = "[BUS]: Bus error");
00059         TRY_nomem(m_signal_str[12] = "[SEGV]: Segmentation fault");
00060         TRY_nomem(m_signal_str[13] = "[SYS]: Invalid system call or invalid argument to system call");
00061         TRY_nomem(m_signal_str[14] = "[PIPE]: Write to pipe with no readers");
00062         TRY_nomem(m_signal_str[15] = "[ALRM]: Alarm");
00063         TRY_nomem(m_signal_str[16] = "[TERM]: Software termination");
00064         TRY_nomem(m_signal_str[17] = "[USR1]: User-defined signal 1");
00065         TRY_nomem(m_signal_str[18] = "[USR2]: User-defined signal 2");
00066         TRY_nomem(m_signal_str[19] = "[CLD]: Child status change");
00067         TRY_nomem(m_signal_str[20] = "[PWR]: Power fail/restart");
00068         TRY_nomem(m_signal_str[21] = "[WINCH]: Terminal window size change");
00069         TRY_nomem(m_signal_str[22] = "[URG]: Urgent condition");
00070         TRY_nomem(m_signal_str[23] = "[POLL]: Pollable event or socket I/O");
00071         TRY_nomem(m_signal_str[24] = "[STOP]: Stop");
00072         TRY_nomem(m_signal_str[25] = "[TSTP]: Terminal stop character");
00073         TRY_nomem(m_signal_str[26] = "[CONT]: Continue stopped process");
00074         TRY_nomem(m_signal_str[27] = "[TTIN]: Background tty read");
00075         TRY_nomem(m_signal_str[28] = "[TTOU]: Background tty write");
00076         TRY_nomem(m_signal_str[29] = "[VTALRM]: Virtual timer expired");
00077         TRY_nomem(m_signal_str[30] = "[PROF]: Profiling timer expired");
00078         TRY_nomem(m_signal_str[31] = "[XCPU]: Exceeded CPU limit");
00079         TRY_nomem(m_signal_str[32] = "[XFSZ]: Exceeded file size limit");
00080         TRY_nomem(m_signal_str[33] = "[WAITING]: Process' LWPs are blocked");
00081         TRY_nomem(m_signal_str[34] = "[LWP]: Special thread library signal");
00082         TRY_nomem(m_signal_str[35] = "[FREEZE]: Special signal used by CPR");
00083         TRY_nomem(m_signal_str[36] = "[THAW]: Special signal used by CPR");
00084         TRY_nomem(m_signal_str[37] = "[CANCEL]: Thread cancellation");
00085         TRY_nomem(m_signal_str[38] = "[LOST]: Resource lost");
00086         TRY_nomem(m_signal_str[39] = "[RTMIN]: Highest priority real-time signal");
00087         TRY_nomem(m_signal_str[46] = "[RTMAX]: Lowest priority real-time signal");
00088 
00089         TRY_nomem(m_unknown_exit = "(Unknown exit code)");
00090         TRY_nomem(m_unknown_signal = "(Unknown signal)");
00091 }
00092 
00093 /** Get a verbose string for an exit code */
00094 const std::string& rstat::exit(const int a_int) const
00095 {
00096         if (m_exit_str.find(a_int) != m_exit_str.end()) {
00097                 return(m_exit_str.find(a_int)->second);
00098         }
00099         return(m_unknown_exit);
00100 }
00101 
00102 /** Get a verbose string for a signal number */
00103 const std::string& rstat::signal(const int a_int) const
00104 {
00105         if (m_signal_str.find(a_int) != m_signal_str.end()) {
00106                 return(m_signal_str.find(a_int)->second);
00107         }
00108         return(m_unknown_signal);
00109 }
00110 
/** Global rstat instance used to turn rsync exit codes and signal numbers
	into text when reporting rsync results (see job_archiver::mf_do_chores()). */
class rstat rsync_estat_str;
00112 
00113 //-----------------------------------------------------------------------------
00114 
00115 /** C'tor
00116 
00117         Set a job to be assiciated with this job archiver and initialize it's
00118         processing status to "pending".
00119 
00120  */
00121 job_archiver::job_archiver(const job * a_job)
00122 {
00123         clear();
00124         m_job = a_job;
00125         m_status = status_pending;
00126 }
00127 
00128 /** Generate a job prefix string
00129 
00130         Create a string to uniquely identify this job to be used in the log file to
00131         uniquely identify this job
00132 
00133  */
00134 const std::string job_archiver::prefix(void)
00135 {
00136         estring lstr;
00137 
00138         lstr = "[Job:";
00139         lstr += estring((void*)m_job);
00140         lstr += "] ";
00141 
00142         return(lstr);
00143 }
00144 
00145 /** Generate a job id string */
00146 const std::string job_archiver::id(void)
00147 {
00148         estring lstr;
00149 
00150         lstr = prefix();
00151         lstr += " ";
00152         lstr += m_job->generate_job_id();
00153 
00154         return(lstr);
00155 }
00156 
00157 /** Clear the job archiver and return it to it's initial state
00158         
00159         End any processes handling this job and return the job archiver to it's
00160         "pending" state.
00161 
00162  */
00163 void job_archiver::clear(void)
00164 {
00165         end();
00166         m_child_pid = m_exec.my_pid();
00167         m_status = status_pending;
00168         m_success = true;
00169         m_jr.clear();
00170         m_jpr.clear();
00171         m_error_msg.erase();
00172 }
00173 
00174 /** End any processes handling this job
00175 
00176         If any child processes are handling this job, terminate them.  Erase any
00177         pending I/O for the now defunct child.  Set our processing status to "done".
00178 
00179 */
00180 void job_archiver::end(void)
00181 {
00182         estring lstr;
00183 
00184         m_timer.stop();
00185         if (m_exec.child_running()) {
00186                 lstr = prefix();
00187                 lstr += "Terminating child process!\n";
00188                 logger.write(lstr,0,configuration_manager::logging_manager,m_child_pid);
00189                 m_exec.kill_child();
00190         }
00191         m_exec.clear();
00192         m_io_out.erase();
00193         m_io_err.erase();
00194         m_status = status_done;
00195 }
00196 
00197 /** Return the processing status of this job archiver */
00198 const job_archiver::archiving_status job_archiver::status(void)
00199 {
00200         return(m_status);
00201 }
00202 
00203 /** Begin processing
00204 
00205         Attempt to fork a child process to handle this job.  If unsuccessful then
00206         retry again later.  The child then calls mf_do_chores() to handle the actual
00207         processing, while the parent updates the job archiver's status from
00208         "pending" to "processing" and begins a timer to measure the duration of the
00209         job process.
00210         
00211 */
00212 void job_archiver::start(void)
00213 {
00214         estring lstr;
00215 
00216         m_jr.id(m_job->generate_job_id());
00217 
00218         try {
00219                 m_exec.fork();
00220         }
00221         catch(error e) {
00222                 lstr = prefix();
00223                 lstr += "Could not fork:\n";
00224                 logger.write(lstr);
00225 
00226                 lstr = e.str(prefix());
00227                 logger.write(lstr);
00228 
00229                 lstr = prefix();
00230                 lstr += "Will retry job later\n";
00231                 logger.write(lstr);
00232 
00233                 m_status = status_reschedule;
00234         }
00235         catch(...) {
00236                 error e = err_unknown;
00237 
00238                 lstr = prefix();
00239                 lstr += "Could not fork:\n";
00240                 logger.write(lstr);
00241 
00242                 lstr = e.str(prefix());
00243                 logger.write(lstr);
00244 
00245                 lstr = prefix();
00246                 lstr += "Will retry job later\n";
00247                 logger.write(lstr);
00248 
00249                 m_status = status_reschedule;
00250         }
00251 
00252         if (m_exec.is_child()) {
00253                 // wait_for_debugger = true;
00254 
00255                 // while (wait_for_debugger);
00256 
00257                 m_exec.reroute_stdio();
00258                 try {
00259                         mf_do_chores();
00260                 }
00261                 catch(error e) {
00262                         std::cerr << e;
00263                         m_success = false;
00264                 }
00265                 catch(...) {
00266                         std::cerr << err_unknown;
00267                         m_success = false;
00268                 }
00269                 if (m_success)
00270                         m_exec.exit(0);
00271                 else
00272                         m_exec.exit(1);
00273         }
00274 
00275         m_child_pid = m_exec.child_pid();
00276 
00277         lstr = prefix();
00278         lstr += "Spawning child process: PID ";
00279         lstr += estring(static_cast<unsigned long>(m_exec.child_pid()));
00280         lstr += "\n";
00281         logger.write(lstr);
00282 
00283         m_status = status_processing;
00284         m_timer.start();
00285 }
00286 
00287 /** Parent processor for a job
00288 
00289         Check for I/O from the child.  Check the child's status to see if it's still
00290         running, has exited with an exit code, or has exited from a signal.  If the
00291         child sis not exit normally (i.e. exit from a signal or exit with a non-zero
00292         exit code) then check the vault for overflow.  If the vault has exceeded
00293         it's overflow threshold then that could be the cause for the child's
00294         failure, in which case we reschedule the child to be processed again later.
00295 
00296         If the job is finished (whether successful or not), update the job
00297         archiver's status to "completed".
00298         
00299  */
00300 void job_archiver::process(void)
00301 {
00302         estring lstr;
00303         
00304         if (m_exec.child_running()) {
00305                 // Process child I/O
00306                 mf_process_child_io(false);
00307         }
00308         else {
00309                 // Process remaining child I/O
00310                 mf_process_child_io(true);
00311 
00312                 // If child exited with an error, check vault overflow.  If the vault is
00313                 // filling up, then reschedule the job for later retry.
00314                 lstr = prefix();
00315                 if (m_exec.child_signaled()) {
00316                         lstr += "Child exited from signal: ";
00317                         lstr += estring(m_exec.child_signal_no());
00318                         lstr += "\n";
00319                 }
00320                 else if (m_exec.child_exit_code() != 0) {
00321                         lstr += "Child exited abnormally with code: ";
00322                         lstr += estring(m_exec.child_exit_code());
00323                         lstr += "\n";
00324                 }
00325                 else {
00326                         lstr += "Child exited successfully\n";
00327                         m_status = status_completed;
00328                 }
00329                 logger.write(lstr);
00330 
00331                 if (m_exec.child_signaled() || !m_exec.child_exited_normally()) {
00332                         /*
00333                         if (vaulter.overflow()) {
00334                                 lstr = prefix();
00335                                 lstr += "Vault overflow detected, will retry later\n";
00336                                 logger.write(lstr);
00337                                 m_status = status_reschedule;
00338                         }
00339                         else {
00340                                 m_status = status_error;
00341                         }
00342                         */
00343                         m_status = status_error;
00344                 }
00345                 else {
00346                         m_status = status_completed;
00347                 }
00348 
00349                 m_timer.stop();
00350                 lstr = prefix();
00351                 lstr += "Finished, duration: ";
00352                 lstr += m_timer.duration();
00353                 lstr += "\n";
00354                 logger.write(lstr);
00355                 // m_status = status_completed;
00356         }
00357 }
00358 
00359 /** Return the job report for this job */
00360 single_job_report job_archiver::report(void) const
00361 {
00362         return(m_jr);
00363 }
00364 
00365 /** Child processor for a job
00366 
00367         For each path in this job:
00368         - Create the directory heiararchy for this job in the archive
00369         - Until done or until out of retrys:
00370                 - Choose a hardlink source, if applicable and available
00371                 - Construct the command line to pass to rsync
00372                 - Spawn rsync
00373                 - Process I/O sent back from rsync
00374                 - Process exit code or signal number returned from rsync
00375         - Generate and submit a report to the report manager
00376         
00377  */
00378 void job_archiver::mf_do_chores(void)
00379 {
00380         /*
00381         {
00382                 bool wait_for_debugger = true;
00383 
00384                 std::cerr << "Waiting for debugger to attach..." << std::endl;
00385                 while (wait_for_debugger);
00386                 std::cerr << "Debugger attached." << std::endl;
00387         }
00388         */
00389 
00390         job::paths_type::const_iterator pi;
00391 
00392         for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
00393                 estring archive_dir;
00394                 estring path;
00395                 // estring command_line;
00396                 std::string binary;
00397                 std::vector<std::string> argv;
00398                 estring opt;
00399                 bool hardlink = false;
00400                 bool multi_hardlink = false;
00401                 int num_retries = 0;
00402                 bool done = false;
00403                 int exit_code = 0;
00404                 int signal_num = 0;
00405                 timer t;
00406                 uint64 files_total = 0;
00407                 uint64 files_xferd = 0;
00408                 uint64 size_total = 0;
00409                 uint64 size_xferd = 0;
00410                 bool overflow_detected = 0;
00411                 estring error_msg;
00412 
00413                 archive_dir = m_job->generate_archive_path(*pi);
00414                 // If archive_dir does not end with a '/', strip off characters until it
00415                 // does.
00416                 /*
00417                 while ((archive_dir.size() > 0) 
00418                         && (archive_dir[archive_dir.size()-1] != '/'))
00419                 {
00420                         archive_dir.erase(archive_dir.size()-1);
00421                 }
00422                 */
00423 
00424                 path = archiver.working_archive_path();
00425                 path += "/";
00426                 path += archive_dir;
00427                 path = reform_path(path);
00428                 if (!exists(path)) {
00429                         std::cout << "Creating job archive path: " << archive_dir << std::endl;
00430                         mk_dirhier(path);
00431                 }
00432                 else
00433                         std::cout << "Archiving to existing path: " << archive_dir << std::endl;
00434 
00435                 if (!writable(path)) {
00436                         std::string es;
00437 
00438                         TRY_nomem(es = "Cannot write to archive directory: \"");
00439                         TRY_nomem(es += archive_dir);
00440                         TRY_nomem(es += "\"");
00441 
00442                         throw(ERROR(EACCES,es));
00443                 }
00444 
00445                 logger.set_error_logging(false);
00446                 hardlink = m_job->rsync_hardlink;
00447                 multi_hardlink = m_job->rsync_multi_hardlink;
00448                 while ((num_retries <= m_job->rsync_retry_count) && !done) {
00449                         execute exec;
00450                         job::excludes_type::const_iterator ei;
00451                         job::includes_type::const_iterator ii;
00452                         std::vector<std::string>::const_iterator oi;
00453 
00454                         exit_code = 0;
00455                         signal_num = 0;
00456 
00457                         /*
00458                         command_line = config.rsync_local_path();
00459                         command_line += " ";
00460                         command_line += m_job->rsync_options;
00461                         */
00462                         /**/
00463                         binary = config.rsync_local_path();
00464                         argv = m_job->generate_rsync_options_vector();
00465                         /**/
00466 
00467                         if (m_job->rsync_connection != job::connection_local) {
00468                                 /*
00469                                 command_line += " ";
00470                                 command_line += " --rsync-path=";
00471                                 if (m_job->rsync_remote_path.size()) {
00472                                         command_line += m_job->rsync_remote_path;
00473                                 }
00474                                 else {
00475                                         command_line += config.rsync_local_path();
00476                                 }
00477                                 */
00478                                 /**/
00479                                 opt = "--rsync-path=";
00480                                 if (m_job->rsync_remote_path.size() != 0)
00481                                         opt += m_job->rsync_remote_path;
00482                                 else
00483                                         opt += config.rsync_local_path();
00484                                 argv.push_back(opt);
00485                                 /**/
00486                         }
00487 
00488                         if (hardlink) {
00489                                 subdirectory subdir;
00490                                 std::string youngest;
00491                                 std::string relative_path;
00492                                 bool hardlink_opt = false;
00493                                 bool first_hardlink = false;
00494                                 int linkdest_count = 0;
00495 
00496                                 subdir.path(vaulter.vault());
00497                                 if (subdir.size() > 0) {
00498                                         subdirectory::const_iterator si;
00499 
00500                                         sort(subdir.begin(), subdir.end());
00501                                         reverse(subdir.begin(), subdir.end());
00502                                         for (si = subdir.begin(); si != subdir.end(); ++si) {
00503                                                 estring ypath;
00504 
00505                                                 if (first_hardlink && !multi_hardlink) {
00506                                                         continue;
00507                                                 }
00508                                                 if (linkdest_count >= m_job->rsync_multi_hardlink_max) {
00509                                                         continue;
00510                                                 }
00511                                                 if (!is_timestamp(*si))
00512                                                         continue;
00513                                                 if (*si == config.timestamp().str())
00514                                                         continue;
00515                                                 std::cout 
00516                                                         << "Considering potential hardlink source: "
00517                                                         << *si
00518                                                         << std::endl;
00519                                                 ypath = vaulter.vault();
00520                                                 ypath += "/";
00521                                                 ypath += *si;
00522                                                 ypath += "/";
00523                                                 ypath += archive_dir;
00524                                                 if (exists(ypath)) {
00525                                                         std::cout 
00526                                                                 << "Using archive as hardlink source: " 
00527                                                                 << *si
00528                                                                 << std::endl;
00529                                                         // youngest = ypath;
00530                                                         if (!hardlink_opt) {
00531                                                                 /**/
00532                                                                 argv.push_back(std::string("--hard-links"));
00533                                                                 /**/
00534                                                                 /*
00535                                                                 command_line += " --hard-links";
00536                                                                 */
00537                                                                 hardlink_opt = true;
00538                                                         }
00539 
00540                                                         relative_path = mk_relative_path(ypath,path);
00541                                                         /*
00542                                                         command_line += " --link-dest=";
00543                                                         command_line += relative_path;
00544                                                         */
00545                                                         /**/
00546                                                         opt="--link-dest=";
00547                                                         opt += relative_path;
00548                                                         argv.push_back(opt);
00549                                                         /**/
00550                                                         first_hardlink = true;
00551                                                         linkdest_count++; // At most 20 link-dest options can be used w/ rsync
00552                                                 }
00553                                                 else {
00554                                                         std::cout
00555                                                                 << "- No such path: " 
00556                                                                 << ypath
00557                                                                 << std::endl;
00558                                                 }
00559                                         }
00560                                 }
00561                                 /*
00562                                 if (youngest.size() > 0) {
00563                                         relative_path = mk_relative_path(youngest,path);
00564                                         command_line += " --hard-links --link-dest=";
00565                                         command_line += relative_path;
00566                                 }
00567                                 */
00568                         }
00569 
00570                         for (
00571                                 ei = m_job->excludes.begin();
00572                                 ei != m_job->excludes.end();
00573                                 ++ei
00574                                 )
00575                         {
00576                                 /*
00577                                 command_line += " --exclude-from=";
00578                                 command_line += *ei;
00579                                 */
00580                                 /**/
00581                                 opt = "--exclude-from=";
00582                                 opt += *ei;
00583                                 argv.push_back(opt);
00584                                 /**/
00585                         }
00586 
00587                         for (
00588                                 ii = m_job->includes.begin();
00589                                 ii != m_job->includes.end();
00590                                 ++ii
00591                                 )
00592                         {
00593                                 /*
00594                                 command_line += " --include-from=";
00595                                 command_line += *ii;
00596                                 */
00597                                 /**/
00598                                 opt = "--include-from=";
00599                                 opt += *ei;
00600                                 argv.push_back(opt);
00601                                 /**/
00602                         }
00603 
00604                         /*
00605                         command_line += " ";
00606                         command_line += m_job->generate_source_path(*pi);
00607                         */
00608                         /**/
00609                         argv.push_back(m_job->generate_source_path(*pi));
00610                         /**/
00611                         
00612                         /*
00613                         command_line += " ";
00614                         command_line += path;
00615                         */
00616                         /**/
00617                         argv.push_back(path);
00618                         /**/
00619         
00620                         /*
00621                         std::cout << "Command Line: " << command_line << std::endl;
00622                         */
00623                         /**/
00624                         std::cout << "Command Line:" << std::endl;
00625                         {
00626                                 uint16 c;
00627                                 std::cout << "  Binary: " << binary << std::endl;
00628                                 for (c = 0; c < argv.size(); c++) {
00629                                         std::cout << "    Argv[" << c << "] = " << argv[c] << std::endl;
00630                                 }
00631                         }
00632                         /**/
00633 
00634                         m_error_msg.erase();
00635 
00636                         t.start();
00637                         /*
00638                         exec.exec(command_line);
00639                         */
00640                         /**/
00641                         exec.exec(binary, argv);
00642                         /**/
00643                         mf_process_rsync_io(
00644                                 exec, 
00645                                 m_job->rsync_timeout,
00646                                 files_total,
00647                                 files_xferd,
00648                                 size_total,
00649                                 size_xferd,
00650                                 overflow_detected
00651                                 );
00652                         t.stop();
00653 
00654                         signal_num = 0;
00655                         if (exec.child_signaled()) {
00656                                 std::cout 
00657                                         << "Rsync caught signal: [" 
00658                                         << exec.child_signal_no()
00659                                         << "] "
00660                                         << rsync_estat_str.signal(exec.child_signal_no())
00661                                         << std::endl;
00662                                 signal_num = exec.child_signal_no();
00663                         }
00664                         std::cout
00665                                 << "Rsync exit code: ["
00666                                 << exec.child_exit_code()
00667                                 << "] "
00668                                 << rsync_estat_str.exit(exec.child_exit_code())
00669                                 << std::endl;
00670 
00671                         exit_code = exec.child_exit_code();
00672                         if (exec.child_exited_normally() && (exit_code == 0))
00673                                 done = true;
00674                         else if (overflow_detected) {
00675                                 std::cout 
00676                                         << "Vault overflow detected"
00677                                         << std::endl;
00678                                 break;
00679                         }
00680                         else {
00681                                 ++num_retries;
00682                                 logger.set_error_logging(true);
00683                         }
00684 
00685                         if (m_job->rsync_behavior[exit_code] == rsync_behavior::fail)
00686                         {
00687                                 std::cout << "Failing, will not attempt to retry" << std::endl;
00688                                 break;
00689                         }
00690                         if (m_job->rsync_behavior[exit_code] 
00691                                 == rsync_behavior::retry_without_hardlinks)
00692                         {
00693                                 std::cout << "Retrying without hardlinks..." << std::endl;
00694                                 hardlink = false;
00695                         }
00696 
00697                         if ((!done) && (m_job->rsync_retry_delay > 0)) {
00698                                 std::cout << "Waiting " << m_job->rsync_retry_delay 
00699                                         << " minutes before retrying..." << std::endl;
00700                                 sleep( (m_job->rsync_retry_delay) * 60);
00701                         }
00702                 }
00703                 if (!done) {
00704                         if (num_retries >= m_job->rsync_retry_count) {
00705                                 std::cout << "Retry count exceeded" << std::endl;
00706                         }
00707                         if (m_job->rsync_behavior[exit_code] == rsync_behavior::ok)
00708                                 std::cout << "Ignoring rsync failure" << std::endl;
00709                         else {
00710                                 std::cout << "Giving up on this path" << std::endl;
00711                                 m_success = false;
00712                         }
00713                 }
00714                 reportio().write_report(
00715                         m_job->generate_source_path(*pi),
00716                         t,
00717                         exit_code,
00718                         signal_num,
00719                         m_error_msg
00720                         );
00721         }
00722 }
00723 
00724 void job_archiver::mf_process_report(const std::string& a_str)
00725 {
00726         if (reportio().is_report(a_str)) {
00727                 m_jpr = reportio().parse(a_str);
00728                 m_jr.add_report(m_jpr);
00729         }
00730 }
00731 
00732 /** Process I/O from the child
00733 
00734         While there is I/O to be read, read and parse it.  When the end of a line is
00735         reached write that line to the log file.  If a_finalize is true, the flush
00736         the child I/O buffer string.
00737         
00738  */
00739 void job_archiver::mf_process_child_io(bool a_finalize)
00740 {
00741         estring lstr;
00742         bool io_flag = false;
00743 
00744         while (!m_exec.out_eof() && (a_finalize || m_exec.out_ready())) {
00745                 int r;
00746                 const int buffer_size = 128;
00747                 char buffer[buffer_size] = { 0 };
00748 
00749                 r = m_exec.out_read(buffer, buffer_size);
00750                 if (r > 0) {
00751                         int c;
00752 
00753                         io_flag = true;
00754                         for (c = 0; c < r; ++c) {
00755                                 if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
00756                                         lstr = prefix();
00757                                         lstr += m_io_out;
00758                                         lstr += "\n";
00759                                         logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00760                                         mf_process_report(lstr);
00761                                         m_io_out.erase();
00762                                 }
00763                                 else {
00764                                         m_io_out += buffer[c];
00765                                 }
00766                         }
00767                 }
00768         }
00769         if (a_finalize && (m_io_out.size() > 0)) {
00770                 lstr = prefix();
00771                 lstr += m_io_out;
00772                 lstr += "\n";
00773                 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00774                 mf_process_report(lstr);
00775                 m_io_out.erase();
00776         }
00777 
00778         while (!m_exec.err_eof() && (a_finalize || m_exec.err_ready())) {
00779                 int r;
00780                 const int buffer_size = 128;
00781                 char buffer[buffer_size] = { 0 };
00782 
00783                 r = m_exec.err_read(buffer, buffer_size);
00784                 if (r > 0) {
00785                         int c;
00786 
00787                         io_flag = true;
00788                         for (c = 0; c < r; ++c) {
00789                                 if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
00790                                         lstr = prefix();
00791                                         lstr += m_io_err;
00792                                         lstr += "\n";
00793                                         logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00794                                         mf_process_report(lstr);
00795                                         m_io_err.erase();
00796                                 }
00797                                 else {
00798                                         m_io_err += buffer[c];
00799                                 }
00800                         }
00801                 }
00802         }
00803         if (a_finalize && (m_io_err.size() > 0)) {
00804                 lstr = prefix();
00805                 lstr += m_io_err;
00806                 lstr += "\n";
00807                 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00808                 mf_process_report(lstr);
00809                 m_io_err.erase();
00810         }
00811         if (!io_flag)
00812                 sleep(config.io_poll_interval());
00813 }
00814 
00815 /** Trim off all non-digit leading and trailing characters from a string */
00816 void job_archiver::mf_trim_string(std::string& a_str)
00817 {
00818         while ((a_str.size() > 0) && (!isdigit(a_str[0])))
00819                 a_str.erase(0,1);
00820         while ((a_str.size() > 0) && (!isdigit(a_str[a_str.size()-1])))
00821                 a_str.erase(a_str.size()-1,1);
00822 }
00823 
00824 /** Parse I/O from rsync
00825 
00826         Search for special output from rsync to tell us something about the number
00827         and size of files and files transfered.
00828 
00829  */
00830 void job_archiver::mf_parse_rsync_io(
00831         const std::string a_str,
00832         uint64& a_files_total,
00833         uint64& a_files_xferd,
00834         uint64& a_size_total,
00835         uint64& a_size_xferd
00836         )
00837 {
00838         estring estr;
00839 
00840         if (a_str.find("Number of files: ") == 0) {
00841                 estr = a_str;
00842                 mf_trim_string(estr);
00843                 try {
00844                         a_files_total = estr;
00845                 }
00846                 catch(error e) {
00847                         estring es;
00848 
00849                         es = "Could not parse total number of files processed by rsync";
00850                         e.push_back(ERROR_INSTANCE(es));
00851 
00852                         // Not sure this is the best way to handle this...
00853                         std::cerr << e;
00854                 }
00855                 catch(...) {
00856                         error e = err_unknown;
00857                         estring es;
00858 
00859                         es = "Could not parse total number of files processed by rsync";
00860                         e.push_back(ERROR_INSTANCE(es));
00861 
00862                         // Not sure this is the best way to handle this...
00863                         std::cerr << e;
00864                 }
00865         }
00866         else if (a_str.find("Number of files transferred: ") == 0) {
00867                 estr = a_str;
00868                 mf_trim_string(estr);
00869                 try {
00870                         a_files_xferd = estr;
00871                 }
00872                 catch(error e) {
00873                         estring es;
00874 
00875                         es = "Could not parse total number of files transferred by rsync";
00876                         e.push_back(ERROR_INSTANCE(es));
00877 
00878                         // Not sure this is the best way to handle this...
00879                         std::cerr << e;
00880                 }
00881                 catch(...) {
00882                         error e = err_unknown;
00883                         estring es;
00884 
00885                         es = "Could not parse total number of files transferred by rsync";
00886                         e.push_back(ERROR_INSTANCE(es));
00887 
00888                         // Not sure this is the best way to handle this...
00889                         std::cerr << e;
00890                 }
00891         }
00892         else if (a_str.find("Total file size: ") == 0) {
00893                 estr = a_str;
00894                 mf_trim_string(estr);
00895                 try {
00896                         a_size_total = estr;
00897                 }
00898                 catch(error e) {
00899                         estring es;
00900 
00901                         es = "Could not parse total size of files processed by rsync";
00902                         e.push_back(ERROR_INSTANCE(es));
00903 
00904                         // Not sure this is the best way to handle this...
00905                         std::cerr << e;
00906                 }
00907                 catch(...) {
00908                         error e = err_unknown;
00909                         estring es;
00910 
00911                         es = "Could not parse total size of files processed by rsync";
00912                         e.push_back(ERROR_INSTANCE(es));
00913 
00914                         // Not sure this is the best way to handle this...
00915                         std::cerr << e;
00916                 }
00917         }
00918         else if (a_str.find("Total transferred file size: ") == 0) {
00919                 estr = a_str;
00920                 mf_trim_string(estr);
00921                 try {
00922                         a_size_xferd = estr;
00923                 }
00924                 catch(error e) {
00925                         estring es;
00926 
00927                         es = "Could not parse total size of files transferred by rsync";
00928                         e.push_back(ERROR_INSTANCE(es));
00929 
00930                         // Not sure this is the best way to handle this...
00931                         std::cerr << e;
00932                 }
00933                 catch(...) {
00934                         error e = err_unknown;
00935                         estring es;
00936 
00937                         es = "Could not parse total size of files transferred by rsync";
00938                         e.push_back(ERROR_INSTANCE(es));
00939 
00940                         // Not sure this is the best way to handle this...
00941                         std::cerr << e;
00942                 }
00943         }
00944 }
00945 
00946 /** Process I/O from rsync
00947 
00948         If there is I/O from rsync to be read, read it and then send it through the
00949         parser.
00950 
00951  */
00952 void job_archiver::mf_process_rsync_io(
00953         execute& a_exec, 
00954         uint16 a_timeout,
00955         uint64& a_files_total,
00956         uint64& a_files_xferd,
00957         uint64& a_size_total,
00958         uint64& a_size_xferd,
00959         bool& a_overflow_detected
00960         )
00961 {
00962         size_t ro;
00963         size_t re;
00964         estring out;
00965         estring err;
00966         timer t;
00967         bool io_flag;
00968         char buffer[1024] = { 0 };
00969         char *ptr;
00970 
00971         ro = 1;
00972         re = 1;
00973         t.start();
00974         while ((ro != 0) || (re != 0) || a_exec.child_running()) {
00975                 io_flag = false;
00976                 ro = 0;
00977                 re = 0;
00978 
00979                 if (!a_overflow_detected) {
00980                         a_overflow_detected = vaulter.overflow();
00981                 }
00982 
00983                 m_error_msg.erase();
00984 
00985                 if (a_exec.out_ready()) {
00986                         ro = read(a_exec.out_fd(), buffer, 1024);
00987                         if (ro > 0) {
00988                                 io_flag = true;
00989                                 t.start();
00990                                 for (ptr = buffer; ptr != buffer+ro; ++ptr) {
00991                                         if ((*ptr != '\r') && (*ptr != '\n')) {
00992                                                 out += *ptr;
00993                                         }
00994                                         else {
00995                                                 reportio().write_rsync_out(out);
00996                                                 out.erase();
00997                                         }
00998                                 }
00999                         }
01000                 }
01001 
01002                 if (a_exec.err_ready()) {
01003                         re = read(a_exec.err_fd(), buffer, 1024);
01004                         if (re > 0) {
01005                                 io_flag = true;
01006                                 t.start();
01007                                 for (ptr = buffer; ptr != buffer+re; ++ptr) {
01008                                         if ((*ptr != '\r') && (*ptr != '\n')) {
01009                                                 err += *ptr;
01010                                         }
01011                                         else {
01012                                                 reportio().write_rsync_err(err);
01013                                                 err.erase();
01014                                         }
01015                                 }
01016                         }
01017                 }
01018 
01019                 t.stop();
01020                 if (t.duration_secs() > a_timeout) {
01021                         std::cerr << "*** Rsync program inactivity timeout" << std::endl;
01022                         a_exec.kill_child();
01023                         TRY_nomem(m_error_msg = "Rsync inactivity timeout");
01024                 }
01025 
01026                 if (!io_flag)
01027                         sleep(config.io_poll_interval());
01028 
01029         }
01030         if (out.size() > 0) {
01031                 std::cerr << out << std::endl;
01032                 mf_parse_rsync_io(
01033                         out, 
01034                         a_files_total, 
01035                         a_files_xferd, 
01036                         a_size_total,
01037                         a_size_xferd
01038                 );
01039                 out.erase();
01040         }
01041         if (err.size() > 0) {
01042                 std::cerr << err << std::endl;
01043                 mf_parse_rsync_io(
01044                         out, 
01045                         a_files_total, 
01046                         a_files_xferd, 
01047                         a_size_total,
01048                         a_size_xferd
01049                 );
01050                 err.erase();
01051         }
01052 }
01053 
01054 //-----------------------------------------------------------------------------
01055 
01056 /** C'tor */
01057 archive_manager::archive_manager()
01058 {
01059         if (this != &archiver)
01060                 throw(INTERNAL_ERROR(0,"Attempt to alocate multiple archive managers"));
01061 
01062         clear();
01063 }
01064 
01065 /** Clear the archive manager and clear the job list */
01066 void archive_manager::clear(void)
01067 {
01068         m_jobs.clear();
01069         m_initialized = false;
01070 }
01071 
01072 /** Initialize the archive manager
01073 
01074         Log the archive timestamp, select and prepare a vault.
01075 
01076  */
01077 void archive_manager::init(void)
01078 {
01079         timer t;
01080         estring lstr;
01081 
01082         lstr = "Archive Manager - Beginning initialization\n";
01083         logger.write(lstr);
01084         t.start();
01085 
01086         lstr = "Timestamp: ";
01087         lstr += config.timestamp().str();
01088         lstr += "\n";
01089         logger.write(lstr);
01090 
01091         // Select a vault?
01092         vaulter.select();
01093         lstr = "Vault selected: ";
01094         lstr += vaulter.vault();
01095         lstr += "\n";
01096         logger.write(lstr);
01097 
01098         reporter.vault().add_report(
01099                 vault_stats_report(estring("Initial Stats:"),filesystem(vaulter.vault()))
01100                 );
01101 
01102         // Prepare the vault?
01103         vaulter.prepare();
01104 
01105         t.stop();
01106         lstr = "Archive Manager - Finished initialization";
01107         lstr += ", duration: ";
01108         lstr += t.duration();
01109         lstr += "\n";
01110         logger.write(lstr);
01111 
01112         m_initialized = true;
01113 }
01114 
01115 /** Return the initialized status of the archive manager */
01116 const bool archive_manager::initialized(void) const
01117 {
01118         return(m_initialized);
01119 }
01120 
01121 /** Give a status report
01122 
01123         After so many minutes of inactivity write a report to the log file of our
01124         current status of affairs.
01125 
01126  */
01127 void archive_manager::mf_log_status(void)
01128 {
01129         static timer t;
01130         const timer::value_type timeout = 30;
01131         estring lstr;
01132         std::vector<job_archiver*>::const_iterator ji;
01133         uint64 jobs_pending = 0;
01134         uint64 jobs_processing = 0;
01135         uint64 jobs_completed =0;
01136         uint64 jobs_remaining = 0;
01137 
01138         if (!t.is_started())
01139                 t.start();
01140         
01141         t.stop();
01142         if (t.duration_mins() < timeout)
01143                 return;
01144         
01145         lstr  = "\n";
01146         lstr += "STATUS REPORT:\n";
01147         lstr += "================================================================\n";
01148         logger.write(lstr);
01149         for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji) {
01150                 lstr = "[";
01151                 switch ((*ji)->status()) {
01152                         case job_archiver::status_pending:
01153                                 lstr += "Pending    ";
01154                                 ++jobs_pending;
01155                                 break;
01156                         case job_archiver::status_processing:
01157                                 lstr += "Processing ";
01158                                 ++jobs_processing;
01159                                 break;
01160                         case job_archiver::status_reschedule:
01161                                 lstr += "Reschedule ";
01162                                 ++jobs_pending;
01163                                 break;
01164                         case job_archiver::status_fatal_error:
01165                                 lstr += "Fatal Error";
01166                                 ++jobs_completed;
01167                                 break;
01168                         case job_archiver::status_error:
01169                                 lstr += "Error      ";
01170                                 ++jobs_completed;
01171                                 break;
01172                         case job_archiver::status_completed:
01173                                 lstr += "Completed  ";
01174                                 ++jobs_completed;
01175                                 break;
01176                         case job_archiver::status_done:
01177                                 lstr += "Done       ";
01178                                 ++jobs_completed;
01179                                 break;
01180                         default:
01181                                 lstr += "<unknown>  ";
01182                                 break;
01183                 }
01184                 lstr += "]: ";
01185                 lstr += (*ji)->id();
01186                 lstr += "\n";
01187                 logger.write(lstr);
01188         }
01189 
01190         lstr  = "---------------------------------------------------------------\n";
01191         lstr += "     Jobs Pending: ";
01192         lstr += estring(jobs_pending);
01193         lstr += "\n";
01194 
01195         lstr += "  Jobs Processing: ";
01196         lstr += estring(jobs_processing);
01197         lstr += "\n";
01198 
01199         lstr += "   Jobs Completed: ";
01200         lstr += estring(jobs_completed);
01201         lstr += "\n";
01202 
01203         lstr += "            Total: ";
01204         lstr += estring(jobs_pending+jobs_processing+jobs_completed+jobs_remaining);
01205         lstr += "\n";
01206 
01207         lstr += "Overflow Detected: ";
01208         if (vaulter.overflow()) {
01209                 lstr += "TRUE";
01210         }
01211         else {
01212                 lstr += "false";
01213         }
01214         lstr += "\n";
01215 
01216         logger.write(lstr);
01217         logger.write("\n");
01218         t.start();
01219 }
01220 
01221 /** Archive jobs
01222 
01223         Create an archive directory.  Generate a to-do list of job archiver objects.
01224         Process the job archiver objects:
01225         - While there are less than rsync-parallel job archivers processing, start
01226                 the first available job archiver.
01227         - Check the status of each job and process I/O from jobs underway.
01228         - Remove jobs that failed to start.
01229         - Possibly reschedule failed jobs.
01230         - Remove completed jobs from active processing.
01231         - Call mf_log_status().
01232 
01233  */
01234 void archive_manager::archive(void)
01235 {
01236         timer t;
01237         estring lstr;
01238         configuration_manager::jobs_type::const_iterator cji;
01239         int num_processing = 0;
01240         std::vector<job_archiver*>::iterator ji;
01241         uint64 num_completed = 0;
01242         bool overflow_detected = false;
01243         estring debug_estr;
01244 
01245         if (!initialized())
01246                 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01247 
01248         lstr = "Archive Manager - Begin archiving\n";
01249         logger.write(lstr);
01250         t.start();
01251 
01252         // Create archive directory
01253         try {
01254                 if (exists(archive_path())) {
01255                         lstr = "Found existing archive directory: ";
01256                         lstr += archive_path();
01257                         lstr += "\n";
01258                         logger.write(lstr);
01259                         rename_file(archive_path(), working_archive_path());
01260                 }
01261                 else if (exists(working_archive_path())) {
01262                         lstr = "Found existing archive directory: ";
01263                         lstr += working_archive_path();
01264                         lstr += "\n";
01265                         logger.write(lstr);
01266                 }
01267                 else {
01268                         lstr = "Creating archive directory: ";
01269                         lstr += working_archive_path();
01270                         lstr += "\n";
01271                         logger.write(lstr);
01272                         mk_dir(working_archive_path());
01273                 }
01274         }
01275         catch(error e) {
01276                 logger.write("An error has occured: ");
01277                 logger.write(e[0].what());
01278                 logger.write("\n");
01279                 throw(e);
01280         }
01281         catch(...) {
01282                 error e = err_unknown;
01283 
01284                 logger.write("An error has occured: ");
01285                 logger.write(e[0].what());
01286                 logger.write("\n");
01287                 throw(e);
01288         }
01289 
01290         // Create a todo list
01291         logger.write("Creating to-do list\n");
01292         for (
01293                 cji = config.jobs().begin();
01294                 cji != config.jobs().end();
01295                 ++cji
01296                 )
01297         {
01298                 job_archiver* ptr;
01299 
01300                 ptr = new job_archiver(&(*cji));
01301                 if (ptr == 0)
01302                         throw(err_nomem);
01303                 TRY_nomem(m_jobs.push_back(ptr));
01304         }
01305 
01306         // Backup clients
01307         logger.write("Processing jobs...\n");
01308         while (m_jobs.size() > num_completed) {
01309 
01310                 /*
01311                 logger.write("[DEBUG]: ---[ TOP OF LOOP ]---\n");
01312 
01313                 debug_estr = "[DEBUG]: overflow_detected = ";
01314                 debug_estr += estring(overflow_detected);
01315                 debug_estr += "\n";
01316                 logger.write(debug_estr);
01317 
01318                 debug_estr = "[DEBUG]: num_processing = ";
01319                 debug_estr += estring(num_processing);
01320                 debug_estr += "\n";
01321                 logger.write(debug_estr);
01322                 */
01323 
01324                 if (!overflow_detected) {
01325                         overflow_detected = vaulter.overflow(true);
01326                         /*
01327                         if (overflow_detected) {
01328                                 logger.write("[DEBUG]: Variable Change :: ");
01329                                 logger.write("overflow_detected = true");
01330                                 logger.write("\n");
01331                         }
01332                         */
01333                 }
01334 
01335                 // If the vault has exceeded it's highwater mark, wait for the
01336                 // currently-processing jobs to terminate, and then attempt to prepare the
01337                 // vault.
01338                 if (overflow_detected && (num_processing == 0)) {
01339                         TRY(vaulter.prepare(true),"Cannot complete archive");
01340                         overflow_detected = vaulter.overflow();
01341                         /*
01342                         if (!overflow_detected) {
01343                                 logger.write("[DEBUG]: Variable Change :: ");
01344                                 logger.write("overflow_detected = false");
01345                                 logger.write("\n");
01346                         }
01347                         */
01348                 }
01349 
01350                 // For each job in the list...
01351                 for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji)
01352                 {
01353                         // While we're running less than rsync-parallel jobs, start new ones
01354                         if (
01355                                 !overflow_detected
01356                                 && (num_processing < config.rsync_parallel())
01357                                 && ((*ji)->status() == job_archiver::status_pending)
01358                                 )
01359                         {
01360                                 try {
01361                                         (*ji)->start();
01362                                 }
01363                                 catch(error e) {
01364                                         if (e.num() == 12) {
01365                                                 lstr = "Error starting job: Out of memory, will retry later\n";
01366                                                 (*ji)->clear();
01367                                         }
01368                                         else {
01369                                                 (*ji)->end();
01370                                                 lstr = "Error starting job, aborting\n";
01371                                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01372                                                 num_processing--;
01373                                                 reporter.jobs().add_report((*ji)->report());
01374                                         }
01375                                         logger.write(lstr);
01376                                 }
01377                                 catch(...) {
01378                                         (*ji)->end();
01379                                         lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE STARTING JOB ";
01380                                         lstr += "-- JOB TERMINATED\n";
01381                                         logger.write(lstr);
01382                                         // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01383                                         num_processing--;
01384                                         reporter.jobs().add_report((*ji)->report());
01385                                 }
01386                                 // logger.write("[DEBUG]: Variable Change :: num_processing++\n");
01387                                 num_processing++;
01388                         }
01389         
01390                         // Process jobs
01391                         if ((*ji)->status() == job_archiver::status_processing) {
01392                                 try {
01393                                         (*ji)->process();
01394                                 }
01395                                 catch(error e) {
01396                                         // TODO: Change 12 to ENOMEM?
01397                                         if (e.num() == 12) {
01398                                                 lstr  = "Error starting job: Out of memory, will retry later\n";
01399                                                 (*ji)->clear();
01400                                         }
01401                                         else {
01402                                                 (*ji)->end();
01403                                                 lstr = "Error starting job, aborting\n";
01404                                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01405                                                 num_processing--;
01406                                                 reporter.jobs().add_report((*ji)->report());
01407                                         }
01408                                         logger.write(lstr);
01409                                 }
01410                                 catch(...) {
01411                                         (*ji)->end();
01412                                         lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE PROCESSING JOB ";
01413                                         lstr += "-- JOB TERMINATED\n";
01414                                         logger.write(lstr);
01415                                         // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01416                                         num_processing--;
01417                                         reporter.jobs().add_report((*ji)->report());
01418                                 }
01419                         }
01420 
01421                         // Remove jobs that could not start from active duty
01422                         if ((*ji)->status() == job_archiver::status_reschedule) {
01423                                 (*ji)->clear();
01424                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01425                                 num_processing--;
01426                         }
01427 
01428                         // If a job exited with an error, and the vault is full, then reschedule
01429                         // the job for later
01430                         if (
01431                                         ((*ji)->status() == job_archiver::status_error)
01432                                 &&
01433                                         overflow_detected
01434                                 )
01435                         {
01436                                 lstr = "Vault overflow detected, will retry job later\n";
01437                                 logger.write(lstr);
01438                                 (*ji)->clear();
01439                                 num_processing--;
01440                         }
01441 
01442                         // Remove completed jobs from the processing list
01443                         if (
01444                                 ((*ji)->status() == job_archiver::status_completed)
01445                                 || ((*ji)->status() == job_archiver::status_fatal_error)
01446                                 || ((*ji)->status() == job_archiver::status_error)
01447                                 ) {
01448                                 (*ji)->end();
01449                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01450                                 num_processing--;
01451                                 num_completed++;
01452 
01453                                 // logger.write("Adding job report to report manager\n");
01454                                 reporter.jobs().add_report((*ji)->report());
01455                         }
01456                 }
01457 
01458                 mf_log_status();
01459                 sleep(1);
01460 
01461                 // logger.write("[DEBUG]: ---[ BOTTOM OF LOOP ]---\n");
01462         }
01463 
01464         t.stop();
01465         lstr = "Archive Manager - Finished archiving, duration: ";
01466         lstr += t.duration();
01467         lstr += "\n";
01468         logger.write(lstr);
01469 
01470         lstr = "Archive Manager - Finalizing archive path\n";
01471         logger.write(lstr);
01472         TRY(
01473                 rename_file(working_archive_path(), archive_path()),
01474                 "Cannot finalize archive"
01475                 );
01476 
01477         reporter.vault().add_report(
01478                 vault_stats_report(estring("Final Stats:"),filesystem(vaulter.vault()))
01479                 );
01480 }
01481 
01482 /** Return an absolute path to the finished archive directory */
01483 const std::string archive_manager::archive_path(void) const
01484 {
01485         estring path;
01486 
01487         if (!initialized())
01488                 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01489 
01490         path = vaulter.vault();
01491         path += "/";
01492         path += config.timestamp().str();
01493 
01494         return(path);
01495 }
01496 
01497 /** Return the absolute path to the unfinished working archive directory */
01498 const std::string archive_manager::working_archive_path(void) const
01499 {
01500         estring path;
01501 
01502         if (!initialized())
01503                 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01504         
01505         path = archive_path();
01506         path += ".incomplete";
01507 
01508         return(path);
01509 }
01510 
01511 //-----------------------------------------------------------------------------
01512 
/** The global archive manager
 *
 * Single namespace-scope instance of archive_manager, defined in this
 * translation unit (presumably exposed to others via an extern declaration
 * in archiver.h -- TODO confirm).  Like any non-local object, its
 * initialization order relative to globals in other translation units is
 * unspecified, so other globals' constructors should not depend on it.
 */
archive_manager archiver;
01515 

Generated on Thu Jun 5 11:12:56 2008 for rvm by doxygen 1.5.1