@lishuhuakai
2016-11-15T22:40:18.000000Z
字数 5788
阅读 1401
这一次的版本我们在第一版的基础上添加了一些有意思的东西,增强了我们的功能.
代码的实现里有一些很有意思的地方,下载限速和上传限速就是其中一例,这些东西听起来非常高大上,可是如果你真的做了的话,会发现这些东西真的也就是这样.
限速的原理很简单,如果发现下载速度过快,那么我就休息一下,不下那么快,这样速度就降下来的.上传同理.
SpeedBarrier
/* 下载或者上传速度限制器 */
class SpeedBarrier : boost::noncopyable
{
public:
void StartTimer()
{
start_ = Timestamp::now();
}
void limitSpeed(int64_t maxSpeed, size_t bytesTransed);
static int64_t maxDownloadSpeed; /* 最大的下载速度 */
static int64_t maxUploadSpeed; /* 最大的上传速度 */
private:
void nanoSleep(int64_t microSeconds);
private:
Timestamp start_; /* 开始的时间 */
};
如何使用这个速度限制器呢?很简单,首先,我们要执行StartTimer()
函数开始计时.我们可以看到上面的实现代码,就是获取当前的时间.
然后传输一点数据之后,然后调用limitSpeed函数,将允许的最大传输速度和你已经发送的数据量作为参数传递过去,然后就可以了:
void SpeedBarrier::limitSpeed(int64_t maxSpeed, size_t bytesTransed) {
/* 开始限制速度 */
Timestamp now = Timestamp::now();
int64_t timePassed = now.microSecondsSinceEpoch() - start_.microSecondsSinceEpoch();
int64_t speedNow = bytesTransed / (static_cast<double>(timePassed) / Timestamp::kMicroSecondsPerSecond); /* 求出每秒钟传送的字节数 */
if (speedNow > maxSpeed) {
/* 然后我们就必须休眠 */
int64_t diff = speedNow - maxSpeed; /* 求出两者之间的差值 */
int64_t sleepMicroSeconds = (diff / static_cast<double>(maxSpeed)) * timePassed;
nanoSleep(sleepMicroSeconds);
//usleep(sleepMicroSeconds); // usleep要求参数小于1000000,也就是1秒,这是不现实的.
}
start_ = Timestamp::now(); /* 重新计时 */
}
如果传输的速度超过了,那么就要休眠对应的时间.
这也是一个很有意思的功能,以前以为断点续传一定是一个高大上的功能,实现了之后发现一文不值.
断点续传的实现简单粗暴,首先,客户端发送rest
命令,传递需要续传的位置,然后传输文件的时候,将文件读写指针指向那个位置,然后开始发送.
void CmdHandle::REST() {
/* 这次要实现的一个比较有意思的功能是断点续传 */
resumePoint_ = str2longlong(argv_.c_str()); /* 记录断点 */
Reply("%d we will transfer the file from the position we got!\r\n", FTP_RESTOK);
}
void CmdHandle::RETR() {
Connection conn = GetConnect(); /* 获得连接 */
int fd = open(argv_.c_str(), O_RDONLY, NULL); /* 打开文件 */
if (fd < 0) {
Reply("%d Open local file fail.\r\n", FTP_FILEFAIL);
return;
}
/* 开始传送文件 */
char text[1024] = { 0 };
struct stat fileInfo;
size_t size;
{
FileRDLock lock(fd); /* 读锁,如果不能读的话,会一直阻塞 */
fstat(fd, &fileInfo);
if (!S_ISREG(fileInfo.st_mode)) { /* 判断是否为普通文件,设备文件不能下载 */
Reply("%d It is not a regular file.\r\n", FTP_FILEFAIL);
return;
}
if (mode_ == binary)
sprintf(text, "Opening BINARY mode data connection for %s (%lld bytes).",
argv_.c_str(), (long long)fileInfo.st_size);
else
sprintf(text, "Opening ASCII mode data connection for %s (%lld bytes).",
argv_.c_str(), (long long)fileInfo.st_size);
Reply("%d %s\r\n", FTP_DATACONN, text);
// 读取文件内容,写入套接字
size = fileInfo.st_size;
if (resumePoint_ != 0) {
Lseek(fd, resumePoint_, SEEK_SET); /* 重定位文件 */
size -= resumePoint_; /* 需要传送的字节的数目 */
}
barrier_.StartTimer(); /* 开始计时 */
while (size > 0) {
int sended = sendfile(conn->GetFd(), fd, NULL, bytesPerTime); /* 每次发送一点 */
if (sended == -1) {
break;
}
barrier_.limitSpeed(SpeedBarrier::maxDownloadSpeed, sended); /* 开始限速 */
size -= sended;
}
}
if (size == 0)
Reply("%d Transfer complete.\r\n", FTP_TRANSFEROK);
else {
Reply("%d Transfer failed.\r\n", FTP_BADSENDNET); /* 连接关闭,放弃传输 */
}
utility::Close(fd); /* 关闭文件 */
resumePoint_ = 0; /* 文件已经传送完毕了,要将断点复原 */
}
abort
命令整个程序真正有点难度的是abort
命令.
客户端正在下载文件,突然用户点了取消,客户端给你发送紧急消息怎么弄?如果你不屏蔽SIGPIPE
消息的话,你的程序多半会挂掉.如果你不处理客户端发送的紧急消息的话,你下一次接收命令的时候应该会碰到一堆的\377
之类的东西,parse
没弄好的话,分分钟将你服务端搞死.
SIGPIPE
消息最近被SIGPIPE
消息坑了很久.所以立志要一次性解决它.
SIGPIPE
消息的由来对一个对端已经关闭的socket
调用两次write
, 第二次将会生成SIGPIPE
信号, 该信号默认结束进程.
具体的分析可以结合TCP
的”四次握手”关闭. TCP
是全双工的信道,可以看作两条单工信道, TCP
连接两端的两个端点各负责一条. 当对端调用close
时, 虽然本意是关闭整个两条信道, 但本端只是收到FIN
包. 按照TCP
协议的语义, 表示对端只是关闭了其所负责的那一条单工信道, 仍然可以继续接收数据. 也就是说, 因为TCP
协议的限制, 一个端点无法获知对端已经完全关闭.
对一个已经收到FIN
包的socket
调用read
方法, 如果接收缓冲已空, 则返回0
, 这就是常说的表示连接关闭. 但第一次对其调用write
方法时, 如果发送缓冲没问题, 会返回正确写入(发送). 但发送的报文会导致对端发送RST
报文, 因为对端的socket
已经调用了close
, 完全关闭, 既不发送, 也不接收数据. 所以, 第二次调用write
方法(假设在收到RST
之后), 会生成SIGPIPE
信号, 导致进程退出.
那么,我们如何屏蔽SIGPIPE
消息?
SIGPIPE
消息屏蔽方法对于单进程而言,有下面几种方法可以尝试一下:
signal
函数有意思的是,这种方法在我的电脑上完全行不通,我不知道是不是都是这样,但是还是在这里贴一下:
signal(SIGPIPE, SIG_IGN); /* 忽略掉SIGPIPE消息 */
signal
函数是很老的东西,它由ISO C
定义,由于ISO C不涉及多线程,进程组等,所以它对信号的定义非常模糊,以致于对Unix系统而言几乎毫无用处.所以说,不推荐系统提供的signal函数.
如果你实在要使用signal
的话,stevents
老爷子用sigaction
.函数给我们重新实现了一遍signal
函数,我们比较推荐这个版本.这里也顺带在这里贴一下:
void unix_error(const char *msg) /* unix-style error */
{
fprintf(stderr, "%s: %s\n", msg, strerror(errno));
exit(0);
}
typedef void handler_t(int);
handler_t *Signal(int signum, handler_t *handler) /* 用于注册信号处理函数 */
{
struct sigaction action, old_action;
action.sa_handler = handler; /* 信号处理函数 */
sigemptyset(&action.sa_mask); /* block sigs of type being handled */
action.sa_flags = SA_RESTART; /* 如果可能的话,重启系统调用 */
if (sigaction(signum, &action, &old_action) < 0)
unix_error("Signal error");
return (old_action.sa_handler);
}
我测试一下这个函数,对其他的信号貌似都很管用,在我的电脑上对SIGPIPE
消息却没什么用处,真是奇怪.
另外的方法也是有的,除了signal方法,其实我们也可以使用信号集的方法.
我这里给一个函数:
void BlockSigno(int signo) { /* 阻塞掉某个信号 */
sigset_t signal_mask;
sigemptyset(&signal_mask); /* 初始化信号集,并清除signal_mask中的所有信号 */
sigaddset(&signal_mask, signo); /* 将signo添加到信号集中 */
sigprocmask(SIG_BLOCK, &signal_mask, NULL); /* 这个进程屏蔽掉signo信号 */
}
然后调用:
BlockSigno(SIGPIPE);
即可.如果想具体了解这几个函数,可以去看APUE
.
SIGPIPE
屏蔽方法其实线程的屏蔽方法和单进程差不太多,不行,你可以看:
void BlockSigno(int signo)
{
sigset_t signal_mask;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, signo);
pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
}
然后我们调用:
BlockSigno(SIGPIPE);
即可.当然,上面的代码都做了简化,没有错误处理,你自己可以添加.
首先,我们要注册SIGURG
消息.注册这个消息有一点很有意思,那就是我们一般要添加这样的一段代码:
/*-
* 这条命令还是很有必要的,因为系统可不知道cmdfd_所属的进程,所以它也不知道应该向谁发送SIGURG信号.
* 而设置了fd所属的进程之后,系统一旦检测到了客户端发来了紧急数据,就会立马通知该进程.否则的话
* 该进程是收不到SIGURG信号的.
*/
fcntl(fd, F_SETOWN, getpid());
我这里SIGURG
处理很随便,直接丢掉:
void CmdHandle::HandleUrg() { /* 处理紧急数据 */
/*-
* 一般而言,对于正在传输中的数据连接,客户端会给服务器发送紧急数据,当然,我这里也没有处理,直接丢弃.
* 发送的紧急数据是什么呢? \377\364 \377\362之类的.
* \377\364 \377\362就是telnet协议中规定的操作,我翻译一下(操作序列):'IAC' 'Interrupt Process' 'IAC' 'Data Mark'
* 其中:(1) \364表示操作:'Interrupt Process',即实施telnet的The function IP。含义:ftp客户机告诉你这个FTP服务器,
* 赶快放下你现在手头的事情,马上处理我的事件(我有紧急数据到来)。
* (2) \362表示操作:'Data Mark',这个字节是ftp客户机以TCP的紧急模式发送的一个字节。含义:即:其后的数据必须立即读取。
* (3) \377即IAC,是telnet中的转义字节(即:255),每一个telnet操作(如:\364、\362)都必须以IAC开始。
* 如何处理? 因为f2是通过TCP紧急模式发送的一个字节而已。你只要将字节f2(即telnet操作:\362)丢弃即可以了。以上仅供你参考.
*/
char cmd[256] = { 0 };
int errorno = 0;
buffer_.readFd(cmdfd_, &errorno);
strncpy(cmd, buffer_.peek(), buffer_.readableBytes()); /* 调试的时候你可以查看收到了什么东西 */
if (errorno == 0 && false == buffer_.getLine(cmd, sizeof(cmd))) /* 没有出错 */
buffer_.retrieveAll(); /* 丢弃掉紧急数据 */
printf("recv urg!\n");
}