
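Custom Resource Types in Ambari

Ambari treats clusters, hosts, services, components, users, and so on as resources. By defining a custom resource type, you get Ambari-style create, read, update, and delete (CRUD) operations for your own data.

This article uses a Yarn resource as an example: it adds the resource to Ambari and implements two queries, one that lists all Yarn applications and one that returns the log of a single application.

Add the resource type

In the org.apache.ambari.server.controller.spi.Resource interface, add a Yarn constant to the InternalType enum and a matching Yarn constant to the Type class.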

/**
 * Enum of internal types.
 */
enum InternalType {
    Cluster,
    Service,
    Setting,

    ……

    RemoteCluster,
    Yarn;

    ……

}


/**
 * Resource types. Allows for the addition of external types.
 */
final class Type implements Comparable<Type> {

    ……

    /**
     * Internal types. See {@link InternalType}.
     */
    public static final Type Cluster = InternalType.Cluster.getType();
    public static final Type Service = InternalType.Service.getType();
    public static final Type Setting = InternalType.Setting.getType();

    ……

    public static final Type LoggingQuery = InternalType.LoggingQuery.getType();
    public static final Type RemoteCluster = InternalType.RemoteCluster.getType();

    /**
     * Yarn
     */
    public static final Type Yarn = InternalType.Yarn.getType();

    ……


}

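Create the controller layer

Create the class org.apache.ambari.server.api.services.YarnService, paying attention to the HTTP method and the resource type used by each endpoint.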

package org.apache.ambari.server.api.services;

import io.swagger.annotations.Api;
import org.apache.ambari.server.api.resources.ResourceInstance;
import org.apache.ambari.server.controller.spi.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import java.util.HashMap;
import java.util.Map;

/**
 * Yarn-related service.
 *
 * @author Erik
 * @date 2020/12/7
 */
@Path("/yarn/")
@Api(value = "yarn")
public class YarnService extends BaseService {

    private static final Logger LOG = LoggerFactory.getLogger(YarnService.class);

    /**
     * Gets all Yarn applications.
     * Handles: GET /yarn requests.
     *
     * @param body    request body
     * @param headers http headers
     * @param ui      uri info
     *
     * @return information regarding all Yarn applications
     */
    @GET
    @Produces("text/plain")
    public Response getYarnApplications(String body, @Context HttpHeaders headers, @Context UriInfo ui) {
        LOG.info("Into get all Yarn applications.");
        return handleRequest(headers, body, ui, Request.Type.GET, createResource(null));
    }

    /**
     * Handles: GET /yarn/{applicationId}
     * Gets the information of a specific application.
     *
     * @param headers       http headers
     * @param ui            uri info
     * @param applicationId applicationId
     *
     * @return yarn instance representation
     */
    @GET
    @Path("{applicationId}")
    @Produces("text/plain")
    public Response getYarnApplication(@Context HttpHeaders headers, @Context UriInfo ui,
                                       @PathParam("applicationId") String applicationId) {

        return handleRequest(headers, null, ui, Request.Type.GET, createResource(applicationId));
    }


    /**
     * Create a yarn instance.
     *
     * @param applicationId applicationId
     *
     * @return a yarn resource instance
     */
    private ResourceInstance createResource(String applicationId) {
        final Map<Resource.Type, String> mapIds = new HashMap<>();
        mapIds.put(Resource.Type.Yarn, applicationId);
        return createResource(Resource.Type.Yarn, mapIds);
    }

}

Create the resource provider

Create org.apache.ambari.server.controller.internal.YarnResourceProvider. This class holds the actual CRUD logic. Set up the skeleton first; the logic is filled in later.

package org.apache.ambari.server.controller.internal;

/**
 * Yarn resource provider.
 *
 * @author Erik
 * @date 2020/12/7
 */
@StaticallyInject
public class YarnResourceProvider extends AbstractControllerResourceProvider {

    private static final Logger LOG = LoggerFactory.getLogger(YarnResourceProvider.class);

    public YarnResourceProvider(AmbariManagementController managementController) {
        super(Resource.Type.Yarn, propertyIds, keyPropertyIds, managementController);
    }

    ……
}

Because YarnResourceProvider extends AbstractControllerResourceProvider, it must be added to the getResourceProvider factory method in org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider.

/**
 * Factory method for obtaining a resource provider based on a given type and management controller.
 *
 * @param type                 the resource type
 * @param managementController the management controller
 *
 * @return a new resource provider
 */
public static ResourceProvider getResourceProvider(Resource.Type type,
                                                   AmbariManagementController managementController) {

    switch (type.getInternalType()) {
        case Cluster:
            return new ClusterResourceProvider(managementController);
        case Service:
            return resourceProviderFactory.getServiceResourceProvider(managementController);
        case Component:
            return resourceProviderFactory.getComponentResourceProvider(managementController);

        ……

        case ViewInstance:
            return resourceProviderFactory.getViewInstanceResourceProvider();

        case Yarn:
            return new YarnResourceProvider(managementController);

        default:
            throw new IllegalArgumentException("Unknown type " + type);
    }
}

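Create the resource definition

Create the definition class for the Yarn resource, org.apache.ambari.server.api.resources.YarnResourceDefinition. Sub-resource queries can be declared in getSubResourceDefinitions; see the UserResourceDefinition class for a reference implementation.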

package org.apache.ambari.server.api.resources;

import org.apache.ambari.server.controller.spi.Resource;

import java.util.Collections;
import java.util.Set;

/**
 * Yarn resource definition.
 *
 * @author Erik
 * @date 2020/12/7
 */
public class YarnResourceDefinition extends BaseResourceDefinition {

    public YarnResourceDefinition() {
        super(Resource.Type.Yarn);
    }

    @Override
    public String getPluralName() {
        return "yarns";
    }

    @Override
    public String getSingularName() {
        return "yarn";
    }

    @Override
    public Set<SubResourceDefinition> getSubResourceDefinitions() {
        return Collections.emptySet();
    }
}

Register the resource instance

In org.apache.ambari.server.api.resources.ResourceInstanceFactoryImpl, add the resource definition from the previous step.

switch (type.getInternalType()) {
    case Cluster:
        resourceDefinition = new ClusterResourceDefinition();
        break;

    case Service:
        resourceDefinition = new ServiceResourceDefinition();
        break;

    ……

    case RemoteCluster:
        resourceDefinition = new RemoteClusterResourceDefinition();
        break;

    case Yarn:
        resourceDefinition = new YarnResourceDefinition();
        break;

    default:
        throw new IllegalArgumentException("Unsupported resource type: " + type);
}

Create the request and response

Create the matching request and response classes, here org.apache.ambari.server.controller.YarnRequest and org.apache.ambari.server.controller.YarnResponse.

YarnRequest sample code:

package org.apache.ambari.server.controller;

import io.swagger.annotations.ApiModel;

import java.util.Objects;

/**
 * Yarn request
 *
 * @author Erik
 * @date 2020/12/7
 */
@ApiModel
public class YarnRequest {

    private String applicationId;
    private String applicationName;

    public YarnRequest(String applicationId) {
        this.applicationId = applicationId;
    }

    public String getApplicationId() {
        return applicationId;
    }

    public void setApplicationId(String applicationId) {
        this.applicationId = applicationId;
    }

    public String getApplicationName() {
        return applicationName;
    }

    public void setApplicationName(String applicationName) {
        this.applicationName = applicationName;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        YarnRequest that = (YarnRequest) o;
        return Objects.equals(applicationId, that.applicationId) &&
                Objects.equals(applicationName, that.applicationName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(applicationId, applicationName);
    }

    @Override
    public String toString() {
        return "YarnRequest{" +
                "applicationId='" + applicationId + '\'' +
                ", applicationName='" + applicationName + '\'' +
                '}';
    }
}

YarnResponse sample code:

package org.apache.ambari.server.controller;

import java.util.Objects;

/**
 * Yarn response
 *
 * @author Erik
 * @date 2020/12/7
 */
public class YarnResponse implements ApiModel {

    private String applicationId;
    private String applicationName;
    private String applicationType;
    private String user;
    private String queue;
    private String state;
    private String finalState;
    private String progress;
    private String trackingUrl;

    private String logInfo;

    public YarnResponse() {

    }

    public String getApplicationId() {
        return applicationId;
    }

    public void setApplicationId(String applicationId) {
        this.applicationId = applicationId;
    }

    public String getApplicationName() {
        return applicationName;
    }

    public void setApplicationName(String applicationName) {
        this.applicationName = applicationName;
    }

    public String getApplicationType() {
        return applicationType;
    }

    public void setApplicationType(String applicationType) {
        this.applicationType = applicationType;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getQueue() {
        return queue;
    }

    public void setQueue(String queue) {
        this.queue = queue;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public String getFinalState() {
        return finalState;
    }

    public void setFinalState(String finalState) {
        this.finalState = finalState;
    }

    public String getProgress() {
        return progress;
    }

    public void setProgress(String progress) {
        this.progress = progress;
    }

    public String getTrackingUrl() {
        return trackingUrl;
    }

    public void setTrackingUrl(String trackingUrl) {
        this.trackingUrl = trackingUrl;
    }

    public String getLogInfo() {
        return logInfo;
    }

    public void setLogInfo(String logInfo) {
        this.logInfo = logInfo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        YarnResponse that = (YarnResponse) o;
        return Objects.equals(applicationId, that.applicationId) &&
                Objects.equals(applicationName, that.applicationName) &&
                Objects.equals(applicationType, that.applicationType) &&
                Objects.equals(user, that.user) &&
                Objects.equals(queue, that.queue) &&
                Objects.equals(state, that.state) &&
                Objects.equals(finalState, that.finalState) &&
                Objects.equals(progress, that.progress) &&
                Objects.equals(trackingUrl, that.trackingUrl) &&
                Objects.equals(logInfo, that.logInfo);
    }

    @Override
    public int hashCode() {
        return Objects.hash(applicationId, applicationName, applicationType, user, queue, state, finalState, progress, trackingUrl, logInfo);
    }

    @Override
    public String toString() {
        return "YarnResponse{" +
                "applicationId='" + applicationId + '\'' +
                ", applicationName='" + applicationName + '\'' +
                ", applicationType='" + applicationType + '\'' +
                ", user='" + user + '\'' +
                ", queue='" + queue + '\'' +
                ", state='" + state + '\'' +
                ", finalState='" + finalState + '\'' +
                ", progress='" + progress + '\'' +
                ", trackingUrl='" + trackingUrl + '\'' +
                ", logInfo='" + logInfo + '\'' +
                '}';
    }
}

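Create the database classes

If the resource needs a new database table, create the DAO in the org.apache.ambari.server.orm.dao package, create the entity class in the org.apache.ambari.server.orm.entities package, and add the entity class to ambari-server\src\main\resources\META-INF\persistence.xml. This example does not need a new table.

Implement the logic

With the skeleton in place, the actual logic goes into the YarnResourceProvider class.

Define the resource properties

Define the properties of the Yarn resource and compose each ID as yarn/<property>. Clients can then use these IDs to control which properties are returned.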

public static final String YARN_RESOURCE_CATEGORY = "yarn";

/**
* Yarn application info
*/
public static final String APPLICATION_ID_PROPERTY_ID = "applicationId";
public static final String APPLICATION_NAME_PROPERTY_ID = "applicationName";
public static final String APPLICATION_TYPE_PROPERTY_ID = "applicationType";
public static final String USER_PROPERTY_ID = "user";
public static final String QUEUE_PROPERTY_ID = "queue";
public static final String STATE_PROPERTY_ID = "state";
public static final String FINAL_STATE_PROPERTY_ID = "finalState";
public static final String PROGRESS_PROPERTY_ID = "progress";
public static final String TRACKINGURL_PROPERTY_ID = "trackingUrl";
public static final String LOG_INFO_PROPERTY_ID = "logInfo";


public static final String YARN_APPLICATION_ID_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + APPLICATION_ID_PROPERTY_ID;
public static final String YARN_APPLICATION_NAME_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + APPLICATION_NAME_PROPERTY_ID;
public static final String YARN_APPLICATION_TYPE_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + APPLICATION_TYPE_PROPERTY_ID;
public static final String YARN_USER_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + USER_PROPERTY_ID;
public static final String YARN_QUEUE_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + QUEUE_PROPERTY_ID;
public static final String YARN_STATE_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + STATE_PROPERTY_ID;
public static final String YARN_FINAL_STATE_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + FINAL_STATE_PROPERTY_ID;
public static final String YARN_PROGRESS_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + PROGRESS_PROPERTY_ID;
public static final String YARN_TRACKINGURL_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + TRACKINGURL_PROPERTY_ID;
public static final String YARN_LOG_INFO_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + LOG_INFO_PROPERTY_ID;
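To see why the category/property form matters, here is a minimal standalone sketch of field selection. It is not Ambari's code: the class PropertyFilterSketch and its methods are hypothetical, and the wildcard handling is a simplified assumption about how a request such as fields=yarn/* narrows the returned properties.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; Ambari's own filtering is more elaborate.
public class PropertyFilterSketch {

    /** Builds a "category/name" property ID, for example "yarn/state". */
    static String propertyId(String category, String name) {
        return category + "/" + name;
    }

    /** True if the ID was requested directly or through a "category/*" wildcard. */
    static boolean isRequested(String id, Set<String> requestedIds) {
        int slash = id.indexOf('/');
        String category = slash < 0 ? id : id.substring(0, slash);
        return requestedIds.contains(id) || requestedIds.contains(category + "/*");
    }

    /** Keeps only the requested properties, preserving insertion order. */
    static Map<String, Object> select(Map<String, Object> all, Set<String> requestedIds) {
        Map<String, Object> selected = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : all.entrySet()) {
            if (isRequested(entry.getKey(), requestedIds)) {
                selected.put(entry.getKey(), entry.getValue());
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Map<String, Object> all = new LinkedHashMap<>();
        all.put(propertyId("yarn", "applicationId"), "application_1606871223118_0001");
        all.put(propertyId("yarn", "state"), "FINISHED");
        all.put(propertyId("yarn", "queue"), "default");

        // Only the two named properties survive
        Set<String> two = new HashSet<>(Arrays.asList("yarn/applicationId", "yarn/state"));
        System.out.println(select(all, two));

        // The wildcard keeps every property in the "yarn" category
        Set<String> wildcard = new HashSet<>(Arrays.asList("yarn/*"));
        System.out.println(select(all, wildcard));
    }
}
```

Because every ID shares the yarn/ prefix, a single wildcard selects the whole category, while naming individual IDs trims the response to just those fields.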

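Set the resource properties

Once the properties are defined, they have to be registered with the provider.

First comes keyPropertyIds, which acts as the key: Ambari uses this property to locate a Yarn resource, and every returned resource must carry it, otherwise the request returns no data.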

/**
 * The key property ids for a Yarn resource.
 */
private static Map<Resource.Type, String> keyPropertyIds = ImmutableMap.<Resource.Type, String>builder()
        .put(Resource.Type.Yarn, YARN_APPLICATION_ID_PROPERTY_ID)
        .build();

Next comes propertyIds, which must include the values in keyPropertyIds.

/**
 * The property ids for a Yarn resource.
 */
public static Set<String> propertyIds = Sets.newHashSet(
        YARN_APPLICATION_ID_PROPERTY_ID,
        YARN_APPLICATION_NAME_PROPERTY_ID,
        YARN_APPLICATION_TYPE_PROPERTY_ID,
        YARN_USER_PROPERTY_ID,
        YARN_QUEUE_PROPERTY_ID,
        YARN_STATE_PROPERTY_ID,
        YARN_FINAL_STATE_PROPERTY_ID,
        YARN_PROGRESS_PROPERTY_ID,
        YARN_TRACKINGURL_PROPERTY_ID,
        YARN_LOG_INFO_PROPERTY_ID);

Finally there is getPKPropertyIds, which always follows the same pattern.

@Override
protected Set<String> getPKPropertyIds() {
    return new HashSet<>(keyPropertyIds.values());
}

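Add the annotations

Ambari uses Guice for dependency injection. This example needs HostComponentStateDAO to look up the host on which a component runs, so the DAO is brought in with @Inject, and the class must be annotated with @StaticallyInject.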

@StaticallyInject
public class YarnResourceProvider extends AbstractControllerResourceProvider {

    ……

    @Inject
    protected static HostComponentStateDAO hostComponentStateDAO;

    ……

}

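Query logic

The getResources method implements the query. The getRequest method parses the request parameters: if applicationId is null, all applications are returned; if it is set, the log of that application is returned. In both cases the provider runs a shell command and builds the response from its output.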
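The branch on applicationId can be sketched on its own, without Ambari on the classpath. The class YarnCommandSketch, the method buildCommand, and the host name rm-host are all hypothetical. The ID pattern is an assumption based on the application_<timestamp>_<sequence> shape used in this article; the check is there because the ID comes from the URL and ends up inside a remote shell command.

```java
import java.util.regex.Pattern;

// Hypothetical helper that only builds the command line; it does not run anything.
public class YarnCommandSketch {

    // Assumed ID shape: application_<clusterTimestamp>_<sequence>
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

    /**
     * A null ID selects the "list all applications" command;
     * any other ID selects the "fetch logs" command for that application.
     */
    static String[] buildCommand(String host, String applicationId) {
        if (applicationId == null) {
            return new String[]{"ssh", host, "yarn", "application", "-list", "-appStates", "ALL"};
        }
        // Reject anything that could smuggle extra shell syntax into the remote command
        if (!APP_ID.matcher(applicationId).matches()) {
            throw new IllegalArgumentException("Invalid application ID: " + applicationId);
        }
        return new String[]{"ssh", host, "su hdfs -c 'yarn logs -applicationId " + applicationId + "'"};
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", buildCommand("rm-host", null)));
        System.out.println(String.join(" ", buildCommand("rm-host", "application_1606871223118_0001")));
        try {
            buildCommand("rm-host", "x'; touch /tmp/demo; '");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Keeping command construction separate from command execution makes the two branches easy to test without a cluster.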

The complete YarnResourceProvider sample code:

package org.apache.ambari.server.controller.internal;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import org.apache.ambari.server.StaticallyInject;
import org.apache.ambari.server.controller.AmbariManagementController;
import org.apache.ambari.server.controller.YarnRequest;
import org.apache.ambari.server.controller.YarnResponse;
import org.apache.ambari.server.controller.spi.*;
import org.apache.ambari.server.orm.dao.HostComponentStateDAO;
import org.apache.ambari.server.orm.entities.HostComponentStateEntity;
import org.apache.ambari.server.utils.ShellCommandUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Yarn resource provider.
 *
 * @author Erik
 * @date 2020/12/7
 */
@StaticallyInject
public class YarnResourceProvider extends AbstractControllerResourceProvider {

    private static final Logger LOG = LoggerFactory.getLogger(YarnResourceProvider.class);

    @Inject
    protected static HostComponentStateDAO hostComponentStateDAO;

    public static final String YARN_RESOURCE_CATEGORY = "yarn";

    /**
     * Yarn application info
     */
    public static final String APPLICATION_ID_PROPERTY_ID = "applicationId";
    public static final String APPLICATION_NAME_PROPERTY_ID = "applicationName";
    public static final String APPLICATION_TYPE_PROPERTY_ID = "applicationType";
    public static final String USER_PROPERTY_ID = "user";
    public static final String QUEUE_PROPERTY_ID = "queue";
    public static final String STATE_PROPERTY_ID = "state";
    public static final String FINAL_STATE_PROPERTY_ID = "finalState";
    public static final String PROGRESS_PROPERTY_ID = "progress";
    public static final String TRACKINGURL_PROPERTY_ID = "trackingUrl";
    public static final String LOG_INFO_PROPERTY_ID = "logInfo";

    public static final String YARN_APPLICATION_ID_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + APPLICATION_ID_PROPERTY_ID;
    public static final String YARN_APPLICATION_NAME_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + APPLICATION_NAME_PROPERTY_ID;
    public static final String YARN_APPLICATION_TYPE_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + APPLICATION_TYPE_PROPERTY_ID;
    public static final String YARN_USER_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + USER_PROPERTY_ID;
    public static final String YARN_QUEUE_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + QUEUE_PROPERTY_ID;
    public static final String YARN_STATE_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + STATE_PROPERTY_ID;
    public static final String YARN_FINAL_STATE_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + FINAL_STATE_PROPERTY_ID;
    public static final String YARN_PROGRESS_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + PROGRESS_PROPERTY_ID;
    public static final String YARN_TRACKINGURL_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + TRACKINGURL_PROPERTY_ID;
    public static final String YARN_LOG_INFO_PROPERTY_ID = YARN_RESOURCE_CATEGORY + "/" + LOG_INFO_PROPERTY_ID;

    /**
     * The key property ids for a Yarn resource.
     */
    private static Map<Resource.Type, String> keyPropertyIds = ImmutableMap.<Resource.Type, String>builder()
            .put(Resource.Type.Yarn, YARN_APPLICATION_ID_PROPERTY_ID)
            .build();

    /**
     * The property ids for a Yarn resource.
     */
    public static Set<String> propertyIds = Sets.newHashSet(
            YARN_APPLICATION_ID_PROPERTY_ID,
            YARN_APPLICATION_NAME_PROPERTY_ID,
            YARN_APPLICATION_TYPE_PROPERTY_ID,
            YARN_USER_PROPERTY_ID,
            YARN_QUEUE_PROPERTY_ID,
            YARN_STATE_PROPERTY_ID,
            YARN_FINAL_STATE_PROPERTY_ID,
            YARN_PROGRESS_PROPERTY_ID,
            YARN_TRACKINGURL_PROPERTY_ID,
            YARN_LOG_INFO_PROPERTY_ID);

    public YarnResourceProvider(AmbariManagementController managementController) {
        super(Resource.Type.Yarn, propertyIds, keyPropertyIds, managementController);
    }

    @Override
    protected Set<String> getPKPropertyIds() {
        return new HashSet<>(keyPropertyIds.values());
    }

    @Override
    public Set<Resource> getResources(Request request, Predicate predicate)
            throws SystemException, UnsupportedPropertyException, NoSuchResourceException, NoSuchParentResourceException {

        LOG.info("Into YarnResourceProvider:getResources.");
        final Set<YarnRequest> requests = new HashSet<>();

        if (predicate == null) {
            LOG.info("Predicate is null.");
            requests.add(getRequest(null));
        } else {
            LOG.info("Predicate not null.");
            for (Map<String, Object> propertyMap : getPropertyMaps(predicate)) {
                YarnRequest req = getRequest(propertyMap);
                requests.add(req);
            }
        }

        // Fetch the Yarn resources
        Set<YarnResponse> responses = getYarnResources(requests);

        // Build the returned data
        Set<String> requestedIds = getRequestPropertyIds(request, predicate);
        Set<Resource> resources = new HashSet<>();

        for (YarnResponse yarnResponse : responses) {
            ResourceImpl resource = new ResourceImpl(Resource.Type.Yarn);

            setResourceProperty(resource, YARN_APPLICATION_ID_PROPERTY_ID,
                    yarnResponse.getApplicationId(), requestedIds);

            setResourceProperty(resource, YARN_APPLICATION_NAME_PROPERTY_ID,
                    yarnResponse.getApplicationName(), requestedIds);

            setResourceProperty(resource, YARN_APPLICATION_TYPE_PROPERTY_ID,
                    yarnResponse.getApplicationType(), requestedIds);

            setResourceProperty(resource, YARN_USER_PROPERTY_ID,
                    yarnResponse.getUser(), requestedIds);

            setResourceProperty(resource, YARN_QUEUE_PROPERTY_ID,
                    yarnResponse.getQueue(), requestedIds);

            setResourceProperty(resource, YARN_STATE_PROPERTY_ID,
                    yarnResponse.getState(), requestedIds);

            setResourceProperty(resource, YARN_FINAL_STATE_PROPERTY_ID,
                    yarnResponse.getFinalState(), requestedIds);

            setResourceProperty(resource, YARN_PROGRESS_PROPERTY_ID,
                    yarnResponse.getProgress(), requestedIds);

            // The tracking URL is parsed and declared in propertyIds, so it must be returned as well
            setResourceProperty(resource, YARN_TRACKINGURL_PROPERTY_ID,
                    yarnResponse.getTrackingUrl(), requestedIds);

            setResourceProperty(resource, YARN_LOG_INFO_PROPERTY_ID,
                    yarnResponse.getLogInfo(), requestedIds);

            resources.add(resource);
        }

        return resources;
    }

    /**
     * Gets Yarn resource information.
     *
     * @param requests requests
     * @return Set<YarnResponse>
     */
    private Set<YarnResponse> getYarnResources(Set<YarnRequest> requests)
            throws SystemException, NoSuchResourceException {
        Set<YarnResponse> responsesSet = new HashSet<>();

        // Host on which the ResourceManager runs
        String hostName = getHostName();

        for (YarnRequest request : requests) {
            String applicationId = request.getApplicationId();

            // applicationId == null: return all applications
            // applicationId != null: return the log of that application
            String[] cmd;
            if (null == applicationId) {
                LOG.info("applicationId is null.");
                cmd = new String[]{"ssh", hostName, "yarn", "application", "-list", "-appStates", "ALL"};
            } else {
                LOG.info("ApplicationId is:{}.", applicationId);
                // The ID comes from the URL and ends up in a remote shell command,
                // so reject anything that is not a plain application ID
                if (!applicationId.matches("application_\\d+_\\d+")) {
                    throw new NoSuchResourceException("Invalid applicationId: " + applicationId);
                }
                cmd = new String[]{"ssh", hostName, "su hdfs -c 'yarn logs -applicationId " + applicationId + "'"};
            }
            printCmd(cmd);

            // Let failures propagate instead of swallowing them in a catch-all block
            ShellCommandUtil.Result cmdResult;
            try {
                LOG.info("Start execute shell cmd.");
                cmdResult = ShellCommandUtil.runCommand(cmd);
                LOG.info("Execute finish.");
            } catch (IOException | InterruptedException e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                LOG.error("Run command error - Executor exception ({})", e.getMessage());
                throw new SystemException("Run command error!", e);
            }

            // Check the exit code
            if (cmdResult.getExitCode() != 0) {
                LOG.error("Run command error,exitCode is:{}.", cmdResult.getExitCode());
                throw new SystemException("Run command error!");
            }

            String stdout = cmdResult.getStdout();
            if (null == applicationId) {
                if (stdout != null) {
                    // Parse the command output
                    responsesSet.addAll(parseApplicationInfo(stdout));
                } else {
                    LOG.error("Run command stdout is empty!");
                }
            } else {
                YarnResponse yarnResponse = new YarnResponse();
                yarnResponse.setLogInfo(stdout);
                yarnResponse.setApplicationId(applicationId);
                responsesSet.add(yarnResponse);
            }
        }

        return responsesSet;
    }

    /**
     * Gets the host that runs the ResourceManager.
     */
    private String getHostName() throws NoSuchResourceException {
        String serviceName = "YARN";
        String componentName = "RESOURCEMANAGER";

        List<HostComponentStateEntity> hostComponentStateEntities =
                hostComponentStateDAO.findByServiceAndComponent(serviceName, componentName);
        // An empty list means no ResourceManager was found; calling get(0) on it would throw
        if (hostComponentStateEntities == null || hostComponentStateEntities.isEmpty()) {
            throw new NoSuchResourceException("No ResourceManager found in this cluster!");
        }

        return hostComponentStateEntities.get(0).getHostName();
    }

    /**
     * Parses application information.
     *
     * @param stdout command output
     */
    private Set<YarnResponse> parseApplicationInfo(String stdout) {
        Set<YarnResponse> yarnResponseSet = new HashSet<>();

        // Read line by line; only lines starting with "application_" are parsed
        try (BufferedReader br = new BufferedReader(new StringReader(stdout))) {
            String line;
            while ((line = br.readLine()) != null) {
                if (!line.startsWith("application_")) {
                    continue;
                }

                String[] property = line.split("\t");
                // Skip malformed lines instead of failing with ArrayIndexOutOfBoundsException
                if (property.length < 9) {
                    LOG.warn("Skip unexpected line: {}", line);
                    continue;
                }

                YarnResponse yarnResponse = new YarnResponse();

                yarnResponse.setApplicationId(property[0].trim());
                yarnResponse.setApplicationName(property[1].trim());
                yarnResponse.setApplicationType(property[2].trim());
                yarnResponse.setUser(property[3].trim());
                yarnResponse.setQueue(property[4].trim());
                yarnResponse.setState(property[5].trim());
                yarnResponse.setFinalState(property[6].trim());
                yarnResponse.setProgress(property[7].trim());
                yarnResponse.setTrackingUrl(property[8].trim());

                yarnResponseSet.add(yarnResponse);
            }
        } catch (IOException e) {
            LOG.error("Parse application info error.", e);
        }

        return yarnResponseSet;
    }

    /**
     * Logs the command that is about to be executed.
     *
     * @param cmd command and arguments
     */
    private void printCmd(String[] cmd) {
        LOG.info("Cmd str is:{}.", String.join(" ", cmd));
    }

    /**
     * Builds a YarnRequest from the request properties.
     * The applicationId stays null when the properties do not contain one.
     */
    private YarnRequest getRequest(Map<String, Object> properties) {
        if (properties == null) {
            return new YarnRequest(null);
        }

        Object object = properties.get(YARN_APPLICATION_ID_PROPERTY_ID);
        if (object == null) {
            return new YarnRequest(null);
        }

        String applicationId = object.toString();
        LOG.info("Request:applicationId:{}", applicationId);
        return new YarnRequest(applicationId);
    }

}
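The parsing step can be tried out without a cluster. The sketch below is a standalone approximation, not the provider's code: the class AppListParseSketch is hypothetical, and the sample text is made up to match the nine tab-separated columns that the parser above assumes, so real `yarn application -list` output may differ in detail.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical standalone version of the line parser.
public class AppListParseSketch {

    // Column order assumed by the article's parser
    private static final String[] COLUMNS = {
        "applicationId", "applicationName", "applicationType", "user",
        "queue", "state", "finalState", "progress", "trackingUrl"
    };

    /** Turns each "application_..." line into a column-name to value map. */
    static List<Map<String, String>> parse(String stdout) {
        List<Map<String, String>> apps = new ArrayList<>();
        for (String line : stdout.split("\\r?\\n")) {
            if (!line.startsWith("application_")) {
                continue; // summary and header lines
            }
            String[] fields = line.split("\t");
            if (fields.length < COLUMNS.length) {
                continue; // malformed line, skip it
            }
            Map<String, String> app = new LinkedHashMap<>();
            for (int i = 0; i < COLUMNS.length; i++) {
                app.put(COLUMNS[i], fields[i].trim());
            }
            apps.add(app);
        }
        return apps;
    }

    public static void main(String[] args) {
        // Made-up sample shaped like the nine-column layout; not captured from a real cluster
        String sample =
            "Total number of applications: 1\n"
            + "Application-Id\tApplication-Name\tApplication-Type\tUser\tQueue\tState\tFinal-State\tProgress\tTracking-URL\n"
            + "application_1606871223118_0001\t demo-job\t SPARK\t hdfs\t default\t FINISHED\t SUCCEEDED\t 100%\t N/A\n"
            + "application_truncated\tonly-two-columns\n";

        for (Map<String, String> app : parse(sample)) {
            System.out.println(app);
        }
    }
}
```

The header row is skipped because it starts with "Application-Id" rather than "application_", and the truncated row is dropped by the column-count check.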

API testing

Postman is used here to call the API. Three cases are tested: listing all Yarn applications, fuzzy-matching Yarn applications, and fetching the log of a single Yarn application.

  • List all Yarn applications

List all Yarn applications with paging: 10 items per page, starting from the first item, sorted by applicationId in descending order.

http://IP:8080/api/v1/yarn?fields=yarn/*&page_size=10&from=0&sortBy=yarn/applicationId.desc

  • Fuzzy-match Yarn applications

List the Yarn applications whose applicationId matches a pattern.

http://IP:8080/api/v1/yarn?fields=yarn/*&page_size=10&from=0&sortBy=yarn/applicationId.desc

Body: {"RequestInfo":{"query":"yarn/applicationId.matches(.*002.*)"}}

  • Fetch the log of a single Yarn application

Fetch the log for a given applicationId.

http://IP:8080/api/v1/yarn/application_1606871223118_0001?fields=yarn/applicationId,yarn/logInfo
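What the first two requests ask for can be imitated on an in-memory list. This is only a sketch: QuerySketch is a hypothetical class, and it assumes that matches(...) applies a regular expression to the whole value, that sortBy=...desc sorts in descending order, and that from and page_size act as offset and limit. It does not call Ambari.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical in-memory imitation of filter + sort + paging.
public class QuerySketch {

    /** Filters IDs by a full-match regex, sorts them descending, then returns one page. */
    static List<String> query(List<String> ids, String regex, int from, int pageSize) {
        Pattern pattern = Pattern.compile(regex);
        return ids.stream()
                .filter(id -> pattern.matcher(id).matches())
                .sorted(Comparator.reverseOrder())
                .skip(from)
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList(
                "application_1606871223118_0001",
                "application_1606871223118_0002",
                "application_1606871223118_0020",
                "application_1606871223118_0003");

        // All applications, first page of 10, newest ID first
        System.out.println(query(ids, ".*", 0, 10));

        // Only IDs containing "002", as in matches(.*002.*)
        System.out.println(query(ids, ".*002.*", 0, 10));
    }
}
```

Because the regex must match the whole ID, the leading and trailing .* are what turn 002 into a "contains" search.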

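This completes the query side of the custom resource type. Create (POST), update (PUT), and delete (DELETE) can be added by following the other resource types.

Summary

This article walked through the process of defining a custom Ambari resource: adding the resource type, creating the controller layer, the resource provider, the resource definition, and the request and response classes. When a database table is involved, a DAO and an entity class are also needed. The actual logic lives in the resource provider.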