Python + MongoDB 小型程序利器

|作为一个用着和领扣 LeetCode 同样技术栈 —— Python 的程序员,对于平时一些小的想法和 Demo 自然是通过 Python 来解决,但是在学习和使用的过程中,对于数据的存储一直难以统一,最初使用纯文本文件存储,发现对于格式化索引来说纯文本存储效率太低,之后又转了 MySQL 存储,但是发现过于复杂,对于一些热更新数据来说写起来十分不雅观,限制太多,无奈便换成了 JSON 格式存储,当然,那是在现在使用的 MongoDB 之前了。

MongoDB 概要

MongoDB 用起来其实比较随意,相关命令遍写的感觉和 Python 这类弱类型语言很相似,用起来比较 Geek。

未分类

MongoDB 是一个面向文档的数据库,目前由 10gen 开发并维护,它的功能丰富,齐全,完全可以替代 MySQL。 MonogDB 的一些亮点:

  • 使用 JSON 风格 语法,易于掌握和理解:MongoDB 使用 JSON 的变种 BSON 作为内部存储的格式和语法。针对 MongoDB 的操作都使用 JSON 风格语法,客户端提交或接收的数据都使用 JSON 形式来展现。相对于 SQL来说,更加直观,容易理解和掌握。
  • Schema-less,支持嵌入子文档:MongoDB 是一个 Schema-free 的文档数据库。一个数据库可以有多个Collection,每个 Collection 是Documents的集合。Collection 和 Document 和传统数据库的 Table 和 Row并不对等。无需事先定义 Collection,随时可以创建。
  • Collection中可以包含具有不同 schema 的文档记录。 这意味着,你上一条记录中的文档有3个属性,而下一条记录的文档可以有10个属 性,属性的类型既可以是基本的数据类型(如数字、字符串、日期等),也可以是数组或者散列,甚至还可以是一个子文档(embed document)。这样,可以实现逆规范化(denormalizing)的数据模型,提高查询的速度。

未分类

如果在本地测试或者仅仅是为了临时丢一些数据进去的话,安装并启动 mongod 后直接在命令行下 mongo 即可完成连接,默认没有连接密码,如果看到类似如下提示的话,说明 MongoDB 已经安装完成了:

未分类

Python + MongoDB

下面让 Python 连接上 MongoDB:

安装 PyMongo:

pip3 install pymongo

在 Python 中引入:

import pymongo

指定数据表并连接:

# 默认的 MongoDB 监听地址
myclient = pymongo.MongoClient("mongodb://localhost:27017/")

# 使用上一步建立的 myclient 连接,并且使用 leetcode 数据库
db = myclient["leetcode"]

# 使用 db 连接的 leetcode 数据库中的 articles 表
table = db['articles']

增删改查:

# 定义我们要插入的数据,JSON 格式,在 Python 中就是 Dict 格式
post = {"author": "Nova Kwok",
        "text": "LeetCode is in China!",
        "tags": ["mongodb", "python", "pymongo"],
        "date": datetime.datetime.utcnow()}

插入一条记录:

# 插入一条记录并返回插入 ID
post_id = posts.insert_one(post).inserted_id

查询记录,这里我们需要多 import 一个包,pprint:

import pprint
pprint.pprint(posts.find_one())

返回结果:

{u'_id': ObjectId('...'),
u'author': u'Nova Kwok',
u'date': datetime.datetime(...),
u'tags': [u'mongodb', u'python', u'pymongo'],
u'text': u'LeetCode is now in China!'}

PHP 和 Python 基于 UDP 协议操作 memcached

