key/value存储系统-Memcached、Redis、Tair

每个产品的可配置参数繁多,涉及缓存策略、分布算法、序列化方式、数据压缩技术、通信方式、并发、超时等诸多方面因素,都会对测试结果产生影响,单纯的性能对比存在非常多的局限性和不合理性,所以不能作为任何评估依据,仅供参考。

1、尽管 Memcached 和 Redis 都标识为Distribute,但从Server端本身而言它们并不提供分布式的解决方案,需要Client端实现一定的分布算法将数据存储到各个节点,从而实现分布式存储,两者都提供了Replication功能(Master-Slave)保障可靠性。

2、Tair 则本身包含 Config Server 和 Data Server 采用一致性哈希算法分布数据存储,由ConfigSever来管理所有数据节点,理论上服务器端节点的维护对前端应用不会产生任何影响,同时数据能按指定复制到不同的DataServer保障可靠性,从Cluster角度来看属于一个整体Solution

环境

Memcached Memcached 1.4.21 Xmemcached 2.0.0
Redis Redis 2.8.19 Jedis 2.8.5
Tair Tair 2.3 Tair Client 2.3.1

测试

从数据库读取一组数据缓存(SET)到每个缓存服务器,其中对于每个Server的写入数据是完全一致的,不设置过期时间,进行如下测试。

1)单线程进行1次写入

2)单线程进行500次写入

3)单线程进行2000次写入

4)并行500个线程,每个线程进行1次写入

5)并行500个线程,每个线程进行5次写入

6)并行2000个线程,每个线程进行1次写入

2、分别从每个缓存服务器读取(GET)数据,其中对于每个Server的读取数据大小是完全一致的,进行如下测试。

1)单线程进行1次读取

2)单线程进行500次读取

3)单线程进行2000次读取

4)并行500个线程,每个线程进行1次读取

5)并行500个线程,每个线程进行5次读取

6)并行2000个线程,每个线程进行1次读取

四、单线程测试

1、缓存Model对象(OrderInfo)的定义参照tbOrder表(包括单据号、制单日期、商品、数量等字段)

2、单线程的读写操作对于代码的要求相对较低,不需要考虑Pool,主要代码如下:

1)Memcached单线程读写,使用二进制方式序列化,不启用压缩。

 1 public static void putItems2Memcache(List<OrderInfo> orders) throws Exception {
 2         MemcachedClient memcachedClient = null;
 3         try {
 4             MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("10.129.221.70:12000"));
 5             builder.setCommandFactory(new BinaryCommandFactory());
 6             memcachedClient = builder.build();
 7
 8             for (OrderInfo order : orders) {
 9                 boolean isSuccess = memcachedClient.set("order_" + order.BillNumber, 0, order);
10                 if (!isSuccess) {
11                     System.out.println("put: order_" + order.BillNumber + "  " + isSuccess);
12                 }
13             }
14         } catch (Exception ex) {
15             ex.printStackTrace();
16         } finally {
17             memcachedClient.shutdown();
18         }
19     }
20
21     public static void getItemsFromMemcache(List<String> billNumbers) throws Exception {
22         MemcachedClient memcachedClient = null;
23         try {
24             MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("10.129.221.70:12000"));
25             builder.setCommandFactory(new BinaryCommandFactory());
26             memcachedClient = builder.build();
27
28             for (String billnumber : billNumbers) {
29                 OrderInfo result = memcachedClient.get(billnumber);
30
31                 if (result == null) {
32                     System.out.println(" get failed : " + billnumber + " not exist ");
33                 }
34             }
35         } catch (Exception ex) {
36             ex.printStackTrace();
37         } finally {
38             memcachedClient.shutdown();
39         }
40     }

