因为在工作中需要推动 Apache 的升级,经过预研,从 1.3.4 到 3.1.2 有的体验了很大的提升,在性能和功能性有了很多的改善,推荐升级。
查看官方的升级文档,可知有提供升级脚本,如果只是跨小版本的更新那么只用执行脚本就好了,但跨多个大版本升级时依然容易出现各种问题,特此总结。
> 旧版本:1.3.4 <br> 新版本:3.1.2
升级完成后使用资源中心报错 IllegalArgumentException: Failed to specify server's Kerberos principal name
资源中心使用的 HDFS,开启了 kerberos
认证
解决方法:
编辑 dolphinscheduler/api-server/conf/hdfs-site.xml
添加以下内容
<property> <name>dfs.namenode.kerberos.principal.pattern</name> <value>*</value></property>
升级完成后查看任务实例的日志,报错未找到日志,查看报错信息,检查新版本的目录结构和表里的日志路径,发现原因是新版本的日志路径有变更。
升级前的日志路径在 /logs/
下。
升级后的日志路径在 /worker-server/logs/
下。
因此需要修改这里的目录
解决方法: 执行 SQL 修改日志路径
update t_ds_task_instance set log_path=replace(log_path,'/logs/','/worker-server/logs/');
然后将原日志文件 copy 到新的日志路径
cp -r {旧版本dolphinscheduler目录}/logs/[1-9]* {新版本dolphinscheduler目录}/worker-server/logs/*
查看报错信息,原因是 t_ds_process_definition_log
和 t_ds_process_definition
主键的初始值不一致,那么修改成一致的就好了!
解决方法: 执行 SQL
# 查出主键自增值select AUTO_INCREMENT FROM information_schema.TABLES WHERE TABLE_SCHEMA = 'dolphinscheduler' AND TABLE_NAME = 't_ds_process_definition' limit 1# 将上面SQL的执行结果填写到下方参数处执行alter table dolphinscheduler_bak1.t_ds_process_definition_log auto_increment = {max_id};
检查查询的 SQL
在 dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
文件里,select id="queryTaskInstanceListPaging"
的 SQL
select <include refid="baseSqlV2"> <property name="alias" value="instance" /> </include> , process.name as process_instance_name from t_ds_task_instance instance left join t_ds_task_definition_log define on define.code=instance.task_code and define.version=instance.task_definition_version left join t_ds_process_instance process on process.id=instance.process_instance_id where define.project_code = #{projectCode} <if test="startTime != null"> and instance.start_time <![CDATA[ >=]]> #{startTime} </if> ......省略多余部分
查询任务实例列表的 SQL 会关联 t_ds_task_definition_log
表,经检查发现是 define.code=instance.task_code
这一句关联不上。
结合下面的查询条件 define.project_code = #{projectCode}
可知,关联 t_ds_task_definition_log
主要是为了过滤 projectCode
,那么来修改下这个 SQL:
解决方法:
select <include refid="baseSqlV2"> <property name="alias" value="instance" /> </include> , process.name as process_instance_name from t_ds_task_instance instance-- left join t_ds_task_definition_log define -- on define.code=instance.task_code and -- define.version=instance.task_definition_version join t_ds_process_instance process on process.id=instance.process_instance_id join t_ds_process_definition define on define.code=process.process_definition_code where define.project_code = #{projectCode} <if test="startTime != null"> and instance.start_time <![CDATA[ >=]]> #{startTime} </if> ......省略多余部分
直接用 t_ds_process_definition
关联,也有 project_code
字段可以用来关联过滤,这里修改后就能查出数据了。
查看代码
513 if (TASK_TYPE_SUB_PROCESS.equals(taskType)) {514 JsonNode jsonNodeDefinitionId = param.get("processDefinitionId");515 if (jsonNodeDefinitionId != null) {516 param.put("processDefinitionCode",517 processDefinitionMap.get(jsonNodeDefinitionId.asInt()).getCode());518 param.remove("processDefinitionId");519 }520 }
很明显是 processDefinitionMap.get(jsonNodeDefinitionId.asInt())
返回了 null, 加个 null 判断,如果返回 null 直接跳过,并将相关信息打印出来,升级结束后可以根据日志核对。
解决方法:
修改后:
if (jsonNodeDefinitionId != null) { if (processDefinitionMap.get(jsonNodeDefinitionId.asInt()) != null) { param.put("processDefinitionCode",processDefinitionMap.get(jsonNodeDefinitionId.asInt()).getCode()); param.remove("processDefinitionId"); } else { logger.error("*******************error"); logger.error("*******************param:" + param); logger.error("*******************jsonNodeDefinitionId:" + jsonNodeDefinitionId); }}
查看代码
669 if (mapEntry.isPresent()) {670 Map.Entry<long, map<string, long>> processCodeTaskNameCodeEntry = mapEntry.get();671 dependItem.put("definitionCode", processCodeTaskNameCodeEntry.getKey());672 String depTasks = dependItem.get("depTasks").asText();673 long taskCode =674 "ALL".equals(depTasks) || processCodeTaskNameCodeEntry.getValue() == null ? 0L675 : processCodeTaskNameCodeEntry.getValue().get(depTasks);676 dependItem.put("depTaskCode", taskCode);677 }
很明显是 processCodeTaskNameCodeEntry.getValue().get(depTasks)
返回了 null,修改下逻辑,不为 null 才赋值并打印相关日志。
解决方法:
修改后:
long taskCode =0; if (processCodeTaskNameCodeEntry.getValue() != null &&processCodeTaskNameCodeEntry.getValue().get(depTasks)!=null){ taskCode =processCodeTaskNameCodeEntry.getValue().get(depTasks); }else{ logger.error("******************** depTasks:"+depTasks); logger.error("******************** taskCode not in "+JSONUtils.toJsonString(processCodeTaskNameCodeEntry)); } dependItem.put("depTaskCode", taskCode);
可在 api-server/conf/application.yaml
配置接入 LDAP
security: authentication: # Authentication types (supported types: PASSWORD,LDAP) type: LDAP # IF you set type `LDAP`, below config will be effective ldap: # ldap server config urls: xxx base-dn: xxx username: xxx password: xxx user: # admin userId when you use LDAP login admin: xxx identity-attribute: xxx email-attribute: xxx # action when ldap user is not exist (supported types: CREATE,DENY) not-exist-action: CREATE
要成功接入 LDAP 至少需要 urls,base-dn,username,password,identity
和 email 正确填写,不知道 email 字段名可以按下面的方式处理,email 先空着
启动服务后用 LDAP 用户登录
解决办法: LDAP 认证的代码在 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/security/impl/ldap/LdapService.java
的 ldapLogin()
ctx = new InitialLdapContext(searchEnv, null);SearchControls sc = new SearchControls();sc.setReturningAttributes(new String[]{ldapEmailAttribute});sc.setSearchScope(SearchControls.SUBTREE_SCOPE);EqualsFilter filter = new EqualsFilter(ldapUserIdentifyingAttribute, userId);NamingEnumeration<searchresult> results = ctx.search(ldapBaseDn, filter.toString(), sc);if (results.hasMore()) { // get the users DN (distinguishedName) from the result SearchResult result = results.next(); NamingEnumeration<!--? extends Attribute--> attrs = result.getAttributes().getAll(); while (attrs.hasMore()) { // Open another connection to the LDAP server with the found DN and the password searchEnv.put(Context.SECURITY_PRINCIPAL, result.getNameInNamespace()); searchEnv.put(Context.SECURITY_CREDENTIALS, userPwd); try { new InitialDirContext(searchEnv); } catch (Exception e) { logger.warn("invalid ldap credentials or ldap search error", e); return null; } Attribute attr = attrs.next(); if (attr.getID().equals(ldapEmailAttribute)) { return (String) attr.get(); } }}
第三行会根据填的字段过滤,先注释第三行
// sc.setReturningAttributes(new String[]{ldapEmailAttribute});
重新执行后第 10 行会返回全部字段
NamingEnumeration<!--? extends Attribute--> attrs = result.getAttributes().getAll();
通过打印或调试在里面找到 email 字段填到配置文件里,再还原上面注释的代码,重启服务后即可正常接入 LDAP 登录。
经多次测试,发现普通用户只能看到所属用户为自己的资源文件,管理员授权后依然无法查看资源文件
解决办法:
文件 dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/permission/ResourcePermissionCheckServiceImpl.java
的 listAuthorizedResource()
方法,将 return 的集合修改为 relationResources
@Override public Set<integer> listAuthorizedResource(int userId, Logger logger) { List<resource> relationResources; if (userId == 0) { relationResources = new ArrayList<>(); } else { // query resource relation List<integer> resIds = resourceUserMapper.queryResourcesIdListByUserIdAndPerm(userId, 0); relationResources = CollectionUtils.isEmpty(resIds) ? new ArrayList<>() : resourceMapper.queryResourceListById(resIds); } List<resource> ownResourceList = resourceMapper.queryResourceListAuthored(userId, -1); relationResources.addAll(ownResourceList); return relationResources.stream().map(Resource::getId).collect(toSet()); // 解决资源文件授权无效的问题// return ownResourceList.stream().map(Resource::getId).collect(toSet()); }
检查新版本的 Change log ,发现在 3.1.3 版本修复了这个 bug
https://github.com/apache/dolphinscheduler/pull/13318
因为 kerberos 配置了票据过期时间,一段时间后资源中心的 hdfs 资源将无法访问,最好的解决办法是添加定时更新凭证的相关逻辑。
解决办法:
在文件 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/CommonUtils.java
添加方法
/** * * 定时更新凭证 */ private static void startCheckKeytabTgtAndReloginJob() { // 每天循环,定时更新凭证 Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(() -> { try { UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab(); logger.warn("Check Kerberos Tgt And Relogin From Keytab Finish."); } catch (IOException e) { logger.error("Check Kerberos Tgt And Relogin From Keytab Error", e); } }, 0, 1, TimeUnit.DAYS); logger.info("Start Check Keytab TGT And Relogin Job Success."); }
然后在该文件的 loadKerberosConf
方法返回 true
前调用:
public static boolean loadKerberosConf(String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath, Configuration configuration) throws IOException { if (CommonUtils.getKerberosStartupState()) { System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF, StringUtils.defaultIfBlank(javaSecurityKrb5Conf, PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH))); configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION, Constants.KERBEROS); UserGroupInformation.setConfiguration(configuration); UserGroupInformation.loginUserFromKeytab( StringUtils.defaultIfBlank(loginUserKeytabUsername, PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME)), StringUtils.defaultIfBlank(loginUserKeytabPath, PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH))); startCheckKeytabTgtAndReloginJob(); // 此处调用 return true; } return false; }
这篇文章主要是记录升级过程中遇到的问题,希望能够对大家有所帮助!