Linux 系统 Crontab 的安装及定时任务的命令详解

前言

很多时候我们的VPS运行脚本或者重启某些软件的时候都需要用到定时功能,这时候就要用到Crontab定时软件了,这里简单的说下安装和使用方法。

安装

一般我们使用的Centos或者Debian都自带Crontab定时软件,如果没有,那我们就需要安装了。

1、centos 安装 crontab

yum install vixie-cron crontabs   #安装Crontab
chkconfig crond on   #设置开机启动Crontab
service crond start   #启动Crontab

2、debian 安装 crontab

apt-get install cron   #安装Crontab
/etc/init.d/cron restart   #重启Crontab

使用方法

1、基本命令

crontab -e   #编辑定时任务
crontab -r   #移除定时任务
crontab -l   #查看定时任务

2、使用命令

添加的命令必须以如下格式:

* * * * * /root/rats.sh   #前5个*指时间,后面为命令

前5个*分别表示:

分钟:0-59
小时:1-23
日期:1-31
月份:1-12
星期:0-6(0表示周日)

还可以用一些特殊符号:

*: 表示任何时刻
,: 表示分割
-:表示一个段,如:1-5,就表示1到5点
/n : 表示每个n的单位执行一次,如:*/1, 就表示每隔1个小时执行一次命令。也可以写成1-23/1

3、定时任务设置举例

43 21 * * *   #21:43 执行
15 05 * * *   #05:15 执行
0 17 * * *    #17:00 执行
0 17 * * 1    #每周一的17:00 执行
0,10 17 * * 0,2,3    #每周日,周二,周三的17:00和17:10 执行
0-10 17 1 * *    #毎月1日从17:00到7:10毎隔1分钟 执行
0 0 1,15 * 1    #毎月1日和15日和一日的0:00 执行
42 4 1 * *   #毎月1日的4:42分 执行
0 21 * * 1-6   #周一到周六21:00 执行
0,10,20,30,40,50 * * * *   #每隔10分 执行
*/10 * * * *   #每隔10分 执行
* 1 * * *   #从1:0到1:59每隔1分钟 执行
0 1 * * *   #1:00 执行
0 */1 * * *   #毎时0分每隔1小时 执行
0 * * * *   #毎时0分 执行
2 8-20/3 * * *   #8:02,11:02,14:02,17:02,20:02 执行
30 5 1,15 * *   #1日和15日的5:30 执行

crontab书写格式

1、crontab命令

功能说明:设置计时器。
语法:crontab [-u <用户名称>][配置文件]crontab [-u <用户名称>][-elr]
补充说明:cron是一个常驻服务,它提供计时器的功能,让用户在特定的时间得以执行预设的指令或程序。只要用户会编辑计时器的配置文件,就可以使用计时器的功能。
其配置文件格式如下:
Minute Hour Day Month DayOFWeek Command

参 数:
crontab -e  编辑该用户的计时器设置。
crontab -l  列出该用户的计时器设置。
crontab -r  删除该用户的计时器设置。
crontab -u<用户名称>  指定要设定计时器的用户名称。

2、crontab 格式

基本格式 :
第1列 表示分钟1~59 每分钟用或者 /1表示
第2列 表示小时1~23(0表示0点)
第3列 表示日期1~31
第4列 表示月份1~12
第5列 标识号星期0~6(0表示星期天)
第6列 要执行的命令

3、crontab文件的一些例子

    30 21 * * * /usr/local/etc/rc.d/lighttpd restart
    // 上面的例子表示每晚的21:30重启apache。

    45 4 1,10,22 * * /usr/local/etc/rc.d/lighttpd restart
    // 上面的例子表示每月1、10、22日的4 : 45重启apache。

    10 1 * * 6,0 /usr/local/etc/rc.d/lighttpd restart
    //上面的例子表示每周六、周日的1 : 10重启apache。

    0,30 18-23 * * * /usr/local/etc/rc.d/lighttpd restart
    // 上面的例子表示在每天18 : 00至23 : 00之间每隔30分钟重启apache。

    0 23 * * 6 /usr/local/etc/rc.d/lighttpd restart
    // 上面的例子表示每星期六的11 : 00 pm重启apache。

    * */1 * * * /usr/local/etc/rc.d/lighttpd restart
    // 每一小时重启apache

    * 23-7/1 * * * /usr/local/etc/rc.d/lighttpd restart
    // 晚上11点到早上7点之间,每隔一小时重启apache

    0 11 4 * mon-wed /usr/local/etc/rc.d/lighttpd restart
    // 每月的4号与每周一到周三的11点重启apache

    0 4 1 jan * /usr/local/etc/rc.d/lighttpd restart
    // 一月一号的4点重启apache

    */30 * * * * /usr/sbin/ntpdate 210.72.145.44
    // 每半小时同步一下时间

CENTOS 使用ANSIBLE 将PYTHON 包安装到VIRTUALENV环境中

使用root用户,则直接安装

pip: name=pkgname virtualenv=虚拟环境目录

如果以!root用户安装,ansible无法获取virtualenv可执行文件,需要手动将执行路径添加到PATH环境变量,在用户家目录的.local/bin目录下

environment:
      PATH: "{{ansible_env.PATH}}/{{ansible_user_dir}}/.local/bin"

完整实例:

tasks:
  - name: install pip packages
    pip: name={{item}} virtualenv=envdir
    with_items:
      - requests
      - flask
    environment:
      PATH: "{{ansible_env.PATH}}/{{ansible_user_dir}}/.local/bin"

mac上配置使用virtualenv

1.安装virtualenv

$ sudo pip install virtualenv

2.安装virtualenvwrapper

$ sudo easy_install virtualenvwrapper 

3.新建一个放置虚拟环境的目录

$ mkdir ~/workspaces
$ cd ~/workspaces

4.设置环境变量

$ export WORKON_HOME=~/workspaces
$ source /usr/local/bin/virtualenvwrapper.sh

5.创建1-n个虚拟环境

$ mkvirtualenv env1
$ mkvirtualenv env2

成功后,路径前面会有(env2)

列出所有虚拟环境:

$ workon

切换到某个虚拟环境:

$ workon env1

退出虚拟环境:

$ deactivate

删除虚拟环境:

$ rmvirtualenv env2

切换到某个虚拟环境上之后,再使用pip在当前环境下进行安装

$ workon env1
$ pip install numpy

注意,每次进入虚拟环境前,都要先配置环境变量:

$ cd ~/workspaces
$ export WORKON_HOME=~/workspaces
$ source /usr/local/bin/virtualenvwrapper.sh
$ workon env1

启动虚拟环境后,pycharm配置项目的interpreter,找到对应虚拟环境下的python文件,比如:

~/workspaces/env1/bin/python2.7

python tornado TCPserver异步协程实例

内容预览:

  • 它的值包括四种:AF_UNIX,AF_INET,AF_INET6和AF_UNSPEC~
  • # 相反,我们使用listen backlog作为我们可以合理接受的连接数的~
  • if errno_from_exception(e) == errno.ECONNABORTED: continue raise ca…~

项目所用知识点

  • tornado
  • socket
  • tcpserver
  • 协程
  • 异步

tornado tcpserver源码抛析

