[转帖]Reactor模式

https://www.cnblogs.com/crazymakercircle/p/9833847.html

看不懂代码  只看的图。。

疯狂创客圈,一个Java 高并发研习社群 【博客园 总入口 】

疯狂创客圈,倾力推出:面试必备 + 面试必备 + 面试必备 的基础原理+实战 书籍 《Netty Zookeeper Redis 高并发实战


写在前面 

? 大家好,我是 高并发的实战社群【疯狂创客圈】尼恩。Reactor模式非常重要,无论开发、还是面试。

本文的内容,在《Netty Zookeeper Redis 高并发实战》一书时,进行内容的完善和更新,并且进行的源码的升级。 博客和书不一样,书更加层层升入、层次分明,请大家以书的内容为准。 具体请参考书的第四章 —— 鼎鼎大名的Reactor反应器模式 。

基础篇:netty源码  死磕3-

传说中神一样的Reactor反应器模式

本文目录

1. 为什么是Reactor模式
2. Reactor模式简介
3. 多线程IO的致命缺陷
4. 单线程Reactor模型
4.1. 什么是单线程Reactor呢?
4.2. 单线程Reactor的参考代码
4.3. 单线程模式的缺点:
5. 多线程的Reactor
5.1. 基于线程池的改进
5.2. 改进后的完整示意图
5.3. 多线程Reactor的参考代码
6. Reactor持续改进
7. Reactor编程的优点和缺点
7.1. 优点
7.2. 缺点

1. 为什么是Reactor模式

写多了代码的兄弟们都知道,JAVA代码由于到处面向接口及高度抽象,用到继承多态和设计模式,程序的组织不是按照正常的理解顺序来的,对代码跟踪很是个问题。所以,在阅读别人的源码时,如果不了解代码的组织方式,往往是晕头转向,不知在何处。尤其是阅读经典代码的时候,更是如此。

反过来,如果先了解代码的设计模式,再来去代码,就会阅读的很轻松,不会那么难懂。

像netty这样的精品中的极品,肯定也是需要先从设计模式入手的。netty的整体架构,基于了一个著名的模式——Reactor模式。Reactor模式,是高性能网络编程的必知必会模式。

首先熟悉Reactor模式,一定是磨刀不误砍柴工。

2. Reactor模式简介

Netty是典型的Reactor模型结构,关于Reactor的详尽阐释,本文站在巨人的肩膀上,借助 Doug Lea(就是那位让人无限景仰的大爷)的“Scalable IO in Java”中讲述的Reactor模式。

“Scalable IO in Java”的地址是:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

Reactor模式也叫反应器模式,大多数IO相关组件如Netty、Redis在使用的IO模式,为什么需要这种模式,它是如何设计来解决高性能并发的呢?

3. 多线程IO的致命缺陷

最最原始的网络编程思路就是服务器用一个while循环,不断监听端口是否有新的套接字连接,如果有,那么就调用一个处理函数处理,类似:

while(true){

socket = accept();

handle(socket)

}

这种方法的最大问题是无法并发,效率太低,如果当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量太低。

之后,想到了使用多线程,也就是很经典的connection per thread,每一个连接用一个线程处理,类似:

package com.crazymakercircle.iodemo.base;

import com.crazymakercircle.config.SystemConfig;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