在写完(https://mp.weixin.qq.com/s?__biz=MzAwOTU4NzM5Ng==&mid=2455770298&idx=1&sn=1a6232862a977c9bc85d99620a9e8499&scene=21#wechat_redirect)这篇文章后,我重新燃起了对 memcached 的兴趣,在新浪博客的时候,我们很早就使用了 memcached,但由于大部分服务使用了 squid 缓存,所以 memcached 没有大规模使用,十年过去了,我对 memcached 的认知也越来越肤浅了,乘着这次机会,我重新看了一遍 memcached 官方 wiki,打算写几篇文章回顾下,今天聊第一个话题,那就是如何基于 UDP 协议操作 memcached。

首先 memcached 支持 TCP 和 UDP 协议,至于两者的差别不是本文的重点,本文主要讲解如何在 PHP 和 Python 中以 UDP 协议的方式操作 memcached。

memcached 服务如何开启 UDP

由于出现过 memcached UDP 反射攻击,所以很多 linux 发行版默认启动的是关闭 UDP 端口的,如果你想开启,可以执行下列命令:

$ memcached -m 64 -p 11212 -U 11211 -u memcache -l 127.0.0.1

-U 参数表示指定 UDP 端口,如果 -U 参数的值为0 表示关闭 UDP 端口。

一旦执行上列命令,表示 memcached 服务器端同时监听 11211 的 UDP 和 TCP 端口,通过下列命令可看出:

$ netstat -an | grep 11211
tcp 0 0 127.0.0.1:11211 0.0.0.0:* LISTEN     
udp 0 0 127.0.0.1:11211 0.0.0.0:*

命令行 UDP 连接 memcached

在 Linux 中,telnet 只能运行于 TCP 模式,所以为了演示,只能采用 nc 命令行。UDP 操作 memcached,在操作数据的时候必须增加一个 frame 头,后续的数据格式和 TCP 操作 memcached 一样,查看官方手册:

 The frame header is 8 bytes long, as follows (all values are 16-bit integers

 in network byte order, high byte first):


 0-1 Request ID.

 2-3 Sequence number.

 4-5 Total number of datagrams in this message.

 6-7 Reserved for future use; must be 0.

为了说的简单点,可以执行下列命令,以 UDP 协议操作 memcached:

$ printf 'x00x00x00x00x00x01x00x00statsrn' | nc -u 127.0.0.1 11211 
$ printf 'x00x00x00x00x00x01x00x00set v 0 0 1rnxrn' | nc -u 127.0.0.1 11211 
$ printf 'x00x00x00x00x00x01x00x00get vrn' | nc -u 127.0.0.1 11211

PHP 以 UDP 协议操作 memcached

php-memcached 扩展也支持 UDP 协议操作 memcached,但并不鼓励,所以官方文档介绍 UDP 操作非常少,我也是查了官方的 Issues 才明白的。另外即使支持,UDP 操作也有限制,比如 set 命令支持 UDP 协议,但 get 命令就不支持,至于原因,大家可以思考下,后续我会简单说一说。

先看代码:

$m_udp = new Memcached();
# 使用 UDP 协议模式
$m_udp->setOption(Memcached::OPT_USE_UDP, true);
# 注意,支持文本模式的协议,而非二进制协议
$m_udp->setOption(Memcached::OPT_BINARY_PROTOCOL, false);

$m_udp->addServer('127.0.0.1', 11211, 1);

echo $m_udp->get('y');
var_dump($m_udp->getResultMessage());

输出 string(20) “ACTION NOT SUPPORTED”,可以看出 php-memcached 扩展做了限制,不允许 UDP 协议操作 get 命令。

$m_udp->set('y',"ok");
var_dump($m_udp->getResultMessage());
$m_tcp =new Memcached();
# 切换为默认的 TCP 连接方式
$m_tcp->addServer('127.0.0.1', 11211, 1);
echo $m_tcp->get("y");

执行完毕,成功输出 ok。

Python 以 UDP 协议操作 memcached

Python 有专门的包基于 UDP 协议操作 memcached,这就是 python-memcached-udp 包,安装后,演示一个例子:

client = memcached_udp.Client([('localhost', 11211)])
client.set('key1', 'value1')
r = client.get('key1')
print (r)

大家可以看看这个包的源代码,非常有意思,可以学到很多 memcached 命令知识。

nginx 日志切割的三种方法(logrotate切割、shell脚本切割、Python脚本切割)

nginx 日志切割

PS:nginx日志切割可以通过两种方式进行切割,日志logrotate,脚本

方法一:通过logrotate切割

个人感觉配置麻烦,且结合自己这边的项目的环境的复杂,所以就不使用这种方式。这里我也就不写了

可以参考:https://www.cnblogs.com/ilanni/p/5365420.html

方法二:通过shell脚本切割

1、编写shell脚本

#!/bin/bash
#此脚本用于自动分割Nginx的日志,包括access.log和error.log
#每天00:00执行此脚本 将前一天的access.log重命名为access-xxxx-xx-xx.log格式,并重新打开日志文件
#Nginx日志文件所在目录
LOG_PATH=/opt/nginx/logs
#获取昨天的日期
YESTERDAY=$(date -d "yesterday" +%Y-%m-%d)
#获取pid文件路径
PID=/var/run/nginx/nginx.pid
#分割日志
mv ${LOG_PATH}access.log ${LOG_PATH}access-${YESTERDAY}.log
mv ${LOG_PATH}error.log ${LOG_PATH}error-${YESTERDAY}.log
#向Nginx主进程发送USR1信号,重新打开日志文件
kill -USR1 `cat ${PID}`

2、配置crontab每天凌晨00:00定时执行这个脚本

crontab -e
# 输入以下内容并保存
00 00 * * * /bin/bash /opt/nginx/sbin/cut_nginx_log.sh

方法三:通过Python脚本切割

未分类

鉴于这种乱情况,日志目录下面还有多个项目的日志,且分别保存在不同的文件下面(下面又分为测试环境的日志,预发布环境的日志),使用shell脚本就不怎么好写,我就采取了通过Python来实现

脚本:

import os, sys, datetime,re

# nginx日志存放的路径
nginxLogPath="/opt/nginx-1.9.5/logs/"
# 获取昨天的日期
yesterday = (datetime.date.today() + datetime.timedelta(days = -1)).strftime("%Y-%m-%d")
# nginx启动的pid文件
PID = "/var/run/nginx.pid"


def cutNginxLog(path):
    """
    切割nginx日志函数
    :param path: 日志文件的第一级目录
    :return: 
    """
    logList = os.listdir(path)    # 判断传入的path是否是目录
    for logName in logList:      # 循环处理目录里面的文件
        logAbsPath = os.path.join(path, logName)
        if os.path.isdir(logAbsPath):   # 如果是目录,递归调用自己,继续处理
            cutNginxLog(logAbsPath)
        else:         # 如果是日志文件,就进行处理
            # 分割日志
            re_Num = re.compile(r'^[a-zA-Z]')
            # 判断日志文件是否是字母开头,如果是日期开头就不切割
            if re_Num.search(logName):
                logNewName = yesterday + "_" + logName          # 新日志文件名称列如:2018-11-8_access.log
                oldLogPath = os.path.join(path, logName)        # 旧日志文件绝对路径
                newLogPath = os.path.join(path, logNewName)     # 新日志文件绝对路径
                os.rename(oldLogPath, newLogPath)

    cmd = " kill -USR1 `cat %s` "% PID
    res = os.system(cmd)
    if res != 0:
        return "重新加载nginx失败"
cutNginxLog(nginxLogPath)

计划任务:

# crontab -e

30 02 * * * /usr/bin/python3.5 /opt/nginx-1.9.5/sbin/cut_nginx_log.py

切割后的效果图:

未分类

CENTOS 使用ANSIBLE 将PYTHON 包安装到VIRTUALENV环境中

使用root用户,则直接安装

pip: name=pkgname virtualenv=虚拟环境目录

如果以!root用户安装,ansible无法获取virtualenv可执行文件,需要手动将执行路径添加到PATH环境变量,在用户家目录的.local/bin目录下

environment:
      PATH: "{{ansible_env.PATH}}/{{ansible_user_dir}}/.local/bin"

完整实例:

tasks:
  - name: install pip packages
    pip: name={{item}} virtualenv=envdir
    with_items:
      - requests
      - flask
    environment:
      PATH: "{{ansible_env.PATH}}/{{ansible_user_dir}}/.local/bin"

python tornado TCPserver异步协程实例

内容预览:

  • 它的值包括四种:AF_UNIX,AF_INET,AF_INET6和AF_UNSPEC~
  • # 相反,我们使用listen backlog作为我们可以合理接受的连接数的~
  • if errno_from_exception(e) == errno.ECONNABORTED: continue raise ca…~

项目所用知识点

  • tornado
  • socket
  • tcpserver
  • 协程
  • 异步

tornado tcpserver源码抛析

在tornado的tcpserver文件中,实现了TCPServer这个类,他是一个单线程的,非阻塞的tcp 服务。

为了与上层协议(在tornado中就是HTTPServer)交互,TCPServer提供了一个接口:handle_stream, 要求其子类必需实现该方法,该方法就是主要用来处理应用层逻辑的。

我们可以通过下面代码倒入模块查看源码

from tornado.tcpserver import TCPServer

源码中给了好多解释,先把源码注释贴进来

class TCPServer(object):

    ‘’‘

    1.要想用TCPserver,我给你提供你一个接口handle_stream,子类中必须要有这个方法

    2.他已经给我们举出了例子

    3.往下他给咱们介绍了几种启动方法,而我用的listen()方法启动看起来简单明了

    ’‘’

    r"""A non-blocking, single-threaded TCP server.



    To use `TCPServer`, define a subclass which overrides the `handle_stream`

    method. For example, a simple echo server could be defined like this::



      from tornado.tcpserver import TCPServer

      from tornado.iostream import StreamClosedError

      from tornado import gen



      class EchoServer(TCPServer):

          @gen.coroutine

          def handle_stream(self, stream, address):

              while True:

                  try:

                      data = yield stream.read_until(b"n")

                      yield stream.write(data)

                  except StreamClosedError:

                      break



    To make this server serve SSL traffic, send the ``ssl_options`` keyword

    argument with an `ssl.SSLContext` object. For compatibility with older

    versions of Python ``ssl_options`` may also be a dictionary of keyword

    arguments for the `ssl.wrap_socket` method.::



       ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)

       ssl_ctx.load_cert_chain(os.path.join(data_dir, "mydomain.crt"),

                               os.path.join(data_dir, "mydomain.key"))

       TCPServer(ssl_options=ssl_ctx)



    `TCPServer` initialization follows one of three patterns:



    1. `listen`: simple single-process::



            server = TCPServer()

            server.listen(8888)

            IOLoop.current().start()



    2. `bind`/`start`: simple multi-process::



            server = TCPServer()

            server.bind(8888)

            server.start(0)  # Forks multiple sub-processes

            IOLoop.current().start()



       When using this interface, an `.IOLoop` must *not* be passed

       to the `TCPServer` constructor.  `start` will always start

       the server on the default singleton `.IOLoop`.



    3. `add_sockets`: advanced multi-process::



            sockets = bind_sockets(8888)

            tornado.process.fork_processes(0)

            server = TCPServer()

            server.add_sockets(sockets)

            IOLoop.current().start()



       The `add_sockets` interface is more complicated, but it can be

       used with `tornado.process.fork_processes` to give you more

       flexibility in when the fork happens.  `add_sockets` can

       also be used in single-process servers if you want to create

       your listening sockets in some way other than

       `~tornado.netutil.bind_sockets`.



    .. versionadded:: 3.1

       The ``max_buffer_size`` argument.



    .. versionchanged:: 5.0

       The ``io_loop`` argument has been removed.

    """

自己仔细看该类的其他方法

    def listen(self, port, address=""):

        """Starts accepting connections on the given port.



        This method may be called more than once to listen on multiple ports.

        `listen` takes effect immediately; it is not necessary to call

        `TCPServer.start` afterwards.  It is, however, necessary to start

        the `.IOLoop`.

        """

        sockets = bind_sockets(port, address=address)

        self.add_sockets(sockets)



    def add_sockets(self, sockets):

        """Makes this server start accepting connections on the given sockets.



        The ``sockets`` parameter is a list of socket objects such as

        those returned by `~tornado.netutil.bind_sockets`.

        `add_sockets` is typically used in combination with that

        method and `tornado.process.fork_processes` to provide greater

        control over the initialization of a multi-process server.

        """

        for sock in sockets:

            self._sockets[sock.fileno()] = sock

            self._handlers[sock.fileno()] = add_accept_handler(

                sock, self._handle_connection)



    def add_socket(self, socket):

        """Singular version of `add_sockets`.  Takes a single socket object."""

        self.add_sockets([socket])



    def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128,

             reuse_port=False):

        """Binds this server to the given port on the given address.



        To start the server, call `start`. If you want to run this server

        in a single process, you can call `listen` as a shortcut to the

        sequence of `bind` and `start` calls.



        Address may be either an IP address or hostname.  If it's a hostname,

        the server will listen on all IP addresses associated with the

        name.  Address may be an empty string or None to listen on all

        available interfaces.  Family may be set to either `socket.AF_INET`

        or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise

        both will be used if available.



        The ``backlog`` argument has the same meaning as for

        `socket.listen <socket.socket.listen>`. The ``reuse_port`` argument

        has the same meaning as for `.bind_sockets`.



        This method may be called multiple times prior to `start` to listen

        on multiple ports or interfaces.



        .. versionchanged:: 4.4

           Added the ``reuse_port`` argument.

        """

        sockets = bind_sockets(port, address=address, family=family,

                               backlog=backlog, reuse_port=reuse_port)

        if self._started:

            self.add_sockets(sockets)

        else:

            self._pending_sockets.extend(sockets)



    def start(self, num_processes=1):

        """Starts this server in the `.IOLoop`.



        By default, we run the server in this process and do not fork any

        additional child process.



        If num_processes is ``None`` or <= 0, we detect the number of cores

        available on this machine and fork that number of child

        processes. If num_processes is given and > 1, we fork that

        specific number of sub-processes.



        Since we use processes and not threads, there is no shared memory

        between any server code.



        Note that multiple processes are not compatible with the autoreload

        module (or the ``autoreload=True`` option to `tornado.web.Application`

        which defaults to True when ``debug=True``).

        When using multiple processes, no IOLoops can be created or

        referenced until after the call to ``TCPServer.start(n)``.

        """

        assert not self._started

        self._started = True

        if num_processes != 1:

            process.fork_processes(num_processes)

        sockets = self._pending_sockets

        self._pending_sockets = []

        self.add_sockets(sockets)



    def stop(self):

        """Stops listening for new connections.



        Requests currently in progress may still continue after the

        server is stopped.

        """

        if self._stopped:

            return

        self._stopped = True

        for fd, sock in self._sockets.items():

            assert sock.fileno() == fd

            # Unregister socket from IOLoop

            self._handlers.pop(fd)()

            sock.close()



    def handle_stream(self, stream, address):

        """Override to handle a new `.IOStream` from an incoming connection.



        This method may be a coroutine; if so any exceptions it raises

        asynchronously will be logged. Accepting of incoming connections

        will not be blocked by this coroutine.



        If this `TCPServer` is configured for SSL, ``handle_stream``

        may be called before the SSL handshake has completed. Use

        `.SSLIOStream.wait_for_handshake` if you need to verify the client's

        certificate or use NPN/ALPN.



        .. versionchanged:: 4.2

           Added the option for this method to be a coroutine.

        """

        raise NotImplementedError()



    def _handle_connection(self, connection, address):

        if self.ssl_options is not None:

            assert ssl, "Python 2.6+ and OpenSSL required for SSL"

            try:

                connection = ssl_wrap_socket(connection,

                                             self.ssl_options,

                                             server_side=True,

                                             do_handshake_on_connect=False)

            except ssl.SSLError as err:

                if err.args[0] == ssl.SSL_ERROR_EOF:

                    return connection.close()

                else:

                    raise

            except socket.error as err:

                # If the connection is closed immediately after it is created

                # (as in a port scan), we can get one of several errors.

                # wrap_socket makes an internal call to getpeername,

                # which may return either EINVAL (Mac OS X) or ENOTCONN

                # (Linux).  If it returns ENOTCONN, this error is

                # silently swallowed by the ssl module, so we need to

                # catch another error later on (AttributeError in

                # SSLIOStream._do_ssl_handshake).

                # To test this behavior, try nmap with the -sT flag.

                # https://github.com/tornadoweb/tornado/pull/750

                if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL):

                    return connection.close()

                else:

                    raise

        try:

            if self.ssl_options is not None:

                stream = SSLIOStream(connection,

                                     max_buffer_size=self.max_buffer_size,

                                     read_chunk_size=self.read_chunk_size)

            else:

                stream = IOStream(connection,

                                  max_buffer_size=self.max_buffer_size,

                                  read_chunk_size=self.read_chunk_size)



            future = self.handle_stream(stream, address)

            if future is not None:

                IOLoop.current().add_future(gen.convert_yielded(future),

                                            lambda f: f.result())

        except Exception:

            app_log.error("Error in connection callback", exc_info=True)