在tornado的tcpserver文件中,实现了TCPServer这个类,他是一个单线程的,非阻塞的tcp 服务。

为了与上层协议(在tornado中就是HTTPServer)交互,TCPServer提供了一个接口:handle_stream, 要求其子类必需实现该方法,该方法就是主要用来处理应用层逻辑的。

我们可以通过下面代码倒入模块查看源码

from tornado.tcpserver import TCPServer

源码中给了好多解释,先把源码注释贴进来

class TCPServer(object):

    ‘’‘

    1.要想用TCPserver,我给你提供你一个接口handle_stream,子类中必须要有这个方法

    2.他已经给我们举出了例子

    3.往下他给咱们介绍了几种启动方法,而我用的listen()方法启动看起来简单明了

    ’‘’

    r"""A non-blocking, single-threaded TCP server.



    To use `TCPServer`, define a subclass which overrides the `handle_stream`

    method. For example, a simple echo server could be defined like this::



      from tornado.tcpserver import TCPServer

      from tornado.iostream import StreamClosedError

      from tornado import gen



      class EchoServer(TCPServer):

          @gen.coroutine

          def handle_stream(self, stream, address):

              while True:

                  try:

                      data = yield stream.read_until(b"n")

                      yield stream.write(data)

                  except StreamClosedError:

                      break



    To make this server serve SSL traffic, send the ``ssl_options`` keyword

    argument with an `ssl.SSLContext` object. For compatibility with older

    versions of Python ``ssl_options`` may also be a dictionary of keyword

    arguments for the `ssl.wrap_socket` method.::



       ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)

       ssl_ctx.load_cert_chain(os.path.join(data_dir, "mydomain.crt"),

                               os.path.join(data_dir, "mydomain.key"))

       TCPServer(ssl_options=ssl_ctx)



    `TCPServer` initialization follows one of three patterns:



    1. `listen`: simple single-process::



            server = TCPServer()

            server.listen(8888)

            IOLoop.current().start()



    2. `bind`/`start`: simple multi-process::



            server = TCPServer()

            server.bind(8888)

            server.start(0)  # Forks multiple sub-processes

            IOLoop.current().start()



       When using this interface, an `.IOLoop` must *not* be passed

       to the `TCPServer` constructor.  `start` will always start

       the server on the default singleton `.IOLoop`.



    3. `add_sockets`: advanced multi-process::



            sockets = bind_sockets(8888)

            tornado.process.fork_processes(0)

            server = TCPServer()

            server.add_sockets(sockets)

            IOLoop.current().start()



       The `add_sockets` interface is more complicated, but it can be

       used with `tornado.process.fork_processes` to give you more

       flexibility in when the fork happens.  `add_sockets` can

       also be used in single-process servers if you want to create

       your listening sockets in some way other than

       `~tornado.netutil.bind_sockets`.



    .. versionadded:: 3.1

       The ``max_buffer_size`` argument.



    .. versionchanged:: 5.0

       The ``io_loop`` argument has been removed.

    """

自己仔细看该类的其他方法

    def listen(self, port, address=""):

        """Starts accepting connections on the given port.



        This method may be called more than once to listen on multiple ports.

        `listen` takes effect immediately; it is not necessary to call

        `TCPServer.start` afterwards.  It is, however, necessary to start

        the `.IOLoop`.

        """

        sockets = bind_sockets(port, address=address)

        self.add_sockets(sockets)



    def add_sockets(self, sockets):

        """Makes this server start accepting connections on the given sockets.



        The ``sockets`` parameter is a list of socket objects such as

        those returned by `~tornado.netutil.bind_sockets`.

        `add_sockets` is typically used in combination with that

        method and `tornado.process.fork_processes` to provide greater

        control over the initialization of a multi-process server.

        """

        for sock in sockets:

            self._sockets[sock.fileno()] = sock

            self._handlers[sock.fileno()] = add_accept_handler(

                sock, self._handle_connection)



    def add_socket(self, socket):

        """Singular version of `add_sockets`.  Takes a single socket object."""

        self.add_sockets([socket])



    def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128,

             reuse_port=False):

        """Binds this server to the given port on the given address.



        To start the server, call `start`. If you want to run this server

        in a single process, you can call `listen` as a shortcut to the

        sequence of `bind` and `start` calls.



        Address may be either an IP address or hostname.  If it's a hostname,

        the server will listen on all IP addresses associated with the

        name.  Address may be an empty string or None to listen on all

        available interfaces.  Family may be set to either `socket.AF_INET`

        or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise

        both will be used if available.



        The ``backlog`` argument has the same meaning as for

        `socket.listen <socket.socket.listen>`. The ``reuse_port`` argument

        has the same meaning as for `.bind_sockets`.



        This method may be called multiple times prior to `start` to listen

        on multiple ports or interfaces.



        .. versionchanged:: 4.4

           Added the ``reuse_port`` argument.

        """

        sockets = bind_sockets(port, address=address, family=family,

                               backlog=backlog, reuse_port=reuse_port)

        if self._started:

            self.add_sockets(sockets)

        else:

            self._pending_sockets.extend(sockets)



    def start(self, num_processes=1):

        """Starts this server in the `.IOLoop`.



        By default, we run the server in this process and do not fork any

        additional child process.



        If num_processes is ``None`` or <= 0, we detect the number of cores

        available on this machine and fork that number of child

        processes. If num_processes is given and > 1, we fork that

        specific number of sub-processes.



        Since we use processes and not threads, there is no shared memory

        between any server code.



        Note that multiple processes are not compatible with the autoreload

        module (or the ``autoreload=True`` option to `tornado.web.Application`

        which defaults to True when ``debug=True``).

        When using multiple processes, no IOLoops can be created or

        referenced until after the call to ``TCPServer.start(n)``.

        """

        assert not self._started

        self._started = True

        if num_processes != 1:

            process.fork_processes(num_processes)

        sockets = self._pending_sockets

        self._pending_sockets = []

        self.add_sockets(sockets)



    def stop(self):

        """Stops listening for new connections.



        Requests currently in progress may still continue after the

        server is stopped.

        """

        if self._stopped:

            return

        self._stopped = True

        for fd, sock in self._sockets.items():

            assert sock.fileno() == fd

            # Unregister socket from IOLoop

            self._handlers.pop(fd)()

            sock.close()



    def handle_stream(self, stream, address):

        """Override to handle a new `.IOStream` from an incoming connection.



        This method may be a coroutine; if so any exceptions it raises

        asynchronously will be logged. Accepting of incoming connections

        will not be blocked by this coroutine.



        If this `TCPServer` is configured for SSL, ``handle_stream``

        may be called before the SSL handshake has completed. Use

        `.SSLIOStream.wait_for_handshake` if you need to verify the client's

        certificate or use NPN/ALPN.



        .. versionchanged:: 4.2

           Added the option for this method to be a coroutine.

        """

        raise NotImplementedError()



    def _handle_connection(self, connection, address):

        if self.ssl_options is not None:

            assert ssl, "Python 2.6+ and OpenSSL required for SSL"

            try:

                connection = ssl_wrap_socket(connection,

                                             self.ssl_options,

                                             server_side=True,

                                             do_handshake_on_connect=False)

            except ssl.SSLError as err:

                if err.args[0] == ssl.SSL_ERROR_EOF:

                    return connection.close()

                else:

                    raise

            except socket.error as err:

                # If the connection is closed immediately after it is created

                # (as in a port scan), we can get one of several errors.

                # wrap_socket makes an internal call to getpeername,

                # which may return either EINVAL (Mac OS X) or ENOTCONN

                # (Linux).  If it returns ENOTCONN, this error is

                # silently swallowed by the ssl module, so we need to

                # catch another error later on (AttributeError in

                # SSLIOStream._do_ssl_handshake).

                # To test this behavior, try nmap with the -sT flag.

                # https://github.com/tornadoweb/tornado/pull/750

                if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL):

                    return connection.close()

                else:

                    raise

        try:

            if self.ssl_options is not None:

                stream = SSLIOStream(connection,

                                     max_buffer_size=self.max_buffer_size,

                                     read_chunk_size=self.read_chunk_size)

            else:

                stream = IOStream(connection,

                                  max_buffer_size=self.max_buffer_size,

                                  read_chunk_size=self.read_chunk_size)



            future = self.handle_stream(stream, address)

            if future is not None:

                IOLoop.current().add_future(gen.convert_yielded(future),

                                            lambda f: f.result())

        except Exception:

            app_log.error("Error in connection callback", exc_info=True)

