欢迎进入Wiki » FAQ » 使用流程提醒服务实现统一待办任务推送与消息提醒功能?

使用流程提醒服务实现统一待办任务推送与消息提醒功能?

在2014-08-04 23:52上被李小翔修改
评论 (0) · 附件 (0) · 记录 · 信息

流程实例流转过程中,会生成各种任务,并且在不同的任务类型之间转换,比如待办、待阅、已办、已阅,或者由于抽单或者授权等功能删除特定用户的任务。

在系统集成领域,通常需要实现 BPM 与门户统一待办系统、即时通讯系统、邮件/短信(SMS)等系统的集成,但是在集成过程中,上述各种复杂的业务场景常常会造成各种各种不可预知的问题,并导致集成困难、调试困难。

因此,基于上述的集成需求,为了简化开发,BroBPM 提供了任务提醒服务,通过监控相应的任务事件,即可实现流程任务与第三方系统的集成。

流程提醒服务的核心是 ProcessNotificationService,通过监听 Task 实例的 afterInsert、afterUpdate、afterDelete 事件,过滤后重新封装成命名空间为 bro-bpm(即常量 bropen.bpm.Constants.PLUGIN_NAME) 的两个事件:

  • 异步事件:主题为 taskAsync,用于待办提醒等;
  • 同步事件:主题为 task,用于统一待办同步等实时性要求较高的操作,监听器中需处理异常,以免影响其他监听器的工作。

这两个事件都通过事件总线触发,因而可以使用注解 @Listener 对其进行监听,并做进一步处理(如同步到统一待办系统、发送短信通知等),关于事件总线的详细说明可以参考《grails-platform-core插件的事件总线(Events Bus)》。

事件的 data 参数中包含了:

  • event:
    任务变更操作类型,有 create、update、delete 等三种类型,分别表示新建一个任务、任务更新、任务删除;
    需要注意的是,一种业务场景可能会同时触发多个变更,比如抽单操作会触发一个更新操作和一个或多个删除操作;任务类型的变化也可能会触发更新与删除操作。
  • task:
    bropen.bpm.instance.Task 实例
  • deletedActors:
    如果 event 为 delete,则包含所有被删除的、不再拥有该任务的员工列表,如任务的 actor、actorSubstitute 等属性;否则为空。

BroBPM 中通过插件 TencentRTXService 实现了 RTX 的消息提醒,代码摘录如下:

class TencentRTXService extends bropen.framework.plugins.im.TencentRTXService {
   ....

   /**
     * 监听需要有人参与的任务的创建/更新/删除事件<p>
     *
     * @param data 包含event、task等元素的Map
     */

   @grails.events.Listener(namespace = bropen.bpm.Constants.PLUGIN_NAME, topic = 'taskAsync')
   void sendTaskMessage( Map data ) {
        sendMessage( data.event, data.task )
   }
   
   /**
     * 发送任务消息
     */

   void sendMessage( String event, Task task ) {
       if ( !taskMessageEnabled ) return
       if ( event == "delete" ) return
       if ( task.type != Task.TYPE_TODO ) return
       if ( !task.actorId ) return
       // 发送消息
       if ( (task.status=="created" || task.status=="started") ) {
           if ( event=="create" || (task.status!=task.prevStatus && !(task.prevStatus=="created" && task.status=="started")) ) {
               // 拼接消息内容(链接不支持:只能放个链接让rtx自动识别,会很长!)
               StringBuilder title = new StringBuilder()
                title << "待办提醒:" << task.processInst.definitionName
                StringBuilder content = new StringBuilder()
                content << task.title
                content << "\r\n发 送 人: " << task.sender
                content << "\r\n收到时间: " << DateUtils.formatDatetime(new Date())
                content << "\r\n点击打开: " << SDK_URL << "/bropen/sso/reverse_brobpm_redirector.html?redirect="
                content << GrailsUtils.getServerURL() << "process/edit/" << task.processInstId << "?taskId=" << task.id
               // 发送消息
               try {
                    List<Long> actorIds = [task.actorId]
                   if ( task.substituteType == Substitute.TYPE_ANY ) actorIds << task.actorSubstituteId        // 代理人和授权人都产生待办,任意一人处理即可
                   else if ( task.substituteType == Substitute.TYPE_MOVE ) actorIds = [task.actorSubstituteId]    // 仅代理人产生代办的情况
                   List<String> usernames = User.executeQuery("select username from bropen.framework.core.security.User where employee.id in (${actorIds.join(',')}) and enabled=1")
                    String username = usernames.join(",")
                   if ( username ) sendMessage(username, title.toString(), content.toString(), null)
               } catch (Exception e) {
               }
           }
       }
   }
}

