博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
epoll 边界触发模式1,2的实现
阅读量:6643 次
发布时间:2019-06-25

本文共 22477 字,大约阅读时间需要 74 分钟。

本篇贴出在上篇文章中介绍的模式1的实现代码.

首先,因为是多线程的程序,必须防止某一资源在一个线程中使用的时候,却在另一个线程中释放了.

其中最主要的便是socket_t结构,为了杜绝这个问题,对应用层来说,应该根本不知道socket_t的存在,
仅仅提供一个HANDLE给应用层就足够了.

#ifndef _KENDYNET_H#define _KENDYNET_H#include "MsgQueue.h"typedef struct list_node{    struct list_node *next;}list_node;#define LIST_NODE list_node node;/*IO请求和完成队列使用的结构*/typedef struct{    LIST_NODE;    struct iovec *iovec;    int    iovec_count;    int    bytes_transfer;    int    error_code;}st_io;enum{        //完成模式,当套接口从不可读/写变为激活态时, 如果有IO请求,则完成请求,并将完成通告发送给工作线程    MODE_COMPLETE = 0,    //poll模式,当套接口从不可读/写变为激活态时, 如果有IO请求,将请求发送给工作线程,由工作线程完成请求    MODE_POLL,};//初始化网络系统int      InitNetSystem();typedef int HANDLE;struct block_queue;struct block_queue* CreateEventQ();void DestroyEventQ(struct block_queue**);HANDLE   CreateEngine(char);void     CloseEngine(HANDLE);int      EngineRun(HANDLE);int     Bind2Engine(HANDLE,HANDLE);//获取和投递事件到engine的队列中//int      GetQueueEvent(HANDLE,struct block_queue*,st_io **,int timeout);int      GetQueueEvent(HANDLE,MsgQueue_t,st_io **,int timeout);int      PutQueueEvent(HANDLE,st_io *);/**   发送和接收,如果操作立即完成返回完成的字节数*   否则返回0,当IO异步完成后会通告到事件队列中.*   返回-1表示套接口出错或断开*   notify:当IO立即可完成时,是否向事件队*   列提交一个完成通告,设置为1会提交,0为不提交.*/int WSASend(HANDLE,st_io*,int notify);int WSARecv(HANDLE,st_io*,int notify);#endif

 

这里定义的st_io结构,相当与IOCP里的OVERLAPPED结构,其中,第一个成员是list_node,这样

可以很方便的将st_io放在队列中.

然后是GetQueueEvent函数,这个函数的作用相当于IOCP里的GetCompleteStatus,用于从队列

中获取事件,不同的是,这个队列是由用户提供的,也就是参数中的block_queue*,如果调用
GetQueueEvent的时候,engine的buffering_event_queue中没有事件,就会将block_queue
放到一个等待队列中(block_thread_queue),当其它地方调用PutQueueEvent的时候,首先
会看下是否有线程在等待,有就把它的block_queue弹出push一个事件进去,然后唤醒阻塞
的线程.

