192 lstr +=
"Terminating child process!\n";
226 job::paths_type::const_iterator pi;
235 lstr +=
"Creating job archive path: ";
248 lstr +=
"Could not fork:\n";
255 lstr +=
"Will retry job later\n";
264 lstr +=
"Could not fork:\n";
271 lstr +=
"Will retry job later\n";
303 lstr +=
"Spawning child process: PID ";
343 lstr +=
"Child exited from signal: ";
348 lstr +=
"Child exited abnormally with code: ";
353 lstr +=
"Child exited successfully\n";
379 lstr +=
"Finished, duration: ";
418 job::paths_type::const_iterator pi;
424 std::vector<std::string> argv;
425 std::vector<std::string> extra_argv;
427 bool hardlink =
false;
428 bool multi_hardlink =
false;
434 uint64 files_total = 0;
435 uint64 files_xferd = 0;
436 uint64 size_total = 0;
437 uint64 size_xferd = 0;
438 bool overflow_detected = 0;
455 throw(
ERROR(EEXIST,es));
461 TRY_nomem(es =
"Cannot write to archive directory: \"");
465 throw(
ERROR(EACCES,es));
471 while ((num_retries <= m_job->rsync_retry_count) && !done) {
473 job::excludes_type::const_iterator ei;
474 job::includes_type::const_iterator ii;
475 std::vector<std::string>::const_iterator oi;
489 argv.push_back(
"-o");
490 argv.push_back(
"BatchMode=yes");
492 argv.insert(argv.end(), extra_argv.begin(), extra_argv.end());
493 argv.push_back(
"-l");
503 argv.insert(argv.end(), extra_argv.begin(), extra_argv.end());
507 opt =
"--rsync-path=";
517 std::string youngest;
519 bool hardlink_opt =
false;
520 bool first_hardlink =
false;
521 int linkdest_count = 0;
524 if (subdir.size() > 0) {
525 subdirectory::const_iterator si;
527 sort(subdir.begin(), subdir.end());
528 reverse(subdir.begin(), subdir.end());
529 for (si = subdir.begin(); si != subdir.end(); ++si) {
532 if (first_hardlink && !multi_hardlink) {
543 <<
"Considering potential hardlink source: "
550 ypath += archive_dir;
553 <<
"Using archive as hardlink source: "
557 argv.push_back(std::string(
"--hard-links"));
567 first_hardlink =
true;
572 <<
"- No such path: "
586 opt =
"--exclude-from=";
597 opt =
"--include-from=";
604 argv.push_back(path);
606 std::cout <<
"Command Line:" << std::endl;
609 std::cout <<
" Binary: " << binary << std::endl;
610 for (c = 0; c < argv.size(); c++) {
611 std::cout <<
" Argv[" << c <<
"] = " << argv[c] << std::endl;
621 exec.
exec(binary, argv);
637 <<
"Rsync caught signal: ["
645 <<
"Rsync exit code: ["
656 std::cout <<
"Ignoring rsync failure" << std::endl;
659 else if (overflow_detected) {
661 <<
"Vault overflow detected"
675 std::cout <<
"Failing, will not attempt to retry" << std::endl;
681 std::cout <<
"Retrying without hardlinks..." << std::endl;
687 if (num_retries <= m_job->rsync_retry_count) {
689 <<
" minutes before retrying... " << std::endl;
696 std::cout <<
"Retry count exceeded" << std::endl;
700 std::cout <<
"Giving up on this path" << std::endl;
733 bool io_flag =
false;
737 const int buffer_size = 128;
738 char buffer[buffer_size] = { 0 };
745 for (c = 0; c < r; ++c) {
746 if ((buffer[c] ==
'\r') || (buffer[c] ==
'\n')) {
760 if (a_finalize && (
m_io_out.size() > 0)) {
771 const int buffer_size = 128;
772 char buffer[buffer_size] = { 0 };
779 for (c = 0; c < r; ++c) {
780 if ((buffer[c] ==
'\r') || (buffer[c] ==
'\n')) {
794 if (a_finalize && (
m_io_err.size() > 0)) {
809 while ((a_str.size() > 0) && (!isdigit(a_str[0])))
811 while ((a_str.size() > 0) && (!isdigit(a_str[a_str.size()-1])))
812 a_str.erase(a_str.size()-1,1);
822 const std::string a_str,
823 uint64& a_files_total,
824 uint64& a_files_xferd,
825 uint64& a_size_total,
831 if (a_str.find(
"Number of files: ") == 0) {
835 a_files_total = estr;
840 es =
"Could not parse total number of files processed by rsync";
850 es =
"Could not parse total number of files processed by rsync";
857 else if (a_str.find(
"Number of files transferred: ") == 0) {
861 a_files_xferd = estr;
866 es =
"Could not parse total number of files transferred by rsync";
876 es =
"Could not parse total number of files transferred by rsync";
883 else if (a_str.find(
"Total file size: ") == 0) {
892 es =
"Could not parse total size of files processed by rsync";
902 es =
"Could not parse total size of files processed by rsync";
909 else if (a_str.find(
"Total transferred file size: ") == 0) {
918 es =
"Could not parse total size of files transferred by rsync";
928 es =
"Could not parse total size of files transferred by rsync";
946 uint64& a_files_total,
947 uint64& a_files_xferd,
948 uint64& a_size_total,
949 uint64& a_size_xferd,
950 bool& a_overflow_detected
958 char buffer[1024] = { 0 };
968 if (!a_overflow_detected) {
975 ro = read(a_exec.
out_fd(), buffer, 1024);
978 for (ptr = buffer; ptr != buffer+ro; ++ptr) {
979 if ((*ptr !=
'\r') && (*ptr !=
'\n')) {
991 re = read(a_exec.
err_fd(), buffer, 1024);
994 for (ptr = buffer; ptr != buffer+re; ++ptr) {
995 if ((*ptr !=
'\r') && (*ptr !=
'\n')) {
1022 std::cerr <<
"*** Rsync program inactivity timeout" << std::endl;
1031 if (out.size() > 0) {
1032 std::cerr << out << std::endl;
1042 if (err.size() > 0) {
1043 std::cerr << err << std::endl;
1061 throw(
INTERNAL_ERROR(0,
"Attempt to alocate multiple archive managers"));
1083 lstr =
"Archive Manager - Beginning initialization\n";
1087 lstr =
"Timestamp: ";
1094 lstr =
"Vault selected: ";
1107 lstr =
"Archive Manager - Finished initialization";
1108 lstr +=
", duration: ";
1133 std::vector<job_archiver*>::const_iterator ji;
1134 uint64 jobs_pending = 0;
1135 uint64 jobs_processing = 0;
1136 uint64 jobs_completed =0;
1137 uint64 jobs_remaining = 0;
1147 lstr +=
"STATUS REPORT:\n";
1148 lstr +=
"================================================================\n";
1152 switch ((*ji)->status()) {
1158 lstr +=
"Processing ";
1162 lstr +=
"Reschedule ";
1166 lstr +=
"Fatal Error";
1174 lstr +=
"Completed ";
1182 lstr +=
"<unknown> ";
1186 lstr += (*ji)->id();
1191 lstr =
"---------------------------------------------------------------\n";
1192 lstr +=
" Jobs Pending: ";
1193 lstr +=
estring(jobs_pending);
1196 lstr +=
" Jobs Processing: ";
1197 lstr +=
estring(jobs_processing);
1200 lstr +=
" Jobs Completed: ";
1201 lstr +=
estring(jobs_completed);
1205 lstr +=
estring(jobs_pending+jobs_processing+jobs_completed+jobs_remaining);
1208 lstr +=
"Overflow Detected: ";
1239 configuration_manager::jobs_type::const_iterator cji;
1240 int num_processing = 0;
1241 std::vector<job_archiver*>::iterator ji;
1242 uint64 num_completed = 0;
1243 bool overflow_detected =
false;
1249 lstr =
"Archive Manager - Begin archiving\n";
1256 lstr =
"Found existing archive directory: ";
1264 TRY_nomem(es =
"Cannot write to archive directory: \"");
1268 throw(
ERROR(EACCES,es));
1274 lstr =
"Found existing archive directory: ";
1282 TRY_nomem(es =
"Cannot write to working archive directory: \"");
1286 throw(
ERROR(EACCES,es));
1290 lstr =
"Creating archive directory: ";
1330 while (
m_jobs.size() > num_completed) {
1346 if (!overflow_detected) {
1360 if (overflow_detected && (num_processing == 0)) {
1386 if (e.
num() == 12) {
1387 lstr =
"Error starting job: Out of memory, will retry later\n";
1392 lstr =
"Error starting job, aborting\n";
1401 lstr =
"*** AN UNKNOWN ERROR HAS OCCURED WHILE STARTING JOB ";
1402 lstr +=
"-- JOB TERMINATED\n";
1419 if (e.
num() == 12) {
1420 lstr =
"Error starting job: Out of memory, will retry later\n";
1425 lstr =
"Error starting job, aborting\n";
1434 lstr =
"*** AN UNKNOWN ERROR HAS OCCURED WHILE PROCESSING JOB ";
1435 lstr +=
"-- JOB TERMINATED\n";
1458 lstr =
"Vault overflow detected, will retry job later\n";
1487 lstr =
"Archive Manager - Finished archiving, duration: ";
1492 lstr =
"Archive Manager - Finalizing archive path\n";
1496 "Cannot finalize archive"
1528 path +=
".incomplete";
uint16 rsync_multi_hardlink_max
void reroute_stdio(void)
Called by the child to reroute the child's stdin, stdout, and stderr to the parent.
std::string reform_path(const std::string &a_path)
Reformat a path to remove double slashes.
rsync_connection_type rsync_connection
std::string m_unknown_exit
Retrieve information about a filesystem.
void mk_dir(const std::string &a_path)
Create a directory.
log_manager logger
The global log manager.
job_path_report parse(const std::string &a_str)
Parse a received report from a child process and return a job_path_report.
Archive the paths associated with a single job.
const duration_type duration_secs(void) const
Return the duration in seconds.
int child_signal_no(void)
If the child was signaled, return the signal number.
void clear(void)
Clear all values.
const std::string & rsync_local_path(void) const
Return the rsync-local-path.
void clear(void)
Erase the string value.
const std::string generate_job_id(void) const
Generate a unique ID string for this job.
pid_t child_pid(void)
Returns the child's PID.
An extended string class.
void add_report(const single_job_report &a_class)
Add a job report to the list.
Submit or parse a job path report.
bool err_eof(void)
Check for err EOF.
void set_error_logging(bool a_b)
Use error-logging-level instead of logging-level.
void clear(void)
Clear the job archiver and return it to its initial state.
bool out_eof(void)
Check for output EOF.
void fork(void)
Fork a child process.
single_job_report report(void) const
Return the job report for this job.
Hold configuration data for a single job.
const std::string prefix(void)
Generate a job prefix string.
void clear(void)
Clear the archive manager and clear the job list.
void mf_log_status(void)
Give a status report.
void kill_child(void)
Send a KILL signal to the child.
const bool is_started(void) const
Return whether or not the timer has been started.
void write_rsync_err(const std::string &a_str)
Write a report line for output from rsync to parent on child's std::cerr.
int err_read(char *buf, const int len)
Allow parent to read from err_fd()
void push_back(const error_instance &a_e)
void mk_dirhier(const std::string a_path)
Recursively create a directory hierarchy.
void exit(int code=0)
Called by the child to exit with a particular code.
const std::string generate_source_path(const std::string &a_path) const
Generate the source path to be passed to rsync on the command line.
class rsync_behavior rsync_behavior
const uint16 & io_poll_interval(void) const
Return the number of seconds to sleep between polling for I/O.
void write(const std::string &a_str, const uint16 a_indention=0, const configuration_manager::logging_type a_logging_level=configuration_manager::logging_manager, const pid_t a_pid=pid())
Write a string to the log file.
bool writable(const std::string &a_path)
Return true if the file or directory exists and is writable.
const class timestamp & timestamp(void) const
Return the timestamp of this instance of rvm.
const uint16 & rsync_parallel(void) const
Return the rsync-parallel.
bool err_ready(void)
Check I/O for output.
bool out_ready(void)
Check I/O for output.
class rstat rsync_estat_str
int out_fd(void)
Return a file descriptor for I/O between parent and child.
void process(void)
Parent processor for a job.
archive_manager archiver
The global archive manager.
void id(const std::string &a_str)
Set a descriptive ID for this job report.
void mf_process_child_io(bool a_finalize)
Process I/O from the child.
bool child_running(void)
Returns true if the child is running.
void clear(void)
Reset the execute class to default values, kill the child process if one is running.
std::map< int, std::string > m_exit_str
void start(void)
Begin processing.
std::string rsync_remote_user
report_manager reporter
The global report manager.
jobs_report & jobs(void)
Return the jobs reporter object.
const std::string & exit(const int a_int) const
Get a verbose string for an exit code.
bool relative_path(const std::string &a_path)
Return true if the string looks like a relative path.
void exec(const std::string command)
Execute a command, rerouting stdin, stdout, and stderr to parent.
bool exists(const std::string &a_path)
Return true if the file or directory exists.
void end(void)
End any processes handling this job.
void mf_trim_string(std::string &a_str)
Trim off all non-digit leading and trailing characters from a string.
int out_read(char *buf, const int len)
Allow parent to read out_fd()
void select(void)
Select a vault.
bool child_signaled(void)
Returns true if the child was signaled.
std::vector< job_archiver * > m_jobs
const std::string generate_archive_path(const std::string &a_path) const
Generate the archive-path subdirectory for this job.
void start(void)
Start (or restart) the timer.
void mf_do_chores(void)
Child processor for a job.
void mf_process_rsync_io(execute &a_exec, uint16 a_timeout, uint64 &a_files_total, uint64 &a_files_xferd, uint64 &a_size_total, uint64 &a_size_xferd, bool &a_overflow_detected)
Process I/O from rsync.
int child_exit_code(void)
Return the child's exit code.
std::map< int, std::string > m_signal_str
archiving_status m_status
const type & path(const std::string a_path, const std::string a_filter="*")
Return a vector of strings of a list of files in a subdirectory.
const std::string vault(void) const
Return the path to the selected vault.
void add_report(const job_path_report &a_class)
Add a path report for this job.
bool child_exited_normally(void)
Returns true if the child has exited normally.
void clear(void)
Clear all values.
void prepare(bool a_assume_overflow=false)
Prepare the selected vault.
const std::string & signal(const int a_int) const
Get a verbose string for a signal number.
#define INTERNAL_ERROR(e, s)
std::string rsync_remote_path
void mf_process_report(const std::string &a_str)
configuration_manager config
The global configuration manager instance.
Map exit codes and signal numbers to verbose strings.
bool is_timestamp(const std::string &a_s)
Return true if the string is a valid timestamp.
Fork a child process or execute an external program.
void init(void)
Initialize the archive manager.
vault_report & vault(void)
Return the vault reporter object.
std::string mk_relative_path(const std::string a_path_to, const std::string a_path_from)
Make the path a_path_to relative from a_path_from, where a_path_to and a_path_from are directory name...
job_archiver(const job *a_job)
C'tor.
void write_report(const std::string a_source, const timer &a_timer, const int a_exit_code, const int a_signal_num, const rsync_behavior::value_type &a_behavior, const std::string &a_error_msg)
Generate and submit a report to the parent process on child's std::cout.
bool m_rsync_timeout_flag
const archiving_status status(void)
Return the processing status of this job archiver.
pid_t my_pid(void)
Returns the PID.
const std::string duration(void) const
Generate a duration string.
int err_fd(void)
Return a file descriptor for I/O between parent and child.
void add_report(const vault_stats_report &a_class)
Add a vault report to the list.
#define ERROR_INSTANCE(s)
Create (or update an existing) archive in the selected vault.
const std::vector< std::string > generate_ssh_options_vector(void) const
Generate ssh command line options.
const jobs_type & jobs(void) const
Return a list of jobs.
void mf_parse_rsync_io(std::string a_str, uint64 &a_files_total, uint64 &a_files_xferd, uint64 &a_size_total, uint64 &a_size_xferd)
Parse I/O from rsync.
const std::string str(void) const
Generate a string.
const std::vector< std::string > generate_rsync_options_vector(void) const
Generate rsync command line options.
const duration_type duration_mins(void) const
Return the duration in minutes.
const bool overflow(bool a_report=false)
Test to see if a vault has exceeded its overflow threshold.
void archive(void)
Archive jobs.
void rename_file(const std::string a_from, const std::string a_to)
Rename a file or directory.
const std::string working_archive_path(void) const
Return the absolute path to the unfinished working archive directory.
void stop(void)
Stop the timer.
Retrieve a list of files in a subdirectory that match a given wildcard filename.
const std::string str(const std::string a_prefix="") const
const bool initialized(void) const
Return the initialized status of the archive manager.
bool rsync_multi_hardlink
std::string m_unknown_signal
void write_rsync_out(const std::string &a_str)
Write a report line for output from rsync to parent on child's std::cout.
bool is_child(void)
Returns true if called by the child.
const std::string & ssh_local_path(void) const
Return the ssh-local-path.
const std::string id(void)
Generate a job id string.
const std::string archive_path(void) const
Return an absolute path to the finished archive directory.
vault_manager vaulter
The global vault manager.