通过方法名就能看出来,而且开头已经给出实例怎么去用,所以这个就不一一解释了,我自己的用法如下

from tornado.tcpserver import TCPServer

from tornado.iostream import IOStream, StreamClosedError

from tornado import gen

from tornado.ioloop import IOLoop

import struct



class ProxyServer(TCPServer):

    def __init__(self, *args, **kwargs):

        super(ProxyServer, self).__init__(*args, **kwargs)

        self.devices = dict()



    @gen.coroutine

    def handle_stream(self, stream, address):

          pass





if __name__ == "__main__":

    server = ProxyServer()

    server.listen(1234)

    IOLoop.current().start()

具体步骤来分析 一下

TCPServer执行过程

1.server = ProxyServer()创建tcpserver对象,该步骤仅仅做了一个初始化操作

def __init__(self, io_loop=None, ssl_options=None, max_buffer_size=None, read_chunk_size=None):

        self.io_loop = io_loop

        self.ssl_options = ssl_options

        self._sockets = {}  # fd -> socket object    用来存储文件描述符与socket对象的映射关系

        self._pending_sockets = []

        self._started = False

        self.max_buffer_size = max_buffer_size    # 最大缓冲长度

        self.read_chunk_size = read_chunk_size    # 每次读的chunk大小



        # 校验ssl选项. 

        if self.ssl_options is not None and isinstance(self.ssl_options, dict):

            if 'certfile' not in self.ssl_options:

                raise KeyError('missing key "certfile" in ssl_options')



            if not os.path.exists(self.ssl_options['certfile']):

                raise ValueError('certfile "%s" does not exist' % self.ssl_options['certfile'])

            if ('keyfile' in self.ssl_options and not os.path.exists(self.ssl_options['keyfile'])):

                raise ValueError('keyfile "%s" does not exist' % self.ssl_options['keyfile'])

