mongoDB之创建用户与认证模式启动

mongoDB创建用户

不管是mongoDB还是MySQL,我们往往都需要创建用户来保证安全性,在这里对这方面内容做一个简单的总结~

创建用户

创建用户有帮助增加数据库安全的作用,在mongoDB中需要下列步骤

  1. 创建管理员
  2. 授权认证
  3. 给使用的数据库添加用户

在shell中执行以下操作

sudo service mongod start         # 启动mongod
mongo                             # 以非授权的方式启动

这时我们可以访问到任意一个不需要认证的数据库(比如我们用一个可视化客户端打开)

未分类

如果使用授权方式启动mongoDB,而我们不去登录的话我们也是无法使用的,所以我们要先创建一个管理员账号

创建管理员账号

首先非授权方式启动我们的mongoDB

> use admin # 创建admin数据库
> db.createUser({user:"admin",pwd:"admin",roles:["root"]}) # 创建一个用户名为admin,密码为admin,身份为管理员的User
> db.auth("admin","admin") # 进行认证
1       # 显示1,认证成功

这样,我们的数据库就有了自己的管理员
下面我们为创建一个有用户的db

> use demo1 # 切换数据库
switched to db demo1

 # 创建一个用户名为demo1user,密码为demo1,身份为数据库用户,拥有demo1的User
> db.createUser({user:"demo1user",pwd:"demo1",roles:[{role:"dbOwner",db:"demo1"}]})
Successfully added user: {
    "user" : "demo1user",
    "roles" : [
        {
            "role" : "dbOwner",
            "db" : "demo1"
        }
    ]
}

手动以认证模式启动

经过我的尝试,我发现以认证方式启动mongd的方式有很多,我选择一种我比较喜欢的列在下面:
在安装mongoDB之后,如果是使用和我一样的方式安装的,那么将会自动生成一个配置文件,位于/etc/mongod.conf,我们也可以用这样的方式启动mongod

sudo mongod -f /etc/mongod.conf --auth # 认证模式启动
sudo mongod -f /etc/mongod.conf  # 非认证模式启动

默认通过认证模式启动

一切脱离版本的配置都是耍流氓
如果不想每次都带上–auth参数的haunted,我们可以在配置文件中修改,也就是对/etc/mongod.conf进行修改,我们以官网文档为准https://docs.mongodb.com/manual/reference/configuration-options/

未分类

关于用户认证方面的配置,点击上面的security仔细阅读一下

下面贴上我的配置文件片段(最后两行就是默认以用户认证方式启动)

# mongod.conf

# for documentation of all options, see:
#   http://docs.mongodb.org/manual/reference/configuration-options/

# Where and how to store data.
storage:
  dbPath: /var/lib/mongodb
  journal:
    enabled: true
#  engine:
#  mmapv1:
#  wiredTiger:

# where to write logging data.
systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.log

# network interfaces
net:
  port: 27017
  bindIp: 127.0.0.1


# how the process runs
processManagement:
  timeZoneInfo: /usr/share/zoneinfo

security:
  authorization: enabled

如此一来,不管是sudo mongod -f /etc/mongod.conf还是sudo service mongod start都是以认证模式启动的了。

数据库中间件 MyCAT 源码分析 —— SQL ON MongoDB

本文主要基于 MyCAT 1.6.5 正式版

1. 概述

可能你在看到这个标题会小小的吃惊,MyCAT 能使用 MongoDB 做数据节点。是的,没错,确实可以。
吼吼吼,让我们开启这段神奇的“旅途”。

本文主要分成四部分:

  1. 总体流程,让你有个整体的认识
  2. 查询操作
  3. 插入操作
  4. 彩蛋,????彩蛋,????彩蛋

建议你看过这两篇文章(非必须):

  1. 《MyCAT 源码分析 —— 【单库单表】插入》https://link.juejin.im/?target=http%3A%2F%2Fwww.iocoder.cn%2FMyCAT%2Fsingle-db-single-table-insert%2F%3Fself
  2. 《MyCAT 源码分析 —— 【单库单表】查询》https://link.juejin.im/?target=http%3A%2F%2Fwww.iocoder.cn%2FMyCAT%2Fsingle-db-single-table-select%2F%3Fself

2. 主流程

未分类

  1. MyCAT Server 接收 MySQL Client 基于 MySQL协议 的请求,翻译 SQL 成 MongoDB操作 发送给 MongoDB Server。
  2. MyCAT Server 接收 MongoDB Server 返回的 MongoDB数据,翻译成 MySQL数据结果 返回给 MySQL Client。

这样一看,MyCAT 连接 MongoDB 是不是少神奇一点列。

未分类

Java数据库连接,(Java Database Connectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC也是Sun Microsystems的商标。JDBC是面向关系型数据库的。

MyCAT 使用 JDBC 规范,抽象了对 MongoDB 的访问。通过这样的方式,MyCAT 也抽象了 SequoiaDB 的访问。可能这样说法有些抽象,看个类图压压惊。

未分类

是不是熟悉的味道。不得不说 JDBC 规范的精妙。

3. 查询操作

SELECT id, name FROM user WHERE name > '' ORDER BY _id DESC;

未分类

看顺序图已经很方便的理解整体逻辑,我就不多废话啦。我们来看几个核心的代码逻辑。

3.1 查询 MongoDB

// MongoSQLParser.java
public MongoData query() throws MongoSQLException {
   if (!(statement instanceof SQLSelectStatement)) {
       //return null;
       throw new IllegalArgumentException("not a query sql statement");
   }
   MongoData mongo = new MongoData();
   DBCursor c = null;
   SQLSelectStatement selectStmt = (SQLSelectStatement) statement;
   SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();
   int icount = 0;
   if (sqlSelectQuery instanceof MySqlSelectQueryBlock) {
       MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectStmt.getSelect().getQuery();

       BasicDBObject fields = new BasicDBObject();

       // 显示(返回)的字段
       for (SQLSelectItem item : mysqlSelectQuery.getSelectList()) {
           //System.out.println(item.toString());
           if (!(item.getExpr() instanceof SQLAllColumnExpr)) {
               if (item.getExpr() instanceof SQLAggregateExpr) {
                   SQLAggregateExpr expr = (SQLAggregateExpr) item.getExpr();
                   if (expr.getMethodName().equals("COUNT")) { // TODO 待读:count(*)
                       icount = 1;
                       mongo.setField(getExprFieldName(expr), Types.BIGINT);
                   }
                   fields.put(getExprFieldName(expr), 1);
               } else {
                   fields.put(getFieldName(item), 1);
               }
           }

       }

       // 表名
       SQLTableSource table = mysqlSelectQuery.getFrom();
       DBCollection coll = this._db.getCollection(table.toString());
       mongo.setTable(table.toString());

       // WHERE
       SQLExpr expr = mysqlSelectQuery.getWhere();
       DBObject query = parserWhere(expr);

       // GROUP BY
       SQLSelectGroupByClause groupby = mysqlSelectQuery.getGroupBy();
       BasicDBObject gbkey = new BasicDBObject();
       if (groupby != null) {
           for (SQLExpr gbexpr : groupby.getItems()) {
               if (gbexpr instanceof SQLIdentifierExpr) {
                   String name = ((SQLIdentifierExpr) gbexpr).getName();
                   gbkey.put(name, Integer.valueOf(1));
               }
           }
           icount = 2;
       }

       // SKIP / LIMIT
       int limitoff = 0;
       int limitnum = 0;
       if (mysqlSelectQuery.getLimit() != null) {
           limitoff = getSQLExprToInt(mysqlSelectQuery.getLimit().getOffset());
           limitnum = getSQLExprToInt(mysqlSelectQuery.getLimit().getRowCount());
       }
       if (icount == 1) { // COUNT(*)
           mongo.setCount(coll.count(query));
       } else if (icount == 2) { // MapReduce
           BasicDBObject initial = new BasicDBObject();
           initial.put("num", 0);
           String reduce = "function (obj, prev) { " + "  prev.num++}";
           mongo.setGrouyBy(coll.group(gbkey, query, initial, reduce));
       } else {
           if ((limitoff > 0) || (limitnum > 0)) {
               c = coll.find(query, fields).skip(limitoff).limit(limitnum);
           } else {
               c = coll.find(query, fields);
           }

           // order by
           SQLOrderBy orderby = mysqlSelectQuery.getOrderBy();
           if (orderby != null) {
               BasicDBObject order = new BasicDBObject();
               for (int i = 0; i < orderby.getItems().size(); i++) {
                   SQLSelectOrderByItem orderitem = orderby.getItems().get(i);
                   order.put(orderitem.getExpr().toString(), getSQLExprToAsc(orderitem.getType()));
               }
               c.sort(order);
               // System.out.println(order);
           }
       }
       mongo.setCursor(c);
   }
   return mongo;
}

3.2 查询条件

// MongoSQLParser.java
private void parserWhere(SQLExpr aexpr, BasicDBObject o) {
   if (aexpr instanceof SQLBinaryOpExpr) {
       SQLBinaryOpExpr expr = (SQLBinaryOpExpr) aexpr;
       SQLExpr exprL = expr.getLeft();
       if (!(exprL instanceof SQLBinaryOpExpr)) {
           if (expr.getOperator().getName().equals("=")) {
               o.put(exprL.toString(), getExpValue(expr.getRight()));
           } else {
               String op = "";
               if (expr.getOperator().getName().equals("<")) {
                   op = "$lt";
               } else if (expr.getOperator().getName().equals("<=")) {
                   op = "$lte";
               } else if (expr.getOperator().getName().equals(">")) {
                   op = "$gt";
               } else if (expr.getOperator().getName().equals(">=")) {
                   op = "$gte";
               } else if (expr.getOperator().getName().equals("!=")) {
                   op = "$ne";
               } else if (expr.getOperator().getName().equals("<>")) {
                   op = "$ne";
               }
               parserDBObject(o, exprL.toString(), op, getExpValue(expr.getRight()));
           }
       } else {
           if (expr.getOperator().getName().equals("AND")) {
               parserWhere(exprL, o);
               parserWhere(expr.getRight(), o);
           } else if (expr.getOperator().getName().equals("OR")) {
               orWhere(exprL, expr.getRight(), o);
           } else {
               throw new RuntimeException("Can't identify the operation of  of where");
           }
       }
   }
}

private void orWhere(SQLExpr exprL, SQLExpr exprR, BasicDBObject ob) {
   BasicDBObject xo = new BasicDBObject();
   BasicDBObject yo = new BasicDBObject();
   parserWhere(exprL, xo);
   parserWhere(exprR, yo);
   ob.put("$or", new Object[]{xo, yo});
}

3.3 解析 MongoDB 数据

// MongoResultSet.java
public MongoResultSet(MongoData mongo, String schema) throws SQLException {
   this._cursor = mongo.getCursor();
   this._schema = schema;
   this._table = mongo.getTable();
   this.isSum = mongo.getCount() > 0;
   this._sum = mongo.getCount();
   this.isGroupBy = mongo.getType();

   if (this.isGroupBy) {
       dblist = mongo.getGrouyBys();
       this.isSum = true;
   }
   if (this._cursor != null) {
       select = _cursor.getKeysWanted().keySet().toArray(new String[0]);
       // 解析 fields
       if (this._cursor.hasNext()) {
           _cur = _cursor.next();
           if (_cur != null) {
               if (select.length == 0) {
                   SetFields(_cur.keySet());
               }
               _row = 1;
           }
       }
       // 设置 fields 类型
       if (select.length == 0) {
           select = new String[]{"_id"};
           SetFieldType(true);
       } else {
           SetFieldType(false);
       }
   } else {
       SetFields(mongo.getFields().keySet());//new String[]{"COUNT(*)"};
       SetFieldType(mongo.getFields());
   }
}
  • 当使用 SELECT * 查询字段时,fields 使用第一条数据返回的 fields。即使,后面的数据有其他 fields,也不返回。

3.4 返回数据给 MySQL Client

// JDBCConnection.java
private void ouputResultSet(ServerConnection sc, String sql)
       throws SQLException {
   ResultSet rs = null;
   Statement stmt = null;

   try {
       stmt = con.createStatement();
       rs = stmt.executeQuery(sql);

       // header
       List<FieldPacket> fieldPks = new LinkedList<>();
       ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark);
       int colunmCount = fieldPks.size();
       ByteBuffer byteBuf = sc.allocate();
       ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();
       headerPkg.fieldCount = fieldPks.size();
       headerPkg.packetId = ++packetId;
       byteBuf = headerPkg.write(byteBuf, sc, true);
       byteBuf.flip();
       byte[] header = new byte[byteBuf.limit()];
       byteBuf.get(header);
       byteBuf.clear();
       List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size());
       for (FieldPacket curField : fieldPks) {
           curField.packetId = ++packetId;
           byteBuf = curField.write(byteBuf, sc, false);
           byteBuf.flip();
           byte[] field = new byte[byteBuf.limit()];
           byteBuf.get(field);
           byteBuf.clear();
           fields.add(field);
       }
       // header eof
       EOFPacket eofPckg = new EOFPacket();
       eofPckg.packetId = ++packetId;
       byteBuf = eofPckg.write(byteBuf, sc, false);
       byteBuf.flip();
       byte[] eof = new byte[byteBuf.limit()];
       byteBuf.get(eof);
       byteBuf.clear();
       this.respHandler.fieldEofResponse(header, fields, eof, this);

       // row
       while (rs.next()) {
           RowDataPacket curRow = new RowDataPacket(colunmCount);
           for (int i = 0; i < colunmCount; i++) {
               int j = i + 1;
               if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {
                   curRow.add(rs.getBytes(j));
               } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL ||
                       fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) { // field type is unsigned byte
                   // ensure that do not use scientific notation format
                   BigDecimal val = rs.getBigDecimal(j);
                   curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null, sc.getCharset()));
               } else {
                   curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));
               }
           }
           curRow.packetId = ++packetId;
           byteBuf = curRow.write(byteBuf, sc, false);
           byteBuf.flip();
           byte[] row = new byte[byteBuf.limit()];
           byteBuf.get(row);
           byteBuf.clear();
           this.respHandler.rowResponse(row, this);
       }
       fieldPks.clear();
       // row eof
       eofPckg = new EOFPacket();
       eofPckg.packetId = ++packetId;
       byteBuf = eofPckg.write(byteBuf, sc, false);
       byteBuf.flip();
       eof = new byte[byteBuf.limit()];
       byteBuf.get(eof);
       sc.recycle(byteBuf);
       this.respHandler.rowEofResponse(eof, this);
   } finally {
       if (rs != null) {
           try {
               rs.close();
           } catch (SQLException e) {
           }
       }
       if (stmt != null) {
           try {
               stmt.close();
           } catch (SQLException e) {
           }
       }
   }
}

