---
description: >-
方舟作为开放的架构,可以通过消费实时数据来满足更多使用场景。服务端接到一条 SDK 发来的数据后,会对数据做一些预处理并将数据写入到消息队列 Kafka
中供下游各类计算模块使用。
---
# 直接从Kafka中消费数据
>[info]
>消费实时数据功能,涉及较多的技术细节,适用于对相关功能有经验的用户参考。如果对文档内容有疑惑,请及时联系工作人员。
## 要求
1. 启动消费的机器需与部署方舟的机器在同一个内网,且必须可以解析方舟服务器的 host。
2. 请选用兼容的 Kafka 客户端版本,高版本服务端兼容低版本客户端,反之则可能存在兼容性问题。方舟 Kafka 服务端版本为0.8.7,具体情况可在服务器上查看。
## 数据源topic版本说明
方舟5.2版本数据接入模块做了架构调整,默认情况下使用老的架构,我们会根据用户的数据情况在升级时切换到新的架构,并且在升级到5.3之前一定会切换到新的架构下。所以会出现5.2版本有些用户使用的是老架构,有些使用的是新架构的情况。具体可以通过Ambari上下面这个参数来判断:如果is.new.process=true,说明是新架构,如果没有该参数或者is.new.process=false,说明是老架构。

新架构和老架构原始数据保存在kafka中的topic不同。老架构profile数据保存在profile\__${appid}中,event数据保存在event\__${_appid_}中。而新架构下原始数据都保存在pre\_${_appid_}中,消费pre\_${_appid_}中的数据需要通过xwhat的值来判断是profile数据还是event数据,如果xwhat属于下面六个值中的一个,则说明是profile事件(用户数据),具体可参考[https://arkdocs.analysys.cn/integration/prepare/data-model](https://arkdocs.analysys.cn/integration/prepare/data-model) 中的Profile部分的说明。如果xwhat不属于下面六个值,那说明是Event事件数据。
profile事件名:
* $profile_set
* $profile_set_once
* $profile_unset
* $profile_increment
* $profile_append
* $profile_delete
## 消费参数
| 参数名称 | 参数值 |
| --------- | ---------------------------------------------------------------- |
| topic | **老架构:**event\_$_{appid}/profile\_$_{appid}(其中{appid}表示项目的appid) |
| | **新架构:**pre\_${appid} |
| partition | 不同的用户可能不一样,以实际情况为准 |
| zookeeper | ark1:2181,ark2:2181,ark3:2181 |
| broker | ark1:9092,ark2:9092, ark3:9092 |
## 消费数据
消费有shell、原生API等多种方式,可以选择一种适合使用场景的方式。
下面给出两种 Shell 方式启动消费的示例,使用 Shell 方式可以通过重定向标准输出将数据写入文件后处理或直接用管道作为其他进程的输入,可以对接各种编程语言实现的处理程序。
#### **使用 Kafka Console Consumer**
* 可以使用 Kafka 自带的 Kafka Console Consumer 通过命令行方式消费,例如从最新数据开始消费:
`bin/kafka-console-consumer.sh --zookeeper ark1:2181 --topic topicname(根据需要输入想消费的topic名)`
* 可以将 stdout 输出到文件或作为其他数据处理进程的输入数据。
#### **使用 Simple Consumer Shell**
* 使用 Simple Consumer Shell 可以实现更灵活的消费,例如:
```bash
bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell \
--broker-list ark2:9092 \
--offset 1234 \
--partition 2 \
--topic event_topic \
--print-offsets
```
## 数据格式
消费的数据的格式与导入时的[数据格式](../prepare/data-type.md)基本一致。
>[info] 以上内容没有解答我的问题?[点击我进入方舟论坛去反馈](https://www.analysysdata.com/forum/index) 🚀
- 产品简介
- 快速上手
- Step 1 安装部署
- Step 2 激活系统创建项目
- Step 3 开启您的分析旅程
- 1. 集成 SDK
- 2. 可视化埋点
- 3. 创建分析模型
- 附:埋点方案设计
- 附:数据分析思路
- 产品更新日志
- V5.5 新增LTV分析功能等
- V5.3 UI 升级、分布分析重构、维度表动态更新、细节优化等
- V5.2 新增归因分析、消息中心、重构埋点方案、优化看数据体验……
- V5.1.0317 体验优化& Bug修复
- V5.1 升级可视化埋点、增强权限控制……
- Part I 产品功能说明
- 名词解释
- 指标说明
- 看板
- 5.3.3 看板 UI 重构
- 分析
- 事件分析
- 渠道分析
- 渠道相关名词解释
- 来源识别规则
- 搜索引擎
- 社交媒体
- 小程序场景值
- Session 分析
- Session 规则
- 实时分析
- 留存分析
- 转化漏斗
- 智能路径
- 归因分析
- 热图分析
- Web/H5 热图
- APP 热图
- 分布分析
- 间隔分析
- 属性分析
- LTV 分析
- 多主体分析
- 自定义查询
- 用户
- 用户分群
- 用户探查
- 用户标签
- 标签体系应用概览
- 标签体系
- 标签生命周期管理
- 标签加工
- 如何自定义SQL创建标签
- 单用户档案
- 运营
- 广告跟踪
- 微信小程序渠道追踪
- 预置广告媒介和渠道
- App 推广监测(Beta)
- 电子邮件
- 短信
- 消息通知
- 项目管理
- 项目概览
- 项目角色管理
- 项目成员管理
- 数据接入管理
- 埋点方案
- 可视化埋点
- 集成SDK接入数据
- 数据验证
- 用户数据导入
- 微信小程序全埋点事件定义
- 元数据管理
- 元事件
- 虚拟事件
- 事件属性
- 用户属性
- Session 管理
- 页面组管理
- 维度表
- 服务集成配置
- 监控告警
- 智能监控
- 自定义监控
- 平台管理
- 企业概览
- 项目管理
- 成员管理
- 安全设置
- 企业设置
- 日志管理
- 帐号设置
- Part II 技术文档
- 技术接入准备工作
- 部署环境检测工具
- 数据模型
- 数据格式
- 预置事件和属性
- App预置事件/属性
- JS 预置事件/属性
- 如何准确识别用户
- 如何设计埋点方案
- 分平台上报数据 vs 跨平台打通
- SDK 指南
- Android SDK
- 快速集成
- 全埋点模块
- 消息推送模块
- Android Hybrid模式
- SDK Gradle集成方式
- 多渠道打包
- 易观小工具
- 合规相关
- iOS SDK
- 快速集成
- 全埋点介绍
- iOS Hybrid模式
- 消息推送模块
- JS SDK
- 快速集成
- JS SDK基础版
- JS SDK插件
- uni-app SDK
- 快速集成
- 打包原生APP
- 开启移动端全埋点
- uni-app SDK标准版
- 微信小程序 SDK
- 快速集成
- 微信小程序标准版
- 微信小程序插件版
- 微信小程序通用框架版
- 支付宝小程序 SDK
- 支付宝小程序标准版
- 支付宝小程序通用框架版
- 字节跳动小程序 SDK
- 字节跳动小程序标准版
- 字节跳动小程序通用框架版
- 百度小程序 SDK
- 百度小程序标准版
- 百度小程序通用框架版
- 钉钉小程序 SDK
- 钉钉小程序标准版
- 钉钉小程序通用框架版
- QQ小程序 SDK
- QQ小程序标准版
- QQ小程序通用框架版
- 快应用 SDK
- 华为WeCode小程序
- WeCode SDK 标准版
- WeCode SDK插件
- PhoneGap SDK
- mPaaS SDK
- ReactNative SDK
- Flutter SDK
- Java SDK
- Python SDK
- PHP SDK
- C++ SDK
- C# SDK
- Node JS SDK
- Lua SDK
- Golang SDK
- SDK FAQ
- identify与alias的区别
- 爬虫数据如何识别?
- 页面停留如何获取时间?
- 如果获取SDK及更新日志
- 代码埋点和无埋点有什么区别
- Web页面中发现丢失某一个事件
- 自研 SDK 注意事项
- 页面时长统计功能
- 飞书小程序 SDK
- 飞书小程序标准版
- 飞书小程序通用框架版
- Unreal Engine SDK
- 数据验证
- 客户端埋点验证
- Debug 数据验证
- 数据入库验证
- 数据导入
- 接口导入
- JAVA工具包
- 标准json文件导入
- csv格式导入
- 数据导入FAQ
- 数据导出
- JAVA工具包
- 事件数据导出
- 用户数据导出
- 直接从Kafka中消费数据
- 使用程序访问数据库
- 脚本工具
- API
- 分析API
- 事件分析
- 留存分析
- 自定义查询
- 转化漏斗
- 属性分析
- Session分析
- 渠道分析
- 分布分析
- 用户API
- 分群查询
- 用户档案
- 分群管理
- 管理API
- 权限管理
- 元数据管理
- 埋点方案管理
- 维度表管理
- 运营API
- 广告跟踪
- APP推广监测
- 平台管理API
- 项目管理
- 成员管理
- 第三方登录
- OAuth2.0登录
- LDAP登录
- GDPR 合规
- Part III 常见问题
- License 许可
- 产品试用及采购
- 参与贡献