30、Flink中操作已经配置好的远程文件系统

news/2025/2/6 9:59:01 标签: flink, 华为云

背景:flink作业中既配置了obs作为chk的远程文件系统,又在作业中读取obs文件内容时,使用obsclient会导致任务无法创建chk目录而启动失败。
解决办法:使用flink-core里的fileSystem来操作 。这样就不用去使用对应文件系统的客户端了,而是直接使用的是当前flink中配置的远程文件系统。

 public static Long getKafkaOffsetFromHoodie2(String hdfsPath) throws IOException {

    final Path path = new Path(hdfsPath);
    //获取文件系统
    final FileSystem fileSystem = path.getFileSystem();
    final FileStatus[] fileStatuses = fileSystem.listStatus(path);

    if (fileSystem.exists(path)) {

      if (fileStatuses.length > 0) {
        // 获取最新commit文件
        final FileStatus latestFile =
            Arrays.stream(fileStatuses)
                .filter(x -> x.getPath().getName().endsWith(".commit"))
                .max(Comparator.comparingLong(FileStatus::getModificationTime))
                .orElse(null);
        if (latestFile != null) {
          LOG.info("最新commit文件为:{}", latestFile.getPath().getPath());
          // 读取文件内容
          try (FSDataInputStream inputStream = fileSystem.open(latestFile.getPath());
              BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
            String line;
            StringBuilder stringBuilder = new StringBuilder();
            while ((line = reader.readLine()) != null) {
              stringBuilder.append(line);
            }
            JSONObject jsonObject = JSON.parseObject(stringBuilder.toString());

            if (jsonObject.containsKey("extraMetadata")) {
              JSONObject extraMetadata = jsonObject.getJSONObject("extraMetadata");
              if (extraMetadata.containsKey("deltastreamer.checkpoint.key")) {
                String string = extraMetadata.getString("deltastreamer.checkpoint.key");
                String offset = string.split(",")[1].split(":")[1];
                LOG.info("当前偏移量==>  " + offset);
                return Long.valueOf(offset);
              }
            } else {
              LOG.error("this is not an delta-stream mission");
              throw new IOException("this is not an delta-stream mission");
            }

          } catch (IOException e) {
            System.err.println("读取文件时发生错误: " + e.getMessage());
          }
        }

      } else {
        LOG.warn("{} 目录为空或无法访问", hdfsPath);
      }
    } else {
      LOG.error("{} 指定路径不是一个有效的目录", hdfsPath);
    }
    return null;
  }

http://www.niftyadmin.cn/n/5842855.html

相关文章

QT笔记——多语言翻译

文章目录 1、概要2、多语言切换2.1、结果展示2.2、创建项目2.2、绘制UI2.2、生成“.st”文件2.4、生成“.qm”文件2.5、工程demo 1、概要 借助QT自带的翻译功能,实现实际应用用进行 “多语言切换” 2、多语言切换 2.1、结果展示 多语言切换 2.2、创建项目 1、文件…

深度学习系列--02.损失函数

一.定义 损失函数(Loss Function)是机器学习和深度学习中用于衡量模型预测结果与真实标签之间差异的函数,它在模型训练和评估过程中起着至关重要的作用 二.作用 1.指导模型训练 提供优化方向:在训练模型时,我们的目…

Windows图形界面(GUI)-QT-C/C++ - QT Dock Widget

公开视频 -> 链接点击跳转公开课程博客首页 -> ​​​链接点击跳转博客主页 目录 一、概述 二、使用场景 1. 工具栏 2. 侧边栏 3. 调试窗口 三、常见样式 1. 停靠位置 2. 浮动窗口 3. 可关闭 4. 可移动 四、属性设置 1. 设置内容 2. 获取内容 3. 设置标题 …

PyQt6/PySide6 的 QDialog 类

QDialog 是 PyQt6 或 PySide6 库中用于创建对话框的类。对话框是一种特殊的窗口,通常用于与用户进行短期交互,如输入信息、显示消息或选择选项等。QDialog 提供了丰富的功能和灵活性,使得开发者可以轻松地创建各种类型的对话框。下面我将详细…

基于PostGIS的省域空间相邻检索实践

目录 前言 一、相关空间检索函数 1、ST_touches函数 2、ST_Intersects函数 3、ST_Relate函数 4、区别于对比 二、空间相邻检索实践 1、省域表相关介绍 2、相关省域相邻查询 3、全国各省份邻居排名 三、总结 前言 在当今数字化时代,地理空间数据的高效管理…

论文解读:《基于TinyML毫米波雷达的座舱检测、定位与分类》

摘要 本文提出了一种实时的座舱检测、定位和分类解决方案,采用毫米波(mmWave)雷达系统芯片(SoC),CapterahCAL60S344-AE,支持微型机器学习(TinyML)。提出了波束距离-多普勒…

计算机网络之计算机网络分层结构

一、分层结构概述 计算机网络分层结构将网络通信过程划分为多个层次,每个层次都负责完成特定的任务和功能。这些层次之间通过接口进行通信,上层使用下层提供的服务,并向其上层提供服务。分层结构的设计使得网络通信过程更加模块化和易于管理…

潮汐发电机使用MTi运动传感器在快速水流中保持稳定位置

开发可再生能源是应对气候危机的重要措施。太阳能和风能是可再生能源领域的巨头,但它们的产量是不可预测的,而且是间歇性的。我们很难可靠地预测风力有多强或阳光有多亮。这是国家电网运营商面临的一个问题,他们负责动态平衡能源供应和负载。…