// MongoResultSet.java
@Override
public String getString(String columnLabel) throws SQLException {
   Object x = getObject(columnLabel);
   if (x == null) {
       return null;
   }
   return x.toString();
}
  • 当返回字段值是 Object 时,返回该对象.toString()。例如:
mysql> select * from user order by _id asc;
+--------------------------+------+-------------------------------+
| _id                      | name | profile                       |
+--------------------------+------+-------------------------------+
| 1                        | 123  | { "age" : 1 , "height" : 100} |

4. 插入操作

未分类

// MongoSQLParser.java
public int executeUpdate() throws MongoSQLException {
   if (statement instanceof SQLInsertStatement) {
       return InsertData((SQLInsertStatement) statement);
   }
   if (statement instanceof SQLUpdateStatement) {
       return UpData((SQLUpdateStatement) statement);
   }
   if (statement instanceof SQLDropTableStatement) {
       return dropTable((SQLDropTableStatement) statement);
   }
   if (statement instanceof SQLDeleteStatement) {
       return DeleteDate((SQLDeleteStatement) statement);
   }
   if (statement instanceof SQLCreateTableStatement) {
       return 1;
   }
   return 1;
}

private int InsertData(SQLInsertStatement state) {
   if (state.getValues().getValues().size() == 0) {
       throw new RuntimeException("number of  columns error");
   }
   if (state.getValues().getValues().size() != state.getColumns().size()) {
       throw new RuntimeException("number of values and columns have to match");
   }
   SQLTableSource table = state.getTableSource();
   BasicDBObject o = new BasicDBObject();
   int i = 0;
   for (SQLExpr col : state.getColumns()) {
       o.put(getFieldName2(col), getExpValue(state.getValues().getValues().get(i)));
       i++;
   }
   DBCollection coll = this._db.getCollection(table.toString());
   coll.insert(o);
   return 1;
}

5. 彩蛋

1、支持多 MongoDB ,并使用 MyCAT 进行分片。

MyCAT 配置:https://link.juejin.im/?target=https%3A%2F%2Fgithub.com%2FYunaiV%2FMycat-Server%2Ftree%2F1.6%2Fsrc%2Ftest%2Fresources%2Fmulti_mongodb

2、支持 MongoDB + MySQL 作为同一个 MyCAT Table 的数据节点。查询时,可以合并数据结果。

查询时,返回 MySQL 数据记录字段要比 MongoDB 数据记录字段全,否则,合并结果时会报错。

MyCAT 配置:https://link.juejin.im/?target=https%3A%2F%2Fgithub.com%2FYunaiV%2FMycat-Server%2Ftree%2F1.6%2Fsrc%2Ftest%2Fresources%2Fsingle_mongodb_mysql

3、MongoDB 作为数据节点时,可以使用 MyCAT 提供的数据库主键字段功能。

MyCAT 配置:https://link.juejin.im/?target=https%3A%2F%2Fgithub.com%2FYunaiV%2FMycat-Server%2Ftree%2F1.6%2Fsrc%2Ftest%2Fresources%2Fsingle_mongodb

EOS 区块链数据实时异构到 MongoDB

从 EOSIO 1.1.0 开始,已经默认支持 MongoDB,所以本文提供的方法仅做参考。

0x00 背景

