Azkaban使用示例

1、Web端管理工作流

Azkaban提供了易操作的Web管理界面,具体操作可参考:http://azkaban.github.io/azkaban/docs/2.5/#ajax-api

值得注意的是,可以使用Azkaban提供的Web界面,定义或覆盖工作流的具体执行参数,操作界面如下:

2.1、登录认证

命令:curl -k -X POST --data "action=login&username=azkaban&password=azkaban" https://localhost:8443

返回json格式:

<span style="white-space:pre">	</span>{
  		"status" : "success",
  		"session.id" : "c001aba5-a90f-4daf-8f11-62330d034c0a"
	}

2.2、创建一个Project

命令:curl -k -X POST --data "session.id=9089beb2-576d-47e3-b040-86dbdc7f523e&name=aaaa&description=11" https://localhost:8443/manager?action=create

返回json格式:

<span style="white-space:pre">	</span>{
  		"status":"success",
  		"path":"manager?project=aaaa",
  		"action":"redirect"
	} 

2.3、上传一个Project的zip文件

命令:curl -k -i -H "Content-Type: multipart/mixed" -X POST --form ‘session.id=e7a29776-5783-49d7-afa0-b0e688096b5e‘ --form ‘ajax=upload‘ --form ‘[email protected];type=application/zip‘ --form ‘project=MyProject‘ https://localhost:8443/manager

注意:在zip文件的目录下执行该命令,否则无法找到zip文件

返回json格式:

<span style="white-space:pre">	</span>{
  		"error" : "Installation Failed.\nError unzipping file.",
  		"projectId" : "192",
  		"version" : "1"
	}

2.4、执行工作流

命令:curl -k --get --data ‘session.id=189b956b-f39f-421e-9a95-e3117e7543c9‘ --data ‘ajax=executeFlow‘ --data ‘project=azkaban-test-project‘ --data ‘flow=test‘ https://localhost:8443/executor

返回json格式:

<span style="white-space:pre">	</span>{
  		message: "Execution submitted successfully with exec id 295",
  		project: "foo-demo",
  		flow: "test",
  		execid: 295
	}

3、使用Ajax API操作工作流

在程序中执行工作流的话,则需要使用Azkaban提供的ajax api了,以下是使用Java模拟https请求的代码:

AzkabanHttpsPost类

package hadoop.azkaban;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Properties;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

import net.sf.json.JSONObject;

public class AzkabanHttpsPost {
        static String keystorePassword;
        static String keystore;
        static String truststore;

        static{
                InputStream is=Thread.currentThread().getContextClassLoader().
                getResourceAsStream("azkaban.properties");
                Properties p=new Properties();
                try{
                        p.load(is);
                        keystorePassword = p.getProperty("PASSWORD");
                        keystore = p.getProperty("KEYSTORE");
                        truststore = p.getProperty("TRUSTSTORE");
                }catch(Exception e){
                        e.printStackTrace();
                }
        }

        /**
         * 获得KeyStore.
         *
         * @param storePath
         *            密钥库路径
         * @param password
         *            密码
         * @return 密钥库
         * @throws Exception
         */
        public static KeyStore getKeyStore(String password, String storePath)
                        throws Exception {
                // 实例化密钥库
                KeyStore ks = KeyStore.getInstance("JKS");
                // 获得密钥库文件流
                FileInputStream is = new FileInputStream(storePath);
                // 加载密钥库
                ks.load(is, password.toCharArray());
                // 关闭密钥库文件流
                is.close();
                return ks;
        }
         /**
         * 获得SSLSocketFactory.
         *
         * @param password
         *            密码
         * @param keyStorePath
         *            密钥库路径
         * @param trustStorePath
         *            信任库路径
         * @return SSLSocketFactory
         * @throws Exception
         */
        public static SSLContext getSSLContext() throws Exception {
                // 实例化密钥库
                KeyManagerFactory keyManagerFactory = KeyManagerFactory
                                .getInstance(KeyManagerFactory.getDefaultAlgorithm());
                // 获得密钥库
                KeyStore keyStore = getKeyStore(AzkabanHttpsPost.keystorePassword, AzkabanHttpsPost.keystore);
                // 初始化密钥工厂
                keyManagerFactory.init(keyStore, AzkabanHttpsPost.keystorePassword.toCharArray());
                // 实例化信任库
                TrustManagerFactory trustManagerFactory = TrustManagerFactory
                                .getInstance(TrustManagerFactory.getDefaultAlgorithm());
                // 获得信任库
                KeyStore trustStore = getKeyStore(AzkabanHttpsPost.keystorePassword, AzkabanHttpsPost.truststore);
                // 初始化信任库
                trustManagerFactory.init(trustStore);
                // 实例化SSL上下文
                SSLContext ctx = SSLContext.getInstance("TLS");
                // 初始化SSL上下文
                ctx.init(keyManagerFactory.getKeyManagers(),
                                trustManagerFactory.getTrustManagers(), null);
                // 获得SSLSocketFactory
                return ctx;
        }

