archiver.cc

Go to the documentation of this file.
#include "config.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <ctype.h>
#include <signal.h>

#include <iostream>
#include <string>
#include <vector>

#include "asserts.h"
#include "error.h"
#include "estring.h"
#include "fs.h"
#include "rconfig.h"
#include "timer.h"
#include "logger.h"

#include "archiver.h"
00022 
00023 //-----------------------------------------------------------------------------
00024 
00025 /** C'tor */
00026 rstat::rstat()
00027 {
00028         TRY_nomem(m_exit_str[0] = "Success");
00029         TRY_nomem(m_exit_str[1] = "Syntax or usage error");
00030         TRY_nomem(m_exit_str[2] = "Protocol incompatability error");
00031         TRY_nomem(m_exit_str[3] = "Errors selecting I/O files or directories");
00032         TRY_nomem(m_exit_str[4] = "Requested action not supported");
00033         TRY_nomem(m_exit_str[10] = "Error in socket I/O");
00034         TRY_nomem(m_exit_str[11] = "Error in file I/O");
00035         TRY_nomem(m_exit_str[12] = "Error in rsync protocol data stream");
00036         TRY_nomem(m_exit_str[13] = "Errors with program diagnostics");
00037         TRY_nomem(m_exit_str[14] = "Error in IPC code");
00038         TRY_nomem(m_exit_str[20] = "Received SIGUSR1 or SIGINT");
00039         TRY_nomem(m_exit_str[21] = "Some error returned by waitpid()");
00040         TRY_nomem(m_exit_str[22] = "Error allocating core memory buffers");
00041         TRY_nomem(m_exit_str[23] = "Partial transfer");
00042         TRY_nomem(m_exit_str[30] = "Timeout in data send/receive");
00043         TRY_nomem(m_exit_str[124] = "The command executed by SSH exited with status 255");
00044         TRY_nomem(m_exit_str[125] = "The command executed by SSH was killed by a signal");
00045         TRY_nomem(m_exit_str[126] = "The command given to SSH cannot be run");
00046         TRY_nomem(m_exit_str[127] = "The command given to SSH cannot be found");
00047 
00048         TRY_nomem(m_signal_str[1] = "[HUP]: Hangup");
00049         TRY_nomem(m_signal_str[2] = "[INT]: Interrupt ");
00050         TRY_nomem(m_signal_str[3] = "[QUIT]: Quit");
00051         TRY_nomem(m_signal_str[4] = "[ILL]: Illegal instruction");
00052         TRY_nomem(m_signal_str[5] = "[TRAP]: Trace trap");
00053         TRY_nomem(m_signal_str[6] = "[IOT]: IOT instruction or hardware fault");
00054         TRY_nomem(m_signal_str[7] = "[ABRT]: Abnormal termination");
00055         TRY_nomem(m_signal_str[8] = "[EMT]: EMT instruction or hardware fault");
00056         TRY_nomem(m_signal_str[9] = "[FPE]: Floating point exception");
00057         TRY_nomem(m_signal_str[10] = "[KILL]: Killed");
00058         TRY_nomem(m_signal_str[11] = "[BUS]: Bus error");
00059         TRY_nomem(m_signal_str[12] = "[SEGV]: Segmentation fault");
00060         TRY_nomem(m_signal_str[13] = "[SYS]: Invalid system call or invalid argument to system call");
00061         TRY_nomem(m_signal_str[14] = "[PIPE]: Write to pipe with no readers");
00062         TRY_nomem(m_signal_str[15] = "[ALRM]: Alarm");
00063         TRY_nomem(m_signal_str[16] = "[TERM]: Software termination");
00064         TRY_nomem(m_signal_str[17] = "[USR1]: User-defined signal 1");
00065         TRY_nomem(m_signal_str[18] = "[USR2]: User-defined signal 2");
00066         TRY_nomem(m_signal_str[19] = "[CLD]: Child status change");
00067         TRY_nomem(m_signal_str[20] = "[PWR]: Power fail/restart");
00068         TRY_nomem(m_signal_str[21] = "[WINCH]: Terminal window size change");
00069         TRY_nomem(m_signal_str[22] = "[URG]: Urgent condition");
00070         TRY_nomem(m_signal_str[23] = "[POLL]: Pollable event or socket I/O");
00071         TRY_nomem(m_signal_str[24] = "[STOP]: Stop");
00072         TRY_nomem(m_signal_str[25] = "[TSTP]: Terminal stop character");
00073         TRY_nomem(m_signal_str[26] = "[CONT]: Continue stopped process");
00074         TRY_nomem(m_signal_str[27] = "[TTIN]: Background tty read");
00075         TRY_nomem(m_signal_str[28] = "[TTOU]: Background tty write");
00076         TRY_nomem(m_signal_str[29] = "[VTALRM]: Virtual timer expired");
00077         TRY_nomem(m_signal_str[30] = "[PROF]: Profiling timer expired");
00078         TRY_nomem(m_signal_str[31] = "[XCPU]: Exceeded CPU limit");
00079         TRY_nomem(m_signal_str[32] = "[XFSZ]: Exceeded file size limit");
00080         TRY_nomem(m_signal_str[33] = "[WAITING]: Process' LWPs are blocked");
00081         TRY_nomem(m_signal_str[34] = "[LWP]: Special thread library signal");
00082         TRY_nomem(m_signal_str[35] = "[FREEZE]: Special signal used by CPR");
00083         TRY_nomem(m_signal_str[36] = "[THAW]: Special signal used by CPR");
00084         TRY_nomem(m_signal_str[37] = "[CANCEL]: Thread cancellation");
00085         TRY_nomem(m_signal_str[38] = "[LOST]: Resource lost");
00086         TRY_nomem(m_signal_str[39] = "[RTMIN]: Highest priority real-time signal");
00087         TRY_nomem(m_signal_str[46] = "[RTMAX]: Lowest priority real-time signal");
00088 
00089         TRY_nomem(m_unknown_exit = "(Unknown exit code)");
00090         TRY_nomem(m_unknown_signal = "(Unknown signal)");
00091 }
00092 
00093 /** Get a verbose string for an exit code */
00094 const std::string& rstat::exit(const int a_int) const
00095 {
00096         if (m_exit_str.find(a_int) != m_exit_str.end()) {
00097                 return(m_exit_str.find(a_int)->second);
00098         }
00099         return(m_unknown_exit);
00100 }
00101 
00102 /** Get a verbose string for a signal number */
00103 const std::string& rstat::signal(const int a_int) const
00104 {
00105         if (m_signal_str.find(a_int) != m_signal_str.end()) {
00106                 return(m_signal_str.find(a_int)->second);
00107         }
00108         return(m_unknown_signal);
00109 }
00110 
00111 class rstat rsync_estat_str;
00112 
00113 //-----------------------------------------------------------------------------
00114 
00115 /** C'tor
00116 
00117         Set a job to be assiciated with this job archiver and initialize it's
00118         processing status to "pending".
00119 
00120  */
00121 job_archiver::job_archiver(const job * a_job)
00122 {
00123         clear();
00124         m_job = a_job;
00125         m_status = status_pending;
00126 }
00127 
00128 /** Generate a job prefix string
00129 
00130         Create a string to uniquely identify this job to be used in the log file to
00131         uniquely identify this job
00132 
00133  */
00134 const std::string job_archiver::prefix(void)
00135 {
00136         estring lstr;
00137 
00138         lstr = "[Job:";
00139         lstr += estring((void*)m_job);
00140         lstr += "] ";
00141 
00142         return(lstr);
00143 }
00144 
00145 /** Generate a job id string */
00146 const std::string job_archiver::id(void)
00147 {
00148         estring lstr;
00149 
00150         lstr = prefix();
00151         lstr += " ";
00152         lstr += m_job->generate_job_id();
00153 
00154         return(lstr);
00155 }
00156 
00157 /** Clear the job archiver and return it to it's initial state
00158         
00159         End any processes handling this job and return the job archiver to it's
00160         "pending" state.
00161 
00162  */
00163 void job_archiver::clear(void)
00164 {
00165         end();
00166         m_child_pid = m_exec.my_pid();
00167         m_status = status_pending;
00168         m_success = true;
00169         m_jr.clear();
00170         m_jpr.clear();
00171         m_error_msg.erase();
00172 }
00173 
00174 /** End any processes handling this job
00175 
00176         If any child processes are handling this job, terminate them.  Erase any
00177         pending I/O for the now defunct child.  Set our processing status to "done".
00178 
00179 */
00180 void job_archiver::end(void)
00181 {
00182         estring lstr;
00183 
00184         m_timer.stop();
00185         if (m_exec.child_running()) {
00186                 lstr = prefix();
00187                 lstr += "Terminating child process!\n";
00188                 logger.write(lstr,0,configuration_manager::logging_manager,m_child_pid);
00189                 m_exec.kill_child();
00190         }
00191         m_exec.clear();
00192         m_io_out.erase();
00193         m_io_err.erase();
00194         m_status = status_done;
00195 }
00196 
00197 /** Return the processing status of this job archiver */
00198 const job_archiver::archiving_status job_archiver::status(void)
00199 {
00200         return(m_status);
00201 }
00202 
00203 /** Begin processing
00204 
00205         Attempt to fork a child process to handle this job.  If unsuccessful then
00206         retry again later.  The child then calls mf_do_chores() to handle the actual
00207         processing, while the parent updates the job archiver's status from
00208         "pending" to "processing" and begins a timer to measure the duration of the
00209         job process.
00210         
00211 */
00212 void job_archiver::start(void)
00213 {
00214         estring lstr;
00215 
00216         m_jr.id(m_job->generate_job_id());
00217 
00218         try {
00219                 m_exec.fork();
00220         }
00221         catch(error e) {
00222                 lstr = prefix();
00223                 lstr += "Could not fork:\n";
00224                 logger.write(lstr);
00225 
00226                 lstr = e.str(prefix());
00227                 logger.write(lstr);
00228 
00229                 lstr = prefix();
00230                 lstr += "Will retry job later\n";
00231                 logger.write(lstr);
00232 
00233                 m_status = status_retry_later;
00234         }
00235         catch(...) {
00236                 error e = err_unknown;
00237 
00238                 lstr = prefix();
00239                 lstr += "Could not fork:\n";
00240                 logger.write(lstr);
00241 
00242                 lstr = e.str(prefix());
00243                 logger.write(lstr);
00244 
00245                 lstr = prefix();
00246                 lstr += "Will retry job later\n";
00247                 logger.write(lstr);
00248 
00249                 m_status = status_retry_later;
00250         }
00251 
00252         if (m_exec.is_child()) {
00253                 // wait_for_debugger = true;
00254 
00255                 // while (wait_for_debugger);
00256 
00257                 m_exec.reroute_stdio();
00258                 try {
00259                         mf_do_chores();
00260                 }
00261                 catch(error e) {
00262                         std::cerr << e;
00263                         m_success = false;
00264                 }
00265                 catch(...) {
00266                         std::cerr << err_unknown;
00267                         m_success = false;
00268                 }
00269                 if (m_success)
00270                         m_exec.exit(0);
00271                 else
00272                         m_exec.exit(1);
00273         }
00274 
00275         m_child_pid = m_exec.child_pid();
00276 
00277         lstr = prefix();
00278         lstr += "Spawning child process: PID ";
00279         lstr += estring(static_cast<unsigned long>(m_exec.child_pid()));
00280         lstr += "\n";
00281         logger.write(lstr);
00282 
00283         m_status = status_processing;
00284         m_timer.start();
00285 }
00286 
00287 /** Parent processor for a job
00288 
00289         Check for I/O from the child.  Check the child's status to see if it's still
00290         running, has exited with an exit code, or has exited from a signal.  If the
00291         child sis not exit normally (i.e. exit from a signal or exit with a non-zero
00292         exit code) then check the vault for overflow.  If the vault has exceeded
00293         it's overflow threshold then that could be the cause for the child's
00294         failure, in which case we reschedule the child to be processed again later.
00295 
00296         If the job is finished (whether successful or not), update the job
00297         archiver's status to "completed".
00298         
00299  */
00300 void job_archiver::process(void)
00301 {
00302         estring lstr;
00303         
00304         if (m_exec.child_running()) {
00305                 // Process child I/O
00306                 mf_process_child_io(false);
00307         }
00308         else {
00309                 // Process remaining child I/O
00310                 mf_process_child_io(true);
00311 
00312                 // If child exited with an error, check vault overflow.  If the vault is
00313                 // filling up, then reschedule the job for later retry.
00314                 lstr = prefix();
00315                 if (m_exec.child_signaled()) {
00316                         lstr += "Child exited from signal: ";
00317                         lstr += estring(m_exec.child_signal_no());
00318                         lstr += "\n";
00319                 }
00320                 else if (m_exec.child_exit_code() != 0) {
00321                         lstr += "Child exited abnormally with code: ";
00322                         lstr += estring(m_exec.child_exit_code());
00323                         lstr += "\n";
00324                 }
00325                 else {
00326                         lstr += "Child exited successfully\n";
00327                         m_status = status_completed;
00328                 }
00329                 logger.write(lstr);
00330 
00331                 if (m_exec.child_signaled() || !m_exec.child_exited_normally()) {
00332                         /*
00333                         if (vaulter.overflow()) {
00334                                 lstr = prefix();
00335                                 lstr += "Vault overflow detected, will retry later\n";
00336                                 logger.write(lstr);
00337                                 m_status = status_retry_later;
00338                         }
00339                         else {
00340                                 m_status = status_error;
00341                         }
00342                         */
00343                         m_status = status_error;
00344                 }
00345                 else {
00346                         m_status = status_completed;
00347                 }
00348 
00349                 m_timer.stop();
00350                 lstr = prefix();
00351                 lstr += "Finished, duration: ";
00352                 lstr += m_timer.duration();
00353                 lstr += "\n";
00354                 logger.write(lstr);
00355                 // m_status = status_completed;
00356         }
00357 }
00358 
00359 /** Return the job report for this job */
00360 single_job_report job_archiver::report(void) const
00361 {
00362         return(m_jr);
00363 }
00364 
00365 /** Child processor for a job
00366 
00367         For each path in this job:
00368         - Create the directory heiararchy for this job in the archive
00369         - Until done or until out of retrys:
00370                 - Choose a hardlink source, if applicable and available
00371                 - Construct the command line to pass to rsync
00372                 - Spawn rsync
00373                 - Process I/O sent back from rsync
00374                 - Process exit code or signal number returned from rsync
00375         - Generate and submit a report to the report manager
00376         
00377  */
00378 void job_archiver::mf_do_chores(void)
00379 {
00380         /*
00381         {
00382                 bool wait_for_debugger = true;
00383 
00384                 std::cerr << "Waiting for debugger to attach..." << std::endl;
00385                 while (wait_for_debugger);
00386                 std::cerr << "Debugger attached." << std::endl;
00387         }
00388         */
00389 
00390         job::paths_type::const_iterator pi;
00391 
00392         for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
00393                 estring archive_dir;
00394                 estring path;
00395                 estring command_line;
00396                 bool hardlink = false;
00397                 int num_retries = 0;
00398                 bool done = false;
00399                 int exit_code = 0;
00400                 int signal_num = 0;
00401                 timer t;
00402                 uint64 files_total = 0;
00403                 uint64 files_xferd = 0;
00404                 uint64 size_total = 0;
00405                 uint64 size_xferd = 0;
00406                 bool overflow_detected = 0;
00407                 estring error_msg;
00408 
00409                 archive_dir = m_job->generate_archive_path(*pi);
00410                 // If archive_dir does not end with a '/', strip off characters until it
00411                 // does.
00412                 /*
00413                 while ((archive_dir.size() > 0) 
00414                         && (archive_dir[archive_dir.size()-1] != '/'))
00415                 {
00416                         archive_dir.erase(archive_dir.size()-1);
00417                 }
00418                 */
00419 
00420                 path = archiver.working_archive_path();
00421                 path += "/";
00422                 path += archive_dir;
00423                 path = reform_path(path);
00424                 if (!exists(path)) {
00425                         std::cout << "Creating job archive path: " << archive_dir << std::endl;
00426                         mk_dirhier(path);
00427                 }
00428                 else
00429                         std::cout << "Archiving to existing path: " << archive_dir << std::endl;
00430 
00431                 hardlink = m_job->rsync_hardlink;
00432                 while ((num_retries < m_job->rsync_retry_count) && !done) {
00433                         execute exec;
00434                         job::excludes_type::const_iterator ei;
00435                         job::includes_type::const_iterator ii;
00436 
00437                         exit_code = 0;
00438                         signal_num = 0;
00439 
00440                         command_line = config.rsync_local_path();
00441                         command_line += " ";
00442                         command_line += m_job->rsync_options;
00443 
00444                         if (m_job->rsync_connection != job::connection_local) {
00445                                 command_line += " ";
00446                                 command_line += " --rsync-path=";
00447                                 if (m_job->rsync_remote_path.size() != 0)
00448                                         command_line += m_job->rsync_remote_path;
00449                                 else
00450                                         command_line += config.rsync_local_path();
00451                         }
00452 
00453                         if (hardlink) {
00454                                 subdirectory subdir;
00455                                 std::string youngest;
00456                                 std::string relative_path;
00457 
00458                                 subdir.path(vaulter.vault());
00459                                 if (subdir.size() > 0) {
00460                                         subdirectory::const_iterator si;
00461 
00462                                         sort(subdir.begin(), subdir.end());
00463                                         reverse(subdir.begin(), subdir.end());
00464                                         for (si = subdir.begin(); si != subdir.end(); ++si) {
00465                                                 estring ypath;
00466 
00467                                                 if (!is_timestamp(*si))
00468                                                         continue;
00469                                                 if (*si == config.timestamp().str())
00470                                                         continue;
00471                                                 std::cout 
00472                                                         << "Considering potential hardlink source: "
00473                                                         << *si
00474                                                         << std::endl;
00475                                                 ypath = vaulter.vault();
00476                                                 ypath += "/";
00477                                                 ypath += *si;
00478                                                 ypath += "/";
00479                                                 ypath += archive_dir;
00480                                                 if (exists(ypath)) {
00481                                                         std::cout 
00482                                                                 << "Using archive as hardlink source: " 
00483                                                                 << *si
00484                                                                 << std::endl;
00485                                                         youngest = ypath;
00486                                                         break;
00487                                                 }
00488                                                 else {
00489                                                         std::cout
00490                                                                 << "- No such path: " 
00491                                                                 << ypath
00492                                                                 << std::endl;
00493                                                 }
00494                                         }
00495                                 }
00496                                 if (youngest.size() > 0) {
00497                                         relative_path = mk_relative_path(youngest,path);
00498                                         command_line += " --hard-links --link-dest=";
00499                                         command_line += relative_path;
00500                                 }
00501                         }
00502 
00503                         for (
00504                                 ei = m_job->excludes.begin();
00505                                 ei != m_job->excludes.end();
00506                                 ++ei
00507                                 )
00508                         {
00509                                 command_line += " --exclude-from=";
00510                                 command_line += *ei;
00511                         }
00512 
00513                         for (
00514                                 ii = m_job->includes.begin();
00515                                 ii != m_job->includes.end();
00516                                 ++ii
00517                                 )
00518                         {
00519                                 command_line += " --include-from=";
00520                                 command_line += *ii;
00521                         }
00522 
00523                         command_line += " ";
00524                         command_line += m_job->generate_source_path(*pi);
00525                         
00526                         command_line += " ";
00527                         command_line += path;
00528         
00529                         std::cout << "Command Line: " << command_line << std::endl;
00530 
00531                         m_error_msg.erase();
00532 
00533                         t.start();
00534                         exec.exec(command_line);
00535                         mf_process_rsync_io(
00536                                 exec, 
00537                                 m_job->rsync_timeout,
00538                                 files_total,
00539                                 files_xferd,
00540                                 size_total,
00541                                 size_xferd,
00542                                 overflow_detected
00543                                 );
00544                         t.stop();
00545 
00546                         signal_num = 0;
00547                         if (exec.child_signaled()) {
00548                                 std::cout 
00549                                         << "Rsync caught signal: [" 
00550                                         << exec.child_signal_no()
00551                                         << "] "
00552                                         << rsync_estat_str.signal(exec.child_signal_no())
00553                                         << std::endl;
00554                                 signal_num = exec.child_signal_no();
00555                         }
00556                         std::cout
00557                                 << "Rsync exit code: ["
00558                                 << exec.child_exit_code()
00559                                 << "] "
00560                                 << rsync_estat_str.exit(exec.child_exit_code())
00561                                 << std::endl;
00562 
00563                         exit_code = exec.child_exit_code();
00564                         if (exec.child_exited_normally() && (exit_code == 0))
00565                                 done = true;
00566                         else if (overflow_detected) {
00567                                 std::cout 
00568                                         << "Vault overflow detected"
00569                                         << std::endl;
00570                                 break;
00571                         }
00572                         else
00573                                 ++num_retries;
00574 
00575                         if (m_job->rsync_behavior[exit_code] == rsync_behavior::fail)
00576                         {
00577                                 std::cout << "Failing, will not attempt to retry" << std::endl;
00578                                 break;
00579                         }
00580                         if (m_job->rsync_behavior[exit_code] 
00581                                 == rsync_behavior::retry_without_hardlinks)
00582                         {
00583                                 std::cout << "Retrying without hardlinks..." << std::endl;
00584                                 hardlink = false;
00585                         }
00586                 }
00587                 if (!done) {
00588                         if (num_retries >= m_job->rsync_retry_count) {
00589                                 std::cout << "Retry count exceeded" << std::endl;
00590                         }
00591                         if (m_job->rsync_behavior[exit_code] == rsync_behavior::ok)
00592                                 std::cout << "Ignoring rsync failure" << std::endl;
00593                         else {
00594                                 std::cout << "Giving up on this path" << std::endl;
00595                                 m_success = false;
00596                         }
00597                 }
00598                 reportio().write_report(
00599                         m_job->generate_source_path(*pi),
00600                         t,
00601                         exit_code,
00602                         signal_num,
00603                         m_error_msg
00604                         );
00605         }
00606 }
00607 
00608 void job_archiver::mf_process_report(const std::string& a_str)
00609 {
00610         if (reportio().is_report(a_str)) {
00611                 m_jpr = reportio().parse(a_str);
00612                 m_jr.add_report(m_jpr);
00613         }
00614 }
00615 
00616 /** Process I/O from the child
00617 
00618         While there is I/O to be read, read and parse it.  When the end of a line is
00619         reached write that line to the log file.  If a_finalize is true, the flush
00620         the child I/O buffer string.
00621         
00622  */
00623 void job_archiver::mf_process_child_io(bool a_finalize)
00624 {
00625         estring lstr;
00626         bool io_flag = false;
00627 
00628         while (!m_exec.out_eof() && (a_finalize || m_exec.out_ready())) {
00629                 int r;
00630                 const int buffer_size = 128;
00631                 char buffer[buffer_size] = { 0 };
00632 
00633                 r = m_exec.out_read(buffer, buffer_size);
00634                 if (r > 0) {
00635                         int c;
00636 
00637                         io_flag = true;
00638                         for (c = 0; c < r; ++c) {
00639                                 if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
00640                                         lstr = prefix();
00641                                         lstr += m_io_out;
00642                                         lstr += "\n";
00643                                         logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00644                                         mf_process_report(lstr);
00645                                         m_io_out.erase();
00646                                 }
00647                                 else {
00648                                         m_io_out += buffer[c];
00649                                 }
00650                         }
00651                 }
00652         }
00653         if (a_finalize && (m_io_out.size() > 0)) {
00654                 lstr = prefix();
00655                 lstr += m_io_out;
00656                 lstr += "\n";
00657                 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00658                 mf_process_report(lstr);
00659                 m_io_out.erase();
00660         }
00661 
00662         while (!m_exec.err_eof() && (a_finalize || m_exec.err_ready())) {
00663                 int r;
00664                 const int buffer_size = 128;
00665                 char buffer[buffer_size] = { 0 };
00666 
00667                 r = m_exec.err_read(buffer, buffer_size);
00668                 if (r > 0) {
00669                         int c;
00670 
00671                         io_flag = true;
00672                         for (c = 0; c < r; ++c) {
00673                                 if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
00674                                         lstr = prefix();
00675                                         lstr += m_io_err;
00676                                         lstr += "\n";
00677                                         logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00678                                         mf_process_report(lstr);
00679                                         m_io_err.erase();
00680                                 }
00681                                 else {
00682                                         m_io_err += buffer[c];
00683                                 }
00684                         }
00685                 }
00686         }
00687         if (a_finalize && (m_io_err.size() > 0)) {
00688                 lstr = prefix();
00689                 lstr += m_io_err;
00690                 lstr += "\n";
00691                 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00692                 mf_process_report(lstr);
00693                 m_io_err.erase();
00694         }
00695         if (!io_flag)
00696                 sleep(config.io_poll_interval());
00697 }
00698 
00699 /** Trim off all non-digit leading and trailing characters from a string */
00700 void job_archiver::mf_trim_string(std::string& a_str)
00701 {
00702         while ((a_str.size() > 0) && (!isdigit(a_str[0])))
00703                 a_str.erase(0,1);
00704         while ((a_str.size() > 0) && (!isdigit(a_str[a_str.size()-1])))
00705                 a_str.erase(a_str.size()-1,1);
00706 }
00707 
00708 /** Parse I/O from rsync
00709 
00710         Search for special output from rsync to tell us something about the number
00711         and size of files and files transfered.
00712 
00713  */
00714 void job_archiver::mf_parse_rsync_io(
00715         const std::string a_str,
00716         uint64& a_files_total,
00717         uint64& a_files_xferd,
00718         uint64& a_size_total,
00719         uint64& a_size_xferd
00720         )
00721 {
00722         estring estr;
00723 
00724         if (a_str.find("Number of files: ") == 0) {
00725                 estr = a_str;
00726                 mf_trim_string(estr);
00727                 try {
00728                         a_files_total = estr;
00729                 }
00730                 catch(error e) {
00731                         estring es;
00732 
00733                         es = "Could not parse total number of files processed by rsync";
00734                         e.push_back(ERROR_INSTANCE(es));
00735 
00736                         // Not sure this is the best way to handle this...
00737                         std::cerr << e;
00738                 }
00739                 catch(...) {
00740                         error e = err_unknown;
00741                         estring es;
00742 
00743                         es = "Could not parse total number of files processed by rsync";
00744                         e.push_back(ERROR_INSTANCE(es));
00745 
00746                         // Not sure this is the best way to handle this...
00747                         std::cerr << e;
00748                 }
00749         }
00750         else if (a_str.find("Number of files transferred: ") == 0) {
00751                 estr = a_str;
00752                 mf_trim_string(estr);
00753                 try {
00754                         a_files_xferd = estr;
00755                 }
00756                 catch(error e) {
00757                         estring es;
00758 
00759                         es = "Could not parse total number of files transferred by rsync";
00760                         e.push_back(ERROR_INSTANCE(es));
00761 
00762                         // Not sure this is the best way to handle this...
00763                         std::cerr << e;
00764                 }
00765                 catch(...) {
00766                         error e = err_unknown;
00767                         estring es;
00768 
00769                         es = "Could not parse total number of files transferred by rsync";
00770                         e.push_back(ERROR_INSTANCE(es));
00771 
00772                         // Not sure this is the best way to handle this...
00773                         std::cerr << e;
00774                 }
00775         }
00776         else if (a_str.find("Total file size: ") == 0) {
00777                 estr = a_str;
00778                 mf_trim_string(estr);
00779                 try {
00780                         a_size_total = estr;
00781                 }
00782                 catch(error e) {
00783                         estring es;
00784 
00785                         es = "Could not parse total size of files processed by rsync";
00786                         e.push_back(ERROR_INSTANCE(es));
00787 
00788                         // Not sure this is the best way to handle this...
00789                         std::cerr << e;
00790                 }
00791                 catch(...) {
00792                         error e = err_unknown;
00793                         estring es;
00794 
00795                         es = "Could not parse total size of files processed by rsync";
00796                         e.push_back(ERROR_INSTANCE(es));
00797 
00798                         // Not sure this is the best way to handle this...
00799                         std::cerr << e;
00800                 }
00801         }
00802         else if (a_str.find("Total transferred file size: ") == 0) {
00803                 estr = a_str;
00804                 mf_trim_string(estr);
00805                 try {
00806                         a_size_xferd = estr;
00807                 }
00808                 catch(error e) {
00809                         estring es;
00810 
00811                         es = "Could not parse total size of files transferred by rsync";
00812                         e.push_back(ERROR_INSTANCE(es));
00813 
00814                         // Not sure this is the best way to handle this...
00815                         std::cerr << e;
00816                 }
00817                 catch(...) {
00818                         error e = err_unknown;
00819                         estring es;
00820 
00821                         es = "Could not parse total size of files transferred by rsync";
00822                         e.push_back(ERROR_INSTANCE(es));
00823 
00824                         // Not sure this is the best way to handle this...
00825                         std::cerr << e;
00826                 }
00827         }
00828 }
00829 
00830 /** Process I/O from rsync
00831 
00832         If there is I/O from rsync to be read, read it and then send it through the
00833         parser.
00834 
00835  */
00836 void job_archiver::mf_process_rsync_io(
00837         execute& a_exec, 
00838         uint16 a_timeout,
00839         uint64& a_files_total,
00840         uint64& a_files_xferd,
00841         uint64& a_size_total,
00842         uint64& a_size_xferd,
00843         bool& a_overflow_detected
00844         )
00845 {
00846         size_t ro;
00847         size_t re;
00848         estring out;
00849         estring err;
00850         timer t;
00851         bool io_flag;
00852         char buffer[1024] = { 0 };
00853         char *ptr;
00854 
00855         ro = 1;
00856         re = 1;
00857         t.start();
00858         while ((ro != 0) || (re != 0) || a_exec.child_running()) {
00859                 io_flag = false;
00860                 ro = 0;
00861                 re = 0;
00862 
00863                 if (!a_overflow_detected) {
00864                         a_overflow_detected = vaulter.overflow();
00865                 }
00866 
00867                 m_error_msg.erase();
00868 
00869                 if (a_exec.out_ready()) {
00870                         ro = read(a_exec.out_fd(), buffer, 1024);
00871                         if (ro > 0) {
00872                                 io_flag = true;
00873                                 t.start();
00874                                 for (ptr = buffer; ptr != buffer+ro; ++ptr) {
00875                                         if ((*ptr != '\r') && (*ptr != '\n')) {
00876                                                 out += *ptr;
00877                                         }
00878                                         else {
00879                                                 reportio().write_rsync_out(out);
00880                                                 out.erase();
00881                                         }
00882                                 }
00883                         }
00884                 }
00885 
00886                 if (a_exec.err_ready()) {
00887                         re = read(a_exec.err_fd(), buffer, 1024);
00888                         if (re > 0) {
00889                                 io_flag = true;
00890                                 t.start();
00891                                 for (ptr = buffer; ptr != buffer+re; ++ptr) {
00892                                         if ((*ptr != '\r') && (*ptr != '\n')) {
00893                                                 err += *ptr;
00894                                         }
00895                                         else {
00896                                                 reportio().write_rsync_err(err);
00897                                                 err.erase();
00898                                         }
00899                                 }
00900                         }
00901                 }
00902 
00903                 t.stop();
00904                 if (t.duration_secs() > a_timeout) {
00905                         std::cerr << "*** Rsync program inactivity timeout" << std::endl;
00906                         a_exec.kill_child();
00907                         TRY_nomem(m_error_msg = "Rsync inactivity timeout");
00908                 }
00909 
00910                 if (!io_flag)
00911                         sleep(config.io_poll_interval());
00912 
00913         }
00914         if (out.size() > 0) {
00915                 std::cerr << out << std::endl;
00916                 mf_parse_rsync_io(
00917                         out, 
00918                         a_files_total, 
00919                         a_files_xferd, 
00920                         a_size_total,
00921                         a_size_xferd
00922                 );
00923                 out.erase();
00924         }
00925         if (err.size() > 0) {
00926                 std::cerr << err << std::endl;
00927                 mf_parse_rsync_io(
00928                         out, 
00929                         a_files_total, 
00930                         a_files_xferd, 
00931                         a_size_total,
00932                         a_size_xferd
00933                 );
00934                 err.erase();
00935         }
00936 }
00937 
00938 //-----------------------------------------------------------------------------
00939 
00940 /** C'tor */
00941 archive_manager::archive_manager()
00942 {
00943         if (this != &archiver)
00944                 throw(INTERNAL_ERROR(0,"Attempt to alocate multiple archive managers"));
00945 
00946         clear();
00947 }
00948 
00949 /** Clear the archive manager and clear the job list */
00950 void archive_manager::clear(void)
00951 {
00952         m_jobs.clear();
00953         m_initialized = false;
00954 }
00955 
00956 /** Initialize the archive manager
00957 
00958         Log the archive timestamp, select and prepare a vault.
00959 
00960  */
00961 void archive_manager::init(void)
00962 {
00963         timer t;
00964         estring lstr;
00965 
00966         lstr = "Archive Manager - Beginning initialization\n";
00967         logger.write(lstr);
00968         t.start();
00969 
00970         lstr = "Timestamp: ";
00971         lstr += config.timestamp().str();
00972         lstr += "\n";
00973         logger.write(lstr);
00974 
00975         // Select a vault?
00976         vaulter.select();
00977         lstr = "Vault selected: ";
00978         lstr += vaulter.vault();
00979         lstr += "\n";
00980         logger.write(lstr);
00981 
00982         reporter.vault().add_report(
00983                 vault_stats_report(estring("Initial Stats:"),filesystem(vaulter.vault()))
00984                 );
00985 
00986         // Prepare the vault?
00987         vaulter.prepare();
00988 
00989         t.stop();
00990         lstr = "Archive Manager - Finished initialization";
00991         lstr += ", duration: ";
00992         lstr += t.duration();
00993         lstr += "\n";
00994         logger.write(lstr);
00995 
00996         m_initialized = true;
00997 }
00998 
00999 /** Return the initialized status of the archive manager */
01000 const bool archive_manager::initialized(void) const
01001 {
01002         return(m_initialized);
01003 }
01004 
01005 /** Give a status report
01006 
01007         After so many minutes of inactivity write a report to the log file of our
01008         current status of affairs.
01009 
01010  */
01011 void archive_manager::mf_log_status(void)
01012 {
01013         static timer t;
01014         const timer::value_type timeout = 30;
01015         estring lstr;
01016         std::vector<job_archiver*>::const_iterator ji;
01017         uint64 jobs_pending = 0;
01018         uint64 jobs_processing = 0;
01019         uint64 jobs_completed =0;
01020         uint64 jobs_remaining = 0;
01021 
01022         if (!t.is_started())
01023                 t.start();
01024         
01025         t.stop();
01026         if (t.duration_mins() < timeout)
01027                 return;
01028         
01029         lstr  = "\n";
01030         lstr += "STATUS REPORT:\n";
01031         lstr += "================================================================\n";
01032         logger.write(lstr);
01033         for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji) {
01034                 lstr = "[";
01035                 switch ((*ji)->status()) {
01036                         case job_archiver::status_pending:
01037                                 lstr += "Pending    ";
01038                                 ++jobs_pending;
01039                                 break;
01040                         case job_archiver::status_processing:
01041                                 lstr += "Processing ";
01042                                 ++jobs_processing;
01043                                 break;
01044                         case job_archiver::status_retry_later:
01045                                 lstr += "Reschedule ";
01046                                 ++jobs_pending;
01047                                 break;
01048                         case job_archiver::status_fatal_error:
01049                                 lstr += "Fatal Error";
01050                                 ++jobs_completed;
01051                                 break;
01052                         case job_archiver::status_error:
01053                                 lstr += "Error      ";
01054                                 ++jobs_completed;
01055                                 break;
01056                         case job_archiver::status_completed:
01057                                 lstr += "Completed  ";
01058                                 ++jobs_completed;
01059                                 break;
01060                         case job_archiver::status_done:
01061                                 lstr += "Done       ";
01062                                 ++jobs_completed;
01063                                 break;
01064                         default:
01065                                 lstr += "<unknown>  ";
01066                                 break;
01067                 }
01068                 lstr += "]: ";
01069                 lstr += (*ji)->id();
01070                 lstr += "\n";
01071                 logger.write(lstr);
01072         }
01073 
01074         lstr  = "---------------------------------------------------------------\n";
01075         lstr += "     Jobs Pending: ";
01076         lstr += estring(jobs_pending);
01077         lstr += "\n";
01078 
01079         lstr += "  Jobs Processing: ";
01080         lstr += estring(jobs_processing);
01081         lstr += "\n";
01082 
01083         lstr += "   Jobs Completed: ";
01084         lstr += estring(jobs_completed);
01085         lstr += "\n";
01086 
01087         lstr += "            Total: ";
01088         lstr += estring(jobs_pending+jobs_processing+jobs_completed+jobs_remaining);
01089         lstr += "\n";
01090 
01091         logger.write(lstr);
01092         logger.write("\n");
01093         t.start();
01094 }
01095 
01096 /** Archive jobs
01097 
01098         Create an archive directory.  Generate a to-do list of job archiver objects.
01099         Process the job archiver objects:
01100         - While there are less than rsync-parallel job archivers processing, start
01101                 the first available job archiver.
01102         - Check the status of each job and process I/O from jobs underway.
01103         - Remove jobs that failed to start.
01104         - Possibly reschedule failed jobs.
01105         - Remove completed jobs from active processing.
01106         - Call mf_log_status().
01107 
01108  */
01109 void archive_manager::archive(void)
01110 {
01111         timer t;
01112         estring lstr;
01113         configuration_manager::jobs_type::const_iterator cji;
01114         int num_processing = 0;
01115         std::vector<job_archiver*>::iterator ji;
01116         uint64 num_completed = 0;
01117         bool overflow_detected = false;
01118         estring debug_estr;
01119 
01120         if (!initialized())
01121                 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01122 
01123         lstr = "Archive Manager - Begin archiving\n";
01124         logger.write(lstr);
01125         t.start();
01126 
01127         // Create archive directory
01128         try {
01129                 if (exists(archive_path())) {
01130                         lstr = "Found existing archive directory: ";
01131                         lstr += archive_path();
01132                         lstr += "\n";
01133                         logger.write(lstr);
01134                         rename_file(archive_path(), working_archive_path());
01135                 }
01136                 else if (exists(working_archive_path())) {
01137                         lstr = "Found existing archive directory: ";
01138                         lstr += working_archive_path();
01139                         lstr += "\n";
01140                         logger.write(lstr);
01141                 }
01142                 else {
01143                         lstr = "Creating archive directory: ";
01144                         lstr += working_archive_path();
01145                         lstr += "\n";
01146                         logger.write(lstr);
01147                         mk_dir(working_archive_path());
01148                 }
01149         }
01150         catch(error e) {
01151                 logger.write("An error has occured: ");
01152                 logger.write(e[0].what());
01153                 logger.write("\n");
01154                 throw(e);
01155         }
01156         catch(...) {
01157                 error e = err_unknown;
01158 
01159                 logger.write("An error has occured: ");
01160                 logger.write(e[0].what());
01161                 logger.write("\n");
01162                 throw(e);
01163         }
01164 
01165         // Create a todo list
01166         logger.write("Creating to-do list\n");
01167         for (
01168                 cji = config.jobs().begin();
01169                 cji != config.jobs().end();
01170                 ++cji
01171                 )
01172         {
01173                 job_archiver* ptr;
01174 
01175                 ptr = new job_archiver(&(*cji));
01176                 if (ptr == 0)
01177                         throw(err_nomem);
01178                 TRY_nomem(m_jobs.push_back(ptr));
01179         }
01180 
01181         // Backup clients
01182         logger.write("Processing jobs...\n");
01183         while (m_jobs.size() > num_completed) {
01184 
01185                 /*
01186                 logger.write("[DEBUG]: ---[ TOP OF LOOP ]---\n");
01187 
01188                 debug_estr = "[DEBUG]: overflow_detected = ";
01189                 debug_estr += estring(overflow_detected);
01190                 debug_estr += "\n";
01191                 logger.write(debug_estr);
01192 
01193                 debug_estr = "[DEBUG]: num_processing = ";
01194                 debug_estr += estring(num_processing);
01195                 debug_estr += "\n";
01196                 logger.write(debug_estr);
01197                 */
01198 
01199                 if (!overflow_detected) {
01200                         overflow_detected = vaulter.overflow(true);
01201                         /*
01202                         if (overflow_detected) {
01203                                 logger.write("[DEBUG]: Variable Change :: ");
01204                                 logger.write("overflow_detected = true");
01205                                 logger.write("\n");
01206                         }
01207                         */
01208                 }
01209 
01210                 // If the vault has exceeded it's highwater mark, wait for the
01211                 // currently-processing jobs to terminate, and then attempt to prepare the
01212                 // vault.
01213                 if (overflow_detected && (num_processing == 0)) {
01214                         TRY(vaulter.prepare(true),"Cannot complete archive");
01215                         overflow_detected = vaulter.overflow();
01216                         /*
01217                         if (!overflow_detected) {
01218                                 logger.write("[DEBUG]: Variable Change :: ");
01219                                 logger.write("overflow_detected = false");
01220                                 logger.write("\n");
01221                         }
01222                         */
01223                 }
01224 
01225                 // For each job in the list...
01226                 for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji)
01227                 {
01228                         // While we're running less than rsync-parallel jobs, start new ones
01229                         if (
01230                                 !overflow_detected
01231                                 && (num_processing < config.rsync_parallel())
01232                                 && ((*ji)->status() == job_archiver::status_pending)
01233                                 )
01234                         {
01235                                 try {
01236                                         (*ji)->start();
01237                                 }
01238                                 catch(error e) {
01239                                         if (e.num() == 12) {
01240                                                 lstr = "Error starting job: Out of memory, will retry later\n";
01241                                                 (*ji)->clear();
01242                                         }
01243                                         else {
01244                                                 (*ji)->end();
01245                                                 lstr = "Error starting job, aborting\n";
01246                                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01247                                                 num_processing--;
01248                                                 reporter.jobs().add_report((*ji)->report());
01249                                         }
01250                                         logger.write(lstr);
01251                                 }
01252                                 catch(...) {
01253                                         (*ji)->end();
01254                                         lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE STARTING JOB ";
01255                                         lstr += "-- JOB TERMINATED\n";
01256                                         logger.write(lstr);
01257                                         // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01258                                         num_processing--;
01259                                         reporter.jobs().add_report((*ji)->report());
01260                                 }
01261                                 // logger.write("[DEBUG]: Variable Change :: num_processing++\n");
01262                                 num_processing++;
01263                         }
01264         
01265                         // Process jobs
01266                         if ((*ji)->status() == job_archiver::status_processing) {
01267                                 try {
01268                                         (*ji)->process();
01269                                 }
01270                                 catch(error e) {
01271                                         // TODO: Change 12 to ENOMEM?
01272                                         if (e.num() == 12) {
01273                                                 lstr  = "Error starting job: Out of memory, will retry later\n";
01274                                                 (*ji)->clear();
01275                                         }
01276                                         else {
01277                                                 (*ji)->end();
01278                                                 lstr = "Error starting job, aborting\n";
01279                                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01280                                                 num_processing--;
01281                                                 reporter.jobs().add_report((*ji)->report());
01282                                         }
01283                                         logger.write(lstr);
01284                                 }
01285                                 catch(...) {
01286                                         (*ji)->end();
01287                                         lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE PROCESSING JOB ";
01288                                         lstr += "-- JOB TERMINATED\n";
01289                                         logger.write(lstr);
01290                                         // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01291                                         num_processing--;
01292                                         reporter.jobs().add_report((*ji)->report());
01293                                 }
01294                         }
01295 
01296                         // Remove jobs that could not start from active duty
01297                         if ((*ji)->status() == job_archiver::status_retry_later) {
01298                                 (*ji)->clear();
01299                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01300                                 num_processing--;
01301                         }
01302 
01303                         // If a job exited with an error, and the vault is full, then reschedule
01304                         // the job for later
01305                         if (
01306                                         ((*ji)->status() == job_archiver::status_error)
01307                                 &&
01308                                         overflow_detected
01309                                 )
01310                         {
01311                                 lstr = "Vault overflow detected, will retry job later\n";
01312                                 logger.write(lstr);
01313                                 (*ji)->clear();
01314                                 num_processing--;
01315                         }
01316 
01317                         // Remove completed jobs from the processing list
01318                         if (
01319                                 ((*ji)->status() == job_archiver::status_completed)
01320                                 || ((*ji)->status() == job_archiver::status_fatal_error)
01321                                 || ((*ji)->status() == job_archiver::status_error)
01322                                 ) {
01323                                 (*ji)->end();
01324                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01325                                 num_processing--;
01326                                 num_completed++;
01327 
01328                                 // logger.write("Adding job report to report manager\n");
01329                                 reporter.jobs().add_report((*ji)->report());
01330                         }
01331                 }
01332 
01333                 mf_log_status();
01334                 sleep(1);
01335 
01336                 // logger.write("[DEBUG]: ---[ BOTTOM OF LOOP ]---\n");
01337         }
01338 
01339         t.stop();
01340         lstr = "Archive Manager - Finished archiving, duration: ";
01341         lstr += t.duration();
01342         lstr += "\n";
01343         logger.write(lstr);
01344 
01345         lstr = "Archive Manager - Finalizing archive path\n";
01346         logger.write(lstr);
01347         TRY(
01348                 rename_file(working_archive_path(), archive_path()),
01349                 "Cannot finalize archive"
01350                 );
01351 
01352         reporter.vault().add_report(
01353                 vault_stats_report(estring("Final Stats:"),filesystem(vaulter.vault()))
01354                 );
01355 }
01356 
01357 /** Return an absolute path to the finished archive directory */
01358 const std::string archive_manager::archive_path(void) const
01359 {
01360         estring path;
01361 
01362         if (!initialized())
01363                 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01364 
01365         path = vaulter.vault();
01366         path += "/";
01367         path += config.timestamp().str();
01368 
01369         return(path);
01370 }
01371 
01372 /** Return the absolute path to the unfinished working archive directory */
01373 const std::string archive_manager::working_archive_path(void) const
01374 {
01375         estring path;
01376 
01377         if (!initialized())
01378                 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01379         
01380         path = archive_path();
01381         path += ".incomplete";
01382 
01383         return(path);
01384 }
01385 
01386 //-----------------------------------------------------------------------------
01387 
01388 /** The global archive manager

	This is the one archive_manager object the program may create: the
	archive_manager constructor compares its own address against this object
	and throws an internal error for any other instance.

 */
01389 archive_manager archiver;
01390 

Generated on Mon Jul 12 12:02:41 2004 for rvm by doxygen 1.3.6