执行 eosio_build.sh 脚本编译 nodeos 会默认安装 mongodb,但是从 Dawn 4.0 开始,mongo_db_plugin 插件不再生效,详情请参考(https://github.com/EOSIO/eos/issues/3030)。(https://github.com/EOSIO/eos/pull/4304)PR 重新支持 MongoDB,本文讲解如何将链上数据实时同步到 MongoDB。

未分类

0x01 部署

部署的思路如下:

  • 编译支持 mongo_db_plugin 的 nodeos
  • 配置 MongoDB
  • 启动 MongoDB
  • 创建数据库
  • 启动 nodeos,同步主网数据
  • 查看 MongoDB,确认数据

接下来我们逐步讲解。

由于 EOSIO 代码库有 gh#3030-enable-mongodb 分支,所以可以使用 git 自动将支持 mongo_db_plugin 插件的代码合并,详细命令如下:

$ mkdir -p /data/mongodb && cd /data/mongodb
$ git clone -b release/1.1 https://github.com/EOSIO/eos.git --recursive
$ cd eos
$ git fetch --all --tags --prune
$ git merge --m "merge" --commit origin/gh#3030-enable-mongodb
$ git submodule update --init --recursive
$ ./eosio_build.sh

如果 nodeos 编译成功,MongoDB 自动安装,路径在 $USER/opt/mongodb。执行 /data/mongodb/eos/build/programs/nodeos/nodeos –help | grep mongo,如果有如下的输出,表示 MongoDB 插件编译成功。

Config Options for eosio::mongo_db_plugin:
  -q [ --mongodb-queue-size ] arg (=256)
  --mongodb-wipe                        Required with --replay-blockchain,
                                        --delete-all-blocks to wipe mongo
                                        accidental wipe of mongo db.
  --mongodb-block-start arg (=0)        If specified then only abi data pushed
                                        to mongodb until specified block is
  -m [ --mongodb-uri ] arg              MongoDB URI connection string, see:
                                        https://docs.mongodb.com/master/referen
                                        in URI. Example: mongodb://127.0.0.1:27

接下来我们配置 MongoDB。

$ mkdir -p /data/mongodb/db /data/mongodb/logs
$ touch /data/mongodb/mongodb.conf

mongodb.conf 配置文件内容如下:

systemLog:
  destination: file
  path: /data/mongodb/logs/mongo.log
  logAppend: true
storage:
  dbPath: /data/mongodb/db
  journal:
   enabled: true
net:
  bindIp: 127.0.0.1
  port: 27017

接着启动 MongoDB,并且创建 eos 数据库。

$ /$USER/opt/mongodb/bin/mongod -f /data/mongodb/mongodb.conf --fork
$ /$USER/opt/mongodb/bin/mongo --port 27017
$ use eos

然后配置 EOS 主网 fullnode。

$ mkdir /data/mainnet
$ cd /data/mainnet
$ git clone https://github.com/superoneio/eos-mainnet
$ mkdir -p data logs config
$ cp eos-mainnet/config.ini mainnet/config
$ cp eos-mainnet/genesis.json mainnet
$ cp eos-mainnet/*.sh mainnet
$ chmod +x mainnet

修改 config.ini 相关配置,添加如下参数:

plugin = eosio::mongo_db_plugin
mongodb-uri = mongodb://127.0.0.1:27017/eos

最后执行 start.sh,同步主网数据。

0x02 测试

我们执行 $ /$USER/opt/mongodb/bin/mongo –port 27017 登录到 MongoDB,可以对同步的数据进行校验。

> use eos
> show dbs
admin   0.000GB
config  0.000GB
eos     0.031GB
local   0.000GB
> use eos
switched to db eos
> show tables;
accounts
actions
block_states
blocks
transaction_traces
transactions
> db.accounts.find({name:{$eq:'eosio.ram'}})
{ "_id" : ObjectId("5b4163f2992ecd51b4277f77"), "name" : "eosio.ram", "createdAt" : ISODate("2018-07-08T01:08:02.071Z") }

0x03 小结

之前介绍了将 EOS 主网数据同步到 MySQL,本文讲解了另一种同步到文档型数据库的方法。MySQL 和 MongoDB 最大的区别在于,MySQL 是传统的关系型数据库,支持 SQL 标准,而 MongoDB 是文档型数据库。MySQL 天生适用于结构化数据以及需要使用事务的场景,天生支持 SQL 标准,对开发者比较友好。

MongoDB 适合如下场景:[1]

  • 表结构不明确且数据不断变大,MongoDB 是非结构化文档数据库,扩展字段很容易且不会影响原有数据
  • 更高的写入负载,MongoDB 侧重高数据写入的性能,而非事务安全
  • 数据量很大或者将来会变得很大,MongoDB 内建了 Sharding、数据分片的特性,容易水平扩展
  • 高可用性,MongoDB 自带高可用,自动主从切换(副本集)

而 MongoDB 不支持事务、不支持 JOIN,所以涉及事务和复杂查询的场景不适合 MySQL。

关于 MongoDB 和 MySQL 的对比,可以点击 https://ruby-china.org/topics/27659 查看。

0x04 参考

  • [1] 张家江 (2017–09–07). SOCI Installation. Retrieved from http://tech.lede.com/2017/09/07/rd/server/MongoDBvsMysql

认识 MongoDB 4.0 的新特性——事务(Transactions)

前言

相信使用过主流的关系型数据库的朋友对“事务(Transactions)”不会太陌生,它可以让我们把对多张表的多次数据库操作整合为一次原子操作,这在高并发场景下可以保证多个数据操作之间的互不干扰;并且一旦在这些操作过程任一环节中出现了错误,事务会中止并且让数据回滚,这使得同时在多张表中修改数据的时候保证了数据的一致性。

以前 MongoDB 是不支持事务的,因此开发者在需要用到事务的时候,不得不借用其他工具,在业务代码层面去弥补数据库的不足。随着 4.0 版本的发布,MongoDB 也为我们带来了原生的事务操作,下面就让我们一起来认识它,并通过简单的例子了解如何去使用。

未分类

介绍

事务和副本集(Replica Sets)

副本集是 MongoDB 的一种主副节点架构,它使数据得到最大的可用性,避免单点故障引起的整个服务不能访问的情况的发生。目前 MongoDB 的多表事务操作仅支持在副本集上运行,想要在本地环境安装运行副本集可以借助一个工具包——run-rs,以下的文章中有详细的使用说明:

https://thecodebarbarian.com/introducing-run-rs-zero-config-mongodb-runner.html

事务和会话(Sessions)

事务和会话(Sessions)关联,一个会话同一时刻只能开启一个事务操作,当一个会话断开,这个会话中的事务也会结束。

事务中的函数

  • Session.startTransaction()

在当前会话中开始一次事务,事务开启后就可以开始进行数据操作。在事务中执行的数据操作是对外隔离的,也就是说事务中的操作是原子性的。

  • Session.commitTransaction()

提交事务,将事务中对数据的修改进行保存,然后结束当前事务,一次事务在提交之前的数据操作对外都是不可见的。

  • Session.abortTransaction()

中止当前的事务,并将事务中执行过的数据修改回滚。

重试

当事务运行中报错,catch 到的错误对象中会包含一个属性名为 errorLabels 的数组,当这个数组中包含以下2个元素的时候,代表我们可以重新发起相应的事务操作。

  • TransientTransactionError:出现在事务开启以及随后的数据操作阶段
  • UnknownTransactionCommitResult:出现在提交事务阶段

示例

经过上面的铺垫,你是不是已经迫不及待想知道究竟应该怎么写代码去完成一次完整的事务操作?下面我们就简单写一个例子:

场景描述: 假设一个交易系统中有2张表——记录商品的名称、库存数量等信息的表 commodities,和记录订单的表 orders。当用户下单的时候,首先要找到 commodities 表中对应的商品,判断库存数量是否满足该笔订单的需求,是的话则减去相应的值,然后在 orders 表中插入一条订单数据。在高并发场景下,可能在查询库存数量和减少库存的过程中,又收到了一次新的创建订单请求,这个时候可能就会出问题,因为新的请求在查询库存的时候,上一次操作还未完成减少库存的操作,这个时候查询到的库存数量可能是充足的,于是开始执行后续的操作,实际上可能上一次操作减少了库存后,库存的数量就已经不足了,于是新的下单请求可能就会导致实际创建的订单数量超过库存数量。

以往要解决这个问题,我们可以用给商品数据“加锁”的方式,比如基于 Redis 的各种锁,同一时刻只允许一个订单操作一个商品数据,这种方案能解决问题,缺点就是代码更复杂了,并且性能会比较低。如果用数据库事务的方式就可以简洁很多:

commodities 表数据(stock 为库存):

{ "_id" : ObjectId("5af0776263426f87dd69319a"), "name" : "灭霸原味手套", "stock" : 5 }
{ "_id" : ObjectId("5af0776263426f87dd693198"), "name" : "雷神专用铁锤", "stock" : 2 }

orders 表数据:

{ "_id" : ObjectId("5af07daa051d92f02462644c"), "commodity": ObjectId("5af0776263426f87dd69319a"), "amount": 2 }
{ "_id" : ObjectId("5af07daa051d92f02462644b"), "commodity": ObjectId("5af0776263426f87dd693198"), "amount": 3 }

通过一次事务完成创建订单操作(mongo Shell):

// 执行 txnFunc 并且在遇到 TransientTransactionError 的时候重试
function runTransactionWithRetry(txnFunc, session) {
  while (true) {
    try {
      txnFunc(session); // 执行事务
      break;
    } catch (error) {
      if (
        error.hasOwnProperty('errorLabels') &&
        error.errorLabels.includes('TransientTransactionError')
      ) {
        print('TransientTransactionError, retrying transaction ...');
        continue;
      } else {
        throw error;
      }
    }
  }
}

// 提交事务并且在遇到 UnknownTransactionCommitResult 的时候重试
function commitWithRetry(session) {
  while (true) {
    try {
      session.commitTransaction();
      print('Transaction committed.');
      break;
    } catch (error) {
      if (
        error.hasOwnProperty('errorLabels') &&
        error.errorLabels.includes('UnknownTransactionCommitResult')
      ) {
        print('UnknownTransactionCommitResult, retrying commit operation ...');
        continue;
      } else {
        print('Error during commit ...');
        throw error;
      }
    }
  }
}

// 在一次事务中完成创建订单操作
function createOrder(session) {
  var commoditiesCollection = session.getDatabase('mall').commodities;
  var ordersCollection = session.getDatabase('mall').orders;
  // 假设该笔订单中商品的数量
  var orderAmount = 3;
  // 假设商品的ID
  var commodityID = ObjectId('5af0776263426f87dd69319a');

  session.startTransaction({
    readConcern: { level: 'snapshot' },
    writeConcern: { w: 'majority' },
  });

  try {
    var { stock } = commoditiesCollection.findOne({ _id: commodityID });
    if (stock < orderAmount) {
      print('Stock is not enough');
      session.abortTransaction();
      throw new Error('Stock is not enough');
    }
    commoditiesCollection.updateOne(
      { _id: commodityID },
      { $inc: { stock: -orderAmount } }
    );
    ordersCollection.insertOne({
      commodity: commodityID,
      amount: orderAmount,
    });
  } catch (error) {
    print('Caught exception during transaction, aborting.');
    session.abortTransaction();
    throw error;
  }

  commitWithRetry(session);
}

// 发起一次会话
var session = db.getMongo().startSession({ readPreference: { mode: 'primary' } });

try {
  runTransactionWithRetry(createOrder, session);
} catch (error) {
  // 错误处理
} finally {
  session.endSession();
}

上面的代码看着感觉很多,其实 runTransactionWithRetry 和 commitWithRetry 这两个函数都是可以抽离出来成为公共函数的,不需要每次操作都重复书写。用上了事务之后,因为事务中的数据操作都是一次原子操作,所以我们就不需要考虑分布并发导致的数据一致性的问题,是不是感觉简单了许多?

你可能注意到了,代码中在执行 startTransaction 的时候设置了两个参数——readConcern 和 writeConcern,这是 MongoDB 读写操作的确认级别,在这里用于在副本集中平衡数据读写操作的可靠性和性能,如果在这里展开就太多了,所以感兴趣的朋友建议去阅读官方文档了解一下:

readConcern:

https://docs.mongodb.com/master/reference/read-concern/

writeConcern:

https://docs.mongodb.com/master/reference/write-concern/

mongodb使用总结

未分类

MongoDB 是一个基于分布式文件存储的数据库。由 C++ 语言编写。旨在为 WEB 应用提供可扩展的高性能数据存储解决方案。

MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。是世界上最大的nosql(not only sql)数据库。执行mongodb数据库需要mongod环境和mongo环境。

数据库

概念:存储数据的仓库我们称为数据库。数据库分为非关系型数据库和关系型数据库。关系型数据库(Oracle,mysql,db2,…)往往以表结构的形式进行
存储

mongodb和mysql的区别

  • 前者非关系型数据库,后者是关系型数据库
  • mongodb中是以集合的形式来充当mysql中的表结构
  • mongodb中的数据是以文档的形式进行存储

未分类

mongodb的优点

  • 面向文档存储的数据库(BSON的数据格式)

未分类

  • 有丰富的查询指令
  • 支持索引
  • 具有分片系统
  • 无模式

mongodb的缺点

  • 占用的空间比较大
  • 不支持事务
  • 对于windows来说,它不支持32位的系统

mongodb常用指令

  • show dbs 查看当前所有数据库

未分类

  • use database_name 创建数据库
  • db 查询当前使用的数据库
  • db.stats() 查询当前使用的数据库信息
  • db.dropDatabase() 删除当前数据库

未分类

  • db.help() 获取查询帮助
  • db.database_name.help() 获取指定数据库查询帮助
  • db.collection_name.find() 查询集合的信息
  • db.createCollection(coll_name,options) 创建集合
  • db.getCollectionNames() 查询所有集合
  • db.getCollection(coll_name) 查询某一个特定集合
  • db.coll_name.drop() 对集合的删除
  • db.printCollectionStats() 打印当前数据库中所有集合的状态
  • db.coll_name.insert/insertMany/save/insertOne 添加一条/多条数据
  • db.coll_name.update(query,info,con,muti) 修改数据(query: 查询的条件;info: 要更新的信息;con: 给异步操作提供扩展;muti: 返回布尔类型 默认false)(这里涉及到几个特殊属性$inc和$set 前者为相加后者为设置)
  • db.coll_name.remove(query) 删除数据(query 删除的条件)

未分类

  • 对数据的查询
  • db.coll_name.find() 查询所有信息
  • db.coll_name.find({"age": 18}) 查询某一条信息
  • db.coll_name.find({age: {$gt: 22}}) gt大于某一条件
  • db.coll_name.find({age: {$lt: 22}}) lt小于某一条件
  • db.coll_name.find({age: {$gte: 22}}) gt大于等于某一条件
  • db.coll_name.find({age: {$lte: 22}}) lte小于等于某一条件
  • db.coll_name.find({title: /好/}) 模糊查询

mongodb术语概念

未分类

项目中使用mongodb

切换到指定项目 npm init生成package.json
npm install mongodb -g 全局安装
npm install mongodb --save-dev 局部安装

mongodb.js

var Mongodb = require("mongodb")
// 连接到mongodb的服务端口
var server = new Mongodb.Server("localhost",27017,{auto_reconnect:true})
//创建数据库
var db = new Mongodb.Db('cloud',server,{safe:true})
//连接数据库
db.open((err,db) => {
    if(err) {
        console.log('连接数据库失败')
    } else {
        console.log('连接数据库成功')
    }
})

完美数据迁移-MongoDB Stream的应用

一、背景介绍

最近微服务架构火的不行,但本质上也只是风口上的一个热点词汇。
作为笔者的经验来说,想要应用一个新的架构需要带来的变革成本是非常高的。

尽管如此,目前还是有许多企业踏上了服务化改造的道路,这其中则免不了”旧改”的各种繁杂事。
所谓的”旧改”,就是把现有的系统架构来一次重构,拆分成多个细粒度的服务后,然后找时间
升级割接一把,让新系统上线。这其中,数据的迁移往往会成为一个非常重要且繁杂的活儿。

拆分服务时数据迁移的挑战在哪?

  1. 首先是难度大,做一个迁移方案需要了解项目的前身今世,评估迁移方案、技术工具等等;

  2. 其次是成本高。由于新旧系统数据结构是不一样的,需要定制开发迁移转化功能。很难有一个通用的工具能一键迁移;

  3. 再者,对于一些容量大、可靠性要求高的系统,要能够不影响业务,出了问题还能追溯,因此方案上还得往复杂了想。

二、常见方案

按照迁移的方案及流程,可将数据迁移分为三类:

1. 停机迁移

最简单的方案,停机迁移的顺序如下:

未分类

采用停机迁移的好处是流程操作简单,工具成本低;然而缺点也很明显,
迁移过程中业务是无法访问的,因此只适合于规格小、允许停服的场景。

2. 业务双写

业务双写是指对现有系统先进行改造升级,支持同时对新库和旧库进行写入。
之后再通过数据迁移工具对旧数据做全量迁移,待所有数据迁移转换完成后切换到新系统。

示意图:

未分类

业务双写的方案是平滑的,对线上业务影响极小;在出现问题的情况下可重新来过,操作压力也会比较小。

笔者在早些年前尝试过这样的方案,整个迁移过程确实非常顺利,但实现该方案比较复杂,
需要对现有的代码进行改造并完成新数据的转换及写入,对于开发人员的要求较高。
在业务逻辑清晰、团队对系统有足够的把控能力的场景下适用。

3. 增量迁移

增量迁移的基本思路是先进行全量的迁移转换,待完成后持续进行增量数据的处理,直到数据追平后切换系统。

示意图:

未分类

关键点

  • 要求系统支持增量数据的记录。
    对于MongoDB可以利用oplog实现这点,为避免全量迁移过程中oplog被冲掉,
    在开始迁移前就必须开始监听oplog,并将变更全部记录下来。
    如果没有办法,需要从应用层上考虑,比如为所有的表(集合)记录下updateTime这样的时间戳,
    或者升级应用并支持将修改操作单独记录下来。

  • 增量数据的回放是持续的。
    在所有的增量数据回放转换过程中,系统仍然会产生新的增量数据,这要求迁移工具
    能做到将增量数据持续回放并将之追平,之后才能做系统切换。

MongoDB 3.6版本开始便提供了Change Stream功能,支持对数据变更记录做监听。
这为实现数据同步及转换处理提供了更大的便利,下面将探讨如何利用Change Stream实现数据的增量迁移。

三、Change Stream 介绍

Chang Stream(变更记录流) 是指collection(数据库集合)的变更事件流,应用程序通过db.collection.watch()这样的命令可以获得被监听对象的实时变更。
在该特性出现之前,你可以通过拉取 oplog达到同样的目的;但 oplog 的处理及解析相对复杂且存在被回滚的风险,如果使用不当的话还会带来性能问题。
Change Stream 可以与aggregate framework结合使用,对变更集进行进一步的过滤或转换。

由于Change Stream 利用了存储在 oplog 中的信息,因此对于单进程部署的MongoDB无法支持Change Stream功能,
其只能用于启用了副本集的独立集群或分片集群

监听的目标

未分类

变更事件

一个Change Stream Event的基本结构如下所示:

{
   _id : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection"
   },
   "documentKey" : { "_id" : <ObjectId> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   }
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
}

字段说明

未分类

Change Steram支持的变更类型有以下几个:

未分类

利用以下的shell脚本,可以打印出集合 T_USER上的变更事件:

watchCursor=db.T_USER.watch()
while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      printjson(watchCursor.next());
   }
}

下面提供一些样例,感受一下

insert 事件

{
    "_id": {
        "_data": "825B5826D10000000129295A10046A31C593902B4A9C9907FC0AB1E3C0DA46645F696400645B58272321C4761D1338F4860004"
    },
    "operationType": "insert",
    "clusterTime": Timestamp(1532503761, 1),
    "fullDocument": {
        "_id": ObjectId("5b58272321c4761d1338f486"),
        "name": "LiLei",
        "createTime": ISODate("2018-07-25T07:30:43.398Z")
    },
    "ns": {
        "db": "appdb",
        "coll": "T_USER"
    },
    "documentKey": {
        "_id": ObjectId("5b58272321c4761d1338f486")
    }
}

update事件

{
 "_id" : {
  "_data" : "825B5829DF0000000129295A10046A31C593902B4A9C9907FC0AB1E3C0DA46645F696400645B582980ACEC5F345DB998EE0004"
 },
 "operationType" : "update",
 "clusterTime" : Timestamp(1532504543, 1),
 "ns" : {
  "db" : "appdb",
  "coll" : "T_USER"
 },
 "documentKey" : {
  "_id" : ObjectId("5b582980acec5f345db998ee")
 },
 "updateDescription" : {
  "updatedFields" : {
   "age" : 15
  },
  "removedFields" : [ ]
 }
}

replace事件

{
    "_id" : {
        "_data" : "825B58299D0000000129295A10046A31C593902B4A9C9907FC0AB1E3C0DA46645F696400645B582980ACEC5F345DB998EE0004"
    },
    "operationType" : "replace",
    "clusterTime" : Timestamp(1532504477, 1),
    "fullDocument" : {
        "_id" : ObjectId("5b582980acec5f345db998ee"),
        "name" : "HanMeimei",
        "age" : 12
    },
    "ns" : {
        "db" : "appdb",
        "coll" : "T_USER"
    },
    "documentKey" : {
        "_id" : ObjectId("5b582980acec5f345db998ee")
    }
}

delete事件

{
    "_id" : {
        "_data" : "825B5827A90000000229295A10046A31C593902B4A9C9907FC0AB1E3C0DA46645F696400645B58272321C4761D1338F4860004"
    },
    "operationType" : "delete",
    "clusterTime" : Timestamp(1532503977, 2),
    "ns" : {
        "db" : "appdb",
        "coll" : "T_USER"
    },
    "documentKey" : {
        "_id" : ObjectId("5b58272321c4761d1338f486")
    }
}

invalidate 事件

执行db.T_USER.drop() 可输出

{
    "_id" : {
        "_data" : "825B582D620000000329295A10046A31C593902B4A9C9907FC0AB1E3C0DA04"
    },
    "operationType" : "invalidate",
    "clusterTime" : Timestamp(1532505442, 3)
}

更多的Change Event 信息可以参考这里https://docs.mongodb.com/manual/reference/change-events/

四、实现增量迁移

本次设计了一个简单的论坛帖子迁移样例,用于演示如何利用Change Stream实现完美的增量迁移方案。

背景如下:
现有的系统中有一批帖子,每个帖子都属于一个频道(channel),如下表

未分类

新系统中频道字段将采用英文简称,同时要求能支持平滑升级。
根据前面篇幅的叙述,我们将使用Change Stream 功能实现一个增量迁移的方案。

相关表的转换如下图:

未分类

原理

topic 是帖子原表,在迁移开始前将开启watch任务持续获得增量数据,并记录到 topic_incr表中;
接着执行全量的迁移转换,之后再持续对增量表数据进行迁移,直到无新的增量为止。

接下来我们使用Java程序来完成相关代码,mongodb-java–driver 在 3.6 版本后才支持 watch 功能
需要确保升级到对应版本:

<dependency>
     <groupId>org.mongodb</groupId>
     <artifactId>mongo-java-driver</artifactId>
     <version>3.6.4</version>
</dependency>

定义Channel频道的转换表

public static enum Channel {
    Food("美食"),
    Emotion("情感"),
    Pet("宠物"),
    House("家居"),
    Marriage("征婚"),
    Education("教育"),
    Travel("旅游")
    ;
    private final String oldName;

    public String getOldName() {
        return oldName;
    }

    private Channel(String oldName) {
        this.oldName = oldName;
    }

    /**
     * 转换为新的名称
     * 
     * @param oldName
     * @return
     */
    public static String toNewName(String oldName) {
        for (Channel channel : values()) {
            if (channel.oldName.equalsIgnoreCase(oldName)) {
                return channel.name();
            }
        }
        return "";
    }

    /**
     * 返回一个随机频道
     * 
     * @return
     */
    public static Channel random() {
        Channel[] channels = values();
        int idx = (int) (Math.random() * channels.length);
        return channels[idx];
    }
}