2)Redis单线程读写,由于Jedis Client 不支持对象的序列化,需要自行实现对象序列化(本文使用二进制方式)。

 1 public static void putItems2Redis(List<OrderInfo> orders) {
 2         Jedis jedis = new Jedis("10.129.221.70", 6379);
 3
 4         try {
 5             jedis.connect();
 6
 7             for (OrderInfo order : orders) {
 8                 String StatusCode = jedis.set(("order_" + order.BillNumber).getBytes(), SerializeUtil.serialize(order));
 9                 if (!StatusCode.equals("OK")) {
10                     System.out.println("put: order_" + order.BillNumber + "  " + StatusCode);
11                 }
12             }
13         } catch (Exception ex) {
14             ex.printStackTrace();
15         } finally {
16             jedis.close();
17         }
18     }
19
20     public static void getItemsFromRedis(List<String> billNumbers) {
21         Jedis jedis = new Jedis("10.129.221.70", 6379);
22
23         try {
24             jedis.connect();
25
26             for (String billnumber : billNumbers) {
27                 byte[] result = jedis.get(billnumber.getBytes());
28                 if (result.length > 0) {
29                     OrderInfo order = (OrderInfo) SerializeUtil.unserialize(result);
30                     if (order == null) {
31                         System.out.println(" unserialize failed : " + billnumber);
32                     }
33                 } else {
34                     System.out.println(" get failed : " + billnumber + " not exist ");
35                 }
36             }
37         } catch (Exception ex) {
38             ex.printStackTrace();
39         } finally {
40             jedis.close();
41         }
42     }

序列化代码

 1 package common;
 2
 3 import java.io.ByteArrayInputStream;
 4 import java.io.ByteArrayOutputStream;
 5 import java.io.ObjectInputStream;
 6 import java.io.ObjectOutputStream;
 7
 8 public class SerializeUtil {
 9
10     /**
11      * 序列化
12      * @param object
13      * @return
14      */
15     public static byte[] serialize(Object object) {
16         ObjectOutputStream oos = null;
17         ByteArrayOutputStream baos = null;
18
19         try {
20             baos = new ByteArrayOutputStream();
21             oos = new ObjectOutputStream(baos);
22             oos.writeObject(object);
23             byte[] bytes = baos.toByteArray();
24             return bytes;
25         } catch (Exception e) {
26             e.printStackTrace();
27         }
28         return null;
29     }
30
31     /**
32      * 反序列化
33      * @param bytes
34      * @return
35      */
36     public static Object unserialize(byte[] bytes) {
37         ByteArrayInputStream bais = null;
38         try {
39             bais = new ByteArrayInputStream(bytes);
40             ObjectInputStream ois = new ObjectInputStream(bais);
41             return ois.readObject();
42         } catch (Exception e) {
43             e.printStackTrace();
44         }
45
46         return null;
47     }
48 }

3)Tair单线程读写,使用Java序列化,默认压缩阀值为8192字节,但本文测试的每个写入项都不会超过这个阀值,所以不受影响。

 1 public static void putItems2Tair(List<OrderInfo> orders) {
 2         try {
 3             List<String> confServers = new ArrayList<String>();
 4             confServers.add("10.129.221.70:5198");
 5             //confServers.add("10.129.221.70:5200");
 6
 7             DefaultTairManager tairManager = new DefaultTairManager();
 8             tairManager.setConfigServerList(confServers);
 9             tairManager.setGroupName("group_1");
10             tairManager.init();
11
12             for (OrderInfo order : orders) {
13                 ResultCode result = tairManager.put(0, "order_" + order.BillNumber, order);
14                 if (!result.isSuccess()) {
15                     System.out.println("put: order_" + order.BillNumber + "  " + result.isSuccess() + " code:" + result.getCode());
16                 }
17             }
18         } catch (Exception ex) {
19             ex.printStackTrace();
20         }
21     }
22
23     public static void getItemsFromTair(List<String> billNumbers) {
24         try {
25             List<String> confServers = new ArrayList<String>();
26             confServers.add("10.129.221.70:5198");
27             //confServers.add("10.129.221.70:5200");
28
29             DefaultTairManager tairManager = new DefaultTairManager();
30             tairManager.setConfigServerList(confServers);
31             tairManager.setGroupName("group_1");
32             tairManager.init();
33
34             for (String billnumber : billNumbers) {
35                 Result<DataEntry> result = tairManager.get(0, billnumber);
36                 if (result.isSuccess()) {
37                     DataEntry entry = result.getValue();
38                     if (entry == null) {
39                         System.out.println(" get failed : " + billnumber + " not exist ");
40                     }
41                 } else {
42                     System.out.println(result.getRc().getMessage());
43                 }
44             }
45         } catch (Exception ex) {
46             ex.printStackTrace();
47         }
48     }

