mongoDB之创建用户与认证模式启动

mongoDB创建用户

不管是mongoDB还是MySQL,我们往往都需要创建用户来保证安全性,在这里对这方面内容做一个简单的总结~

创建用户

创建用户有帮助增加数据库安全的作用,在mongoDB中需要下列步骤

  1. 创建管理员
  2. 授权认证
  3. 给使用的数据库添加用户

在shell中执行以下操作

sudo service mongod start         # 启动mongod
mongo                             # 以非授权的方式启动

这时我们可以访问到任意一个不需要认证的数据库(比如我们用一个可视化客户端打开)

未分类

如果使用授权方式启动mongoDB,而我们不去登录的话我们也是无法使用的,所以我们要先创建一个管理员账号

创建管理员账号

首先非授权方式启动我们的mongoDB

> use admin # 创建admin数据库
> db.createUser({user:"admin",pwd:"admin",roles:["root"]}) # 创建一个用户名为admin,密码为admin,身份为管理员的User
> db.auth("admin","admin") # 进行认证
1       # 显示1,认证成功

这样,我们的数据库就有了自己的管理员
下面我们为创建一个有用户的db

> use demo1 # 切换数据库
switched to db demo1

 # 创建一个用户名为demo1user,密码为demo1,身份为数据库用户,拥有demo1的User
> db.createUser({user:"demo1user",pwd:"demo1",roles:[{role:"dbOwner",db:"demo1"}]})
Successfully added user: {
    "user" : "demo1user",
    "roles" : [
        {
            "role" : "dbOwner",
            "db" : "demo1"
        }
    ]
}

手动以认证模式启动

经过我的尝试,我发现以认证方式启动mongd的方式有很多,我选择一种我比较喜欢的列在下面:
在安装mongoDB之后,如果是使用和我一样的方式安装的,那么将会自动生成一个配置文件,位于/etc/mongod.conf,我们也可以用这样的方式启动mongod

sudo mongod -f /etc/mongod.conf --auth # 认证模式启动
sudo mongod -f /etc/mongod.conf  # 非认证模式启动

默认通过认证模式启动

一切脱离版本的配置都是耍流氓
如果不想每次都带上–auth参数的haunted,我们可以在配置文件中修改,也就是对/etc/mongod.conf进行修改,我们以官网文档为准https://docs.mongodb.com/manual/reference/configuration-options/

未分类

关于用户认证方面的配置,点击上面的security仔细阅读一下

下面贴上我的配置文件片段(最后两行就是默认以用户认证方式启动)

# mongod.conf

# for documentation of all options, see:
#   http://docs.mongodb.org/manual/reference/configuration-options/

# Where and how to store data.
storage:
  dbPath: /var/lib/mongodb
  journal:
    enabled: true
#  engine:
#  mmapv1:
#  wiredTiger:

# where to write logging data.
systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.log

# network interfaces
net:
  port: 27017
  bindIp: 127.0.0.1


# how the process runs
processManagement:
  timeZoneInfo: /usr/share/zoneinfo

security:
  authorization: enabled

如此一来,不管是sudo mongod -f /etc/mongod.conf还是sudo service mongod start都是以认证模式启动的了。

数据库中间件 MyCAT 源码分析 —— SQL ON MongoDB

本文主要基于 MyCAT 1.6.5 正式版

1. 概述

可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。
吼吼吼,让我们开启这段神奇的“旅途”。

本文主要分成四部分:

  1. 总体流程,让你有个整体的认识
  2. 查询操作
  3. 插入操作
  4. 彩蛋,????彩蛋,????彩蛋

建议你看过这两篇文章(非必须):

  1. 《MyCAT 源码分析 —— 【单库单表】插入》https://link.juejin.im/?target=http%3A%2F%2Fwww.iocoder.cn%2FMyCAT%2Fsingle-db-single-table-insert%2F%3Fself
  2. 《MyCAT 源码分析 —— 【单库单表】查询》https://link.juejin.im/?target=http%3A%2F%2Fwww.iocoder.cn%2FMyCAT%2Fsingle-db-single-table-select%2F%3Fself

2. 主流程

未分类

  1. MyCAT Server 接收 MySQL Client 基于 MySQL协议 的请求,翻译 SQL 成 MongoDB操作 发送给 MongoDB Server。
  2. MyCAT Server 接收 MongoDB Server 返回的 MongoDB数据,翻译成 MySQL数据结果 返回给 MySQL Client。

这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。

未分类

Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。

MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。

未分类

是不是熟悉的味道。不得不说 JDBC 规范的精妙。

3. 查询操作

SELECT id, name FROM user WHERE name > '' ORDER BY _id DESC;

未分类

看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。

3.1 查询 MongoDB