在上面的代码中,方法 sendTaskMessage 监听了 taskAsync 事件,过滤出新的待办任务(event == "create")、或者任务被重新修订为待办的任务,通过 GrailsUtils.getServerURL() 方法拼装出任务的 URL 后,调用父类的 sendMessage API,给待办任务的办理人或授权办理人发出待办提醒。

用户通过 RTX 接收到待办后,通过 RTX to BroBPM 的反向 SSO 接口,可以直接打开流程任务并办理。

由于事件总线不能跨 JVM 执行、也无法在浏览器进行监听,因此 BroBPM 还会通过插件 CometdService 服务,将事件重新封装后,向整个应用集群广播任务的 create、update、delete 事件,广播的根频道为 /broadcast/bro-bpm/process/task,并分为下面四个子频道:

  • /todo/employeeId:ID为 employeeId 的员工收到待办任务
  • /toread/employeeId:收到待阅任务
  • /draft/employeeId:新建了一条草稿任务
  • /done/employeeId:员工的已办、已阅、任务挂起、任务删除等事件

CometD事件的监听详情可参考《基于 CometD 的消息总线》。

如下面的代码 javascript 片段,通过监听待办频道,在门户的首页弹出待办任务提醒(调用 floatingNotify 方法),点击链接即可打开待办任务进行办理:

// 计算频道
var employeeId = 123;
var pluginName = "bro-bpm";
var floatingTaskNotifies = {};
var taskTodoChannel = "/broadcast/"+pluginName+"/process/task/todo/"+employeeId;
var taskDoneChannel = "/broadcast/"+pluginName+"/process/task/done/"+employeeId;
// 预订频道
$j.cometd.ready(function(){
   // 监听并提示待办信息
   $j.cometd.subscribe(taskTodoChannel, function(message){
       // 根据任务信息,弹出待办提示,显示在右下角
       var task = message.data;
       var title = task.createTime.split(" ")[1] + " " + m('portal.task.notify.title') + task.definitionName;
       var timeout = floatingTaskNotifyTimeout;
       var containerId = "floatingTaskNotify" + task.id;
       var containerTitle = task.title;
       var notifyImgUrl = floatingTaskNotifyImg;
       var content = ". <a href='process/edit/" + task.processInstId + "?taskId=" + task.id + "' target='_blank'>"
           + ((task.title.length > 20) ? (task.title.substring(0,19)+"...") : task.title).escapeXML() + "</a>\n";
           + ". <a href='process/edit/" + task.processInstId + "?taskId=" + task.id + "' target='_blank'>"
           + m('portal.task.notify.sender') + task.sender + "</a>";
        floatingTaskNotifies[task.id] = floatingNotify(title, content, timeout, containerId, containerTitle, notifyImgUrl);
    });
   // 监听已办,隐藏上次提示的待办信息
   $j.cometd.subscribe(taskDoneChannel, function(message) {
       if ( floatingTaskNotifies[message.data.id] )
            floatingTaskNotifies[message.data.id].floatingMessage("destroy");
    });
});

上述两个示例中,都是监听 taskAsync 主题,即异步事件,对于消息提醒来说,异步事件的时间延迟完全在可以接受的范围内。

但是对于统一待办集成,则该延迟可能会导致首页待办无法即时刷新的问题,此时,可以采用 task 主题,即同步事件主题。

下面是 BroBPM 的工作任务拉取与推送服务 WorkbenchTasksEndpoint 中的代码片段,通过监听同步事件主题,将任务推送到统一待办系统中:

/**
 * 同步监听草稿、待办、待阅等类型任务的更新/删除事件。
 */

@WebMethod(exclude = true)
@grails.events.Listener(namespace = bropen.bpm.Constants.PLUGIN_NAME, topic = 'task')
void taskListener( Map data ) {
   try {
       //log.info "WorkbenchTasksEndpoint=" + System.currentTimeMillis()
       String event = data.event
        Task task = data.task
       if ( event == "create" || event == "update" ) {
            pushTask( task )
       } else if ( event == "delete" && data.deletedActors ) {
            pushTask(task, data.deletedActors)
       }
   } catch ( Exception e ) {
        errorService.push( "Process task listener error: " + e.message, e );
   }
}

/**
 * 将更新的任务推送到Workbench:调用统一待办的推送服务
 */

private void pushTask( Task task, List<Employee> deletedActors = null ) {
   if ( null == WORKBENCH_TASKS_PUSH_SRV_URL ) return;
   // 异步运行,提升性能
   executorService.submit({
       try {
            SOAPClient client = new SOAPClient( WORKBENCH_TASKS_PUSH_SRV_URL );
            String soapenv = generateWorkbenchPushSoapenv( task, deletedActors );
            SOAPResponse resp = client.send( soapenv );
           //
           def xml = new XmlParser().parseText(resp.text);
           def result = xml."soap:Body"[0].iterator().next().result;
           if ( result.success.text() == "false" ) {
                errorService.push( "Push task to workbench failed: " + result.message.text() );
           }
       } catch(Exception e) {
            errorService.push( "Push task to workbench failed: " + e.message, e );
       }
   } as java.util.concurrent.Callable)
}