2.想都不要想肯定是开启socket

步骤是执行server.listen(1234)的时候,

    def listen(self, port, address=""):

        """Starts accepting connections on the given port.



        This method may be called more than once to listen on multiple ports.

        `listen` takes effect immediately; it is not necessary to call

        `TCPServer.start` afterwards.  It is, however, necessary to start

        the `.IOLoop`.

        """

        sockets = bind_sockets(port, address=address)

        self.add_sockets(sockets)

3.看下listen里面有调用bind_sockets()方法,来看下该方法

def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=_DEFAULT_BACKLOG, flags=None, reuse_port=False):

    if reuse_port and not hasattr(socket, "SO_REUSEPORT"):

        raise ValueError("the platform doesn't support SO_REUSEPORT")



    sockets = []

    if address == "":

        address = None

    # address family参数指定调用者期待返回的套接口地址结构的类型。它的值包括四种:AF_UNIX,AF_INET,AF_INET6和AF_UNSPEC。

    # AF_UNIX用于同一台机器上的进程间通信

    # 如果指定AF_INET,那么函数就不能返回任何IPV6相关的地址信息;如果仅指定了AF_INET6,则就不能返回任何IPV4地址信息。

    # AF_UNSPEC则意味着函数返回的是适用于指定主机名和服务名且适合任何协议族的地址。

    # 如果某个主机既有AAAA记录(IPV6)地址,同时又有A记录(IPV4)地址,那么AAAA记录将作为sockaddr_in6结构返回,而A记录则作为sockaddr_in结构返回

    if not socket.has_ipv6 and family == socket.AF_UNSPEC: # 如果系统不支持ipv6

        family = socket.AF_INET

    if flags is None:

        flags = socket.AI_PASSIVE

    bound_port = None

    for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM, 0, flags)):

        af, socktype, proto, canonname, sockaddr = res

        if (sys.platform == 'darwin' and address == 'localhost' and af == socket.AF_INET6 and sockaddr[3] != 0):

            # Mac OS X在“localhost”的getaddrinfo结果中包含一个链接本地地址fe80 :: 1%lo0。 

            # 但是,防火墙不了解这是一个本地地址,并且会提示访问。 所以跳过这些地址。

            continue

        try:

            sock = socket.socket(af, socktype, proto)

        except socket.error as e:

            # 如果协议不支持该地址

            if errno_from_exception(e) == errno.EAFNOSUPPORT:

                continue

            raise

        # 为 fd 设置 FD_CLOEXEC 标识

        set_close_exec(sock.fileno())

        if os.name != 'nt': # 非windows

            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        if reuse_port:

            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

        if af == socket.AF_INET6:

            # On linux, ipv6 sockets accept ipv4 too by default,

            # but this makes it impossible to bind to both

            # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,

            # separate sockets *must* be used to listen for both ipv4

            # and ipv6.  For consistency, always disable ipv4 on our

            # ipv6 sockets and use a separate ipv4 socket when needed.

            #

            # Python 2.x on windows doesn't have IPPROTO_IPV6.

            if hasattr(socket, "IPPROTO_IPV6"):

                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)



        # 自动端口分配,端口=None

        # 应该绑定在IPv4和IPv6地址上的同一个端口上

        host, requested_port = sockaddr[:2]

        if requested_port == 0 and bound_port is not None:

            sockaddr = tuple([host, bound_port] + list(sockaddr[2:]))

        # 设置socket为非阻塞

        sock.setblocking(0)    

        sock.bind(sockaddr)

        bound_port = sock.getsockname()[1]

        sock.listen(backlog)

        sockets.append(sock)

    return sockets