为 topic 表预写入1w条记录

private static void preInsertData() {
    MongoCollection<Document> topicCollection = getCollection(coll_topic);

    // 分批写入,共写入1w条数据
    int current = 0;
    int batchSize = 100;

    while (current < 10000) {
        List<Document> topicDocs = new ArrayList<Document>();

        for (int j = 0; j < batchSize; j++) {
            Document topicDoc = new Document();

            Channel channel = Channel.random();
            topicDoc.append(field_channel, channel.getOldName());
            topicDoc.append(field_nonce, (int) (Math.random() * nonce_max));

            topicDoc.append("title", "This is the tilte -- " + UUID.randomUUID().toString());
            topicDoc.append("author", "LiLei");
            topicDoc.append("createTime", new Date());
            topicDocs.add(topicDoc);
        }

        topicCollection.insertMany(topicDocs);
        current += batchSize;
        logger.info("now has insert {} records", current);
    }
}

上述实现中,每个帖子都分配了随机的频道(channel)

开启监听任务,将topic上的所有变更写入到增量表

MongoCollection<Document> topicCollection = getCollection(coll_topic);
MongoCollection<Document> topicIncrCollection = getCollection(coll_topic_incr);

// 启用 FullDocument.update_lookup 选项
cursor = topicCollection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
while (cursor.hasNext()) {

    ChangeStreamDocument<Document> changeEvent = cursor.next();
    OperationType type = changeEvent.getOperationType();
    logger.info("{} operation detected", type);

    if (type == OperationType.INSERT || type == OperationType.UPDATE || type == OperationType.REPLACE
            || type == OperationType.DELETE) {

        Document incrDoc = new Document(field_op, type.getValue());
        incrDoc.append(field_key, changeEvent.getDocumentKey().get("_id"));
        incrDoc.append(field_data, changeEvent.getFullDocument());
        topicIncrCollection.insertOne(incrDoc);
    }
}

代码中通过watch 命令获得一个MongoCursor对象,用于遍历所有的变更。
FullDocument.UPDATE_LOOKUP选项启用后,在update变更事件中将携带完整的文档数据(FullDocument)。

watch()命令提交后,mongos会与分片上的mongod(主节点)建立订阅通道,这可能需要花费一点时间。

为了模拟线上业务的真实情况,启用几个线程对topic表进行持续写操作;

private static void startMockChanges() {

    threadPool.submit(new ChangeTask(OpType.insert));
    threadPool.submit(new ChangeTask(OpType.update));
    threadPool.submit(new ChangeTask(OpType.replace));
    threadPool.submit(new ChangeTask(OpType.delete));
}

ChangeTask 实现逻辑如下:

while (true) {
    logger.info("ChangeTask {}", opType);
    if (opType == OpType.insert) {
        doInsert();
    } else if (opType == OpType.update) {
        doUpdate();
    } else if (opType == OpType.replace) {
        doReplace();
    } else if (opType == OpType.delete) {
        doDelete();
    }
    sleep(200);
    long currentAt = System.currentTimeMillis();
    if (currentAt - startAt > change_during) {
        break;
    }
}

每一个变更任务会不断对topic产生写操作,触发一系列ChangeEvent产生。

  • doInsert:生成随机频道的topic后,执行insert
  • doUpdate:随机取得一个topic,将其channel字段改为随机值,执行update
  • doReplace:随机取得一个topic,将其channel字段改为随机值,执行replace
  • doDelete:随机取得一个topic,执行delete

以doUpdate为例,实现代码如下:

private void doUpdate() {
    MongoCollection<Document> topicCollection = getCollection(coll_topic);

    Document random = getRandom();
    if (random == null) {
        logger.info("update skip");
        return;
    }

    String oldChannel = random.getString(field_channel);
    Channel channel = Channel.random();

    random.put(field_channel, channel.getOldName());
    random.put("createTime", new Date());
    topicCollection.updateOne(new Document("_id", random.get("_id")), new Document("$set", random));

    counter.onChange(oldChannel, channel.getOldName());
}

启动一个全量迁移任务,将 topic 表中数据迁移到 topic_new 新表

final MongoCollection<Document> topicCollection = getCollection(coll_topic);
final MongoCollection<Document> topicNewCollection = getCollection(coll_topic_new);

Document maxDoc = topicCollection.find().sort(new Document("_id", -1)).first();
if (maxDoc == null) {
    logger.info("FullTransferTask detect no data, quit.");
    return;
}

ObjectId maxID = maxDoc.getObjectId("_id");
logger.info("FullTransferTask maxId is {}..", maxID.toHexString());

AtomicInteger count = new AtomicInteger(0);

topicCollection.find(new Document("_id", new Document("$lte", maxID)))
        .forEach(new Consumer<Document>() {

            @Override
            public void accept(Document topic) {
                Document topicNew = new Document(topic);
                // channel转换
                String oldChannel = topic.getString(field_channel);
                topicNew.put(field_channel, Channel.toNewName(oldChannel));

                topicNewCollection.insertOne(topicNew);
                if (count.incrementAndGet() % 100 == 0) {
                    logger.info("FullTransferTask progress: {}", count.get());
                }
            }

        });
logger.info("FullTransferTask finished, count: {}", count.get());

在全量迁移开始前,先获得当前时刻的的最大 _id 值(可以将此值记录下来)作为终点。
随后逐个完成迁移转换。

在全量迁移完成后,便开始最后一步:增量迁移

注:增量迁移过程中,变更操作仍然在进行

final MongoCollection<Document> topicIncrCollection = getCollection(coll_topic_incr);
final MongoCollection<Document> topicNewCollection = getCollection(coll_topic_new);

ObjectId currentId = null;
Document sort = new Document("_id", 1);
MongoCursor<Document> cursor = null;

// 批量大小
int batchSize = 100;
AtomicInteger count = new AtomicInteger(0);

try {
    while (true) {

        boolean isWatchTaskStillRunning = watchFlag.getCount() > 0;

        // 按ID增量分段拉取
        if (currentId == null) {
            cursor = topicIncrCollection.find().sort(sort).limit(batchSize).iterator();
        } else {
            cursor = topicIncrCollection.find(new Document("_id", new Document("$gt", currentId)))
                    .sort(sort).limit(batchSize).iterator();
        }

        boolean hasIncrRecord = false;

        while (cursor.hasNext()) {
            hasIncrRecord = true;

            Document incrDoc = cursor.next();

            OperationType opType = OperationType.fromString(incrDoc.getString(field_op));
            ObjectId docId = incrDoc.getObjectId(field_key);

            // 记录当前ID
            currentId = incrDoc.getObjectId("_id");

            if (opType == OperationType.DELETE) {

                topicNewCollection.deleteOne(new Document("_id", docId));
            } else {

                Document doc = incrDoc.get(field_data, Document.class);

                // channel转换
                String oldChannel = doc.getString(field_channel);
                doc.put(field_channel, Channel.toNewName(oldChannel));

                // 启用upsert
                UpdateOptions options = new UpdateOptions().upsert(true);

                topicNewCollection.replaceOne(new Document("_id", docId),
                        incrDoc.get(field_data, Document.class), options);
            }

            if (count.incrementAndGet() % 10 == 0) {
                logger.info("IncrTransferTask progress, count: {}", count.get());
            }
        }

        // 当watch停止工作(没有更多变更),同时也没有需要处理的记录时,跳出
        if (!isWatchTaskStillRunning && !hasIncrRecord) {
            break;
        }

        sleep(200);
    }
} catch (Exception e) {
    logger.error("IncrTransferTask ERROR", e);
}

增量迁移的实现是一个不断 tail 的过程,利用 **_id 字段的有序特性 ** 进行分段迁移;
即记录下当前处理的 _id 值,循环拉取在 该 _id 值之后的记录进行处理。

增量表(topic_incr)中除了DELETE变更之外,其余的类型都保留了整个文档,
因此可直接利用 replace + upsert 追加到新表。

最后,运行整个程序

[2018-07-26 19:44:16] INFO ~ IncrTransferTask progress, count: 2160
[2018-07-26 19:44:16] INFO ~ IncrTransferTask progress, count: 2170
[2018-07-26 19:44:27] INFO ~ all change task has stop, watch task quit.
[2018-07-26 19:44:27] INFO ~ IncrTransferTask finished, count: 2175
[2018-07-26 19:44:27] INFO ~ TYPE 美食:1405
[2018-07-26 19:44:27] INFO ~ TYPE 宠物:1410
[2018-07-26 19:44:27] INFO ~ TYPE 征婚:1428
[2018-07-26 19:44:27] INFO ~ TYPE 家居:1452
[2018-07-26 19:44:27] INFO ~ TYPE 教育:1441
[2018-07-26 19:44:27] INFO ~ TYPE 情感:1434
[2018-07-26 19:44:27] INFO ~ TYPE 旅游:1457
[2018-07-26 19:44:27] INFO ~ ALLCHANGE 12175
[2018-07-26 19:44:27] INFO ~ ALLWATCH 2175

查看 topic 表和 topic_new 表,发现两者数量是相同的。
为了进一步确认一致性,我们对两个表的分别做一次聚合统计:

topic表

db.topic.aggregate([{
    "$group":{
        "_id":"$channel",
        "total": {"$sum": 1}
        }
    },
    {
        "$sort": {"total":-1}
        }
    ])

topic_new表

db.topic_new.aggregate([{
    "$group":{
        "_id":"$channel",
        "total": {"$sum": 1}
        }
    },
    {
        "$sort": {"total":-1}
        }
    ])

前者输出结果:

未分类

后者输出结果:

未分类

前后对比的结果是一致的!

五、后续优化

前面的章节演示了一个增量迁移的样例,在投入到线上运行之前,这些代码还得继续优化:

  • 写入性能,线上的数据量可能会达到亿级,在全量、增量迁移时应采用合理的批量化处理;
    另外可以通过增加并发线程,添置更多的Worker,分别对不同业务库、不同表进行处理以提升效率。
    增量表存在幂等性,即回放多次其最终结果还是一致的,但需要保证表级有序,即一个表同时只有一个线程在进行增量回放。

  • 容错能力,一旦 watch 监听任务出现异常,要能够从更早的时间点开始(使用startAtOperationTime参数),
    而如果写入时发生失败,要支持重试。

  • 回溯能力,做好必要的跟踪记录,比如将转换失败的ID号记录下来,旧系统的数据需要保留,
    以免在事后追究某个数据问题时找不着北。

  • 数据转换,新旧业务的差异不会很简单,通常需要借助大量的转换表来完成。

  • 一致性检查,需要根据业务特点开发自己的一致性检查工具,用来证明迁移后数据达到想要的一致性级别。

BTW,数据迁移一定要结合业务特性、架构差异来做考虑,否则还是在耍流氓。

六、小结

服务化系统中扩容、升级往往会进行数据迁移,对于业务量大,中断敏感的系统通常会采用平滑迁移的方式。
MongoDB 3.6 版本后提供了 Change Stream 功能以支持应用订阅数据的变更事件流,
本文使用 Stream 功能实现了增量平滑迁移的例子,这是一次尝试,相信后续这样的应用场景会越来越多。

mongodb分布式集群搭建手记

摘要: 一、架构简介 目标 单机搭建mongodb分布式集群(副本集 + 分片集群),演示mongodb分布式集群的安装部署、简单操作。 说明 在同一个vm启动由两个分片组成的分布式集群,每个分片都是一个PSS(Primary-Secondary-Secondary)模式的数据副本集; Config副本集采用PSS(Primary-Secondary-Secondary)模式。

一、架构简介

目标

单机搭建mongodb分布式集群(副本集 + 分片集群),演示mongodb分布式集群的安装部署、简单操作。

未分类

说明

在同一个vm启动由两个分片组成的分布式集群,每个分片都是一个PSS(Primary-Secondary-Secondary)模式的数据副本集;
Config副本集采用PSS(Primary-Secondary-Secondary)模式。

二、配置说明

端口通讯

当前集群中存在shard、config、mongos共12个进程节点,端口矩阵编排如下:

编号  实例类型
1   mongos
2   mongos
3   mongos
4   config
5   config
6   config
7   shard1
8   shard1
9   shard1
10  shard2
11  shard2
12  shard2

内部鉴权

节点间鉴权采用keyfile方式实现鉴权,mongos与分片之间、副本集节点之间共享同一套keyfile文件。 官方说明

账户设置

管理员账户:admin/Admin@01,具有集群及所有库的管理权限
应用账号:appuser/AppUser@01,具有appdb的owner权限

关于初始化权限

keyfile方式默认会开启鉴权,而针对初始化安装的场景,Mongodb提供了localhost-exception机制,
可以在首次安装时通过本机创建用户、角色,以及副本集初始操作。

三、准备工作

1. 下载安装包

官方地址:https://www.mongodb.com/download-center

wget https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-rhel70-3.6.3.tgz

2. 部署目录

解压压缩文件,将bin目录拷贝到目标路径/opt/local/mongo-cluster,参考以下命令:

tar -xzvf mongodb-linux-x86_64-rhel70-3.6.3.tgz
mkdir -p  /opt/local/mongo-cluster
cp -r mongodb-linux-x86_64-rhel70-3.6.3/bin  /opt/local/mongo-cluster

3. 创建配置文件

cd /opt/local/mongo-cluster
mkdir conf 

A. mongod 配置文件 mongo_node.conf

mongo_node.conf 作为mongod实例共享的配置文件,内容如下:

storage:
    engine: wiredTiger
    directoryPerDB: true
    journal:
        enabled: true
systemLog:
    destination: file
    logAppend: true
operationProfiling:
  slowOpThresholdMs: 10000
replication:
    oplogSizeMB: 10240
processManagement:
    fork: true
net:
    http:
      enabled: false
security:
    authorization: "enabled"

选项说明可参考这里

B. mongos 配置文件 mongos.conf

systemLog:
    destination: file
    logAppend: true
