zanePerfor前端性能监控平台高可用之Mongodb副本集读写分离架构

优势:

MongoDB 副本集(Replica Set)是有自动故障恢复功能的主从集群,有一个Primary节点和一个或多个Secondary节点组成。

当主节点挂掉之后,会由多个副本节点选举产生出新的主节点。(节点数请保持为基数个)。

这样就能保证应用的高可用,其中一个或多个节点挂掉之后还能正常运行和服务。

劣势:

数据丢失:主节点挂掉之后,副本节点选举出新的主节点需要一定的时间,这段时间会造成数据的丢失。

不能承受高吞吐量:副本集架构追根还是单机执行写任务,在高并发应用中性能受限,不能解决流量洪峰时的实时读写。

不能完全保证项目的高可用:在副本集的环境中,要是所有的Secondary都宕机了,只剩下Primary。最后Primary会变成Secondary,将不能再提供服务。

总结:

在大多数情况下推荐使用副本集架构,副本集架构在保证高可用的同时还能降低服务器成本,相对于集群分片来说配置也更简单,更易于维护,具体选择什么架构需要根据自己的项目来决定。

Mongodb副本集架构搭建:

Mongodb副本集搭建比较简单,你只需要根据下面的步骤一步一步操作即可(以下内容以Linux或mac为例进行环境搭建)。

一:安装Mongodb (略)