// MongoSQLParser.java
public MongoData query() throws MongoSQLException {
   if (!(statement instanceof SQLSelectStatement)) {
       //return null;
       throw new IllegalArgumentException("not a query sql statement");
   }
   MongoData mongo = new MongoData();
   DBCursor c = null;
   SQLSelectStatement selectStmt = (SQLSelectStatement) statement;
   SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();
   int icount = 0;
   if (sqlSelectQuery instanceof MySqlSelectQueryBlock) {
       MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectStmt.getSelect().getQuery();

       BasicDBObject fields = new BasicDBObject();

       // 显示(返回)的字段
       for (SQLSelectItem item : mysqlSelectQuery.getSelectList()) {
           //System.out.println(item.toString());
           if (!(item.getExpr() instanceof SQLAllColumnExpr)) {
               if (item.getExpr() instanceof SQLAggregateExpr) {
                   SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr();
                   if (expr.getMethodName().equals("COUNT")) { // TODO 待读:count(*)
                       icount = 1;
                       mongo.setField(getExprFieldName(expr), Types.BIGINT);
                   }
                   fields.put(getExprFieldName(expr), 1);
               } else {
                   fields.put(getFieldName(item), 1);
               }
           }

       }

       // 表名
       SQLTableSource table = mysqlSelectQuery.getFrom();
       DBCollection coll = this._db.getCollection(table.toString());
       mongo.setTable(table.toString());

       // WHERE
       SQLExpr expr = mysqlSelectQuery.getWhere();
       DBObject query = parserWhere(expr);

       // GROUP BY
       SQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy();
       BasicDBObject gbkey = new BasicDBObject();
       if (groupby != null) {
           for (SQLExpr gbexpr : groupby.getItems()) {
               if (gbexpr instanceof SQLIdentifierExpr) {
                   String name = ((SQLIdentifierExpr) gbexpr).getName();
                   gbkey.put(name, Integer.valueOf(1));
               }
           }
           icount = 2;
       }

       // SKIP / LIMIT
       int limitoff = 0;
       int limitnum = 0;
       if (mysqlSelectQuery.getLimit() != null) {
           limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());
           limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());
       }
       if (icount == 1) { // COUNT(*)
           mongo.setCount(coll.count(query));
       } else if (icount == 2) { // MapReduce
           BasicDBObject initial = new BasicDBObject();
           initial.put("num", 0);
           String reduce = "function (obj, prev) { " + "  prev.num++}";
           mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce));
       } else {
           if ((limitoff > 0) || (limitnum > 0)) {
               c = coll.find(query, fields).skip(limitoff).limit(limitnum);
           } else {
               c = coll.find(query, fields);
           }

           // order by
           SQLOrderBy orderby = mysqlSelectQuery.getOrderBy();
           if (orderby != null) {
               BasicDBObject order = new BasicDBObject();
               for (int i = 0; i < orderby.getItems().size(); i++) {
                   SQLSelectOrderByItem orderitem = orderby.getItems().get(i);
                   order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType()));
               }
               c.sort(order);
               // System.out.println(order);
           }
       }
       mongo.setCursor(c);
   }
   return mongo;
}

3.2 查询条件

// MongoSQLParser.java
private void parserWhere(SQLExpr aexpr, BasicDBObject o) {
   if (aexpr instanceof SQLBinaryOpExpr) {
       SQLBinaryOpExpr expr = (SQLBinaryOpExpr) aexpr;
       SQLExpr exprL = expr.getLeft();
       if (!(exprL instanceof SQLBinaryOpExpr)) {
           if (expr.getOperator().getName().equals("=")) {
               o.put(exprL.toString(), getExpValue(expr.getRight()));
           } else {
               String op = "";
               if (expr.getOperator().getName().equals("<")) {
                   op = "$lt";
               } else if (expr.getOperator().getName().equals("<=")) {
                   op = "$lte";
               } else if (expr.getOperator().getName().equals(">")) {
                   op = "$gt";
               } else if (expr.getOperator().getName().equals(">=")) {
                   op = "$gte";
               } else if (expr.getOperator().getName().equals("!=")) {
                   op = "$ne";
               } else if (expr.getOperator().getName().equals("<>")) {
                   op = "$ne";
               }
               parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight()));
           }
       } else {
           if (expr.getOperator().getName().equals("AND")) {
               parserWhere(exprL, o);
               parserWhere(expr.getRight(), o);
           } else if (expr.getOperator().getName().equals("OR")) {
               orWhere(exprL, expr.getRight(), o);
           } else {
               throw new RuntimeException("Can't identify the operation of  of where");
           }
       }
   }
}

private void orWhere(SQLExpr exprL, SQLExpr exprR, BasicDBObject ob) {
   BasicDBObject xo = new BasicDBObject();
   BasicDBObject yo = new BasicDBObject();
   parserWhere(exprL, xo);
   parserWhere(exprR, yo);
   ob.put("$or", new Object[]{xo, yo});
}

3.3 解析 MongoDB 数据

// MongoResultSet.java
public MongoResultSet(MongoData mongo, String schema) throws SQLException {
   this._cursor = mongo.getCursor();
   this._schema = schema;
   this._table = mongo.getTable();
   this.isSum = mongo.getCount() > 0;
   this._sum = mongo.getCount();
   this.isGroupBy = mongo.getType();

   if (this.isGroupBy) {
       dblist = mongo.getGrouyBys();
       this.isSum = true;
   }
   if (this._cursor != null) {
       select = _cursor.getKeysWanted().keySet().toArray(new String[0]);
       // 解析 fields
       if (this._cursor.hasNext()) {
           _cur = _cursor.next();
           if (_cur != null) {
               if (select.length == 0) {
                   SetFields(_cur.keySet());
               }
               _row = 1;
           }
       }
       // 设置 fields 类型
       if (select.length == 0) {
           select = new String[]{"_id"};
           SetFieldType(true);
       } else {
           SetFieldType(false);
       }
   } else {
       SetFields(mongo.getFields().keySet());//new String[]{"COUNT(*)"};
       SetFieldType(mongo.getFields());
   }
}
  • 当使用 SELECT * 查询字段时,fields 使用第一条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。

3.4 返回数据给 MySQL Client