/**
 * 为pushTask生成soap封包
 */

private generateWorkbenchPushSoapenv( Task task, List<Employee> deletedActors ) {
   // 生成soap请求的xml封包
   StringBuilder soapenv = new StringBuilder()
   try {
        soapenv << '<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.workbench.bropen.com.cn/">'
        soapenv << '<soapenv:Header/><soapenv:Body>'
       // 删除
       if ( deletedActors ) {
            soapenv << '<ser:deleteTasks>'
            soapenv << '<appCode>' << WORKBENCH_TASKS_PUSH_APP_CODE << '</appCode>'
           for ( Employee actor in deletedActors ) {
                soapenv << '<deletedTasks>'
                soapenv << '<taskId>' << task.id << '</taskId>'
                String username = User.executeQuery("select usernameEver from bropen.framework.core.security.User where id=?", [actor.userId])[0];
                soapenv << '<ownerUser>' << username << '</ownerUser>'
                soapenv << '</deletedTasks>'
           }
            soapenv << '</ser:deleteTasks>'
       }
       // 更新
       else {
            List<Employee> actors = [];
           if ( task.substituteType != Substitute.TYPE_MOVE ) actors << task.actor;
           else if ( task.actorSubstitute ) actors << task.actorSubstitute;
            BpmProcessInstance process = task.processInst;
           //
           String taskUrl = getTaskUrl([applicationId: process.applicationId, completeTime: task.completeTime, docId: process.id, taskId: task.id])
            Calendar cal = Calendar.getInstance();
            String startTime = null;
           if ( task.startTime ) {
                cal.setTime( task.startTime )
                startTime = javax.xml.bind.DatatypeConverter.printDateTime(cal);
           }
            String completeTime = null;
           if ( task.completeTime ) {
                cal.setTime( task.completeTime )
                completeTime = javax.xml.bind.DatatypeConverter.printDateTime(cal);
           }
           //
           soapenv << '<ser:updateTasks>'
            soapenv << '<appCode>' << WORKBENCH_TASKS_PUSH_APP_CODE << '</appCode>'
           for ( Employee actor in actors ) {
                soapenv << '<tasks>'
                soapenv << '<taskType>' << getTaskType(task) << '</taskType>'
                soapenv << '<definition>' << process.definitionName.encodeAsHTML() << '</definition>'
                soapenv << '<status>' << getProcessStatus(process) << '</status>'
                soapenv << '<title>' << task.title?.encodeAsHTML() << '</title>'
                soapenv << '<docId>' << process.id << '</docId>'
                soapenv << '<docNumber>' << (process.dataNumber?:"") << '</docNumber>'
                soapenv << '<docDrafter>' << (process.initiatorName?:"") << '</docDrafter>'
                soapenv << '<docDraftOrg>' << (process.organizationFullName?:"")?.encodeAsHTML() << '</docDraftOrg>'
                soapenv << '<taskId>' << task.id << '</taskId>'
                soapenv << '<senderName>' << (task.sender?:"") << '</senderName>'
                soapenv << '<ownerName>' << actor.name << '</ownerName>'
                String username = User.executeQuery("select usernameEver from bropen.framework.core.security.User where id=?", [actor.userId])[0];
                soapenv << '<ownerUser>' << username << '</ownerUser>'
               if ( startTime ) soapenv << '<startTime>' << startTime << '</startTime>'
               if ( completeTime ) soapenv << '<completeTime>' << completeTime << '</completeTime>'
                soapenv << '<priority>' << getPriority(process) << '</priority>'
                soapenv << '<url>' << taskUrl << '</url>'
                soapenv << '</tasks>'
           }
            soapenv << '</ser:updateTasks>'
       }
        soapenv << '</soapenv:Body></soapenv:Envelope>'
   } catch (Exception e) {
        errorService.push( "Generate workbench push soap xml failed.", e );
       return null;
   }
   return soapenv.toString();
}

需要注意的是,同步事件执行必须捕捉并处理异常,以免影响其他同步事件的执行。

BTW:这里是实现是任务推送服务,BroBPM同时提供了任务的增量抓取的Web服务,以便统一待办系统直接抓取数据,接口详情可以访问应用的URL /Foobar/services/workbenchTasks?wsdl;或者联系我们索取独立的统一待办系统及解决方案。

标签: BroBPM 事件
在2014-08-04 23:47上被李小翔创建

Copyright © 2013 北京博瑞开源软件有限公司
京ICP备12048974号