class BasicModel implements Runnable {
    public void run() {
        try {
            ServerSocket ss =
                    new ServerSocket(SystemConfig.SOCKET_SERVER_PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start();
            //创建新线程来handle
            // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[SystemConfig.INPUT_SIZE];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] input) {
            byte[] output=null;
            /* ... */
            return output;
        }
    }
}

对于每一个请求都分发给一个线程,每个线程中都独自处理上面的流程。

tomcat服务器的早期版本确实是这样实现的。

多线程并发模式,一个连接一个线程的优点是:

一定程度上极大地提高了服务器的吞吐量,因为之前的请求在read阻塞以后,不会影响到后续的请求,因为他们在不同的线程中。这也是为什么通常会讲“一个线程只能对应一个socket”的原因。另外有个问题,如果一个线程中对应多个socket连接不行吗?语法上确实可以,但是实际上没有用,每一个socket都是阻塞的,所以在一个线程里只能处理一个socket,就算accept了多个也没用,前一个socket被阻塞了,后面的是无法被执行到的。

多线程并发模式,一个连接一个线程的缺点是:

缺点在于资源要求太高,系统中创建线程是需要比较高的系统资源的,如果连接数太高,系统无法承受,而且,线程的反复创建-销毁也需要代价。

改进方法是:

采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。使用Reactor模式,对线程的数量进行控制,一个线程处理大量的事件。

4. 单线程Reactor模型

Reactor模型的朴素原型

Java的NIO模式的Selector网络通讯,其实就是一个简单的Reactor模型。可以说是Reactor模型的朴素原型。

 static class Server
    {