// JDBCConnection.java
private void ouputResultSet(ServerConnection sc, String sql)
       throws SQLException {
   ResultSet rs = null;
   Statement stmt = null;

   try {
       stmt = con.createStatement();
       rs = stmt.executeQuery(sql);

       // header
       List<FieldPacket> fieldPks = new LinkedList<>();
       ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark);
       int colunmCount = fieldPks.size();
       ByteBuffer byteBuf = sc.allocate();
       ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();
       headerPkg.fieldCount = fieldPks.size();
       headerPkg.packetId = ++packetId;
       byteBuf = headerPkg.write(byteBuf, sc, true);
       byteBuf.flip();
       byte[] header = new byte[byteBuf.limit()];
       byteBuf.get(header);
       byteBuf.clear();
       List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size());
       for (FieldPacket curField : fieldPks) {
           curField.packetId = ++packetId;
           byteBuf = curField.write(byteBuf, sc, false);
           byteBuf.flip();
           byte[] field = new byte[byteBuf.limit()];
           byteBuf.get(field);
           byteBuf.clear();
           fields.add(field);
       }
       // header eof
       EOFPacket eofPckg = new EOFPacket();
       eofPckg.packetId = ++packetId;
       byteBuf = eofPckg.write(byteBuf, sc, false);
       byteBuf.flip();
       byte[] eof = new byte[byteBuf.limit()];
       byteBuf.get(eof);
       byteBuf.clear();
       this.respHandler.fieldEofResponse(header, fields, eof, this);

       // row
       while (rs.next()) {
           RowDataPacket curRow = new RowDataPacket(colunmCount);
           for (int i = 0; i < colunmCount; i++) {
               int j = i + 1;
               if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {
                   curRow.add(rs.getBytes(j));
               } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL ||
                       fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte
                   // ensure that do not use scientific notation format
                   BigDecimal val = rs.getBigDecimal(j);
                   curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null, sc.getCharset()));
               } else {
                   curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));
               }
           }
           curRow.packetId = ++packetId;
           byteBuf = curRow.write(byteBuf, sc, false);
           byteBuf.flip();
           byte[] row = new byte[byteBuf.limit()];
           byteBuf.get(row);
           byteBuf.clear();
           this.respHandler.rowResponse(row, this);
       }
       fieldPks.clear();
       // row eof
       eofPckg = new EOFPacket();
       eofPckg.packetId = ++packetId;
       byteBuf = eofPckg.write(byteBuf, sc, false);
       byteBuf.flip();
       eof = new byte[byteBuf.limit()];
       byteBuf.get(eof);
       sc.recycle(byteBuf);
       this.respHandler.rowEofResponse(eof, this);
   } finally {
       if (rs != null) {
           try {
               rs.close();
           } catch (SQLException e) {
           }
       }
       if (stmt != null) {
           try {
               stmt.close();
           } catch (SQLException e) {
           }
       }
   }
}

// MongoResultSet.java
@Override
public String getString(String columnLabel) throws SQLException {
   Object x = getObject(columnLabel);
   if (x == null) {
       return null;
   }
   return x.toString();
}
  • 当返回字段值是 Object 时,返回该对象.toString()。例如:
mysql> select * from user order by _id asc;
+--------------------------+------+-------------------------------+
| _id                      | name | profile                       |
+--------------------------+------+-------------------------------+
| 1                        | 123  | { "age" : 1 , "height" : 100} |

4. 插入操作

未分类

// MongoSQLParser.java
public int executeUpdate() throws MongoSQLException {
   if (statement instanceof SQLInsertStatement) {
       return InsertData((SQLInsertStatement) statement);
   }
   if (statement instanceof SQLUpdateStatement) {
       return UpData((SQLUpdateStatement) statement);
   }
   if (statement instanceof SQLDropTableStatement) {
       return dropTable((SQLDropTableStatement) statement);
   }
   if (statement instanceof SQLDeleteStatement) {
       return DeleteDate((SQLDeleteStatement) statement);
   }
   if (statement instanceof SQLCreateTableStatement) {
       return 1;
   }
   return 1;
}

private int InsertData(SQLInsertStatement state) {
   if (state.getValues().getValues().size() == 0) {
       throw new RuntimeException("number of  columns error");
   }
   if (state.getValues().getValues().size() != state.getColumns().size()) {
       throw new RuntimeException("number of values and columns have to match");
   }
   SQLTableSource table = state.getTableSource();
   BasicDBObject o = new BasicDBObject();
   int i = 0;
   for (SQLExpr col : state.getColumns()) {
       o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i)));
       i++;
   }
   DBCollection coll = this._db.getCollection(table.toString());
   coll.insert(o);
   return 1;
}

5. 彩蛋

1、支持多 MongoDB ,并使用 MyCAT 进行分片。

MyCAT 配置:https://link.juejin.im/?target=https%3A%2F%2Fgithub.com%2FYunaiV%2FMycat-Server%2Ftree%2F1.6%2Fsrc%2Ftest%2Fresources%2Fmulti_mongodb

2、支持 MongoDB + MySQL 作为同一个 MyCAT Table 的数据节点。查询时,可以合并数据结果。

查询时,返回 MySQL 数据记录字段要比 MongoDB 数据记录字段全,否则,合并结果时会报错。

MyCAT 配置:https://link.juejin.im/?target=https%3A%2F%2Fgithub.com%2FYunaiV%2FMycat-Server%2Ftree%2F1.6%2Fsrc%2Ftest%2Fresources%2Fsingle_mongodb_mysql

3、MongoDB 作为数据节点时,可以使用 MyCAT 提供的数据库主键字段功能。

MyCAT 配置:https://link.juejin.im/?target=https%3A%2F%2Fgithub.com%2FYunaiV%2FMycat-Server%2Ftree%2F1.6%2Fsrc%2Ftest%2Fresources%2Fsingle_mongodb

EOS 区块链数据实时异构到 MongoDB

从 EOSIO 1.1.0 开始,已经默认支持 MongoDB,所以本文提供的方法仅做参考。

0x00 背景