3、测试结果,每项重复测试取平均值

五、多线程测试

1、除了多线程相关代码外的公共代码和单线程基本一致,多线程测试主要增加了Client部分代码对ConnectionPool、TimeOut相关设置,池策略、大小都会对性能产生很大影响,为了达到更高的性能,不同的使用场景下都需要有科学合理的测算。

2、主要测试代码

1)每个读写测试线程任务完成后统一调用公共Callback,在每批测试任务完成后记录消耗时间

 1 package common;
 2
 3 public class ThreadCallback {
 4
 5     public static int CompleteCounter = 0;
 6     public static int failedCounter = 0;
 7
 8     public static synchronized void OnException() {
 9         failedCounter++;
10     }
11
12     public static synchronized void OnComplete(String msg, int totalThreadCount, long startMili) {
13         CompleteCounter++;
14         if (CompleteCounter == totalThreadCount) {
15             long endMili = System.currentTimeMillis();
16             System.out.println("(总共" + totalThreadCount + "个线程 ) " + msg + "  ,总耗时为:" + (endMili - startMili) + "毫秒 ,发生异常线程数:" + failedCounter);
17             CompleteCounter = 0;
18             failedCounter = 0;
19         }
20     }
21 }

2)Memcached多线程读写,使用XMemcached客户端连接池,主要设置连接池大小ConnectionPoolSize=5,连接超时时间ConnectTimeout=2000ms,测试结果要求没有超时异常线程。

测试方法

 1         /*-------------------Memcached(多线程初始化)--------------------*/
 2         MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("192.168.31.191:12000"));
 3         builder.setCommandFactory(new BinaryCommandFactory());
 4         builder.setConnectionPoolSize(5);
 5         builder.setConnectTimeout(2000);
 6         MemcachedClient memcachedClient = builder.build();
 7         memcachedClient.setOpTimeout(2000);
 8
 9         /*-------------------Memcached(多线程写入)--------------------*/
10         orders = OrderBusiness.loadOrders(5);
11         startMili = System.currentTimeMillis();
12         totalThreadCount = 500;
13         for (int i = 1; i <= totalThreadCount; i++) {
14             MemcachePutter putter = new MemcachePutter();
15             putter.OrderList = orders;
16             putter.Namesapce = i;
17             putter.startMili = startMili;
18             putter.TotalThreadCount = totalThreadCount;
19             putter.memcachedClient = memcachedClient;
20
21             Thread th = new Thread(putter);
22             th.start();
23         }
24
25                 //读取代码基本一致

线程任务类

 1 public class MemcachePutter implements Runnable {
 2     public List<OrderInfo> OrderList;
 3     public int Namesapce;
 4     public int TotalThreadCount;
 5     public long startMili;
 6     public MemcachedClient memcachedClient = null; // 线程安全的?
 7
 8     @Override
 9     public void run() {
10         try {
11             for (OrderInfo order : OrderList) {
12                 boolean isSuccess = memcachedClient.set("order_" + order.BillNumber, 0, order);
13                  if (!isSuccess) {
14                 System.out.println("put: order_" + order.BillNumber + "  " + isSuccess);
15                 }
16             }
17         } catch (Exception ex) {
18             ex.printStackTrace();
19             ThreadCallback.OnException();
20         } finally {
21             ThreadCallback.OnComplete("Memcached 每个线程进行" + OrderList.size() + "次 [写入] ", TotalThreadCount, startMili);
22         }
23     }
24 }
25
26
27
28 public class MemcacheGetter implements Runnable {
29
30     public List<String> billnumbers;
31     public long startMili;
32     public int TotalThreadCount;
33     public MemcachedClient memcachedClient = null; // 线程安全的?
34
35     @Override
36     public void run() {
37         try {
38             for (String billnumber : billnumbers) {
39                 OrderInfo result = memcachedClient.get(billnumber);
40                 if (result == null) {
41                     System.out.println(" get failed : " + billnumber + " not exist ");
42                 }
43             }
44         } catch (Exception ex) {
45             ex.printStackTrace();
46             ThreadCallback.OnException();
47         } finally {
48             ThreadCallback.OnComplete("Memcached 每个线程进行" + billnumbers.size() + "次 [读取] ", TotalThreadCount, startMili);
49         }
50     }
51 }