        public static void testServer() throws IOException
        {

            // 1、获取Selector选择器
            Selector selector = Selector.open();

            // 2、获取通道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            // 3.设置为非阻塞
            serverSocketChannel.configureBlocking(false);
            // 4、绑定连接
            serverSocketChannel.bind(new InetSocketAddress(SystemConfig.SOCKET_SERVER_PORT));

            // 5、将通道注册到选择器上,并注册的操作为:“接收”操作
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            // 6、采用轮询的方式,查询获取“准备就绪”的注册过的操作
            while (selector.select() > 0)
            {
                // 7、获取当前选择器中所有注册的选择键(“已经准备就绪的操作”)
                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                while (selectedKeys.hasNext())
                {
                    // 8、获取“准备就绪”的时间
                    SelectionKey selectedKey = selectedKeys.next();

                    // 9、判断key是具体的什么事件
                    if (selectedKey.isAcceptable())
                    {
                        // 10、若接受的事件是“接收就绪” 操作,就获取客户端连接
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        // 11、切换为非阻塞模式
                        socketChannel.configureBlocking(false);
                        // 12、将该通道注册到selector选择器上
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                    else if (selectedKey.isReadable())
                    {
                        // 13、获取该选择器上的“读就绪”状态的通道
                        SocketChannel socketChannel = (SocketChannel) selectedKey.channel();

                        // 14、读取数据
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        int length = 0;
                        while ((length = socketChannel.read(byteBuffer)) != -1)
                        {
                            byteBuffer.flip();
                            System.out.println(new String(byteBuffer.array(), 0, length));
                            byteBuffer.clear();
                        }
                        socketChannel.close();
                    }

                    // 15、移除选择键
                    selectedKeys.remove();
                }
            }

            // 7、关闭连接
            serverSocketChannel.close();
        }

        public static void main(String[] args) throws IOException
        {
            testServer();
        }
    }

实际上的Reactor模式,是基于Java NIO的,在他的基础上,抽象出来两个组件——Reactor和Handler两个组件:

(1)Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理;新的事件包含连接建立就绪、读就绪、写就绪等。

(2)Handler:将自身(handler)与事件绑定,负责事件的处理,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。

4.1. 什么是单线程Reactor呢?

如下图所示:

这是最简单的单Reactor单线程模型。Reactor线程是个多面手,负责多路分离套接字,Accept新连接,并分派请求到Handler处理器中。

下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多。Reactor和Hander 处于一条线程执行。

顺便说一下,可以将上图的accepter,看做是一种特殊的handler。

4.2. 单线程Reactor的参考代码

“Scalable IO in Java”,实现了一个单线程Reactor的参考代码,Reactor的代码如下:

package com.crazymakercircle.ReactorModel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class Reactor implements Runnable
{
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException
    { //Reactor初始化
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        //非阻塞
        serverSocket.configureBlocking(false);

        //分步处理,第一步,接收accept事件
        SelectionKey sk =
                serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        //attach callback object, Acceptor
        sk.attach(new Acceptor());
    }

    public void run()
    {
        try
        {
            while (!Thread.interrupted())
            {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                {
                    //Reactor负责dispatch收到的事件
                    dispatch((SelectionKey) (it.next()));
                }
                selected.clear();
            }
        } catch (IOException ex)
        { /* ... */ }
    }

    void dispatch(SelectionKey k)
    {
        Runnable r = (Runnable) (k.attachment());
        //调用之前注册的callback对象
        if (r != null)
        {
            r.run();
        }
    }

    // inner class
    class Acceptor implements Runnable
    {
        public void run()
        {
            try
            {
                SocketChannel channel = serverSocket.accept();
                if (channel != null)
                    new Handler(selector, channel);
            } catch (IOException ex)
            { /* ... */ }
        }
    }
}

Handler的代码如下:

package com.crazymakercircle.ReactorModel;

import com.crazymakercircle.config.SystemConfig;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class Handler implements Runnable
{
    final SocketChannel channel;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);
    ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector selector, SocketChannel c) throws IOException
    {
        channel = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = channel.register(selector, 0);

        //将Handler作为callback对象
        sk.attach(this);

        //第二步,注册Read就绪事件
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    boolean inputIsComplete()
    {
        /* ... */
        return false;
    }

    boolean outputIsComplete()
    {

        /* ... */
        return false;
    }

    void process()
    {
        /* ... */
        return;
    }

    public void run()
    {
        try
        {
            if (state == READING)
            {
                read();
            }
            else if (state == SENDING)
            {
                send();
            }
        } catch (IOException ex)
        { /* ... */ }
    }

    void read() throws IOException
    {
        channel.read(input);
        if (inputIsComplete())
        {

            process();

            state = SENDING;
            // Normally also do first write now

            //第三步,接收write就绪事件
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException
    {
        channel.write(output);

        //write完就结束了, 关闭select key
        if (outputIsComplete())
        {
            sk.cancel();
        }
    }
}

这两段代码,是建立在JAVA NIO的基础上的,这两段代码建议一定要看懂。可以在IDE中去看源码,这样直观感觉更佳。

如果对NIO的Seletor不完全了解,影响到上面的代码阅读,请阅读疯狂创客圈的Java NIO死磕 文章。

4.3. 单线程模式的缺点:

1、 当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。这种单线程模型不能充分利用多核资源,所以实际使用的不多。

2、因此,单线程模型仅仅适用于handler 中业务处理组件能快速完成的场景。

5. 多线程的Reactor

5.1. 基于线程池的改进

在线程Reactor模式基础上,做如下改进:

(1)将Handler处理器的执行放入线程池,多线程进行业务处理。

(2)而对于Reactor而言,可以仍为单个线程。如果服务器为多核的CPU,为充分利用系统资源,可以将Reactor拆分为两个线程。

一个简单的图如下:

5.2. 改进后的完整示意图

下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多,只是更加详细。Reactor是一条独立的线程,Hander 处于线程池中执行。

5.3. 多线程Reactor的参考代码

“Scalable IO in Java”,的多线程Reactor的参考代码,是基于单线程做一个线程池的改进,改进的Handler的代码如下:

package com.crazymakercircle.ReactorModel;

import com.crazymakercircle.config.SystemConfig;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MthreadHandler implements Runnable
{
    final SocketChannel channel;
    final SelectionKey selectionKey;
    ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);
    ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    ExecutorService pool = Executors.newFixedThreadPool(2);
    static final int PROCESSING = 3;

    MthreadHandler(Selector selector, SocketChannel c) throws IOException
    {
        channel = c;
        c.configureBlocking(false);
        // Optionally try first read now
        selectionKey = channel.register(selector, 0);

        //将Handler作为callback对象
        selectionKey.attach(this);

        //第二步,注册Read就绪事件
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    boolean inputIsComplete()
    {
       /* ... */
        return false;
    }

    boolean outputIsComplete()
    {

       /* ... */
        return false;
    }

    void process()
    {
       /* ... */
        return;
    }

    public void run()
    {
        try
        {
            if (state == READING)
            {
                read();
            }
            else if (state == SENDING)
            {
                send();
            }
        } catch (IOException ex)
        { /* ... */ }
    }

    synchronized void read() throws IOException
    {
        // ...
        channel.read(input);
        if (inputIsComplete())
        {
            state = PROCESSING;
            //使用线程pool异步执行
            pool.execute(new Processer());
        }
    }

    void send() throws IOException
    {
        channel.write(output);

        //write完就结束了, 关闭select key
        if (outputIsComplete())
        {
            selectionKey.cancel();
        }
    }

    synchronized void processAndHandOff()
    {
        process();
        state = SENDING;
        // or rebind attachment
        //process完,开始等待write就绪
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }

    class Processer implements Runnable
    {
        public void run()
        {
            processAndHandOff();
        }
    }

}

Reactor 类没有大的变化,参考前面的代码。

6. Reactor持续改进

对于多个CPU的机器,为充分利用系统资源,将Reactor拆分为两部分。代码如下:

package com.crazymakercircle.ReactorModel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class MthreadReactor implements Runnable
{

    //subReactors集合, 一个selector代表一个subReactor
    Selector[] selectors=new Selector[2];
    int next = 0;
    final ServerSocketChannel serverSocket;

    MthreadReactor(int port) throws IOException
    { //Reactor初始化
        selectors[0]=Selector.open();
        selectors[1]= Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        //非阻塞
        serverSocket.configureBlocking(false);

        //分步处理,第一步,接收accept事件
        SelectionKey sk =
                serverSocket.register( selectors[0], SelectionKey.OP_ACCEPT);
        //attach callback object, Acceptor
        sk.attach(new Acceptor());
    }

    public void run()
    {
        try
        {
            while (!Thread.interrupted())
            {
                for (int i = 0; i <2 ; i++)
                {
                    selectors[i].select();
                    Set selected =  selectors[i].selectedKeys();
                    Iterator it = selected.iterator();
                    while (it.hasNext())
                    {
                        //Reactor负责dispatch收到的事件
                        dispatch((SelectionKey) (it.next()));
                    }
                    selected.clear();
                }

            }
        } catch (IOException ex)
        { /* ... */ }
    }

    void dispatch(SelectionKey k)
    {
        Runnable r = (Runnable) (k.attachment());
        //调用之前注册的callback对象
        if (r != null)
        {
            r.run();
        }
    }

    class Acceptor { // ...
        public synchronized void run() throws IOException
        {
            SocketChannel connection =
                    serverSocket.accept(); //主selector负责accept
            if (connection != null)
            {
                new Handler(selectors[next], connection); //选个subReactor去负责接收到的connection
            }
            if (++next == selectors.length) next = 0;
        }
    }
}

7. Reactor编程的优点和缺点

6.1. 优点

1)响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;

2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;

3)可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;

4)可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;

