开篇
先来调侃一句,原来独角兽Uber的程序员写的代码也是看得懂的,而且还是比较容易看得懂的,所以有时候在设计模式和代码结构清晰以及可读性方面我还是更倾向于后者,宁可重复或浪费一部分代码也要整个代码的可读性更强一些。
整个JVM Profiler的启动过程比较简单也非常清晰,当我们通过下面命令启动Profiler以后,会按照以下步骤进行:
java -javaagent:target/jvm-profiler-0.0.8.jar=
reporter=com.uber.profiling.reporters.ConsoleOutputReporter,
tag=mytag,metricInterval=5000,
durationProfiling=com.uber.profiling.examples.HelloWorldApplication.publicSleepMethod,
argumentProfiling=com.uber.profiling.examples.HelloWorldApplication.publicSleepMethod.1,
sampleInterval=100
-cp target/jvm-profiler-0.0.8.jar com.uber.profiling.examples.HelloWorldApplication
- 1、解析参数:以启动命令行传入的参数作为基准,以命令参数指定配置文件中的参数进行覆盖。核心点在于构建需要拦截的方法和方法参数等。
- 2、拦截方法:针对需要拦截的方法在原有的方法体当中织入前置和后置耗时统计代码。
- 3、创建所有采集器Profiler:创建包括IO,Memory,cpu,方法体耗时等Profiler采集器。
- 4、创建Reporter对象:根据参数指定创建指定的Reporter对象,用于上报采集数据。
- 5、启动采集器Profiler:按一次性和周期性两种方式启动采集器,其中周期性采集是通过ScheduledExecutorService来调度实现的。
- 6、增加shutdown钩子函数:用于停止的时候做一些额外操作。
源码介绍
- 通过agentmain和premain两种固定的函数构建入口
- Arguments.parseArgs()解析命令行参数
- arguments.runConfigProvider()解析配置文件的参数
- agentImpl.run()进入JVM Profiler的启动过程
/**
 * Java agent entry point of the profiler.
 *
 * <p>Exposes the two entry methods recognised by the JVM instrumentation
 * mechanism ({@code premain} and {@code agentmain}); both parse the agent
 * argument string and hand over to {@link AgentImpl#run}.
 */
public final class Agent {

    // Shared implementation instance; never reassigned, hence final.
    private static final AgentImpl agentImpl = new AgentImpl();

    private Agent() {
        // Static entry points only; not meant to be instantiated.
    }

    /**
     * Entry point used when the agent is attached dynamically after the
     * application VM has already started (available since Java SE 6).
     * Behaves exactly like {@link #premain(String, Instrumentation)}.
     *
     * @param args            raw agent argument string, may be {@code null}
     * @param instrumentation instrumentation handle supplied by the JVM
     */
    public static void agentmain(final String args, final Instrumentation instrumentation) {
        premain(args, instrumentation);
    }

    /**
     * Entry point used when the agent jar is named on the command line
     * (available since Java SE 5); runs before the application's main method.
     *
     * @param args            raw agent argument string, may be {@code null}
     * @param instrumentation instrumentation handle supplied by the JVM
     */
    public static void premain(final String args, final Instrumentation instrumentation) {
        System.out.println("Java Agent " + AgentImpl.VERSION + " premain args: " + args);

        // Parse the key=value pairs passed on the command line.
        Arguments arguments = Arguments.parseArgs(args);

        // Let the configured config provider (if any) override the
        // command-line values.
        arguments.runConfigProvider();

        agentImpl.run(arguments, instrumentation, null);
    }
}
参数解析过程
- 参数解析过程用于解析命令行传入的参数
- 命令行参数是按照a=b的格式传入的,解析分割生成Map对象
- 根据Map对象构建参数对象Arguments进行具体参数解析过程
/**
 * Parses the raw agent argument string into an {@code Arguments} object.
 *
 * <p>The expected format is {@code key1=value1,key2=value2}. A key may be
 * repeated; all of its values are kept in the order they appear. A
 * {@code null} or blank string yields an {@code Arguments} built from an
 * empty map.
 *
 * @param args raw argument string, may be {@code null}
 * @return the parsed arguments
 * @throws IllegalArgumentException if a pair does not split into exactly
 *         one key and one value around {@code '='}, or if a key is empty
 */
