一、引言
在一个比较大的系统当中,很多地方都需要定时功能,比如某些存在内存中的数据,需要定时的做持久化(实时持久化可能带来很大的性能问题),如果这个系统要求跨平台。那么实现一个跨平台的定时器就很有必要了。
c++11的标准中,实现了多线程,头文件为<thread>
,也有了系统时间相关的库,头文件为<chrono>
,这使得使用c++11实现跨平台的定时器有了可能。在这之前,多线程一般都是使用操作系统相关的库。
二、 定时器的基本功能
在定时器的实现之前,需要事先计划好一个定时器的基本功能。大概的功能如下:
1.添加一个定时事件
2.删除一个指定的定时事件
3.定时事件支持只执行一次或者重复执行
4.定时器启动指令
这个部分的内容在Linux 下定时器的实现方式分析中有很详细的介绍
三、 数据结构知识预备
在实现定时器中,需要有一个容器去存储所有的定时事件,比如说最容易想到的数组,如果我们用数组保存定时事件,那么在找出当前需要执行的定时事件的伪代码就是
for(i = 0 ; i < events_array_size; ++i){
if(events[i].time <= current_time){
exec_in_child_thread(events);
}
}
这里就是遍历所有的事件,将已经到了执行时间的事件放到子线程中去执行。
在这里有一个可以优化效率问题:注意到每一次查询是否有事件到了执行时间,都会遍历整个数组,也就是O(n)的复杂度,并且查询周期也是非常短的,所以会有非常大的性能问题。因此,我们可以将这里的数组变成有序的,每一次只需查询第一个事件,如果还没到执行时间,那么之后的事件就不用遍历了。这样平均下来就是O(1)的时间复杂度。考虑到我们在插入,删除时始终要保证数组的有序,可以采用最小堆。也就是堆顶始终是最先到执行时间的一项。
四、c++11多线程的知识预备
4.1 join()还是detach()
c++11中创建一个线程很容易
#include <iostream>
#include <thread>
void function_1(){
std::cout << "hello world" << std::endl;
while(true){}
}
int main(){
std::thread t1(function_1);
t1.join(); //加入运行
std::cout << "hahah" << std::endl;
return 0;
}
这段代码就是创建了一个t1的线程,来执行function_1函数。注意到我们这里使用了t1.join(),字面上的意思是将子线程加入的主线程来执行,这样导致的结果是主线程会在t1.join()这里停止,直到t1执行结束后再往下执行。在这里function_1中存在死循环,所以hahah永远不会被输出。而如果使用t1.detach()的话,即将子线程从主线程分离出去,那么主线程就会一直向下执行,在这里就会执行到return 0,然后主线程销毁,detach出去的子线程可能就没有任何显示的机会了(查过资料说detach出去子线程的行为的undefined的)。
在这个定时器的实现中,当然不能使用join(),因为会阻塞程序的正常运行。
4.2 如何使得定时器的使用是线程安全的。
在一个程序中,可能很多地方会使用到定时器去触发某个动作,但是为了效率,肯定不会创建多个定时器实例,因此定时器的实现应该是单例的。因为需要单例,因此定时器的创建,定时器中定时事件的添加和删除都是线程安全的(因为同一时间可能会有多个线程操作定时器)。
4.2.1 线程安全的单例模式
单例模式的实现方法有很多种,因为能力问题,不对这一点多做叙述。可以看这一篇单例模式(Singleton)及其C++实现。
我这里使用的是使用static来创建局部可见的全局变量
static Timer* getInstance(std::chrono::milliseconds tick){
static Timer timer(tick);
return &timer;
}
4.2.2 使用锁保证定时事件的添加和删除是线程安全的。
在往定时器中添加和删除定时事件时,就是在做最小堆的添加和删除。这个中间线程不安全的现象为:以添加为例,往最小堆数组末尾加入一个数据,然后再逐层向上调整。如果在这个调整的过程中,另外一个线程执行了删除操作,两个调整过程就可能同时发生,造成不可预知的错误,使得最小堆并不有序。
所以添加和删除操作应该是互斥的,只有其中一个操作执行结束,另一个操作才可以开始。c++中可以使用信号量mutex和锁locker轻松的完成互斥的需求。示例代码如下:
#include <iostream>
#include <thread>
#include <string>
#include <mutex>
#include <fstream>
class LofFile{
public:
LofFile(){
f.open("log.txt");
}
void shared_print(std::string id,int value){
std::lock(m_mutex,m_mutex2);
std::lock_guard<std::mutex> locker(m_mutex,std::adopt_lock);
std::lock_guard<std::mutex> locker2(m_mutex2,std::adopt_lock);
std::cout << "from" << id << ":" << value << std::endl;
}
void shared_print2(std::string id,int value){
std::lock(m_mutex,m_mutex2);
std::lock_guard<std::mutex> locker2(m_mutex2,std::adopt_lock); //lock_guard是为了防止语句执行中出现异常,锁不被释放。这样做可以保证在退出代码块后解锁
std::lock_guard<std::mutex> locker(m_mutex,std::adopt_lock);
std::cout << "from" << id << ":" << value << std::endl;
}
protected:
private:
std::mutex m_mutex; //使用信号量来解决资源竞争
std::mutex m_mutex2;
std::ofstream f;
};
void function_1(LofFile& log){
for(int i = 0; i > -100; i--){
log.shared_print("From t1:",i);
}
};
int main(){
LofFile log;
std::thread t1(function_1,std::ref(log));//线程开始运行
for(int i = 0; i < 100; i++){
log.shared_print2("From main",i);
}
t1.join();
return 0;
}
五、总结与实现
分析到这里,一个定时器的基本要素已经具备了。
1.用来存储定时事件,并且添加和删除操作都是线程安全的最小堆
2.可以多线程实现定时事件的执行从而不阻塞主线程。
下面贴出所有的实现代码:
5.1 最小堆的实现。
SortedHeap.hpp
/**
* 排序堆的实现
* 堆使用的是完全二叉树的结构,使用数组(数组从零开始)保存,那么最后一个非叶子节点为(n - 1) / 2
* 堆的构建一直都是在有序的基础上的,那么每次调整只需比较i和(i - 1) / 2的元素,依次上推
* 支持任意类的排序
* 当前还不支持多线程环境下的使用
* author:jiangpengfie
* date:2017-05-09
*/
#ifndef SORTEDHEAP_H
#define SORTEDHEAP_H
#include <iostream>
#include <vector>
#include <functional>
#include <memory>
#include <mutex>
#include <condition_variable>
#include "src/core/util/util.h"
template<class T>
class SortedHeap{
private:
struct HeapNode{
unsigned int id;
T obj;
HeapNode(unsigned int id,T t):obj(t){
this->id = id;
}
};
std::vector<HeapNode> heap;
unsigned int autoIncrementId;
std::function<bool(T& ,T&)> cmp; //比较函数,实现选择构造最大堆还是最小堆
std::mutex mu1;
std::mutex mu2;
/**
* 插入节点后调整堆中不符合的节点
*/
void adjustAfterInsert();
/**
* pop出堆顶元素后调整堆中不符合的节点
*/
void adjustAfterPopTop();
/**
* 删除节点后调整堆中不符合的节点
* @param i 删除的节点id
*/
void adjustAfterDelete(int id);
void swap(HeapNode& t1,HeapNode& t2);
void deleteNodeByPos(const unsigned int pos);
public:
/**
* 构造函数
* @param cmp 用来比较
*/
SortedHeap(std::function<bool(T&,T&)> cmp);
/**
* 插入节点
* @param node 插入的节点
*/
unsigned int insertNode(T& node);
/**
* 删除节点,时间复杂度为O(n)
* @param id 要删除的节点id
*/
void deleteNode(unsigned int id);
/**
* pop最小的节点
* @return T* 返回的最顶部的节点指针
*/
std::unique_ptr<T> popTopNode();
/**
* 获取最顶部的节点
* @return T 最顶部的节点指针
*/
std::unique_ptr<T> getTopNode();
/**
* 删除顶部的节点
*
*/
void deleteTopNode();
};
template<typename T>
SortedHeap<T>::SortedHeap(std::function<bool(T&,T&)> cmp){
this->cmp = cmp;
this->autoIncrementId = 0;
}
template<typename T>
void SortedHeap<T>::swap(HeapNode& t1,HeapNode& t2){
HeapNode tmp = t1;
t1 = t2;
t2 = tmp;
}
template<typename T>
void SortedHeap<T>::adjustAfterInsert(){
int last = this->heap.size() - 1;
int flag = true;
//从插入的节点位置开始向上调整
while(last > 0 && flag){
if(this->cmp(this->heap[last].obj,this->heap[(last - 1) / 2].obj)){
this->swap(this->heap[(last - 1) / 2],this->heap[last]);
}else{
//不需要调整了
flag = false;
}
last = (last - 1) / 2;
}
}
template<typename T>
void SortedHeap<T>::adjustAfterDelete(int pos){
//从pos位置开始向下调整
int last = this->heap.size() - 1;
if(last == 0)
return; //最后一个不需要调整
bool flag = true; //标记是否需要调整
while(pos <= (last - 1) / 2 && flag){
//一直调整到最后一个非叶子结点
int topNum = 0; //记录最小的结点编号
//(pos + 1) * 2 - 1是左孩子,pos是父
if(this->cmp(this->heap[(pos + 1) * 2 - 1].obj,this->heap[pos].obj)){
topNum = (pos + 1) * 2 - 1;
}else{
topNum = pos;
}
if((pos + 1) * 2 <= last){
//如果存在右结点
if(this->cmp(this->heap[(pos + 1) * 2].obj,this->heap[topNum].obj)){
topNum = (pos + 1) * 2;
}
}
//看看topNum是不是自己
if(pos == topNum){
//是自己就不用调整了
flag = false;
}else{
//交换
this->swap(this->heap[pos],this->heap[topNum]);
}
pos = topNum;
}
}
template<typename T>
void SortedHeap<T>::deleteNodeByPos(const unsigned int pos){
unsigned int last = this->heap.size() - 1;
if(pos > last){
return;
}
std::lock(mu1,mu2); //上锁
std::lock_guard<std::mutex> locker1(mu1,std::adopt_lock);
std::lock_guard<std::mutex> locker2(mu2,std::adopt_lock);
//与最后一个交换
swap(this->heap[pos],this->heap[last]);
//删除最后一个
this->heap.pop_back();
this->adjustAfterDelete(pos);
}
template<typename T>
unsigned int SortedHeap<T>::insertNode(T& node){
HeapNode hNode(this->autoIncrementId++,node);
std::lock(mu1,mu2); //上锁
std::lock_guard<std::mutex> locker1(mu1,std::adopt_lock);
std::lock_guard<std::mutex> locker2(mu2,std::adopt_lock);
this->heap.push_back(hNode); //先将node放在最后一位
if(this->heap.size() != 1){
//如果大小不等于1,则在新增节点后调整
this->adjustAfterInsert();
}
return this->autoIncrementId - 1;
}
template<typename T>
void SortedHeap<T>::deleteNode(unsigned int id){
for(unsigned int i = 0; i < this->heap.size(); i++){
if(heap[i].id == id){
//找到了id
this->deleteNodeByPos(i);
break;
}
}
}
template<typename T>
std::unique_ptr<T> SortedHeap<T>::popTopNode(){
if(this->heap.size() != 0){
std::unique_ptr<T> top(new T(this->heap[0].obj));
this->deleteNodeByPos(0);
return top;
}else{
std::unique_ptr<T> p = nullptr;
return p;
}
}
template<typename T>
std::unique_ptr<T> SortedHeap<T>::getTopNode(){
if(this->heap.size() != 0){
std::unique_ptr<T> top(new T(this->heap[0].obj));
return top;
}else{
std::unique_ptr<T> p = nullptr;
return p;
}
}
template<typename T>
void SortedHeap<T>::deleteTopNode(){
if(this->heap.size() != 0){
this->deleteNodeByPos(0);
}
}
#endif
5.2 定时器的实现
Timer.h
/**
* 定时器的实现
* 支持int setTimer(T interval,function action):设置一个定时器,指定间隔interval和回调函数action,返回定时器id
* 支持void deleteTimer(int timerId):删除一个定时器
* 数据结构:最小堆模型,按照定时器触发的时间排序
* author:jiangpengfei
* date:2017-05-09
*/
#ifndef TIMER_H
#define TIMER_H
#include <iostream>
#include <chrono>
#include <functional>
#include <thread>
#include <memory>
#include "SortedHeap.hpp"
class Timer{
private:
std::chrono::milliseconds tick;
double timeline; //当前时间线,long double的字节数为12
bool isStart; //标志当前定时器的启动状态
struct SchedulerEvent{
unsigned int id; //定时事件的唯一标示id
double interval; //事件的触发间隔,在重复事件中会用到这个属性
double deadline; //定时事件的触发时间
std::function<void()> action; //触发的事件
bool isRepeat; //是否是重复执行事件
SchedulerEvent( double interval, double timeline,std::function<void()> action,bool isRepeat){
this->interval = interval;
this->deadline = interval + timeline;
this->action = action;
this->isRepeat = isRepeat;
}
};
SortedHeap<SchedulerEvent> eventQueue;
/**
* 执行到达期限的定时器
*/
void loopForExecute();
//私有的构造函数
Timer(std::chrono::milliseconds tick):eventQueue(
[](SchedulerEvent& a,SchedulerEvent& b){
return a.deadline < b.deadline;
}
){
this->timeline = 0;
this->tick = tick;
this->isStart = false;
}
public:
//单例模式
static Timer* getInstance(std::chrono::milliseconds tick){
static Timer timer(tick);
return &timer;
}
/**
* 设置定时器
* @param interval 定时间隔
* @param action 定时执行的动作
* @param isRepeat 是否重复执行,默认不重复执行
* @return unsigned int 定时器的id,可以根据这个id执行删除操作
*/
unsigned int addEvent(double interval,std::function<void()> action,bool isRepeat = false);
/**
* 删除定时器
* @param timerId 定时器id
*
*/
void deleteEvent(unsigned int timerId);
/**
* 同步执行启动定时器
*/
void syncStart();
/**
* 异步执行启动定时器
*/
void asyncStart();
};
#endif
Timer.cpp
#include "src/core/util/Timer.h"
unsigned int Timer::addEvent(double interval,std::function<void()> action,bool isRepeat){
SchedulerEvent event(interval,this->timeline,action,isRepeat);
return this->eventQueue.insertNode(event);
}
void Timer::deleteEvent(unsigned int timerId){
this->eventQueue.deleteNode(timerId);
}
void Timer::loopForExecute(){
std::unique_ptr<SchedulerEvent> top = this->eventQueue.getTopNode();
while(top != nullptr && top->deadline <= this->timeline){
//如果已经到了执行的时间,新开一个子线程执行任务
std::thread t(top->action);
t.detach(); //子线程分离
if(top->isRepeat){
//如果是重复事件,则重新添加
this->addEvent(top->interval,top->action,top->isRepeat);
}
//从堆中删除
this->eventQueue.deleteTopNode();
top = this->eventQueue.getTopNode();
}
//执行一次后等待一个周期
std::this_thread::sleep_for(this->tick);
//周期增1
this->timeline++;
}
void Timer::asyncStart(){
if(!this->isStart){
std::thread daemon_thread(&Timer::syncStart,this);
daemon_thread.detach(); //从当前主线程分离
}
}
void Timer::syncStart(){
if(!this->isStart){
while(1)
this->loopForExecute();
}
}
测试执行的代码
#include <iostream>
#include <chrono>
#include <ctime>
#include <iomanip>
#include <string>
#include <functional>
#include <thread>
#include <memory>
#include <fstream>
#include "src/core/util/Timer.h"
void myprint(std::string msg){
std::ofstream of("timer.txt", std::ios::app);
std::thread::id this_id = std::this_thread::get_id();
auto t = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
of << "From Thread " << this_id << "at time " << std::put_time(std::localtime(&t), "%Y-%m-%d %H.%M.%S") << ":" << msg << std::endl;
}
int main(){
std::chrono::milliseconds tick(2000); //1000毫秒作为一个周期
Timer* timer = Timer::getInstance(tick);
std::function<void()> f1 = std::bind(myprint,"第一个加入,10tick后执行");
std::function<void()> f2 = std::bind(myprint,"第二个加入,被删除不执行");
std::function<void()> f3 = std::bind(myprint,"第三个加入,每5tick重复执行");
std::function<void()> f4 = std::bind(myprint,"第四个加入,5tick后执行");
std::function<void()> f5 = std::bind(myprint,"第五个加入,5tick后执行");
std::function<void()> f6 = std::bind(myprint,"第六个加入,5tick后执行");
std::function<void()> f7 = std::bind(myprint,"第七个加入,5tick后执行");
std::function<void()> f8 = std::bind(myprint,"第八个加入,5tick后执行");
std::function<void()> f9 = std::bind(myprint,"第九个加入,5tick后执行");
std::function<void()> f10 = std::bind(myprint,"第十个加入,5tick后执行");
std::function<void()> f11 = std::bind(myprint,"第十一个加入,15tick后执行");
std::function<void()> f12 = std::bind(myprint,"第十二个在执行后加入,20tick+5s后执行");
timer->addEvent(10,f1);
int id = timer->addEvent(11,f2);
timer->addEvent(5,f3,true);
timer->addEvent(5,f4);
timer->addEvent(5,f5);
timer->addEvent(5,f6);
timer->addEvent(5,f7);
timer->addEvent(5,f8);
timer->addEvent(5,f9);
timer->addEvent(5,f10);
timer->addEvent(15,f11);
timer->deleteEvent(id);
myprint("线程开始启动,每tick是2秒");
//异步执行,程序退出后计时器也会终止,因此在下面使用while循环保证程序不会退出
timer->asyncStart();
//timer->syncStart();
//休眠5秒钟
std::this_thread::sleep_for(std::chrono::seconds(5));
//应该在大概20*tick+5秒后执行,
//TODO 执行后加入的定时器不对
timer->addEvent(20,f12);
getchar();
return 0;
}