skywalking
2022-06-09 14:51:30 31 举报
AI智能生成
skywalking源码分析
作者其他创作
大纲/内容
工具
DataCarrirer
QueueBuffer
数据存储的最小单元
Buffer
底层是一个 Object 数组
Agent 端使用
是一个环形队列
ArrayBlockQueueBuffer
底层是 ArrayBlockingQueue
OAP端使用
拓展:AtomicIntegerArray
JDK8 基于Unsafe 实现
JDK8+ 基于VarHandle实现
数据存储策略 BufferStrategy
BLOCKING 阻塞等待 Buffer 有空位
IF_POSSIBLE 直接丢弃数据
Channels
管理了多个Buffer
使用分区器 IDataPartitioner 来选择数据要存入的 Buffer
SimpleRollingPartitioner 轮询
ProducerThreadPartitioner 生产者线程绑定
数据消费
ConsumerThread
一个消费者绑定多个Buffer
MultipleChannelsConsumer
管理多个Group
每个Group有自己的消费者和对应的Channels
ConsumeDriver
同一个消费者的多条消费线程来消费一个Channels
Buffer 和消费线程的绑定关系由 buffer 数量和消费线程数量取模得出
BulkConsumePool
管理多个MultipleChannelsConsumer
每次增加 Group的时候往持有Buffer 数量最少的 MultipleChannelsConsumer 实例添加
原理概述
Agent
启动方式
静态启动:使用 -javaagent参数
入口方法:premain
在类加载时对目标类的字节码可以进行任意修改
skywalking只支持这种方式启动 Agent
动态附加:使用Attach API
入口方法:agentmain
类已经加载完成并被使用:这时候只能对目标类的字节码进行有限的修改
不能增减父类
不能增加接口
不能调整 Field
...
典型的应用:系统诊断(阿里 Arthas、笨马 XPocket)
启动流程
初始化配置
加载配置信息
/config/agent.config
系统环境变量
Agent 参数
优先级:自下往上
将配置信息映射到 Config 类
根据配置信息重新指定日志解析器
检查 Agent 名称和后端地址是否配置
标记配置加载完成
加载插件
并行类加载器
调用 registerAsParallelCapables()方法开启
原理就是将类加载时的锁从类加载器级别缩小到具体加载的某一个类
AgentClassLoader
classpath:/config/plugins,/config/activations
插件定义体系
插件定义:XxxInstrumentation
拦截实例方法/构造器: ClassInstanceMethodsEnhancePluginDefine
拦截静态方法:ClassStaticMethodsEnhancePluginDefine
AbstractClassEnhancePluginDefine是所有插件定义的顶级父类
要拦截的类:enhanceClass()
要拦截的方法:getXxxInterceptorPoints()
目标类匹配
ClassMatch
按类名匹配:NamedMatch
间接匹配:IndirectMatch
PrefixMatch
MethodAnnotationMatch
拦截器定义
afterMethod
handleMethodException
beforeMethod
插件声明
resources/skywalking-plugin.def
插件名称=插件定义
加载流程
PluginBootstrap 实例化所有插件
PluginResourceResolver 查找 skywalking-plugin.def
PluginCfg 封装 PluginDefine
DynamicPluginLoader 加载基于 XML 配置的插件
PluginFinder 分类插件
命名插件 - NameMatch
间接匹配插件 - IndrectMatch
JDK 类库插件
定制 Agent
创建 ByteBuddy实例
指定 ByteBuddy 要忽略的类
synthetic
解决 var that = this 的问题
仅在嵌套类中讨论
Field - 访问外部类属性
Constructor - 使用 private 构造器
Method - 访问内部类属性
NBAC
Nest Base Access Control
JDK11 引入
不再生成 synthetic method
新的嵌套类关系组织方式
nestHost 指向宿主类
nestMembers 列出嵌套关系中所有类型
将必要的类注入到 BootstrapClassLoader 中
解决 JDK模块系统的跨模块类访问
根据配置决定是否将修改后的字节码保存到磁盘/内存上
细节定制
指定 ByteBuddy要拦截的类
指定做字节码增强的工具
指定字节码增强的模式
Redefine
覆盖掉被修改的内容
Retransform
保留被修改的内容
注册监听器
将 Agent 安装到 Instrumentation
加载服务
服务组织
服务需要实现 BootService 接口
如果服务只有一种实现,直接创建一个类即可
如果服务有多种实现
默认实现需要使用 @DefaultImplementor
覆盖实现需要使用 @OverrideImplementor
加载流程
SPI 加载所有 BootService 的实现
根据服务的实现模式进行服务的筛选
两个注解都没有的服务实现直接加入集合
对于 @DefaultImplementor
直接加入集合
对于 @OverrideImplementor
value 指向的服务有 @DefaultImplementor 则覆盖掉
value 指向的服务没有 @DefalutImplementor 则报错
注册关闭钩子
插件工作原理
Witness 机制
作用:识别组件版本
wotmessClasses
wotmessCMethods
在指定的类下面查找指定的方法,如果有多个方法则必须同时存在
工作流程
校验 TypeDescription 的合法性
在指定类加载器下查找指定的类型,如果有多个类型则必须同时存在
Witness 机制校验当前插件是否可用
字节码增强流程
静态方法
要修改原方法入参
是JDK类库的类
不是JDK类库的类
实例化插件中定义的 Interceptor
调用 beforeMethod()
可以修改原方法入参
调用原方法
调用时可以传参
对于异常,调用handleMethodException
调用 afterMethod()
不修改原方法入参
是JDK类库的类
前置工作:使用对应的 Template 生成实际使用的拦截逻辑,即Xxx_internal
调用 prepare()
打通 BootstrapClassLoader 和AgentClassLoader
拿到日志对象 ILog
实例化插件自定义的拦截器
替代非JDK核心类库处理逻辑里的 InterceptorInstanceLoader.load
后续流程和非JDK核心类库处理流程一致
不是JDK类库的类
实例化插件中定义的 Interceptor
调用 beforeMethod()
调用原方法
调用时不能传参
对于异常,调用 handleMethodException()
调用 afterMethod()
构造器和实例方法
构造器
是 JDK类库的类
不是 JDK 类库的类
只能在拦截的构造器原本逻辑执行完成以后再执行 onConstruct
实例方法
参照静态方法
将记录状态的上下文 EnhanceContext 设置为 已增强
服务 BootService
GRPCChannelManager
Agent 到 OAP的网络连接
定时重连
通知监听器网络连接状态的变化
ServiceManagementClient
向OAP汇报自身信息
保持心跳
CommandService
调度 OAP 下发的命令
收集其他服务获得的命令
转交给 CommandExecutorService 服务进行命令处理
CommandExecutorService
CommandExecutor
ConfigurationDiscoveryCommandExecutor 配置信息变更命令处理器
AgentConfigChangeWatcher 对某一个配置项的值变化进行监听
WatcherHolder 对上面监听器的封装
WatchHolder 的集合
ProfileTaskCommandExecutor 性能追踪命令处理器
选择一个具体的命令处理器
SamplingService
控制链路是否被上报到 OAP
采样策略
如果配置了采样率,则3秒内最多上报配置数值数量的链路到OAP
如果采样机制关闭,则默认所有采集到的链路都要上报到 OAP
采样率的动态变化
SamplingRateWatcher
JVMService
收集 JVM 的相关指标
收集和发送分离
收集:XxxProvider
发送: JVMMetricsSender
KafkaXxxService
由 Agent 直连 OAP 改为通过 Kafka 交互
Agent 和 OAP 依然存在 GRPC直连
大部分的采集的数据都改为走 Kafka
StatusCheckService
判断哪些异常不算异常
链路追踪
推荐文档
https://static.googleusercontent.com/media/research.google.com/zh-CN/archive/papers/dapper-2010-1.pdf
https://github.com/opentracing/specification/blob/master/specification.md
基本概念
Trace:表示一整条链路(跨进程、跨进程的所有 Segment的集合)
Segment:表示一个JVM进程内的一个线程中的所有操作的集合
Span:表示具体的某一个操作
TraceSegment
组成 Trace 的基本单元
Trace 不是一个具体的数据模型,而是多个 Segment 穿起来表示的逻辑对象
TraceSegmentRef 用于引用Parent Segment
所有的 Span 维护在一个 LinkedList 中
relatedGlobalTraceId 表示当前Segment 所在的 Trace
isSizeLimited 如果为 true 表示当前这条线程内发生的操作次数超过了配置值,Segment 丢弃了一部分操作
Span
AsyncSpan
最顶层的 Span 定义,用于异步插件
AbstractSpan - Span 的骨架
setComponent 指定Span所在的插件
setLayer 指定 Span 所在插件的类型
tag 在Span 上 打标签
log 在 Span 上记录 异常事件和自定义事件
setOperationName 指定 Span 这个动作的名称
HTTP 请求 -> URL
Redis 操作 -> Redis命令
start 启动 Span
ref 串联 TraceSegment
setPeer 指定 Span操作的 远端地址
AbstractTracingSpan - 用于链路追踪的Span模型
spanId 从0开始自增
parentSpanId记录上一个Span的ID,第一个Span的整个值为 -1
isInAsyncMode 表示当前Span表示的异步操作是否已经开始
isAsyncStopped 表示当前Span表示的异步操作是否已经结束
TracingContext 当前链路 Segment 和Span的上下文
refs 当前 Span 所在的 Segment 的上一个Segment 的引用,可能有多个
StackBasedTracingSpan - 基于虚拟栈结构的 Span
并没有一个具体的栈结构
通过 stackDepth 和currentMaxDepth 来模拟栈操作
EntrySpan
只会在第一个插件创建,后面的插件都是复用第一个插件创建的实例
记录的inxi是最靠近服务侧的
一个 TraceSegment 只能有一个EntrySpan
只有当 stackDepth == currentMaxDepth 时才能记录信息
ExitSpan
所谓ExitSpan 和EntrySpan一样采用复用的机制,前提是在插件嵌套的情况下
多个 ExitSpan 不存在嵌套关系,是平行存在的时候,是允许同时存在多个 ExitSpan
把 ExitSpab 简单理解为离开当前进程/线程的操作
TraceSegment 里不一定非要有 ExitSpan
记录的信息是最靠近消费测的
LocalSpan
通常用于记录一个本地方法调用
NoopXxxSpan
表示一个不会被记录的操作
为了确保Span整个工作流程的统一
链路追踪上下文
AbstractTracerContext
跨进程传播数据
inject
extract
数据载体 ContextCarrier
跨线程传播数据
capture
continued
数据载体 ContextSnapshot
TracingContext
管理当前 Segment 和自己前后的Segment 的引用 TraceSegmentRef
管理当前 Segment 内所有 span - 基于栈结构 activeSpanStack
创建 Entry Exit Local 三种 Span
结束 Span 和自身
IgnoredTracerContext
被忽略的 Segment 的上下文管理器
适配器 ContextManager
用于适配 AbstractTracerContext 以创建具体的实现的实例
代理了AbstractTracerContext 的一些主要方法
这里面所有的方法实际上都是调用的对应的 AbstractTracerContext实例的方法
RuntimeContext
和 TraceContext 生命周期一致
有时候需要记录一些额外的信息,那么就记录在 RuntimeContext
ContextManagerExtendService
真正用于创建具体的 TracerContext
链路数据发送
TraceSegmentServiceClient
基于GRPC
每次发送必须强制等待所有数据都发送完
KafkaTraceSegmentServiceClient
基于 Kafka
每条数据包装为一条Kakfa消息同步发送
收藏
0 条评论
下一页