processManagement:
    fork: true
net:
    http:
      enabled: false

4. 创建keyfile文件

cd /opt/local/mongo-cluster
mkdir keyfile
openssl rand -base64 756 > mongo.key
chmod 400 mongo.key
mv mongo.key keyfile

mongo.key 采用随机算法生成,用作节点内部通讯的密钥文件

5. 创建节点目录

WORK_DIR=/opt/local/mongo-cluster
mkdir -p $WORK_DIR/nodes/config/n1/data
mkdir -p $WORK_DIR/nodes/config/n2/data
mkdir -p $WORK_DIR/nodes/config/n3/data

mkdir -p $WORK_DIR/nodes/shard1/n1/data
mkdir -p $WORK_DIR/nodes/shard1/n2/data
mkdir -p $WORK_DIR/nodes/shard1/n3/data

mkdir -p $WORK_DIR/nodes/shard2/n1/data
mkdir -p $WORK_DIR/nodes/shard2/n2/data
mkdir -p $WORK_DIR/nodes/shard2/n3/data

mkdir -p $WORK_DIR/nodes/mongos/n1
mkdir -p $WORK_DIR/nodes/mongos/n2
mkdir -p $WORK_DIR/nodes/mongos/n3

以config 节点1 为例,nodes/config/n1/data是数据目录,而pid文件、日志文件都存放于n1目录
以mongos 节点1 为例,nodes/mongos/n1 存放了pid文件和日志文件

四、搭建集群

1. Config副本集

按以下脚本启动3个Config实例

WORK_DIR=/opt/local/mongo-cluster
KEYFILE=$WORK_DIR/keyfile/mongo.key
CONFFILE=$WORK_DIR/conf/mongo_node.conf
MONGOD=$WORK_DIR/bin/mongod

$MONGOD --port 26001 --configsvr --replSet configReplSet --keyFile $KEYFILE --dbpath $WORK_DIR/nodes/config/n1/data --pidfilepath $WORK_DIR/nodes/config/n1/db.pid --logpath $WORK_DIR/nodes/config/n1/db.log --config $CONFFILE

$MONGOD --port 26002 --configsvr --replSet configReplSet --keyFile $KEYFILE --dbpath $WORK_DIR/nodes/config/n2/data --pidfilepath $WORK_DIR/nodes/config/n2/db.pid --logpath $WORK_DIR/nodes/config/n2/db.log --config $CONFFILE

$MONGOD --port 26003 --configsvr --replSet configReplSet --keyFile $KEYFILE --dbpath $WORK_DIR/nodes/config/n3/data --pidfilepath $WORK_DIR/nodes/config/n3/db.pid --logpath $WORK_DIR/nodes/config/n3/db.log --config $CONFFILE

待成功启动后,输出日志如下:

about to fork child process, waiting until server is ready for connections.
forked process: 4976
child process started successfully, parent exiting

此时通过ps 命令也可以看到3个启动的进程实例。

连接其中一个Config进程,执行副本集初始化

./bin/mongo --port 26001 --host 127.0.0.1
> MongoDB server version: 3.4.7
> cfg={
    _id:"configReplSet", 
    configsvr: true,
    members:[
        {_id:0, host:'127.0.0.1:26001'},
        {_id:1, host:'127.0.0.1:26002'}, 
        {_id:2, host:'127.0.0.1:26003'}
    ]};
rs.initiate(cfg);

其中configsvr:true指明这是一个用于分片集群的Config副本集。
关于副本集配置可参考这里

2. 创建分片

按以下脚本启动Shard1的3个实例

WORK_DIR=/opt/local/mongo-cluster
KEYFILE=$WORK_DIR/keyfile/mongo.key
CONFFILE=$WORK_DIR/conf/mongo_node.conf
MONGOD=$WORK_DIR/bin/mongod

echo "start shard1 replicaset"

$MONGOD --port 27001 --shardsvr --replSet shard1 --keyFile $KEYFILE --dbpath $WORK_DIR/nodes/shard1/n1/data --pidfilepath $WORK_DIR/nodes/shard1/n1/db.pid --logpath $WORK_DIR/nodes/shard1/n1/db.log --config $CONFFILE
$MONGOD --port 27002 --shardsvr --replSet shard1 --keyFile $KEYFILE --dbpath $WORK_DIR/nodes/shard1/n2/data --pidfilepath $WORK_DIR/nodes/shard1/n2/db.pid --logpath $WORK_DIR/nodes/shard1/n2/db.log --config $CONFFILE
$MONGOD --port 27003 --shardsvr --replSet shard1 --keyFile $KEYFILE --dbpath $WORK_DIR/nodes/shard1/n3/data --pidfilepath $WORK_DIR/nodes/shard1/n3/db.pid --logpath $WORK_DIR/nodes/shard1/n3/db.log --config $CONFFILE

待成功启动后,输出日志如下:

about to fork child process, waiting until server is ready for connections.
forked process: 5976
child process started successfully, parent exiting

此时通过ps 命令也可以看到3个启动的Shard进程实例。

连接其中一个Shard进程,执行副本集初始化

./bin/mongo --port 27001 --host 127.0.0.1
> MongoDB server version: 3.4.7
> cfg={
    _id:"shard1", 
    members:[
        {_id:0, host:'127.0.0.1:27001'},
        {_id:1, host:'127.0.0.1:27002'}, 
        {_id:2, host:'127.0.0.1:27003'}
    ]};
rs.initiate(cfg);

参考以上步骤,启动Shard2的3个实例进程,并初始化副本集。

3. 启动mongos路由

执行以下脚本启动3个mongos进程

WORK_DIR=/opt/local/mongo-cluster
KEYFILE=$WORK_DIR/keyfile/mongo.key
CONFFILE=$WORK_DIR/conf/mongos.conf
MONGOS=$WORK_DIR/bin/mongos

echo "start mongos instances"
$MONGOS --port=25001 --configdb configReplSet/127.0.0.1:26001,127.0.0.1:26002,127.0.0.1:26003 --keyFile $KEYFILE --pidfilepath $WORK_DIR/nodes/mongos/n1/db.pid --logpath $WORK_DIR/nodes/mongos/n1/db.log --config $CONFFILE
$MONGOS --port 25002 --configdb configReplSet/127.0.0.1:26001,127.0.0.1:26002,127.0.0.1:26003 --keyFile $KEYFILE --pidfilepath $WORK_DIR/nodes/mongos/n2/db.pid --logpath $WORK_DIR/nodes/mongos/n2/db.log --config $CONFFILE
$MONGOS --port 25003 --configdb configReplSet/127.0.0.1:26001,127.0.0.1:26002,127.0.0.1:26003 --keyFile $KEYFILE --pidfilepath $WORK_DIR/nodes/mongos/n3/db.pid --logpath $WORK_DIR/nodes/mongos/n3/db.log --config $CONFFILE

待成功启动后,通过ps命令看到mongos进程:

dbuser      7903    1  0 17:49 ?        00:00:00 /opt/local/mongo-cluster/bin/mongos --port=25001 --configdb configReplSet/127.0.0.1:26001,127.0.0.1:26002,127.0.0.1:26003 --keyFile /opt/local/mongo-cluster/keyfile/mongo.key --pidfilepath /opt/local/mongo-cluster/nodes/mongos/n1/db.pid --logpath /opt/local/mongo-cluster/nodes/mongos/n1/db.log --config /opt/local/mongo-cluster/conf/mongos.conf
dbuser      7928    1  0 17:49 ?        00:00:00 /opt/local/mongo-cluster/bin/mongos --port 25002 --configdb configReplSet/127.0.0.1:26001,127.0.0.1:26002,127.0.0.1:26003 --keyFile /opt/local/mongo-cluster/keyfile/mongo.key --pidfilepath /opt/local/mongo-cluster/nodes/mongos/n2/db.pid --logpath /opt/local/mongo-cluster/nodes/mongos/n2/db.log --config /opt/local/mongo-cluster/conf/mongos.conf
dbuser      7954    1  0 17:49 ?        00:00:00 /opt/local/mongo-cluster/bin/mongos --port 25003 --configdb configReplSet/127.0.0.1:26001,127.0.0.1:26002,127.0.0.1:26003 --keyFile /opt/local/mongo-cluster/keyfile/mongo.key --pidfilepath /opt/local/mongo-cluster/nodes/mongos/n3/db.pid --logpath /opt/local/mongo-cluster/nodes/mongos/n3/db.log --config /opt/local/mongo-cluster/conf/mongos.conf

接入其中一个mongos实例,执行添加分片操作:

./bin/mongo --port 25001 --host 127.0.0.1
mongos> MongoDB server version: 3.4.7
mongos> sh.addShard("shard1/127.0.0.1:27001")
{ "shardAdded" : "shard1", "ok" : 1 }
mongos> sh.addShard("shard2/127.0.0.1:27004")
{ "shardAdded" : "shard2", "ok" : 1 }

至此,分布式集群架构启动完毕,但进一步操作需要先添加用户。

4. 初始化用户

接入其中一个mongos实例,添加管理员用户

use admin
db.createUser({
    user:'admin',pwd:'Admin@01',
    roles:[
        {role:'clusterAdmin',db:'admin'},
        {role:'userAdminAnyDatabase',db:'admin'},
        {role:'dbAdminAnyDatabase',db:'admin'},
        {role:'readWriteAnyDatabase',db:'admin'}
]})

当前admin用户具有集群管理权限、所有数据库的操作权限。
需要注意的是,在第一次创建用户之后,localexception不再有效,接下来的所有操作要求先通过鉴权。

use admin
db.auth('admin','Admin@01')

检查集群状态

mongos> sh.status()
--- Sharding Status --- 
  sharding version: {
    "_id" : 1,
    "minCompatibleVersion" : 5,
    "currentVersion" : 6,
    "clusterId" : ObjectId("5aa39c3e915210dc501a1dc8")
}
  shards:
    {  "_id" : "shard1",  "host" : "shard1/127.0.0.1:27001,127.0.0.1:27002,127.0.0.1:27003",  "state" : 1 }
    {  "_id" : "shard2",  "host" : "shard2/127.0.0.1:27004,127.0.0.1:27005,127.0.0.1:27006",  "state" : 1 }
  active mongoses:
    "3.4.7" : 3
autosplit:
    Currently enabled: yes

集群用户

分片集群中的访问都会通过mongos入口,而鉴权数据是存储在config副本集中的,即config实例中system.users数据库存储了集群用户及角色权限配置。mongos与shard实例则通过内部鉴权(keyfile机制)完成,因此shard实例上可以通过添加本地用户以方便操作管理。在一个副本集上,只需要在Primary节点上添加用户及权限,相关数据会自动同步到Secondary节点。
关于集群鉴权https://docs.mongodb.com/manual/core/security-users/?spm=a2c4e.11153940.blogcont617224.15.77552d7eSApnpI#sharded-cluster-users
在本案例中,我们为两个分片副本集都添加了本地admin用户。

通过mongostat工具可以显示集群所有角色:

          host insert query update delete getmore command dirty used flushes mapped vsize  res faults qrw arw net_in net_out conn    set repl                time