3)Redis多线程读写,使用Jedis客户端连接池,从源码可以看出依赖与Apache.Common.Pool2,主要设置连接池MaxTotal=5,连接超时时间Timeout=2000ms,测试结果要求没有超时异常线程。

测试方法

 1         /*-------------------Redis(多线程初始化)--------------------*/
 2         GenericObjectPoolConfig config = new GenericObjectPoolConfig();
 3         config.setMaxTotal(5);
 4         JedisPool jpool = new JedisPool(config, "192.168.31.191", 6379, 2000);
 5
 6         /*-------------------Redis(多线程写入)--------------------*/
 7         totalThreadCount = 2000;
 8         orders = OrderBusiness.loadOrders(1);
 9         startMili = System.currentTimeMillis();
10         for (int i = 1; i <= totalThreadCount; i++) {
11             RedisPutter putter = new RedisPutter();
12             putter.OrderList = orders;
13             putter.Namesapce = i;
14             putter.startMili = startMili;
15             putter.TotalThreadCount = totalThreadCount;
16             putter.jpool = jpool;
17
18             Thread th = new Thread(putter);
19             th.start();
20         }

线程任务类

 1 public class RedisPutter implements Runnable {
 2
 3     public List<OrderInfo> OrderList;
 4     public int Namesapce;
 5     public int TotalThreadCount;
 6     public long startMili;
 7     public JedisPool jpool;
 8
 9     @Override
10     public void run() {
11         Jedis jedis = jpool.getResource();
12
13         try {
14             jedis.connect();
15
16             for (OrderInfo order : OrderList) {
17                 String StatusCode = jedis.set(("order_" + order.BillNumber).getBytes(), SerializeUtil.serialize(order));
18                 if (!StatusCode.equals("OK")) {
19                     System.out.println("put: order_" + order.BillNumber + "  " + StatusCode);
20                 }
21             }
22         } catch (Exception ex) {
23             // ex.printStackTrace();
24             jpool.returnBrokenResource(jedis);
25             ThreadCallback.OnException();
26         } finally {
27             jpool.returnResource(jedis);
28             ThreadCallback.OnComplete("Redis 每个线程进行" + OrderList.size() + "次 [写入] ", TotalThreadCount, startMili);
29         }
30     }
31 }
32
33
34
35 public class RedisGetter implements Runnable {
36     public List<String> billnumbers;
37     public long startMili;
38     public int TotalThreadCount;
39     public JedisPool jpool;
40
41     @Override
42     public void run() {
43         Jedis jedis = jpool.getResource();
44
45         try {
46             jedis.connect();
47             for (String billnumber : billnumbers) {
48                 byte[] result = jedis.get(billnumber.getBytes());
49                 if (result.length > 0) {
50                     OrderInfo order = (OrderInfo) SerializeUtil.unserialize(result);
51                     if (order == null) {
52                         System.out.println(" unserialize failed : " + billnumber);
53                     }
54                 } else {
55                     System.out.println(" get failed : " + billnumber + " not exist ");
56                 }
57             }
58         } catch (Exception ex) {
59             // ex.printStackTrace();
60             jpool.returnBrokenResource(jedis);
61             ThreadCallback.OnException();
62         } finally {
63             jpool.returnResource(jedis);
64             ThreadCallback.OnComplete("Redis 每个线程进行" + billnumbers.size() + "次 [读取] ", TotalThreadCount, startMili);
65         }
66     }
67 }