        /**
         * 初始化HttpsURLConnection.
         *
         * @param password
         *            密码
         * @param keyStorePath
         *            密钥库路径
         * @param trustStorePath
         *            信任库路径
         * @throws Exception
         */
        public static void initHttpsURLConnection() throws Exception {
                // 声明SSL上下文
                SSLContext sslContext = null;
                // 实例化主机名验证接口
                HostnameVerifier hnv = new MyHostnameVerifier();
                try {
                        sslContext = getSSLContext();
                } catch (GeneralSecurityException e) {
                        e.printStackTrace();
                }
                if (sslContext != null) {
                        HttpsURLConnection.setDefaultSSLSocketFactory(sslContext
                                        .getSocketFactory());
                }
                HttpsURLConnection.setDefaultHostnameVerifier(hnv);
        }
         /**
         * 发送请求.
         *
         * @param httpsUrl
         *            请求的地址,如https://localhost:8043
         * @param xmlStr
         *            请求的数据,如action=login&username=azkaban&password=azkaban
         * @throws Exception
         */
        public static JSONObject post(String url,String xmlStr) throws Exception {
                initHttpsURLConnection();
                JSONObject jsonObj = null;
                HttpsURLConnection urlCon = null;
                try {
                        urlCon = (HttpsURLConnection) (new URL(url)).openConnection();
                        urlCon.setDoInput(true);
                        urlCon.setDoOutput(true);
                        urlCon.setRequestMethod("POST");
                        // 如下设置后,azkaban才能识别出是以ajax的方式访问,从而返回json格式的操作信息
                        urlCon.setRequestProperty("Content-Type",
                                        "application/x-www-form-urlencoded");
                        urlCon.setRequestProperty("X-Requested-With", "XMLHttpRequest");
                        urlCon.setUseCaches(true);
                        // 设置为gbk可以解决服务器接收时读取的数据中文乱码问题
                        urlCon.getOutputStream().write(xmlStr.getBytes("gbk"));
                        urlCon.getOutputStream().flush();
                        urlCon.getOutputStream().close();
                        BufferedReader in = new BufferedReader(new InputStreamReader(
                                        urlCon.getInputStream()));
                        String line="";
                        String temp;
                        while ((temp = in.readLine()) != null) {
                                line = line + temp;
                        }
                        jsonObj = JSONObject.fromObject(line);
                } catch (MalformedURLException e) {
                        e.printStackTrace();
                } catch (IOException e) {
                        e.printStackTrace();
                } catch (Exception e) {
                        e.printStackTrace();
                }
                return jsonObj;
        }

MyHostnameVerifier类

package hadoop.azkaban;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLSession;  

/**
 * 实现用于主机名验证的基接口。
 * 在握手期间,如果 URL 的主机名和服务器的标识主机名不匹配,则验证机制可以回调此接口的实现程序来确定是否应该允许此连接。
 */
public class MyHostnameVerifier implements HostnameVerifier {
    @Override
    public boolean verify(String hostname, SSLSession session) {
        if("localhost".equals(hostname)){
            return true;
        } else {
            return false;
        }
    }
}  

以下是调用https,操作Azkaban的示例:

package hadoop.azkaban;

import java.io.InputStream;
import java.util.Properties;

import net.sf.json.JSONObject;

/**
 *
 * @author hu
 *
 */
public class AzkabanOperator {
        public static String url;
        public static String azkabanUser;
        public static String azkabanPassword;
        public static String GDI_Project;
        public static String GDI_Workflow;
        static {
                InputStream is = Thread.currentThread().getContextClassLoader()
                                .getResourceAsStream("azkaban.properties");
                Properties p = new Properties();
                try {
                        p.load(is);
                        url = p.getProperty("URL");
                        azkabanUser = p.getProperty("AZKABANUSER");
                        azkabanPassword = p.getProperty("AZKABANPASSWORD");
                        GDI_Project = p.getProperty("GDI_Project");
                        GDI_Workflow = p.getProperty("GDI_Workflow");
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }

        public JSONObject login() throws Exception {
                JSONObject result = null;
                String queryStr = "action=login&username=" + azkabanUser + "&password="
                                + azkabanPassword;
                result = AzkabanHttpsPost.post(url, queryStr);
                return result;
        }
         public JSONObject executeGDIFlow(String sessionID, String project,
                        String flow, String cwParams, String smParams, String gdiParams)
                        throws Exception {
                JSONObject result = null;
                String executeStr = "session.id=" + sessionID
                                + "&ajax=executeFlow&project=" + project + "&flow=" + flow
                                + "&flowOverride[cw_params]=" + cwParams
                                + "&flowOverride[sm_params]=" + smParams
                                + "&flowOverride[gdi_params]=" + gdiParams;
                String executeUrl = url + "/executor";
                result = AzkabanHttpsPost.post(executeUrl, executeStr);
                return result;
        }

        public JSONObject fetchFlow(String sessionID, String execID)
                        throws Exception {
                JSONObject result = null;
                String executeStr = "session.id=" + sessionID
                                + "&ajax=fetchexecflow&execid=" + execID;
                String executeUrl = url + "/executor";
                result = AzkabanHttpsPost.post(executeUrl, executeStr);
                return result;
        }
时间: 2024-11-05 19:21:20

Azkaban使用示例的相关文章

hadoop工作流引擎之azkaban [转]

介绍 Azkaban是twitter出的一个任务调度系统,操作比Oozie要简单很多而且非常直观,提供的功能比较简单.Azkaban以Flow为执行单元进行定时调度,Flow就是预定义好的由一个或多个可存在依赖关系的Job组成的工作流.Azkaban的官方主页是http://azkaban.github.io/azkaban2/ ,它的的主要特点有下面几个: 兼容所有Hadoop版本(1.x,2.x,CDH) 可以通过WebUI进行管理配置,操作方便 可以通过UI配置定时调度 扩展性好,可针对某

工作流调度器azkaban

为什么需要工作流调度系统 一个完整的数据分析系统通常都是由大量任务单元组成: shell脚本程序,java程序,mapreduce程序.hive脚本等 各任务单元之间存在时间先后及前后依赖关系 为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行 例如,我们可能有这样一个需求,某个业务系统每天产生20G原始数据,我们每天都要对其进行处理,处理步骤如下所示: 1. 通过Hadoop先将原始数据同步到HDFS上: 2. 借助MapReduce计算框架对原始数据进行转换,生成的数据以分

azkaban(安装配置加实战)

为什么需要工作流调度系统? 一个完整的数据分析系统通常都是由大量任务单元组成:shell 脚本程序,java 程序,mapreduce 程序.hive 脚本等? 各任务单元之间存在时间先后及前后依赖关系? 为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行:例如,我们可能有这样一个需求,某个业务系统每天产生 20G 原始数据,我们每天都要对其进行处理,处理步骤如下所示:1. 通过 Hadoop 先将原始数据同步到 HDFS 上:2. 借助 MapReduce 计算框架对原始数据

工作流调度系统Azkaban的简介和使用

1 概述 1.1 为什么需要工作流调度系统 l 一个完整的数据分析系统通常都是由大量任务单元组成: shell脚本程序,java程序,mapreduce程序.hive脚本等 l 各任务单元之间存在时间先后及前后依赖关系 l 为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行: 例如,我们可能有这样一个需求,某个业务系统每天产生20G原始数据,我们每天都要对其进行处理,处理步骤如下所示: 1.  通过Hadoop先将原始数据同步到HDFS上: 2.  借助MapReduce计算框

CentOS中安装Azkaban 2.5

必备软件 yum install git -y 单机安装步骤 git clone https://github.com/azkaban/azkaban.git cd azkaban; ./gradlew build installDist cd azkaban-solo-server/build/install/azkaban-solo-server; bin/start-solo.sh # open http://localhost:8081/ bin/shutdown-solo.sh # 当

分布式任务调度框架 Azkaban —— Flow 2.0 的使用

一.Flow 2.0 简介 1.1 Flow 2.0 的产生 Azkaban 目前同时支持 Flow 1.0 和 Flow2.0 ,但是官方文档上更推荐使用 Flow 2.0,因为 Flow 1.0 会在将来的版本被移除.Flow 2.0 的主要设计思想是提供 1.0 所没有的流级定义.用户可以将属于给定流的所有 job / properties 文件合并到单个流定义文件中,其内容采用 YAML 语法进行定义,同时还支持在流中再定义流,称为为嵌入流或子流. 1.2 基本结构 项目 zip 将包含

pfsense Web服务器负载平衡配置示例

在pfsense的网关和服务器中有两种类型的负载平衡功能.网关负载平衡可以通过多个WAN连接分发Internet绑定的流量.服务器负载平衡管理传入流量,因此它利用多个内部服务器进行负载分配和冗余,服务器负载平衡允许流量在多个内部服务器之间分配,它最常用于Web服务器和SMTP服务器.下面我们就以实例来介绍服务器负载平衡的设置. 下面介绍如何通过pfsense2.32配置Web服务器的负载平衡. 网络环境 服务器负载平衡示例网络环境 上图为示例网络环境.它由单个防火墙组成,使用其WAN IP地址池

docker深入2-API示例

2017/9/18 一.目的 演示 http API 使用的方式 注1:本次实例是在 docker swarm mode 下使用的,目的是:更新指定服务的镜像. 注2:要在 swarm manager node 上执行. docker 的 API 文档是自动生成的,没有太多有用的示例可用. [版本] ~]# docker version Client:  Version:      17.06.0-ce  API version:  1.30  Go version:   go1.8.3  Gi

Storm入门(四)WordCount示例

Storm API文档网址如下: http://storm.apache.org/releases/current/javadocs/index.html 一.关联代码 使用maven,代码如下. pom.xml  和Storm入门(三)HelloWorld示例相同 RandomSentenceSpout.java /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor lice