执行 eosio_build.sh 脚本编译 nodeos 会默认安装 mongodb,但是从 Dawn 4.0 开始,mongo_db_plugin 插件不再生效,详情请参考(https://github.com/EOSIO/eos/issues/3030)。(https://github.com/EOSIO/eos/pull/4304)PR 重新支持 MongoDB,本文讲解如何将链上数据实时同步到 MongoDB。

未分类

0x01 部署

部署的思路如下:

  • 编译支持 mongo_db_plugin 的 nodeos
  • 配置 MongoDB
  • 启动 MongoDB
  • 创建数据库
  • 启动 nodeos,同步主网数据
  • 查看 MongoDB,确认数据

接下来我们逐步讲解。

由于 EOSIO 代码库有 gh#3030-enable-mongodb 分支,所以可以使用 git 自动将支持 mongo_db_plugin 插件的代码合并,详细命令如下:

$ mkdir -p /data/mongodb && cd /data/mongodb
$ git clone -b release/1.1 https://github.com/EOSIO/eos.git --recursive
$ cd eos
$ git fetch --all --tags --prune
$ git merge --m "merge" --commit origin/gh#3030-enable-mongodb
$ git submodule update --init --recursive
$ ./eosio_build.sh

如果 nodeos 编译成功,MongoDB 自动安装,路径在 $USER/opt/mongodb。执行 /data/mongodb/eos/build/programs/nodeos/nodeos –help | grep mongo,如果有如下的输出,表示 MongoDB 插件编译成功。

Config Options for eosio::mongo_db_plugin:
  -q [ --mongodb-queue-size ] arg (=256)
  --mongodb-wipe                        Required with --replay-blockchain,
                                        --delete-all-blocks to wipe mongo
                                        accidental wipe of mongo db.
  --mongodb-block-start arg (=0)        If specified then only abi data pushed
                                        to mongodb until specified block is
  -m [ --mongodb-uri ] arg              MongoDB URI connection string, see:
                                        https://docs.mongodb.com/master/referen
                                        in URI. Example: mongodb://127.0.0.1:27

接下来我们配置 MongoDB。

$ mkdir -p /data/mongodb/db /data/mongodb/logs
$ touch /data/mongodb/mongodb.conf

mongodb.conf 配置文件内容如下:

systemLog:
  destination: file
  path: /data/mongodb/logs/mongo.log
  logAppend: true
storage:
  dbPath: /data/mongodb/db
  journal:
   enabled: true
net:
  bindIp: 127.0.0.1
  port: 27017

接着启动 MongoDB,并且创建 eos 数据库。

$ /$USER/opt/mongodb/bin/mongod -f /data/mongodb/mongodb.conf --fork
$ /$USER/opt/mongodb/bin/mongo --port 27017
$ use eos

然后配置 EOS 主网 fullnode。

$ mkdir /data/mainnet
$ cd /data/mainnet
$ git clone https://github.com/superoneio/eos-mainnet
$ mkdir -p data logs config
$ cp eos-mainnet/config.ini mainnet/config
$ cp eos-mainnet/genesis.json mainnet
$ cp eos-mainnet/*.sh mainnet
$ chmod +x mainnet

修改 config.ini 相关配置,添加如下参数:

plugin = eosio::mongo_db_plugin
mongodb-uri = mongodb://127.0.0.1:27017/eos

最后执行 start.sh,同步主网数据。

0x02 测试

我们执行 $ /$USER/opt/mongodb/bin/mongo –port 27017 登录到 MongoDB,可以对同步的数据进行校验。

> use eos
> show dbs
admin   0.000GB
config  0.000GB
eos     0.031GB
local   0.000GB
> use eos
switched to db eos
> show tables;
accounts
actions
block_states
blocks
transaction_traces
transactions
> db.accounts.find({name:{$eq:'eosio.ram'}})
{ "_id" : ObjectId("5b4163f2992ecd51b4277f77"), "name" : "eosio.ram", "createdAt" : ISODate("2018-07-08T01:08:02.071Z") }

0x03 小结

之前介绍了将 EOS 主网数据同步到 MySQL,本文讲解了另一种同步到文档型数据库的方法。MySQL 和 MongoDB 最大的区别在于,MySQL 是传统的关系型数据库,支持 SQL 标准,而 MongoDB 是文档型数据库。MySQL 天生适用于结构化数据以及需要使用事务的场景,天生支持 SQL 标准,对开发者比较友好。

MongoDB 适合如下场景:[1]

  • 表结构不明确且数据不断变大,MongoDB 是非结构化文档数据库,扩展字段很容易且不会影响原有数据
  • 更高的写入负载,MongoDB 侧重高数据写入的性能,而非事务安全
  • 数据量很大或者将来会变得很大,MongoDB 内建了 Sharding、数据分片的特性,容易水平扩展
  • 高可用性,MongoDB 自带高可用,自动主从切换(副本集)

而 MongoDB 不支持事务、不支持 JOIN,所以涉及事务和复杂查询的场景不适合 MySQL。

关于 MongoDB 和 MySQL 的对比,可以点击 https://ruby-china.org/topics/27659 查看。

0x04 参考

  • [1] 张家江 (2017–09–07). SOCI Installation. Retrieved from http://tech.lede.com/2017/09/07/rd/server/MongoDBvsMysql

认识 MongoDB 4.0 的新特性——事务(Transactions)

前言

相信使用过主流的关系型数据库的朋友对“事务(Transactions)”不会太陌生,它可以让我们把对多张表的多次数据库操作整合为一次原子操作,这在高并发场景下可以保证多个数据操作之间的互不干扰;并且一旦在这些操作过程任一环节中出现了错误,事务会中止并且让数据回滚,这使得同时在多张表中修改数据的时候保证了数据的一致性。

以前 MongoDB 是不支持事务的,因此开发者在需要用到事务的时候,不得不借用其他工具,在业务代码层面去弥补数据库的不足。随着 4.0 版本的发布,MongoDB 也为我们带来了原生的事务操作,下面就让我们一起来认识它,并通过简单的例子了解如何去使用。

未分类

介绍

事务和副本集(Replica Sets)

副本集是 MongoDB 的一种主副节点架构,它使数据得到最大的可用性,避免单点故障引起的整个服务不能访问的情况的发生。目前 MongoDB 的多表事务操作仅支持在副本集上运行,想要在本地环境安装运行副本集可以借助一个工具包——run-rs,以下的文章中有详细的使用说明:

https://thecodebarbarian.com/introducing-run-rs-zero-config-mongodb-runner.html

事务和会话(Sessions)

事务和会话(Sessions)关联,一个会话同一时刻只能开启一个事务操作,当一个会话断开,这个会话中的事务也会结束。

事务中的函数

  • Session.startTransaction()

在当前会话中开始一次事务,事务开启后就可以开始进行数据操作。在事务中执行的数据操作是对外隔离的,也就是说事务中的操作是原子性的。

  • Session.commitTransaction()

提交事务,将事务中对数据的修改进行保存,然后结束当前事务,一次事务在提交之前的数据操作对外都是不可见的。

  • Session.abortTransaction()

中止当前的事务,并将事务中执行过的数据修改回滚。

重试

当事务运行中报错,catch 到的错误对象中会包含一个属性名为 errorLabels 的数组,当这个数组中包含以下2个元素的时候,代表我们可以重新发起相应的事务操作。

  • TransientTransactionError:出现在事务开启以及随后的数据操作阶段
  • UnknownTransactionCommitResult:出现在提交事务阶段

示例

经过上面的铺垫,你是不是已经迫不及待想知道究竟应该怎么写代码去完成一次完整的事务操作?下面我们就简单写一个例子:

场景描述: 假设一个交易系统中有2张表——记录商品的名称、库存数量等信息的表 commodities,和记录订单的表 orders。当用户下单的时候,首先要找到 commodities 表中对应的商品,判断库存数量是否满足该笔订单的需求,是的话则减去相应的值,然后在 orders 表中插入一条订单数据。在高并发场景下,可能在查询库存数量和减少库存的过程中,又收到了一次新的创建订单请求,这个时候可能就会出问题,因为新的请求在查询库存的时候,上一次操作还未完成减少库存的操作,这个时候查询到的库存数量可能是充足的,于是开始执行后续的操作,实际上可能上一次操作减少了库存后,库存的数量就已经不足了,于是新的下单请求可能就会导致实际创建的订单数量超过库存数量。

以往要解决这个问题,我们可以用给商品数据“加锁”的方式,比如基于 Redis 的各种锁,同一时刻只允许一个订单操作一个商品数据,这种方案能解决问题,缺点就是代码更复杂了,并且性能会比较低。如果用数据库事务的方式就可以简洁很多:

commodities 表数据(stock 为库存):

{ "_id" : ObjectId("5af0776263426f87dd69319a"), "name" : "灭霸原味手套", "stock" : 5 }
{ "_id" : ObjectId("5af0776263426f87dd693198"), "name" : "雷神专用铁锤", "stock" : 2 }

orders 表数据:

{ "_id" : ObjectId("5af07daa051d92f02462644c"), "commodity": ObjectId("5af0776263426f87dd69319a"), "amount": 2 }
{ "_id" : ObjectId("5af07daa051d92f02462644b"), "commodity": ObjectId("5af0776263426f87dd693198"), "amount": 3 }

通过一次事务完成创建订单操作(mongo Shell):

// 执行 txnFunc 并且在遇到 TransientTransactionError 的时候重试
function runTransactionWithRetry(txnFunc, session) {
  while (true) {
    try {
      txnFunc(session); // 执行事务
      break;
    } catch (error) {
      if (
        error.hasOwnProperty('errorLabels') &&
        error.errorLabels.includes('TransientTransactionError')
      ) {
        print('TransientTransactionError, retrying transaction ...');
        continue;
      } else {
        throw error;
      }
    }
  }
}

// 提交事务并且在遇到 UnknownTransactionCommitResult 的时候重试
function commitWithRetry(session) {
  while (true) {
    try {
      session.commitTransaction();
      print('Transaction committed.');
      break;
    } catch (error) {
      if (
        error.hasOwnProperty('errorLabels') &&
        error.errorLabels.includes('UnknownTransactionCommitResult')
      ) {
        print('UnknownTransactionCommitResult, retrying commit operation ...');
        continue;
      } else {
        print('Error during commit ...');
        throw error;
      }
    }
  }
}

// 在一次事务中完成创建订单操作
function createOrder(session) {
  var commoditiesCollection = session.getDatabase('mall').commodities;
  var ordersCollection = session.getDatabase('mall').orders;
  // 假设该笔订单中商品的数量
  var orderAmount = 3;
  // 假设商品的ID
  var commodityID = ObjectId('5af0776263426f87dd69319a');

  session.startTransaction({
    readConcern: { level: 'snapshot' },
    writeConcern: { w: 'majority' },
  });

  try {
    var { stock } = commoditiesCollection.findOne({ _id: commodityID });
    if (stock < orderAmount) {
      print('Stock is not enough');
      session.abortTransaction();
      throw new Error('Stock is not enough');
    }
    commoditiesCollection.updateOne(
      { _id: commodityID },
      { $inc: { stock: -orderAmount } }
    );
    ordersCollection.insertOne({
      commodity: commodityID,
      amount: orderAmount,
    });
  } catch (error) {
    print('Caught exception during transaction, aborting.');
    session.abortTransaction();
    throw error;
  }

  commitWithRetry(session);
}

// 发起一次会话
var session = db.getMongo().startSession({ readPreference: { mode: 'primary' } });

try {
  runTransactionWithRetry(createOrder, session);
} catch (error) {
  // 错误处理
} finally {
  session.endSession();
}

上面的代码看着感觉很多,其实 runTransactionWithRetry 和 commitWithRetry 这两个函数都是可以抽离出来成为公共函数的,不需要每次操作都重复书写。用上了事务之后,因为事务中的数据操作都是一次原子操作,所以我们就不需要考虑分布并发导致的数据一致性的问题,是不是感觉简单了许多?

你可能注意到了,代码中在执行 startTransaction 的时候设置了两个参数——readConcern 和 writeConcern,这是 MongoDB 读写操作的确认级别,在这里用于在副本集中平衡数据读写操作的可靠性和性能,如果在这里展开就太多了,所以感兴趣的朋友建议去阅读官方文档了解一下:

readConcern:

https://docs.mongodb.com/master/reference/read-concern/

writeConcern:

https://docs.mongodb.com/master/reference/write-concern/

mongodb使用总结

未分类

MongoDB 是一个基于分布式文件存储的数据库。由 C++ 语言编写。旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。

MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。是世界上最大的nosql(not only sql)数据库。执行mongodb数据库需要mongod环境和mongo环境。

数据库

概念:存储数据的仓库我们称为数据库。数据库分为非关系型数据库和关系型数据库。关系型数据库(Oracle,mysql,db2,…)往往以表结构的形式进行
存储

mongodb和mysql的区别

  • 前者非关系型数据库,后者是关系型数据库
  • mongodb中是以集合的形式来充当mysql中的表结构
  • mongodb中的数据是以文档的形式进行存储

未分类

mongodb的优点

  • 面向文档存储的数据库(BSON的数据格式)

未分类

  • 有丰富的查询指令
  • 支持索引
  • 具有分片系统
  • 无模式

mongodb的缺点

  • 占用的空间比较大
  • 不支持事务
  • 对于windows来说,它不支持32位的系统

mongodb常用指令

  • show dbs 查看当前所有数据库

未分类

  • use database_name 创建数据库
  • db 查询当前使用的数据库
  • db.stats() 查询当前使用的数据库信息
  • db.dropDatabase() 删除当前数据库

未分类

  • db.help() 获取查询帮助
  • db.database_name.help() 获取指定数据库查询帮助
  • db.collection_name.find() 查询集合的信息
  • db.createCollection(coll_name,options) 创建集合
  • db.getCollectionNames() 查询所有集合
  • db.getCollection(coll_name) 查询某一个特定集合
  • db.coll_name.drop() 对集合的删除
  • db.printCollectionStats() 打印当前数据库中所有集合的状态
  • db.coll_name.insert/insertMany/save/insertOne 添加一条/多条数据
  • db.coll_name.update(query,info,con,muti) 修改数据(query: 查询的条件;info: 要更新的信息;con: 给异步操作提供扩展;muti: 返回布尔类型 默认false)(这里涉及到几个特殊属性$inc和$set 前者为相加后者为设置)
  • db.coll_name.remove(query) 删除数据(query 删除的条件)

未分类

  • 对数据的查询
  • db.coll_name.find() 查询所有信息
  • db.coll_name.find({"age": 18}) 查询某一条信息
  • db.coll_name.find({age: {$gt: 22}}) gt大于某一条件
  • db.coll_name.find({age: {$lt: 22}}) lt小于某一条件
  • db.coll_name.find({age: {$gte: 22}}) gt大于等于某一条件
  • db.coll_name.find({age: {$lte: 22}}) lte小于等于某一条件
  • db.coll_name.find({title: /好/}) 模糊查询

mongodb术语概念

未分类

项目中使用mongodb

切换到指定项目 npm init生成package.json
npm install mongodb -g 全局安装
npm install mongodb --save-dev 局部安装

mongodb.js

var Mongodb = require("mongodb")
// 连接到mongodb的服务端口
var server = new Mongodb.Server("localhost",27017,{auto_reconnect:true})
//创建数据库
var db = new Mongodb.Db('cloud',server,{safe:true})
//连接数据库
db.open((err,db) => {
    if(err) {
        console.log('连接数据库失败')
    } else {
        console.log('连接数据库成功')
    }
})

启动Memcached支援

今天闲来没事,看了下宝塔的面板,发现宝塔有一个Memcached的功能,于是百度了下。

解释是这样的:
Memcached配合typecho的插件可以让您的博客支持更大的并发

em。。。。我的烂机子正好需要这个东西!

遂开始进行折腾。我们以宝塔面板为例子(方便):

找到你的php相应的版本,然后安装Memcached拓展。

未分类

稍等一阵子就好了,php.ini不用你手动设置哦,宝塔会自己帮你设置好的。
然后我们来安装typecho的支援插件!

cd
cd /www/wwwroot/你的域名/usr/plugins
git clone https://github.com/phpgao/TpCache.git

然后启用!并且配置相关信息!

未分类

然后就可以了。若有不懂的可以到作者这个项目下查看:https://github.com/phpgao/TpCache

CentOS7下安装memcached服务

首先下载memcached

wget http://www.memcached.org/files/memcached-1.5.9.tar.gz

安装前需要先安装libevent

yum -y install libevent libevent-devel
#解压
tar zxvf memcached-1.5.9.tar.gz
#进入目录
cd memcached-1.5.9
#配置
./configure --prefix=/usr/local/memcached --prefix=/usr/local/memcached/
#编译
make
#安装
make install
#启动memcached
/usr/local/memcached/bin/memcached -m 10 -u root &

注:-m 内存(单位为M) -u 用户 另外还可以有其他参数-l 主机IP -p 端口

如需设置开机自启动可以按如下方式编辑文件,然后加入启动命令

vi /etc/rc.d/rc.local

注:可能需要赋予rc.local文件可执行权限才可以开机执行

PHP 和 Python 基于 UDP 协议操作 memcached

在写完(https://mp.weixin.qq.com/s?__biz=MzAwOTU4NzM5Ng==&mid=2455770298&idx=1&sn=1a6232862a977c9bc85d99620a9e8499&scene=21#wechat_redirect)这篇文章后,我重新燃起了对 memcached 的兴趣,在新浪博客的时候,我们很早就使用了 memcached,但由于大部分服务使用了 squid 缓存,所以 memcached 没有大规模使用,十年过去了,我对 memcached 的认知也越来越肤浅了,乘着这次机会,我重新看了一遍 memcached 官方 wiki,打算写几篇文章回顾下,今天聊第一个话题,那就是如何基于 UDP 协议操作 memcached。

首先 memcached 支持 TCP 和 UDP 协议,至于两者的差别不是本文的重点,本文主要讲解如何在 PHP 和 Python 中以 UDP 协议的方式操作 memcached。

memcached 服务如何开启 UDP

由于出现过 memcached UDP 反射攻击,所以很多 linux 发行版默认启动的是关闭 UDP 端口的,如果你想开启,可以执行下列命令:

$ memcached -m 64 -p 11212 -U 11211 -u memcache -l 127.0.0.1

-U 参数表示指定 UDP 端口,如果 -U 参数的值为0 表示关闭 UDP 端口。

一旦执行上列命令,表示 memcached 服务器端同时监听 11211 的 UDP 和 TCP 端口,通过下列命令可看出:

$ netstat -an | grep 11211
tcp 0 0 127.0.0.1:11211 0.0.0.0:* LISTEN     
udp 0 0 127.0.0.1:11211 0.0.0.0:*

命令行 UDP 连接 memcached

在 Linux 中,telnet 只能运行于 TCP 模式,所以为了演示,只能采用 nc 命令行。UDP 操作 memcached,在操作数据的时候必须增加一个 frame 头,后续的数据格式和 TCP 操作 memcached 一样,查看官方手册:

 The frame header is 8 bytes long, as follows (all values are 16-bit integers

 in network byte order, high byte first):


 0-1 Request ID.

 2-3 Sequence number.

 4-5 Total number of datagrams in this message.

 6-7 Reserved for future use; must be 0.

为了说的简单点,可以执行下列命令,以 UDP 协议操作 memcached:

$ printf 'x00x00x00x00x00x01x00x00statsrn' | nc -u 127.0.0.1 11211 
$ printf 'x00x00x00x00x00x01x00x00set v 0 0 1rnxrn' | nc -u 127.0.0.1 11211 
$ printf 'x00x00x00x00x00x01x00x00get vrn' | nc -u 127.0.0.1 11211

PHP 以 UDP 协议操作 memcached

php-memcached 扩展也支持 UDP 协议操作 memcached,但并不鼓励,所以官方文档介绍 UDP 操作非常少,我也是查了官方的 Issues 才明白的。另外即使支持,UDP 操作也有限制,比如 set 命令支持 UDP 协议,但 get 命令就不支持,至于原因,大家可以思考下,后续我会简单说一说。

先看代码:

$m_udp = new Memcached();
# 使用 UDP 协议模式
$m_udp->setOption(Memcached::OPT_USE_UDP, true);
# 注意,支持文本模式的协议,而非二进制协议
$m_udp->setOption(Memcached::OPT_BINARY_PROTOCOL, false);

$m_udp->addServer('127.0.0.1', 11211, 1);

echo $m_udp->get('y');
var_dump($m_udp->getResultMessage());

输出 string(20) “ACTION NOT SUPPORTED”,可以看出 php-memcached 扩展做了限制,不允许 UDP 协议操作 get 命令。

$m_udp->set('y',"ok");
var_dump($m_udp->getResultMessage());
$m_tcp =new Memcached();
# 切换为默认的 TCP 连接方式
$m_tcp->addServer('127.0.0.1', 11211, 1);
echo $m_tcp->get("y");

执行完毕,成功输出 ok。

Python 以 UDP 协议操作 memcached

Python 有专门的包基于 UDP 协议操作 memcached,这就是 python-memcached-udp 包,安装后,演示一个例子:

client = memcached_udp.Client([('localhost', 11211)])
client.set('key1', 'value1')
r = client.get('key1')
print (r)

大家可以看看这个包的源代码,非常有意思,可以学到很多 memcached 命令知识。

Centos 7安装Nginx+PHP+MariaDB环境搭建WordPress博客

WordPress是一个免费的开源项目,是使用PHP语言开发的博客平台,用户可以在支持PHP和MySQL数据库的服务器上架设属于自己的网站。也可以把 WordPress当作一个内容管理系统(CMS)来使用,国内外有不少的知名网站建设都是基于WordPress程序,WordPress有许多第三方开发的免费模板和免费插件,安装方式简单易用,相信很多人的第一个站点都是基于WordPress建的。访问官网https://wordpress.org/

在安装ShadowsocksR服务之前希望先对服务器做基本安全配置(非强制) 跳转链接https://www.gyuryong.com/index.php/archives/18/

运行环境搭建

WordPress基于PHP开发的,相信是大家最熟悉也是最容易部署的Web项目了。环境准备:lnmp(linux+nginx+mysql+php)或者lamp(linux+apache+mysql+php),大同小异,本文推荐使用nginx作为Web服务器。
为了避免不必要的麻烦先关闭防火墙和selinux。

1.安装nginx

安装nginx,默认情况Centos7中无Nginx的源,可以如下执行命令添加源,Centos其他版本或者RHEL查看官方教程(教程链接https://www.nginx.com/resources/wiki/start/topics/tutorials/install/):

vi /etc/yum.repos.d/nginx.repo

写入

[nginx]
name=nginx repo
baseurl=http://nginx.org/packages/centos/7/$basearch/
gpgcheck=0
enabled=1

安装nginx:

yum install nginx -y

2.安装php和mariadb

yum install php-fpm php-mysql mariadb-server unzip

3.修改配置文件

修改/etc/nginx/conf.d/default.conf中下面两断内容:

vi /etc/nginx/conf.d/default.conf

更改前:

location / {
    root   /usr/share/nginx/html;
    index  index.html index.htm;
}

更改后:

    root   /usr/share/nginx/html;
    index  index.html index.htm index.php;
location / {
    try_files $uri $uri/ /index.php$is_args$args;
}

更改前:

#location ~ .php$ {
#    root           html;
#    fastcgi_pass   127.0.0.1:9000;
#    fastcgi_index  index.php;
#    fastcgi_param  SCRIPT_FILENAME  /scripts$fastcgi_script_name;
#    include        fastcgi_params;
#}

更改后:

location ~ .php$ {
    fastcgi_pass   127.0.0.1:9000;
    fastcgi_index  index.php;
    fastcgi_param  SCRIPT_FILENAME  $request_filename;
    include        fastcgi_params;
}

修改/etc/php-fpm.d/www.conf配置:

vi /etc/php-fpm.d/www.conf

user = apache改为user = nginx,将group = apache改为group = nginx

4.开启服务

systemctl start nginx.service
systemctl start mariadb.service
systemctl start php-fpm.service

5.设置开机自启

systemctl enable nginx mariadb php-fpm

安装WorePress

1.移除/usr/share/nginx/html内所有文件:

cd /usr/share/nginx/html
rm 50x.html index.html

2.下载WordPress并解压,到官网复制最新版链接(跳转链接https://cn.wordpress.org/download/):

yum install wget -y
wget https://cn.wordpress.org/wordpress-4.9.4-zh_CN.zip
unzip wordpress-4.9.4-zh_CN.zip

3.将Web文件移动到根目录并删除没用文件夹:

mv wordpress/* .
rmdir wordpress
rm wordpress-4.9.4-zh_CN.zip

4.权限设置

chown nginx.nginx -R .

5.创建数据库wordpress:

mysql
create database wordpress;
exit

接下来输入你的ip地址就可以安装WordPress了!

MySQL选型以及使用mariadb踩过的几个坑

MySQL新版本选型

公司早期主要用mysql5.5这个版本,今年我们把数据库配置中心搭建起来,主要推的是mysql5.6这个版本,性能和功能上都有了一定的提升,mysql5.6也能支持gtid,但是无法在线在gtid模式与普通模式之间切换,同时5.6的同步性能还是无法让人满意,只能做到在多个db的情况启动并行复制,业务上很难有这样的保证,所以一旦写操作密集的业务,同步慢就会是个严重的问题;

所以,最近一直在考虑升级MySQL,升级MySQL首先面临的一个问题就是选一个合适的版本,首先我们考虑是的采用mysql5.7,5.7今年已经连续发了多个正式版本了,目前使用范围也比较广,可以考虑在正式环境使用了。于是我们进行了线上对比测试,发现mysql5.7与我们线上的mysql5.6版本的性能有较大的差距(也许还是有些参数没有调好的原因,5.7确实要复杂很多)。

与此同时,最近公司频繁出现一些日志性存储,大多数都是采用innodb引擎,容量非常浪费,另一方面由于我们公司的标准mysql服务器容量是1.3T左右,对于一些大容量需求的业务来说,容量上也存在瓶颈,如果要保留长时间的数据就难以满足需求了。所以借此机会我们打算把这块一起考虑进去,目前Percona和Mariadb都支持Tokudb,Mariadb 10.2还是10.3也准备支持Myrocks了。

于是决定对比试一下,选择了Percona 5.7.14、Mariadb 10.1.18与我们线上的MySQL 5.6进行了对比测试,经过压测。

首先是都使用Innodb引擎的情况:

Mariadb与MySQL5.6测试结果接近

Percona 5.7.14与官方MySQL5.7的性能结果差不多,比起MySQL5.6有一定的差距(去掉performance_schema好一点,但是也有差距)。

采用Tokudb引擎测试的结果与官方声称的有差距,使用snappy压缩的情况下,insert比innodb约慢1/4,update只有innodb的一半左右。Percona性能更差,就不考虑了。

最终选型Mariadb 10.1.18,并且在线上部署了一个业务,通过这个业务慢慢试用以后,逐步推广开来。

使用Mariadb踩到的一些坑

在使用Mariadb的过程中,碰到了不少问题,这里主要提一下我碰到的两个较大的问题,供大家参考:

1、同步性能问题:

我们上的这个业务高峰期达到了9000多写操作/秒,首先面临的第一个问题就是同步性能跟不上,slave同步线程数加到了16个线程,勉强能追上,但是一旦数据库从库停一会,就有可能面临永远最不上的可能。当快绝望的时候,看了一下mariadb的官方文章(https://mariadb.com/kb/en/mariadb/parallel-replication/),Mariadb的并行复制支持好几种模式,其中有in-order和out-of-order两种,不过我们这个业务支持in-order,所以没考虑out-of-order,在in-order模式下,又支持两种:Conservative 和 Optimistic,缺省情况下Conservative ,这种并行模式会严格保证事物的顺序性,估计和5.7的group commit原理差不多;而Optimistic模式下,复制的时候会尽量启动更多的会话,直到发现冲突时才会去处理冲突。果断试了一下Optimistic,非常强劲,最高同步速度达到了14000次/秒。

2、”内存泄露”

系统部署结构为:两个Mariadb做成主主复制,在前面部署了一个自己开发的分布式数据库,业务方连接到分布式数据库进程。系统上线了几天,发现主库会莫名其妙的挂掉,好在有分布式数据库,并且会自动切换,Mariadb主库挂了,会自动切到另外一个主库上,业务方没有感知。查看内核日志,发现是OOM了,内核把MySQL杀掉了。

于是开始了各种尝试,去掉Tokudb引擎配置,换Mariadb 10.1.19 ,都尝试过,最终都会发生主库挂掉的事情。一次偶然的机会,我把主库上的slave停掉了,发现主库的内存突然下降好多,并且内存不再增加了,但是一旦把主库上的slave启动就会发现,内存又逐渐身高。这种现象很像mysql线程内的内存分配机制造成的(基于mem_root的内存分配,线程停掉会全部释放),所以初步怀疑是这个原因造成的。发现作为双主中的另外一个Mariadb,就不会出现内存上涨的问题。

发现上面的现象以后,就开始代码上的调试,用gdb启动一个mariadb,另外一个用普通命令启动,这两个库做成双主:

第一种情况:测试作为从库的时候,接收到的binlog事件情况

在普通命令启动的mariadb上插入一行数据,gdb查看接收到的事件的顺序如下:

### i ) Gtid_log_event

### ii) Table_map_log_event

### iii) Write_rows_log_event

### iv) Xid_log_event

第二种情况:测试作为主库的时候,接收到的binlog事件

在gdb启动的mariadb上插入一行记录,然后gdb观察接收到的事件为:

### 1)Rotate_log_event

### 2)Gtid_list_log_event

### 3)Rotate_log_event

Rotate_log_event事件是虚拟出来的,用于让主库跟上从库的同步位置,这基本上是一个空事件,没有做任何处理,所以初步怀疑是在处理Gtid_list_log_event事件的时候,出现了问题。

反复查看Gtid_list_log_event::do_appy_event函数中的调用情况,发现确实有些方法会调用thd->alloc来分配内存,但是没有回收,所以造成内存不断的增大,我考虑了一下,因为是主库对于同步性能要求也不高,所以在Gtid_list_log_event::do_apply_event函数的最后加了一行代码:free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));  重新编译后,跑了一天,内存终于稳定了。

由于目前发现只有主库有该事件,主库同步处理性能要求不高,所以暂时先这样用着了。不知道mariadb官方版本什么时候会优化一下。

总体来看,Mariadb还是比较适合我们公司的,它有最新的功能、特性能够给我们提供很多解决方案。Tokudb可以解决日志型存储的问题;连接池可以解决大量连接情况下性能地下的问题;审计插件提供安全方面的审核;slave并发模式能够提供高性能的复制能力。除了这些常见功能以外,Mariadb还提供了Cassandra插件、图数据库插件等等,这些都给我们给业务的服务增加了想象力。