#include "KendyNet.h"#include "Engine.h"#include "Socket.h"#include "link_list.h"#include "HandleMgr.h"#include 
#include "MsgQueue.h"int InitNetSystem(){ return InitHandleMgr();}struct block_queue* CreateEventQ(){ return BLOCK_QUEUE_CREATE();}void DestroyEventQ(struct block_queue** eq){ BLOCK_QUEUE_DESTROY(eq);}int EngineRun(HANDLE engine){ engine_t e = GetEngineByHandle(engine); if(!e) return -1; e->Loop(e); return 0;}HANDLE CreateEngine(char mode){ HANDLE engine = NewEngine(); if(engine >= 0) { engine_t e = GetEngineByHandle(engine); e->mode = mode; LIST_CLEAR(e->buffering_event_queue); LIST_CLEAR(e->block_thread_queue); if(0 != e->Init(e)) { CloseEngine(engine); engine = -1; } } return engine;}void CloseEngine(HANDLE handle){ ReleaseEngine(handle);}int Bind2Engine(HANDLE e,HANDLE s){ engine_t engine = GetEngineByHandle(e); socket_t sock = GetSocketByHandle(s); if(!engine || ! sock) return -1; if(engine->Register(engine,sock) == 0) { sock->engine = engine; return 0; } return -1;}//int GetQueueEvent(HANDLE handle, struct block_queue *EventQ,st_io **io ,int timeout)int GetQueueEvent(HANDLE handle, MsgQueue_t EventQ,st_io **io ,int timeout){ assert(EventQ);assert(io);/* engine_t e = GetEngineByHandle(handle); if(!e) return -1; spin_lock(e->mtx,0);//mutex_lock(e->mtx); if(e->status == 0) { spin_unlock(e->mtx);//mutex_unlock(e->mtx); return -1; } if(*io = LIST_POP(st_io*,e->buffering_event_queue)) { spin_unlock(e->mtx);//mutex_unlock(e->mtx); return 0; } //插入到等待队列中,阻塞 LIST_PUSH_FRONT(e->block_thread_queue,EventQ); spin_unlock(e->mtx);//mutex_unlock(e->mtx); if(POP_FORCE_WAKE_UP == BLOCK_QUEUE_POP(EventQ,io,timeout)) return -1;//因为engine的释放而被强制唤醒 */ engine_t e = GetEngineByHandle(handle); if(!e) return -1; mutex_lock(e->mtx); if(e->status == 0) { mutex_unlock(e->mtx); return -1; } if(*io = LIST_POP(st_io*,e->buffering_event_queue)) { mutex_unlock(e->mtx); return 0; } //插入到等待队列中,阻塞 LIST_PUSH_FRONT(e->block_thread_queue,EventQ); mutex_unlock(e->mtx); GetMsg(EventQ,io,sizeof(*io),0); if(*io == 0) return -1; return 0; }int PutQueueEvent(HANDLE handle,st_io *io){ assert(io); engine_t e = GetEngineByHandle(handle); if(!e) return -1; return put_event(e,io);}extern int put_event(engine_t e,st_io *io);int WSASend(HANDLE sock,st_io *io,int notify){ assert(io); socket_t s = GetSocketByHandle(sock); if(!s) return -1; int active_send_count = -1; int ret = 0; mutex_lock(s->send_mtx); //为保证执行顺序与请求的顺序一致,先将请求插入队列尾部,再弹出队列首元素 LIST_PUSH_BACK(s->pending_send,io); io = 0; if(s->writeable) { io = LIST_POP(st_io*,s->pending_send); active_send_count = s->active_write_count; } mutex_unlock(s->send_mtx); if(s->engine->mode == MODE_POLL) { if(io) { //当前处于激活态 if(notify) //不直接处理,将处理交给完成线程 put_event(s->engine,io); else ret = _send(s,io,active_send_count,notify); } } else { if(io) ret = _send(s,io,active_send_count,notify); } return ret;}int WSARecv(HANDLE sock,st_io *io,int notify){ assert(io); socket_t s = GetSocketByHandle(sock); if(!s) return -1; int active_recv_count = -1; int ret = 0; mutex_lock(s->recv_mtx); //为保证执行顺序与请求的顺序一致,先将请求插入队列尾部,再弹出队列首元素 LIST_PUSH_BACK(s->pending_recv,io); io = 0; if(s->readable) { io = LIST_POP(st_io*,s->pending_recv); active_recv_count = s->active_read_count; } mutex_unlock(s->recv_mtx); if(s->engine->mode == MODE_POLL) { if(io) { //当前处于激活态 if(notify) //不直接处理,将处理交给完成线程 put_event(s->engine,io); else ret = _recv(s,io,active_recv_count,notify); } } else { if(io) ret = _recv(s,io,active_recv_count,notify); } return ret;}

 

然后是engine接口:

#ifndef _ENGINE_H#define _ENGINE_H#include "sync.h"//#include "thread.h"#include "link_list.h"struct socket_wrapper;typedef struct engine{    int  (*Init)(struct engine*);    void (*Loop)(struct engine*);    int  (*Register)(struct engine*,struct socket_wrapper*);    int  (*UnRegister)(struct engine*,struct socket_wrapper*);        mutex_t mtx;    volatile int status; /*0:关闭状态,1:开启状态*/    int poller_fd;        //完成事件相关成员    struct link_list   *buffering_event_queue;    //当没有线程等待时,事件将被缓存到这个队列中    struct link_list   *block_thread_queue;    //正在等待完成消息的线程        //thread_t     engine_thread;         //运行engine的线程    }*engine_t;engine_t create_engine();void   free_engine(engine_t *);void   stop_engine(engine_t);#endif

 

