博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
自定义Flume Sink:ElasticSearch Sink
阅读量:6449 次
发布时间:2019-06-23

本文共 4839 字,大约阅读时间需要 16 分钟。

Flume Sink的目的是从Flume Channel中获取数据然后输出到存储或者其他Flume Source中。Flume Agent启动的时候,它会为每一个Sink都启动一个SinkRunner的对象,SinkRunner.start()方法会启动一个新的线程去管理每一个Sink的生命周期。每一个Sink需要实现start()、Stop()和process()方法。你可以在start方法中去初始化Sink的参数和状态,在stop方法中清理Sink的资源。最关键的是process方法,它将处理从Channel中拿出来的数据。另外如果Sink有一些配置则需要实现Configurable接口。

由于Flume官方提供的Sink往往不能满足要求,所以我们自定义Sink来实现定制化的需求,这里以ElasticSearch为例。在Sink中实现所以文档的简单的Insert功能。例子使用Flume 1.7。

 

1. 编写代码

首先新建类ElasticSearchSink类继承AbstractSink类,由于还希望有自定义的Sink的配置,所以实现Configurable接口。

public class ElasticSearchSink extends AbstractSink implements Configurable

ElasticSearch的IP以及索引的名称可以配置在配置文件里面,配置文件就是使用flume的conf文件。你可以重写Configurable的configure的方法去获取配置,代码如下:

@Override    public void configure(Context context)    {        esHost = context.getString("es_host");        esIndex = context.getString("es_index");    }

注意里面的配置项“es_host”和“es_index”在conf配置文件中的语法:

agent.sinks = sink1agent.sinks.sink1.type = nick.test.flume.ElasticSearchSinkagent.sinks.sink1.es_host = 192.168.50.213agent.sinks.sink1.es_index = vehicle_event_test

 

接下来就是实现process方法,在这个方法中需要获取channel,因为数据都是从channel中获得的。获取消息之前,需要先获取一个Channel是事务,处理完成之后需要commit和关闭这个事务。这样才能让channel知道这个消息已经消费完成,它可以从它的内部队列中删除这个消息。如果消费失败,需要重新消费的话,可以rollback这个事务。事务的引入是flume对消息可靠性保证的关键。

process方法需要返回一个Status类型的枚举,Ready和BackOff。如果你到了一个消息,并正常处理了,需要使用Ready。如果拿到的消息是null,则可以返回BackOff。所谓BackOff(失效补偿)就是当sink获取不到 消息的时候, Sink的PollingRunner 线程需要等待一段backoff时间,等channel中的数据得到了补偿再来进行pollling 操作。

 

完整的代码如下:

public class ElasticSearchSink extends AbstractSink implements Configurable{    private String esHost;    private String esIndex;    private TransportClient client;    @Override    public Status process() throws EventDeliveryException    {        Status status = null;        // Start transaction        Channel ch = getChannel();        Transaction txn = ch.getTransaction();        txn.begin();        try        {            Event event = ch.take();            if (event != null)            {                String body = new String(event.getBody(), "UTF-8");                BulkRequestBuilder bulkRequest = client.prepareBulk();                List
jsons = new ArrayList
(); JSONObject obj = JSONObject.parseObject(body); String vehicleId = obj.getString("vehicle_id"); String eventBeginCode = obj.getString("event_begin_code"); String eventBeginTime = obj.getString("event_begin_time"); //doc id in index String id = (vehicleId + "_" + eventBeginTime + "_" + eventBeginCode).trim(); JSONObject json = new JSONObject(); json.put("vehicle_id", vehicleId); bulkRequest.add(client.prepareIndex(esIndex, esIndex).setSource(json)); BulkResponse bulkResponse = bulkRequest.get(); status = Status.READY; } else { status = Status.BACKOFF; } txn.commit(); } catch (Throwable t) { txn.rollback(); t.getCause().printStackTrace(); status = Status.BACKOFF; } finally { txn.close(); } return status; } @Override public void configure(Context context) { esHost = context.getString("es_host"); esIndex = context.getString("es_index"); } @Override public synchronized void stop() { super.stop(); } @Override public synchronized void start() { try { Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build(); client = new PreBuiltTransportClient(settings).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHost), 9300)); super.start(); System.out.println("finish start"); } catch (Exception ex) { ex.printStackTrace(); } }}

 

2. 打包、配置和运行

由于是自定义的Sink,所以需要打成jar包,然后copy到flume的lib文件夹下。然后配置agent的配置文件,最后启动flume就可以了。本例中,我使用了kafkasource、memorychannel和自定义的sink,完整的配置文件如下:

 

 

agent.sources = source1agent.channels = channel1agent.sinks = sink1agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSourceagent.sources.source1.channels = channel1agent.sources.source1.batchSize = 1agent.sources.source1.batchDurationMillis = 2000agent.sources.source1.kafka.bootstrap.servers = 192.168.50.116:9092,192.168.50.117:9092,192.168.50.118:9092,192.168.50.226:9092agent.sources.source1.kafka.topics = iov-vehicle-eventagent.sources.source1.kafka.consumer.group.id = flume-vehicle-event-nickagent.sinks.sink1.type = nick.test.flume.ElasticSearchSinkagent.sinks.sink1.es_host = 192.168.50.213agent.sinks.sink1.es_index = vehicle_event_testagent.sinks.sink1.channel = channel1agent.channels.channel1.type = memoryagent.channels.channel1.capacity = 1000

 

 

 

 

 

架构点滴

转载地址:http://oalwo.baihongyu.com/

你可能感兴趣的文章
上海i虹桥机场点烟器与UNIX哲学
查看>>
3.1-find命令详解
查看>>
清算/报表/日终跑批程序之性能优化案例(一)
查看>>
线上svn快速服务器搭建
查看>>
导航栏带子导航菜单并且高亮
查看>>
openstack-12:安装cinder存储服务
查看>>
防火墙的基础知识
查看>>
Java的新项目学成在线笔记-day10(四)
查看>>
链路捆绑; 远程访问;链路备份;不通vlan通信;静态 默认路由综合实验
查看>>
我国典型电子垃圾拆解地持久性有毒化学污染物污染现状
查看>>
21. 正则工具简介 下
查看>>
Office 365:如何批量初始化OneDrive for Business?
查看>>
centos directory server
查看>>
马哥第一周
查看>>
Fedora 30的升级方法
查看>>
Oracle技术之如何监测一个PLSQL过程的运行情况(一)
查看>>
为什么大部分人喜欢稳定?
查看>>
【NetApp】7mode和Cmode系统之间的相互转换
查看>>
2012.5.7
查看>>
Cent OS查看系统版本信息的几个命令
查看>>