#--replace-me-with-perl-command--

=pod

=head1 NAME

mysql_replicate - realtime replication of a mysql database server

=head1 SYNOPSIS

mysql_replicate.pl --path I<path> --remote-server I<name>
                   [--resync-with-remote-server]
                   [--force-resync I<database>]
                   [--exclude-database I<database>]
                   [--maintain-state-database]

=head1 PREREQUISITES

requires that DBI with the MySQL drivers be installed

=head1 DESCRIPTION

Replicates all changes to databases on a mysql server to a remote mysql
server.

=head1 OPTIONS

=over 4

=item --path

Specifies the path to the update logs. logfiles should be named like
"update.*". mysql_replicate will create a status directory called
replica_logs in this directory, so the user who is running mysql_replicate
will need the appropriate permissions to create that directory.

=item --remote-server

The hostname of a remote server to replicate to. This option may be specified many
times, for each host you'd like to replicate to.

=item --resync-with-remote-server

If a database doesn't exist on the remote server, recreate it.

=item --force-resync I<database>

Force a resync/recreation of a given database on the remote server

=item --exclude-databases I<database>

Exclude a database from being mirrored on the remote server

=item --maintain-state-database

Create and update a database on the local server called "replica". 
This database has information that reflects the status of the
remote replica. If this item is specified, one can trigger 
force-resync's without having to restart the process by updating
the 'force_resync' field with a comma seperated list of databases
to resync in the row for the given remote servers.

=item --dont-delete-old-update-logs

Don't delete the old update logs. instead, rename them.
(e.g. update.log to old-update.log)

=back

=head1 USAGE

=over 4

=item  Create a "replica" user on all servers involved. 

This user should have "root" access to all databases on the server(s). 
edit the script, and change the variables $replicate_user and $replicate_password 
as appropriate. 


=item Set up the server for logging updates.

The mysql server should be running with the --log-update option configured
as follows: 

--log-update=<path>/update

this <path> is what you should specify for the path option to the script
at runtime. The logs will be rotated every ten minutes. mysql_replicate
will create a status directory in <path>, so the user who is running
mysql_replicate will need write access to that directory.

=item NOTE: logfiles that are no longer required will be I<deleted>!

if multiple mysql_replicates are running to multiple servers, the lowest
needed update log will still be tracked amongst the running mysql_replicates.

=back

if a remote server should disappear, the script will keep retrying the
connection/replication transaction until it is successful.

=head1 SEE ALSO

=head1 NOTES

=over 4

=item 

Should daemonize, and run as the mysql-user. This implies parsing the my.cnf file.

=item

Should write a utility script to help users set force-resyncs if running with the --maintain-state-database option.

=item

on startup, --force-resync should force the transfer of the database
immediately (without waiting for a statement in the update log)

=back

=head1 AUTHOR

=over 4

=item Andrew Elble

elble@icculus.nsg.nwu.edu

=back

=head1 BUGS

=over 4

=item 

remote servers get out of sync under certain (unknown) conditions.

=back

=head1 HISTORY

=over 4

=item 

11/9/1999 - fixed problem with resyncing a database related to removing the database from the list of databases to be resynced before calling reload_database() (thanks go to Yingjie Yang for pointing this out)

=item

1/1/2000 - patch from Max Parke. lots of things: setuid/gid, option to move instead of delete old update logs, bug in write_log where force_write was being ignored, cleanup of error message printouts, fixed bug where recovery from a lost connection to a server was causing resyncs, fixed bug where the code to stop looking for database to use would quit too late, fixed bug where a comment would get appended to line to be commited to database, ignore INT, better method for initial connection to database, we now keep a pidfile.
=back

=head1 DISCLAIMER

=item

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

=over 4

=back

=cut

use strict;
use DBI;
use FileHandle;
use Getopt::Long;

#
# global variables
#

my $writelogint = 100;
my $logcommits = 0;

my $replicate_username = "";
my $replicate_password = "";

my $run_as_user  = "";
my $run_as_group = "";
my $pid_file = "";

my $logflush_interval = (30*60);
my $fileprefix = "update";

