发新话题
打印

[转载]用Perl POE实现端口重定向

[转载]用Perl POE实现端口重定向

文章作者:r00t (shanleiguang_at_gmail.com)

POE - Persistent Object Enviroment,http://poe.perl.org

POE是用单线程实现的基于事件驱动的,可协作完成多任务的Perl模块,简单的说,就是在单线程下的状态机,在事件的循环中,有某个注册事件发生时,就调用注册的处理代码。在用Perl做网络编程时,常常需要多进程、线程、监视Socket的IO等并发操作,相关的函数和模块即fork、threads、select,另一个选择就是使用POE单线程的事件驱动来模拟并发的进程和线程,显然单线程下可以方便的共享数据。下面实现TCP端口重定向,例一使用创建子进程和IO::Select;例二采用POE实现。

===========================================================================================
eXample one:端口重定向(fork、IO::Select)
===========================================================================================
#!C:\Perl\bin\perl.exe
#A simple TCP stream forwarder
#By shanleiguang@gmail.com, 2005/10
use strict;
use warnings;

use IO::Socket;
use IO::Select;
use Getopt::Std;
use POSIX qw(:sys_wait_h strftime);

use constant FOREVER => 1;
use constant BUFSIZE => 4096;

#父进程下处理僵死子进程
sub zombie_reaper {
   while(waitpid(-1, WNOHANG) > 0) {}
   $SIG{CHLD} = \&zombie_reaper;
}

$SIG{CHLD} = \&zombie_reaper;

#处理参数
my %opts;
getopts('h:l:t:p:', \%opts);

print_help() and exit if(defined($opts{'h'}));
print_help() and exit if(not defined($opts{'t'}) or not defined($opts{'p'}));
print_help() and exit if($opts{'t'} !~ m/^\d+.\d+.\d+.\d+$/);
print_help() and exit if($opts{'l'} !~ m/^\d+$/);
print_help() and exit if($opts{'p'} !~ m/^\d+$/);

my $listen_port = (defined($opts{'l'})) ? $opts{'l'} : 8080;
my $target_ip  = $opts{'t'};
my $target_port = $opts{'p'};

#在本地创建监听Socket
my $socket_listen  = IO::Socket::INET->new(
   LocalPort => $listen_port,
   Proto    => 'tcp',
   Listen   => 5,
   Reuse    => 1,
);

print timestamp(), ", listening on port $listen_port ...\n";

#新建两个用于Socket IO监视的IO::Select对象
my $readers = IO::Select->new();
my $writers = IO::Select->new();

$readers->add($socket_listen);

#父进程仅监视Listening Socket的accept事件
while(FOREVER) {
   my @readers = $readers->can_read;

   foreach my $reader (@readers) {
      if($reader eq $socket_listen) {
        #创建子进程处理后续的转发,父进程继续监视Listening Socket
        fork and next;

        my $socket_client = $socket_listen->accept();

        #在子进程中,不再需要对Listening Socket进行操作
        $readers->remove($socket_listen);
        $socket_listen->close();

        #子进程
        as_server($socket_client);

        exit;
      }
   }
}