4)Tair多线程读写,使用官方Tair-Client,可设置参数MaxWaitThread主要指最大等待线程数,当超过这个数量的线程在等待时,新的请求将直接返回超时,本文测试设置MaxWaitThread=100,连接超时时间Timeout=2000ms,测试结果要求没有超时异常线程。

测试方法

 1      /*-------------------Tair(多线程初始化tairManager)--------------------*/
 2         List<String> confServers = new ArrayList<String>();
 3         confServers.add("192.168.31.191:5198");
 4         DefaultTairManager tairManager = new DefaultTairManager();
 5         tairManager.setConfigServerList(confServers);
 6         tairManager.setGroupName("group_1");
 7         tairManager.setMaxWaitThread(100);// 最大等待线程数,当超过这个数量的线程在等待时,新的请求将直接返回超时
 8         tairManager.setTimeout(2000);// 请求的超时时间,单位为毫秒
 9         tairManager.init();
10
11         /*-------------------Tair(多线程写入)--------------------*/
12         orders = OrderBusiness.loadOrders(5);
13         startMili = System.currentTimeMillis();
14         totalThreadCount = 500;
15         for (int i = 1; i <= totalThreadCount; i++) {
16             TairPutter putter = new TairPutter();
17             putter.OrderList = orders;
18             putter.Namesapce = i;
19             putter.startMili = startMili;
20             putter.TotalThreadCount = totalThreadCount;
21             putter.tairManager = tairManager;
22
23             Thread th = new Thread(putter);
24             th.start();
25         }
26      /*-------------------Tair(多线程读取)--------------------*/
27         //读取代码基本一致

线程任务类

 1 public class TairGetter implements Runnable {
 2     public List<String> billnumbers;
 3     public long startMili;
 4     public int TotalThreadCount;
 5     public DefaultTairManager tairManager;
 6
 7     @Override
 8     public void run() {
 9         try {
10             for (String billnumber : billnumbers) {
11                 Result<DataEntry> result = tairManager.get(0, billnumber);
12                 if (result.isSuccess()) {
13                     DataEntry entry = result.getValue();
14                     if (entry == null) {
15                         System.out.println(" get failed : " + billnumber + " not exist ");
16                     }
17                 } else {
18                     System.out.println(result.getRc().getMessage());
19                 }
20             }
21         } catch (Exception ex) {
22             // ex.printStackTrace();
23             ThreadCallback.OnException();
24         } finally {
25             ThreadCallback.OnComplete("Tair 每个线程进行" + billnumbers.size() + "次 [读取] ", TotalThreadCount, startMili);
26         }
27     }
28 }
29
30
31
32 public class TairPutter implements Runnable {
33
34     public List<OrderInfo> OrderList;
35     public int Namesapce;
36     public int TotalThreadCount;
37     public long startMili;
38     public DefaultTairManager tairManager;
39
40     @Override
41     public void run() {
42         try {
43             for (OrderInfo order : OrderList) {
44                 ResultCode result = tairManager.put(0, "order_" + order.BillNumber, order);
45                 if (!result.isSuccess()) {
46                     System.out.println("put: order_" + order.BillNumber + "  " + result.isSuccess() + " code:" + result.getCode());
47                 }
48             }
49         } catch (Exception ex) {
50             // ex.printStackTrace();
51             ThreadCallback.OnException();
52         } finally {
53             ThreadCallback.OnComplete("Tair 每个线程进行" + OrderList.size() + "次 [写入] ", TotalThreadCount, startMili);
54         }
55     }
56 }

3、测试结果,每项重复测试取平均值

六、Memcached、Redis、Tair 都非常优秀

Redis在单线程环境下的性能表现非常突出,但在并行环境下则没有很大的优势,是JedisPool或者CommonPool的性能瓶颈还是我测试代码的问题请麻烦告之,过程中修改setMaxTotal,setMaxIdle都没有太大的改观。

Tair由于需要在服务器端实现数据分布等相关算法,所以在测试对比中性能有所损耗应该也很好理解。

