rvm 1.08

archiver.cc

Go to the documentation of this file.
#include "config.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <ctype.h>
#include <signal.h>

#include <iostream>
#include <string>
#include <vector>

#include "asserts.h"
#include "error.h"
#include "estring.h"
#include "fs.h"
#include "rconfig.h"
#include "timer.h"
#include "logger.h"

#include "archiver.h"
00022 
00023 //-----------------------------------------------------------------------------
00024 
00025 /** C'tor */
00026 rstat::rstat()
00027 {
00028         TRY_nomem(m_exit_str[0] = "Success");
00029         TRY_nomem(m_exit_str[1] = "Syntax or usage error");
00030         TRY_nomem(m_exit_str[2] = "Protocol incompatability error");
00031         TRY_nomem(m_exit_str[3] = "Errors selecting I/O files or directories");
00032         TRY_nomem(m_exit_str[4] = "Requested action not supported");
00033         TRY_nomem(m_exit_str[10] = "Error in socket I/O");
00034         TRY_nomem(m_exit_str[11] = "Error in file I/O");
00035         TRY_nomem(m_exit_str[12] = "Error in rsync protocol data stream");
00036         TRY_nomem(m_exit_str[13] = "Errors with program diagnostics");
00037         TRY_nomem(m_exit_str[14] = "Error in IPC code");
00038         TRY_nomem(m_exit_str[20] = "Received SIGUSR1 or SIGINT");
00039         TRY_nomem(m_exit_str[21] = "Some error returned by waitpid()");
00040         TRY_nomem(m_exit_str[22] = "Error allocating core memory buffers");
00041         TRY_nomem(m_exit_str[23] = "Partial transfer");
00042         TRY_nomem(m_exit_str[24] = "Some files vanished before they could be transferred");
00043         TRY_nomem(m_exit_str[25] = "The --max-delete limit stopped deletions");
00044         TRY_nomem(m_exit_str[30] = "Timeout in data send/receive");
00045         TRY_nomem(m_exit_str[124] = "The command executed by SSH exited with status 255");
00046         TRY_nomem(m_exit_str[125] = "The command executed by SSH was killed by a signal");
00047         TRY_nomem(m_exit_str[126] = "The command given to SSH cannot be run");
00048         TRY_nomem(m_exit_str[127] = "The command given to SSH cannot be found");
00049 
00050         TRY_nomem(m_signal_str[1] = "[HUP]: Hangup");
00051         TRY_nomem(m_signal_str[2] = "[INT]: Interrupt ");
00052         TRY_nomem(m_signal_str[3] = "[QUIT]: Quit");
00053         TRY_nomem(m_signal_str[4] = "[ILL]: Illegal instruction");
00054         TRY_nomem(m_signal_str[5] = "[TRAP]: Trace trap");
00055         TRY_nomem(m_signal_str[6] = "[IOT]: IOT instruction or hardware fault");
00056         TRY_nomem(m_signal_str[7] = "[ABRT]: Abnormal termination");
00057         TRY_nomem(m_signal_str[8] = "[EMT]: EMT instruction or hardware fault");
00058         TRY_nomem(m_signal_str[9] = "[FPE]: Floating point exception");
00059         TRY_nomem(m_signal_str[10] = "[KILL]: Killed");
00060         TRY_nomem(m_signal_str[11] = "[BUS]: Bus error");
00061         TRY_nomem(m_signal_str[12] = "[SEGV]: Segmentation fault");
00062         TRY_nomem(m_signal_str[13] = "[SYS]: Invalid system call or invalid argument to system call");
00063         TRY_nomem(m_signal_str[14] = "[PIPE]: Write to pipe with no readers");
00064         TRY_nomem(m_signal_str[15] = "[ALRM]: Alarm");
00065         TRY_nomem(m_signal_str[16] = "[TERM]: Software termination");
00066         TRY_nomem(m_signal_str[17] = "[USR1]: User-defined signal 1");
00067         TRY_nomem(m_signal_str[18] = "[USR2]: User-defined signal 2");
00068         TRY_nomem(m_signal_str[19] = "[CLD]: Child status change");
00069         TRY_nomem(m_signal_str[20] = "[PWR]: Power fail/restart");
00070         TRY_nomem(m_signal_str[21] = "[WINCH]: Terminal window size change");
00071         TRY_nomem(m_signal_str[22] = "[URG]: Urgent condition");
00072         TRY_nomem(m_signal_str[23] = "[POLL]: Pollable event or socket I/O");
00073         TRY_nomem(m_signal_str[24] = "[STOP]: Stop");
00074         TRY_nomem(m_signal_str[25] = "[TSTP]: Terminal stop character");
00075         TRY_nomem(m_signal_str[26] = "[CONT]: Continue stopped process");
00076         TRY_nomem(m_signal_str[27] = "[TTIN]: Background tty read");
00077         TRY_nomem(m_signal_str[28] = "[TTOU]: Background tty write");
00078         TRY_nomem(m_signal_str[29] = "[VTALRM]: Virtual timer expired");
00079         TRY_nomem(m_signal_str[30] = "[PROF]: Profiling timer expired");
00080         TRY_nomem(m_signal_str[31] = "[XCPU]: Exceeded CPU limit");
00081         TRY_nomem(m_signal_str[32] = "[XFSZ]: Exceeded file size limit");
00082         TRY_nomem(m_signal_str[33] = "[WAITING]: Process' LWPs are blocked");
00083         TRY_nomem(m_signal_str[34] = "[LWP]: Special thread library signal");
00084         TRY_nomem(m_signal_str[35] = "[FREEZE]: Special signal used by CPR");
00085         TRY_nomem(m_signal_str[36] = "[THAW]: Special signal used by CPR");
00086         TRY_nomem(m_signal_str[37] = "[CANCEL]: Thread cancellation");
00087         TRY_nomem(m_signal_str[38] = "[LOST]: Resource lost");
00088         TRY_nomem(m_signal_str[39] = "[RTMIN]: Highest priority real-time signal");
00089         TRY_nomem(m_signal_str[46] = "[RTMAX]: Lowest priority real-time signal");
00090 
00091         TRY_nomem(m_unknown_exit = "(Unknown exit code)");
00092         TRY_nomem(m_unknown_signal = "(Unknown signal)");
00093 }
00094 
00095 /** Get a verbose string for an exit code */
00096 const std::string& rstat::exit(const int a_int) const
00097 {
00098         if (m_exit_str.find(a_int) != m_exit_str.end()) {
00099                 return(m_exit_str.find(a_int)->second);
00100         }
00101         return(m_unknown_exit);
00102 }
00103 
00104 /** Get a verbose string for a signal number */
00105 const std::string& rstat::signal(const int a_int) const
00106 {
00107         if (m_signal_str.find(a_int) != m_signal_str.end()) {
00108                 return(m_signal_str.find(a_int)->second);
00109         }
00110         return(m_unknown_signal);
00111 }
00112 
/** Global rstat instance; mf_do_chores() queries it to describe rsync exit codes and signals */
00113 class rstat rsync_estat_str;
00114 
00115 //-----------------------------------------------------------------------------
00116 
00117 /** C'tor
00118 
00119         Set a job to be assiciated with this job archiver and initialize it's
00120         processing status to "pending".
00121 
00122  */
00123 job_archiver::job_archiver(const job * a_job)
00124 {
00125         clear();
00126         m_job = a_job;
00127         m_status = status_pending;
00128 }
00129 
00130 /** Generate a job prefix string
00131 
00132         Create a string to uniquely identify this job to be used in the log file to
00133         uniquely identify this job
00134 
00135  */
00136 const std::string job_archiver::prefix(void)
00137 {
00138         estring lstr;
00139 
00140         lstr = "[Job:";
00141         lstr += estring((void*)m_job);
00142         lstr += "] ";
00143 
00144         return(lstr);
00145 }
00146 
00147 /** Generate a job id string */
00148 const std::string job_archiver::id(void)
00149 {
00150         estring lstr;
00151 
00152         lstr = prefix();
00153         lstr += " ";
00154         lstr += m_job->generate_job_id();
00155 
00156         return(lstr);
00157 }
00158 
00159 /** Clear the job archiver and return it to it's initial state
00160         
00161         End any processes handling this job and return the job archiver to it's
00162         "pending" state.
00163 
00164  */
00165 void job_archiver::clear(void)
00166 {
00167         end();
00168         m_child_pid = m_exec.my_pid();
00169         m_status = status_pending;
00170         m_success = true;
00171         m_jr.clear();
00172         m_jpr.clear();
00173         m_error_msg.erase();
00174         m_rsync_timeout_flag = false;
00175 }
00176 
00177 /** End any processes handling this job
00178 
00179         If any child processes are handling this job, terminate them.  Erase any
00180         pending I/O for the now defunct child.  Set our processing status to "done".
00181 
00182 */
00183 void job_archiver::end(void)
00184 {
00185         estring lstr;
00186 
00187         m_timer.stop();
00188         m_io_timer.stop();
00189         if (m_exec.child_running()) {
00190                 lstr = prefix();
00191                 lstr += "Terminating child process!\n";
00192                 logger.write(lstr,0,configuration_manager::logging_manager,m_child_pid);
00193                 m_exec.kill_child();
00194         }
00195         m_exec.clear();
00196         m_io_out.erase();
00197         m_io_err.erase();
00198         m_status = status_done;
00199 }
00200 
00201 /** Return the processing status of this job archiver */
00202 const job_archiver::archiving_status job_archiver::status(void)
00203 {
00204         return(m_status);
00205 }
00206 
00207 /** Begin processing
00208 
00209         Attempt to fork a child process to handle this job.  If unsuccessful then
00210         retry again later.  The child then calls mf_do_chores() to handle the actual
00211         processing, while the parent updates the job archiver's status from
00212         "pending" to "processing" and begins a timer to measure the duration of the
00213         job process.
00214         
00215 */
00216 void job_archiver::start(void)
00217 {
00218         estring lstr;
00219         estring path;
00220         estring archive_dir;
00221 
00222         m_jr.id(m_job->generate_job_id());
00223 
00224         // Create directories before forking
00225         job::paths_type::const_iterator pi;
00226         for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
00227                 archive_dir = m_job->generate_archive_path(*pi);
00228                 path = archiver.working_archive_path();
00229                 path += "/";
00230                 path += archive_dir;
00231                 path = reform_path(path);
00232                 if (!exists(path)) {
00233                         lstr = prefix();
00234                         lstr += "Creating job archive path: ";
00235                         lstr += archive_dir;
00236                         lstr += "\n";
00237                         logger.write(lstr);
00238                         mk_dirhier(path);
00239                 }
00240         }
00241 
00242         try {
00243                 m_exec.fork();
00244         }
00245         catch(error e) {
00246                 lstr = prefix();
00247                 lstr += "Could not fork:\n";
00248                 logger.write(lstr);
00249 
00250                 lstr = e.str(prefix());
00251                 logger.write(lstr);
00252 
00253                 lstr = prefix();
00254                 lstr += "Will retry job later\n";
00255                 logger.write(lstr);
00256 
00257                 m_status = status_reschedule;
00258         }
00259         catch(...) {
00260                 error e = err_unknown;
00261 
00262                 lstr = prefix();
00263                 lstr += "Could not fork:\n";
00264                 logger.write(lstr);
00265 
00266                 lstr = e.str(prefix());
00267                 logger.write(lstr);
00268 
00269                 lstr = prefix();
00270                 lstr += "Will retry job later\n";
00271                 logger.write(lstr);
00272 
00273                 m_status = status_reschedule;
00274         }
00275 
00276         if (m_exec.is_child()) {
00277                 // wait_for_debugger = true;
00278 
00279                 // while (wait_for_debugger);
00280 
00281                 m_exec.reroute_stdio();
00282                 try {
00283                         mf_do_chores();
00284                 }
00285                 catch(error e) {
00286                         std::cerr << e;
00287                         m_success = false;
00288                 }
00289                 catch(...) {
00290                         std::cerr << err_unknown;
00291                         m_success = false;
00292                 }
00293                 if (m_success)
00294                         m_exec.exit(0);
00295                 else
00296                         m_exec.exit(1);
00297         }
00298 
00299         m_child_pid = m_exec.child_pid();
00300 
00301         lstr = prefix();
00302         lstr += "Spawning child process: PID ";
00303         lstr += estring(static_cast<unsigned long>(m_exec.child_pid()));
00304         lstr += "\n";
00305         logger.write(lstr);
00306 
00307         m_status = status_processing;
00308         m_timer.start();
00309         m_io_timer.start();
00310         m_rsync_timeout_flag = false;
00311 }
00312 
00313 /** Parent processor for a job
00314 
00315         Check for I/O from the child.  Check the child's status to see if it's still
00316         running, has exited with an exit code, or has exited from a signal.  If the
00317         child sis not exit normally (i.e. exit from a signal or exit with a non-zero
00318         exit code) then check the vault for overflow.  If the vault has exceeded
00319         it's overflow threshold then that could be the cause for the child's
00320         failure, in which case we reschedule the child to be processed again later.
00321 
00322         If the job is finished (whether successful or not), update the job
00323         archiver's status to "completed".
00324         
00325  */
00326 void job_archiver::process(void)
00327 {
00328         estring lstr;
00329         
00330         if (m_exec.child_running()) {
00331                 // Process child I/O
00332                 mf_process_child_io(false);
00333         }
00334         else {
00335                 // Process remaining child I/O
00336                 mf_process_child_io(true);
00337 
00338                 // If child exited with an error, check vault overflow.  If the vault is
00339                 // filling up, then reschedule the job for later retry.
00340                 lstr = prefix();
00341                 if (m_exec.child_signaled()) {
00342                         lstr += "Child exited from signal: ";
00343                         lstr += estring(m_exec.child_signal_no());
00344                         lstr += "\n";
00345                 }
00346                 else if (m_exec.child_exit_code() != 0) {
00347                         lstr += "Child exited abnormally with code: ";
00348                         lstr += estring(m_exec.child_exit_code());
00349                         lstr += "\n";
00350                 }
00351                 else {
00352                         lstr += "Child exited successfully\n";
00353                         m_status = status_completed;
00354                 }
00355                 logger.write(lstr);
00356 
00357                 if (m_exec.child_signaled() || !m_exec.child_exited_normally()) {
00358                         /*
00359                         if (vaulter.overflow()) {
00360                                 lstr = prefix();
00361                                 lstr += "Vault overflow detected, will retry later\n";
00362                                 logger.write(lstr);
00363                                 m_status = status_reschedule;
00364                         }
00365                         else {
00366                                 m_status = status_error;
00367                         }
00368                         */
00369                         m_status = status_error;
00370                 }
00371                 else {
00372                         m_status = status_completed;
00373                 }
00374 
00375                 m_timer.stop();
00376                 m_io_timer.stop();
00377                 lstr = prefix();
00378                 lstr += "Finished, duration: ";
00379                 lstr += m_timer.duration();
00380                 lstr += "\n";
00381                 logger.write(lstr);
00382                 // m_status = status_completed;
00383         }
00384 }
00385 
00386 /** Return the job report for this job */
00387 single_job_report job_archiver::report(void) const
00388 {
00389         return(m_jr);
00390 }
00391 
00392 /** Child processor for a job
00393 
00394         For each path in this job:
00395         - Create the directory hierarchy for this job in the archive
00396         - Until done or until out of retries:
00397                 - Choose a hardlink source, if applicable and available
00398                 - Construct the command line to pass to rsync
00399                 - Spawn rsync
00400                 - Process I/O sent back from rsync
00401                 - Process exit code or signal number returned from rsync
00402         - Generate and submit a report to the report manager
00403         
00404  */
00405 void job_archiver::mf_do_chores(void)
00406 {
00407         /*
00408         {
00409                 bool wait_for_debugger = true;
00410 
00411                 std::cerr << "Waiting for debugger to attach..." << std::endl;
00412                 while (wait_for_debugger);
00413                 std::cerr << "Debugger attached." << std::endl;
00414         }
00415         */
00416 
00417         job::paths_type::const_iterator pi;
00418 
00419         for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
00420                 estring archive_dir;
00421                 estring path;
00422                 // estring command_line;
00423                 std::string binary;
00424                 std::vector<std::string> argv;
00425                 estring opt;
00426                 bool hardlink = false;
00427                 bool multi_hardlink = false;
00428                 int num_retries = 0;
00429                 bool done = false;
00430                 int exit_code = 0;
00431                 int signal_num = 0;
00432                 timer t;
00433                 uint64 files_total = 0;
00434                 uint64 files_xferd = 0;
00435                 uint64 size_total = 0;
00436                 uint64 size_xferd = 0;
00437                 bool overflow_detected = 0;
00438                 estring error_msg;
00439 
00440                 archive_dir = m_job->generate_archive_path(*pi);
00441                 // If archive_dir does not end with a '/', strip off characters until it
00442                 // does.
00443                 /*
00444                 while ((archive_dir.size() > 0) 
00445                         && (archive_dir[archive_dir.size()-1] != '/'))
00446                 {
00447                         archive_dir.erase(archive_dir.size()-1);
00448                 }
00449                 */
00450 
00451                 path = archiver.working_archive_path();
00452                 path += "/";
00453                 path += archive_dir;
00454                 path = reform_path(path);
00455                 /*
00456                 if (!exists(path)) {
00457                         std::cout << "Creating job archive path: " << archive_dir << std::endl;
00458                         mk_dirhier(path);
00459                 }
00460                 else
00461                         std::cout << "Archiving to existing path: " << archive_dir << std::endl;
00462                 */
00463 
00464                 if (!exists(path)) {
00465                         std::string es;
00466 
00467                         TRY_nomem(es = "No such directory: \"");
00468                         TRY_nomem(es += archive_dir);
00469                         TRY_nomem(es += "\"");
00470 
00471                         throw(ERROR(EEXIST,es));
00472                 }
00473 
00474                 if (!writable(path)) {
00475                         std::string es;
00476 
00477                         TRY_nomem(es = "Cannot write to archive directory: \"");
00478                         TRY_nomem(es += archive_dir);
00479                         TRY_nomem(es += "\"");
00480 
00481                         throw(ERROR(EACCES,es));
00482                 }
00483 
00484                 logger.set_error_logging(false);
00485                 hardlink = m_job->rsync_hardlink;
00486                 multi_hardlink = m_job->rsync_multi_hardlink;
00487                 while ((num_retries <= m_job->rsync_retry_count) && !done) {
00488                         execute exec;
00489                         job::excludes_type::const_iterator ei;
00490                         job::includes_type::const_iterator ii;
00491                         std::vector<std::string>::const_iterator oi;
00492 
00493                         exit_code = 0;
00494                         signal_num = 0;
00495 
00496                         /*
00497                         command_line = config.rsync_local_path();
00498                         command_line += " ";
00499                         command_line += m_job->rsync_options;
00500                         */
00501                         /**/
00502                         binary = config.rsync_local_path();
00503                         argv = m_job->generate_rsync_options_vector();
00504                         /**/
00505 
00506                         if (m_job->rsync_connection != job::connection_local) {
00507                                 /*
00508                                 command_line += " ";
00509                                 command_line += " --rsync-path=";
00510                                 if (m_job->rsync_remote_path.size()) {
00511                                         command_line += m_job->rsync_remote_path;
00512                                 }
00513                                 else {
00514                                         command_line += config.rsync_local_path();
00515                                 }
00516                                 */
00517                                 /**/
00518                                 opt = "--rsync-path=";
00519                                 if (m_job->rsync_remote_path.size() != 0)
00520                                         opt += m_job->rsync_remote_path;
00521                                 else
00522                                         opt += config.rsync_local_path();
00523                                 argv.push_back(opt);
00524                                 /**/
00525                         }
00526 
00527                         if (hardlink) {
00528                                 subdirectory subdir;
00529                                 std::string youngest;
00530                                 std::string relative_path;
00531                                 bool hardlink_opt = false;
00532                                 bool first_hardlink = false;
00533                                 int linkdest_count = 0;
00534 
00535                                 subdir.path(vaulter.vault());
00536                                 if (subdir.size() > 0) {
00537                                         subdirectory::const_iterator si;
00538 
00539                                         sort(subdir.begin(), subdir.end());
00540                                         reverse(subdir.begin(), subdir.end());
00541                                         for (si = subdir.begin(); si != subdir.end(); ++si) {
00542                                                 estring ypath;
00543 
00544                                                 if (first_hardlink && !multi_hardlink) {
00545                                                         continue;
00546                                                 }
00547                                                 if (linkdest_count >= m_job->rsync_multi_hardlink_max) {
00548                                                         continue;
00549                                                 }
00550                                                 if (!is_timestamp(*si))
00551                                                         continue;
00552                                                 if (*si == config.timestamp().str())
00553                                                         continue;
00554                                                 std::cout 
00555                                                         << "Considering potential hardlink source: "
00556                                                         << *si
00557                                                         << std::endl;
00558                                                 ypath = vaulter.vault();
00559                                                 ypath += "/";
00560                                                 ypath += *si;
00561                                                 ypath += "/";
00562                                                 ypath += archive_dir;
00563                                                 if (exists(ypath)) {
00564                                                         std::cout 
00565                                                                 << "Using archive as hardlink source: " 
00566                                                                 << *si
00567                                                                 << std::endl;
00568                                                         // youngest = ypath;
00569                                                         if (!hardlink_opt) {
00570                                                                 /**/
00571                                                                 argv.push_back(std::string("--hard-links"));
00572                                                                 /**/
00573                                                                 /*
00574                                                                 command_line += " --hard-links";
00575                                                                 */
00576                                                                 hardlink_opt = true;
00577                                                         }
00578 
00579                                                         relative_path = mk_relative_path(ypath,path);
00580                                                         /*
00581                                                         command_line += " --link-dest=";
00582                                                         command_line += relative_path;
00583                                                         */
00584                                                         /**/
00585                                                         opt="--link-dest=";
00586                                                         opt += relative_path;
00587                                                         argv.push_back(opt);
00588                                                         /**/
00589                                                         first_hardlink = true;
00590                                                         linkdest_count++; // At most 20 link-dest options can be used w/ rsync
00591                                                 }
00592                                                 else {
00593                                                         std::cout
00594                                                                 << "- No such path: " 
00595                                                                 << ypath
00596                                                                 << std::endl;
00597                                                 }
00598                                         }
00599                                 }
00600                                 /*
00601                                 if (youngest.size() > 0) {
00602                                         relative_path = mk_relative_path(youngest,path);
00603                                         command_line += " --hard-links --link-dest=";
00604                                         command_line += relative_path;
00605                                 }
00606                                 */
00607                         }
00608 
00609                         for (
00610                                 ei = m_job->excludes.begin();
00611                                 ei != m_job->excludes.end();
00612                                 ++ei
00613                                 )
00614                         {
00615                                 /*
00616                                 command_line += " --exclude-from=";
00617                                 command_line += *ei;
00618                                 */
00619                                 /**/
00620                                 opt = "--exclude-from=";
00621                                 opt += *ei;
00622                                 argv.push_back(opt);
00623                                 /**/
00624                         }
00625 
00626                         for (
00627                                 ii = m_job->includes.begin();
00628                                 ii != m_job->includes.end();
00629                                 ++ii
00630                                 )
00631                         {
00632                                 /*
00633                                 command_line += " --include-from=";
00634                                 command_line += *ii;
00635                                 */
00636                                 /**/
00637                                 opt = "--include-from=";
00638                                 opt += *ei;
00639                                 argv.push_back(opt);
00640                                 /**/
00641                         }
00642 
00643                         /*
00644                         command_line += " ";
00645                         command_line += m_job->generate_source_path(*pi);
00646                         */
00647                         /**/
00648                         argv.push_back(m_job->generate_source_path(*pi));
00649                         /**/
00650                         
00651                         /*
00652                         command_line += " ";
00653                         command_line += path;
00654                         */
00655                         /**/
00656                         argv.push_back(path);
00657                         /**/
00658         
00659                         /*
00660                         std::cout << "Command Line: " << command_line << std::endl;
00661                         */
00662                         /**/
00663                         std::cout << "Command Line:" << std::endl;
00664                         {
00665                                 uint16 c;
00666                                 std::cout << "  Binary: " << binary << std::endl;
00667                                 for (c = 0; c < argv.size(); c++) {
00668                                         std::cout << "    Argv[" << c << "] = " << argv[c] << std::endl;
00669                                 }
00670                         }
00671                         /**/
00672 
00673                         m_error_msg.erase();
00674 
00675                         t.start();
00676                         /*
00677                         exec.exec(command_line);
00678                         */
00679                         /**/
00680                         m_io_timer.start();
00681                         m_rsync_timeout_flag = false;
00682                         exec.exec(binary, argv);
00683                         /**/
00684                         mf_process_rsync_io(
00685                                 exec, 
00686                                 m_job->rsync_timeout,
00687                                 files_total,
00688                                 files_xferd,
00689                                 size_total,
00690                                 size_xferd,
00691                                 overflow_detected
00692                                 );
00693                         t.stop();
00694 
00695                         signal_num = 0;
00696                         if (exec.child_signaled()) {
00697                                 std::cout 
00698                                         << "Rsync caught signal: [" 
00699                                         << exec.child_signal_no()
00700                                         << "] "
00701                                         << rsync_estat_str.signal(exec.child_signal_no())
00702                                         << std::endl;
00703                                 signal_num = exec.child_signal_no();
00704                         }
00705                         std::cout
00706                                 << "Rsync exit code: ["
00707                                 << exec.child_exit_code()
00708                                 << "] "
00709                                 << rsync_estat_str.exit(exec.child_exit_code())
00710                                 << std::endl;
00711 
00712                         exit_code = exec.child_exit_code();
00713                         if (!m_rsync_timeout_flag && exec.child_exited_normally() && (exit_code == 0)) {
00714                                 done = true;
00715                         }
00716                         else if (!m_rsync_timeout_flag && (m_job->rsync_behavior[exit_code] == rsync_behavior::ok)) {
00717                                 std::cout << "Ignoring rsync failure" << std::endl;
00718                                 done = true;
00719                         }
00720                         else if (overflow_detected) {
00721                                 std::cout 
00722                                         << "Vault overflow detected"
00723                                         << std::endl;
00724                                 break;
00725                         }
00726                         else {
00727                                 ++num_retries;
00728                                 logger.set_error_logging(true);
00729                         }
00730                         if (m_rsync_timeout_flag) {
00731                                 TRY_nomem(m_error_msg = "Rsync inactivity timeout");
00732                         }
00733 
00734                         if (m_job->rsync_behavior[exit_code] == rsync_behavior::fail)
00735                         {
00736                                 std::cout << "Failing, will not attempt to retry" << std::endl;
00737                                 break;
00738                         }
00739                         if (m_job->rsync_behavior[exit_code] 
00740                                 == rsync_behavior::retry_without_hardlinks)
00741                         {
00742                                 std::cout << "Retrying without hardlinks..." << std::endl;
00743                                 hardlink = false;
00744                         }
00745 
00746                         if ((!done) && (m_job->rsync_retry_delay > 0)) {
00747                                 std::cout << "Retries left: " << (m_job->rsync_retry_count - num_retries + 1) << std::endl;
00748                                 if (num_retries <= m_job->rsync_retry_count) {
00749                                         std::cout << "Waiting " << m_job->rsync_retry_delay 
00750                                                 << " minutes before retrying... " << std::endl;
00751                                         sleep( (m_job->rsync_retry_delay) * 60);
00752                                 }
00753                         }
00754                 }
00755                 if (!done) {
00756                         if (num_retries >= m_job->rsync_retry_count) {
00757                                 std::cout << "Retry count exceeded" << std::endl;
00758                                 m_success = false;
00759                         }
00760                         else {
00761                                 std::cout << "Giving up on this path" << std::endl;
00762                                 m_success = false;
00763                         }
00764                 }
00765                 reportio().write_report(
00766                         m_job->generate_source_path(*pi),
00767                         t,
00768                         exit_code,
00769                         signal_num,
00770                         m_job->rsync_behavior[exit_code],
00771                         m_error_msg
00772                         );
00773         }
00774 }
00775 
00776 void job_archiver::mf_process_report(const std::string& a_str)
00777 {
00778         if (reportio().is_report(a_str)) {
00779                 m_jpr = reportio().parse(a_str);
00780                 m_jr.add_report(m_jpr);
00781         }
00782 }
00783 
00784 /** Process I/O from the child
00785 
00786         While there is I/O to be read, read and parse it.  When the end of a line is
00787         reached write that line to the log file.  If a_finalize is true, the flush
00788         the child I/O buffer string.
00789         
00790  */
00791 void job_archiver::mf_process_child_io(bool a_finalize)
00792 {
00793         estring lstr;
00794         bool io_flag = false;
00795 
00796         while (!m_exec.out_eof() && (a_finalize || m_exec.out_ready())) {
00797                 int r;
00798                 const int buffer_size = 128;
00799                 char buffer[buffer_size] = { 0 };
00800 
00801                 r = m_exec.out_read(buffer, buffer_size);
00802                 if (r > 0) {
00803                         int c;
00804 
00805                         io_flag = true;
00806                         for (c = 0; c < r; ++c) {
00807                                 if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
00808                                         lstr = prefix();
00809                                         lstr += m_io_out;
00810                                         lstr += "\n";
00811                                         logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00812                                         mf_process_report(lstr);
00813                                         m_io_out.erase();
00814                                 }
00815                                 else {
00816                                         m_io_out += buffer[c];
00817                                 }
00818                         }
00819                 }
00820         }
00821         if (a_finalize && (m_io_out.size() > 0)) {
00822                 lstr = prefix();
00823                 lstr += m_io_out;
00824                 lstr += "\n";
00825                 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00826                 mf_process_report(lstr);
00827                 m_io_out.erase();
00828         }
00829 
00830         while (!m_exec.err_eof() && (a_finalize || m_exec.err_ready())) {
00831                 int r;
00832                 const int buffer_size = 128;
00833                 char buffer[buffer_size] = { 0 };
00834 
00835                 r = m_exec.err_read(buffer, buffer_size);
00836                 if (r > 0) {
00837                         int c;
00838 
00839                         io_flag = true;
00840                         for (c = 0; c < r; ++c) {
00841                                 if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
00842                                         lstr = prefix();
00843                                         lstr += m_io_err;
00844                                         lstr += "\n";
00845                                         logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00846                                         mf_process_report(lstr);
00847                                         m_io_err.erase();
00848                                 }
00849                                 else {
00850                                         m_io_err += buffer[c];
00851                                 }
00852                         }
00853                 }
00854         }
00855         if (a_finalize && (m_io_err.size() > 0)) {
00856                 lstr = prefix();
00857                 lstr += m_io_err;
00858                 lstr += "\n";
00859                 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00860                 mf_process_report(lstr);
00861                 m_io_err.erase();
00862         }
00863         if (!io_flag)
00864                 sleep(config.io_poll_interval());
00865 }
00866 
00867 /** Trim off all non-digit leading and trailing characters from a string */
00868 void job_archiver::mf_trim_string(std::string& a_str)
00869 {
00870         while ((a_str.size() > 0) && (!isdigit(a_str[0])))
00871                 a_str.erase(0,1);
00872         while ((a_str.size() > 0) && (!isdigit(a_str[a_str.size()-1])))
00873                 a_str.erase(a_str.size()-1,1);
00874 }
00875 
00876 /** Parse I/O from rsync
00877 
00878         Search for special output from rsync to tell us something about the number
00879         and size of files and files transfered.
00880 
00881  */
00882 void job_archiver::mf_parse_rsync_io(
00883         const std::string a_str,
00884         uint64& a_files_total,
00885         uint64& a_files_xferd,
00886         uint64& a_size_total,
00887         uint64& a_size_xferd
00888         )
00889 {
00890         estring estr;
00891 
00892         if (a_str.find("Number of files: ") == 0) {
00893                 estr = a_str;
00894                 mf_trim_string(estr);
00895                 try {
00896                         a_files_total = estr;
00897                 }
00898                 catch(error e) {
00899                         estring es;
00900 
00901                         es = "Could not parse total number of files processed by rsync";
00902                         e.push_back(ERROR_INSTANCE(es));
00903 
00904                         // Not sure this is the best way to handle this...
00905                         std::cerr << e;
00906                 }
00907                 catch(...) {
00908                         error e = err_unknown;
00909                         estring es;
00910 
00911                         es = "Could not parse total number of files processed by rsync";
00912                         e.push_back(ERROR_INSTANCE(es));
00913 
00914                         // Not sure this is the best way to handle this...
00915                         std::cerr << e;
00916                 }
00917         }
00918         else if (a_str.find("Number of files transferred: ") == 0) {
00919                 estr = a_str;
00920                 mf_trim_string(estr);
00921                 try {
00922                         a_files_xferd = estr;
00923                 }
00924                 catch(error e) {
00925                         estring es;
00926 
00927                         es = "Could not parse total number of files transferred by rsync";
00928                         e.push_back(ERROR_INSTANCE(es));
00929 
00930                         // Not sure this is the best way to handle this...
00931                         std::cerr << e;
00932                 }
00933                 catch(...) {
00934                         error e = err_unknown;
00935                         estring es;
00936 
00937                         es = "Could not parse total number of files transferred by rsync";
00938                         e.push_back(ERROR_INSTANCE(es));
00939 
00940                         // Not sure this is the best way to handle this...
00941                         std::cerr << e;
00942                 }
00943         }
00944         else if (a_str.find("Total file size: ") == 0) {
00945                 estr = a_str;
00946                 mf_trim_string(estr);
00947                 try {
00948                         a_size_total = estr;
00949                 }
00950                 catch(error e) {
00951                         estring es;
00952 
00953                         es = "Could not parse total size of files processed by rsync";
00954                         e.push_back(ERROR_INSTANCE(es));
00955 
00956                         // Not sure this is the best way to handle this...
00957                         std::cerr << e;
00958                 }
00959                 catch(...) {
00960                         error e = err_unknown;
00961                         estring es;
00962 
00963                         es = "Could not parse total size of files processed by rsync";
00964                         e.push_back(ERROR_INSTANCE(es));
00965 
00966                         // Not sure this is the best way to handle this...
00967                         std::cerr << e;
00968                 }
00969         }
00970         else if (a_str.find("Total transferred file size: ") == 0) {
00971                 estr = a_str;
00972                 mf_trim_string(estr);
00973                 try {
00974                         a_size_xferd = estr;
00975                 }
00976                 catch(error e) {
00977                         estring es;
00978 
00979                         es = "Could not parse total size of files transferred by rsync";
00980                         e.push_back(ERROR_INSTANCE(es));
00981 
00982                         // Not sure this is the best way to handle this...
00983                         std::cerr << e;
00984                 }
00985                 catch(...) {
00986                         error e = err_unknown;
00987                         estring es;
00988 
00989                         es = "Could not parse total size of files transferred by rsync";
00990                         e.push_back(ERROR_INSTANCE(es));
00991 
00992                         // Not sure this is the best way to handle this...
00993                         std::cerr << e;
00994                 }
00995         }
00996 }
00997 
00998 /** Process I/O from rsync
00999 
01000         If there is I/O from rsync to be read, read it and then send it through the
01001         parser.
01002 
01003  */
01004 void job_archiver::mf_process_rsync_io(
01005         execute& a_exec, 
01006         uint16 a_timeout,
01007         uint64& a_files_total,
01008         uint64& a_files_xferd,
01009         uint64& a_size_total,
01010         uint64& a_size_xferd,
01011         bool& a_overflow_detected
01012         )
01013 {
01014         size_t ro;
01015         size_t re;
01016         estring out;
01017         estring err;
01018         bool io_flag;
01019         char buffer[1024] = { 0 };
01020         char *ptr;
01021 
01022         ro = 1;
01023         re = 1;
01024         while ((ro != 0) || (re != 0) || a_exec.child_running()) {
01025                 io_flag = false;
01026                 ro = 0;
01027                 re = 0;
01028 
01029                 if (!a_overflow_detected) {
01030                         a_overflow_detected = vaulter.overflow();
01031                 }
01032 
01033                 m_error_msg.erase();
01034 
01035                 if (a_exec.out_ready()) {
01036                         ro = read(a_exec.out_fd(), buffer, 1024);
01037                         if (ro > 0) {
01038                                 io_flag = true;
01039                                 for (ptr = buffer; ptr != buffer+ro; ++ptr) {
01040                                         if ((*ptr != '\r') && (*ptr != '\n')) {
01041                                                 out += *ptr;
01042                                         }
01043                                         else {
01044                                                 reportio().write_rsync_out(out);
01045                                                 out.erase();
01046                                         }
01047                                 }
01048                         }
01049                 }
01050 
01051                 if (a_exec.err_ready()) {
01052                         re = read(a_exec.err_fd(), buffer, 1024);
01053                         if (re > 0) {
01054                                 io_flag = true;
01055                                 for (ptr = buffer; ptr != buffer+re; ++ptr) {
01056                                         if ((*ptr != '\r') && (*ptr != '\n')) {
01057                                                 err += *ptr;
01058                                         }
01059                                         else {
01060                                                 reportio().write_rsync_err(err);
01061                                                 err.erase();
01062                                         }
01063                                 }
01064                         }
01065                 }
01066 
01067                 m_io_timer.stop();
01068 // std::cerr << "[DEBUG]: --------------------------" << std::endl;
01069 // std::cerr << "[DEBUG]: m_io_timer.start_value() = " << m_io_timer.start_value() << std::endl;
01070 // std::cerr << "[DEBUG]:  m_io_timer.stop_value() = " << m_io_timer.stop_value() << std::endl;
01071 // std::cerr << "[DEBUG]:                  time(0) = " << time(0) << std::endl;
01072 // std::cerr << "[DEBUG]:    m_io_timer.duration() = " << m_io_timer.duration_secs() << std::endl;
01073 // std::cerr << "[DEBUG]:               difference = " << (time(0) - m_io_timer.start_value()) << std::endl;
01074 // std::cerr << "[DEBUG]:                  timeout = " << a_timeout << std::endl;
01075 // std::cerr << "[DEBUG]: io_flag = " << io_flag << std::endl;
01076 // std::cerr << "[DEBUG]: m_rsync_timeout_flag = " << m_rsync_timeout_flag << std::endl;
01077                 if (io_flag) {
01078                         m_io_timer.stop();
01079                         m_io_timer.start();
01080                         m_rsync_timeout_flag = false;
01081                 }
01082                 if (!m_rsync_timeout_flag && (m_io_timer.duration_secs() > a_timeout)) {
01083                         std::cerr << "*** Rsync program inactivity timeout" << std::endl;
01084                         a_exec.kill_child();
01085                         m_rsync_timeout_flag = true;
01086                 }
01087 
01088                 if (!io_flag)
01089                         sleep(config.io_poll_interval());
01090 
01091         }
01092         if (out.size() > 0) {
01093                 std::cerr << out << std::endl;
01094                 mf_parse_rsync_io(
01095                         out, 
01096                         a_files_total, 
01097                         a_files_xferd, 
01098                         a_size_total,
01099                         a_size_xferd
01100                 );
01101                 out.erase();
01102         }
01103         if (err.size() > 0) {
01104                 std::cerr << err << std::endl;
01105                 mf_parse_rsync_io(
01106                         out, 
01107                         a_files_total, 
01108                         a_files_xferd, 
01109                         a_size_total,
01110                         a_size_xferd
01111                 );
01112                 err.erase();
01113         }
01114 }
01115 
01116 //-----------------------------------------------------------------------------
01117 
01118 /** C'tor */
01119 archive_manager::archive_manager()
01120 {
01121         if (this != &archiver)
01122                 throw(INTERNAL_ERROR(0,"Attempt to alocate multiple archive managers"));
01123 
01124         clear();
01125 }
01126 
01127 /** Clear the archive manager and clear the job list */
01128 void archive_manager::clear(void)
01129 {
01130         m_jobs.clear();
01131         m_initialized = false;
01132 }
01133 
01134 /** Initialize the archive manager
01135 
01136         Log the archive timestamp, select and prepare a vault.
01137 
01138  */
01139 void archive_manager::init(void)
01140 {
01141         timer t;
01142         estring lstr;
01143 
01144         lstr = "Archive Manager - Beginning initialization\n";
01145         logger.write(lstr);
01146         t.start();
01147 
01148         lstr = "Timestamp: ";
01149         lstr += config.timestamp().str();
01150         lstr += "\n";
01151         logger.write(lstr);
01152 
01153         // Select a vault?
01154         vaulter.select();
01155         lstr = "Vault selected: ";
01156         lstr += vaulter.vault();
01157         lstr += "\n";
01158         logger.write(lstr);
01159 
01160         reporter.vault().add_report(
01161                 vault_stats_report(estring("Initial Stats:"),filesystem(vaulter.vault()))
01162                 );
01163 
01164         // Prepare the vault?
01165         vaulter.prepare();
01166 
01167         t.stop();
01168         lstr = "Archive Manager - Finished initialization";
01169         lstr += ", duration: ";
01170         lstr += t.duration();
01171         lstr += "\n";
01172         logger.write(lstr);
01173 
01174         m_initialized = true;
01175 }
01176 
01177 /** Return the initialized status of the archive manager */
01178 const bool archive_manager::initialized(void) const
01179 {
01180         return(m_initialized);
01181 }
01182 
01183 /** Give a status report
01184 
01185         After so many minutes of inactivity write a report to the log file of our
01186         current status of affairs.
01187 
01188  */
01189 void archive_manager::mf_log_status(void)
01190 {
01191         static timer t;
01192         const timer::value_type timeout = 30;
01193         estring lstr;
01194         std::vector<job_archiver*>::const_iterator ji;
01195         uint64 jobs_pending = 0;
01196         uint64 jobs_processing = 0;
01197         uint64 jobs_completed =0;
01198         uint64 jobs_remaining = 0;
01199 
01200         if (!t.is_started())
01201                 t.start();
01202         
01203         t.stop();
01204         if (t.duration_mins() < timeout)
01205                 return;
01206         
01207         lstr  = "\n";
01208         lstr += "STATUS REPORT:\n";
01209         lstr += "================================================================\n";
01210         logger.write(lstr);
01211         for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji) {
01212                 lstr = "[";
01213                 switch ((*ji)->status()) {
01214                         case job_archiver::status_pending:
01215                                 lstr += "Pending    ";
01216                                 ++jobs_pending;
01217                                 break;
01218                         case job_archiver::status_processing:
01219                                 lstr += "Processing ";
01220                                 ++jobs_processing;
01221                                 break;
01222                         case job_archiver::status_reschedule:
01223                                 lstr += "Reschedule ";
01224                                 ++jobs_pending;
01225                                 break;
01226                         case job_archiver::status_fatal_error:
01227                                 lstr += "Fatal Error";
01228                                 ++jobs_completed;
01229                                 break;
01230                         case job_archiver::status_error:
01231                                 lstr += "Error      ";
01232                                 ++jobs_completed;
01233                                 break;
01234                         case job_archiver::status_completed:
01235                                 lstr += "Completed  ";
01236                                 ++jobs_completed;
01237                                 break;
01238                         case job_archiver::status_done:
01239                                 lstr += "Done       ";
01240                                 ++jobs_completed;
01241                                 break;
01242                         default:
01243                                 lstr += "<unknown>  ";
01244                                 break;
01245                 }
01246                 lstr += "]: ";
01247                 lstr += (*ji)->id();
01248                 lstr += "\n";
01249                 logger.write(lstr);
01250         }
01251 
01252         lstr  = "---------------------------------------------------------------\n";
01253         lstr += "     Jobs Pending: ";
01254         lstr += estring(jobs_pending);
01255         lstr += "\n";
01256 
01257         lstr += "  Jobs Processing: ";
01258         lstr += estring(jobs_processing);
01259         lstr += "\n";
01260 
01261         lstr += "   Jobs Completed: ";
01262         lstr += estring(jobs_completed);
01263         lstr += "\n";
01264 
01265         lstr += "            Total: ";
01266         lstr += estring(jobs_pending+jobs_processing+jobs_completed+jobs_remaining);
01267         lstr += "\n";
01268 
01269         lstr += "Overflow Detected: ";
01270         if (vaulter.overflow()) {
01271                 lstr += "TRUE";
01272         }
01273         else {
01274                 lstr += "false";
01275         }
01276         lstr += "\n";
01277 
01278         logger.write(lstr);
01279         logger.write("\n");
01280         t.start();
01281 }
01282 
01283 /** Archive jobs
01284 
01285         Create an archive directory.  Generate a to-do list of job archiver objects.
01286         Process the job archiver objects:
00287         - While there are fewer than rsync-parallel job archivers processing, start
01288                 the first available job archiver.
01289         - Check the status of each job and process I/O from jobs underway.
01290         - Remove jobs that failed to start.
01291         - Possibly reschedule failed jobs.
01292         - Remove completed jobs from active processing.
01293         - Call mf_log_status().
01294 
01295  */
01296 void archive_manager::archive(void)
01297 {
01298         timer t;
01299         estring lstr;
01300         configuration_manager::jobs_type::const_iterator cji;
01301         int num_processing = 0;
01302         std::vector<job_archiver*>::iterator ji;
01303         uint64 num_completed = 0;
01304         bool overflow_detected = false;
01305         estring debug_estr;
01306 
01307         if (!initialized())
01308                 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01309 
01310         lstr = "Archive Manager - Begin archiving\n";
01311         logger.write(lstr);
01312         t.start();
01313 
01314         // Create archive directory
01315         try {
01316                 if (exists(archive_path())) {
01317                         lstr = "Found existing archive directory: ";
01318                         lstr += archive_path();
01319                         lstr += "\n";
01320                         logger.write(lstr);
01321 
01322                         if (!writable(archive_path())) {
01323                                 std::string es;
01324 
01325                                 TRY_nomem(es = "Cannot write to archive directory: \"");
01326                                 TRY_nomem(es += archive_path());
01327                                 TRY_nomem(es += "\"");
01328 
01329                                 throw(ERROR(EACCES,es));
01330                         }
01331 
01332                         rename_file(archive_path(), working_archive_path());
01333                 }
01334                 else if (exists(working_archive_path())) {
01335                         lstr = "Found existing archive directory: ";
01336                         lstr += working_archive_path();
01337                         lstr += "\n";
01338                         logger.write(lstr);
01339 
01340                         if (!writable(working_archive_path())) {
01341                                 std::string es;
01342 
01343                                 TRY_nomem(es = "Cannot write to working archive directory: \"");
01344                                 TRY_nomem(es += working_archive_path());
01345                                 TRY_nomem(es += "\"");
01346 
01347                                 throw(ERROR(EACCES,es));
01348                         }
01349                 }
01350                 else {
01351                         lstr = "Creating archive directory: ";
01352                         lstr += working_archive_path();
01353                         lstr += "\n";
01354                         logger.write(lstr);
01355                         mk_dir(working_archive_path());
01356                 }
01357         }
01358         catch(error e) {
01359                 logger.write("An error has occured: ");
01360                 logger.write(e[0].what());
01361                 logger.write("\n");
01362                 throw(e);
01363         }
01364         catch(...) {
01365                 error e = err_unknown;
01366 
01367                 logger.write("An error has occured: ");
01368                 logger.write(e[0].what());
01369                 logger.write("\n");
01370                 throw(e);
01371         }
01372 
01373         // Create a todo list
01374         logger.write("Creating to-do list\n");
01375         for (
01376                 cji = config.jobs().begin();
01377                 cji != config.jobs().end();
01378                 ++cji
01379                 )
01380         {
01381                 job_archiver* ptr;
01382 
01383                 ptr = new job_archiver(&(*cji));
01384                 if (ptr == 0)
01385                         throw(err_nomem);
01386                 TRY_nomem(m_jobs.push_back(ptr));
01387         }
01388 
01389         // Backup clients
01390         logger.write("Processing jobs...\n");
01391         while (m_jobs.size() > num_completed) {
01392 
01393                 /*
01394                 logger.write("[DEBUG]: ---[ TOP OF LOOP ]---\n");
01395 
01396                 debug_estr = "[DEBUG]: overflow_detected = ";
01397                 debug_estr += estring(overflow_detected);
01398                 debug_estr += "\n";
01399                 logger.write(debug_estr);
01400 
01401                 debug_estr = "[DEBUG]: num_processing = ";
01402                 debug_estr += estring(num_processing);
01403                 debug_estr += "\n";
01404                 logger.write(debug_estr);
01405                 */
01406 
01407                 if (!overflow_detected) {
01408                         overflow_detected = vaulter.overflow(true);
01409                         /*
01410                         if (overflow_detected) {
01411                                 logger.write("[DEBUG]: Variable Change :: ");
01412                                 logger.write("overflow_detected = true");
01413                                 logger.write("\n");
01414                         }
01415                         */
01416                 }
01417 
01418                 // If the vault has exceeded it's highwater mark, wait for the
01419                 // currently-processing jobs to terminate, and then attempt to prepare the
01420                 // vault.
01421                 if (overflow_detected && (num_processing == 0)) {
01422                         TRY(vaulter.prepare(true),"Cannot complete archive");
01423                         overflow_detected = vaulter.overflow();
01424                         /*
01425                         if (!overflow_detected) {
01426                                 logger.write("[DEBUG]: Variable Change :: ");
01427                                 logger.write("overflow_detected = false");
01428                                 logger.write("\n");
01429                         }
01430                         */
01431                 }
01432 
01433                 // For each job in the list...
01434                 for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji)
01435                 {
01436                         // While we're running less than rsync-parallel jobs, start new ones
01437                         if (
01438                                 !overflow_detected
01439                                 && (num_processing < config.rsync_parallel())
01440                                 && ((*ji)->status() == job_archiver::status_pending)
01441                                 )
01442                         {
01443                                 try {
01444                                         (*ji)->start();
01445                                 }
01446                                 catch(error e) {
01447                                         if (e.num() == 12) {
01448                                                 lstr = "Error starting job: Out of memory, will retry later\n";
01449                                                 (*ji)->clear();
01450                                         }
01451                                         else {
01452                                                 (*ji)->end();
01453                                                 lstr = "Error starting job, aborting\n";
01454                                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01455                                                 num_processing--;
01456                                                 reporter.jobs().add_report((*ji)->report());
01457                                         }
01458                                         logger.write(lstr);
01459                                 }
01460                                 catch(...) {
01461                                         (*ji)->end();
01462                                         lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE STARTING JOB ";
01463                                         lstr += "-- JOB TERMINATED\n";
01464                                         logger.write(lstr);
01465                                         // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01466                                         num_processing--;
01467                                         reporter.jobs().add_report((*ji)->report());
01468                                 }
01469                                 // logger.write("[DEBUG]: Variable Change :: num_processing++\n");
01470                                 num_processing++;
01471                         }
01472         
01473                         // Process jobs
01474                         if ((*ji)->status() == job_archiver::status_processing) {
01475                                 try {
01476                                         (*ji)->process();
01477                                 }
01478                                 catch(error e) {
01479                                         // TODO: Change 12 to ENOMEM?
01480                                         if (e.num() == 12) {
01481                                                 lstr  = "Error starting job: Out of memory, will retry later\n";
01482                                                 (*ji)->clear();
01483                                         }
01484                                         else {
01485                                                 (*ji)->end();
01486                                                 lstr = "Error starting job, aborting\n";
01487                                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01488                                                 num_processing--;
01489                                                 reporter.jobs().add_report((*ji)->report());
01490                                         }
01491                                         logger.write(lstr);
01492                                 }
01493                                 catch(...) {
01494                                         (*ji)->end();
01495                                         lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE PROCESSING JOB ";
01496                                         lstr += "-- JOB TERMINATED\n";
01497                                         logger.write(lstr);
01498                                         // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01499                                         num_processing--;
01500                                         reporter.jobs().add_report((*ji)->report());
01501                                 }
01502                         }
01503 
01504                         // Remove jobs that could not start from active duty
01505                         if ((*ji)->status() == job_archiver::status_reschedule) {
01506                                 (*ji)->clear();
01507                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01508                                 num_processing--;
01509                         }
01510 
01511                         // If a job exited with an error, and the vault is full, then reschedule
01512                         // the job for later
01513                         if (
01514                                         ((*ji)->status() == job_archiver::status_error)
01515                                 &&
01516                                         overflow_detected
01517                                 )
01518                         {
01519                                 lstr = "Vault overflow detected, will retry job later\n";
01520                                 logger.write(lstr);
01521                                 (*ji)->clear();
01522                                 num_processing--;
01523                         }
01524 
01525                         // Remove completed jobs from the processing list
01526                         if (
01527                                 ((*ji)->status() == job_archiver::status_completed)
01528                                 || ((*ji)->status() == job_archiver::status_fatal_error)
01529                                 || ((*ji)->status() == job_archiver::status_error)
01530                                 ) {
01531                                 (*ji)->end();
01532                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01533                                 num_processing--;
01534                                 num_completed++;
01535 
01536                                 // logger.write("Adding job report to report manager\n");
01537                                 reporter.jobs().add_report((*ji)->report());
01538                         }
01539                 }
01540 
01541                 mf_log_status();
01542                 sleep(1);
01543 
01544                 // logger.write("[DEBUG]: ---[ BOTTOM OF LOOP ]---\n");
01545         }
01546 
01547         t.stop();
01548         lstr = "Archive Manager - Finished archiving, duration: ";
01549         lstr += t.duration();
01550         lstr += "\n";
01551         logger.write(lstr);
01552 
01553         lstr = "Archive Manager - Finalizing archive path\n";
01554         logger.write(lstr);
01555         TRY(
01556                 rename_file(working_archive_path(), archive_path()),
01557                 "Cannot finalize archive"
01558                 );
01559 
01560         reporter.vault().add_report(
01561                 vault_stats_report(estring("Final Stats:"),filesystem(vaulter.vault()))
01562                 );
01563 }
01564 
01565 /** Return an absolute path to the finished archive directory */
01566 const std::string archive_manager::archive_path(void) const
01567 {
01568         estring path;
01569 
01570         if (!initialized())
01571                 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01572 
01573         path = vaulter.vault();
01574         path += "/";
01575         path += config.timestamp().str();
01576 
01577         return(path);
01578 }
01579 
01580 /** Return the absolute path to the unfinished working archive directory */
01581 const std::string archive_manager::working_archive_path(void) const
01582 {
01583         estring path;
01584 
01585         if (!initialized())
01586                 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01587         
01588         path = archive_path();
01589         path += ".incomplete";
01590 
01591         return(path);
01592 }
01593 
01594 //-----------------------------------------------------------------------------
01595 
01596 /** The global archive manager: the file-scope archive_manager instance defined in this file */
01597 archive_manager archiver;
01598 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines