流程实例流转过程中,会生成各种任务,并且在不同的任务类型之间转换,比如待办、待阅、已办、已阅,或者由于抽单或者授权等功能删除特定用户的任务。
在系统集成领域,通常需要实现 BPM 与门户统一待办系统、即时通讯系统、邮件/短信(SMS)等系统的集成,但是在集成过程中,上述各种复杂的业务场景常常会造成各种各种不可预知的问题,并导致集成困难、调试困难。
因此,基于上述的集成需求,为了简化开发,BroBPM 提供了任务提醒服务,通过监控相应的任务事件,即可实现流程任务与第三方系统的集成。
流程提醒服务的核心是 ProcessNotificationService,通过监听 Task 实例的 afterInsert、afterUpdate、afterDelete 事件,过滤后重新封装成命名空间为 bro-bpm(即常量 bropen.bpm.Constants.PLUGIN_NAME) 的两个事件:
- 异步事件:主题为 taskAsync,用于待办提醒等;
- 同步事件:主题为 task,用于统一待办同步等实时性要求较高的操作,监听器中需处理异常,以免影响其他监听器的工作。
这两个事件都通过事件总线触发,因而可以使用注解 @Listener 对其进行监听,并做进一步处理(如同步到统一待办系统、发送短信通知等),关于事件总线的详细说明可以参考《grails-platform-core插件的事件总线(Events Bus)》。
事件的 data 参数中包含了:
- event:
任务变更操作类型,有 create、update、delete 等三种类型,分别表示新建一个任务、任务更新、任务删除;
需要注意的是,一种业务场景可能会同时触发多个变更,比如抽单操作会触发一个更新操作和一个或多个删除操作;任务类型的变化也可能会触发更新与删除操作。 - task:
bropen.bpm.instance.Task 实例 - deletedActors:
如果 event 为 delete,则包含所有被删除的、不再拥有该任务的员工列表,如任务的 actor、actorSubstitute 等属性;否则为空。
BroBPM 中通过插件 TencentRTXService 实现了 RTX 的消息提醒,代码摘录如下:
class TencentRTXService extends bropen.framework.plugins.im.TencentRTXService {
....
/**
* 监听需要有人参与的任务的创建/更新/删除事件<p>
*
* @param data 包含event、task等元素的Map
*/
@grails.events.Listener(namespace = bropen.bpm.Constants.PLUGIN_NAME, topic = 'taskAsync')
void sendTaskMessage( Map data ) {
sendMessage( data.event, data.task )
}
/**
* 发送任务消息
*/
void sendMessage( String event, Task task ) {
if ( !taskMessageEnabled ) return
if ( event == "delete" ) return
if ( task.type != Task.TYPE_TODO ) return
if ( !task.actorId ) return
// 发送消息
if ( (task.status=="created" || task.status=="started") ) {
if ( event=="create" || (task.status!=task.prevStatus && !(task.prevStatus=="created" && task.status=="started")) ) {
// 拼接消息内容(链接不支持:只能放个链接让rtx自动识别,会很长!)
StringBuilder title = new StringBuilder()
title << "待办提醒:" << task.processInst.definitionName
StringBuilder content = new StringBuilder()
content << task.title
content << "\r\n发 送 人: " << task.sender
content << "\r\n收到时间: " << DateUtils.formatDatetime(new Date())
content << "\r\n点击打开: " << SDK_URL << "/bropen/sso/reverse_brobpm_redirector.html?redirect="
content << GrailsUtils.getServerURL() << "process/edit/" << task.processInstId << "?taskId=" << task.id
// 发送消息
try {
List<Long> actorIds = [task.actorId]
if ( task.substituteType == Substitute.TYPE_ANY ) actorIds << task.actorSubstituteId // 代理人和授权人都产生待办,任意一人处理即可
else if ( task.substituteType == Substitute.TYPE_MOVE ) actorIds = [task.actorSubstituteId] // 仅代理人产生代办的情况
List<String> usernames = User.executeQuery("select username from bropen.framework.core.security.User where employee.id in (${actorIds.join(',')}) and enabled=1")
String username = usernames.join(",")
if ( username ) sendMessage(username, title.toString(), content.toString(), null)
} catch (Exception e) {
}
}
}
}
}
在上面的代码中,方法 sendTaskMessage 监听了 taskAsync 事件,过滤出新的待办任务(event == "create")、或者任务被重新修订为待办的任务,通过 GrailsUtils.getServerURL() 方法拼装出任务的 URL 后,调用父类的 sendMessage API,给待办任务的办理人或授权办理人发出待办提醒。
用户通过 RTX 接收到待办后,通过 RTX to BroBPM 的反向 SSO 接口,可以直接打开流程任务并办理。
由于事件总线不能跨 JVM 执行、也无法在浏览器进行监听,因此 BroBPM 还会通过插件 CometdService 服务,将事件重新封装后,向整个应用集群广播任务的 create、update、delete 事件,广播的根频道为 /broadcast/bro-bpm/process/task,并分为下面四个子频道:
- /todo/employeeId:ID为 employeeId 的员工收到待办任务
- /toread/employeeId:收到待阅任务
- /draft/employeeId:新建了一条草稿任务
- /done/employeeId:员工的已办、已阅、任务挂起、任务删除等事件
CometD事件的监听详情可参考《基于 CometD 的消息总线》。
如下面的代码 javascript 片段,通过监听待办频道,在门户的首页弹出待办任务提醒(调用 floatingNotify 方法),点击链接即可打开待办任务进行办理:
// 计算频道
var employeeId = 123;
var pluginName = "bro-bpm";
var floatingTaskNotifies = {};
var taskTodoChannel = "/broadcast/"+pluginName+"/process/task/todo/"+employeeId;
var taskDoneChannel = "/broadcast/"+pluginName+"/process/task/done/"+employeeId;
// 预订频道
$j.cometd.ready(function(){
// 监听并提示待办信息
$j.cometd.subscribe(taskTodoChannel, function(message){
// 根据任务信息,弹出待办提示,显示在右下角
var task = message.data;
var title = task.createTime.split(" ")[1] + " " + m('portal.task.notify.title') + task.definitionName;
var timeout = floatingTaskNotifyTimeout;
var containerId = "floatingTaskNotify" + task.id;
var containerTitle = task.title;
var notifyImgUrl = floatingTaskNotifyImg;
var content = ". <a href='process/edit/" + task.processInstId + "?taskId=" + task.id + "' target='_blank'>"
+ ((task.title.length > 20) ? (task.title.substring(0,19)+"...") : task.title).escapeXML() + "</a>\n";
+ ". <a href='process/edit/" + task.processInstId + "?taskId=" + task.id + "' target='_blank'>"
+ m('portal.task.notify.sender') + task.sender + "</a>";
floatingTaskNotifies[task.id] = floatingNotify(title, content, timeout, containerId, containerTitle, notifyImgUrl);
});
// 监听已办,隐藏上次提示的待办信息
$j.cometd.subscribe(taskDoneChannel, function(message) {
if ( floatingTaskNotifies[message.data.id] )
floatingTaskNotifies[message.data.id].floatingMessage("destroy");
});
});
上述两个示例中,都是监听 taskAsync 主题,即异步事件,对于消息提醒来说,异步事件的时间延迟完全在可以接受的范围内。
但是对于统一待办集成,则该延迟可能会导致首页待办无法即时刷新的问题,此时,可以采用 task 主题,即同步事件主题。
下面是 BroBPM 的工作任务拉取与推送服务 WorkbenchTasksEndpoint 中的代码片段,通过监听同步事件主题,将任务推送到统一待办系统中:
/**
* 同步监听草稿、待办、待阅等类型任务的更新/删除事件。
*/
@WebMethod(exclude = true)
@grails.events.Listener(namespace = bropen.bpm.Constants.PLUGIN_NAME, topic = 'task')
void taskListener( Map data ) {
try {
//log.info "WorkbenchTasksEndpoint=" + System.currentTimeMillis()
String event = data.event
Task task = data.task
if ( event == "create" || event == "update" ) {
pushTask( task )
} else if ( event == "delete" && data.deletedActors ) {
pushTask(task, data.deletedActors)
}
} catch ( Exception e ) {
errorService.push( "Process task listener error: " + e.message, e );
}
}
/**
* 将更新的任务推送到Workbench:调用统一待办的推送服务
*/
private void pushTask( Task task, List<Employee> deletedActors = null ) {
if ( null == WORKBENCH_TASKS_PUSH_SRV_URL ) return;
// 异步运行,提升性能
executorService.submit({
try {
SOAPClient client = new SOAPClient( WORKBENCH_TASKS_PUSH_SRV_URL );
String soapenv = generateWorkbenchPushSoapenv( task, deletedActors );
SOAPResponse resp = client.send( soapenv );
//
def xml = new XmlParser().parseText(resp.text);
def result = xml."soap:Body"[0].iterator().next().result;
if ( result.success.text() == "false" ) {
errorService.push( "Push task to workbench failed: " + result.message.text() );
}
} catch(Exception e) {
errorService.push( "Push task to workbench failed: " + e.message, e );
}
} as java.util.concurrent.Callable)
}
/**
* 为pushTask生成soap封包
*/
private generateWorkbenchPushSoapenv( Task task, List<Employee> deletedActors ) {
// 生成soap请求的xml封包
StringBuilder soapenv = new StringBuilder()
try {
soapenv << '<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.workbench.bropen.com.cn/">'
soapenv << '<soapenv:Header/><soapenv:Body>'
// 删除
if ( deletedActors ) {
soapenv << '<ser:deleteTasks>'
soapenv << '<appCode>' << WORKBENCH_TASKS_PUSH_APP_CODE << '</appCode>'
for ( Employee actor in deletedActors ) {
soapenv << '<deletedTasks>'
soapenv << '<taskId>' << task.id << '</taskId>'
String username = User.executeQuery("select usernameEver from bropen.framework.core.security.User where id=?", [actor.userId])[0];
soapenv << '<ownerUser>' << username << '</ownerUser>'
soapenv << '</deletedTasks>'
}
soapenv << '</ser:deleteTasks>'
}
// 更新
else {
List<Employee> actors = [];
if ( task.substituteType != Substitute.TYPE_MOVE ) actors << task.actor;
else if ( task.actorSubstitute ) actors << task.actorSubstitute;
BpmProcessInstance process = task.processInst;
//
String taskUrl = getTaskUrl([applicationId: process.applicationId, completeTime: task.completeTime, docId: process.id, taskId: task.id])
Calendar cal = Calendar.getInstance();
String startTime = null;
if ( task.startTime ) {
cal.setTime( task.startTime )
startTime = javax.xml.bind.DatatypeConverter.printDateTime(cal);
}
String completeTime = null;
if ( task.completeTime ) {
cal.setTime( task.completeTime )
completeTime = javax.xml.bind.DatatypeConverter.printDateTime(cal);
}
//
soapenv << '<ser:updateTasks>'
soapenv << '<appCode>' << WORKBENCH_TASKS_PUSH_APP_CODE << '</appCode>'
for ( Employee actor in actors ) {
soapenv << '<tasks>'
soapenv << '<taskType>' << getTaskType(task) << '</taskType>'
soapenv << '<definition>' << process.definitionName.encodeAsHTML() << '</definition>'
soapenv << '<status>' << getProcessStatus(process) << '</status>'
soapenv << '<title>' << task.title?.encodeAsHTML() << '</title>'
soapenv << '<docId>' << process.id << '</docId>'
soapenv << '<docNumber>' << (process.dataNumber?:"") << '</docNumber>'
soapenv << '<docDrafter>' << (process.initiatorName?:"") << '</docDrafter>'
soapenv << '<docDraftOrg>' << (process.organizationFullName?:"")?.encodeAsHTML() << '</docDraftOrg>'
soapenv << '<taskId>' << task.id << '</taskId>'
soapenv << '<senderName>' << (task.sender?:"") << '</senderName>'
soapenv << '<ownerName>' << actor.name << '</ownerName>'
String username = User.executeQuery("select usernameEver from bropen.framework.core.security.User where id=?", [actor.userId])[0];
soapenv << '<ownerUser>' << username << '</ownerUser>'
if ( startTime ) soapenv << '<startTime>' << startTime << '</startTime>'
if ( completeTime ) soapenv << '<completeTime>' << completeTime << '</completeTime>'
soapenv << '<priority>' << getPriority(process) << '</priority>'
soapenv << '<url>' << taskUrl << '</url>'
soapenv << '</tasks>'
}
soapenv << '</ser:updateTasks>'
}
soapenv << '</soapenv:Body></soapenv:Envelope>'
} catch (Exception e) {
errorService.push( "Generate workbench push soap xml failed.", e );
return null;
}
return soapenv.toString();
}
需要注意的是,同步事件执行必须捕捉并处理异常,以免影响其他同步事件的执行。
BTW:这里是实现是任务推送服务,BroBPM同时提供了任务的增量抓取的Web服务,以便统一待办系统直接抓取数据,接口详情可以访问应用的URL /Foobar/services/workbenchTasks?wsdl;或者联系我们索取独立的统一待办系统及解决方案。