请参考: LINUX系统下安装mongodb(https://blog.seosiwei.com/detail/40)

关于副本集搭建还可参考我的另一篇文章: MongoDB主从副本集架构(https://blog.seosiwei.com/detail/39)

二:副本集搭建

(备注:鉴于成本,以下内容在单机下部署为例,多机部署只需要替换下IP即可)

1、创建数据和日志存放目录

// 数据存放目录
mkdir -p /data/replication/s0
mkdir -p /data/replication/s1
mkdir -p /data/replication/s2
// 日志存放目录
mkdir -p /data/replication/log

2、启动Mongodb服务

(下面以28100,28101,28100三个端口为例)

// 启动mongodb服务
mongod --dbpath /data/replication/s0 --logpath /data/replication/log/s0.log --fork --smallfiles --port 28100 --replSet rs1
mongod --dbpath /data/replication/s1 --logpath /data/replication/log/s1.log --fork --smallfiles --port 28101 --replSet rs1
mongod --dbpath /data/replication/s2 --logpath /data/replication/log/s2.log --fork --smallfiles --port 28102 --replSet rs1
  • --dbpath:存放数据目录

  • --logpath:存放日志目录

  • --smallfiles:是否使用较小的默认文件。默认为false,不使用。

  • --replSet: 副本集名称,副本集名称必须一致

进入28100服务设置副本集

// 登录 mongodb
mongo localhost:28100

// 切换到admin用户
use admin

// 初始化副本集
rs.initiate({_id:"rs1",members:[{_id:0,host:"127.0.0.1:28100"},{_id:1,host:"127.0.0.1:28101"},{_id:2,host:"127.0.0.1:28102"}]})

// 查看副本集状态
rs.status()

副本集设置成功之后,查看状态会看到如下信息即标识成功。

{
    "set" : "rs1",
    "date" : ISODate("2018-11-14T08:40:44.659Z"),
    "myState" : 1,
    "term" : NumberLong(2),
    "heartbeatIntervalMillis" : NumberLong(2000),
    "optimes" : {
        "lastCommittedOpTime" : {
            "ts" : Timestamp(1542184835, 1),
            "t" : NumberLong(2)
        },
        "readConcernMajorityOpTime" : {
            "ts" : Timestamp(1542184835, 1),
            "t" : NumberLong(2)
        },
        "appliedOpTime" : {
            "ts" : Timestamp(1542184835, 1),
            "t" : NumberLong(2)
        },
        "durableOpTime" : {
            "ts" : Timestamp(1542184835, 1),
            "t" : NumberLong(2)
        }
    },
    "members" : [
        {
            "_id" : 0,
            "name" : "127.0.0.1:28100",
            "health" : 1,
            "state" : 1,
            "stateStr" : "PRIMARY",
            "uptime" : 5977,
            "optime" : {
                "ts" : Timestamp(1542184835, 1),
                "t" : NumberLong(2)
            },
            "optimeDate" : ISODate("2018-11-14T08:40:35Z"),
            "electionTime" : Timestamp(1542178880, 1),
            "electionDate" : ISODate("2018-11-14T07:01:20Z"),
            "configVersion" : 1,
            "self" : true
        },
        {
            "_id" : 1,
            "name" : "127.0.0.1:28101",
            "health" : 1,
            "state" : 2,
            "stateStr" : "SECONDARY",
            "uptime" : 5970,
            "optime" : {
                "ts" : Timestamp(1542184835, 1),
                "t" : NumberLong(2)
            },
            "optimeDurable" : {
                "ts" : Timestamp(1542184835, 1),
                "t" : NumberLong(2)
            },
            "optimeDate" : ISODate("2018-11-14T08:40:35Z"),
            "optimeDurableDate" : ISODate("2018-11-14T08:40:35Z"),
            "lastHeartbeat" : ISODate("2018-11-14T08:40:43.345Z"),
            "lastHeartbeatRecv" : ISODate("2018-11-14T08:40:43.603Z"),
            "pingMs" : NumberLong(0),
            "syncingTo" : "127.0.0.1:28102",
            "configVersion" : 1
        },
        {
            "_id" : 2,
            "name" : "127.0.0.1:28102",
            "health" : 1,
            "state" : 2,
            "stateStr" : "SECONDARY",
            "uptime" : 5970,
            "optime" : {
                "ts" : Timestamp(1542184835, 1),
                "t" : NumberLong(2)
            },
            "optimeDurable" : {
                "ts" : Timestamp(1542184835, 1),
                "t" : NumberLong(2)
            },
            "optimeDate" : ISODate("2018-11-14T08:40:35Z"),
            "optimeDurableDate" : ISODate("2018-11-14T08:40:35Z"),
            "lastHeartbeat" : ISODate("2018-11-14T08:40:43.345Z"),
            "lastHeartbeatRecv" : ISODate("2018-11-14T08:40:43.575Z"),
            "pingMs" : NumberLong(0),
            "syncingTo" : "127.0.0.1:28100",
            "configVersion" : 1
        }
    ],
    "ok" : 1,
    "operationTime" : Timestamp(1542184835, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1542184835, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}

3、设置Mongodb副本可读

在mac和linux系统中,一般在~目录下会有个.mongorc.js文件,给此文件新增一句rs.slaveOk();即可。

​查看是否有此文件:

cd ~
ll -a

若无,则全局查找:

// 全局搜索
sudo find / -name .mongorc.js

添加rs.slaveOk();

vi ~/.mongorc.js

// 此文件默认为空
// 增加一行,保存退出
rs.slaveOk();

重启Mongodb,这时所有副本节点都可读。

在zanePerfor (前端性能监控平台)生产环境中使用,并做读写分离。

找到项目的 config/config.prod.js文件

更改如下Mongodb配置即可:

// mongodb 服务
// 此处替换 url 参数为链接副本集方式即可
const dbclients = {
        db3: {
           url: 'mongodb://127.0.0.1:28100,127.0.0.1:28101,127.0.0.1:28102/performance?replicaSet=rs1',
            options: {
                poolSize: 100,
                keepAlive: 10000,
                connectTimeoutMS: 10000,
                autoReconnect: true,
                reconnectTries: 100,
                reconnectInterval: 1000,
            },
        },
 };

读写分离:

项目所有查询已经做好了读写分离操作,例如查询page页列表,其他皆如此即可,这样就保证了服务的读写压力(主节点负责写数据,副本节点负责读取数据)。

未分类

read参数说明

primary - (默认值)    只从主节点读取。如果主节点不可用则报错。不能跟 tags 选项组合使用。
secondary            只有当从节点可用时,从中读取,否则报错。
primaryPreferred     优先读取主节点,不可用时读取从节点。
secondaryPreferred   优先读取从节点,不可用时读取主节点。
nearest              所有操作都读最近的候选节点,不同于其他模式,该选项会随机选取所有主、从节点。

选项别名:

p   primary
pp  primaryPreferred
s   secondary
sp  secondaryPreferred
n   nearest

zanePerfor下一步篇:

https://blog.seosiwei.com/detail/43

zanePerfor github地址:

https://github.com/wangweianger/zanePerfor

zanePerfor 开发文档:

https://blog.seosiwei.com/performance/index.html

Mac安装mongodb

在Mac上安装首选使用brew进行安装

Brew是Mac上的套件管理器,类似于Linux系统中的yum或者apt-get,在安装软件的时候可以摆脱下载软件包再手动安装的繁琐操作,让安装软件变得更加的简单。

Homebrew

homebrew是Mac下的软件包管理工具,主要是填补brew缺失的软件包。提供安装软件,卸载软件等操作。

首先安装Homebrew:

/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

如果想要卸载brew,不知道卸载命令,可以再一次执行brew的安装命令,如果已经安装brew再次执行安装命令的话,会提示,告诉已经安装过brew了,如果想重复安装就执行卸载命令,然后根据终端的提示执行卸载命令就可以了。

brew的常用命令

1.更新brew

brew update

2.安装软件

brew install soft_name
// soft_name为所要安装的软件名

3.卸载软件

brew uninstall soft_name

4.显示使用brew安装的软件列表

brew list

5.更新软件

brew upgrade // 更新所有使用brew安装的软件
brew upgrade soft_name // 更新soft_name

6.查看哪些软件需要更新

brew outdated

7.查找软件

// 当记不清软件的名字的时候,可以使用search,只需要写出几个字母,就可以联想所有的结果并且输出
brew search

8.查找使用brew安装的东西,安装在哪里

brew --cache

安装mongodb

因为已经安装了brew,所以通过brew安装mongodb:

brew install mongodb

安装成功

未分类

输入mongo启动,会发现并没有成功

未分类

这是因为没有创建mongo的默认数据写入目录,需要自己手动创建

创建默认数据写入目录

注:默认目录为根目录下的data/db

mkdir -p /data/db

然后给刚刚创建的目录赋予可读写的权限

chown `id -u` /data/db

注:如果不使用命令行修改权限,可以前往/data文件夹右键点击显示简介,选择最下面的共享与权限,把所有权限改成读与写

如果不想使用mongo的默认目录,可以自己更改,使用–dbpath参数

mongo --dbpath dir_name

现在可以放心的启动mongodb了

mongod

然后再开启一个新的终端,执行

mongo

要先执行mongod再执行mongo,出现箭头表示链接成功

未分类

数据库错误

如果数据库启动不了,可能是由于未正常关闭导致的,可以删除/data/db文件夹中的mongod.lock文件,然后重新启动,如果还不可以,可以查杀一下进程:

ps -aef | grep mongo

然后根据进程ID杀掉进程,最后重启mongodb

MongoDB常用命令

1.查询库、查询表

show dbs  //查询所有的数据库

show collections   //查询当前数据库下的所有数据表123

2.建库和删库

use myDbs  //建立一个名为myDbs的数据库,当这个库存在时则是切换到这个数据库中去

use myDbs

db.dropDatabase();  //这两句是删除这个数据库12345

3.建表和删表

//表操作都是要先到一个数据库中去,通过use方法
db.myTable.insert({name:’hf’,age:20});  //在mongodb中在插入数据时即创建了改表,此时创建的是名为myTable的数据表
db.myTable.drop();  //删除myTable这个数据表
//如果没有指定数据库,表会创建在mongdb默认数据库test里1234

4.单表的增删改

db.myTable.insert({name:’hahaha’,age:12});  //新增

db.myTable.update({name:’hf’},{$set:{age:25}})  //修改

db.myTable.remove({name:’hf'});  //删除12345

5.查询

db.myTable.find();  //查询myTable中的所有数据

db.myTable.find().sort({age:1})  //根据age升续

db.myTable.find().count();  //查询

MongoDB数据的导出、导入、备份、恢复

mongodb数据库是一种非关系性的数据库,在日常的工作中用到的也是很多的,接下来介绍一下mongodb数据的导出、导入、备份、恢复,掌握这些技能避免数据丢失

使用的工具主要有

  • mongoexport数据导出
  • mongoimport数据导入
  • mongodump数据备份
  • mongorestore数据恢复

mongoexport是数据导出工具

用法:

mongodbexport -d 数据库名 -c 数据表名 -o 导出文件名 --type json/csv -f "字段名"

参数:

-d :数据库名
-c :collection名
-o :输出的文件名
--type : 输出的格式,默认为json
-f :输出的字段,如果-type为csv,则需要加上-f "字段名"

示例:

mongoexport.exe -d test -c runoob -o d:datadata.json --type json -f "title"

mongoimport是数据导入工具

用法:

mongoimport -d 数据库名 -c 数据表名 --file 文件名 --headerline --type json/csv -f "字段"

参数:

-d :数据库名
-c :collection名
--type :导入的格式默认json
-f :导入的字段名
--headerline :如果导入的格式是csv,则可以使用第一行的标题作为导入的字段
--file :要导入的文件

示例:

mongoimport.exe -d test -c runoob --file d:datadata.json --type json

mongodump是数据备份工具

用法:

mongodump -h dbhost -d dbname -o dbdirectory

参数:

-h: MongDB所在服务器地址,例如:127.0.0.1,当然也可以指定端口号:127.0.0.1:27017
-d: 需要备份的数据库实例,例如:test
-o: 备份的数据存放位置,例如:/home/mongodump/,当然该目录需要提前建立,这个目录里面存放该数据库实例的备份数据。

示例:

mongodump -h dbhost -d dbname -o dbdirectory

mongorestore是数据恢复工具

用法:

mongorestore -h dbhost -d dbname --dir dbdirectory

参数:

-h: MongoDB所在服务器地址
-d: 需要恢复的数据库实例,例如:test,当然这个名称也可以和备份时候的不一样,比如test2
--dir: 备份数据所在位置,例如:/home/mongodump/itcast/
--drop: 恢复的时候,先删除当前数据,然后恢复备份的数据。就是说,恢复后,备份后添加修改的数据都会被删除,慎用!

示例:

mongorestore -h 192.168.17.129:27017 -d itcast_restore --dir /home/mongodump/itcast/

MongoDB 账号密码登录

配置MongoDB 账号密码登录的步骤如下 (假设有 2 个数据库 admin (自带的) 和 foo):

1、启动 MongoDB: mongod --config /usr/local/etc/mongod.conf

2、进入数据库 admin: use admin

3、创建用户 admin:

db.createUser(
  {
    user: "admin",
    pwd: "ebag",
    roles: [ { role: "userAdminAnyDatabase", db: "admin" }, "readWriteAnyDatabase" ]
  }
)

4、进入数据库 foo: use foo

5、创建用户 bar:

db.createUser(
  {
    user: "bar",
    pwd: "bar",
    roles: [
        { role: "dbAdmin", db: "foo" },
        { role: "readWrite", db: "foo" }
    ]
  }
)

6、需要授权的方式启动: mongod --auth --config /usr/local/etc/mongod.conf

7、授权登录

  • 方式一: mongo 先进入然后 db.auth("bar", "bar") 授权
  • 方式二: mongo --port 27017 -u "bar" -p "bar" --authenticationDatabase "foo"

MongoDB数据导入到ElasticSearch python代码实现

ElasticSearch对文本的搜索速度真的是神速呀,基本是毫秒级别的。对于大文本,简直就是秒飞MYSQL十条街。使用python实现:

es = Elasticsearch(['10.18.6.26:9200'])
ret = collection.find({})

# 删除mongo的_id字段,否则无法把Object类型插入到Elastic
map(lambda x:(del x['_id']),ret)

actions=

for idx,item in enumerate(ret):
    i={
        "_index":"jsl",
     "_type":"text",
     "_id":idx,
        "_source":{
            # 需要提取的字段
            "title":item.get('title'),
            "url":item.get('url')
        }
    }
    actions.append(i)


start=time.time()
helpers.bulk(es,actions)

end=time.time()-start
print(end)

运行下来,20W条数据,大概用了15秒左右全部导入ElasticSearch 数据库中。

nodejs操作mongodb

前一篇博文说明了如何在win7下安装mongodb,下面简要测试一下nodejs操作mongodb:

首先安装nodejs mongodb

npm  install mongodb
var  mongodb = require('mongodb');
var  server  = new mongodb.Server('localhost', 27017, {auto_reconnect:true});
var  db = new mongodb.Db('mydb', server, {safe:true});

//连接db
db.open(function(err, db){
    if(!err){
        console.log('connect db');
        // 连接Collection(可以认为是mysql的table)
        // 第1种连接方式
        // db.collection('mycoll',{safe:true}, function(err, collection){
        //     if(err){
        //         console.log(err);
        //     }
        // });
        // 第2种连接方式
        db.createCollection('mycoll', {safe:true}, function(err, collection){
            if(err){
                console.log(err);
            }else{
                //新增数据
                // var tmp1 = {id:'1',title:'hello',number:1};
       //          collection.insert(tmp1,{safe:true},function(err, result){
       //              console.log(result);
       //          }); 
                   //更新数据
                   // collection.update({title:'hello'}, {$set:{number:3}}, {safe:true}, function(err, result){
                   //     console.log(result);
                   // });
                   // 删除数据
                       // collection.remove({title:'hello'},{safe:true},function(err,result){
        //                   console.log(result);
        //               });

                // console.log(collection);
                // 查询数据
                var tmp1 = {title:'hello'};
                   var tmp2 = {title:'world'};
                   collection.insert([tmp1,tmp2],{safe:true},function(err,result){
                   console.log(result);
                   }); 
                   collection.find().toArray(function(err,docs){
                   console.log('find');
                   console.log(docs);
                   }); 
                   collection.findOne(function(err,doc){
                    console.log('findOne');
                      console.log(doc);
                   }); 
            }

        });
        // console.log('delete ...');
        // //删除Collection
        // db.dropCollection('mycoll',{safe:true},function(err,result){

  //           if(err){

        //         console.log('err:');
        //         console.log(err);
        //     }else{
        //         console.log('ok:');
        //         console.log(result);
        //     }
  //       }); 
    }else{
        console.log(err);
    }
});

Yii框架连接mongodb数据库的代码

yii2框架是yii的升级版本,本文我们分别讲解在yii框架中如何连接数据库mongodb。

在文件夹common/config/main_local.php中加入如下代码:

<?php
return [
'components' => [
'mongodb' => [
'class' => 'yiimongodbConnection',
'dsn' => 'mongodb://localhost:27017/数据库名'
],
],
];

Ubuntu16.04安装MongoDB

最近需要用到MongoDB,我在这里记录一下安装步骤。

首先说下什么是MongoDB,摘至维基百科:

MongoDB是一种面向文档的数据库管理系统,由C++撰写而成,以此来解决应用程序开发社区中的大量现实问题;
MongoDB使用内存映射文件, 32位系统上限制大小为2GB的数据(64位支持更大的数据)。 MongoDB服务器只能用在小端序系统,虽然大部分公司会同时准备小端序和大端序系统;

1、安装开始
  
选择需要的版本
      
未分类

2、导入公钥

sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 9DA31620334BD75D9DCB49F368818C72E52529D4

3、创建MongoDB列表文件

echo "deb [ arch=amd64,arm64 ] https://repo.mongodb.org/apt/ubuntu xenial/mongodb-org/4.0 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-4.0.list

4、重新加载本地包数据库

sudo apt-get update

5、安装MongoDB包,我这里选择稳定版

sudo apt-get install -y mongodb-org

为防止意外升级,可以将软件包固定在当前安装的版本中:

echo "mongodb-org hold" | sudo dpkg --set-selections
echo "mongodb-org-server hold" | sudo dpkg --set-selections
echo "mongodb-org-shell hold" | sudo dpkg --set-selections
echo "mongodb-org-mongos hold" | sudo dpkg --set-selections
echo "mongodb-org-tools hold" | sudo dpkg --set-selections

注:如果通过软件包管理器安装,则在安装期间会创建数据目录 /var/lib/mongodb和日志目录/var/log/mongodb

6、启动MongoDB

sudo service mongod start

检查日志文件的内容以/var/log/mongodb/mongod.log 获取行读数,验证进程是否已成功启动,27017是独立mongod 侦听的默认端

[initandlisten] waiting for connections on port 27017

停止MongoDB的命令

sudo service mongod stop

重启MongoDB命令

sudo service mongod restart

7、mongo在与主机相同的主机上启动shell mongod,以mongod使用默认端口27017 连接到localhost上运行的shell

mongo

注:MongoDB安装完“admin”库中默认没有用户,此时用mongo命令登录是超级管理员用户

mongoDB通过_id删除doc

根据mongodb数据记录里面的_id字段删除相应的docs,通过下面代码进行删除时,并不能删除成功

代码如下:

var ObjectId = require('mongodb').ObjectId;
db.collection('infochanges').remove({"_id":{"_id":ObjectId(idvalue)}).then(function(){})

报错如下:

TypeError:Cannot convert undefined or null to object

解决方法:

使用findAndRemove,代码如下:

db.collection('infochanges').findAndRemove({"_id":ObjectId(index)}).then(function(){})

通过_id删除docs要用findAndRemove,remove不起作用

使用Vertx+Ignite+MongoDB搭建大DAU游戏服务器

最近在funplus做游戏,进而研究了一个新型架构。

之前做游戏都是自己使用java搭建架构,经过几年的积累确实也达到了最初的设想,多进程,进程内多线程,无锁,0延迟纯jdbc写库。对于单服架构来说,已经趋近于极致。

今年小游戏盛行,如海盗来了,疯狂游戏那家公司,全部使用的都是go+mongodb实现的,因为go的语言级别支援高并发,这点是java无法比拟的。不过java开源项目多,有很多的高手铺垫了超多的框架,比如vertx,akka都可以更加充分的释放java的能力。就看使用者的认识水平了。

本次选择vertx,主要是其在网络通讯这块,对netty的包装,加上自己的eventloop模型,使得响应web请求速度基本属于前3的水平。

netServer = vertx.createHttpServer(httpServerOptions);
        netServer.requestHandler();
        netServer.requestHandler(hs -> {
            if (hs.path().equals("/ping")) {
                hs.response().end("pong");
                return;
            }
            hs.response().close();
            return;
        });

        netServer.websocketHandler(ws -> {
            if (!ws.path().equals("/" + wsname)) {
                ws.reject();
                return;
            }
            Player player = new Player(ws, ws.binaryHandlerID());
            players.put(player.getConnId(), player);
            player.setServerUUID(gateAdress);
            //日志
            if (log.isDebugEnabled()) {
                SocketAddress addrLocal = ws.localAddress();
                log.debug("新建一个连接:连接ID={},  本地端口={}, 远程地址={}", player.getconnId(), addrLocal.port(), ws.remoteAddress());
            }
            //有连接过来了
            ws.binaryMessageHandler(data -> {
                int readableBytes = data.length();
                if (readableBytes < IMessage.MIN_MESSAGE_LENGTH) {
                    return;
                }
                int len = data.getShort(0);
                if (len > 64 * 1024) {
                    log.error("conn:" + player.getId() + "  发送数据包过大:" + len);
                    return;
                }
                if (readableBytes < len) {
                    return;
                }

                CGMessage msg = decode(data);
                if (msg == null) return;
                inputHandler(msg, player);
            });
            ws.exceptionHandler(e -> {
                if (e.getMessage().equals("WebSocket is closed")) {
//                    player.disconnect();
                }
                //断连的日志就不打印堆栈了
                if (e.getMessage().contains("Player reset by peer") || e.getMessage().contains("远程主机强迫关闭了一个现有的连接")) {
                    log.error("主动断开:connId={},cause={}", player.getconnId(), e.getCause());
                } else {
                    //输出错误日志
                    log.error("发生异常:connId={},cause={}", player.getconnId(), e.getCause());
                }
            });
            ws.closeHandler(t -> {
//                if (player == null) return;
                //连接状态
                //日志
                if (log.isDebugEnabled()) {
                    log.debug("连接关闭:connId={}, status={}", player.getconnId(), player == null ? "" : player.toString());
                }
                if (player.getState() == PlayerState.connected || player.getState() == PlayerState.init || player.getState() == PlayerState.logouting) {
                    player.setState(PlayerState.logouted);
                    //Remove掉 session connId = Player
                    //删掉连接对应的player
                    players.remove(player.getConnId());
                    return;
                }
                if (player.getUserInfo() == null) {
                    //删掉连接对应的player
                    players.remove(player.getConnId());
                    return;
                }
                gateService.closePlayer(player.getconnId(), ar -> {
                    if (ar.failed()) {
                        Loggers.coreLogger.error("player connId:" + player.getconnId() + " 离线退出异常!!!" + ar.cause().getMessage());
                    }
                    //删掉连接对应的player
                    players.remove(player.getConnId());
                });

            });
        }).listen(port, host, res -> {
            if (res.succeeded()) {
                //启动日志信息
                log.info(" started. Listen: " + port + "  vert:" + vertx.hashCode());
                future.complete();
            }
        });

vertx能方便的使用eventloop线程池响应玩家发来的请求,并永远在特定线程进行代码调用。

比自己使用hash线程池靠谱很多。ps. 自己造轮子不是不好,主要实现方法不一定测试完整,有意想不到的情况,就要自己来趟坑。

后面主要是说一下,但如果大规模请求MongoDB,需要更高的MongoDB响应要求。进而想到要加缓存机制,最初想到的是redis+mongodb,自己实现读通过,写通过。
如果redis不存在,则从mongodb读取,并放入缓存,写数据先写缓存,后写mongodb。

自己实现的这种存储机制,比较low。所以继续寻找缓存方案。

过程中,发现了一个曝光率不高的框架,也就是Apache Ignite。最新一代数据网格。

关键的一步,就是如果让vertx与Ignite工作到一起。这是一个必要的条件。

package cn.empires;

import cn.empires.common.Globals;
import cn.empires.common.contants.Loggers;
import cn.empires.gs.support.observer.Event;
import cn.empires.verticle.OnlineVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Launcher;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;

public class MainLaunch extends Launcher {

    private JsonObject config;

    public static void main(String[] args) {
        System.setProperty("logFileName", "gateServer");
        new MainLaunch().dispatch(args);
    }

    @Override
    protected String getDefaultCommand() {
        return super.getDefaultCommand();
    }

    @Override
    protected String getMainVerticle() {
        return "cn.empires.verticle.GateVerticle";
    }

    @Override
    public void afterConfigParsed(JsonObject config) {
        super.afterConfigParsed(config);
        this.config = config;
    }

    @Override
    public void beforeStartingVertx(VertxOptions options) {
        options.setClustered(true);
    }

    @Override
    public void afterStartingVertx(Vertx vertx) {
        super.afterStartingVertx(vertx);
        //config.put("redis.password", "123456");
        //初始化全局相关信息
        ListenerInit.init(Event.instance);
        Loggers.coreLogger.info("Globals init .");
        Globals.init(vertx, config);
        vertx.deployVerticle(OnlineVerticle.class, new DeploymentOptions().setConfig(config));
    }

    @Override
    public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) {
        super.beforeDeployingVerticle(deploymentOptions);
    }

    @Override
    public void beforeStoppingVertx(Vertx vertx) {
        super.beforeStoppingVertx(vertx);
    }

    @Override
    public void afterStoppingVertx() {
        super.afterStoppingVertx();
    }

    @Override
    public void handleDeployFailed(Vertx vertx, String mainVerticle, DeploymentOptions deploymentOptions, Throwable cause) {
        super.handleDeployFailed(vertx, mainVerticle, deploymentOptions, cause);
    }

}

如果想使用Ignite的缓存,必须需要Ignite实例对象。否则无法获取。

if (ignite == null) {
     ClusterManager clusterManager = ((VertxInternal) vertx).getClusterManager();
     String uuid = clusterManager.getNodeID();
     ignite = Ignition.ignite(UUID.fromString(uuid));
}

在classpath中,配置一个ignite.xml,vertx启动的时候自动会加载ignite.xml,然后使用IgniteManager进行集群管理。
我只贴一遍ignite.xml配置

<?xml version="1.0" encoding="UTF-8"?>

<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

<!--
    Ignite Spring configuration file to startup Ignite cache.

    This file demonstrates how to configure cache using Spring. Provided cache
    will be created on node startup.

    Use this configuration file when running HTTP REST examples (see 'examples/rest' folder).

    When starting a standalone node, you need to execute the following command:
    {IGNITE_HOME}/bin/ignite.{bat|sh} examples/config/example-cache.xml

    When starting Ignite from Java IDE, pass path to this file to Ignition:
    Ignition.start("examples/config/example-cache.xml");
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="dataStorageConfiguration">
            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                <!-- Set the page size to 4 KB -->
                  <property name="pageSize" value="4096"/>
                <!-- Set concurrency level -->
                  <property name="concurrencyLevel" value="6"/>
                  <property name="systemRegionInitialSize" value="#{40 * 1024 * 1024}"/>
                  <property name="systemRegionMaxSize" value="#{80 * 1024 * 1024}"/>
                  <property name="defaultDataRegionConfiguration">
                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                        <property name="name" value="Default_Region"/>
                        <!-- 设置默认内存区最大内存为 512M. -->
                        <property name="maxSize" value="#{512L * 1024 * 1024}"/>
                        <!-- Enabling RANDOM_LRU eviction for this region.  -->
                            <property name="pageEvictionMode" value="RANDOM_2_LRU"/>
                    </bean>
                </property>
                <property name="dataRegionConfigurations">
                    <list>
                      <!--
                          Defining a data region that will consume up to 500 MB of RAM and 
                          will have eviction and persistence enabled.
                      -->
                      <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                        <!-- Custom region name. -->
                        <property name="name" value="500MB_Region"/>

                        <!-- 100 MB initial size. -->
                        <property name="initialSize" value="#{100L * 1024 * 1024}"/>

                        <!-- 500 MB maximum size. -->
                        <property name="maxSize" value="#{500L * 1024 * 1024}"/>

                        <!-- Enabling RANDOM_LRU eviction for this region.  -->
                            <property name="pageEvictionMode" value="RANDOM_2_LRU"/>
                      </bean>
                    </list>
                </property>
            </bean>
        </property>
        <property name="cacheConfiguration">
            <list>
                   <bean class="org.apache.ignite.configuration.CacheConfiguration">
                           <property name="name" value="UserInfo"/>
                           <property name="cacheMode" value="PARTITIONED"/>
                           <property name="atomicityMode" value="ATOMIC"/>
                           <property name="backups" value="0"/>
                           <property name="cacheStoreFactory">
                               <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                                   <constructor-arg value="cn.empires.common.cache.UserCacheStore"/>
                               </bean>
                           </property>
                           <property name="readThrough" value="true"/>
                           <property name="writeThrough" value="true"/>
                           <property name="writeBehindEnabled" value="true"/>
                           <property name="writeBehindFlushSize" value="1024"/>
                           <property name="writeBehindFlushFrequency" value="5"/>
                           <property name="writeBehindFlushThreadCount" value="1"/>
                           <property name="writeBehindBatchSize" value="512"/>
                           <property name="dataRegionName" value="Default_Region"/>
                </bean>
            </list>
        </property>
        <property name="failureDetectionTimeout" value="60000"/>
        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!--
                        Ignite provides several options for automatic discovery that can be used
                        instead os static IP based discovery. For information on all options refer
                        to our documentation: http://apacheignite.readme.io/docs/cluster-config
                    -->
                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                    <!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> -->
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <value>127.0.0.1:47500..47509</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

Ignite 对内存有细致的划分,可以分多个区域Region,每个区域有自己的配置,比如设置初始大小和最大大小,以及淘汰策略。
UserInfo对应的CacheConfiguration对Cache使用进行了配置,比如readThrough writeThrough writeBehindEnabled等等,细致的配置诸如后写刷新频率writeBehindFlushFrequency为5,表示5秒才会刷新一次更新数据。

    public static <T> IgniteCache<String, T> createIgniteCache(String cacheName, Class<? extends CacheStoreAdapter<String, T>> clazz) {
        CacheConfiguration<String, T> cacheCfg = new CacheConfiguration<>(cacheName);
        return Globals.ignite().getOrCreateCache(cacheCfg);
    }

在Globals工具类,提供工具方法获得IgniteCache对象。

package cn.empires.gs.player.service.impl;

import org.apache.ignite.IgniteCache;
import org.apache.ignite.lang.IgniteFuture;

import cn.empires.common.Globals;
import cn.empires.common.cache.UserCacheStore;
import cn.empires.common.service.ServiceBase;
import cn.empires.gs.model.UserInfo;
import cn.empires.gs.player.service.UserService;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;

public class UserServiceImpl extends ServiceBase implements UserService {

    private final IgniteCache<String, UserInfo> cache;

    public UserServiceImpl(Vertx vertx, JsonObject config) {
        super(vertx, config);
        cache = Globals.createIgniteCache(UserInfo.tableName, UserCacheStore.class);
    }

    @Override
    public UserService getUserInfo(String id, Handler<AsyncResult<UserInfo>> handler) {
        IgniteFuture<UserInfo> future = cache.getAsync(id);
        future.listen(h -> {
            if(h.isDone()) {
                handler.handle(Future.succeededFuture(h.get()));
            }
        });        
        return this;
    }


    @Override
    public UserService saveUserInfo(UserInfo userInfo, Handler<AsyncResult<UserInfo>> handler) {
        IgniteFuture<Void> future = cache.putAsync(userInfo.get_id(), userInfo);
        future.listen(h -> {
            if(h.isDone()) {
                handler.handle(Future.succeededFuture(userInfo));
            }
        });
        return this;
    }

}

最后一件事,就是同步写库,可以读通过从MongoDB进行读取。

package cn.empires.common.cache;

import java.util.ArrayList;
import java.util.List;

import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;

import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.bson.Document;

import com.mongodb.Block;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;

import cn.empires.common.Globals;
import cn.empires.common.contants.Loggers;
import cn.empires.gs.model.UserInfo;
import io.vertx.core.json.JsonObject;

public class UserCacheStore extends CacheStoreAdapter<String, UserInfo> implements LifecycleAware {

    /** Mongo collection. */
    private MongoCollection<Document> collection;

    @Override
    public void start() throws IgniteException {
    }

    @Override
    public UserInfo load(String key) throws CacheLoaderException {
        if(collection == null) {
            collection = Globals.mongoDb().getCollection(UserInfo.tableName);
        }
        FindIterable<Document> iter = collection.find(Filters.eq("_id", key));
        final List<JsonObject> result = new ArrayList<>(1);
        iter.forEach(new Block<Document>() {
            public void apply(Document _doc) {
                result.add(new JsonObject(_doc.toJson()));
            }
        });
        if(result != null && !result.isEmpty()) {
            Loggers.userLogger.info("CacheStore load UserInfo.");
            JsonObject jsonObj = result.get(0);
            return UserInfo.fromDB(jsonObj);
        }
        return null;
    }

    @Override
    public void write(Entry<? extends String, ? extends UserInfo> entry) throws CacheWriterException {
        if(collection == null) {
            collection = Globals.mongoDb().getCollection(UserInfo.tableName);
        }
        Document filter = new Document();
        filter.append("_id", entry.getKey());

        Document replacement = new Document();
        replacement.append("value", entry.getValue().toString());
        collection.replaceOne(filter, replacement, new UpdateOptions().upsert(true));
        Loggers.userLogger.info("CacheStore saved UserInfo.");
    }

    @Override
    public void delete(Object key) throws CacheWriterException {

    }



    @Override
    public void stop() throws IgniteException {

    }

}

由于在ignite.xml中进行了配置

<bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
    <constructor-arg value="cn.empires.common.cache.UserCacheStore"/>
</bean>

所以在使用Cache获取UserInfo的时候,如果不存在对应的信息,就会从MongoDB读取。