public static Arguments parseArgs(String args) {
    Map<String, List<String>> valuesByKey = new HashMap<>();

    String trimmed = (args == null) ? "" : args.trim();
    if (trimmed.isEmpty()) {
        return new Arguments(valuesByKey);
    }

    // Pairs are comma separated: a=b,c=d,e=f
    String[] pairs = trimmed.split(",");
    for (int i = 0; i < pairs.length; i++) {
        String[] keyAndValue = pairs[i].split("=");
        if (keyAndValue.length != 2) {
            throw new IllegalArgumentException("Arguments for the agent should be like: key1=value1,key2=value2");
        }

        String key = keyAndValue[0].trim();
        if (key.isEmpty()) {
            throw new IllegalArgumentException("Argument key should not be empty");
        }

        String value = keyAndValue[1].trim();
        valuesByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    return new Arguments(valuesByKey);
}
/**
 * Creates an instance from already-parsed arguments.
 *
 * @param parsedArgs argument key mapped to the list of values given for it
 */
private Arguments(Map<String, List<String>> parsedArgs) {
    // Field population is delegated entirely to updateArguments.
    this.updateArguments(parsedArgs);
}
参数具体解析过程
- 命令行传入的参数解析成Map对象
- 根据Map对象的key获取对应的参数值赋值给对应的变量
- 拦截方法体会构建ClassAndMethod,拦截方法体参数会构建ClassMethodArgument
// ---- Keys recognised in the agent argument string / config map ----

// General switches and pluggable components.
public static final String ARG_NOOP = "noop";
public static final String ARG_REPORTER = "reporter";
public static final String ARG_CONFIG_PROVIDER = "configProvider";
public static final String ARG_CONFIG_FILE = "configFile";

// Collection intervals.
public static final String ARG_METRIC_INTERVAL = "metricInterval";
public static final String ARG_SAMPLE_INTERVAL = "sampleInterval";

// Identification of the profiled process.
public static final String ARG_TAG = "tag";
public static final String ARG_CLUSTER = "cluster";
public static final String ARG_APP_ID_VARIABLE = "appIdVariable";
public static final String ARG_APP_ID_REGEX = "appIdRegex";

// Method-level profiling targets (may be given more than once).
public static final String ARG_DURATION_PROFILING = "durationProfiling";
public static final String ARG_ARGUMENT_PROFILING = "argumentProfiling";

// Reporter-related settings.
public static final String ARG_BROKER_LIST = "brokerList";
public static final String ARG_SYNC_MODE = "syncMode";
public static final String ARG_TOPIC_PREFIX = "topicPrefix";
public static final String ARG_OUTPUT_DIR = "outputDir";

public static final String ARG_IO_PROFILING = "ioProfiling";

// Smallest accepted non-zero interval; used as the lower bound when
// validating the metric and sample intervals.
public static final long MIN_INTERVAL_MILLIS = 50;
/**
 * Applies the given parsed arguments on top of the current settings.
 *
 * <p>Each known key is looked up in turn; a setting is only overwritten
 * when {@code needToUpdateArg} accepts the supplied value. The two
 * multi-valued keys ({@code durationProfiling}, {@code argumentProfiling})
 * replace their whole collection when at least one value is supplied.
 *
 * @param parsedArgs argument key mapped to the list of values given for it
 * @throws RuntimeException if the resulting metric interval is below
 *         {@link #MIN_INTERVAL_MILLIS}, or the sample interval is non-zero
 *         and below it
 * @throws IllegalArgumentException if a profiling target is not of the form
 *         {@code class.method} / {@code class.method.argumentIndex}
 */
public void updateArguments(Map<String, List<String>> parsedArgs) {
    // Keep the raw key/value pairs exactly as supplied.
    rawArgValues.putAll(parsedArgs);

    // noop
    String noopValue = getArgumentSingleValue(parsedArgs, ARG_NOOP);
    if (needToUpdateArg(noopValue)) {
        noop = Boolean.parseBoolean(noopValue);
        logger.info("Got argument value for noop: " + noop);
    }

    // reporter: resolved to a constructor of the named class
    String reporterValue = getArgumentSingleValue(parsedArgs, ARG_REPORTER);
    if (needToUpdateArg(reporterValue)) {
        reporterConstructor = ReflectionUtils.getConstructor(reporterValue, Reporter.class);
        logger.info("Got argument value for reporter: " + reporterValue);
    }

    // configProvider: resolved to a constructor of the named class
    String configProviderValue = getArgumentSingleValue(parsedArgs, ARG_CONFIG_PROVIDER);
    if (needToUpdateArg(configProviderValue)) {
        configProviderConstructor = ReflectionUtils.getConstructor(configProviderValue, ConfigProvider.class);
        logger.info("Got argument value for configProvider: " + configProviderValue);
    }

    // configFile: only the path is stored here
    String configFileValue = getArgumentSingleValue(parsedArgs, ARG_CONFIG_FILE);
    if (needToUpdateArg(configFileValue)) {
        configFile = configFileValue;
        logger.info("Got argument value for configFile: " + configFile);
    }

    // metricInterval: validated even when not supplied in this call
    String metricIntervalValue = getArgumentSingleValue(parsedArgs, ARG_METRIC_INTERVAL);
    if (needToUpdateArg(metricIntervalValue)) {
        metricInterval = Long.parseLong(metricIntervalValue);
        logger.info("Got argument value for metricInterval: " + metricInterval);
    }
    if (metricInterval < MIN_INTERVAL_MILLIS) {
        throw new RuntimeException("Metric interval too short, must be at least " + Arguments.MIN_INTERVAL_MILLIS);
    }

    // sampleInterval: 0 is allowed and means sampling is disabled
    String sampleIntervalValue = getArgumentSingleValue(parsedArgs, ARG_SAMPLE_INTERVAL);
    if (needToUpdateArg(sampleIntervalValue)) {
        sampleInterval = Long.parseLong(sampleIntervalValue);
        logger.info("Got argument value for sampleInterval: " + sampleInterval);
    }
    if (sampleInterval != 0 && sampleInterval < MIN_INTERVAL_MILLIS) {
        throw new RuntimeException("Sample interval too short, must be 0 (disable sampling) or at least " + Arguments.MIN_INTERVAL_MILLIS);
    }

    // tag
    String tagValue = getArgumentSingleValue(parsedArgs, ARG_TAG);
    if (needToUpdateArg(tagValue)) {
        tag = tagValue;
        logger.info("Got argument value for tag: " + tag);
    }

    // cluster
    String clusterValue = getArgumentSingleValue(parsedArgs, ARG_CLUSTER);
    if (needToUpdateArg(clusterValue)) {
        cluster = clusterValue;
        logger.info("Got argument value for cluster: " + cluster);
    }

    // appIdVariable
    String appIdVariableValue = getArgumentSingleValue(parsedArgs, ARG_APP_ID_VARIABLE);
    if (needToUpdateArg(appIdVariableValue)) {
        appIdVariable = appIdVariableValue;
        logger.info("Got argument value for appIdVariable: " + appIdVariable);
    }

    // appIdRegex
    String appIdRegexValue = getArgumentSingleValue(parsedArgs, ARG_APP_ID_REGEX);
    if (needToUpdateArg(appIdRegexValue)) {
        appIdRegex = appIdRegexValue;
        logger.info("Got argument value for appIdRegex: " + appIdRegex);
    }

    // durationProfiling: each value is <class>.<method>, split at the last '.'
    // e.g. com.uber.profiling.examples.HelloWorldApplication.publicSleepMethod
    List<String> durationSpecs = getArgumentMultiValues(parsedArgs, ARG_DURATION_PROFILING);
    if (!durationSpecs.isEmpty()) {
        durationProfiling.clear();
        for (String spec : durationSpecs) {
            int methodDot = spec.lastIndexOf('.');
            boolean wellFormed = methodDot > 0 && methodDot + 1 < spec.length();
            if (!wellFormed) {
                throw new IllegalArgumentException("Invalid argument value: " + spec);
            }

            ClassAndMethod target = new ClassAndMethod(
                    spec.substring(0, methodDot),
                    spec.substring(methodDot + 1));
            durationProfiling.add(target);
            logger.info("Got argument value for durationProfiling: " + target);
        }
    }

    // argumentProfiling: each value is <class>.<method>.<argumentIndex>
    List<String> argumentSpecs = getArgumentMultiValues(parsedArgs, ARG_ARGUMENT_PROFILING);
    if (!argumentSpecs.isEmpty()) {
        argumentProfiling.clear();
        for (String spec : argumentSpecs) {
            // The last '.' separates "<class>.<method>" from the argument index.
            int indexDot = spec.lastIndexOf('.');
            if (!(indexDot > 0 && indexDot + 1 < spec.length())) {
                throw new IllegalArgumentException("Invalid argument value: " + spec);
            }
            String classAndMethodPart = spec.substring(0, indexDot);
            int argumentIndex = Integer.parseInt(spec.substring(indexDot + 1));

            // The last '.' of the remainder separates class name from method name.
            int methodDot = classAndMethodPart.lastIndexOf('.');
            if (!(methodDot > 0 && methodDot + 1 < classAndMethodPart.length())) {
                throw new IllegalArgumentException("Invalid argument value: " + spec);
            }
            String className = classAndMethodPart.substring(0, methodDot);
            String methodName = classAndMethodPart.substring(methodDot + 1);

            ClassMethodArgument target = new ClassMethodArgument(className, methodName, argumentIndex);
            argumentProfiling.add(target);
            logger.info("Got argument value for argumentProfiling: " + target);
        }
    }

    // brokerList
    String brokerListValue = getArgumentSingleValue(parsedArgs, ARG_BROKER_LIST);
    if (needToUpdateArg(brokerListValue)) {
        brokerList = brokerListValue;
        logger.info("Got argument value for brokerList: " + brokerList);
    }

    // syncMode
    String syncModeValue = getArgumentSingleValue(parsedArgs, ARG_SYNC_MODE);
    if (needToUpdateArg(syncModeValue)) {
        syncMode = Boolean.parseBoolean(syncModeValue);
        logger.info("Got argument value for syncMode: " + syncMode);
    }

    // topicPrefix
    String topicPrefixValue = getArgumentSingleValue(parsedArgs, ARG_TOPIC_PREFIX);
    if (needToUpdateArg(topicPrefixValue)) {
        topicPrefix = topicPrefixValue;
        logger.info("Got argument value for topicPrefix: " + topicPrefix);
    }

    // outputDir
    String outputDirValue = getArgumentSingleValue(parsedArgs, ARG_OUTPUT_DIR);
    if (needToUpdateArg(outputDirValue)) {
        outputDir = outputDirValue;
        logger.info("Got argument value for outputDir: " + outputDir);
    }

    // ioProfiling
    String ioProfilingValue = getArgumentSingleValue(parsedArgs, ARG_IO_PROFILING);
    if (needToUpdateArg(ioProfilingValue)) {
        ioProfiling = Boolean.parseBoolean(ioProfilingValue);
        logger.info("Got argument value for ioProfiling: " + ioProfiling);
    }
}
配置文件参数解析过程
- 解析配置文件对应的参数,然后覆盖命令行解析的参数
/**
 * Pulls extra configuration from the configured {@code ConfigProvider}
 * (if there is one) and applies it on top of the current arguments.
 *
 * <p>The root-level section (stored under the empty-string key) is applied
 * first, then the section keyed by the current tag, if any. Any failure is
 * logged as a warning and not propagated.
 */
public void runConfigProvider() {
    try {
        ConfigProvider provider = getConfigProvider();
        if (provider == null) {
            return;
        }

        // Outer key: "" for root level, otherwise a tag value.
        Map<String, Map<String, List<String>>> configBySection = provider.getConfig();

        Map<String, List<String>> rootSection = configBySection.get("");
        if (rootSection != null) {
            updateArguments(rootSection);
            logger.info("Updated arguments based on config: " + JsonUtils.serialize(rootSection));
        }

        // Read the tag only now: the root section may just have changed it.
        String currentTag = getTag();
        if (currentTag == null || currentTag.isEmpty()) {
            return;
        }

        Map<String, List<String>> tagSection = configBySection.get(currentTag);
        if (tagSection == null) {
            return;
        }
        updateArguments(tagSection);
        logger.info("Updated arguments based on config override: " + JsonUtils.serialize(tagSection));
    } catch (Throwable ex) {
        logger.warn("Failed to update arguments with config provider", ex);
    }
}
启动采集过程
- 负责创建用于拦截方法体的JavaAgentFileTransformer对象,并将其添加到instrumentation当中。
- 负责创建Profiler对象
- 启动所有的Profiler对象
- 创建钩子函数负责处理shutdown的后续操作
/**
 * Starts the profiler: registers the class transformer when method-level
 * profiling is requested, creates and starts all profilers, and installs a
 * JVM shutdown hook.
 *
 * <p>Does nothing except log a message when {@code arguments.isNoop()} is
 * true.
 *
 * @param arguments                parsed agent arguments
 * @param instrumentation          instrumentation handle supplied by the JVM
 * @param objectsToCloseOnShutdown extra resources handed to the shutdown
 *                                 hook; {@code Agent.premain} passes
 *                                 {@code null}
 */
public void run(Arguments arguments, Instrumentation instrumentation, Collection<AutoCloseable> objectsToCloseOnShutdown) {
    if (arguments.isNoop()) {
        logger.info("Agent noop is true, do not run anything");
        return;
    }

    // Reporter that the profilers will publish their metrics to.
    Reporter reporter = arguments.getReporter();

    String processUuid = UUID.randomUUID().toString();

    // Application id: first try the named environment variable, then fall
    // back to probing with the configured regex.
    String appIdVariable = arguments.getAppIdVariable();
    boolean hasAppIdVariable = appIdVariable != null && !appIdVariable.isEmpty();
    String appId = hasAppIdVariable ? System.getenv(appIdVariable) : null;
    if (appId == null || appId.isEmpty()) {
        appId = SparkUtils.probeAppId(arguments.getAppIdRegex());
    }

    // A transformer is only registered when at least one method is to be
    // profiled for duration or arguments.
    boolean nothingToInstrument = arguments.getDurationProfiling().isEmpty()
            && arguments.getArgumentProfiling().isEmpty();
    if (!nothingToInstrument) {
        JavaAgentFileTransformer transformer = new JavaAgentFileTransformer(
                arguments.getDurationProfiling(), arguments.getArgumentProfiling());
        instrumentation.addTransformer(transformer);
    }

    List<Profiler> profilers = createProfilers(reporter, arguments, processUuid, appId);

    // Starts both one-time and periodic profilers.
    ProfilerGroup profilerGroup = startProfilers(profilers);

    ShutdownHookRunner onShutdown = new ShutdownHookRunner(
            profilerGroup.getPeriodicProfilers(), Arrays.asList(reporter), objectsToCloseOnShutdown);
    Runtime.getRuntime().addShutdownHook(new Thread(onShutdown));
}
创建采集器过程
- 创建了各种采集数据的采集器对象
/**
 * Builds the list of profilers to run for the given arguments.
 *
 * <p>The CPU/memory and process-info profilers are always created; the
 * method duration, method argument, stacktrace and IO profilers are only
 * created when the corresponding argument enables them.
 *
 * @param reporter    reporter the profilers publish to
 * @param arguments   parsed agent arguments
 * @param processUuid identifier generated for this process
 * @param appId       application id, as resolved by the caller
 * @return the created profilers, in creation order
 */
private List<Profiler> createProfilers(Reporter reporter, Arguments arguments, String processUuid, String appId) {
    String tag = arguments.getTag();
    String cluster = arguments.getCluster();
    long metricInterval = arguments.getMetricInterval();

    List<Profiler> result = new ArrayList<>();

    // CPU and memory (always on).
    CpuAndMemoryProfiler cpuAndMemory = new CpuAndMemoryProfiler(reporter);
    cpuAndMemory.setTag(tag);
    cpuAndMemory.setCluster(cluster);
    cpuAndMemory.setIntervalMillis(metricInterval);
    cpuAndMemory.setProcessUuid(processUuid);
    cpuAndMemory.setAppId(appId);
    result.add(cpuAndMemory);

    // Process info (always on). No interval is set for this one here.
    ProcessInfoProfiler processInfo = new ProcessInfoProfiler(reporter);
    processInfo.setTag(tag);
    processInfo.setCluster(cluster);
    processInfo.setProcessUuid(processUuid);
    processInfo.setAppId(appId);
    result.add(processInfo);

    // Method duration: only when at least one method is configured.
    if (!arguments.getDurationProfiling().isEmpty()) {
        // Buffer shared between the collector (writer) and the profiler.
        ClassAndMethodLongMetricBuffer durationBuffer = new ClassAndMethodLongMetricBuffer();

        MethodDurationProfiler methodDuration = new MethodDurationProfiler(durationBuffer, reporter);
        methodDuration.setTag(tag);
        methodDuration.setCluster(cluster);
        methodDuration.setIntervalMillis(metricInterval);
        methodDuration.setProcessUuid(processUuid);
        methodDuration.setAppId(appId);

        // Register the collector with the static proxy.
        MethodDurationCollector durationCollector = new MethodDurationCollector(durationBuffer);
        MethodProfilerStaticProxy.setCollector(durationCollector);

        result.add(methodDuration);
    }

    // Method arguments: only when at least one argument is configured.
    if (!arguments.getArgumentProfiling().isEmpty()) {
        ClassMethodArgumentMetricBuffer argumentBuffer = new ClassMethodArgumentMetricBuffer();

        MethodArgumentProfiler methodArgument = new MethodArgumentProfiler(argumentBuffer, reporter);
        methodArgument.setTag(tag);
        methodArgument.setCluster(cluster);
        methodArgument.setIntervalMillis(metricInterval);
        methodArgument.setProcessUuid(processUuid);
        methodArgument.setAppId(appId);

        // Register the argument collector with the static proxy.
        MethodArgumentCollector argumentCollector = new MethodArgumentCollector(argumentBuffer);
        MethodProfilerStaticProxy.setArgumentCollector(argumentCollector);

        result.add(methodArgument);
    }

    // Stacktrace sampling: a collector running at the sample interval and a
    // reporter running at the metric interval, sharing one buffer.
    if (arguments.getSampleInterval() > 0) {
        StacktraceMetricBuffer stacktraceBuffer = new StacktraceMetricBuffer();

        StacktraceCollectorProfiler stacktraceCollector =
                new StacktraceCollectorProfiler(stacktraceBuffer, AgentThreadFactory.NAME_PREFIX);
        stacktraceCollector.setIntervalMillis(arguments.getSampleInterval());

        StacktraceReporterProfiler stacktraceReporter = new StacktraceReporterProfiler(stacktraceBuffer, reporter);
        stacktraceReporter.setTag(tag);
        stacktraceReporter.setCluster(cluster);
        stacktraceReporter.setIntervalMillis(metricInterval);
        stacktraceReporter.setProcessUuid(processUuid);
        stacktraceReporter.setAppId(appId);

        result.add(stacktraceCollector);
        result.add(stacktraceReporter);
    }

    // IO: only when explicitly enabled.
    if (arguments.isIoProfiling()) {
        IOProfiler io = new IOProfiler(reporter);
        io.setTag(tag);
        io.setCluster(cluster);
        io.setIntervalMillis(metricInterval);
        io.setProcessUuid(processUuid);
        io.setAppId(appId);
        result.add(io);
    }

    return result;
}
- 负责启动创建的所有Profiler对象
- 一次性采集的Profiler对象直接启动一次
- 周期性的Profiler对象直接通过ScheduledExecutorService服务进行调度
/**
 * Starts the given profilers.
 *
 * <p>Profilers with interval 0 are run once. Profilers with a positive
 * interval are run once immediately and then handed to
 * {@code scheduleProfilers}. Profilers with a negative interval are logged
 * and ignored. A failure of an individual run is logged and does not stop
 * the remaining profilers.
 *
 * @param profilers profilers to start
 * @return the one-time and periodic profilers that were started, or a group
 *         of two empty lists if profilers had already been started
 */
public ProfilerGroup startProfilers(Collection<Profiler> profilers) {
    if (started) {
        logger.warn("Profilers already started, do not start it again");
        return new ProfilerGroup(new ArrayList<>(), new ArrayList<>());
    }

    // Partition by interval: 0 -> run once, > 0 -> periodic, < 0 -> ignored.
    List<Profiler> runOnce = new ArrayList<>();
    List<Profiler> runPeriodically = new ArrayList<>();
    for (Profiler candidate : profilers) {
        long interval = candidate.getIntervalMillis();
        if (interval < 0) {
            logger.log(String.format("Ignored profiler %s due to its invalid interval %s", candidate, interval));
            continue;
        }
        if (interval == 0) {
            runOnce.add(candidate);
        } else {
            runPeriodically.add(candidate);
        }
    }

    // One-time profilers: a single run each.
    for (Profiler oneTime : runOnce) {
        try {
            oneTime.profile();
            logger.info("Finished one time profiler: " + oneTime);
        } catch (Throwable ex) {
            logger.warn("Failed to run one time profiler: " + oneTime, ex);
        }
    }

    // Periodic profilers: an immediate first run before scheduling.
    for (Profiler periodic : runPeriodically) {
        try {
            periodic.profile();
            logger.info("Ran periodic profiler (first run): " + periodic);
        } catch (Throwable ex) {
            logger.warn("Failed to run periodic profiler (first run): " + periodic, ex);
        }
    }

    scheduleProfilers(runPeriodically);
    started = true;

    return new ProfilerGroup(runOnce, runPeriodically);
}
周期性调度过程
- 周期性任务调度直接将Profiler封装成ProfilerRunner对象(实现了Runnable接口)
- 周期性任务通过ScheduledExecutorService进行调度
/**
 * Schedules the given periodic profilers at a fixed rate, each with its own
 * interval.
 *
 * <p>All intervals are validated before any thread pool is created, so an
 * invalid profiler can no longer leave an unreferenced executor running
 * with only part of the profilers scheduled. Nothing is created when the
 * collection is empty.
 *
 * @param profilers profilers to schedule; each must have an interval of at
 *                  least {@code Arguments.MIN_INTERVAL_MILLIS}
 * @throws RuntimeException if any profiler's interval is below
 *         {@code Arguments.MIN_INTERVAL_MILLIS}
 */
private void scheduleProfilers(Collection<Profiler> profilers) {
    if (profilers.isEmpty()) {
        // Nothing to schedule: avoid creating an executor that is never used.
        return;
    }

    // Fail fast: reject the whole batch before any resource is allocated.
    for (Profiler profiler : profilers) {
        if (profiler.getIntervalMillis() < Arguments.MIN_INTERVAL_MILLIS) {
            throw new RuntimeException("Interval too short for profiler: " + profiler + ", must be at least " + Arguments.MIN_INTERVAL_MILLIS);
        }
    }

    int threadPoolSize = Math.min(profilers.size(), MAX_THREAD_POOL_SIZE);
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(threadPoolSize, new AgentThreadFactory());

    for (Profiler profiler : profilers) {
        // Each profiler is wrapped in a ProfilerRunner before being scheduled.
        ProfilerRunner worker = new ProfilerRunner(profiler);
        scheduledExecutorService.scheduleAtFixedRate(worker, 0, profiler.getIntervalMillis(), TimeUnit.MILLISECONDS);
        logger.info(String.format("Scheduled profiler %s with interval %s millis", profiler, profiler.getIntervalMillis()));
    }
}
/**
 * {@link Runnable} adapter that executes one {@code Profiler} run and never
 * lets a failure escape.
 *
 * <p>The first {@code MAX_ERROR_COUNT_TO_LOG} failures are reported through
 * the agent logger; later ones only have their stack trace printed.
 */
public class ProfilerRunner implements Runnable {

    private static final AgentLogger logger = AgentLogger.getLogger(ProfilerRunner.class.getName());

    // Number of failures that are reported through the logger.
    private static final int MAX_ERROR_COUNT_TO_LOG = 100;

    private final Profiler profiler;

    // Total number of failed runs seen by this runner.
    private final AtomicLong errorCounter = new AtomicLong(0);

    /**
     * @param profiler the profiler to invoke on every {@link #run()}
     */
    public ProfilerRunner(Profiler profiler) {
        this.profiler = profiler;
    }

    /**
     * Runs the profiler once. Every {@code Throwable} is caught here:
     * per the JDK documentation, a task scheduled with
     * {@code scheduleAtFixedRate} has its later executions suppressed once
     * an execution throws.
     */
    @Override
    public void run() {
        try {
            profiler.profile();
        } catch (Throwable failure) {
            boolean overLogLimit = errorCounter.incrementAndGet() > MAX_ERROR_COUNT_TO_LOG;
            if (overLogLimit) {
                failure.printStackTrace();
                return;
            }
            logger.warn("Failed to run profile: " + profiler, failure);
        }
    }
}
maven打包配置
- 作为Java Agent(内部使用Javassist织入字节码)有额外的打包要求:jar包的MANIFEST中必须声明Agent-Class和Premain-Class,特别的地方就在下面这个地方。
<!-- maven-jar-plugin: adds the agent entry-point attributes to the jar manifest. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.6</version>
<configuration>
<archive>
<manifestEntries>
<!-- Class providing agentmain(...), the entry point when the agent is attached to a running VM. -->
<Agent-Class>com.uber.profiling.Agent</Agent-Class>
<!-- Class providing premain(...), the entry point when the agent is loaded with -javaagent at startup. -->
<Premain-Class>com.uber.profiling.Agent</Premain-Class>
</manifestEntries>
</archive>
</configuration>
</plugin>