通过方法名就能看出来,而且开头已经给出实例怎么去用,所以这个就不一一解释了,我自己的用法如下

from tornado.tcpserver import TCPServer

from tornado.iostream import IOStream, StreamClosedError

from tornado import gen

from tornado.ioloop import IOLoop

import struct



class ProxyServer(TCPServer):

    def __init__(self, *args, **kwargs):

        super(ProxyServer, self).__init__(*args, **kwargs)

        self.devices = dict()



    @gen.coroutine

    def handle_stream(self, stream, address):

          pass





if __name__ == "__main__":

    server = ProxyServer()

    server.listen(1234)

    IOLoop.current().start()

具体步骤来分析 一下

TCPServer执行过程

1.server = ProxyServer()创建tcpserver对象,该步骤仅仅做了一个初始化操作

def __init__(self, io_loop=None, ssl_options=None, max_buffer_size=None, read_chunk_size=None):

        self.io_loop = io_loop

        self.ssl_options = ssl_options

        self._sockets = {}  # fd -> socket object    用来存储文件描述符与socket对象的映射关系

        self._pending_sockets = []

        self._started = False

        self.max_buffer_size = max_buffer_size    # 最大缓冲长度

        self.read_chunk_size = read_chunk_size    # 每次读的chunk大小



        # 校验ssl选项. 

        if self.ssl_options is not None and isinstance(self.ssl_options, dict):

            if 'certfile' not in self.ssl_options:

                raise KeyError('missing key "certfile" in ssl_options')



            if not os.path.exists(self.ssl_options['certfile']):

                raise ValueError('certfile "%s" does not exist' % self.ssl_options['certfile'])

            if ('keyfile' in self.ssl_options and not os.path.exists(self.ssl_options['keyfile'])):

                raise ValueError('keyfile "%s" does not exist' % self.ssl_options['keyfile'])

2.想都不要想肯定是开启socket

步骤是执行server.listen(1234)的时候,

    def listen(self, port, address=""):

        """Starts accepting connections on the given port.



        This method may be called more than once to listen on multiple ports.

        `listen` takes effect immediately; it is not necessary to call

        `TCPServer.start` afterwards.  It is, however, necessary to start

        the `.IOLoop`.

        """

        sockets = bind_sockets(port, address=address)

        self.add_sockets(sockets)

3.看下listen里面有调用bind_sockets()方法,来看下该方法

def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=_DEFAULT_BACKLOG, flags=None, reuse_port=False):

    if reuse_port and not hasattr(socket, "SO_REUSEPORT"):

        raise ValueError("the platform doesn't support SO_REUSEPORT")



    sockets = []

    if address == "":

        address = None

    # address family参数指定调用者期待返回的套接口地址结构的类型。它的值包括四种:AF_UNIX,AF_INET,AF_INET6和AF_UNSPEC。

    # AF_UNIX用于同一台机器上的进程间通信

    # 如果指定AF_INET,那么函数就不能返回任何IPV6相关的地址信息;如果仅指定了AF_INET6,则就不能返回任何IPV4地址信息。

    # AF_UNSPEC则意味着函数返回的是适用于指定主机名和服务名且适合任何协议族的地址。

    # 如果某个主机既有AAAA记录(IPV6)地址,同时又有A记录(IPV4)地址,那么AAAA记录将作为sockaddr_in6结构返回,而A记录则作为sockaddr_in结构返回

    if not socket.has_ipv6 and family == socket.AF_UNSPEC: # 如果系统不支持ipv6

        family = socket.AF_INET

    if flags is None:

        flags = socket.AI_PASSIVE

    bound_port = None

    for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM, 0, flags)):

        af, socktype, proto, canonname, sockaddr = res

        if (sys.platform == 'darwin' and address == 'localhost' and af == socket.AF_INET6 and sockaddr[3] != 0):

            # Mac OS X在“localhost”的getaddrinfo结果中包含一个链接本地地址fe80 :: 1%lo0。 

            # 但是,防火墙不了解这是一个本地地址,并且会提示访问。 所以跳过这些地址。

            continue

        try:

            sock = socket.socket(af, socktype, proto)

        except socket.error as e:

            # 如果协议不支持该地址

            if errno_from_exception(e) == errno.EAFNOSUPPORT:

                continue

            raise

        # 为 fd 设置 FD_CLOEXEC 标识

        set_close_exec(sock.fileno())

        if os.name != 'nt': # 非windows

            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        if reuse_port:

            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

        if af == socket.AF_INET6:

            # On linux, ipv6 sockets accept ipv4 too by default,

            # but this makes it impossible to bind to both

            # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,

            # separate sockets *must* be used to listen for both ipv4

            # and ipv6.  For consistency, always disable ipv4 on our

            # ipv6 sockets and use a separate ipv4 socket when needed.

            #

            # Python 2.x on windows doesn't have IPPROTO_IPV6.

            if hasattr(socket, "IPPROTO_IPV6"):

                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)



        # 自动端口分配,端口=None

        # 应该绑定在IPv4和IPv6地址上的同一个端口上

        host, requested_port = sockaddr[:2]

        if requested_port == 0 and bound_port is not None:

            sockaddr = tuple([host, bound_port] + list(sockaddr[2:]))

        # 设置socket为非阻塞

        sock.setblocking(0)    

        sock.bind(sockaddr)

        bound_port = sock.getsockname()[1]

        sock.listen(backlog)

        sockets.append(sock)

    return sockets

4.接下来执行的是add_sockets()方法

def add_sockets(self, sockets):

        if self.io_loop is None:

            self.io_loop = IOLoop.current()    # 获取IOLoop实例对象



        for sock in sockets:

            self._sockets[sock.fileno()] = sock

            add_accept_handler(sock, self._handle_connection, io_loop=self.io_loop)

可以看到里面调用了add_accept_handler方法,来我们进去看看该方法干啥了

5.探析add_accept_handler方法