4.接下来执行的是add_sockets()方法

def add_sockets(self, sockets):

        if self.io_loop is None:

            self.io_loop = IOLoop.current()    # 获取IOLoop实例对象



        for sock in sockets:

            self._sockets[sock.fileno()] = sock

            add_accept_handler(sock, self._handle_connection, io_loop=self.io_loop)

可以看到里面调用了add_accept_handler方法,来我们进去看看该方法干啥了

5.探析add_accept_handler方法

def add_accept_handler(sock, callback, io_loop=None):

    if io_loop is None: # 获取IOLoop实例对象

        io_loop = IOLoop.current()



    def accept_handler(fd, events):

        # 我们处理回调时可能会有许多的连接等待建立; 为了防止其他任务的饥饿,我们必须限制我们一次接受的连接数。 

        # 理想情况下,我们接受在处理回调过程中等待的连接数,但此可能会对负载产生不利影响。 

        # 相反,我们使用listen backlog作为我们可以合理接受的连接数的。

        for i in xrange(_DEFAULT_BACKLOG): # _DEFAULT_BACKLOG默认为128

            try:

                connection, address = sock.accept()

            except socket.error as e:

                # _ERRNO_WOULDBLOCK 与EAGAIN相同,表示再尝试一下,很多情况下是因为资源不足,或者条件未达成

                # 当某个子进程与客户端建立了连接,其他子进程再次尝试与该客户端建立连接时就会产生该错误

                if errno_from_exception(e) in _ERRNO_WOULDBLOCK:

                    return

                # ECONNABORTED表示有一个连接,在他处于等待被服务端accept的时候主动关闭了。

                if errno_from_exception(e) == errno.ECONNABORTED:

                    continue

                raise

            callback(connection, address)

    io_loop.add_handler(sock, accept_handler, IOLoop.READ) # 为socket注册handler:当发生READ事件时运行accept_handler函数。

