數(shù)據(jù)采集 ETL 工具 bboss-datatran v6.7.7 發(fā)布
數(shù)據(jù)采集 ETL 工具 bboss-datatran v6.7.7 發(fā)布,支持 Elasticsearch 8 以及其他 Elasticsearch 低版本和 Opensearch 之間數(shù)據(jù)同步。
- 新增輕量級(jí)但功能強(qiáng)大的大數(shù)據(jù)指標(biāo)分析計(jì)算模塊,可以非常方便地實(shí)現(xiàn)基于時(shí)間窗口的多種維度的實(shí)時(shí)指標(biāo)計(jì)算和離線指標(biāo)計(jì)算功能,適用于有限維度指標(biāo) key 和無(wú)限維度指標(biāo) key,同時(shí)可以非常方便地將指標(biāo)分析計(jì)算結(jié)果存儲(chǔ)到各種數(shù)據(jù)庫(kù),以極低成本快速構(gòu)建企業(yè)級(jí)大數(shù)據(jù)分析應(yīng)用,導(dǎo)入以下包即可:
<dependency> <groupId>com.bbossgroups.plugins</groupId> <artifactId>bboss-datatran-metrics</artifactId> <version>6.7.7</version></dependency>
使用案例
https://gitee.com/bboss/bboss-elastic-tran/tree/master/bboss-datatran-core/src/test/java/org/frameworkset/tran/metrics
增量采集狀態(tài)表 increament_tab 結(jié)構(gòu)調(diào)整,增加以下字段:
jobType varchar (500),用于保存輸入插件類(lèi)型,不能為空,不同的插件類(lèi)型對(duì)應(yīng)的值見(jiàn)下面的表格說(shuō)明,避免不同類(lèi)型作業(yè)加載增量狀態(tài)時(shí),互相干擾;
jobId varchar (500),用于保存外部設(shè)置的作業(yè) id,可為空,相同的進(jìn)程內(nèi)啟動(dòng)多個(gè)同類(lèi)型的輸入插件的作業(yè)時(shí),必須指定每個(gè)作業(yè)的 jobId, 避免各個(gè)作業(yè)加載增量狀態(tài)時(shí),互相干擾
插件類(lèi)型 | jobType | jobId |
---|---|---|
HttpInputDataTranPlugin | HttpInputDataTranPlugin | 空 |
DBInputDataTranPlugin | DBInputDataTranPlugin | 空 |
ElasticsearchInputDataTranPlugin | ElasticsearchInputDataTranPlugin | 空 |
FileInputDataTranPlugin | FileInputDataTranPlugin | 空 |
HBaseInputDatatranPlugin | HBaseInputDatatranPlugin | 空 |
Kafka2InputDatatranPlugin | Kafka2InputDatatranPlugin | 空 |
MongoDBInputDatatranPlugin | MongoDBInputDatatranPlugin | 空 |
升級(jí)注意事項(xiàng):升級(jí) 6.7.7 前,需要手動(dòng)增加 jobType 和 jobId 兩個(gè)字段,并修改 increament_tab 表中的狀態(tài)記錄,根據(jù)作業(yè)輸入插件類(lèi)型,填寫(xiě)正確的 jobType,然后再啟動(dòng)作業(yè),這樣作業(yè)才能繼續(xù)正常工作。
- 優(yōu)化 kafka 輸出插件任務(wù)狀態(tài)記錄管理功能,采用指標(biāo)分析 Metrics 對(duì)數(shù)據(jù)發(fā)送情況進(jìn)行聚合統(tǒng)計(jì),按照指定的時(shí)間窗口進(jìn)行聚合計(jì)算后,執(zhí)行回調(diào)任務(wù)處理 success 方法,任務(wù) taskMetrics 為聚合計(jì)算后的統(tǒng)計(jì)信息,可以通過(guò)開(kāi)關(guān)控制是否進(jìn)行預(yù)聚合功能:
kafkaOutputConfig.setEnableMetricsAgg(true);//啟用預(yù)聚合功能kafkaOutputConfig.setMetricsAggWindow(60);//指定統(tǒng)計(jì)時(shí)間窗口,單位:秒,默認(rèn)值60秒
4. 優(yōu)化 kafka 輸入插件攔截器功能:定時(shí)記錄統(tǒng)計(jì)插件消費(fèi) kafka 數(shù)據(jù)記錄情況,并調(diào)用任務(wù)攔截器的 aftercall 方法輸出統(tǒng)計(jì) jobMetrics 信息,可以指定統(tǒng)計(jì)時(shí)間間隔:
kafka2InputConfig.setMetricsInterval(300 * 1000L);//300秒做一次任務(wù)攔截調(diào)用,默認(rèn)值
5. 部分插件增加字段映射功能,涉及插件:日志采集插件、excel 采集插件、生成日志 /excel 文件插件、kafka 輸入插件
文件采集插件字段映射配置示例:
FileInputConfig fileInputConfig = new FileInputConfig();_fileInputConfig = fileInputConfig;FileConfig fileConfig = new FileConfig(); fileConfig.setFieldSplit(";");//指定日志記錄字段分割符 //指定字段映射配置 fileConfig.addDateCellMapping(0, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat()); fileConfig.addNumberCellMapping(1, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat()); fileConfig.addCellMappingWithType(2, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue());
kafka 映射配置示例:
Kafka2InputConfig kafka2InputConfig = new Kafka2InputConfig(); kafka2InputConfig.setFieldSplit(";");//指定kafka記錄字段分割符 //指定字段映射配置 kafka2InputConfig.addDateCellMapping(0, //記錄切割得到的字段列表位置索引,從0開(kāi)始 excelCellMapping.getFieldName(), //映射的字段名稱(chēng) cellType, //字段值類(lèi)型 excelCellMapping.getDefaultValue(), //字段默認(rèn)值 excelCellMapping.getDataFormat());//字段格式:日期格式或者數(shù)字格式 kafka2InputConfig.addNumberCellMapping(1, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat()); kafka2InputConfig.addCellMappingWithType(2, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue());
cellType 取值范圍:
public static final int CELL_BOOLEAN = 5;public static final int CELL_DATE = 3;public static final int CELL_NUMBER = 2;public static final int CELL_NUMBER_INTEGER = 6;public static final int CELL_NUMBER_LONG = 7;public static final int CELL_NUMBER_FLOAT = 8;public static final int CELL_NUMBER_SHORT = 9;public static final int CELL_STRING = 1;
6. 增加全局 JobContext,用于存放在作業(yè)中使用的初始化數(shù)據(jù)
7. 作業(yè)攔截器、任務(wù)攔截器異常方法參數(shù) Exception 類(lèi)型調(diào)整為 Throwable 類(lèi)型
更多變更,參考提交記錄:https://gitee.com/bboss/bboss-elastic-tran/commits/master
數(shù)據(jù)同步作業(yè)開(kāi)發(fā)視頻教程
https://www.bilibili.com/video/BV1xf4y1Z7xu
bboss 案例大全
https://esdoc.bbossgroups.com/#/bboss-datasyn-demo
Quick Start
https://esdoc.bbossgroups.com/#/quickstart
開(kāi)發(fā)交流
https://www.bbossgroups.com/forum.html
版權(quán)聲明:
本站所有文章和圖片均來(lái)自用戶(hù)分享和網(wǎng)絡(luò)收集,文章和圖片版權(quán)歸原作者及原出處所有,僅供學(xué)習(xí)與參考,請(qǐng)勿用于商業(yè)用途,如果損害了您的權(quán)利,請(qǐng)聯(lián)系網(wǎng)站客服處理。