def add_accept_handler(sock, callback, io_loop=None):

    if io_loop is None: # 获取IOLoop实例对象

        io_loop = IOLoop.current()



    def accept_handler(fd, events):

        # 我们处理回调时可能会有许多的连接等待建立; 为了防止其他任务的饥饿,我们必须限制我们一次接受的连接数。 

        # 理想情况下,我们接受在处理回调过程中等待的连接数,但此可能会对负载产生不利影响。 

        # 相反,我们使用listen backlog作为我们可以合理接受的连接数的。

        for i in xrange(_DEFAULT_BACKLOG): # _DEFAULT_BACKLOG默认为128

            try:

                connection, address = sock.accept()

            except socket.error as e:

                # _ERRNO_WOULDBLOCK 与EAGAIN相同,表示再尝试一下,很多情况下是因为资源不足,或者条件未达成

                # 当某个子进程与客户端建立了连接,其他子进程再次尝试与该客户端建立连接时就会产生该错误

                if errno_from_exception(e) in _ERRNO_WOULDBLOCK:

                    return

                # ECONNABORTED表示有一个连接,在他处于等待被服务端accept的时候主动关闭了。

                if errno_from_exception(e) == errno.ECONNABORTED:

                    continue

                raise

            callback(connection, address)

    io_loop.add_handler(sock, accept_handler, IOLoop.READ) # 为socket注册handler:当发生READ事件时运行accept_handler函数。

欣欣然我们来到了最后一步

6.IOLoop.current().start(),然我们看下源码

def start(self):

        try:

            while True:    

                callbacks = self._callbacks

                self._callbacks = []

                due_timeouts = []

                # 将时间已到的定时任务放置到due_timeouts中,过程省略

                for callback in callbacks:          # 执行callback

                    self._run_callback(callback)

                for timeout in due_timeouts:        # 执行定时任务

                    if timeout.callback is not None:

                        self._run_callback(timeout.callback)       

                callbacks = callback = due_timeouts = timeout = None    # 释放内存

                # 根据情况设置poll_timeout的值,过程省略

                if not self._running:    # 终止ioloop运行时,在执行完了callback后结束循环

                    breaktry:

                    event_pairs = self._impl.poll(poll_timeout)

                except Exception as e:

                    if errno_from_exception(e) == errno.EINTR:  # 系统调用被信号处理函数中断,进行下一次循环

                        continue

                    else:

                        raise 

                self._events.update(event_pairs)

                while self._events: 

                    fd, events = self._events.popitem()             # 获取一个fd以及对应事件

                    try:

                        fd_obj, handler_func = self._handlers[fd]   # 获取该fd对应的事件处理函数

                        handler_func(fd_obj, events)                # 运行该事件处理函数

                    except (OSError, IOError) as e:         

                        if errno_from_exception(e) == errno.EPIPE:     # 当客户端关闭连接时会产生EPIPE错误                         

                            pass

                        # 其他异常处理已经省略

                fd_obj = handler_func = None       # 释放内存空间    

这一步想了解更多去参考这篇文章http://www.cnblogs.com/MnCu8261/p/6730691.html

代码实例

目前公司有这么一个需求,iphonex–server–ue4,面对两个客户端,达到iphonex把数据给ue4,server起一个代理作用,需求大概就是这样,具体实现代码如下

from tornado.tcpserver import TCPServer

from tornado.iostream import IOStream, StreamClosedError

from tornado import gen

from tornado.ioloop import IOLoop

import struct



class ProxyServer(TCPServer):

    def __init__(self, *args, **kwargs):

        super(ProxyServer, self).__init__(*args, **kwargs)

        self.devices = dict()



    @gen.coroutine

    def handle_stream(self, stream, address):

        device = yield stream.read_bytes(1)

        if device == b"x0a":

            self.handle_iphonex_stream(stream, address)

        elif device == b"x0b":

            self.handle_ue4_stream(stream, address)

        else:

            print("protocol error.")



    @gen.coroutine

    def handle_iphonex_stream(self, stream, address):

        yield stream.write(b"x00")

        print("iphonex")



        # uuid

        rlen = yield stream.read_bytes(4)

        rlen = struct.unpack(">I", rlen)[0]

        uuid = yield stream.read_bytes(rlen)

        uuid = uuid.decode()

        yield stream.write(b"x00")

        print(uuid)



        # keys

        rlen = yield stream.read_bytes(4)

        rlen = struct.unpack(">I", rlen)[0]

        keys = yield stream.read_bytes(rlen)

        keys = keys.decode()

        yield stream.write(b"x00")

        print(keys)



        # save

        self.devices[uuid] = {'keys': keys}



        # data

        keys = keys.split(',')

        fmt = "%df" % len(keys)



        while True:

            try:

                data = yield stream.read_bytes(struct.calcsize(fmt))

            except StreamClosedError:

                print 'iphonex is closed'

                break

            pdata = struct.unpack(fmt, data)

            print(pdata)



            ue4stream = self.devices[uuid].get('ue4')

            if ue4stream:

                try:

                    yield ue4stream.write(data)

                except Exception as e:

                    self.devices[uuid]['ue4'] = None

                    print('request for %s closed' % uuid)



    @gen.coroutine

    def handle_ue4_stream(self, stream, address):

        yield stream.write(b"x00")

        print("ue4")



        # uuid

        rlen = yield stream.read_bytes(4)

        rlen = struct.unpack(">I", rlen)[0]

        uuid = yield stream.read_bytes(rlen)

        uuid = uuid.decode()

        print(uuid)



        if self.devices.get(uuid):

            yield stream.write(b"x00")

        else:

            yield stream.write(b"x01")

            raise Exception



        # send keys

        keys = self.devices[uuid].get('keys')

        stream.write(struct.pack(">I", len(keys)))

        stream.write(keys.encode())



        valid = yield stream.read_bytes(1)

        if valid == b'x

Centos7 系统下搭建.NET Core2.0+Nginx+Supervisor+Mysql环境

内容预览:

  • 直到微软推出完全开源的.NET Core~
  • 一方面,这个小巧的框架可以让某…~
  • 问题3:如果服务器宕机或需要重启我们则还是需要连入shell进行启动~

好记性不如烂笔头!

一、简介

一直以来,微软只对自家平台提供.NET支持,这样等于让这个“理论上”可以跨平台的框架在Linux和macOS上的支持只能由第三方项目提供(比如Mono .NET)。直到微软推出完全开源的.NET Core。这个开源的平台兼容.NET Standard,并且能在Windows、Linux和MacOS上提供完全一致的API。虽然这个小巧的.NET框架只是标准.NET的一个子集,但是已经相当强大了。

一方面,这个小巧的框架可以让某些功能性应用同时运行在三个平台上(就像某些功能性的Python脚本一样),另一方面,这也可以让服务器运维人员将ASP .NET服务程序部署在Linux服务器上(特别是对于运行Windows Server较为吃力的服务器)。

官网参考资料:https://www.microsoft.com/net/core#linuxcentos

二、.NET Core2.0 环境部署前准备

1.环境说明

服务器系统:CentOS 7.2.1511

2.安装前准备(关闭防火墙、关闭selinux)

1)关闭firewall:

systemctl stop firewalld.service #停止firewall
systemctl disable firewalld.service #禁止firewall开机启动
firewall-cmd --state #查看默认防火墙状态(关闭后显示notrunning,开启后显示running)