engine接口提供了4个函数指针,可根据不同的系统实现相应的函数,在这里使用的是linux的epoll.

#include "Engine.h"#include "link_list.h"#include "KendyNet.h"#include 
#include "epoll.h"#include
engine_t create_engine(){ engine_t e = malloc(sizeof(*e)); if(e) { e->mtx = mutex_create(); e->status = 0; e->buffering_event_queue = LIST_CREATE(); e->block_thread_queue = LIST_CREATE(); e->Init = epoll_init; e->Loop = epoll_loop; e->Register = epoll_register; e->UnRegister = epoll_unregister; //e->engine_thread = 0; } return e;}void free_engine(engine_t *e){ assert(e); assert(*e); mutex_destroy(&(*e)->mtx); LIST_DESTROY(&(*e)->buffering_event_queue); LIST_DESTROY(&(*e)->block_thread_queue); //if((*e)->engine_thread) // destroy_thread(&(*e)->engine_thread); free(*e); *e = 0;}int put_event(engine_t e,st_io *io){ mutex_lock(e->mtx); struct block_queue *EventQ = LIST_POP(struct block_queue*,e->block_thread_queue); if(!EventQ) { //没有等待的线程,先缓冲事件 LIST_PUSH_BACK(e->buffering_event_queue,io); io = 0; } mutex_unlock(e->mtx); if(io) BLOCK_QUEUE_PUSH(EventQ,io); return 0;}void stop_engine(engine_t e){ mutex_lock(e->mtx); e->status = 0; //join(e->engine_thread); //强制唤醒所有等待在完成队列上的线程 struct block_queue *queue = 0; /*唤醒所有等待在完成队列的线程*/ while(queue = LIST_POP(struct block_queue *,e->block_thread_queue)) { BLOCK_QUEUE_FORCE_WAKEUP(queue); } mutex_unlock(e->mtx);}

 

epoll的实现:

#include "epoll.h"#include "Socket.h"#include "SocketWrapper.h"#include "HandleMgr.h"int  epoll_init(engine_t e);{    e->poller_fd = TEMP_FAILURE_RETRY(epoll_create(MAX_SOCKET));    return     e->poller_fd > 0 ? 0:-1; }int epoll_register(engine_t e, socket_t s){    int ret = -1;        struct epoll_event ev;        ev.data.ptr = s;    ev.events = EV_IN | EV_OUT | EV_ET;    TEMP_FAILURE_RETRY(ret = epoll_ctl(e->poller_fd,EPOLL_CTL_ADD,s->fd,&ev));    return ret;}inline int epoll_unregister(engine_t e,socket_t s){    struct epoll_event ev;int ret;    TEMP_FAILURE_RETRY(ret = epoll_ctl(s->poller_fd,EPOLL_CTL_DEL,s->fd,&ev));    return ret;}void epoll_loop(engine_t n){    struct epoll_event events[MAX_SOCKET];    memset(events,0,sizeof(events));    while(1)    {        int nfds = TEMP_FAILURE_RETRY(epoll_wait(n->poller_fd,events,MAX_SOCKET,-1));        if(nfds < 0)        {            break;        }        int i;        for(i = 0 ; i < nfds ; ++i)        {                socket_t sock = (socket_t)events[i].data.ptr;            if(sock)            {                //套接口可读                if(events[i].events & EPOLLIN)                {                     on_read_active(sock);                }                //套接口可写                if(events[i].events & EPOLLOUT)                {                    on_write_active(sock);                }                                    if(events[i].events & EPOLLERR)                {                    //套接口异常                }            }        }    }}

最后是socket_t

#ifndef _SOCKETWRAPPER_H#define _SOCKETWRAPPER_H#include "Engine.h"typedef struct socket_wrapper{    mutex_t  mtx;//保证ReleaseSocketWrapper只会被正常执行一次    volatile int status;//0:未开启;1:正常;    engine_t  *engine;            volatile int readable;    volatile int writeable;    volatile int active_read_count;    volatile int active_write_count;    int fd;        //当发送/接收无法立即完成时,把请求投入下面两个队列中,    //当套接口可读/可写时再完成请求    struct link_list *pending_send;//尚未处理的发请求    struct link_list *pending_recv;//尚未处理的读请求        mutex_t   recv_mtx;    mutex_t   send_mtx;        }*socket_t;void on_read_active(socket_t);void on_write_active(socket_t);socket_t create_socket();void free_socket(socket_t*);int _recv(socket_t,st_io*,int,int notify);int _send(socket_t,st_io*,int,int notify);#endif

socket_t.c

#include "Socket.h"#include 
#include
#include
#include "SocketWrapper.h"#include "KendyNet.h"socket_t create_socket(){ socket_t s = malloc(sizeof(*s)); if(s) { s->mtx = mutex_create(); s->recv_mtx = mutex_create(); s->send_mtx = mutex_create(); s->pending_send = LIST_CREATE(); s->pending_recv = LIST_CREATE(); s->status = 0; s->engine = 0; } return s;}void free_socket(socket_t *s){ assert(s);assert(*s); mutex_destroy(&(*s)->mtx); mutex_destroy(&(*s)->recv_mtx); mutex_destroy(&(*s)->send_mtx); destroy_list(&(*s)->pending_send); destroy_list(&(*s)->pending_recv); free(*s); *s = 0;}void on_read_active(socket_t s){ assert(s); mutex_lock(s->recv_mtx); s->readable = 1; int active_recv_count = ++s->active_read_count; st_io *req = LIST_POP(st_io*,s->pending_recv); mutex_unlock(s->recv_mtx); if(req) { if(s->engine->mode == MODE_COMPLETE) _recv(s,req,active_recv_count,1); else put_event(s->engine,req); }}void on_write_active(socket_t s){ assert(s); mutex_lock(s->send_mtx); s->writeable = 1; int active_send_count = ++s->active_write_count; st_io *req = LIST_POP(st_io*,s->pending_send); mutex_unlock(s->send_mtx); if(req) { if(s->engine->mode == MODE_COMPLETE) _send(s,req,active_send_count,1); else put_event(s->engine,req); } }int _recv(socket_t s,st_io* io_req,int active_recv_count,int notify){ assert(s);assert(io_req); while(1) { int retry = 0; int bytes_transfer = io_req->bytes_transfer = TEMP_FAILURE_RETRY(readv(s->fd,io_req->iovec,io_req->iovec_count)); io_req->error_code = 0; if(bytes_transfer < 0) { switch(errno) { case EAGAIN: { mutex_lock(s->recv_mtx); if(active_recv_count != s->active_read_count) { active_recv_count = s->active_read_count; retry = 1; } else { s->readable = 0; LIST_PUSH_FRONT(s->pending_recv,io_req); } mutex_unlock(s->recv_mtx); if(retry) continue; return 0; } break; default://连接出错 { io_req->error_code = errno; } break; } } else if(bytes_transfer == 0) bytes_transfer = -1; if(notify) put_event(s->engine,io_req); return bytes_transfer; } }int _send(socket_t s,st_io* io_req,int active_send_count,int notify){ assert(s);assert(io_req); while(1) { int retry = 0; int bytes_transfer = io_req->bytes_transfer = TEMP_FAILURE_RETRY(writev(s->fd,io_req->iovec,io_req->iovec_count)); io_req->error_code = 0; if(bytes_transfer < 0) { switch(errno) { case EAGAIN: { mutex_lock(s->send_mtx); if(active_send_count != s->active_write_count) { active_send_count = s->active_write_count; retry = 1; } else { s->writeable = 0; //将请求重新放回到队列 LIST_PUSH_FRONT(s->pending_send,io_req); } mutex_unlock(s->send_mtx); if(retry) continue; return 0; } break; default://连接出错 { io_req->error_code = errno; } break; } } else if(bytes_transfer == 0) bytes_transfer = -1; if(notify) put_event(s->engine,io_req); return bytes_transfer; }}

 

用户提供的完成例程代码大概如下:

 

st_io *io = 0;block_queue *EventQ;while(GetQueueEvent(engine,EventQ,&io ,0) == 0){    if(io)    {        HANDLE sock = GetHandle(io);        OnRecv(io);        int ret;        //发起新的读,直到套接字变为不可读        while((ret = WSARecv(sock,io,0)) > 0)        {            OnRecv(io);        }                if(ret < 0)        {            //连接断开            CloseSocket(sock);        }    }}

对于快速的发送方,完成例程接收完并处理之后,套接口上可能马上又有数据可读了,这样在epoll线程中执行recv的机会比较小。

但如果发送方全都是慢速的,例如每一秒发送一次,则会导致几乎所有的recv操作都在epoll线程中执行,当连接数巨大时没法发挥

多核处理器的优势,所以就有了模型2,epoll线程完全不处理recv,当套接字变为激活时,将请求投递到队列中,交给complete例程

去执行recv.

(2012.4.16修改,增加对模型2的支持,CreateEngine时可选择传入参数MODE_COMPLETE或MODE_POLL,

当传入MODE_POLL时,GetQueueEvent返回的事件表明套接口可读/写,且有读/写请求,传入MODE_COMPLETE

GetQueueEvent返回的事件表明一个读/写请求已经完成)

测试程序如下,注意IORoutinePoll和IORoutine的区别:

test.c

#include 
#include
#include "KendyNet.h"#include "thread.h"#include "SocketWrapper.h"#include "atomic.h"enum{ RECV_FINISH = 0, SEND_FINISH,};typedef struct IoContext{ st_io m_ioStruct; unsigned char m_opType; void *ud;}IoContext;typedef struct _Socket{ char send_buf[64]; char recv_buf[64]; struct iovec recv_iovec; struct iovec send_iovec; IoContext m_IORecvComp; IoContext m_IOSendComp; HANDLE m_sock;}_Socket;HANDLE engine;const char *ip;long port;void *ListerRoutine(void *arg){ thread_t thread = (thread_t)CUSTOM_ARG(arg); struct sockaddr_in servaddr; HANDLE listerfd; if((listerfd = Tcp_Listen(ip,port,&servaddr,5)) == 0) { while(!is_terminate(thread)) { struct sockaddr sa; socklen_t salen; HANDLE sock = Accept(listerfd,&sa,&salen); if(sock >= 0) { setNonblock(sock); _Socket *_sock = malloc(sizeof(*_sock)); _sock->m_sock = sock; _sock->m_IORecvComp.m_opType = RECV_FINISH; _sock->m_IOSendComp.m_opType = SEND_FINISH; _sock->m_IORecvComp.ud = _sock->m_IOSendComp.ud = _sock; _sock->recv_iovec.iov_base = _sock->recv_buf; _sock->send_iovec.iov_base = _sock->send_buf; _sock->m_IORecvComp.m_ioStruct.iovec = &_sock->recv_iovec; _sock->m_IOSendComp.m_ioStruct.iovec = &_sock->send_iovec; _sock->m_IOSendComp.m_ioStruct.iovec_count = _sock->m_IORecvComp.m_ioStruct.iovec_count = 1; //printf("接到一个连接\n"); if(0 != Bind2Engine(engine,sock)) { printf("bind出错\n"); CloseSocket(sock); free(_sock); } else { //发起第一个读请求 _sock->m_IORecvComp.m_ioStruct.iovec->iov_len = 64; WSARecv(sock,(st_io*)&(_sock->m_IORecvComp),1); } } else { printf("accept出错\n"); } } printf("listener 终止\n"); } return 0;}//COMPLETE MODEvoid *IORoutine(void *arg){ st_io* ioComp; //struct block_queue *EventQ = CreateEventQ(); MsgQueue_t EventQ = CreateMsgQ(); while(1) { ioComp = 0; if(0 > GetQueueEvent(engine,EventQ,&ioComp,-1)) { printf("poller终止\n"); break; } //printf("唤醒咯\n"); if(ioComp) { if(ioComp->bytes_transfer <= 0) { //printf("套接口断开\n"); _Socket *sock = (_Socket*)(((IoContext*)ioComp)->ud); //连接关闭 CloseSocket(sock->m_sock); free(sock); } else { IoContext *context = (IoContext*)ioComp; _Socket *sock = (_Socket*)context->ud; if(context->m_opType == RECV_FINISH) { int byteTransfer = 0; //把数据回发给客户端 memcpy(sock->send_buf,sock->recv_buf,ioComp->bytes_transfer); sock->m_IOSendComp.m_ioStruct.iovec->iov_len = ioComp->bytes_transfer; WSASend(sock->m_sock,(st_io*)&(sock->m_IOSendComp),0); //继续读 while((byteTransfer = WSARecv(sock->m_sock,(st_io*)context,0)) > 0) { memcpy(sock->send_buf,sock->recv_buf,byteTransfer); sock->m_IOSendComp.m_ioStruct.iovec->iov_len = byteTransfer; byteTransfer = WSASend(sock->m_sock,(st_io*)&(sock->m_IOSendComp),0); if(byteTransfer == 0) printf("can't write\n"); if(byteTransfer < 0) { printf("close\n"); break; } } if(byteTransfer < 0) { //printf("套接口断开\n"); CloseSocket(sock->m_sock); free(sock); } } } } } printf("IO end\n"); //DestroyEventQ(&EventQ); DestroyMsgQ(&EventQ); return 0;}//POLL MODEvoid *IORoutinePoll(void *arg){ st_io* ioComp; MsgQueue_t EventQ = CreateMsgQ(); while(1) { ioComp = 0; if(0 > GetQueueEvent(engine,EventQ,&ioComp,-1)) { printf("poller终止\n"); break; } if(ioComp) { IoContext *context = (IoContext*)ioComp; _Socket *sock = (_Socket*)context->ud; if(context->m_opType == RECV_FINISH) { int byteTransfer = 0; while((byteTransfer = WSARecv(sock->m_sock,(st_io*)context,0)) > 0) { memcpy(sock->send_buf,sock->recv_buf,byteTransfer); sock->m_IOSendComp.m_ioStruct.iovec->iov_len = byteTransfer; byteTransfer = WSASend(sock->m_sock,(st_io*)&(sock->m_IOSendComp),0); if(byteTransfer == 0) printf("can't write1\n"); if(byteTransfer < 0) { printf("close\n"); break; } } if(byteTransfer < 0) { //printf("connection close\n"); CloseSocket(sock->m_sock); free(sock); } } else WSASend(sock->m_sock,(st_io*)&(sock->m_IOSendComp),0); } } printf("IO end\n"); DestroyMsgQ(&EventQ); return 0;}void *EngineRoutine(void *arg){ EngineRun(engine); printf("Engine stop\n"); return 0;}int main(int argc,char **argv){ ip = "127.0.0.1";//argv[]; port = atoi(argv[1]); signal(SIGPIPE,SIG_IGN); if(InitNetSystem() != 0) { printf("Init error\n"); return 0; } //engine = CreateEngine(MODE_POLL); engine = CreateEngine(MODE_COMPLETE); thread_t listener = create_thread(0); start_run(listener,ListerRoutine,listener); int complete_count = atoi(argv[2]); int i = 0; for( ; i < complete_count; ++i) { thread_t complete_t = create_thread(0); //start_run(complete_t,IORoutinePoll,0); start_run(complete_t,IORoutine,0); } thread_t engine_thread = create_thread(1); start_run(engine_thread,EngineRoutine,0); getchar(); CloseEngine(engine); join(engine_thread); return 0;}

 

 

转载于:https://www.cnblogs.com/sniperHW/archive/2012/04/09/2438850.html

你可能感兴趣的文章
Mandiant对APT1组织的***行动的情报分析报告
查看>>
规则与法则:中国式创业的界限与群像
查看>>
天籁数学——数列篇(2)
查看>>
一个可以更新时区的Calendar
查看>>
并行开发 —— 第二篇 Task的使用
查看>>
"百年一遇"奇怪问题的进展:找到原因,ajax请求中断引起
查看>>
关于几道面试的题目
查看>>
SQL Server发送邮件的存储过程
查看>>
【java】eclipse从数据库逆向生成Hibernate实体类
查看>>
make:commands commence before first target
查看>>
一个很强大很好用的报表统计插件
查看>>
A+B for Input-Output Practice (II)
查看>>
精美高清壁纸:2013年1月桌面日历壁纸免费下载
查看>>
Extjs Dom
查看>>
new与delete
查看>>
Unitils+hibernate+Spring+PostgreSql做dao层测试遇到的错误
查看>>
关于MVC使用Code-First代码优先来先建实体类中间添加新字段不需要重新建立数据库的方法...
查看>>
【SAS NOTES】字符串处理函数
查看>>
constellio——基于solr的开源搜索引擎系统源码研究(四)
查看>>
PS制作流星效果
查看>>