tornado 之 ioloop模块解读

2014-08-31 16:20:00 by dragondjf

1.ioloop是什么?

ioloop是tornado事件驱动机制的核心模块,在linux下使用高效的epoll异步I/O模型, 在FreeBSD(mac)下使用高效的kqueue,在windows使用普通的select模型,正是基于epoll/kqueue/select, tornado在网络异步编程上表现优秀,下面就是对ioloop的个人理解。

2.ioloop中使用的模块依赖一览表

  • 标准库:
模块 作用 备注
errno standard errno system symbols 标准的系统错误符号库
functools Higher-order functions and operations on callable objects 操作改变可调用的对象
heapq 最小堆算法 有序列表从小到大
itertools Functions creating iterators for efficient looping 生产迭代器的高效循环库
select 异步I/O模型 提供异步机制的系统调用封装
threading 高级接口的线程模块 用于获取线程的基本信息
signal Set handlers for asynchronous events 为异步事件提供handlers
  • 内部模块:
模块 作用 备注
concurrent Utilities for working with threads and Futures
stack_context 上下文库 StackContext allows applications to maintain threadlocal-like state that follows execution as it moves to other execution contexts
util 工具库,提供常用的工具函数
3 事件支持

tornado.ioloop.IOLoop同时提供了4种响应事件:

事件 描述
tornado.ioloop.IOLoop.NONE 无事件
tornado.ioloop.IOLoop.READ 读事件
tornado.ioloop.IOLoop.WRITE 写事件
tornado.ioloop.IOLoop.ERROR 发生错误的事件

4. 异步事件编程机制:

tornado.ioloop.IOLoop 提供了三个接口可以用于异步事件编程:

  • add_handler

    def add_handler(self, fd, handler, events):
        self._handlers[fd] = stack_context.wrap(handler)
        self._impl.register(fd, events | self.ERROR)
    

    add_handler用于添加socket到主循环中, 接受三个参数: fd 是socket的文件描述符 handler 是处理此socket的 callback函数 * events 是此socket注册的事件

  • update_handler

    def update_handler(self, fd, events):
        self._impl.modify(fd, events | self.ERROR)
    

    update_handler用于更新住循环中已存在的socket响应事件, 接受两个参数: fd 是socket对应的文件描述符 events 是注册的新事件

  • remove_handler

    def remove_handler(self, fd):
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except Exception:
            gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
    

    remove_handler用于移除主循环中已存在的socket

5.Demo echo server

根据上面的接口和事件我们就可以写出一个简单的 echo server

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#!/usr/bin/env python
# -*- coding:utf-8 -*-
#
#   Author  :   cold
#   E-mail  :   wh_linux@126.com
#   Date    :   13/04/15 15:08:51
#   Desc    :   Tornado Echo Server
#   HOME    :   http://www.linuxzen.com
#
import Queue
import socket

from functools import partial

from tornado.ioloop import IOLoop

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)              # 将socket设置为非阻塞

server_address = ("localhost", 10000)

sock.bind(server_address)
sock.listen(5)

fd_map = {}              # 文件描述符到socket的映射
message_queue_map = {}   # socket到消息队列的映射

fd = sock.fileno()
fd_map[fd] = sock

ioloop = IOLoop.instance()

def handle_client(cli_addr, fd, event):
    s = fd_map[fd]
    if event & IOLoop.READ:
        data = s.recv(1024)
        if data:
            print "     received '%s' from %s" % (data, cli_addr)
            # 接收到消息更改事件为写, 用于发送数据到对端
            ioloop.update_handler(fd, IOLoop.WRITE)
            message_queue_map[s].put(data)
        else:
            print "     closing %s" % cli_addr
            ioloop.remove_handler(fd)
            s.close()
            del message_queue_map[s]

    if event & IOLoop.WRITE:
        try:
            next_msg = message_queue_map[s].get_nowait()
        except Queue.Empty:
            print "%s queue empty" % cli_addr
            ioloop.update_handler(fd, IOLoop.READ)
        else:
            print 'sending "%s" to %s' % (next_msg, cli_addr)
            s.send(next_msg)

    if event & IOLoop.ERROR:
        print " exception on %s" % cli_addr
        ioloop.remove_handler(fd)
        s.close()
        del message_queue_map[s]


def handle_server(fd, event):
    s = fd_map[fd]
    if event & IOLoop.READ:
        conn, cli_addr = s.accept()
        print "     connection %s" % cli_addr[0]
        conn.setblocking(0)
        conn_fd = conn.fileno()
        fd_map[conn_fd] = conn
        handle = partial(handle_client, cli_addr[0])   # 将cli_addr作为第一个参数
        # 将连接和handle注册为读事件加入到 tornado ioloop
        ioloop.add_handler(conn_fd, handle, IOLoop.READ)
        message_queue_map[conn] = Queue.Queue()   # 创建对应的消息队列


ioloop.add_handler(fd, handle_server, IOLoop.READ)

ioloop.start()

上面代码就建立了一个非阻塞的高效的异步的echo server


Fork me on GitHub