首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Druid.io数据仓库实践

Druid.io数据仓库实践

作者头像
用户9048088
发布2026-06-15 20:07:03
发布2026-06-15 20:07:03
100
举报
flink维表join实践-druid.io

  • 前言
  • 一、flink读取druid.io数据流插件
  • 二、flink对druid.io lookup功能
    • 依赖于JdbcTableSource
    • flink写入druid.io数据插件
  • 总结

前言

Druid.io做为目前正在使用的OLAP分析数据库,已在公司大部分业务分析中使用。在Druid.io的官网上了解到,该工具除了分析功能,还可以做为一个数据仓库使用。做为实验,个人调研了flink接入Druid.io数据进行计算再写入driod.io的功能,供建仓库使用,调研内容如下:

  1. 开发flink读取druid.io数据流插件
  2. 开发flink对druid.io lookup功能
  3. 开发flink写入druid.io数据插件

一、flink读取druid.io数据流插件

基于druid.io是时序数据库的特点,读取自然按照时间+分页的方式读取druid.io上的数据。这里可以使用Avatica Jdbc Driver远程读取druid.io上的数据。 实现起来比较简单,这里就跳过了

二、flink对druid.io lookup功能

依赖于JdbcTableSource

代码如下(示例):

代码语言:javascript
复制
RegisterDruidioTable.java
代码语言:javascript
复制
		DruidioTableInfo druidioTableInfo = ...;

        String query = ...;

        JdbcOptions jdbcOptions = JdbcOptions.builder()
                .setDriverName(druidioTableInfo.getDriver())
                .setDialect(new AvaticaJDBCDialect(query))
                .setDBUrl(druidioTableInfo.getUrl())
                .setUsername(druidioTableInfo.getUsername())
                .setPassword(druidioTableInfo.getPassword())
                .setTableName("empty")
                .build();

        JdbcReadOptions jdbcReadOptions = JdbcReadOptions.builder()
                .setFetchSize(100)
                .setQuery(query)
                .build();

        JdbcLookupOptions jdbcLookupOptions = JdbcLookupOptions.builder()
                .setCacheExpireMs(10000)
                .setCacheMaxSize(100000)
                .setMaxRetryTimes(3)
                .build();

        JdbcTableSource jdbcTableSource = JdbcTableSource.builder()
                .setOptions(jdbcOptions)
                .setReadOptions(jdbcReadOptions)
                .setLookupOptions(jdbcLookupOptions)
                .setSchema(getSchema(druidioTableInfo))
                .build();

        tableEnv.registerTableSource(sourceTableInfo.getName(), jdbcTableSource);
代码语言:javascript
复制
AvaticaJDBCDialect.java
代码语言:javascript
复制
	public class AvaticaJDBCDialect implements JdbcDialect {
    public static final String AVATICA_JDBC_DRIVER = "org.apache.calcite.avatica.remote.Driver";

    private String preDefinedQuery;

    public AvaticaJDBCDialect() {
    }

    public AvaticaJDBCDialect(String preDefinedQuery) {
        this.preDefinedQuery = preDefinedQuery;
    }

    @Override
    public String dialectName() {
        return "avatica";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:avatica:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return null;
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of(AVATICA_JDBC_DRIVER);
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    @Override
    public String getSelectFromStatement(String tableName, String[] selectFields, String[] conditionFields) {
        if (StringUtils.isNotEmpty(preDefinedQuery)) {
            return preDefinedQuery;
        }

        String selectExpressions = (String) Arrays.stream(selectFields).map(this::quoteIdentifier).collect(Collectors.joining(", "));
        String fieldExpressions = (String) Arrays.stream(conditionFields).map((f) -> {
            return this.quoteIdentifier(f) + "=?";
        }).collect(Collectors.joining(" AND "));
        return "SELECT " + selectExpressions + " FROM " + this.quoteIdentifier(tableName) + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
    }
}

这里自己实现了AvaticaJDBCDialect的方言,使得lookup可以按照自己愿意的方式查询sql而不是按照固定的规则生成。

代码语言:javascript
复制
MyAvaticaHttpClient.java
代码语言:javascript
复制
public class MyAvaticaHttpClient extends AvaticaCommonsHttpClientImpl {

    //这个ObjectMapper比较特殊,必须得使用flink集成的
    private ObjectMapper avatiaObjectMapper = new ObjectMapper();

    public MyAvaticaHttpClient(URL url) {
        super(url);
    }

    @Override
    public byte[] send(byte[] request) {
        byte[] response = super.send(request);
        try {
            Service.Response resp = avatiaObjectMapper.readValue(response, Service.Response.class);
            if ((resp instanceof Service.ExecuteResponse) && ((Service.ExecuteResponse) resp).missingStatement) {
                Service.ExecuteRequest executeRequest = avatiaObjectMapper.readValue(request, Service.ExecuteRequest.class);
                Service.PrepareResponse prepareResponse = prepare(new Service.PrepareRequest(executeRequest.statementHandle.connectionId, executeRequest.statementHandle.signature.sql, -1));

                Field statementIdField = Meta.StatementHandle.class.getDeclaredField("id");
                statementIdField.setAccessible(true);
                statementIdField.setInt(executeRequest.statementHandle, prepareResponse.statement.id);

                byte[] newStatementRequest = avatiaObjectMapper.writeValueAsBytes(executeRequest);
                response = super.send(newStatementRequest);
            }

        } catch (Exception e) {
            System.out.println("MyAvaticaHttpClient send optimize failed, return origin response");
        }

        return response;
    }

    Service.PrepareResponse prepare(Service.PrepareRequest prepareRequest) throws IOException {
        byte[] sendResult = this.send(avatiaObjectMapper.writeValueAsBytes(prepareRequest));
        Service.Response resp = avatiaObjectMapper.readValue(sendResult, Service.Response.class);
        if (resp instanceof Service.ErrorResponse) {
            throw ((Service.ErrorResponse) resp).toException();
        } else if (!Service.PrepareResponse.class.isAssignableFrom(resp.getClass())) {
            throw new ClassCastException("Cannot cast " + resp.getClass() + " into Service.PrepareResponse");
        } else {
            return Service.PrepareResponse.class.cast(resp);
        }
    }
}

这里使用自己的MyAvaticaHttpClient,fix了druid.io的executeRequest的missingStatement bug,代价是增大了网络请求次数(十分不优雅)!。此外,druid.io推荐使用0.20.0+版本,不然executeRequest server端会报没有connectionId参数的异常!。

flink写入druid.io数据插件

简单实现了一个对接官方http写入数据接口的插件,比较简单。


总结

目前实验环境可以做基于flink的druid.io的数据计算处理,但是距离真正线上可用还有一些工作要做。先拿不那么重要的场景试用下~~

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2026-06-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • flink维表join实践-druid.io
  • 前言
  • 一、flink读取druid.io数据流插件
  • 二、flink对druid.io lookup功能
    • 依赖于JdbcTableSource
    • flink写入druid.io数据插件
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档