又想速度快,又要大数据,又要保证数据不出错,还要拥抱变化,改需求的时候不那么痛苦,特别是字段的调整,按照以前的做法,想想就头疼。使用NoSQL,简直就是随心所欲,再奇葩的数据结构,处理起来也很容易。下面看我如何用NoSQL数据库实现高并发,高可靠的CRM系统。
1、前言
随着facebook、微博等WEB2.0互联网网站的兴起,传统的关系数据库在应付web2.0网站,特别是超大规模和高并发的SNS类型的web2.0纯动态网站已经显得力不从心,暴露了很多难以克服的问题,而非关系型的数据库则由于其本身的特点得到了非常迅速的发展。
NoSQL项目的名字上看不出什么相同之处,但是,它们通常在某些方面相同:它们可以处理超大量的数据。
目前NoSQL数据库的发展非常迅速,各大公司都在大量使用,比如GOOGLE、360、百度,在不久的将来必将成为主流。
为了适应大数据处理的需要,在将来较长的一段时间里能够让系统适应高并发的访问,我们在开发CRM系统的过程中,经过不断摸索和实践,通过使用NoSQL数据库,解决了传统关系型数据很难解决的难题,实现了大数据和高并发的处理能力。
2、系统展示
3、性能数据
测试环境是CentOS系统,ngnix + php5.0, i7 2600K处理器,8G内存,SSDB 1.6.8.8版本。
针对最常用的功能,以前数据量最大的功能进行测试。
用PHP程序模拟UI层直接调用业务逻辑接口,插入大量数据,并对大量数据进行读取。
功能 |
测试说明 |
测试结果(毫秒) |
添加客户资料 |
添加100万条数据 |
15203 |
客户资料查询 |
对100万条客户资料进行查询1万次 |
20 |
下订单 |
添加100万条数据 |
58653 |
订单查询 |
对100万条订单进行查询1万次 |
25 |
添加新消息 |
添加100万条数据 |
13454 |
获取新消息 |
对100万条消息数据进行查询1万次 |
18 |
4、系统架构
在使用数据库的选择上,我们使用同时具有高性能,并且接口使用很简单的SSDB。因为SSDB的作者是我的朋友,同时SSDB已经在百度、360等大公司大量应用,所以不用担心可靠性问题。
开发语言使用PHP。
4.1 系统需求
2014年禾葡兰借着微信营销的大好机会,在几个月时间里,团队发展到了100人以上,每日订单数超过200单。
在快速发展的过程中,对于数据处理的需求日益增长,通过数据和统计进行决策,使经营水平和收入更上一个台阶,并能够适应将来的大规模扩展的需要。
目前纯手工统计和计算比较容易出错,而且效率很低,很难适应快速增长的需要,为了解决这些问题,欣所罗门核心团队成员一致认为有必要开发一个软件系统来解决问题。
4.2 需要解决的3大难点
1、事务处理
在数据完整性要求很高的CRM系统中,必需要事务处理机制,保证原子操作。但是,NoSQL数据库最大的缺点就是不支持事务处理。
要想实现事务处理,目前最好的方法是使用zookeeper,可以模拟实现分布式事务。
2、数据关系
和关系数据库不同的是,SSDB没有关系的概念。所有数据间的关系必需在开始的时候设计好。
3、组合条件查询
由于NoSQL数据库没有模式,因此,想要根据数据内容里的某个字段对数据集进行筛选,根本不可能。所以,我们使用lucene实现全文索引,解决了组合条件查询的问题。
4.3 系统设计
由于目前流行的PHP开发框架MVC框架都不支持NoSQL数据,同时避免由于开源框架带来的安全问题,因此不使用MVC框架。仅在UI层使用php的smarty模板引擎。
系统统一入口文件index.php,通过m参数引用ui目录和controller目录下相应的文件。
代码目录结构
/ CRM系统根目录
/index.php (统一入口文件)
|-------common/ (用于存放公共函数)
|--------common.php (公共函数接口文件,使用公共函数只需要引用一个文件就可以。)
|-------config/ (用于存放配置信息)
|-------model/ (数据对像)
|-------data/ (数据层,对数据的读写封装。)
|-------controller/ (业务逻辑层)
|-------ui/ (smarty引擎和模板)
|-------libs/ (smarty引擎目录)
|-------configs/ (smarty引擎配置文件目录)
|-------plugins/ (smarty引擎自定义的一些实用插件目录)
|-------templates/ (smarty引擎模板目录)
|-------templates_c/ (smarty引擎模板编译目录)
|-------cache/ (smarty缓存目录)
5、涉及内容
一、 数据库连接
二、 自动编号ID实现
三、 数据的读写
四、 数据关系实现
五、 分布式事务实现
六、 全文索引
七、 组合条件查询
6、源码解析
一、数据库连接
SSDB的数据库连接很简单。
include_once(‘SSDB.php‘); try{ $ssdb = new SimpleSSDB(‘127.0.0.1‘, 8888); }catch(SSDBException $e){ die(__LINE__ . ‘ ‘ . $e->getMessage()); }
include_once(‘SSDB.php‘); try{ $ssdb = new SimpleSSDB(‘127.0.0.1‘, 8888); }catch(SSDBException $e){ die(__LINE__ . ‘ ‘ . $e->getMessage()); }
二、 自动编号ID实现
一般来说,我们在进行数据设计的时候,都会给每条记录设置一个自动编号的ID。但是在NoSQL数据中,需要自己来实现自动编号ID。
我们通过SSDB的incr接口,就可以模拟自动编号ID的实现。
系统初始化的时候,我们在数据库中增加一条数据。
$ssdb->set(‘hpl_product_autoincrement_id‘, 0);
$ssdb->set(‘hpl_product_autoincrement_id‘, 0);
添加新记录的时候,使用incr接口得到新的自动编号ID,由于incr接口是原子操作,所以不会出现重复的ID。
$id = $ssdb->incr(‘hpl_product_autoincrement_id‘, 1);
$id = $ssdb->incr(‘hpl_product_autoincrement_id‘, 1);
三、 数据的读写
首先,我们要创建数据的model类。下面是产品model类。
class ProductModel{ public $id; //编号 public $catalogid; //所属分类 public $name; //产品名称 public $cost; //产品成本价格 public $price; //销售价格 public $saleprice; //优惠价格 public $amount; //产品数量 public $facetype; //适合皮肤类型 public $desc; //产品描述 public $code; //产品编号 public $weight; //净含量(克) public $effect; //主要功效 public $crowd; //适合人群 public $addtime; //产品添加时间 public $status; //产品状态(0 = 下架 1 = 上架) }
class ProductModel{ public $id; //编号 public $catalogid; //所属分类 public $name; //产品名称 public $cost; //产品成本价格 public $price; //销售价格 public $saleprice; //优惠价格 public $amount; //产品数量 public $facetype; //适合皮肤类型 public $desc; //产品描述 public $code; //产品编号 public $weight; //净含量(克) public $effect; //主要功效 public $crowd; //适合人群 public $addtime; //产品添加时间 public $status; //产品状态(0 = 下架 1 = 上架) }
新添加记录。
$product = new ProductModel(); $product->id = $id; ... $key = ‘hpl_product_‘.$id; $json = json_encode($product); $ssdb->hset(‘hpl_product‘, $key, $json);
$product = new ProductModel(); $product->id = $id; ... $key = ‘hpl_product_‘.$id; $json = json_encode($product); $ssdb->hset(‘hpl_product‘, $key, $json);
增加索引,根据产品ID进行索引,在使用数据的时候就可以根据产品ID获取数据列表(如果有多种排列方式就需要创建多个索引)。
$key = ‘hpl_product_‘.$id; $ssdb->zset(‘hpl_product_id‘, $key, $id);
$key = ‘hpl_product_‘.$id; $ssdb->zset(‘hpl_product_id‘, $key, $id);
根据索引获取产品列表(前10个)。
$products = array(); //根据索引取出产品列表 $items = $ssdb->zscan(‘hpl_product_id‘, ‘‘, ‘‘, ‘‘, 10); foreach($items as $key=>$score){ //取出产品信息 $json = $ssdb->hget(‘hpl_product‘, $key); $products[] = json_decode($json); }
四、数据关系实现
在添加产品的时候,需要把产品和产品分类关联起来,可以根据产品分类列出产品。使用SSDB我们只需要对每个产品分类创建一个列表就可以了。
添加关系,在列表里添加一条数据。
$hname = ‘hpl_product_catalog_‘.$catalogid; $hkey = ‘hpl_product_‘.$id; $ssdb->hset($hname, $hkey, $id);
$hname = ‘hpl_product_catalog_‘.$catalogid; $hkey = ‘hpl_product_‘.$id; $ssdb->hset($hname, $hkey, $id);
删除关系,当改变产品分类或者删除产品的时候执行。
$hname = ‘hpl_product_catalog_‘.$catalogid; $hkey = ‘hpl_product_‘.$id; $ssdb->hdel($hname, $hkey);
$hname = ‘hpl_product_catalog_‘.$catalogid; $hkey = ‘hpl_product_‘.$id; $ssdb->hdel($hname, $hkey);
根据分类取出产品列表(前10个)。
$products = array(); //根据分类取出产品列表 $hname = ‘hpl_product_catalog_‘.$catalogid; $keys = $ssdb->hkeys($hname, ‘‘, ‘‘, 10); foreach ($keys as $key) { //取出产品信息 $json = $ssdb->hget(‘hpl_product‘, $key); $products[] = json_decode($json); }
$products = array(); //根据分类取出产品列表 $hname = ‘hpl_product_catalog_‘.$catalogid; $keys = $ssdb->hkeys($hname, ‘‘, ‘‘, 10); foreach ($keys as $key) { //取出产品信息 $json = $ssdb->hget(‘hpl_product‘, $key); $products[] = json_decode($json); }
五、分布式事务实现
由于CRM系统对数据统一性的需要,所以必需要有事务来支持。比如在下订单的时候,如果没有事务,在并发处理时就有可能导致产品库存出错。由于SSDB数据库没有实现事务处理,我们使用zookeeper来实现分布式锁的处理,保证关键业务的原子操作。
事务处理的实现我们分别由前台和后台实现,前台系统发送业务处理请求,后台进程收到消息后进行处理,后台进程处理结束后设置处理标志,前台系统每隔一段时间查询一次标志位,检查业务是否处理完成。
后台实现使用JAVA语言编写。
/** Executor.java */ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener { String znode; DataMonitor dm; ZooKeeper zk; String filename; String exec[]; Process child; public Executor(String hostPort, String znode, String filename, String exec[]) throws KeeperException, IOException { this.filename = filename; this.exec = exec; zk = new ZooKeeper(hostPort, 3000, this); dm = new DataMonitor(zk, znode, null, this); } /** * @param args */ public static void main(String[] args) { if (args.length < 4) { System.err .println("USAGE: Executor hostPort znode filename program [args ...]"); System.exit(2); } String hostPort = args[0]; String znode = args[1]; String filename = args[2]; String exec[] = new String[args.length - 3]; System.arraycopy(args, 3, exec, 0, exec.length); try { new Executor(hostPort, znode, filename, exec).run(); } catch (Exception e) { e.printStackTrace(); } } /*************************************************************************** * We do process any events ourselves, we just need to forward them on. * * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent) */ public void process(WatchedEvent event) { dm.process(event); } public void run() { try { synchronized (this) { while (!dm.dead) { wait(); } } } catch (InterruptedException e) { } } public void closing(int rc) { synchronized (this) { notifyAll(); } } static class StreamWriter extends Thread { OutputStream os; InputStream is; StreamWriter(InputStream is, OutputStream os) { this.is = is; this.os = os; start(); } public void run() { byte b[] = new byte[80]; int rc; try { while ((rc = is.read(b)) > 0) { os.write(b, 0, rc); } } catch (IOException e) { } } } public void exists(byte[] data) { if (data == null) { if (child != null) { System.out.println("Killing process"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { } } child = null; } else { if (child != null) { System.out.println("Stopping child"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } try { FileOutputStream fos = new FileOutputStream(filename); fos.write(data); fos.close(); } catch (IOException e) { e.printStackTrace(); } try { System.out.println("Starting child"); child = Runtime.getRuntime().exec(exec); new StreamWriter(child.getInputStream(), System.out); new StreamWriter(child.getErrorStream(), System.err); } catch (IOException e) { e.printStackTrace(); } } } } /** DataMonitor.java */ import java.util.Arrays; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; import com.udpwork.ssdb.SSDB; import com.udpwork.ssdb.Link; import com.udpwork.ssdb.MemoryStream; import com.udpwork.ssdb.Response; public class DataMonitor implements Watcher, StatCallback { ZooKeeper zk; String znode; Watcher chainedWatcher; boolean dead; DataMonitorListener listener; byte prevData[]; public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // Get things started by checking if the node exists. We are going // to be completely event driven zk.exists(znode, true, this, null); } /** * Other classes use the DataMonitor by implementing this method */ public interface DataMonitorListener { /** * The existence status of the node has changed. */ void exists(byte data[]); /** * The ZooKeeper session is no longer valid. * * @param rc * the ZooKeeper reason code */ void closing(int rc); } public void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: // In this particular example we don‘t need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: // It‘s all over dead = true; listener.closing(KeeperException.Code.SessionExpired); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let‘s find out //读取订单信息 //znode = /product/order/ ... //计算库存是否充足 ... //生成订单数据 ... //减少产品库存 ... zk.exists(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } } public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; } byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { // We don‘t need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) { listener.exists(b); prevData = b; } } }
php zookeeper处理类
class OrderWorker extends Zookeeper { const CONTAINER = ‘/product/order‘; protected $acl = array( array( ‘perms‘ => Zookeeper::PERM_ALL, ‘scheme‘ => ‘world‘, ‘id‘ => ‘anyone‘ ) );</p><p> private $znode; public function __construct( $host = ‘‘, $watcher_cb = null, $recv_timeout = 10000 ) { parent::__construct( $host, $watcher_cb, $recv_timeout ); } //添加订单 public function Add($order) { if( ! $this->exists( self::CONTAINER ) ) { $this->create( self::CONTAINER, null, $this->acl ); } $this->znode = $this->create( self::CONTAINER . ‘/w-‘, null, $this->acl, Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE ); $this->set($this->znode, json_encode(array(‘status‘=>0, ‘data‘=>$order, ‘orderid‘=>‘‘))); $this->znode = str_replace( self::CONTAINER .‘/‘, ‘‘, $this->znode ); return $this->znode; } //获取订单处理信息 public function Get($znode){ $data = $this->get(self::CONTAINER .‘/‘.$znode); $json = json_decode($data); return $json; } //删除订单znode public function Del($znode){ $this->delete(self::CONTAINER .‘/‘.$znode); } }
class OrderWorker extends Zookeeper { const CONTAINER = ‘/product/order‘; protected $acl = array( array( ‘perms‘ => Zookeeper::PERM_ALL, ‘scheme‘ => ‘world‘, ‘id‘ => ‘anyone‘ ) );</p><p> private $znode; public function __construct( $host = ‘‘, $watcher_cb = null, $recv_timeout = 10000 ) { parent::__construct( $host, $watcher_cb, $recv_timeout ); } //添加订单 public function Add($order) { if( ! $this->exists( self::CONTAINER ) ) { $this->create( self::CONTAINER, null, $this->acl ); } $this->znode = $this->create( self::CONTAINER . ‘/w-‘, null, $this->acl, Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE ); $this->set($this->znode, json_encode(array(‘status‘=>0, ‘data‘=>$order, ‘orderid‘=>‘‘))); $this->znode = str_replace( self::CONTAINER .‘/‘, ‘‘, $this->znode ); return $this->znode; } //获取订单处理信息 public function Get($znode){ $data = $this->get(self::CONTAINER .‘/‘.$znode); $json = json_decode($data); return $json; } //删除订单znode public function Del($znode){ $this->delete(self::CONTAINER .‘/‘.$znode); } }
PHP前台下订单:
$worker = new OrderWorker( ‘127.0.0.1:2181‘ ); $znode = $worker->Add($order); echo json_encode(array(‘code‘=>0, ‘znode‘=>$znode));
echo json_encode(array(‘code‘=>0, ‘znode‘=>$znode));
PHP前台检测订单:
$worker = new OrderWorker( ‘127.0.0.1:2181‘ ); $data = $worker->Get($znode); if (intval($data->status) == 1) { $worker->Del($znode); echo json_encode(array(‘status‘=>1, ‘msg‘=>‘订单处理成功‘, ‘orderid‘=>$data->orderid)); } else if (intval($data->status) == 2) { $worker->Del($znode); echo json_encode(array(‘status‘=>2, ‘msg‘=>‘订单处理失败‘)); } else { echo json_encode(array(‘status‘=>0, ‘msg‘=>‘正在处理‘)); }
$worker = new OrderWorker( ‘127.0.0.1:2181‘ ); $data = $worker->Get($znode); if (intval($data->status) == 1) { $worker->Del($znode); echo json_encode(array(‘status‘=>1, ‘msg‘=>‘订单处理成功‘, ‘orderid‘=>$data->orderid)); } else if (intval($data->status) == 2) { $worker->Del($znode); echo json_encode(array(‘status‘=>2, ‘msg‘=>‘订单处理失败‘)); } else { echo json_encode(array(‘status‘=>0, ‘msg‘=>‘正在处理‘)); }
六、全文索引
创建索引
import java.io.File; import java.io.FileReader; import java.io.IOException; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.LongField; import org.apache.lucene.document.StringField; import org.apache.lucene.document.TextField; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.util.Version; public class IndexOrders { private IndexWriter writer = null; public void Add(String indexPath, OrderModel order) throws IOException { // 获取放置索引文件的位置,若传入参数为空,则读取search.properties中设置的默认值。 if (indexPath == null) { indexPath = LoadProperties.getProperties("indexDir"); } final File indexDir = new File(indexPath); if (!indexDir.exists() || !indexDir.canRead()) { System.out .println("Document directory ‘" + indexDir.getAbsolutePath() + "‘ does not exist or is not readable, please check the path"); System.exit(1); } try { // 创建索引库IndexWriter if(writer == null){ initialIndexWriter(indexDir); } index(writer, order); } catch (IOException e) { e.printStackTrace(); } } public void Close() { if (null != writer) { writer.close(); } } private void initialIndexWriter(File indexDir) throws IOException { Directory returnIndexDir = FSDirectory.open(indexDir); IndexWriterConfig iwc = new IndexWriterConfig(Version.LUCENE_48,new StandardAnalyzer(Version.LUCENE_48)); writer = new IndexWriter(returnIndexDir, iwc); } private void index(IndexWriter writer, OrderModel order) throws IOException { // 创建文档Document Document doc = new Document(); Field orderidField = new StringField("orderid", order.orderid, Field.Store.YES); doc.add(orderidField); ... //向索引库中写入文档内容 writer.addDocument(doc); } } }
七、组合条件查询
由java类实现查询,因为PHP可以直接调用java程序,所以PHP只需要把java类返回的结果显示出来就可以。
int pageIndex=1; int pageSize=1; int start = (pageIndex - 1) * pageSize; String query_fields[]=new String[]{"filename","content"};//对哪几个字段进行查询检索 File file_index_dir = new File(indexDir); try { Directory directory = new SimpleFSDirectory(file_index_dir); IndexReader indexReader = DirectoryReader.open(directory); // 创建搜索类 IndexSearcher indexSearcher = new IndexSearcher(indexReader); TermQuery query1 = new TermQuery(new Term("orderid", orderid)); TermQuery query2 = new TermQuery(new Term("contact", contact)); BooleanQuery query = new BooleanQuery(); query.add(query1, BooleanClause.Occur.MUST); query.add(query2, BooleanClause.Occur.MUST); int max_result_size = start + pageSize; TopScoreDocCollector topDocs = TopScoreDocCollector.create(max_result_size, false); indexSearcher.search(query, topDocs); int rowCount = topDocs.getTotalHits(); //满足条件的总记录数 int pages = (rowCount - 1) / pageSize + 1; //计算总页数 TopDocs tds = topDocs.topDocs(start, pageSize); ScoreDoc[] scoreDoc = tds.scoreDocs; for (int i = 0; i < scoreDoc.length; i++) { // 内部编号 int doc_id = scoreDoc[i].doc; // 根据文档id找到文档 Document mydoc = indexSearcher.doc(doc_id); //读取搜索结果 mydoc.get("orderid"); mydoc.get("contact"); mydoc.get("addr"); ... } } catch (Exception e) { e.printStackTrace(); }
有问题可以加我QQ沟通:10980327
使用NoSQL实现高并发CRM系统实践(源代码+解析)