#子进程
sub as_server {
   my $socket_client = shift;

   my $client_port = $socket_client->peerport();
   my $client_ip  = $socket_client->peerhost();

   #创建到目标地址:端口的Socket连接
   my $socket_forward = IO::Socket::INET->new(
      PeerAddr => $target_ip,
      PeerPort => $target_port
   );

   print timestamp(), ", $client_ip:$client_port<->$target_ip:$target_port.\n";

   #监视socket_client、socket_forward的IO情况
   $readers->add($socket_client);
   $readers->add($socket_forward);
   $writers->add($socket_client);
   $writers->add($socket_forward);

   my ($rbuffer_forward, $rbuffer_client) = (&#39;&#39;, &#39;&#39;);

   while(FOREVER) {
      my @readers = $readers->can_read;

      foreach my $reader (@readers) {
        my $rbuffer;
        #当socket_client可读时,将读取的内容追加到rbuffer_client后
        #假如读取失败,则退出子进程
        if($reader eq $socket_client) {
           exit if(not recv($reader, $rbuffer, BUFSIZE, 0));
           $rbuffer_client.= $rbuffer;
        }

        #当socket_forward可读时,将读取的内容追加到rbuffer_forward后
        #假如读取失败,则退出子进程
        if($reader eq $socket_forward) {
           exit if(not recv($reader, $rbuffer, BUFSIZE, 0));
           $rbuffer_forward.= $rbuffer;
        }
      }

      my @writers = $writers->can_write;

      foreach my $writer (@writers) {
        #当socket_client可写,且rbuffer_forward不为空时,将rbuffer_forward
        #内容写入socket_client,假如写失败,则退出子进程
        if($writer eq $socket_client) {
           next if(not $rbuffer_forward);
           exit if(not send($writer, $rbuffer_forward, 0));
           $rbuffer_forward = &#39;&#39;;
        }

        #当socket_forward可写,且rbuffer_client不为空时,将rbuffer_client
        #内容写入socket_forward,假如写失败,则退出子进程
        if($writer eq $socket_forward) {
           next if(not $rbuffer_client);
           exit if(not send($writer, $rbuffer_client, 0));
           $rbuffer_client = &#39;&#39;;
        }
      }
   }
}

sub timestamp {
   return strftime "[%y/%m/%d,%H:%M:%S]", localtime;
}

sub print_help {
   my $filename = (split /\\/, $0)[-1];

   print <<HELP

  >>> $filename [-h,-l:] <-t:,-p:>
   -h  print help
   -l  listening local port, default 8080
   -t  target ipaddr
   -p  target port
        By shanleiguang\@gmail.com, 2005/10

HELP
}
===========================================================================================


===========================================================================================
eXample two:端口重定向(POE)
===========================================================================================
POE结构:

Driver->Filter->Wheel->Components
  |______|______|________|
         |
       Session
         |
        Kernel

Driver:   底层文件操作的抽象,在编程时不会直接用到
Filter:   底层、中层协议操作的抽象,通常不会直接用到
Wheel:    高层协议操作的抽象,经常要用到
Components:POE提供的一些拿来就能用的组件
Session:  会话抽象,会话中需要创建高层协议抽象
Kernel:   POE管理会话的内核

POE对象的数据结构:

$_[HEAP]:是会话唯一的数据存储区;
$_[SESSION]:是指向会话自身的引用;
$_[KERNEL]:是指向会话管理内核的引用;
@_[ARG0..ARG9]:用于传递给各事件处理函数的参数;

还是实例最直观:
在父会话中创建一个监听用的Socket,当有客户端连接,即有accept_sucess事件发生时,
则创建一个子会话处理后续事件,并将accept获得的客户端Socket传递给子会话;子会话
创建到目标的Socket,连接过程中,如果客户端Socket中有input事件,则将客户端的input
内容缓存在一个队列中,当连接成功后,发送给到目标的那个Socket中,见下图:

       +-------------------------------+
      /|    Socket_listen        |
      / +-------------------------------+
Client<-->|Socket_client <=> Socket_server|<-->Target
       +-------------------------------+
              Forwarder
===========================================================================================
#!C:\Perl\bin\perl.exe
#A simple TCP stream forwarder
#By shanleiguang@gmail.com, 2005/10
use strict;
use warnings;

use Socket;
use Getopt::Std;
use POSIX qw(strftime);
use POE qw(
   Wheel::SocketFactory
   Wheel::ReadWrite
   Filter::Stream
);

#Get Options
my %opts;
getopts(&#39;hl:t:p:&#39;, \%opts);

print_help() and exit if(defined($opts{&#39;h&#39;}));
print_help() and exit if(not defined($opts{&#39;t&#39;}) or not defined($opts{&#39;p&#39;}));
print_help() and exit if($opts{&#39;t&#39;} !~ m/^\d+.\d+.\d+.\d+$/);
print_hekp() and exit if($opts{&#39;p&#39;} !~ m/^\d+$/);

my $listen_port = (defined($opts{&#39;l&#39;})) ? $opts{&#39;l&#39;} : 8080;
my $target_addr = $opts{&#39;t&#39;};
my $target_port = $opts{&#39;p&#39;};

###Create Parent - &#39;Listen Session&#39;###
###创建父会话用于监听客户端的连接
###会话创建的最后将进入_start状态,执行_start的handler
###accept_success即在_start的handler中创建监听Socket的Wheel中
###的SuccessEvent事件,它的handler是forwarder_create函数
###$_[ARG0]是wheel::SocketFatory的SuccessEvent传递的参数
POE::Session->create(
   inline_states => {
      _start => \&forwarder_server_start,
      _stop  => sub { print timestamp(), ", forwarder server stopped."; },

      accept_success => sub { &forwarder_create($_[ARG0]); },
      accept_failure => sub { delete $_[HEAP]->{server_wheel} },
   },
);

$poe_kernel->run();

exit;

###Event handlers for Parent Session###
###父会话中的事件处理函数
sub forwarder_server_start {
   print timestamp(), ", listening on port $listen_port and ";
   print "forward to $target_addr:$target_port\n";

   #在父会话的存储区创建一个监听Socket类型的Wheel
   $_[HEAP]->{server_wheel} = POE::Wheel::SocketFactory->new(
      BindPort     => $listen_port,
      SocketProtocol => &#39;tcp&#39;,
      ListenQueue   => SOMAXCONN,
      Reuse       => &#39;on&#39;,

      #ARG0 of SuccessEvent
      SuccessEvent => &#39;accept_success&#39;,
      FailureEvent => &#39;accept_failure&#39;,
   );
}

###Create Child - &#39;Forward Session&#39;###
###创建子会话
sub forwarder_create {
   my $socket = shift;

   POE::Session->create(
      inline_states => {
        _start => \&forwarder_start,
        _stop  => sub {
           print &#39; &#39; x 4, timestamp(), &#39;, sessionId:&#39;;
           print $_[SESSION]->ID, ", forwarder stop\n";
           },
        client_input  => \&client_input,
        client_error  => sub {
           delete $_[HEAP]->{wheel_client};
           delete $_[HEAP]->{wheel_server};
           },
        server_connect => \&server_connect,
        server_input  => sub {
           $_[HEAP]->{wheel_client}->put($_[ARG0])
              if(exists $_[HEAP]->{wheel_client});
           },
        server_error  => sub {
           delete $_[HEAP]->{wheel_client};
           delete $_[HEAP]->{wheel_server};
           },
      },
      #Parameters to &#39;_start&#39; Event
      args => [$socket],
   );
}

##Event Handlers of Child Session##

sub forwarder_start {
   my ($heap, $socket) = @_[HEAP, ARG0];

   print &#39; &#39; x 4, timestamp(), &#39;, sessionId:&#39;;
   print $_[SESSION]->ID, ", forwarder start\n";

   #Buffer client&#39;s input while connecting to the target
   $heap->{state} = &#39;connecting&#39;;
   $heap->{queue} = [];

   #Client<-&#39;wheel_client&#39;->Forwarder server
   $heap->{wheel_client} = POE::Wheel::ReadWrite->new(
      Handle    => $socket,
      Driver    => POE::Driver::SysRW->new(),
      Filter    => POE::Filter::Stream->new(),
      InputEvent => &#39;client_input&#39;,
      ErrorEvent => &#39;client_error&#39;,
   );

   #Forwarder server<-&#39;wheel_server&#39;->target
   $heap->{wheel_server} = POE::Wheel::SocketFactory->new(
      RemoteAddress => $target_addr,
      RemotePort   => $target_port,
      SuccessEvent  => &#39;server_connect&#39;,
      FailureEvent  => &#39;server_error&#39;,
   );
}

sub server_connect {
   my ($kernel, $session, $heap, $socket) = @_[KERNEL, SESSION, HEAP, ARG0];

   #Replace $heap->{wheel_server}
   $heap->{wheel_server} = POE::Wheel::ReadWrite->new(
      Handle    => $socket,
      Driver    => POE::Driver::SysRW->new,
      Filter    => POE::Filter::Stream->new,
      InputEvent => &#39;server_input&#39;,
      ErrorEvent => &#39;server_error&#39;,
   );

   $heap->{state} = &#39;connected&#39;;

   $kernel->call($session, &#39;client_input&#39;, $_) foreach(@{$heap->{queue}});

   $heap->{queue} = [];
}

sub client_input {
   my ($heap, $input) = @_[HEAP, ARG0];

   push @{$heap->{queue}}, $input and return if($heap->{state} eq &#39;connecting&#39;);
   $heap->{wheel_server}->put($input) if(exists $heap->{wheel_server});
}

#Common subroutines
sub timestamp {
   return strftime "[%H:%M:%S]", localtime;
}

sub print_help {
   my $filename = (split /\\/, $0)[-1];

   print <<HELP

  >>> $filename [-h,-l:] <-t:,-p:>
   -h  print help
   -l  listen port
   -t  target ipaddress
   -p  target port
        A simple TCP forwarder server, 2005/10
        By shanleiguang\@gmail.com

HELP
}
===========================================================================================

> perl tcpForwarder.pl -l 8080 -t xxx.xxx.xxx.xxx -p 80
[xx:xx:xx:], listening on local port 8080 and forward to xxx.xxx.xxx.xxx:80...
...
...
曾几何时,有人对我说:装B遭雷劈。我说:去你妈的。于是,这个人又对我说:如果再说脏话,上帝会惩罚你的。我说:我操上帝。结论:彪悍的人生不需要上帝。

TOP

发新话题