127.0.0.1:27001    *0    *0    *0    *0      0    6|0  0.1% 0.1%      0        1.49G 44.0M    n/a 0|0 0|0  429b  56.1k  25 shard1  PRI Mar 10 19:05:13.928
127.0.0.1:27002    *0    *0    *0    *0      0    7|0  0.1% 0.1%      0        1.43G 43.0M    n/a 0|0 0|0  605b  55.9k  15 shard1  SEC Mar 10 19:05:13.942
127.0.0.1:27003    *0    *0    *0    *0      0    7|0  0.1% 0.1%      0        1.43G 43.0M    n/a 0|0 0|0  605b  55.9k  15 shard1  SEC Mar 10 19:05:13.946
127.0.0.1:27004    *0    *0    *0    *0      0    6|0  0.1% 0.1%      0        1.48G 43.0M    n/a 0|0 0|0  546b  55.8k  18 shard2  PRI Mar 10 19:05:13.939
127.0.0.1:27005    *0    *0    *0    *0      0    6|0  0.1% 0.1%      0        1.43G 42.0M    n/a 0|0 0|0  540b  54.9k  15 shard2  SEC Mar 10 19:05:13.944
127.0.0.1:27006    *0    *0    *0    *0      0    6|0  0.1% 0.1%      0        1.46G 44.0M    n/a 0|0 0|0  540b  54.9k  17 shard2  SEC Mar 10 19:05:13.936

五、数据操作

在案例中,创建appuser用户、为数据库实例appdb启动分片。

use appdb
db.createUser({user:'appuser',pwd:'AppUser@01',roles:[{role:'dbOwner',db:'appdb'}]})
sh.enableSharding("appdb")

创建集合book,为其执行分片初始化。

use appdb
db.createCollection("book")
db.device.ensureIndex({createTime:1})
sh.shardCollection("appdb.book", {bookId:"hashed"}, false, { numInitialChunks: 4} )

继续往device集合写入1000W条记录,观察chunks的分布情况

use appdb
var cnt = 0;
for(var i=0; i<1000; i++){
    var dl = [];
    for(var j=0; j<100; j++){
        dl.push({
                "bookId" : "BBK-" + i + "-" + j,
                "type" : "Revision",
                "version" : "IricSoneVB0001",
                "title" : "Jackson's Life",
                "subCount" : 10,
                "location" : "China CN Shenzhen Futian District",
                "author" : {
                      "name" : 50,
                      "email" : "[email protected]",
                      "gender" : "female"
                },
                "createTime" : new Date()
            });
      }
      cnt += dl.length;
      db.book.insertMany(dl);
      print("insert ", cnt);
}

执行db.book.getShardDistribution(),输出如下:

Shard shard1 at shard1/127.0.0.1:27001,127.0.0.1:27002,127.0.0.1:27003
data : 13.41MiB docs : 49905 chunks : 2
estimated data per chunk : 6.7MiB
estimated docs per chunk : 24952

Shard shard2 at shard2/127.0.0.1:27004,127.0.0.1:27005,127.0.0.1:27006
data : 13.46MiB docs : 50095 chunks : 2
estimated data per chunk : 6.73MiB
estimated docs per chunk : 25047

Totals
data : 26.87MiB docs : 100000 chunks : 4
Shard shard1 contains 49.9% data, 49.9% docs in cluster, avg obj size on shard : 281B
Shard shard2 contains 50.09% data, 50.09% docs in cluster, avg obj size on shard : 281B

六、总结

Mongodb集群架构由Mongos、Config副本集和多个分片组成;
安装过程中先初始化Config副本集、分片副本集,最后通过Mongos添加分片
Config副本集存储了集群访问的用户及角色权限,为了方便管理,可以给分片副本集添加本地用户
Mongodb提供了LocalException机制,首次安装数据库时可以在本机直接添加用户

MongoDB 4.0 事务实现解析

基于飞天分布式系统和高性能存储,提供三节点副本集的高可用架构,容灾切换,故障迁移完全透明化。并提供专业的数据库在线扩容、备份回滚、性能优化等解决方案。

上个月底 MongoDB Wolrd 宣布发布 MongoDB 4.0, 支持复制集多文档事务,阿里云数据库团队 研发工程师第一时间对事务功能的时间进行了源码分析,解析事务实现机制。

MongoDB 4.0 引入的事务功能,支持多文档ACID特性,例如使用 mongo shell 进行事务操作

> s = db.getMongo().startSession()
session { "id" : UUID("3bf55e90-5e88-44aa-a59e-a30f777f1d89") }
> s.startTransaction()
> db.coll01.insert({x: 1, y: 1})
WriteResult({ "nInserted" : 1 })
> db.coll02.insert({x: 1, y: 1})
WriteResult({ "nInserted" : 1 })
> s.commitTransaction()  (或者 s.abortTransaction()回滚事务)

支持 MongoDB 4.0 的其他语言 Driver 也封装了事务相关接口,用户需要创建一个 Session,然后在 Session 上开启事务,提交事务。例如

python 版本

with client.start_session() as s:
    s.start_transaction()
    collection_one.insert_one(doc_one, session=s)
    collection_two.insert_one(doc_two, session=s)
    s.commit_transaction()

java 版本

try (ClientSession clientSession = client.startSession()) {
   clientSession.startTransaction();
   collection.insertOne(clientSession, docOne);
   collection.insertOne(clientSession, docTwo);
   clientSession.commitTransaction();
}

Session

Session 是 MongoDB 3.6 版本引入的概念,引入这个特性主要就是为实现多文档事务做准备。Session 本质上就是一个「上下文」。

在以前的版本,MongoDB 只管理单个操作的上下文,mongod 服务进程接收到一个请求,为该请求创建一个上下文 (源码里对应 OperationContext),然后在服务整个请求的过程中一直使用这个上下文,内容包括,请求耗时统计、请求占用的锁资源、请求使用的存储快照等信息。有了 Session 之后,就可以让多个请求共享一个上下文,让多个请求产生关联,从而有能力支持多文档事务。

每个 Session 包含一个唯一的标识 lsid,在 4.0 版本里,用户的每个请求可以指定额外的扩展字段,主要包括:

  • lsid: 请求所在 Session 的 ID, 也称 logic session id
  • txnNmuber: 请求对应的事务号,事务号在一个 Session 内必须单调递增
  • stmtIds: 对应请求里每个操作(以insert为例,一个insert命令可以插入多个文档)操作ID

实际上,用户在使用事务时,是不需要理解这些细节,MongoDB Driver 会自动处理,Driver 在创建 Session 时分配 lsid,接下来这个 Session 里的所以操作,Driver 会自动为这些操作加上 lsid,如果是事务操作,会自动带上 txnNumber。

值得一提的是,Session lsid 可以通过调用 startSession 命令让 server 端分配,也可以客户端自己分配,这样可以节省一次网络开销;而事务的标识,MongoDB 并没有提供一个单独的 startTransaction的命令,txnNumber 都是直接由 Driver 来分配的,Driver 只需保证一个 Session 内,txnNumber 是递增的,server 端收到新的事务请求时,会主动的开始一个新事务。

MongoDB 在 startSession 时,可以指定一系列的选项,用于控制 Session 的访问行为,主要包括:

  • causalConsistency: 是否提供 causal consistency 的语义,如果设置为true,不论从哪个节点读取,MongoDB 会保证 “read your own write” 的语义。参考 causal consistency
  • readConcern:参考 MongoDB readConcern 原理解析
  • writeConcern:参考 MongoDB writeConcern 原理解析
  • readPreference: 设置读取时选取节点的规则,参考 read preference
  • retryWrites:如果设置为true,在复制集场景下,MongoDB 会自动重试发生重新选举的场景; 参考retryable write

ACID

Atomic

针对多文档的事务操作,MongoDB 提供 “All or nothing” 的原子语义保证。

Consistency

太难解释了,还有抛弃 Consistency 特性的数据库?

Isolation

MongoDB 提供 snapshot 隔离级别,在事务开始创建一个 WiredTiger snapshot,然后在整个事务过程中使用这个快照提供事务读。

Durability

事务使用 WriteConcern {j: ture} 时,MongoDB 一定会保证事务日志提交才返回,即使发生 crash,MongoDB 也能根据事务日志来恢复;而如果没有指定 {j: true} 级别,即使事务提交成功了,在 crash recovery 之后,事务的也可能被回滚掉。

事务与复制

复制集配置下,MongoDB 整个事务在提交时,会记录一条 oplog(oplog 是一个普通的文档,所以目前版本里事务的修改加起来不能超过文档大小 16MB的限制),包含事务里所有的操作,备节点拉取oplog,并在本地重放事务操作。

事务 oplog 示例,包含事务操作的 lsid,txnNumber,以及事务内所有的操作日志(applyOps字段)

"ts" : Timestamp(1530696933, 1), "t" : NumberLong(1), "h" : NumberLong("4217817601701821530"), "v" : 2, "op" : "c", "ns" : "admin.$cmd", "wall" : ISODate("2018-07-04T09:35:33.549Z"), "lsid" : { "id" : UUID("e675c046-d70b-44c2-ad8d-3f34f2019a7e"), "uid" : BinData(0,"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=") }, "txnNumber" : NumberLong(0), "stmtId" : 0, "prevOpTime" : { "ts" : Timestamp(0, 0), "t" : NumberLong(-1) }, "o" : { "applyOps" : [ { "op" : "i", "ns" : "test.coll2", "ui" : UUID("a49ccd80-6cfc-4896-9740-c5bff41e7cce"), "o" : { "_id" : ObjectId("5b3c94d4624d615ede6097ae"), "x" : 20000 } }, { "op" : "i", "ns" : "test.coll3", "ui" : UUID("31d7ae62-fe78-44f5-ba06-595ae3b871fc"), "o" : { "_id" : ObjectId("5b3c94d9624d615ede6097af"), "x" : 20000 } } ] } }

整个重放过程如下:

  1. 获取当前 Batch (后台不断拉取 oplog 放入 Batch)
  2. 设置 OplogTruncateAfterPoint 时间戳为 Batch里第一条 oplog 时间戳 (存储在 local.replset.oplogTruncateAfterPoint 集合)
  3. 写入 Batch 里所有的 oplog 到 local.oplog.rs 集合,根据 oplog 条数,如果数量较多,会并发写入加速
  4. 清理 OplogTruncateAfterPoint, 标识 oplog 完全成功写入;如果在本步骤完成前 crash,重启恢复时,发现 oplogTruncateAfterPoint 被设置,会将 oplog 截短到该时间戳,以恢复到一致的状态点。
  5. 将 oplog 划分到到多个线程并发重放,为了提升并发效率,事务产生的 oplog 包含的所有修改操作,跟一条普通单条操作的 oplog 一样,会据文档ID划分到多个线程。
  6. 更新 ApplyThrough 时间戳为 Batch 里最后一条 oplog 时间戳,标识下一次重启后,从该位置重新同步,如果本步骤之前失败,重启恢复时,会从 ApplyThrough 上一次的值(上一个 Batch 最后一条 oplog)拉取 oplog。
  7. 更新 oplog 可见时间戳,如果有其他节点从该备节点同步,此时就能读到这部分新写入的 oplog
  8. 更新本地 Snapshot(时间戳),新的写入将对用户可见。

事务与存储引擎

事务时序统一

WiredTiger 很早就支持事务,在 3.x 版本里,MongoDB 就通过 WiredTiger 事务,来保证一条修改操作,对数据、索引、oplog 三者修改的原子性。但实际上 MongoDB 经过多个版本的迭代,才提供了事务接口,核心难点就是时序问题。