6.2. 缺点

1)相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。

2)Reactor模式需要底层的Synchronous Event Demultiplexer支持,比如Java中的Selector支持,操作系统的select系统调用支持,如果要自己实现Synchronous Event Demultiplexer可能不会有那么高效。

3) Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用改进版的Reactor模式如Proactor模式。

在开启Netty源码前,上面的经典代码,一定要看懂哦!

原文地址:https://www.cnblogs.com/jinanxiaolaohu/p/11803248.html

时间: 2024-10-15 02:45:04

[转帖]Reactor模式的相关文章

【转帖】两种IO模式:Proactor与Reactor模式

https://www.cnblogs.com/pigerhan/p/3474217.html. 挺好的说明了epoll和IOCP的区别 在高性能的I/O设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,而Proactor运用于异步I/O操作. 在比较这两个模式之前,我们首先的搞明白几个概念,什么是阻塞和非阻塞,什么是同步和异步 ?同步和异步是针对应用程序和内核的交互而言的,同步指的是用户进程触发IO操作并等待或者轮询的去查看IO操作是否就绪

NIO Reactor模式

Reactor模式和NIO——转: 本文可看成是对Doug Lea Scalable IO in Java一文的翻译. 当前分布式计算 Web Services盛行天下,这些网络服务的底层都离不开对socket的操作. 他们都有一个共同的结构:1. Read request2. Decode request3. Process service4. Encode reply5. Send reply 经典的网络服务的设计如下图,在每个线程中完成对数据的处理: 但这种模式在用户负载增加时,性能将下降