2)关闭selinux

sed -i "s/SELINUX=enforcing/SELINUX=disabled/g" /etc/selinux/config

查看改后文件如下:

[root@localhost ~]# cat /etc/selinux/config 

# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
#     enforcing - SELinux security policy is enforced.
#     permissive - SELinux prints warnings instead of enforcing.
#     disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of three two values:
#     targeted - Targeted processes are protected,
#     minimum - Modification of targeted policy. Only selected processes are protected. 
#     mls - Multi Level Security protection.
SELINUXTYPE=targeted

3)重启Centos

reboot

三、部署.NET Core2.0 环境

1.添加DOTNET产品

在安装.NET核心之前,您需要注册微软产品提要。这只需要做一次。首先,注册微软签名密钥,然后添加微软产品提要。

rpm --import https://packages.microsoft.com/keys/microsoft.asc                                     
sh -c 'echo -e "[packages-microsoft-com-prod]nname=packages-microsoft-com-prod nbaseurl=https://packages.microsoft.com/yumrepos/microsoft-rhel7.3-prodnenabled=1ngpgcheck=1ngpgkey=https://packages.microsoft.com/keys/microsoft.asc" > /etc/yum.repos.d/dotnetdev.repo'

2.安装.NET核心SDK

在下一步之前,请从您的系统中删除.NET .NET以前的任何预览版本。

以下命令更新用于安装的产品列表,安装.NET核心所需的组件,然后安装.NET核心SDK。

yum update
yum install libunwind libicu -y
yum install dotnet-sdk-2.0.0 -y

3.检查dotnet是否安装成功与版本查看

dotnet --info

dotnet --version

四、测试.NET Core2.0 环境

1.在home目录下初始化一个测试环境并输出”Hello World “内容 (测试方式一,可忽略)

cd /home
dotnet new console -o hwapp
cd hwapp
dotnet run

输出空内容如下:

[root@localhost hwapp]# dotnet run
Hello World!

2.上传.net core的实例页面进行测试 (测试方式二、推荐)

Centos 下.net core 2 环境测试用例 (把它上传到/home目录下或自定义的目录)

下载地址:

http://down.51cto.com/data/2334968

执行以下命令

cd /home/WebApplication1
dotnet restore   //如果使过用测试方式一,就需先执行这命令重新加载一下当前新的网站文件
dotnet run

运行后如下图:

未分类

通过IE访问测试页

五、安装配置nginx对ASP.NET Core应用的转发

1.安装Nginx环境

[root@localhost ~]#curl -o  nginx.rpm http://nginx.org/packages/centos/7/noarch/RPMS/nginx-release-centos-7-0.el7.ngx.noarch.rpm
[root@localhost ~]#rpm -ivh nginx.rpm
[root@localhost ~]#yum install nginx -y

输入:systemctl start nginx 来启动nginx。

[root@localhost ~]# systemctl start nginx

输入:systemctl enable nginx 来设置nginx的开机启动(linux宕机、重启会自动运行nginx不需要连上去输入命令)

[root@localhost ~]#systemctl enable nginx
Created symlink from /etc/systemd/system/multi-user.target.wants/nginx.service to /usr/lib/systemd/system/nginx.service.

2.通过iE检查能否访问

[root@localhost nginx-1.8.1]# ps -ef|grep nginx
root      14626      1  0 08:47 ?        00:00:00 nginx: master process nginx
nginx     14627  14626  0 08:47 ?        00:00:00 nginx: worker process
root      14636   3269  0 08:49 pts/1    00:00:00 grep --color=auto nginx

nginx常用的操作命令

systemctl start nginx.service               #启动nginx服务

systemctl enable nginx.service             #设置开机自启动

systemctl disable nginx.service            #停止开机自启动

systemctl status nginx.service             #查看服务当前状态

systemctl restart nginx.service           #重新启动服务

systemctl list-units –type=service        #查看所有已启动的服务

3.防火墙配置(如果系统有防火墙就需要进行写入规则)

命令:firewall-cmd –zone=public –add-port=80/tcp –permanent(开放80端口)

命令:systemctl restart firewalld(重启防火墙以使配置即时生效)

4.配置nginx对ASP.NET Core应用的转发

修改 /etc/nginx/conf.d/default.conf 文件。

将文件内容替换为

    server {
        listen 80;
        location / {
            proxy_pass http://localhost:88;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection keep-alive;
            proxy_set_header Host $host;
            proxy_cache_bypass $http_upgrade;
        }
    }

重新加载nignx

[root@localhost nginx]# nginx -s reload

nginx的配置己完成

5.开启dotnet run进行测试

[root@localhost ~]# cd /home/WebApplication1/
[root@localhost WebApplication1]# dotnet run
Using launch settings from /home/WebApplication1/Properties/launchSettings.json...
Hosting environment: Development
Content root path: /home/WebApplication1
Now listening on: http://[::]:88
Application started. Press Ctrl+C to shut down.

通过IP 80端口访问

六、配置守护服务(Supervisor)

目前存在三个问题

问题1:ASP.NET Core应用程序运行在shell之中,如果关闭shell则会发现ASP.NET Core应用被关闭,从而导致应用无法访问,这种情况当然是我们不想遇到的,而且生产环境对这种情况是零容忍的。

问题2:如果ASP.NET Core进程意外终止那么需要人为连进shell进行再次启动,往往这种操作都不够及时。

问题3:如果服务器宕机或需要重启我们则还是需要连入shell进行启动。

为了解决这个问题,我们需要有一个程序来监听ASP.NET Core 应用程序的状况。在应用程序停止运行的时候立即重新启动。这边我们用到了Supervisor这个工具,Supervisor使用Python开发的。

1.安装Supervisor

[root@localhost /]# yum install python-setuptools -y
[root@localhost /]#easy_install supervisor

2.配置Supervisor

[root@localhost /]#mkdir /etc/supervisor
[root@localhost /]#echo_supervisord_conf > /etc/supervisor/supervisord.conf

修改supervisord.conf文件,将文件尾部的配置

[root@localhost /]# vi /etc/supervisor/supervisord.conf

将里面的最后两行:

;[include]                                                   
;files = relative/directory/*.ini

改为

[include]
files = conf.d/*.conf

ps:如果服务已启动,修改配置文件可用“supervisorctl reload”命令来使其生效

3.配置对ASP.NET Core应用的守护

创建一个 WebApplication1.conf文件,内容大致如下

[root@localhost /]# vi WebApplication1.conf
[program:WebApplication1]
command=dotnet WebApplication1.dll ; 运行程序的命令
directory=/home/WebApplication1/ ; 命令执行的目录
autorestart=true ; 程序意外退出是否自动重启
stderr_logfile=/var/log/WebApplication1.err.log ; 错误日志文件
stdout_logfile=/var/log/WebApplication1.out.log ; 输出日志文件
environment=ASPNETCORE_ENVIRONMENT=Production ; 进程环境变量
user=root ; 进程执行的用户身份
stopsignal=INT

将文件拷贝至:“/etc/supervisor/conf.d/WebApplication1.conf”下

[root@localhost /]#mkdir /etc/supervisor/conf.d
[root@localhost /]#cp WebApplication1.conf /etc/supervisor/conf.d/

运行supervisord,查看是否生效

[root@localhost /]#supervisord -c /etc/supervisor/supervisord.confsupervisord -c /etc/supervisor/supervisord.conf
[root@localhost /]# ps -ef | grep WebApplication1
root      29878  29685  0 09:57 ?        00:00:00 dotnet WebApplication1.dll
root      29892  29363  0 09:57 pts/3    00:00:00 grep --color=auto WebApplication1 

如果存在dotnet WebApplication1.dll 进程则代表运行成功,这时候在使用浏览器进行访问。

至此关于ASP.NET Core应用程序的守护即配置完成。

Supervisor守护进程常用操作

【启动supervisord】
确保配置无误后可以在每台主机上使用下面的命令启动supervisor的服务器端supervisord
supervisord

【停止supervisord】
supervisorctl shutdown

【重新加载配置文件】
supervisorctl reload

七 、配置Supervisor开机启动

新建一个“supervisord.service”文件

[root@localhost /]# vi supervisord.service
# dservice for systemd (CentOS 7.0+)
# by ET-CS (https://github.com/ET-CS)
[Unit]
Description=Supervisor daemon
[Service]
Type=forking
ExecStart=/usr/bin/supervisord -c /etc/supervisor/supervisord.conf
ExecStop=/usr/bin/supervisorctl shutdown
ExecReload=/usr/bin/supervisorctl reload
KillMode=process
Restart=on-failure
RestartSec=42s
[Install]
WantedBy=multi-user.target

将文件拷贝至:“/usr/lib/systemd/system/supervisord.service”

[root@localhost /]# cp supervisord.service /usr/lib/systemd/system/

执行命令:systemctl enable supervisord

[root@localhost /]# systemctl enable supervisord
Created symlink from /etc/systemd/system/multi-user.target.wants/supervisord.service to /usr/lib/systemd/system/supervisord.service.

执行命令:systemctl is-enabled supervisord #来验证是否为开机启动

[root@localhost /]# systemctl is-enabled supervisord

重启系统看能否能成功访问

[root@localhost /]# reboot

CentOS&.NET Core初试-4-安装守护服务(Supervisor)

Supervisor是什么?

Supervisor 是一个用 Python 写的进程管理工具,可以很方便的用来启动、重启、关闭进程(不仅仅是 Python 进程)。除了对单个进程的控制,还可以同时启动、关闭多个进程,比如很不幸的服务器出问题导致所有应用程序都被杀死,此时可以用 supervisor 同时启动所有应用程序而不是一个一个地敲命令启动。

Supervisor能干什么?

Supervisor帮助我们解决在开发过程中遇到的以下问题:

  • ASP.NET Core应用程序运行在shell之中,如果关闭shell则会发现ASP.NET Core应用被关闭,从而导致应用无法访问,这种情况当然是我们不想遇到的,而且生产环境对这种情况是零容忍的。

  • 如果ASP.NET Core进程意外终止那么需要人为连进shell进行再次启动,往往这种操作都不够及时。

  • 如果服务器宕机或需要重启我们则还是需要连入shell进行启动。

安装Supervisor

首先安装Python包管理工具(Supervisor使用Python开发的),然后再安装supervisor。

yum install python-setuptools
easy_install supervisor

supervisor安装完成后会生成三个执行程序:

  • supervisortd :supervisor的守护进程服务(用于接收进程管理命令)

  • supervisorctl :客户端(用于和守护进程通信,发送管理进程的指令)

  • echo_supervisord_conf :生成初始配置文件程序。

配置Supervisor

添加supervisor文件夹以及conf.d配置文件夹

mkdir /etc/supervisor
echo_supervisord_conf > /etc/supervisor/supervisord.conf
mkdir /etc/supervisor/conf.d

修改supervisord.conf文件,在文件尾部:

[include]
files=/etc/supervisor/conf.d/*.conf

启动Supervisor服务

supervisord -c /etc/supervisor/supervisord.conf

program的配置

在supervisor的conf.d文件夹下新建一个程序配置文件,hellocore.conf:

#配置程序名称
[program:hellocore]
#运行程序的命令
command=dotnet hellocore.dll 
#命令执行的目录
directory=/home/hellocore/ 
#错误日志文件
stderr_logfile=/var/log/hellocore.err.log
#输出日志文件
stdout_logfile=/var/log/hellocore.out.log 
#进程环境变量
environment=ASPNETCORE_ENVIRONMENT=Production 
#进程执行的用户身份
user=root
#程序是否自启动
autostart=true
#程序意外退出是否自动重启
autorestart=true
#启动时间间隔(秒)
startsecs=5
stopsignal=INT

重载Supervisor的配置文件

supervisorctl reload #重新加载配置文件

客户端相关命令:

supervisorctl status #查看程序配置的状态
supervisorctl stop programname    #停止某一个程序配置
supervisorctl start programname   #加载某一个程序配置
supervisorctl restart programname #重新加载某一个程序配置
supervisorctl reload #重新加载配置
supervisorctl update

查看配置程序是否启动:

ps -ef | grep hellocore #programdllname

如下图,则Supervisor配置成功:

未分类

Supervisor配置成功

设置Supervisor开机启动

在 /usr/lib/systemd/system文件夹下新建supervisor.service配置文件,内容如下:

[Unit]
Description=supervisor
[Service]
Type=forking
ExecStart=/usr/bin/supervisord -c /etc/supervisor/supervisord.conf
ExecStop=/usr/bin/supervisorctl shutdown
ExecReload=/usr/bin/supervisorctl reload
KillMode=process
Restart=on-failure
RestartSec=42s
[Install]
WantedBy=multi-user.target

将服务设置为开机启动:

systemctl enable supervisor.service
systemctl start supervisor.service

supervisor开机启动服务配置成功

未分类

配置Supervisor图形化管理

打开supervisor的配置文件

vi /etc/supervisor/supervisord.conf

找到配置:

;[inet_http_server]         ; inet (TCP) server disabled by default
;port=127.0.0.1:9001        ; (ip_address:port specifier, *:port for all iface)
;username=user              ; (default is no username (open server))ls
;password=123               ; (default is no password (open server))

修改成:

[inet_http_server] ; inet (TCP) 服务,默认是关闭的
port=*:9001      ;ip:端口,*代表所有IP
username=root               ;登陆账号,可以不设
password=root123              ;登陆账户,可以不设

保存好修改后,重启supervisor。

supervisorctl reload

防火墙查看9001端口是否开启

firewall-cmd --list-ports #查看已开放的端口

开启端口:

firewall-cmd --zone=public  --add-port=9001/tcp --permanent
firewall-cmd --reload #重启防火墙

命令含义:

--zone #作用域
--add-port=9001/tcp  #添加端口,格式为:端口/通讯协议
--permanent  #永久生效,没有此参数重启后失效

成功访问:

未分类

为laravel/lumen queue 配置 supervisor

近期工作中使用了 laravel框架中的队列服务,需要运行队列处理器 php /path/to/artisan queue:work --tries 3 命令

测试环境部署时需要:

  • 终止原有queue:work命令
  • 开启新queue:work命令

依照laravel教程(https://laravel-china.org/docs/laravel/5.6/queues/1395#supervisor-configuration)推荐使用进程管理工具 supervisor

Supervisor 安装

yum install supervisor  # centos
sudo apt-get install supervisor  # ubuntu

supervisord 是进程管理的服务端,常驻进程辅助干活
supervisorctl 是客户端,用来执行查看、加载等命令

Supervisor 配置

生成配置文件

echo_supervisord_conf > /etc/supervisord.conf

编辑配置文件

vim /etc/supervisord.conf

配置文件中配置除了最后两行,使用默认的就行。vim中使用G把光标跳到文件末尾,将最后一行修改为如下。

# file: /etc/supervisord.conf
...
[include]
files = /etc/supervisor/*.conf

创建文件夹

mkdir -p /etc/supervisor/

创建queue work启动脚本

vim /etc/supervisor/lumen_worker.conf

lumen_worker.conf 文件内容 注意修改项目artisan文件路径

[program:lumen_worker]
process_name=%(program_name)s_%(process_num)02d
command=php /path/to/artisan queue:listen --tries 3 --sleep 3
autostart=true
autorestart=true
user=root
numprocs=1
redirect_stderr=true
stdout_logfile=/tmp/lumen_worker.log

启动 supervisor

supervisord  
# 启动supervisord服务,默认使用/etc/supervisord.conf配置文件。
# 如果报错Error: Another program is already listening on a port that one of our HTTP servers is configured to use.说明已经启动,继续即可。

supervisorctl reload  # 重启 supervisord
supervisorctl reread  # 重新读取配置文件 supervisord
supervisorctl start lumen_worker:*  # 启动lumen_worker服务

supervisorctl restart lumen_worker:*  # 重启lumen_worker服务

启动后查看一下运行状态

supervisorctl status

应该能看到lumen_worker处于RUNNING状态:

[root@X /etc/supervisor]# supervisorctl status
lumen_worker:lumen_worker_00     RUNNING   pid 19317, uptime 0:01:49

supervisor更改某项目配置后 需要重新启动才有效

在linux服务器上部署了node项目,使用supervisor进行管理,supervisor是个好工具,具体介绍见这里http://www.cnblogs.com/xueweihan/p/6195824.html

梗概了该项目对的某些配置后,重新启动项目,发现问题仍在,通过日志内容才知道原来新的配置没有被应用。 
要使更新的配置得以应用,需要重新启动supervisor服务。具体操作如下: 参考文章http://www.codeweblog.com/supervisor%E9%87%8D%E6%96%B0%E5%8A%A0%E8%BD%BD%E9%85%8D%E7%BD%AE%E5%90%AF%E5%8A%A8%E6%96%B0%E7%9A%84%E8%BF%9B%E7%A8%8B/

1、更新新的配置到supervisord

supervisorctl update

2、重新启动配置中的所有程序

supervisorctl reload

3、启动某个进程(program_name=你配置中写的程序名称)

supervisorctl start program_name

4、查看正在守候的进程

supervisorctl

5、停止某一进程 (program_name=你配置中写的程序名称)

pervisorctl stop program_name

6、重启某一进程 (program_name=你配置中写的程序名称)

supervisorctl restart program_name

7、停止全部进程

supervisorctl stop all

flask插件系列之Flask-WTF表单

flask_wtf是flask框架的表单验证模块,可以很方便生成表单,也可以当做json数据交互的验证工具,支持热插拔。

安装

pip install Flask-WTF

Flask-WTF其实是对wtforms组件的封装,使其支持对flask框架的热插拔。

简单使用

# app.py
from flask import Flask, current_app, request, render_template
from forms import MyForm

app = Flask(__name__,template_folder='static/html')
@app.route('/',methods=['GET','POST'])
def login():
    form = MyForm()
    if form.validate_on_submit():
        return 'OK'
    return render_template('forms/index.html', form=form)
if __name__ == '__main__':
    app.run(host='127.0.0.1', port=80, debug=True)

# forms.py
from flask_wtf import FlaskForm
from wtforms import StringField
from wtforms.validators import DataRequired

class MyForm(FlaskForm):
    name = StringField('name', validators=[DataRequired()])

# forms/index.html
<form method="POST" action="/">
{{ form.csrf_token }}
{{ form.name.label }} {{ form.name(size=20) }}
<input type="submit" value="Go">
</form>

flask_wtf定义字段

flask_wtf完全使用wtforms组件的字段模型,wtforms对字段的定义在fields模块;又分为core和simple,core模块定义了普通使用的字段,simple在core模块的基础上扩展了一些字段,这些字段会自动进行字段级别的校验。

  • 字段类型
# core.py
__all__ = (
    'BooleanField', 'DecimalField', 'DateField', 'DateTimeField', 'FieldList',
    'FloatField', 'FormField', 'IntegerField', 'RadioField', 'SelectField',
    'SelectMultipleField', 'StringField',
)
常用字段说明:
BooleanField:布尔类型,如Flask,True
StringField:字符串类型
DecimalField:小数点文本字段,如:‘1.23’
DateField:日期字段,格式:'%Y-%m-%d'
DateTimeField:日期字段,格式:'%Y-%m-%d %H:%M:%S'
FieldList:统一字段类型组成列表,如:FieldList(StringField('Name', [validators.required()]))
FloatField:浮点数类型
IntegerField:整形
SelectMultipleField:多选框
RadioField:单选框

# simple.py
TextAreaField:文本域,可接受多行输入
PasswordField:密码字段,输入的不会直接在浏览器明文显示
FileField:上传文件,但不会处理验证文件,需要手动处理
HiddenField:隐藏字段
SubmitField:按钮
TextField:字符串类型的别名,弃用
  • 表单定义
# 参数:
class UserAdminForm(FlaskForm):
    username = StringField(label='用户名', validators=[DataRequired(),Length(4,20)])
    password_hash = PasswordField(label='密码',validators=[DataRequired(),Length(4,20)])
    limit = SelectField(label='用户权限',
                        choices=[('guest', '所有权限'),
                                 ('operation', '可读可写不可删除'),
                                 ('management', '可读不可写')],
                        default='guest')  # 权限

# 字段一般的参数
# label:字段的名字
# default:默认
# validators:字段的验证序列
# description:字段的描述
# choices:SelectField和他的子类有的字段,选择框,多选一
  • 字段的验证序列

字段的参数validators可以指定提交表单的验证序列,按照从左到右的顺序,默认的可选验证在wtforms.validators模块,已经封装的验证方法有:

__all__ = (
    'DataRequired', 'data_required', 'Email', 'email', 'EqualTo', 'equal_to',
    'IPAddress', 'ip_address', 'InputRequired', 'input_required', 'Length',
    'length', 'NumberRange', 'number_range', 'Optional', 'optional',
    'Required', 'required', 'Regexp', 'regexp', 'URL', 'url', 'AnyOf',
    'any_of', 'NoneOf', 'none_of', 'MacAddress', 'mac_address', 'UUID'
)
模块中大小写有对应的方式,如DataRequired对应data_required。

DataRequired/data_required:验证数据是否真实存在,即不能为空,必须是非空白字符串,否则触发StopValidation错误。
InputRequired/input_required:和DataRequired的区别在于可以是空白字符串;
Required/required:data_required的别名
Email/email:验证符合邮件的格式,只有最基本的验证;
EqualTo/equal_to:比较两个字段的值,比如密码和确认密码,如果不相等就触发错误,equal_to(field,message),需要输入另一个字段的名字。
IPAddress/ip_address:验证是否是ip地址,默认验证IPV4地址。
MacAddress/mac_address:验证是否符合mac格式;
UUID:是否是uuid格式;
URL/url:验证是否符合url格式;
Regexp/regexp:用提供的正则表达式验证字段;Regexp(r"")
Length/length:设置字段值的长度,Length(min,max);
NumberRange/number_range:设置一个数字字段的取值范围,可以针对浮点数和小数;NumberRange(min,max)
Optional/optional:允许字段为空并停止验证;
NoneOf/none_of:将传入的数据和无效的比较,是抛出异常;Noneof(values).
Anyof/any_of:将传入的数据和预设的数据比较,不是异常。Anyof(values).
  • 自定义字段验证

如果默认的验证序列不满足我们的要求,我们可以通过继承的方式自定义字段。

from wtforms.validators import DataRequired,Length,StopValidation
class NewStringField(StringField):
    """
    自定义一个新的字段
    """
    def pre_validate(self, form):
        """验证方法,在validators验证序列之前"""
        x:str = form.name.data
        if not x.startswith('g'):
            raise StopValidation("your data must be startswith 'g'")

    def post_validate(self, form, validation_stopped):
        """
        验证方法,在validators验证序列之后
        :param form:该字段所属的表单对象
        :param validation_stopped:前面验证序列的结果,True表示验证通过,False表示验证失败
        :return:
        """
        if not validation_stopped:
            raise ValueError("验证失败了!")
        pass
  • 触发StopValidation异常会停止验证链;

  • 自定义表单验证

一般来说,如果对表单有额外需要的验证,一般自定义表单的额外的验证方法而不是重新自定义新的字段,而form已经为我们提供了这种方法。
看Form对象的源码:

def validate(self):
    """
    Validates the form by calling `validate` on each field, passing any
    extra `Form.validate_<fieldname>` validators to the field validator.
    """
    extra = {}
    for name in self._fields:
        inline = getattr(self.__class__, 'validate_%s' % name, None)
        if inline is not None:
            extra[name] = [inline]

    return super(Form, self).validate(extra)

Form对象调用validate函数时会自动寻找validate_%s的方法添加到验证序列,并在原先字段的验证序列验证完毕后执行。

class MyForm(FlaskForm):
    name = StringField('name', validators=[DataRequired(), Length(4,20)])
    def validate_name(self, field):
        print(field.data)
        if hasattr(self, 'name') and len(self.name.data) > 5:
            print(self.name.data)
            return True
        raise ValidationError('超过5个字符')

# 在自定义的验证方法中,抛出异常使用ValidationError,validate会自动捕捉。

表单对象

flask_wtf推荐使用Form对象的子类FlaskForm代替,该对象提供了所有表单需要的属性和方法。那么Form对象是如何自动实现表单功能的呢?
分析FlaskForm对象源码:

class FlaskForm(Form):
    class Meta(DefaultMeta):
        def wrap_formdata(self, form, formdata):
            pass

    def __init__(self, formdata=_Auto, **kwargs):
        csrf_enabled = kwargs.pop('csrf_enabled', None)
        pass
    def is_submitted(self):
        pass
    def validate_on_submit(self):
        pass
    def hidden_tag(self, *fields):
        pass
    def validate(self):
        pass
  • FlaskForm内部定义了一个Meta类,该类添加csrf保护的一些方法,所以创建表单的时候一定要导入FlaskForm而不是Form.

  • is_submitted:检查是否有一个活跃的request请求;

  • validate_on_submit:调用is_submitted和validate方法,返回一个布尔值,用来判断表单是否被提交;

  • validate:字段级别的验证,每个字段都有一个validate方法,FlaskForm调用validate会对所有的字段调用validate方法验证,如果所有的验证都通过返回Ture,否则抛出异常。

  • hidden_tag:获取表单隐藏的字段;

  • wrap_formdata:获取request中的form,每次form对象初始化时会执行该函数从request获取form。

重要属性

form.data:字段名字和值组成的字典;
form.errors:验证失败的信息字典,在调用validate_on_submit方法后才有效;
form.name.data:字段name的值;
form.name.type:字段name的类型

常用场景

  • 登录验证
# froms.py
class UserPasswordForm(FlaskForm):
    """
    登录提交的表单
    """
    username = StringField('User', validators=[DataRequired()])
    password = PasswordField('Password', validators=[DataRequired()])

# form.html
<form method="POST" action="/">
{{ form.csrf_token }}
{{ form.username.label }} {{ form.username(size=20) }}
{{ form.password.label }} {{ form.password }}
<input type="submit" value="Go">
</form>

# views.py
@app.route('/login',methods=['GET','POST'])
def login():
    form = UserPasswordForm()
    if form.validate_on_submit():
        # 验证表单
        if form.username.data == "xiaoming" and form.password.data == '123':
            return 'OK'
    return render_template('forms/index.html', form=form)
  • ajax请求转化表单

有时候我们没有html页面的表单,只有ajax请求的数据交互,但是想借用Form来定义接口和验证接收的数据,如果ajax的请求方法是(‘POST’, ‘PUT’, ‘PATCH’, ‘DELETE’)中的一种,FlaskForm会自动从request对象中调用request.form和request.get_json()方法来接收数据,因此这种方式十分方便。注意:get方法不再其中。

# form.py
class MyForm(FlaskForm):
    name = StringField('name', validators=[DataRequired(), Length(4,20)])
# view.py
@app.route('/form',methods=['GET','POST'])
def form():
    if request.method != "GET":
        form = MyForm() # form会获取请求数据
        print(form.data)
        return 'ok'
    return ''
# test.py
import requests as req
import json

class ProTest():
    baseurl = 'http://127.0.0.1:80'
    def test_form(self):
        url = self.baseurl + '/form'
        rsp = req.post(url,json={'name':'hhhh'})
        # rsp = req.get(url,json={'name':'hhhh'})
if __name__ == '__main__':
    test = ProTest()
    test.test_form()
  • form启用csrf保护

默认csrf保护是开启的,只要在html文件中添加{{ form.csrf_token }},app必须设置SECRET_KEY参数。

# 禁用保护
form = Form(csrf_enabled=False)
# 或配置app时
WTF_CSRF_ENABLED = False
  • 一般数据csrf保护

同理必须设置SECRET_KEY参数。

from flask_wtf.csrf import CsrfProtect
csrf = CsrfProtect()

def create_app():
    app = Flask(__name__)
    csrf.init_app(app)

# 模板中添加一个隐藏域
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}" />
<meta name="csrf-token" content="{{ csrf_token() }}">
# 如果是ajax请求,可以在脚本中
var csrftoken = "{{ csrf_token() }}"
# 然后每个请求添加 X-CSRFToken 头部

# 全局禁用csrf
WTF_CSRF_CHECK_DEFAULT = False

# 对一些视图跳过csrf检查
@csrf.exempt
@app.route('/foo', methods=('GET', 'POST'))
def my_handler():
    return 'ok'