欣欣然我们来到了最后一步

6.IOLoop.current().start(),然我们看下源码

def start(self):

        try:

            while True:    

                callbacks = self._callbacks

                self._callbacks = []

                due_timeouts = []

                # 将时间已到的定时任务放置到due_timeouts中,过程省略

                for callback in callbacks:          # 执行callback

                    self._run_callback(callback)

                for timeout in due_timeouts:        # 执行定时任务

                    if timeout.callback is not None:

                        self._run_callback(timeout.callback)       

                callbacks = callback = due_timeouts = timeout = None    # 释放内存

                # 根据情况设置poll_timeout的值,过程省略

                if not self._running:    # 终止ioloop运行时,在执行完了callback后结束循环

                    breaktry:

                    event_pairs = self._impl.poll(poll_timeout)

                except Exception as e:

                    if errno_from_exception(e) == errno.EINTR:  # 系统调用被信号处理函数中断,进行下一次循环

                        continue

                    else:

                        raise 

                self._events.update(event_pairs)

                while self._events: 

                    fd, events = self._events.popitem()             # 获取一个fd以及对应事件

                    try:

                        fd_obj, handler_func = self._handlers[fd]   # 获取该fd对应的事件处理函数

                        handler_func(fd_obj, events)                # 运行该事件处理函数

                    except (OSError, IOError) as e:         

                        if errno_from_exception(e) == errno.EPIPE:     # 当客户端关闭连接时会产生EPIPE错误                         

                            pass

                        # 其他异常处理已经省略

                fd_obj = handler_func = None       # 释放内存空间    

这一步想了解更多去参考这篇文章http://www.cnblogs.com/MnCu8261/p/6730691.html

代码实例

目前公司有这么一个需求,iphonex–server–ue4,面对两个客户端,达到iphonex把数据给ue4,server起一个代理作用,需求大概就是这样,具体实现代码如下

from tornado.tcpserver import TCPServer

from tornado.iostream import IOStream, StreamClosedError

from tornado import gen

from tornado.ioloop import IOLoop

import struct



class ProxyServer(TCPServer):

    def __init__(self, *args, **kwargs):

        super(ProxyServer, self).__init__(*args, **kwargs)

        self.devices = dict()



    @gen.coroutine

    def handle_stream(self, stream, address):

        device = yield stream.read_bytes(1)

        if device == b"x0a":

            self.handle_iphonex_stream(stream, address)

        elif device == b"x0b":

            self.handle_ue4_stream(stream, address)

        else:

            print("protocol error.")



    @gen.coroutine

    def handle_iphonex_stream(self, stream, address):

        yield stream.write(b"x00")

        print("iphonex")



        # uuid

        rlen = yield stream.read_bytes(4)

        rlen = struct.unpack(">I", rlen)[0]

        uuid = yield stream.read_bytes(rlen)

        uuid = uuid.decode()

        yield stream.write(b"x00")

        print(uuid)



        # keys

        rlen = yield stream.read_bytes(4)

        rlen = struct.unpack(">I", rlen)[0]

        keys = yield stream.read_bytes(rlen)

        keys = keys.decode()

        yield stream.write(b"x00")

        print(keys)



        # save

        self.devices[uuid] = {'keys': keys}



        # data

        keys = keys.split(',')

        fmt = "%df" % len(keys)



        while True:

            try:

                data = yield stream.read_bytes(struct.calcsize(fmt))

            except StreamClosedError:

                print 'iphonex is closed'

                break

            pdata = struct.unpack(fmt, data)

            print(pdata)



            ue4stream = self.devices[uuid].get('ue4')

            if ue4stream:

                try:

                    yield ue4stream.write(data)

                except Exception as e:

                    self.devices[uuid]['ue4'] = None

                    print('request for %s closed' % uuid)



    @gen.coroutine

    def handle_ue4_stream(self, stream, address):

        yield stream.write(b"x00")

        print("ue4")



        # uuid

        rlen = yield stream.read_bytes(4)

        rlen = struct.unpack(">I", rlen)[0]

        uuid = yield stream.read_bytes(rlen)

        uuid = uuid.decode()

        print(uuid)



        if self.devices.get(uuid):

            yield stream.write(b"x00")

        else:

            yield stream.write(b"x01")

            raise Exception



        # send keys

        keys = self.devices[uuid].get('keys')

        stream.write(struct.pack(">I", len(keys)))

        stream.write(keys.encode())



        valid = yield stream.read_bytes(1)

        if valid == b'x