my %hold_commit;
my @remote_servers;
my @exclude_databases;
my @force_resync;
my @childpids;

my ($pathname,
    $remote_server,
    $resync_with_remote_server,
    $maintain_state_database,
    $dont_delete_old_update_logs,
    $database, 
    $default,
    $filename, 
    $next_flush_time,
    $synced,
    $prefix,
    $extension,
    $curpos,
    $dbh,
    $localdbh);

#
# Get our options
#

GetOptions("path=s" => \$pathname,
           "remote-server=s" => \@remote_servers,
           "resync-with-remote-server" => \$resync_with_remote_server,
	   "dont-delete-old-update-logs" => \$dont_delete_old_update_logs,
           "exclude-databases=s" => \@exclude_databases,
	   "force-resync=s" => \@force_resync,
           "maintain-state-database" => \$maintain_state_database);


if (($pathname eq "") ||
    ($#remote_servers < 0)) {
  print "you must specify a path and at least one remote server!\n";
  exit(0);
}
my $synclogdir = "$pathname/replica_logs";



my ($uid, $gid);
if (($run_as_user ne "") && ($run_as_group ne "")) {

  die "unknown user  $run_as_user"  unless ($uid = getpwnam($run_as_user));
  die "unknown group $run_as_group" unless ($gid = getgrnam($run_as_group));
  
  die "unable to change user to $run_as_user" if ($<) ;
  
  $) = $gid;
  $( = $gid;
  $> = $uid;
  $< = $uid;
  
}
# set log file to flush after each write
select STDOUT; $| = 1; select;
  
if ($pid_file ne "") {
  # write out PID
  open PIDF, ">$pid_file";
  print PIDF $$;
  close PIDF;
}

#
# ROUTINE
#   reload_database
#
# DESCRIPTION
#   replicates the entire database on the remote system
#
sub reload_database {
  if (($resync_with_remote_server) ||
      (grep(/\Q$database\E/,@force_resync))) {
    print "$remote_server: dumping $database to $remote_server\n";

    if (grep(/\Q$database\E/,@force_resync)) {
      my @tmpforce_resync = grep(!/\Q$database\E/,@force_resync);
      @force_resync = @tmpforce_resync;
    }

    if ($maintain_state_database) {
      replica_table_check();
      $localdbh->do("use replica");
      $localdbh->do("SET SQL_LOG_UPDATE=0");
      $localdbh->do("update replica_status set resyncing = '".$database."' where hostname = '".$remote_server."'");
    }
 
#    print "$remote_server: unsetting default\n";
    $default = undef;
    commit("drop database $database");
    commit("create database $database");
    commit("use $database");
    
    $localdbh->do("use $database");
    $localdbh->do("SET SQL_LOG_UPDATE=0");
#
# get a write lock on all tables so that the data doesn't change.
#
    my $query = $localdbh->prepare("show tables");
    $query->execute;
    my $table = $query->fetchall_arrayref;
    my $i;
    my $dostring = "lock tables";
    my $first = 1;
    foreach $i (0 .. $#{ $table }) {
      if ($first == 1) {
	$dostring .= " ".$table->[$i][0]." write";
	$first = 0;
      } else {
	$dostring .= ",".$table->[$i][0]." write";
      }
    }
#    print "LOCK: $dostring\n";
    $localdbh->do($dostring);

#
# force a logflush. that way, any potential changes to this database
# are only valid in the latest logfile.
#
    $next_flush_time = 0;
    flush_logs();
#
# find the highest numbered logfile, and temporarily block commits to the database
# until we reach the highest numbered logfile.
#
    opendir(LOGDIR,$pathname);
    my @logs = sort {$b cmp $a} (grep { /^\Q$fileprefix\E/ && -f "$pathname/$_" } readdir(LOGDIR));
    closedir(LOGDIR);
    my $tmpfilename = $pathname."/".$logs[0];
    if ( -f $tmpfilename) {
      print "$remote_server: updates for $database will recommence with: $tmpfilename\n";
      my ($tmpprefix, $tmpextension) = ($tmpfilename =~ /(\w+)\.(\d+)/);
      $hold_commit{$tmpprefix.$tmpextension} = $database;
    } else {
      die "unable to keep track of logfile!";
    }


#
# Create the tables and dump the data!
#
    my $dostring = "";
    foreach $i (0 .. $#{ $table }) {
      $dostring = "CREATE TABLE ".$table->[$i][0]." (\n";
      my $subquery = $localdbh->prepare("show fields from ".$table->[$i][0]);
      $subquery->execute;
      my $subtable = $subquery->fetchall_arrayref;
      my $prikey;
      my %unikey;
      my %mulkey;
      my $subi;
      foreach $subi (0 .. $#{ $subtable }) {
	$dostring .= $subtable->[$subi][0]." ".$subtable->[$subi][1];
	if ($subtable->[$subi][2] eq "") {
	  $dostring .= " NOT NULL";
	}
	if ($subtable->[$subi][3] eq "PRI") {
	  if ($prikey ne "") {
	    $prikey .= ",".$subtable->[$subi][0];
	  } else {
	    $prikey = $subtable->[$subi][0];
	  }
	} elsif ($subtable->[$subi][3] eq "UNI") {
	  $unikey{$subtable->[$subi][0]} = 1;
	} elsif ($subtable->[$subi][3] eq "MUL") {
	  $mulkey{$subtable->[$subi][0]} = 1;
	}
	
	if ($subtable->[$subi][4] eq "NULL") {
	  $dostring .= " DEFAULT ''";
	} elsif ($subtable->[$subi][4] ne "") {
	  $dostring .= " DEFAULT '$subtable->[$subi][4]'";
	}
	if ($subtable->[$subi][5] ne "") {
	  $dostring .= " ".$subtable->[$subi][5];
	}
	if ($subi < $#{ $subtable}) {
	  $dostring .= ",\n";
	}
      }
      if ($prikey ne "") {
	$dostring .= ",\nPRIMARY KEY($prikey)";
      }
      my $key;
      foreach $key (keys %unikey) {
	$dostring .= ",\nUNIQUE KEY($key)";
      }
      my $key;
      foreach $key (keys %mulkey) {
	$dostring .= ",\nKEY($key)";
      }
      $dostring .= ")\n\n";
      $subquery->finish;
      $subtable = undef;
#      print "doing: $dostring";
      $dbh->do($dostring);
      #
      # dump data (should lock tables as well!)
      #
      my $subquery = $localdbh->prepare("select * from ".$table->[$i][0]);
      $subquery->execute;
      my $subtable = $subquery->fetchall_arrayref;
      my $subi;
      $dbh->do("lock tables ".$table->[$i][0]." write");
      foreach $subi (0 .. $#{ $subtable }) {
	my $subdostring = "INSERT INTO ".$table->[$i][0]." VALUES (";
	my $j;
	foreach $j (0 .. $#{ $subtable->[$subi] }) {
	  if ($subtable->[$subi][$j] ne "") {
	    my $quoted = $localdbh->quote($subtable->[$subi][$j]);
	    $subdostring .= $quoted;
	  } else {
	    $subdostring .= "''";
	  }
	  if ($j < $#{ $subtable->[$subi] }) {
	    $subdostring .= ",";
	  }
	}
	$subdostring .= ")\n";
#	print "insert: $subdostring";
	$dbh->do($subdostring);
      }
      $dbh->do("unlock tables");
      $subquery->finish;
      $subquery = undef;
      $subtable = undef;
    }
    $query->finish;
    $query = undef;
    $table = undef;

#
# exclude the database from updates 'till the selected log.
#

    push(@exclude_databases, $database);
    print "$remote_server excluding $database from updates until log: $tmpfilename\n";
    $localdbh->do("unlock tables");
    if ($maintain_state_database) {
      replica_table_check();
      $localdbh->do("use replica");
      $localdbh->do("SET SQL_LOG_UPDATE=0");
      $localdbh->do("update replica_status set resyncing = '' where hostname = '".$remote_server."'");
    }
  } else {
    print "$remote_server database $database doesn't exist on remote server!\n";
    if (!(grep(/\Q$database\E/,@exclude_databases))) {
      print "$remote_server adding $database to exclude list!\n";
      push(@exclude_databases, $database);
    }
  }
}

#
# ROUTINE
#   check_for_resyncs
#
# DESCRIPTION
#   checks to see if there are resync requests pending.
#
sub check_for_resyncs {
  
  if ($maintain_state_database) {
    $localdbh->do("use replica");
    $localdbh->do("SET SQL_LOG_UPDATE=0");
    my $query = $localdbh->prepare("select force_resync, resyncing from replica_status where hostname = '".$remote_server."'");
    $query->execute;
    my $table = $query->fetchall_arrayref;
    my $i;
    foreach $i (0 .. $#{ $table }) {
      if ($table->[$i][0] ne "") {
	my @tmppush = split(/\,/,$table->[$i][0]);
	my $tpsh;
	foreach $tpsh (@tmppush) {
	  if (!(grep(/\Q$tpsh\E/,@force_resync))) {
	    print "$remote_server: acknowledging resync request for: ".$tpsh."\n";
	    push(@force_resync,$tpsh);
	  }
	}
      }
      if ($table->[$i][1] ne "") {
	my @tmppush = split(/\,/,$table->[$i][1]);
	my $tpsh;
	foreach $tpsh (@tmppush) {
	  if (!(grep(/\Q$tpsh\E/,@force_resync))) {
	    print "$remote_server: redoing interrupted resync for : ".$tpsh."\n";
	    push(@force_resync,$tpsh);
	  }
	}
      }
    }
    $query->finish;
    $query = undef;
    $table = undef;
    $localdbh->do("update replica_status set force_resync='' where hostname = '".$remote_server."'");
  }
}


#
# ROUTINE
#   check_for_next_log_in_sequence
#
# DESCRIPTION
#   check to see if the next logfile is available
#
# RETURNS
#   the name of the next logfile (if next logfile is present)
#   undef otherwise
#

sub check_for_next_log_in_sequence {
  
  my $tmpprefix = $prefix;
  my $tmpextension = $extension;
  $tmpextension++;
  my $tempfilename = $pathname."/".$tmpprefix.".".$tmpextension;
  if ( -f $tempfilename) {
    return($tempfilename);
  } else {
    return(undef);
  }
}

#
# ROUTINE
#   flush_old_update_files
#
# DESCRIPTION
#   discover what update logs are no longer needed, and remove them
#


sub flush_old_update_files {
  if (!stat($synclogdir)) {
    if (!mkdir($synclogdir,0755)) {
      die("can't create status directory\n");
    }
  }
  opendir(SYNC,$synclogdir);
  my @logs = sort {$a cmp $b} (grep { -f "$synclogdir/$_" } readdir(SYNC));
  closedir(SYNC);
  my $lowestlognumber = $extension;
  my $origlognumber = $lowestlognumber;
  my $file;
  foreach $file (@logs) {
    open(LOG, "$synclogdir/$file");
    while (<LOG>) {
      chomp;
      my ($num,$pos,$db) = ($_ =~ /(\d+)\:(\w+)\:(\w+)/);
      if (($num < $lowestlognumber) &&
	  ($num ne "")) {
	$lowestlognumber = $num;
      }
    }
    close(LOG);
  }

  print "$remote_server: lowest needed file# is $lowestlognumber\n";
  while ($lowestlognumber > 0) {
    $lowestlognumber--;
    #
    # need commandline option to engage this behavior...
    #
    if ($dont_delete_old_update_logs) {
      my $file1 = sprintf("$pathname/$fileprefix.%3.3d", $lowestlognumber);
      my $file2 = sprintf("$pathname/old-$fileprefix.%3.3d", $lowestlognumber);
      if (-f $file1) {
	print "$remote_server: renaming $file1 to $file2\n";
	rename ($file1, $file2);
      }
    } else {
      my $rmfile = sprintf("$pathname/$fileprefix.%3.3d", $lowestlognumber);
      if (-f $rmfile) {
	print "$remote_server: removing $rmfile\n";
	unlink($rmfile);
      }
    }
  }
}

#
# ROUTINE
#   replica_table_check
#
# DESCRIPTION
#   check to see if the replica_status table exists. if not, create it.
#


sub replica_table_check {
  $localdbh->do("SET SQL_LOG_UPDATE=0");
  if ($maintain_state_database) {
    $localdbh->do("use replica");
    $localdbh->do("SET SQL_LOG_UPDATE=0");
    if ($localdbh->errstr =~ /Unknown database/) {
      print "$remote_server: creating replica database\n";
      $localdbh->do("create database replica");
    }
    $localdbh->do("select * from replica_status limit 1");
    my $err = $localdbh->err;
    if ($err == 0) {
      $localdbh->do("insert into replica_status (hostname, in_sync) values ('".$remote_server."','".$synced."')");
      $localdbh->do("update replica_status set in_sync = '".$synced."' where hostname = '".$remote_server."'");
      return 1;
    }
    print "$remote_server: creating replica tables\n";
    $localdbh->do("CREATE table replica_status (
                 hostname char(255) NOT NULL,
                 in_sync tinyint(1) default 0,
                 resyncing char(255),
                 force_resync char(255),
                 PRIMARY KEY(hostname))");
    $localdbh->do("insert into replica_status (hostname, in_sync) values ('".$remote_server."','".$synced."')");
    return 0;
  }
}

sub lost_sync {
  $synced = 0;
  replica_table_check();
  print "$remote_server: lost sync\n";
}

sub attained_sync {
  $synced = 1;
  replica_table_check();
  print "$remote_server: attained sync\n";
}

#
# ROUTINE
#   mfile
#
# DESCRIPTION
#   1.) finds the next logfile to work from.
#   2.) determines the correct position within the file to work from
#   3.) calls readfile() to open the file and process the updates.
#

sub mfile {
  my ($num, $pos);
  if ($filename eq "") {
    lost_sync();
    if (-f "$synclogdir/$remote_server") {
      open(LOG, "$synclogdir/$remote_server");
      while (<LOG>) {
	chomp;
	my ($tnum,$tpos,$db) = ($_ =~ /(\d+)\:(\w+)\:(\w+)/);
	if ($tnum ne "") {
	  $num = $tnum;
	}
	if ($tpos ne "") {
	  $pos = $tpos;
	}
	if ($db ne "") {
	  $dbh->do("use ".$db);
	  $database = $db;
	}
      }
      close(LOG);
      $filename = "$pathname/$fileprefix.$num";
      my $foo = (-f "$filename");
      if (-f $filename) {
	print "$remote_server: restarting with: $filename\n";
	($prefix, $extension) = ($filename =~ /(\w+)\.(\d+)/);
	$curpos = $pos;
	readfile();
      } else {
	unlink("$synclogdir/$remote_server");
	$filename = "";
      }
    }
    if ($filename eq "") {
      opendir(LOGDIR,$pathname);
      my @logs = sort {$a cmp $b} (grep { /^\Q$fileprefix\E/ && -f "$pathname/$_" } readdir(LOGDIR));
      closedir(LOGDIR);
      $filename = $pathname."/".$logs[0];
      if ( -f $filename) {
	print "$remote_server: starting with: $filename\n";
	($prefix, $extension) = ($filename =~ /(\w+)\.(\d+)/);
	$curpos = 0;
	readfile();
      } else {
	die "i can't find a log to start syncing with!";
      }
    }
  } else {
    if (my $tempfilename = check_for_next_log_in_sequence()) {
      close(IN);
      lost_sync();
      print "$remote_server: switching to log: $tempfilename\n";
      $filename = $tempfilename;
      ($prefix, $extension) = ($filename =~ /(\w+)\.(\d+)/);
      $curpos = 0;
      readfile();
    } else {
      return;
    }
  }
}

sub write_log {
  my $force_write = shift;
  $logcommits++;
  if (($extension ne "") &&
      ($curpos ne "") && 
      ($database ne "")) {
    if (($logcommits%$writelogint == 0) || $force_write) {
      truncate(POSLOG,0);
      seek (POSLOG, 0, 0);
      print POSLOG "$extension:$curpos:$database\n";
    }
  }
}

#
# ROUTINE
#  commit
#
# DESCRIPTION
#   calls $dbh->do in a safe manner, and only when the correct conditions are met!
#

sub commit {
#  print "in commit\n";
  my $line = shift;
  my $lasterr;
  my $retry_msg_issued = 0;
  
  if (defined($dbh)) {
    alarm(0);
    local $SIG{ALRM} = sub { lost_sync(); };
    alarm(2);
    $dbh->ping;
    alarm(0);
    local $SIG{ALRM} = undef;
  }
  if ($line eq "") {
    return;
  }
#  print "commit: got $line to $remote_server for $database\n";
  if ($line =~ /^use (\w+)\;$/i) {
    ($database) = ($line =~ /^use (\w+)/i);
    if (!(grep(/\Q$database\E/,@exclude_databases))) {
#      print "$remote_server: switching to database $database\n";
    } else {
#      print "database $database has been excluded\n";
    }
    while (!defined($dbh)) {
      sleep(1);
      $dbh = DBI->connect("DBI:mysql:$database:$remote_server", $replicate_username, $replicate_password, { PrintError => 0 });
    }
    $dbh->do("SET SQL_LOG_UPDATE=0");
  }

  if (grep(/\Q$database\E/,@force_resync)) {
    print "$remote_server: forced resync of $database\n";
    lost_sync();
    reload_database();
  }
  
  my $result;
  my $first = 1;
  my $dbherr = 1;
  my $dbherrstr;
  while (($dbherr ne "") &&
	 (!(grep(/\Q$database\E/,@exclude_databases))) &&
	 (($default ne "set") || ($line =~ /^use (\w+)/i))) {
    if ($default ne "") {
#      print "$remote_server: got default: $default\n";
    }
    if ($first == 0) {
      unless ($retry_msg_issued) {
	print "$remote_server: invoking retry\n";
	$retry_msg_issued = 1;
      }
      if ($synced == 1) {
	lost_sync();
      }
      sleep(1);
    }
#    print "$remote_server: doing: $database: $default: $line\n";
    $result = $dbh->do($line);
    $dbherr = $dbh->err;
    $dbherrstr = $dbh->errstr;
    if (($dbherr ne "") && ($dbherr ne $lasterr)) {
       print "commit: *** $remote_server: DBI error occurred processing SQL command: $line\n";
       print "commit: *** $remote_server: dbherr: $dbherr, dbherrstr: $dbherrstr\n";
       $lasterr = $dbherr;
    }
    if (($dbherr eq "") &&
	($line =~ /^use (\w+)/i)) {
#      print "$remote_server: unsetting default use ($1)\n";
      $default = "";
    }
    if (($dbherrstr =~ /Unknown database/) ||
	($dbherrstr =~ /Table \'.+\' doesn\'t exist/)) {
#      print "$remote_server($curpos): result: $result dberr: $dbherrstr ($dbherr) reload!\n";
      reload_database();
#      print "$remote_server: unsetting default\n";
      $default = "";
    } elsif ($dbherrstr ne "") {
# a hack to handle lost connection to server
      if ($dbherrstr =~ /server has gone away/i) {
        print "commit: lost connection to $remote_server, attempting reconnect\n";
        $dbh->disconnect();
        $dbh = undef;
        while (!defined($dbh)) {
          sleep(2);
          $dbh = DBI->connect("DBI:mysql:$database:$remote_server", $replicate_username, $replicate_password, { PrintError => 0 });
        }
        $dbh->do("SET SQL_LOG_UPDATE=0");
        print "commit: reconnected to $remote_server\n";
      } else {
	print "$remote_server($curpos): result: $result dberr: $dbherrstr ($dbherr)\n";
	print "$line\n";
	$dbherrstr = "";
	$dbherr = "";
      }
    }
    $first = 0;
  }
#  print "leaving commit\n";
  write_log(0);
}

#
# ROUTINE
#   readfile
#
# DESCRIPTION
#  1.) opens the update log, and seeks to the correct position.
#  2.) sends the updates from the log to the remote server.
#

sub readfile {

#
# remove holddown for database that has been recently loaded to the remote server.
#

  if (defined($hold_commit{$prefix.$extension})) {
    print "$remote_server: removing database $hold_commit{$prefix.$extension} from exclude list\n";
    my @tmpexclude_databases = grep(!/\Q$hold_commit{$prefix.$extension}\E/,@exclude_databases);
    @exclude_databases = @tmpexclude_databases;
  }
  open(POSLOG,"> $synclogdir/$remote_server");
  POSLOG->autoflush();
  while(!open(IN,"$filename")) {
    sleep(3);
  }

  check_for_resyncs();

  if ($curpos != 0) {
    if ($database eq "") {
      #  print "looking for database to use\n";
      my $line;
      my $tmppos = tell(IN);
      while(<IN>) {
	last if ($tmppos >= $curpos);
	chomp;
	if ($_ !~ /^\#/) {
	  if ($_ =~ /\;$/) {
	    $line .= $_;
	    if ($line =~ /^use (\w+)/i) {
	      commit($line);
	    }
	    $line = undef;
	  } else {
	    $line .= $_;
	  }
	}
	$tmppos = tell(IN);
      }
      #  print "end looking for database to use\n";
    } else {
      print "$remote_server: readfile: using database $database\n";
      commit("use $database;");
    }
    seek(IN, $curpos, 0);
  }
  
  flush_old_update_files();

  my $line;
  for (;;) {
    for ($curpos = tell(IN); $_ = <IN>; $curpos = tell(IN)) {
#      print "syncing: $curpos\n";
      chomp;
      if ($_ !~ /^\#/) {
	if ($_ =~ /\;$/) {
	  $line .= $_;
	  commit("SET SQL_LOG_UPDATE=0");
	  commit($line);
	  $line = undef;
	} else {
	  $line .= $_;
	}
      }
    }
    sleep(1);
    check_for_resyncs();
    my $cur_log_size = (stat $filename)[7];
    if ($cur_log_size == $curpos) {
      mfile();
    }
    if ($synced == 0) {
#      print "status: ($curpos / $cur_log_size)\n";
      if ((($cur_log_size - $curpos) < 8192) &&
	  (!defined(check_for_next_log_in_sequence()))) {
	  attained_sync();
      } else {
	lost_sync();
      }	
    }
    seek(IN, $curpos, 0);
  }
  close(POSLOG);
}

sub closelogs {
  write_log(1);
  close(POSLOG);
  close(IN);
  unlink($pid_file);
  lost_sync();
  print "$remote_server: clean exit\n";
  exit(0);
}

sub flush_logs {
  if ($next_flush_time < time()) {
    $localdbh->do("SET SQL_LOG_UPDATE=0");
    $localdbh->do("flush logs");
    $next_flush_time = time() + $logflush_interval;
  }
  return;
}

my $child;
foreach $child (@remote_servers) {
  my $pid = fork();
  if ($pid == 0) {
    # in child
    $remote_server = $child;
    $SIG{TERM} = \&closelogs;
    $SIG{INT} = 'IGNORE';
    print "connecting to local DB\n";
    $localdbh = undef;
    while (!defined($localdbh)) {
    $localdbh = DBI->connect("DBI:mysql:mysql", $replicate_username, $replicate_password, { PrintError => 0 });
       sleep(1);
    }
    print "connected to local DB\n";
    $localdbh->do("SET SQL_LOG_UPDATE=0");
      print "$remote_server: using default database\n";
    while (!defined($dbh)) {
      sleep(1);
      if ($database eq "") {
	$database = "mysql";
	$default = "set";
      }
      $dbh = DBI->connect("DBI:mysql:mysql:$remote_server", $replicate_username, $replicate_password, { PrintError => 0 });
    }
    commit("SET SQL_LOG_UPDATE=0");
    mfile();
    exit(0);
  } else {
    print "forked child: $pid for $child\n";
    push(@childpids,$pid);
  }
}

$localdbh = undef;
while(!defined($localdbh)) {
$localdbh = DBI->connect("DBI:mysql:mysql", $replicate_username, $replicate_password, { PrintError => 0 });
  sleep(1);
}
$next_flush_time = 0;
while(1) {
  $localdbh->do("SET SQL_LOG_UPDATE=0");
  flush_logs();
  sleep(20);
}



