server的actor(2024-08-21)
FriendActor(处理其他服务器的请求)
探活(非阻塞的)
直接返回当前服务器注册在hashmap中的所有其他sever服务器
处理其他服务器的请求(阻塞)
服务器之间交互直接通过class文件+方法的方式,直接通过反射创建对应实体类和方法来执行
WorkerRequestHandlerImpl(处理worker请求)
workerHeartbeat(接收worker的心跳)非阻塞
内部维护着一个worker集群状态的map,如果对应appid集群状态有修改,则更新map。
处理完后,写入日志监控器
reportInstanceStatus(处理tasktracker上报的任务实例状态)阻塞的
- 创建一个已经完成任务的事件。
- 更新工作流中对应的任务
- 更新任务日志
丢弃掉晚上报的请求
丢弃掉不是server任务管理器中执行机器上报的任务
reportLog(处理日志)非阻塞
构造好接收的任务信息,把信息入库
queryJobCluster(查询任务的可执行集群)阻塞的
关于MapReduce的调研(2024-08-26)
任务拆分
拆分任务和任务实际的执行逻辑(业务代码),用户只需要自定义任务如何拆分和业务代码。
1
2
3
4
5
6
7
8
9
| graph TD
a1[新建一个重量级任务]-->a
a{判断当前任务是否为根任务}-->|yes|b[开始分发任务]
b-->c[构造子任务]
c-->d[拆分任务,从任务的参数中取出总数和每个子任务的大小]
d-->d1[按拆分后的任务新建一个子task,将构造的子任务,\n模拟器一个请求,发送给当前机器]
d1-->d2[当前机器接收到请求,把所有分段的任务,\n保存到数据库中]
a-->|no|b1[开始执行当前任务,根据任务的状态返回执行结果]
|

通过TaskTracker来处理子任务
上回书说到map会将大任务拆分成子任务保存到自己机器的数据库中
拆分后每个任务分片的调度原理,使用者无感
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| graph TD
a[初始化tasktracker]-->b[初始化定时任务线程池]
b-->bb[向线程池中提交三种任务]
bb-->b1[定时检查当前任务的执行状态,3秒一次]
bb-->bb2{是否为MAP_REDUCE任务}
bb-->b3[定时扫描数据库中的task,\n出于内存占用量考虑,每次最多获取100个,\n并将需要执行的任务派发出去]
bb2-->|yes|b2[执行器动态上线,1分钟一次:\n检测是否需要更多的worker节点执行任务]
b1-->b11[从数据库中统计出子任务的运行状态\n主要是个状态的数量]
b11-->b12{未完成的任务数量是否为0\n用来判断任务是否真的执行结束}
b12-->|yes|b13[根据任务的类型做不同的处理\n单机执行:再查一遍数据库,直接认为任务完成\nMAP:如果没有失败的任务就认为任务完成]
b13-->b14{other:根据终极任务名称和任务id查询数据库中是否存在终极任务}
b14-->|yes|b15[无论终极任务执行失败还是成功,都会任务当前任务执行成功]
b14-->|no|b16[根据当前任务id新建一条终极任务提交给当前机器,\n必须让当前机器执行一遍终极任务]
b12-->|no|b17[检测任务是否超时,把任务执行状态上报给server服务器]
b15-->b17
b16-->b17
b17-->b18[判断是否存在之前未确认的任务,重新发送未确认任务]
b18-->b19[检查有多少已宕机的ProcessorTracke,上面的任务重新派发\n删除掉宕机的机器]
b2-->b21[判断是否需要动态加载新的执行器\n没有执行器或者可用的执行器小于配置的最大执行器数量]
b21-->b22[向server端发送请求查询当前任务所有的可执行worker]
b22-->b23[把所有可执行worker注册到ProcessTracker状态管理]
b3-->b31[从任务管理器中取出所有可以执行的worker地址]
b31-->b32[从数据库中查出当前根任务下所有等待调度的子任务]
b32-->b33[通过取模算出当前任务需要执行的机器,给固定机器派发任务]
b33-->b34[把当前任务更新为已调度,给目标机器发送任务开始命令]
|