时间: 2024-11-01 17:15:47

key/value存储系统-Memcached、Redis、Tair的相关文章

Python自动化 【第十一篇】:Python进阶-RabbitMQ队列/Memcached/Redis

 本节内容: RabbitMQ队列 Memcached Redis 1.  RabbitMQ 安装 http://www.rabbitmq.com/install-standalone-mac.html 安装python rabbitMQ module pip install pika or easy_install pika or 源码 https://pypi.python.org/pypi/pika 实现最简单的队列 send 端 received 端 1.1 Work Queues 在这

缓存:Memcached Redis

一.Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信. 1.Memcached安装配置 #安装倚赖包 yum install libevent-devel #安装软件 yum

memcached&amp;redis性能测试

转自:http://www.iigrowing.cn/memcached-redis-xing-neng-ce-shi.html 一.Memcached 1.1.memcached简介 Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提供动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通

Key/Value之王Memcached初探:三、Memcached解决Session的分布式存储场景的应用

一.高可用的Session服务器场景简介 1.1 应用服务器的无状态特性 应用层服务器(这里一般指Web服务器)处理网站应用的业务逻辑,应用的一个最显著的特点是:应用的无状态性. PS:提到无状态特性,不得不说下Http协议.我们常常听到说,Http是一个无状态协议,同一个会话的连续两个请求互相不了解,他们由最新实例化的环境进行解析,除了应用本身可能已经存储在全局对象中的所有信息外,该环境不保存与会话有关的任何信息.之所以我们在使用ASP.NET WebForm开发中会感觉不到Http的无状态特

Key/Value之王Memcached初探:一、掀起MC的盖头来

一.Memcached是何方神圣? 在数据驱动的Web开发中,经常要重复从数据库中取出相同的数据,这种重复极大的增加了数据库负载.缓存是解决这个问题的好办法.但是ASP.NET中的HttpRuntime.Cache虽然已经可以实现对页面局部进行缓存,但还是不够灵活,此时Memcached或许是你想要的. Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.经过多年的发

【转】 Key/Value之王Memcached初探:三、Memcached解决Session的分布式存储场景的应用

一.高可用的Session服务器场景简介 1.1 应用服务器的无状态特性 应用层服务器(这里一般指Web服务器)处理网站应用的业务逻辑,应用的一个最显著的特点是:应用的无状态性. PS:提到无状态特性,不得不说下Http协议.我们常常听到说,Http是一个无状态协议,同一个会话的连续两个请求互相不了解,他们由最新实例化的环境进行解析,除了应用本身可能已经存储在全局对象中的所有信息外,该环境不保存与会话有关的任何信息.之所以我们在使用ASP.NET WebForm开发中会感觉不到Http的无状态特

Key/Value存储系统etcd的特性

etcd 是一个高可用的Key/Value存储系统,和其他KV存储系统不同的是,它的灵感来自于 ZooKeeper 和 Doozer,主要用于分享配置和服务发现.利用 etcd 的特性,应用程序可以在集群中共享信息.配置或作服务发现,etcd 会在集群的各个节点中复制这些数据并保证这些数据始终正确. 它的主要优点是: 简单:支持 curl 方式的用户 API (HTTP+JSON) 安全:可选 SSL 客户端证书认证 快速:单实例可达每秒 1000 次写操作 可靠:使用Raft强一致性算法实现分

tair 分布式key/value存储系统 环境搭建

一.安装tair 1.安装svn工具 yum -y install subversion 2.获取源代码 获得底层库 tbsys 和 tbnet的源代码 svn checkout http://code.taobao.org/svn/tb-common-utils/trunk/ tb-common-utils 获得tair源代码 svn checkout http://code.taobao.org/svn/tair/trunk/ tair 3.安装前的准备,安装依赖库或软件 编译tair或tb

python对缓存(memcached,redis)的操作

1.Memcached Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载.它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态.数据库驱动网站的速度.Memcached基于一个存储键/值对的hashmap.其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信 memcached服务端安装部署 安装libevent memcached依赖于libevent API,因此要事先安装之