A custom Flume sink for MySQL

package me;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import com.google.common.base.Preconditions;

public class MySink extends AbstractSink implements Configurable {
	private Connection connect;
	private Statement stmt;
	private String columnName;
	private String url;
	private String user;
	private String password;
	private String tableName;
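	// Called once when the sink shuts down: release the JDBC resources
	@Override
	public synchronized void stop() {
		try {
			if (stmt != null) {
				stmt.close();
			}
			if (connect != null) {
				connect.close();
			}
		} catch (SQLException e) {
			e.printStackTrace();
		}
		super.stop();
	}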

	// Called once when the sink starts: open the JDBC connection
	@Override
	public synchronized void start() {
		try {
			// The URL has the form jdbc:mysql://host:port/database; the other two
			// arguments are the login user name and password
			connect = DriverManager.getConnection(url, user, password);
			stmt = connect.createStatement();
		} catch (SQLException e) {
			// Fail fast: without a connection, process() could only fail later
			throw new IllegalStateException("Cannot connect to " + url, e);
		}
		super.start();
	}

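	// Called repeatedly by the sink runner
	@Override
	public Status process() throws EventDeliveryException {
		Channel ch = getChannel();
		Transaction txn = ch.getTransaction();
		txn.begin();
		try {
			Event event = ch.take();
			if (event == null) {
				// Channel is empty: finish the transaction and let the runner back off
				// instead of spinning inside an open transaction
				txn.commit();
				return Status.BACKOFF;
			}
			String body = new String(event.getBody());
			if (body.split(",").length != columnName.split(",").length) {
				// Malformed event: commit to discard it, because a rollback would
				// put it back in the channel and it would be retried forever
				txn.commit();
				return Status.READY;
			}
			// The body is pasted into the statement verbatim, so string values must
			// already be quoted and the input must come from a trusted source
			String sql = "insert into " + tableName + "(" + columnName + ") values(" + body + ")";
			stmt.executeUpdate(sql);
			txn.commit();
			return Status.READY;
		} catch (Throwable th) {
			txn.rollback();
			if (th instanceof Error) {
				throw (Error) th;
			} else {
				throw new EventDeliveryException(th);
			}
		} finally {
			txn.close();
		}
	}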

	// Reads the sink properties from the agent configuration file
	@Override
	public void configure(Context context) {
		columnName = context.getString("column_name");
		Preconditions.checkNotNull(columnName, "column_name must be set!!");
		url = context.getString("url");
		Preconditions.checkNotNull(url, "url must be set!!");
		user = context.getString("user");
		Preconditions.checkNotNull(user, "user must be set!!");
		password = context.getString("password");
		Preconditions.checkNotNull(password, "password must be set!!");
		tableName = context.getString("tableName");
		Preconditions.checkNotNull(tableName, "tableName must be set!!");
	}

}
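
The sink above builds its insert statement by concatenating the raw event body into the SQL text. As a hedged alternative, the sketch below shows how the same statement could be built with `?` placeholders and bound through a `PreparedStatement`, so the values in the body no longer need to be quoted by the sender. The class name `InsertSqlSketch` and both helper methods are hypothetical and not part of the original sink; the table and column names are still taken as-is from the agent configuration.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;

// Hypothetical helper, not part of the original sink.
class InsertSqlSketch {

    // Builds "insert into <table>(<columns>) values(?, ?, ...)" with one
    // placeholder per comma-separated column name.
    static String buildInsertSql(String tableName, String columnName) {
        int columns = columnName.split(",").length;
        String placeholders = String.join(", ", Collections.nCopies(columns, "?"));
        return "insert into " + tableName + "(" + columnName + ") values(" + placeholders + ")";
    }

    // Binds each comma-separated field of the event body to its placeholder.
    // Every field is bound as a string; the driver is left to convert it.
    static void bind(PreparedStatement ps, String body) throws SQLException {
        String[] fields = body.split(",");
        for (int i = 0; i < fields.length; i++) {
            ps.setString(i + 1, fields[i].trim());
        }
    }

    public static void main(String[] args) {
        // Uses the table and column names from the sample configuration.
        System.out.println(buildInsertSql("test.user", "id, username, password"));
    }
}
```

In a sink, the statement would be prepared once with `connect.prepareStatement(...)` and reused for every event: call `bind`, then `executeUpdate()`.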

Flume agent configuration (conf/sink-mysql):

agent.sources = s1
agent.channels = c1
agent.sinks = sk1

agent.sources.s1.type = netcat
agent.sources.s1.bind = localhost
agent.sources.s1.port = 5678
agent.sources.s1.channels = c1  

agent.sinks.sk1.type = me.MySink
agent.sinks.sk1.url=jdbc:mysql://192.168.16.33:3306/test
agent.sinks.sk1.tableName=test.user
agent.sinks.sk1.user=root
# The password value was obscured in the source; substitute your own
agent.sinks.sk1.password=<your-password>
agent.sinks.sk1.column_name=id, username, password
agent.sinks.sk1.channel = c1  

agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

Start the agent:

$ cd hadoop-2.7.2/flume/
$ bin/flume-ng agent -c conf -f conf/sink-mysql --name agent -Dflume.root.logger=INFO,console
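
Once the agent is running, the netcat source listens on localhost:5678 and turns each received line into one event. The sketch below is one way to send a test row from Java. The class name `SendTestEvent` and the sample row are hypothetical. String values are single-quoted here because the sink pastes the event body directly into `values(...)`.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical test client, not part of the original post.
class SendTestEvent {

    // Sends one newline-terminated line to host:port and closes the connection.
    static void sendLine(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Host and port match agent.sources.s1.bind and agent.sources.s1.port.
        // The row is made up: three fields for the columns id, username, password.
        sendLine("localhost", 5678, "1,'tom','secret'");
    }
}
```

If the field count matches `column_name`, the sink should insert one row into `test.user`.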

Time: 2024-11-03 05:23:57