在 Ubuntu 16.04 LTS 系统上安装 Python 3.6

Ubuntu 16.04 LTS 系统默认自带的是 Python 2.7 和 Python 3.5,有时候我们会需要用到 Python 3.6,但是官方的源里是没有 Python 3.6 的 ,今天就介绍一下如何安装 Python 3.6。

未分类

安装方法

1.配置软件仓库,因为 Python 3.6 新版没有发布到 Ubuntu 的正式仓库中,咱们通过第3方仓库来做。在命令行中输入:

sudo add-apt-repository ppa:jonathonf/python-3.6

如果有提示信息,回车即可。

2.检查系统软件包并安装 Python 3.6

sudo apt-get update 
sudo apt-get install python3.6

3.查看 Python 版本信息(现在在你的系统中已经有3个 Python 版本了)

python -V
python3 -V
python3.6 -V

4.通过上图我们看到,新安装的3.6版本需要输入 python3.6 才能使用,那能不能配置我只输入 python3 时就默认使用 3.6 版本呢,当然可以,执行以下命令:

sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.5 1 
sudo update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.6 2 
sudo update-alternatives --config python3

5.最后再确认一下:

python3 -V

6.配置完成后,之前有些 3.5 版本时候装的包可能会提示不存在了,此时重新安装一下即可。

参考文章

https://blog.csdn.net/lzzyok/article/details/77413968

Centos安装Python2.7与Python3.5双版本

前言

最近博主一直忙于工作之中,无法自拔(别问我为什么,因为穷 👿 );最近有个小项目用到了Python,但是无奈服务端的有个Python2.7,但是也不能更新他,因为有程序在用,无奈只能弄双版本的Python,下面就来说说具体的步骤。

未分类

正文

首先,先下载Python的源码包:https://www.python.org/ftp/python/3.5.5/Python-3.5.5.tgz,执行

wget https://www.python.org/ftp/python/3.5.5/Python-3.5.5.tgz

然后解压:

tar zxvf Python-3.5.5.tgz

将解压目录移动至/usr/local:

mv Python-3.5.5 /usr/local

进入目录,然后配置编译:

cd /usr/local/Python-3.5.5
./configure 
make
make install

创建软链接

ln -s /usr/local/Python3.5/python /usr/bin/python3

到这一步就全部完成了,此时在终端输入python3 -V即可看见输出Python 3.5.5

但是有时候安装上Python3,却没有pip3,那么继续执行如下脚本:

wget https://bootstrap.pypa.io/3.2/get-pip.py
python3 get-pip.py 

为了大家方便,我将这些操作整合成了一个shell脚本,运行下方的脚本即可一键安装:

yum install -y wget && wget -O install.sh https://www.licoy.cn/go/api/shell/python3.sh && sh install.sh

后记

有人说,这些东西网上都有,为什么自己还要写一份呢?

我也知道网上有,不过博客主要就是用来记录自己的所见所闻、所知所想,爹有娘有不如自己有,反正就相当于好记性不如烂笔头,自己写一遍更有利于加深对它的理解以及更好的去使用它。

Python读取.wav音频文件

可以使用scipy.io.wavfile.read(somefile)来读取.wav音频文件。它会返回一个元组,第一项为音频的采样率,第二项为音频数据的numpy数组。

用法:

from scipy.io import wavfile
fs, data = wavfile.read('./output/audio.wav')

也可以使用PySoundFile,它也是返回一个元组,指示第一项为数据,第二项为采样率。

用法:

import soundfile as sf
data, samplerate = sf.read('existing_file.wav') 

举例子让你明白python中is和==的区别

在说 is 和 == 的区别之前,我们先理解下python的变量。python的变量和java的变量有很大的区别,因为一个是动态语言,另一个是静态语言。
java的变量就像是个盒子,是把对象的地址装进这个盒子内,就会有大的或者小的盒子。而python的变量像个便利贴,把他贴在哪个地方都可以,不需要管数据类型,只要你喜欢就可以。说太多没有用,得用代码体会下。

a = 1  

上面的代码过程是先在内存中生成了一个int的对象,然后我们把这个便利贴 a 贴在了上面,从此a变量就指向了这个对象。我们还可以看看下面这个。

a = [1, 2]
b = a
b.append(3)
print(a)
# 结果[1, 2, 3]

这里是把a贴在了一个列表上,然后又用b指向a,接着操作b但是却打印了出了b的结果。这是因为a和b都是便利贴,都贴在了同一个列表上,所以操作一个的话,另外一个也会变化。或者我们可以看看这两个变量指向的对象是否是同于一个对象。

print(a is b)
print(id(a), id(b))
# 结果True1461897824584 1461897824584

可以看到,是相等的。

现在我们再看看is和==之间的区别。我们还是先看一个例子。

a = [1, 2, 3]
b = [1, 2, 3]
print(a is b)
print(a == b)