MongoDB 通过 oplog 时间戳来标识全局顺序,而 WiredTiger 通过内部的事务ID来标识全局顺序,在实现上,2者没有任何关联。这就导致在并发情况下, MongoDB 看到的事务提交顺序与 WiredTiger 看到的事务提交顺序不一致。

为解决这个问题,WiredTier 3.0 引入事务时间戳(transaction timestamp)机制,应用程序可以通过 WT_SESSION::timestamp_transaction 接口显式的给 WiredTiger 事务分配 commit timestmap,然后就可以实现指定时间戳读(read “as of” a timestamp)。有了 read “as of” a timestamp 特性后,在重放 oplog 时,备节点上的读就不会再跟重放 oplog 有冲突了,不会因重放 oplog 而阻塞读请求,这是4.0版本一个巨大的提升。

/*
 * __wt_txn_visible --
 *  Can the current transaction see the given ID / timestamp?
 */
static inline bool
__wt_txn_visible(
    WT_SESSION_IMPL *session, uint64_t id, const wt_timestamp_t *timestamp)
{
    if (!__txn_visible_id(session, id))
        return (false);

    /* Transactions read their writes, regardless of timestamps. */
    if (F_ISSET(&session->txn, WT_TXN_HAS_ID) && id == session->txn.id)
        return (true);

#ifdef HAVE_TIMESTAMPS
    {
    WT_TXN *txn = &session->txn;

    /* Timestamp check. */
    if (!F_ISSET(txn, WT_TXN_HAS_TS_READ) || timestamp == NULL)
        return (true);

    return (__wt_timestamp_cmp(timestamp, &txn->read_timestamp) <= 0);
    }
#else
    WT_UNUSED(timestamp);
    return (true);
#endif
}

从上面的代码可以看到,再引入事务时间戳之后,在可见性判断时,还会额外检查时间戳,上层读取时指定了时间戳读,则只能看到该时间戳以前的数据。而 MongoDB 在提交事务时,会将 oplog 时间戳跟事务关联,从而达到 MongoDB Server 层时序与 WiredTiger 层时序一致的目的。

事务对 cache 的影响

WiredTiger(WT) 事务会打开一个快照,而快照的存在的 WiredTiger cache evict 是有影响的。一个 WT page 上,有N个版本的修改,如果这些修改没有全局可见(参考 __wt_txn_visible_all),这个 page 是不能 evict 的(参考 __wt_page_can_evict)。

在 3.x 版本里,一个写请求对数据、索引、oplog的修改会放到一个 WT 事务里,事务的提交由 MongoDB 自己控制,MongoDB 会尽可能快的提交事务,完成写清求;但 4.0 引入事务之后,事务的提交由应用程序控制,可能出现一个事务修改很多,并且很长时间不提交,这会给 WT cache evict 造成很大的影响,如果大量内存无法 evict,最终就会进入 cache stuck 状态。

为了尽量减小 WT cache 压力,MongoDB 4.0 事务功能有一些限制,但事务资源占用超过一定阈值时,会自动 abort 来释放资源。规则包括

  1. 事务的生命周期不能超过 transactionLifetimeLimitSeconds (默认60s),该配置可在线修改
  2. 事务修改的文档数不能超过 1000 ,不可修改
  3. 事务修改产生的 oplog 不能超过 16mb,这个主要是 MongoDB 文档大小的限制, oplog 也是一个普通的文档,也必须遵守这个约束。

Read as of a timestamp 与 oldest timestamp

Read as of a timestamp 依赖 WiredTiger 在内存里维护多版本,每个版本跟一个时间戳关联,只要 MongoDB 层可能需要读的版本,引擎层就必须维护这个版本的资源,如果保留的版本太多,也会对 WT cache 产生很大的压力。

WiredTiger 提供设置 oldest timestamp 的功能,允许由 MongoDB 来设置该时间戳,含义是Read as of a timestamp 不会提供更小的时间戳来进行一致性读,也就是说,WiredTiger 无需维护 oldest timestamp 之前的所有历史版本。MongoDB 层需要频繁(及时)更新 oldest timestamp,避免让 WT cache 压力太大。

引擎层 Rollback 与 stable timestamp

在 3.x 版本里,MongoDB 复制集的回滚动作是在 Server 层面完成,但节点需要回滚时,会根据要回滚的 oplog 不断应用相反的操作,或从回滚源上读取最新的版本,整个回滚操作效率很低。

4.0 版本实现了存储引擎层的回滚机制,当复制集节点需要回滚时,直接调用 WiredTiger 接口,将数据回滚到某个稳定版本(实际上就是一个 Checkpoint),这个稳定版本则依赖于 stable timestamp。WiredTiger 会确保 stable timestamp 之后的数据不会写到 Checkpoint里,MongoDB 根据复制集的同步状态,当数据已经同步到大多数节点时(Majority commited),会更新 stable timestamp,因为这些数据已经提交到大多数节点了,一定不会发生 ROLLBACK,这个时间戳之前的数据就都可以写到 Checkpoint 里了。

MongoDB 需要确保频繁(及时)的更新 stable timestamp,否则影响 WT Checkpoint 行为,导致很多内存无法释放。

分布式事务

MongoDB 4.0 支持副本集多文档事务,并计划在 4.2 版本支持分片集群事务功能。下图是从 MongoDB 3.0 引入 WiredTiger 到 4.0 支持多文档事务的功能迭代图,可以发现一盘大棋即将上线,敬请期待。

未分类

基于飞天分布式系统和高性能存储,提供三节点副本集的高可用架构,容灾切换,故障迁移完全透明化。并提供专业的数据库在线扩容、备份回滚、性能优化等解决方案。
了解更多

Mongodb和mysql的区别

1. Mongodb简介及优缺点分析

Mongodb是非关系型数据库(nosql ),属于文档型数据库。文档是mongoDB中数据的基本单元,类似关系数据库的行,多个键值对有序地放置在一起便是文档,语法有点类似javascript面向对象的查询语言,它是一个面向集合的,模式自由的文档型数据库。

存储方式:虚拟内存+持久化。
查询语句:是独特的Mongodb的查询方式。
适合场景:事件的记录,内容管理或者博客平台等等。
架构特点:可以通过副本集,以及分片来实现高可用。
数据处理:数据是存储在硬盘上的,只不过需要经常读取的数据会被加载到内存中,将数据存储在物理内存中,从而达到高速读写。
成熟度与广泛度:新兴数据库,成熟度较低,Nosql数据库中最为接近关系型数据库,比较完善的DB之一,适用人群不断在增长。

优点:
快速!在适量级的内存的Mongodb的性能是非常迅速的,它将热数据存储在物理内存中,使得热数据的读写变得十分快。高扩展性,存储的数据格式是json格式!

未分类

缺点:
① mongodb不支持事务操作。
② mongodb占用空间过大。
③ 开发文档不是很完全,完善。

未分类

2. MySQL优缺点分析

优点:
在不同的引擎上有不同 的存储方式。
查询语句是使用传统的sql语句,拥有较为成熟的体系,成熟度很高。
开源数据库的份额在不断增加,mysql的份额页在持续增长。

缺点:
在海量数据处理的时候效率会显著变慢。

3. Mongodb和MySQL数据库的对比

  • 传统的关系数据库一般由数据库(database)、表(table)、记录(record)三个层次概念组成,MongoDB是由数据库(database)、集合(collection)、文档对象(document)三个层次组成。
  • MongoDB对于关系型数据库里的表,但是集合中没有列、行和关系概念,这体现了模式自由的特点。

未分类

未分类

4. MongoDB常用语句

# 连接Mongo数据库,并设置数据存储地址
mongod.exe --dbpath "d:softwareMongoDBServer3.0data"

#-----------------------#1# 数据库
# 查看所有的数据库
show dbs
# 删除当前使用的数据库
db.dropDatabase()
# 使用这个数据库(只有插入数据后完成创建数据库)
use dbt
# 查看当前使用的数据库
db
db.getName()
# 查看当前数据库状态
db.stats()
# 修复当前数据库
db.repairDatabase()
# 从一个数据库复制到另一个数据库
db.copyDatabase("mydb", "temp", "127.0.0.1");

#-----------------------#2# 集合
# 查看当前数据库下所有的集合
show collections
show tables
# 创建名称为coll集合
db.createCollection('coll')
db.createCollection("coll2", {capped:true, autoIndexId:true, size:6142800, max:10000})      # 可选参数
# 查看当前集合状态
db.coll.stats()
# 删除名称为coll集合
db.coll.drop()


#-----------------------#3# 集合数据
# 插入空数据并且直接创建名称为coll集合
db.coll.insert({})
# 插入一个或多个数据
db.coll.insert({name:'tom', age:22})
db.coll.insert([{name:'adam', age:10},{name:'john', age:23}])
# 添加数据(save方法可以修改相同id的数据)
db.coll.save({name:'allen'})
# 删除一个或所有的数据
db.coll.remove({name:'tom'})
db.coll.remove({})
# 删除符合条件的数据中的第一条
db.coll.remove({name:'tom'}, 1)
# 更改数据
db.coll.update({name:'tom', age:22}, {$set:{name:'tom', age:222}})
# 查看数据
db.coll.find()
# 查看一条数据
db.coll.findOne()
db.coll.find({}, {name:1, '_id':0})     # 1表示显示,0表示不显示(find默认显示_id)
# 格式化显示数据,使数据更加清晰明了
db.coll.find().pretty()
# 使用and,or查看数据
db.coll.find({name:'tom', age:22})      # 等同and使用
db.coll.find({$or:[{name:'tom'}, {age:21}]})        # or使用



# 操作符大于,小于,等于,不等于,大于不等于,小于不等于
db.coll.find({age: {$gt: 22}})      # 大于
db.coll.find({age: {$lt: 22}})      # 大于
db.coll.find({age: 22})      # 等于
db.coll.find({age: {$ne: 22}})      # 不等于
db.coll.find({age: {$gte: 22}})      # 大于等于
db.coll.find({age: {$lte: 22}})      # 小于等于

# 显示从skip之后limit个
db.coll.find().limit(2).skip(1)

#-----------------------# # 用户
# 3.x之后版本添加用户
use admin
db.createUser({user:'nu', pwd:'nu', roles:[{role:'readWrite',db:'admin'}]})
# 用户认证
db.auth("nu", "nu");
# 显示当前所有用户
show users;
db.system.users.find()
3.x版本删除用户
db.removeUser('nu')     # 不推荐使用,已经废弃
db.dropUser("nu");

# 当前db版本
db.version();

# 当前db的链接机器地址和端口
db.getMongo();

# 备份到备份目录
mongodump

# 从备份目录恢复备份语句。
mongorestore

MongoDb 判断字段长度的方法

查询某字段长度超过一定长度时的方法, MongoDB中可能不好处理,一般这样:

 db.test.find({
    $where:"this.F_DAQDATA.legnth>600"
});

但用$where查询时性能可能不太好,在网上搜索之后,发现使用正则可能会更好,同时判断字段是否存在:

 db.test.find({
    F_DAQDATA: {
        $type:2,        // 字段类型为2,表示有此字段,或者用: $exists: true
        $regex: /^.{600,}$/       // 长度大于600
    }     
});