【转】Netty那点事(四)Netty与Reactor模式

[原文]https://github.com/code4craft/netty-learning/blob/master/posts/ch4-reactor.md 一:Netty.NIO.多线程? 时隔很久终于又更新了!之前一直迟迟未动也是因为积累不够,后面比较难下手.过年期间@李林锋hw发布了一个Netty5.0架构剖析和源码解读 http://vdisk.weibo.com/s/C9LV9iVqH13rW/1391437855,看完也是收获不少.前面的文章我们分析了Netty的结构,这次咱们

对于观察者模式,Reactor模式,Proactor模式的一点理解

最近就服务器程序IO效率这一块了解一下设计模式中的Reacotr模式和proactor模式,感觉跟观察者模式有些类似的地方,网上也看了一些其他人对三者之间区别的理解,都讲得很仔细,在此根据自己的理解做一点简单的记录和总结,如果理解不对的地方,以后再慢慢深入和更新. 观察者模式: 也可以称为为 发布-订阅 模式,主要适用于多个对象依赖某一个对象的状态并,当某对象状态发生改变时,要通知其他依赖对象做出更新.是一种1对多的关系.当然,如果依赖的对象只有一个时也是一种特殊的一对一关系.通常,观察者模式适

高性能IO设计中的Reactor模式与Proactor模式

在高性能的IO设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,而Proactor运用于异步I/O操作.在比较这两个模式之前,我们首先要搞明白几个概念.什么是阻塞和非阻塞?什么是同步和异步?同步和异步是针对应用程序和内核的交互而言的,同步指的是用户进程触发IO操作并等待或者轮询的去查看IO操作是否就绪,而异步是指用户进程触发IO操作以后便开始做自己的事情,而当IO操作已经完成的时候会得到IO完成的通知(异步的特点就是通知).而阻塞和非阻塞是针

Nio学习3——基础模型:Reactor模式和多路复用

Reactor模式和NIO 本文可看成是对Doug Lea Scalable IO in Java一文的翻译. 当前分布式计算 Web Services盛行天下,这些网络服务的底层都离不开对socket的操作.他们都有一个共同的结构: 1. Read request 2. Decode request 3. Process service 4. Encode reply 5. Send reply 经典的网络服务的设计如下图,在每个线程中完成对数据的处理: 但这种模式在用户负载增加时,性能将下降

Reactor 模式的简单实现

Reactor 模式简单实现 在网上有部分文章在描述Netty时,会提到Reactor.这个Reactor到底是什么呢?为了搞清楚Reactor到底是什么鬼,我写了一个简单的Demo,来帮助大家理解他. 网上是这么描述Reactor的: The Reactor design pattern handles service requests that are delivered concurrently to an application by one or more clients. Each

libevent之Reactor模式

通过前边的一篇博文轻量级网络库libevent初探,我们知道libevent实际上是封装了不同操作系统下的/dev/poll.kqueue.event ports.select.poll和epoll事件机制,从而给我们提供一个统一的接口. libevent采用了Reactor I/O 设计模式,而Reactor是基于同步I/O机制的,所以libevent实际是一个基于同步I/O机制的库. 对于I/O设计模式,与Reactor相对应的还有Proactor.下边我们先来看下这两者的不同之处. Rea

I/O并发模式—Reactor模式

好莱坞原则是"不要打电话给我们,我们会打电话通知你",其实Reactor模式就是如此,你不必一直询问某个事件是否发生了,当事件发生时,会主动通知你.一般用于服务器并发的处理请求.先来看几个Reactor模式中的参与者. 事件处理器:对应一个描述符,实现了应用程序在该描述符上提供的服务. Reactor管理器:是事件处理器的调度核心.用于控制事件调度,以及应用程序注册.删除事件处理器和相关的描述符.Reactor管理器使用同步事件分离器来等待事件的发生. 同步事件分离器(demultip