archiver.cc

Go to the documentation of this file.
#include "config.h"

#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <ctype.h>
#include <signal.h>

#include <iostream>
#include <string>
#include <vector>

#include "asserts.h"
#include "error.h"
#include "estring.h"
#include "fs.h"
#include "rconfig.h"
#include "timer.h"
#include "logger.h"

#include "archiver.h"
00022 
00023 //-----------------------------------------------------------------------------
00024 
00025 /** C'tor */
00026 rstat::rstat()
00027 {
00028         TRY_nomem(m_exit_str[0] = "Success");
00029         TRY_nomem(m_exit_str[1] = "Syntax or usage error");
00030         TRY_nomem(m_exit_str[2] = "Protocol incompatability error");
00031         TRY_nomem(m_exit_str[3] = "Errors selecting I/O files or directories");
00032         TRY_nomem(m_exit_str[4] = "Requested action not supported");
00033         TRY_nomem(m_exit_str[10] = "Error in socket I/O");
00034         TRY_nomem(m_exit_str[11] = "Error in file I/O");
00035         TRY_nomem(m_exit_str[12] = "Error in rsync protocol data stream");
00036         TRY_nomem(m_exit_str[13] = "Errors with program diagnostics");
00037         TRY_nomem(m_exit_str[14] = "Error in IPC code");
00038         TRY_nomem(m_exit_str[20] = "Received SIGUSR1 or SIGINT");
00039         TRY_nomem(m_exit_str[21] = "Some error returned by waitpid()");
00040         TRY_nomem(m_exit_str[22] = "Error allocating core memory buffers");
00041         TRY_nomem(m_exit_str[23] = "Partial transfer");
00042         TRY_nomem(m_exit_str[30] = "Timeout in data send/receive");
00043         TRY_nomem(m_exit_str[124] = "The command executed by SSH exited with status 255");
00044         TRY_nomem(m_exit_str[125] = "The command executed by SSH was killed by a signal");
00045         TRY_nomem(m_exit_str[126] = "The command given to SSH cannot be run");
00046         TRY_nomem(m_exit_str[127] = "The command given to SSH cannot be found");
00047 
00048         TRY_nomem(m_signal_str[1] = "[HUP]: Hangup");
00049         TRY_nomem(m_signal_str[2] = "[INT]: Interrupt ");
00050         TRY_nomem(m_signal_str[3] = "[QUIT]: Quit");
00051         TRY_nomem(m_signal_str[4] = "[ILL]: Illegal instruction");
00052         TRY_nomem(m_signal_str[5] = "[TRAP]: Trace trap");
00053         TRY_nomem(m_signal_str[6] = "[IOT]: IOT instruction or hardware fault");
00054         TRY_nomem(m_signal_str[7] = "[ABRT]: Abnormal termination");
00055         TRY_nomem(m_signal_str[8] = "[EMT]: EMT instruction or hardware fault");
00056         TRY_nomem(m_signal_str[9] = "[FPE]: Floating point exception");
00057         TRY_nomem(m_signal_str[10] = "[KILL]: Killed");
00058         TRY_nomem(m_signal_str[11] = "[BUS]: Bus error");
00059         TRY_nomem(m_signal_str[12] = "[SEGV]: Segmentation fault");
00060         TRY_nomem(m_signal_str[13] = "[SYS]: Invalid system call or invalid argument to system call");
00061         TRY_nomem(m_signal_str[14] = "[PIPE]: Write to pipe with no readers");
00062         TRY_nomem(m_signal_str[15] = "[ALRM]: Alarm");
00063         TRY_nomem(m_signal_str[16] = "[TERM]: Software termination");
00064         TRY_nomem(m_signal_str[17] = "[USR1]: User-defined signal 1");
00065         TRY_nomem(m_signal_str[18] = "[USR2]: User-defined signal 2");
00066         TRY_nomem(m_signal_str[19] = "[CLD]: Child status change");
00067         TRY_nomem(m_signal_str[20] = "[PWR]: Power fail/restart");
00068         TRY_nomem(m_signal_str[21] = "[WINCH]: Terminal window size change");
00069         TRY_nomem(m_signal_str[22] = "[URG]: Urgent condition");
00070         TRY_nomem(m_signal_str[23] = "[POLL]: Pollable event or socket I/O");
00071         TRY_nomem(m_signal_str[24] = "[STOP]: Stop");
00072         TRY_nomem(m_signal_str[25] = "[TSTP]: Terminal stop character");
00073         TRY_nomem(m_signal_str[26] = "[CONT]: Continue stopped process");
00074         TRY_nomem(m_signal_str[27] = "[TTIN]: Background tty read");
00075         TRY_nomem(m_signal_str[28] = "[TTOU]: Background tty write");
00076         TRY_nomem(m_signal_str[29] = "[VTALRM]: Virtual timer expired");
00077         TRY_nomem(m_signal_str[30] = "[PROF]: Profiling timer expired");
00078         TRY_nomem(m_signal_str[31] = "[XCPU]: Exceeded CPU limit");
00079         TRY_nomem(m_signal_str[32] = "[XFSZ]: Exceeded file size limit");
00080         TRY_nomem(m_signal_str[33] = "[WAITING]: Process' LWPs are blocked");
00081         TRY_nomem(m_signal_str[34] = "[LWP]: Special thread library signal");
00082         TRY_nomem(m_signal_str[35] = "[FREEZE]: Special signal used by CPR");
00083         TRY_nomem(m_signal_str[36] = "[THAW]: Special signal used by CPR");
00084         TRY_nomem(m_signal_str[37] = "[CANCEL]: Thread cancellation");
00085         TRY_nomem(m_signal_str[38] = "[LOST]: Resource lost");
00086         TRY_nomem(m_signal_str[39] = "[RTMIN]: Highest priority real-time signal");
00087         TRY_nomem(m_signal_str[46] = "[RTMAX]: Lowest priority real-time signal");
00088 
00089         TRY_nomem(m_unknown_exit = "(Unknown exit code)");
00090         TRY_nomem(m_unknown_signal = "(Unknown signal)");
00091 }
00092 
00093 /** Get a verbose string for an exit code */
00094 const std::string& rstat::exit(const int a_int) const
00095 {
00096         if (m_exit_str.find(a_int) != m_exit_str.end()) {
00097                 return(m_exit_str.find(a_int)->second);
00098         }
00099         return(m_unknown_exit);
00100 }
00101 
00102 /** Get a verbose string for a signal number */
00103 const std::string& rstat::signal(const int a_int) const
00104 {
00105         if (m_signal_str.find(a_int) != m_signal_str.end()) {
00106                 return(m_signal_str.find(a_int)->second);
00107         }
00108         return(m_unknown_signal);
00109 }
00110 
00111 class rstat rsync_estat_str;
00112 
00113 //-----------------------------------------------------------------------------
00114 
00115 /** C'tor
00116 
00117         Set a job to be assiciated with this job archiver and initialize it's
00118         processing status to "pending".
00119 
00120  */
00121 job_archiver::job_archiver(const job * a_job)
00122 {
00123         clear();
00124         m_job = a_job;
00125         m_status = status_pending;
00126 }
00127 
00128 /** Generate a job prefix string
00129 
00130         Create a string to uniquely identify this job to be used in the log file to
00131         uniquely identify this job
00132 
00133  */
00134 const std::string job_archiver::prefix(void)
00135 {
00136         estring lstr;
00137 
00138         lstr = "[Job:";
00139         lstr += estring((void*)m_job);
00140         lstr += "] ";
00141 
00142         return(lstr);
00143 }
00144 
00145 /** Generate a job id string */
00146 const std::string job_archiver::id(void)
00147 {
00148         estring lstr;
00149 
00150         lstr = prefix();
00151         lstr += " ";
00152         lstr += m_job->generate_job_id();
00153 
00154         return(lstr);
00155 }
00156 
00157 /** Clear the job archiver and return it to it's initial state
00158         
00159         End any processes handling this job and return the job archiver to it's
00160         "pending" state.
00161 
00162  */
00163 void job_archiver::clear(void)
00164 {
00165         end();
00166         m_child_pid = m_exec.my_pid();
00167         m_status = status_pending;
00168         m_success = true;
00169         m_jr.clear();
00170         m_jpr.clear();
00171         m_error_msg.erase();
00172 }
00173 
00174 /** End any processes handling this job
00175 
00176         If any child processes are handling this job, terminate them.  Erase any
00177         pending I/O for the now defunct child.  Set our processing status to "done".
00178 
00179 */
00180 void job_archiver::end(void)
00181 {
00182         estring lstr;
00183 
00184         m_timer.stop();
00185         if (m_exec.child_running()) {
00186                 lstr = prefix();
00187                 lstr += "Terminating child process!\n";
00188                 logger.write(lstr,0,configuration_manager::logging_manager,m_child_pid);
00189                 m_exec.kill_child();
00190         }
00191         m_exec.clear();
00192         m_io_out.erase();
00193         m_io_err.erase();
00194         m_status = status_done;
00195 }
00196 
00197 /** Return the processing status of this job archiver */
00198 const job_archiver::archiving_status job_archiver::status(void)
00199 {
00200         return(m_status);
00201 }
00202 
00203 /** Begin processing
00204 
00205         Attempt to fork a child process to handle this job.  If unsuccessful then
00206         retry again later.  The child then calls mf_do_chores() to handle the actual
00207         processing, while the parent updates the job archiver's status from
00208         "pending" to "processing" and begins a timer to measure the duration of the
00209         job process.
00210         
00211 */
00212 void job_archiver::start(void)
00213 {
00214         estring lstr;
00215 
00216         m_jr.id(m_job->generate_job_id());
00217 
00218         try {
00219                 m_exec.fork();
00220         }
00221         catch(error e) {
00222                 lstr = prefix();
00223                 lstr += "Could not fork:\n";
00224                 logger.write(lstr);
00225 
00226                 lstr = e.str(prefix());
00227                 logger.write(lstr);
00228 
00229                 lstr = prefix();
00230                 lstr += "Will retry job later\n";
00231                 logger.write(lstr);
00232 
00233                 m_status = status_retry_later;
00234         }
00235         catch(...) {
00236                 error e = err_unknown;
00237 
00238                 lstr = prefix();
00239                 lstr += "Could not fork:\n";
00240                 logger.write(lstr);
00241 
00242                 lstr = e.str(prefix());
00243                 logger.write(lstr);
00244 
00245                 lstr = prefix();
00246                 lstr += "Will retry job later\n";
00247                 logger.write(lstr);
00248 
00249                 m_status = status_retry_later;
00250         }
00251 
00252         if (m_exec.is_child()) {
00253                 // wait_for_debugger = true;
00254 
00255                 // while (wait_for_debugger);
00256 
00257                 m_exec.reroute_stdio();
00258                 try {
00259                         mf_do_chores();
00260                 }
00261                 catch(error e) {
00262                         std::cerr << e;
00263                         m_success = false;
00264                 }
00265                 catch(...) {
00266                         std::cerr << err_unknown;
00267                         m_success = false;
00268                 }
00269                 if (m_success)
00270                         m_exec.exit(0);
00271                 else
00272                         m_exec.exit(1);
00273         }
00274 
00275         m_child_pid = m_exec.child_pid();
00276 
00277         lstr = prefix();
00278         lstr += "Spawning child process: PID ";
00279         lstr += estring(static_cast<unsigned long>(m_exec.child_pid()));
00280         lstr += "\n";
00281         logger.write(lstr);
00282 
00283         m_status = status_processing;
00284         m_timer.start();
00285 }
00286 
00287 /** Parent processor for a job
00288 
00289         Check for I/O from the child.  Check the child's status to see if it's still
00290         running, has exited with an exit code, or has exited from a signal.  If the
00291         child sis not exit normally (i.e. exit from a signal or exit with a non-zero
00292         exit code) then check the vault for overflow.  If the vault has exceeded
00293         it's overflow threshold then that could be the cause for the child's
00294         failure, in which case we reschedule the child to be processed again later.
00295 
00296         If the job is finished (whether successful or not), update the job
00297         archiver's status to "completed".
00298         
00299  */
00300 void job_archiver::process(void)
00301 {
00302         estring lstr;
00303         
00304         if (m_exec.child_running()) {
00305                 // Process child I/O
00306                 mf_process_child_io(false);
00307         }
00308         else {
00309                 // Process remaining child I/O
00310                 mf_process_child_io(true);
00311 
00312                 // If child exited with an error, check vault overflow.  If the vault is
00313                 // filling up, then reschedule the job for later retry.
00314                 lstr = prefix();
00315                 if (m_exec.child_signaled()) {
00316                         lstr += "Child exited from signal: ";
00317                         lstr += estring(m_exec.child_signal_no());
00318                         lstr += "\n";
00319                 }
00320                 else if (m_exec.child_exit_code() != 0) {
00321                         lstr += "Child exited abnormally with code: ";
00322                         lstr += estring(m_exec.child_exit_code());
00323                         lstr += "\n";
00324                 }
00325                 else {
00326                         lstr += "Child exited successfully\n";
00327                         m_status = status_completed;
00328                 }
00329                 logger.write(lstr);
00330 
00331                 if (m_exec.child_signaled() || !m_exec.child_exited_normally()) {
00332                         /*
00333                         if (vaulter.overflow()) {
00334                                 lstr = prefix();
00335                                 lstr += "Vault overflow detected, will retry later\n";
00336                                 logger.write(lstr);
00337                                 m_status = status_retry_later;
00338                         }
00339                         else {
00340                                 m_status = status_error;
00341                         }
00342                         */
00343                         m_status = status_error;
00344                 }
00345                 else {
00346                         m_status = status_completed;
00347                 }
00348 
00349                 m_timer.stop();
00350                 lstr = prefix();
00351                 lstr += "Finished, duration: ";
00352                 lstr += m_timer.duration();
00353                 lstr += "\n";
00354                 logger.write(lstr);
00355                 // m_status = status_completed;
00356         }
00357 }
00358 
00359 /** Return the job report for this job */
00360 single_job_report job_archiver::report(void) const
00361 {
00362         return(m_jr);
00363 }
00364 
00365 /** Child processor for a job
00366 
00367         For each path in this job:
00368         - Create the directory heiararchy for this job in the archive
00369         - Until done or until out of retrys:
00370                 - Choose a hardlink source, if applicable and available
00371                 - Construct the command line to pass to rsync
00372                 - Spawn rsync
00373                 - Process I/O sent back from rsync
00374                 - Process exit code or signal number returned from rsync
00375         - Generate and submit a report to the report manager
00376         
00377  */
00378 void job_archiver::mf_do_chores(void)
00379 {
00380         /*
00381         {
00382                 bool wait_for_debugger = true;
00383 
00384                 std::cerr << "Waiting for debugger to attach..." << std::endl;
00385                 while (wait_for_debugger);
00386                 std::cerr << "Debugger attached." << std::endl;
00387         }
00388         */
00389 
00390         job::paths_type::const_iterator pi;
00391 
00392         for (pi = m_job->paths.begin(); pi != m_job->paths.end(); ++pi) {
00393                 estring archive_dir;
00394                 estring path;
00395                 estring command_line;
00396                 bool hardlink = false;
00397                 int num_retries = 0;
00398                 bool done = false;
00399                 int exit_code = 0;
00400                 int signal_num = 0;
00401                 timer t;
00402                 uint64 files_total = 0;
00403                 uint64 files_xferd = 0;
00404                 uint64 size_total = 0;
00405                 uint64 size_xferd = 0;
00406                 bool overflow_detected = 0;
00407                 estring error_msg;
00408 
00409                 archive_dir = m_job->generate_archive_path(*pi);
00410                 // If archive_dir does not end with a '/', strip off characters until it
00411                 // does.
00412                 /*
00413                 while ((archive_dir.size() > 0) 
00414                         && (archive_dir[archive_dir.size()-1] != '/'))
00415                 {
00416                         archive_dir.erase(archive_dir.size()-1);
00417                 }
00418                 */
00419 
00420                 path = archiver.working_archive_path();
00421                 path += "/";
00422                 path += archive_dir;
00423                 path = reform_path(path);
00424                 if (!exists(path)) {
00425                         std::cout << "Creating job archive path: " << archive_dir << std::endl;
00426                         mk_dirhier(path);
00427                 }
00428                 else
00429                         std::cout << "Archiving to existing path: " << archive_dir << std::endl;
00430 
00431                 if (!writable(path)) {
00432                         std::string es;
00433 
00434                         TRY_nomem(es = "Cannot write to archive directory: \"");
00435                         TRY_nomem(es += archive_dir);
00436                         TRY_nomem(es += "\"");
00437 
00438                         throw(ERROR(EACCES,es));
00439                 }
00440 
00441                 logger.set_error_logging(false);
00442                 hardlink = m_job->rsync_hardlink;
00443                 while ((num_retries < m_job->rsync_retry_count) && !done) {
00444                         execute exec;
00445                         job::excludes_type::const_iterator ei;
00446                         job::includes_type::const_iterator ii;
00447 
00448                         exit_code = 0;
00449                         signal_num = 0;
00450 
00451                         command_line = config.rsync_local_path();
00452                         command_line += " ";
00453                         command_line += m_job->rsync_options;
00454 
00455                         if (m_job->rsync_connection != job::connection_local) {
00456                                 command_line += " ";
00457                                 command_line += " --rsync-path=";
00458                                 if (m_job->rsync_remote_path.size() != 0)
00459                                         command_line += m_job->rsync_remote_path;
00460                                 else
00461                                         command_line += config.rsync_local_path();
00462                         }
00463 
00464                         if (hardlink) {
00465                                 subdirectory subdir;
00466                                 std::string youngest;
00467                                 std::string relative_path;
00468 
00469                                 subdir.path(vaulter.vault());
00470                                 if (subdir.size() > 0) {
00471                                         subdirectory::const_iterator si;
00472 
00473                                         sort(subdir.begin(), subdir.end());
00474                                         reverse(subdir.begin(), subdir.end());
00475                                         for (si = subdir.begin(); si != subdir.end(); ++si) {
00476                                                 estring ypath;
00477 
00478                                                 if (!is_timestamp(*si))
00479                                                         continue;
00480                                                 if (*si == config.timestamp().str())
00481                                                         continue;
00482                                                 std::cout 
00483                                                         << "Considering potential hardlink source: "
00484                                                         << *si
00485                                                         << std::endl;
00486                                                 ypath = vaulter.vault();
00487                                                 ypath += "/";
00488                                                 ypath += *si;
00489                                                 ypath += "/";
00490                                                 ypath += archive_dir;
00491                                                 if (exists(ypath)) {
00492                                                         std::cout 
00493                                                                 << "Using archive as hardlink source: " 
00494                                                                 << *si
00495                                                                 << std::endl;
00496                                                         youngest = ypath;
00497                                                         break;
00498                                                 }
00499                                                 else {
00500                                                         std::cout
00501                                                                 << "- No such path: " 
00502                                                                 << ypath
00503                                                                 << std::endl;
00504                                                 }
00505                                         }
00506                                 }
00507                                 if (youngest.size() > 0) {
00508                                         relative_path = mk_relative_path(youngest,path);
00509                                         command_line += " --hard-links --link-dest=";
00510                                         command_line += relative_path;
00511                                 }
00512                         }
00513 
00514                         for (
00515                                 ei = m_job->excludes.begin();
00516                                 ei != m_job->excludes.end();
00517                                 ++ei
00518                                 )
00519                         {
00520                                 command_line += " --exclude-from=";
00521                                 command_line += *ei;
00522                         }
00523 
00524                         for (
00525                                 ii = m_job->includes.begin();
00526                                 ii != m_job->includes.end();
00527                                 ++ii
00528                                 )
00529                         {
00530                                 command_line += " --include-from=";
00531                                 command_line += *ii;
00532                         }
00533 
00534                         command_line += " ";
00535                         command_line += m_job->generate_source_path(*pi);
00536                         
00537                         command_line += " ";
00538                         command_line += path;
00539         
00540                         std::cout << "Command Line: " << command_line << std::endl;
00541 
00542                         m_error_msg.erase();
00543 
00544                         t.start();
00545                         exec.exec(command_line);
00546                         mf_process_rsync_io(
00547                                 exec, 
00548                                 m_job->rsync_timeout,
00549                                 files_total,
00550                                 files_xferd,
00551                                 size_total,
00552                                 size_xferd,
00553                                 overflow_detected
00554                                 );
00555                         t.stop();
00556 
00557                         signal_num = 0;
00558                         if (exec.child_signaled()) {
00559                                 std::cout 
00560                                         << "Rsync caught signal: [" 
00561                                         << exec.child_signal_no()
00562                                         << "] "
00563                                         << rsync_estat_str.signal(exec.child_signal_no())
00564                                         << std::endl;
00565                                 signal_num = exec.child_signal_no();
00566                         }
00567                         std::cout
00568                                 << "Rsync exit code: ["
00569                                 << exec.child_exit_code()
00570                                 << "] "
00571                                 << rsync_estat_str.exit(exec.child_exit_code())
00572                                 << std::endl;
00573 
00574                         exit_code = exec.child_exit_code();
00575                         if (exec.child_exited_normally() && (exit_code == 0))
00576                                 done = true;
00577                         else if (overflow_detected) {
00578                                 std::cout 
00579                                         << "Vault overflow detected"
00580                                         << std::endl;
00581                                 break;
00582                         }
00583                         else {
00584                                 ++num_retries;
00585                                 logger.set_error_logging(true);
00586                         }
00587 
00588                         if (m_job->rsync_behavior[exit_code] == rsync_behavior::fail)
00589                         {
00590                                 std::cout << "Failing, will not attempt to retry" << std::endl;
00591                                 break;
00592                         }
00593                         if (m_job->rsync_behavior[exit_code] 
00594                                 == rsync_behavior::retry_without_hardlinks)
00595                         {
00596                                 std::cout << "Retrying without hardlinks..." << std::endl;
00597                                 hardlink = false;
00598                         }
00599                 }
00600                 if (!done) {
00601                         if (num_retries >= m_job->rsync_retry_count) {
00602                                 std::cout << "Retry count exceeded" << std::endl;
00603                         }
00604                         if (m_job->rsync_behavior[exit_code] == rsync_behavior::ok)
00605                                 std::cout << "Ignoring rsync failure" << std::endl;
00606                         else {
00607                                 std::cout << "Giving up on this path" << std::endl;
00608                                 m_success = false;
00609                         }
00610                 }
00611                 reportio().write_report(
00612                         m_job->generate_source_path(*pi),
00613                         t,
00614                         exit_code,
00615                         signal_num,
00616                         m_error_msg
00617                         );
00618         }
00619 }
00620 
00621 void job_archiver::mf_process_report(const std::string& a_str)
00622 {
00623         if (reportio().is_report(a_str)) {
00624                 m_jpr = reportio().parse(a_str);
00625                 m_jr.add_report(m_jpr);
00626         }
00627 }
00628 
00629 /** Process I/O from the child
00630 
00631         While there is I/O to be read, read and parse it.  When the end of a line is
00632         reached write that line to the log file.  If a_finalize is true, the flush
00633         the child I/O buffer string.
00634         
00635  */
00636 void job_archiver::mf_process_child_io(bool a_finalize)
00637 {
00638         estring lstr;
00639         bool io_flag = false;
00640 
00641         while (!m_exec.out_eof() && (a_finalize || m_exec.out_ready())) {
00642                 int r;
00643                 const int buffer_size = 128;
00644                 char buffer[buffer_size] = { 0 };
00645 
00646                 r = m_exec.out_read(buffer, buffer_size);
00647                 if (r > 0) {
00648                         int c;
00649 
00650                         io_flag = true;
00651                         for (c = 0; c < r; ++c) {
00652                                 if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
00653                                         lstr = prefix();
00654                                         lstr += m_io_out;
00655                                         lstr += "\n";
00656                                         logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00657                                         mf_process_report(lstr);
00658                                         m_io_out.erase();
00659                                 }
00660                                 else {
00661                                         m_io_out += buffer[c];
00662                                 }
00663                         }
00664                 }
00665         }
00666         if (a_finalize && (m_io_out.size() > 0)) {
00667                 lstr = prefix();
00668                 lstr += m_io_out;
00669                 lstr += "\n";
00670                 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00671                 mf_process_report(lstr);
00672                 m_io_out.erase();
00673         }
00674 
00675         while (!m_exec.err_eof() && (a_finalize || m_exec.err_ready())) {
00676                 int r;
00677                 const int buffer_size = 128;
00678                 char buffer[buffer_size] = { 0 };
00679 
00680                 r = m_exec.err_read(buffer, buffer_size);
00681                 if (r > 0) {
00682                         int c;
00683 
00684                         io_flag = true;
00685                         for (c = 0; c < r; ++c) {
00686                                 if ((buffer[c] == '\r') || (buffer[c] == '\n')) {
00687                                         lstr = prefix();
00688                                         lstr += m_io_err;
00689                                         lstr += "\n";
00690                                         logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00691                                         mf_process_report(lstr);
00692                                         m_io_err.erase();
00693                                 }
00694                                 else {
00695                                         m_io_err += buffer[c];
00696                                 }
00697                         }
00698                 }
00699         }
00700         if (a_finalize && (m_io_err.size() > 0)) {
00701                 lstr = prefix();
00702                 lstr += m_io_err;
00703                 lstr += "\n";
00704                 logger.write(lstr,0,configuration_manager::logging_rsync,m_child_pid);
00705                 mf_process_report(lstr);
00706                 m_io_err.erase();
00707         }
00708         if (!io_flag)
00709                 sleep(config.io_poll_interval());
00710 }
00711 
00712 /** Trim off all non-digit leading and trailing characters from a string */
00713 void job_archiver::mf_trim_string(std::string& a_str)
00714 {
00715         while ((a_str.size() > 0) && (!isdigit(a_str[0])))
00716                 a_str.erase(0,1);
00717         while ((a_str.size() > 0) && (!isdigit(a_str[a_str.size()-1])))
00718                 a_str.erase(a_str.size()-1,1);
00719 }
00720 
00721 /** Parse I/O from rsync
00722 
00723         Search for special output from rsync to tell us something about the number
00724         and size of files and files transfered.
00725 
00726  */
00727 void job_archiver::mf_parse_rsync_io(
00728         const std::string a_str,
00729         uint64& a_files_total,
00730         uint64& a_files_xferd,
00731         uint64& a_size_total,
00732         uint64& a_size_xferd
00733         )
00734 {
00735         estring estr;
00736 
00737         if (a_str.find("Number of files: ") == 0) {
00738                 estr = a_str;
00739                 mf_trim_string(estr);
00740                 try {
00741                         a_files_total = estr;
00742                 }
00743                 catch(error e) {
00744                         estring es;
00745 
00746                         es = "Could not parse total number of files processed by rsync";
00747                         e.push_back(ERROR_INSTANCE(es));
00748 
00749                         // Not sure this is the best way to handle this...
00750                         std::cerr << e;
00751                 }
00752                 catch(...) {
00753                         error e = err_unknown;
00754                         estring es;
00755 
00756                         es = "Could not parse total number of files processed by rsync";
00757                         e.push_back(ERROR_INSTANCE(es));
00758 
00759                         // Not sure this is the best way to handle this...
00760                         std::cerr << e;
00761                 }
00762         }
00763         else if (a_str.find("Number of files transferred: ") == 0) {
00764                 estr = a_str;
00765                 mf_trim_string(estr);
00766                 try {
00767                         a_files_xferd = estr;
00768                 }
00769                 catch(error e) {
00770                         estring es;
00771 
00772                         es = "Could not parse total number of files transferred by rsync";
00773                         e.push_back(ERROR_INSTANCE(es));
00774 
00775                         // Not sure this is the best way to handle this...
00776                         std::cerr << e;
00777                 }
00778                 catch(...) {
00779                         error e = err_unknown;
00780                         estring es;
00781 
00782                         es = "Could not parse total number of files transferred by rsync";
00783                         e.push_back(ERROR_INSTANCE(es));
00784 
00785                         // Not sure this is the best way to handle this...
00786                         std::cerr << e;
00787                 }
00788         }
00789         else if (a_str.find("Total file size: ") == 0) {
00790                 estr = a_str;
00791                 mf_trim_string(estr);
00792                 try {
00793                         a_size_total = estr;
00794                 }
00795                 catch(error e) {
00796                         estring es;
00797 
00798                         es = "Could not parse total size of files processed by rsync";
00799                         e.push_back(ERROR_INSTANCE(es));
00800 
00801                         // Not sure this is the best way to handle this...
00802                         std::cerr << e;
00803                 }
00804                 catch(...) {
00805                         error e = err_unknown;
00806                         estring es;
00807 
00808                         es = "Could not parse total size of files processed by rsync";
00809                         e.push_back(ERROR_INSTANCE(es));
00810 
00811                         // Not sure this is the best way to handle this...
00812                         std::cerr << e;
00813                 }
00814         }
00815         else if (a_str.find("Total transferred file size: ") == 0) {
00816                 estr = a_str;
00817                 mf_trim_string(estr);
00818                 try {
00819                         a_size_xferd = estr;
00820                 }
00821                 catch(error e) {
00822                         estring es;
00823 
00824                         es = "Could not parse total size of files transferred by rsync";
00825                         e.push_back(ERROR_INSTANCE(es));
00826 
00827                         // Not sure this is the best way to handle this...
00828                         std::cerr << e;
00829                 }
00830                 catch(...) {
00831                         error e = err_unknown;
00832                         estring es;
00833 
00834                         es = "Could not parse total size of files transferred by rsync";
00835                         e.push_back(ERROR_INSTANCE(es));
00836 
00837                         // Not sure this is the best way to handle this...
00838                         std::cerr << e;
00839                 }
00840         }
00841 }
00842 
00843 /** Process I/O from rsync
00844 
00845         If there is I/O from rsync to be read, read it and then send it through the
00846         parser.
00847 
00848  */
00849 void job_archiver::mf_process_rsync_io(
00850         execute& a_exec, 
00851         uint16 a_timeout,
00852         uint64& a_files_total,
00853         uint64& a_files_xferd,
00854         uint64& a_size_total,
00855         uint64& a_size_xferd,
00856         bool& a_overflow_detected
00857         )
00858 {
00859         size_t ro;
00860         size_t re;
00861         estring out;
00862         estring err;
00863         timer t;
00864         bool io_flag;
00865         char buffer[1024] = { 0 };
00866         char *ptr;
00867 
00868         ro = 1;
00869         re = 1;
00870         t.start();
00871         while ((ro != 0) || (re != 0) || a_exec.child_running()) {
00872                 io_flag = false;
00873                 ro = 0;
00874                 re = 0;
00875 
00876                 if (!a_overflow_detected) {
00877                         a_overflow_detected = vaulter.overflow();
00878                 }
00879 
00880                 m_error_msg.erase();
00881 
00882                 if (a_exec.out_ready()) {
00883                         ro = read(a_exec.out_fd(), buffer, 1024);
00884                         if (ro > 0) {
00885                                 io_flag = true;
00886                                 t.start();
00887                                 for (ptr = buffer; ptr != buffer+ro; ++ptr) {
00888                                         if ((*ptr != '\r') && (*ptr != '\n')) {
00889                                                 out += *ptr;
00890                                         }
00891                                         else {
00892                                                 reportio().write_rsync_out(out);
00893                                                 out.erase();
00894                                         }
00895                                 }
00896                         }
00897                 }
00898 
00899                 if (a_exec.err_ready()) {
00900                         re = read(a_exec.err_fd(), buffer, 1024);
00901                         if (re > 0) {
00902                                 io_flag = true;
00903                                 t.start();
00904                                 for (ptr = buffer; ptr != buffer+re; ++ptr) {
00905                                         if ((*ptr != '\r') && (*ptr != '\n')) {
00906                                                 err += *ptr;
00907                                         }
00908                                         else {
00909                                                 reportio().write_rsync_err(err);
00910                                                 err.erase();
00911                                         }
00912                                 }
00913                         }
00914                 }
00915 
00916                 t.stop();
00917                 if (t.duration_secs() > a_timeout) {
00918                         std::cerr << "*** Rsync program inactivity timeout" << std::endl;
00919                         a_exec.kill_child();
00920                         TRY_nomem(m_error_msg = "Rsync inactivity timeout");
00921                 }
00922 
00923                 if (!io_flag)
00924                         sleep(config.io_poll_interval());
00925 
00926         }
00927         if (out.size() > 0) {
00928                 std::cerr << out << std::endl;
00929                 mf_parse_rsync_io(
00930                         out, 
00931                         a_files_total, 
00932                         a_files_xferd, 
00933                         a_size_total,
00934                         a_size_xferd
00935                 );
00936                 out.erase();
00937         }
00938         if (err.size() > 0) {
00939                 std::cerr << err << std::endl;
00940                 mf_parse_rsync_io(
00941                         out, 
00942                         a_files_total, 
00943                         a_files_xferd, 
00944                         a_size_total,
00945                         a_size_xferd
00946                 );
00947                 err.erase();
00948         }
00949 }
00950 
00951 //-----------------------------------------------------------------------------
00952 
00953 /** C'tor */
00954 archive_manager::archive_manager()
00955 {
00956         if (this != &archiver)
00957                 throw(INTERNAL_ERROR(0,"Attempt to alocate multiple archive managers"));
00958 
00959         clear();
00960 }
00961 
00962 /** Clear the archive manager and clear the job list */
00963 void archive_manager::clear(void)
00964 {
00965         m_jobs.clear();
00966         m_initialized = false;
00967 }
00968 
00969 /** Initialize the archive manager
00970 
00971         Log the archive timestamp, select and prepare a vault.
00972 
00973  */
00974 void archive_manager::init(void)
00975 {
00976         timer t;
00977         estring lstr;
00978 
00979         lstr = "Archive Manager - Beginning initialization\n";
00980         logger.write(lstr);
00981         t.start();
00982 
00983         lstr = "Timestamp: ";
00984         lstr += config.timestamp().str();
00985         lstr += "\n";
00986         logger.write(lstr);
00987 
00988         // Select a vault?
00989         vaulter.select();
00990         lstr = "Vault selected: ";
00991         lstr += vaulter.vault();
00992         lstr += "\n";
00993         logger.write(lstr);
00994 
00995         reporter.vault().add_report(
00996                 vault_stats_report(estring("Initial Stats:"),filesystem(vaulter.vault()))
00997                 );
00998 
00999         // Prepare the vault?
01000         vaulter.prepare();
01001 
01002         t.stop();
01003         lstr = "Archive Manager - Finished initialization";
01004         lstr += ", duration: ";
01005         lstr += t.duration();
01006         lstr += "\n";
01007         logger.write(lstr);
01008 
01009         m_initialized = true;
01010 }
01011 
01012 /** Return the initialized status of the archive manager */
01013 const bool archive_manager::initialized(void) const
01014 {
01015         return(m_initialized);
01016 }
01017 
01018 /** Give a status report
01019 
01020         After so many minutes of inactivity write a report to the log file of our
01021         current status of affairs.
01022 
01023  */
01024 void archive_manager::mf_log_status(void)
01025 {
01026         static timer t;
01027         const timer::value_type timeout = 30;
01028         estring lstr;
01029         std::vector<job_archiver*>::const_iterator ji;
01030         uint64 jobs_pending = 0;
01031         uint64 jobs_processing = 0;
01032         uint64 jobs_completed =0;
01033         uint64 jobs_remaining = 0;
01034 
01035         if (!t.is_started())
01036                 t.start();
01037         
01038         t.stop();
01039         if (t.duration_mins() < timeout)
01040                 return;
01041         
01042         lstr  = "\n";
01043         lstr += "STATUS REPORT:\n";
01044         lstr += "================================================================\n";
01045         logger.write(lstr);
01046         for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji) {
01047                 lstr = "[";
01048                 switch ((*ji)->status()) {
01049                         case job_archiver::status_pending:
01050                                 lstr += "Pending    ";
01051                                 ++jobs_pending;
01052                                 break;
01053                         case job_archiver::status_processing:
01054                                 lstr += "Processing ";
01055                                 ++jobs_processing;
01056                                 break;
01057                         case job_archiver::status_retry_later:
01058                                 lstr += "Reschedule ";
01059                                 ++jobs_pending;
01060                                 break;
01061                         case job_archiver::status_fatal_error:
01062                                 lstr += "Fatal Error";
01063                                 ++jobs_completed;
01064                                 break;
01065                         case job_archiver::status_error:
01066                                 lstr += "Error      ";
01067                                 ++jobs_completed;
01068                                 break;
01069                         case job_archiver::status_completed:
01070                                 lstr += "Completed  ";
01071                                 ++jobs_completed;
01072                                 break;
01073                         case job_archiver::status_done:
01074                                 lstr += "Done       ";
01075                                 ++jobs_completed;
01076                                 break;
01077                         default:
01078                                 lstr += "<unknown>  ";
01079                                 break;
01080                 }
01081                 lstr += "]: ";
01082                 lstr += (*ji)->id();
01083                 lstr += "\n";
01084                 logger.write(lstr);
01085         }
01086 
01087         lstr  = "---------------------------------------------------------------\n";
01088         lstr += "     Jobs Pending: ";
01089         lstr += estring(jobs_pending);
01090         lstr += "\n";
01091 
01092         lstr += "  Jobs Processing: ";
01093         lstr += estring(jobs_processing);
01094         lstr += "\n";
01095 
01096         lstr += "   Jobs Completed: ";
01097         lstr += estring(jobs_completed);
01098         lstr += "\n";
01099 
01100         lstr += "            Total: ";
01101         lstr += estring(jobs_pending+jobs_processing+jobs_completed+jobs_remaining);
01102         lstr += "\n";
01103 
01104         lstr += "Overflow Detected: ";
01105         if (vaulter.overflow()) {
01106                 lstr += "TRUE";
01107         }
01108         else {
01109                 lstr += "false";
01110         }
01111         lstr += "\n";
01112 
01113         logger.write(lstr);
01114         logger.write("\n");
01115         t.start();
01116 }
01117 
01118 /** Archive jobs
01119 
01120         Create an archive directory.  Generate a to-do list of job archiver objects.
01121         Process the job archiver objects:
01122         - While there are less than rsync-parallel job archivers processing, start
01123                 the first available job archiver.
01124         - Check the status of each job and process I/O from jobs underway.
01125         - Remove jobs that failed to start.
01126         - Possibly reschedule failed jobs.
01127         - Remove completed jobs from active processing.
01128         - Call mf_log_status().
01129 
01130  */
01131 void archive_manager::archive(void)
01132 {
01133         timer t;
01134         estring lstr;
01135         configuration_manager::jobs_type::const_iterator cji;
01136         int num_processing = 0;
01137         std::vector<job_archiver*>::iterator ji;
01138         uint64 num_completed = 0;
01139         bool overflow_detected = false;
01140         estring debug_estr;
01141 
01142         if (!initialized())
01143                 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01144 
01145         lstr = "Archive Manager - Begin archiving\n";
01146         logger.write(lstr);
01147         t.start();
01148 
01149         // Create archive directory
01150         try {
01151                 if (exists(archive_path())) {
01152                         lstr = "Found existing archive directory: ";
01153                         lstr += archive_path();
01154                         lstr += "\n";
01155                         logger.write(lstr);
01156                         rename_file(archive_path(), working_archive_path());
01157                 }
01158                 else if (exists(working_archive_path())) {
01159                         lstr = "Found existing archive directory: ";
01160                         lstr += working_archive_path();
01161                         lstr += "\n";
01162                         logger.write(lstr);
01163                 }
01164                 else {
01165                         lstr = "Creating archive directory: ";
01166                         lstr += working_archive_path();
01167                         lstr += "\n";
01168                         logger.write(lstr);
01169                         mk_dir(working_archive_path());
01170                 }
01171         }
01172         catch(error e) {
01173                 logger.write("An error has occured: ");
01174                 logger.write(e[0].what());
01175                 logger.write("\n");
01176                 throw(e);
01177         }
01178         catch(...) {
01179                 error e = err_unknown;
01180 
01181                 logger.write("An error has occured: ");
01182                 logger.write(e[0].what());
01183                 logger.write("\n");
01184                 throw(e);
01185         }
01186 
01187         // Create a todo list
01188         logger.write("Creating to-do list\n");
01189         for (
01190                 cji = config.jobs().begin();
01191                 cji != config.jobs().end();
01192                 ++cji
01193                 )
01194         {
01195                 job_archiver* ptr;
01196 
01197                 ptr = new job_archiver(&(*cji));
01198                 if (ptr == 0)
01199                         throw(err_nomem);
01200                 TRY_nomem(m_jobs.push_back(ptr));
01201         }
01202 
01203         // Backup clients
01204         logger.write("Processing jobs...\n");
01205         while (m_jobs.size() > num_completed) {
01206 
01207                 /*
01208                 logger.write("[DEBUG]: ---[ TOP OF LOOP ]---\n");
01209 
01210                 debug_estr = "[DEBUG]: overflow_detected = ";
01211                 debug_estr += estring(overflow_detected);
01212                 debug_estr += "\n";
01213                 logger.write(debug_estr);
01214 
01215                 debug_estr = "[DEBUG]: num_processing = ";
01216                 debug_estr += estring(num_processing);
01217                 debug_estr += "\n";
01218                 logger.write(debug_estr);
01219                 */
01220 
01221                 if (!overflow_detected) {
01222                         overflow_detected = vaulter.overflow(true);
01223                         /*
01224                         if (overflow_detected) {
01225                                 logger.write("[DEBUG]: Variable Change :: ");
01226                                 logger.write("overflow_detected = true");
01227                                 logger.write("\n");
01228                         }
01229                         */
01230                 }
01231 
01232                 // If the vault has exceeded it's highwater mark, wait for the
01233                 // currently-processing jobs to terminate, and then attempt to prepare the
01234                 // vault.
01235                 if (overflow_detected && (num_processing == 0)) {
01236                         TRY(vaulter.prepare(true),"Cannot complete archive");
01237                         overflow_detected = vaulter.overflow();
01238                         /*
01239                         if (!overflow_detected) {
01240                                 logger.write("[DEBUG]: Variable Change :: ");
01241                                 logger.write("overflow_detected = false");
01242                                 logger.write("\n");
01243                         }
01244                         */
01245                 }
01246 
01247                 // For each job in the list...
01248                 for (ji = m_jobs.begin(); ji != m_jobs.end(); ++ji)
01249                 {
01250                         // While we're running less than rsync-parallel jobs, start new ones
01251                         if (
01252                                 !overflow_detected
01253                                 && (num_processing < config.rsync_parallel())
01254                                 && ((*ji)->status() == job_archiver::status_pending)
01255                                 )
01256                         {
01257                                 try {
01258                                         (*ji)->start();
01259                                 }
01260                                 catch(error e) {
01261                                         if (e.num() == 12) {
01262                                                 lstr = "Error starting job: Out of memory, will retry later\n";
01263                                                 (*ji)->clear();
01264                                         }
01265                                         else {
01266                                                 (*ji)->end();
01267                                                 lstr = "Error starting job, aborting\n";
01268                                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01269                                                 num_processing--;
01270                                                 reporter.jobs().add_report((*ji)->report());
01271                                         }
01272                                         logger.write(lstr);
01273                                 }
01274                                 catch(...) {
01275                                         (*ji)->end();
01276                                         lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE STARTING JOB ";
01277                                         lstr += "-- JOB TERMINATED\n";
01278                                         logger.write(lstr);
01279                                         // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01280                                         num_processing--;
01281                                         reporter.jobs().add_report((*ji)->report());
01282                                 }
01283                                 // logger.write("[DEBUG]: Variable Change :: num_processing++\n");
01284                                 num_processing++;
01285                         }
01286         
01287                         // Process jobs
01288                         if ((*ji)->status() == job_archiver::status_processing) {
01289                                 try {
01290                                         (*ji)->process();
01291                                 }
01292                                 catch(error e) {
01293                                         // TODO: Change 12 to ENOMEM?
01294                                         if (e.num() == 12) {
01295                                                 lstr  = "Error starting job: Out of memory, will retry later\n";
01296                                                 (*ji)->clear();
01297                                         }
01298                                         else {
01299                                                 (*ji)->end();
01300                                                 lstr = "Error starting job, aborting\n";
01301                                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01302                                                 num_processing--;
01303                                                 reporter.jobs().add_report((*ji)->report());
01304                                         }
01305                                         logger.write(lstr);
01306                                 }
01307                                 catch(...) {
01308                                         (*ji)->end();
01309                                         lstr = "*** AN UNKNOWN ERROR HAS OCCURED WHILE PROCESSING JOB ";
01310                                         lstr += "-- JOB TERMINATED\n";
01311                                         logger.write(lstr);
01312                                         // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01313                                         num_processing--;
01314                                         reporter.jobs().add_report((*ji)->report());
01315                                 }
01316                         }
01317 
01318                         // Remove jobs that could not start from active duty
01319                         if ((*ji)->status() == job_archiver::status_retry_later) {
01320                                 (*ji)->clear();
01321                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01322                                 num_processing--;
01323                         }
01324 
01325                         // If a job exited with an error, and the vault is full, then reschedule
01326                         // the job for later
01327                         if (
01328                                         ((*ji)->status() == job_archiver::status_error)
01329                                 &&
01330                                         overflow_detected
01331                                 )
01332                         {
01333                                 lstr = "Vault overflow detected, will retry job later\n";
01334                                 logger.write(lstr);
01335                                 (*ji)->clear();
01336                                 num_processing--;
01337                         }
01338 
01339                         // Remove completed jobs from the processing list
01340                         if (
01341                                 ((*ji)->status() == job_archiver::status_completed)
01342                                 || ((*ji)->status() == job_archiver::status_fatal_error)
01343                                 || ((*ji)->status() == job_archiver::status_error)
01344                                 ) {
01345                                 (*ji)->end();
01346                                 // logger.write("[DEBUG]: Variable Change :: num_processing--\n");
01347                                 num_processing--;
01348                                 num_completed++;
01349 
01350                                 // logger.write("Adding job report to report manager\n");
01351                                 reporter.jobs().add_report((*ji)->report());
01352                         }
01353                 }
01354 
01355                 mf_log_status();
01356                 sleep(1);
01357 
01358                 // logger.write("[DEBUG]: ---[ BOTTOM OF LOOP ]---\n");
01359         }
01360 
01361         t.stop();
01362         lstr = "Archive Manager - Finished archiving, duration: ";
01363         lstr += t.duration();
01364         lstr += "\n";
01365         logger.write(lstr);
01366 
01367         lstr = "Archive Manager - Finalizing archive path\n";
01368         logger.write(lstr);
01369         TRY(
01370                 rename_file(working_archive_path(), archive_path()),
01371                 "Cannot finalize archive"
01372                 );
01373 
01374         reporter.vault().add_report(
01375                 vault_stats_report(estring("Final Stats:"),filesystem(vaulter.vault()))
01376                 );
01377 }
01378 
01379 /** Return an absolute path to the finished archive directory */
01380 const std::string archive_manager::archive_path(void) const
01381 {
01382         estring path;
01383 
01384         if (!initialized())
01385                 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01386 
01387         path = vaulter.vault();
01388         path += "/";
01389         path += config.timestamp().str();
01390 
01391         return(path);
01392 }
01393 
01394 /** Return the absolute path to the unfinished working archive directory */
01395 const std::string archive_manager::working_archive_path(void) const
01396 {
01397         estring path;
01398 
01399         if (!initialized())
01400                 throw(INTERNAL_ERROR(0,"Archive manager is not initialized"));
01401         
01402         path = archive_path();
01403         path += ".incomplete";
01404 
01405         return(path);
01406 }
01407 
01408 //-----------------------------------------------------------------------------
01409 
01410 /** The global archive manager */
01411 archive_manager archiver;
01412 

Generated on Fri Jun 23 16:46:30 2006 for rvm by  doxygen 1.4.2