大家可以先猜猜结果是什么。我之前说了python的变量就是个便利贴,[1, 2, 3]都是直接赋值给两个变量,也就是说生成了两个对象,所以a,b指向的对象不一样。所以结果出来了,第一个是False,第二个是True。为什么呢?因为is比较的是对象相同不相同,但是==比较的是值相同不相同。如果打印两个id值的话,显然是不同的。

print(id(a), id(b))
# 结果1735698385160 1735679028936

但是这个呢?

a = 1
b = 1
print(a is b)
print(id(a), id(b))
# 结果True1956929696 1956929696

为什么这个又相等了呢?这是因为python中有个intern机制。

intern机制就是不管你创建了多少个相同的字符串,在python中都是会指向同一个对象的。这是为了防止你不小心创建了多个相同对象而浪费大量内存甚至会发生挤爆内存的后果。有了这个理解,我们再看看下面例子就容易得出答案了。

a = 'str'
b = 's' + 'tr'
print(a is b) # 结果 True
c = "hello world"
d = "hello world"
print(c is d) # 结果 True

再来到==的实际调用,在用==进行判断的时候实际是调用魔法函数eq()来判断他们的值是否相等的。

总的来说,我们在判断是否是同一个对象的时候就用is,不要用==,所以在判断该对象是什么类型的时候建议用is或者直接用isinstance()这个方法。

class Person():
    pass
p = Person()
print(type(p) is Person)  # 结果 True

为什么上面结果是True呢?这是因为我们之前说过类本身也是个对象,用type()方法会指向该对象,又因为这个类,也就是对象是唯一的,所以结果就是True。

完毕!

python 3.x 163邮箱登陆,邮件读取

import  urllib.request
import  urllib.parse
import  http.cookiejar,re

opener = None

# 带Cookie访问
def openurl(parms):
  global opener
  if opener == None:
      #cookie设置
      cj =  http.cookiejar.CookieJar()
      opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cj))
  ret = opener.open(parms)
  return ret

def login_163(**parms):
  #初始化
  parms_key = ['domain','password','username']
  arg = {}
  for key in parms_key:
    if key in parms:
      arg[key] = parms[key]
    else:
      arg[key] = ''
  #获取syscheckcode
  pre_login = arg['domain']
  html = openurl(pre_login).read().decode('utf-8')

  patt = re.compile(r'.*?name=syscheckcode.*?value="(.*?)".*?')
  syscheckcode = patt.search(html)
  if not syscheckcode:
    raise Exception('GET syscheckcode Fail!')
  syscheckcode = syscheckcode.group(1)

  #登陆
  postdata = {
   'syscheckcode':syscheckcode,
   'password':arg['password'],
   'username':arg['username'],
    }
  postdata = urllib.parse.urlencode(postdata)
  postdata = postdata.encode('utf-8')
  req = urllib.request.Request(
    url= arg['domain'],
    data=postdata
    )
  html = openurl(req).read().decode('utf-8')

  thisurl  = 'http://reg.163.com/Main.jsp?username=' + arg['username']
  html = openurl(thisurl).read().decode('utf-8')

  # 获取随机key
  thisurl = 'http://entry.mail.163.com/coremail/fcg/ntesdoor2?verifycookie=1&lightweight=1&from=urs'

  html = openurl(thisurl).read().decode('utf-8')

  patt = re.compile(r'.*[email protected]&sid=(.*?)&from.*?')
  sid = patt.search(html);
  sid = sid.group(1)

  # 获取sid
  thisurl = 'http://mail.163.com/js6/main.jsp?sid=' + sid
  html = openurl(thisurl).read().decode('utf-8')
  thisurl = 'http://mail.163.com/js6/s?sid=' + sid + '&func=mbox:listMessages&topNav_mobileIcon_show=1&TopTabReaderShow=1&TopTabLofterShow=1'

  # 获取邮件key --- 可以读取看看,实际上是一个类似xml的表,所有的邮件都在这里,我们需要的是key,这里是抽取的第一封邮件的key
  html = openurl(thisurl).read().decode('utf-8')
  patt = re.compile(r'.*?name="id">(.*?)</string>.*?')
  key =  patt.search(html);
  key = key.group(1)

  # 获取邮件内容
  thisurl = 'http://mail.163.com/js6/read/readhtml.jsp?mid=' + key
  html = openurl(thisurl).read().decode('utf-8')

  # 测试输出
  print(html)
  # 假设返回假,,这个验证可以最后加上
  flag = True
  #if 'succeedhandle_login' in html:
    #flag = True
  return flag

# 这里是开始,我懒得缩进了 if __name__ == '__main__':
# 用户名 及 密码
while True:
  user = input('input your username:')
  pwd = input('input your password:')
  if len(user) != 0 and len(pwd) != 0:
    break
  print('输入错误')

# 测试网站
dom='https://reg.163.com/logins.jsp'
try:
  flag = login_163(username=user,password=pwd,domain=dom)
  if not flag:
    print('读取失败!')
    exit(0)
  else:
    print('读取成功')
except Exception as e:
   print('Error:',e)

反正大致过程就是上面那样,,,很标准的 post登陆,之后 163 的页面比较特殊,具体可以自己去试试。

那个key页面是抓包之后找到的,通过那个key就能获得每一封邮件了。

整个代码是从一份功能代码中抽出来,因为剩下的内容涉及xxx,所以不发了。