博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
将WBE与WebSphere Message Broker 一起使用
阅读量:2491 次
发布时间:2019-05-11

本文共 13757 字,大约阅读时间需要 45 分钟。

, 软件工程师, IBM

本系列的向您介绍了整个系列中使用的交易系统场景。在中,您构建并测试了该交易系统应用程序。在第 3 部分中,您将了解如何使用该交易系统场景作为示例,将 WebSphere Message Broker(以下称为 Message Broker)集成到业务事件处理解决方案中。

将 Message Broker 与 WebSphere Business Events(以下称为 Business Events)集成的优点是什么呢?Message Broker 提供了用于业务应用程序集成和消息转换的智能 SOA 方法。但是,业务经常还需要业务事件处理。例如,尽管集成的应用程序可以通过 Message Broker 自由地彼此发送消息,但是您的业务还需要检测复杂的消息模式(例如,您的其中一个应用程序在一段时间内似乎不是活动的)。WebSphere Business Events(以下称为 Business Events)可以检测这样的模式,自动生成消息,并将消息发送到 Message Broker 以便能够纠正该情况。

本文中使用的场景基于一个交易系统,其中有一个 Business Events 应用程序处理买入和卖出交易事件。如果同一个客户在一小时内买入并卖出同一股票,则 Business Events 系统将生成一个 SellAfterBuy 操作。对于特定客户在一天内执行的每三个 SellAfterBuy 操作,无论是对哪些股票执行的,都会生成一个 SpeculativeCustomer 操作。有关该场景的更多信息,请参阅。

在本文中,我们将使用您在中创建的交易系统应用程序,对其进行增强以包括与 Message Broker 的集成,并使用 Message Broker 接触点发送事件和使用操作。

图 1 显示了本文中描述的 Message Broker 和 Business Events 集成场景的大致体系结构:

集成体系结构

在图 1 中,该集成体系结构具有五个主要组件,其中三个组件包含在 WebSphere Message Broker“超级组件”中:

  • 交易系统客户端负责向 Message Broker 发送请求,并从 Message Broker 接收响应。就本文而言,我们将使用可在 获得的 rfhutil 实用工具来模拟该客户端。
  • 在 Message Broker 中,事件/操作转换负责在整个系统中通过中介传递消息。此组件是本文的重点。Message Broker 从客户端接收消息,并将其转发到 Business Events。它还从 Business Events 接收操作并将其转发到客户端。
  • Business Events 检测何时发生了某个业务事件。
  • 在 Message Broker 中,交易系统处理客户端发送的交易。Message Broker 交易系统组件是交易处理系统的一个逻辑组件,这里介绍它是出于完整性的考虑。其实现方式取决于具体的业务环境。由于本文重点研究如何将 Message Broker 与 Business Events 集成,也就是如何将事件/操作转换连接到 Business Events,因此该交易系统组件的详细信息就不在本文中进行讨论了。
  • Message Broker 操作处理系统在 Business Events 已发起某个操作时执行业务流程。由于本文的重点不是业务处理功能,因此仅简单研究从 Business Events 接收的操作,并根据接收到的操作的类型,将其传递到相关的 WebSphere MQ 队列。

我们的集成场景遵循以下原则:

  • 与 Business Events 进行的所有通信都是使用 Java Message Service (JMS) 消息队列执行的,这是 Message Broker 与 Business Events 建立连接的简单方法。
  • 提交到 Business Events 的所有事件均由 Message Broker 中的一个消息流处理。这样可以实现特定代理消息到 Business Events 所需的通用事件结构的转换。
  • 类似地,Business Events 生成的任何操作均逻辑地流经 Message Broker 流,在流中可以进行从通用操作结构到操作处理系统可能需要的特定代理消息的转换。但是,本文中描述的流没有实现此类转换逻辑。

这些原则确保 Message Broker 超级组件和客户端除了需要知道如何使用标准接口进行通信以外,均不需要有关 Business Event 组件的任何特定知识,以便对 Business Event 组件实现的任何可能的改动不会影响 WebSphere Message Broker 超级组件。

您可以使用本文描述的过程从头构建您自己的项目。该解决方案的 Message Broker 项目交换文件和 Business Events 项目文件包括在部分。您可以将这些文件分别导入 Message Broker Toolkit 和 Business Events。但是,您仍然需要完成 部分中描述的手动配置。

我们将使用 WebSphere MQ 作为 Message Broker 与 Business Events 之间的消息以及 Message Broker 与交易系统客户端之间的消息的 JMS 提供者。在下面的部分中,您将配置 WebSphere MQ JMS 并创建必需的队列管理器和队列。

JMS 队列用于在 Message Broker 与 Business Events 之间传递消息。要配置 WebSphere MQ JMS,请完成以下操作步骤:

  1. 编辑 WebSphere MQ 安装的 Java\bin 目录中的 WebSphere MQ JMSAdmin.config 文件中的 INITIAL_CONTEXT_FACTORYPROVIDER_URL 条目。按如下所示更新这些条目:
    INITIAL_CONTEXT_FACTORY=com.sun.jndi.fscontext.RefFSContextFactorPROVIDER_URL=file:/C:/JNDI-Directory.
  2. 在您选择的位置创建一个单独的 WebSphere MQ 队列连接工厂定义文件,其中包含以下代码(假设队列管理器名称为 WBRK61_DEFAULT_QUEUE_MANAGER):
    # Define a QueueConnectionFactory# Only parameters being overridden from their default values are specified.# This sets up a MQ client binding.DEF QCF(qcf1) +TRANSPORT(CLIENT) +QMANAGER(WBRK61_DEFAULT_QUEUE_MANAGER) +HOSTNAME(127.0.0.1)+PORT(2414)#DEF Q(wbe_input_queue) +QUEUE(WBE_INPUT_QUEUE) +QMANAGER(WBRK61_DEFAULT_QUEUE_MANAGER)#DEF Q(wbe_output_queue) +QUEUE(WBE_OUTPUT_QUEUE) +QMANAGER(WBRK61_DEFAULT_QUEUE_MANAGER)#DEF Q(wbe_backout_queue) +QUEUE(WBE_BACKOUT_QUEUE) +QMANAGER(WBRK61_DEFAULT_QUEUE_MANAGER)END

    此文件声明该队列连接工厂名称为 qcf1,并指定了三个 JMS 队列:wbe_input_queuewbe_output_queuewbe_backout_queue。对应的 WebSphere MQ 队列分别为 WBE_INPUT_QUEUEWBE_OUTPUT_QUEUEWBE_BACKOUT_QUEUE。Message Broker 输出流(将在稍后描述)中的 JMSInput 节点需要 wbe_backout_queue,用于该流在无法成功处理消息时所撤销的消息。

  3. 创建名为 C:/JNDI-Directory 的文件目录。
  4. 通过输入以下内容,使用您创建的队列连接工厂定义文件运行 WebSphere MQ jmsadmin 命令:
    jmsadmin < your_definition_file
    ,这将在 C:/JNDI-Directory 目录中创建 .bindings 文件。

使用 WebSphere MQ Explorer 创建一个 WebSphere MQ 队列管理器。如果您使用任何现有的 MQ 队列管理器,例如可能在安装 Message Broker 时已经为您创建的 WBRK61_DEFAULT_QUEUE_MANAGER,则可以跳过此步骤。否则,请启动 WebSphere MQ Explorer 并右键单击 Queue Managers,然后单击 New => Queue Manager 创建新的队列管理器。

如果您使用 WBRK61_DEFAULT_QUEUE_MANAGER,则应该看到如图 2 所示的队列。

WBRK61_DEFAULT_QUEUE_MANAGER 队列

如果您是创建新的队列管理器,则需要自己创建以下队列:

  • WBE_INPUT_QUEUE
  • WBE_OUTPUT_QUEUE
  • WBE_BACKOUT_QUEUE
  • wmb_input_queue
  • wmb_unknown_message_type_queue
  • wmb_buy_ack_queue
  • wmb_sell_after_buy_queue
  • wmb_speculative_customer_queue
  • wmb_unknowns_queue

例如,要创建 WBE_INPUT_QUEUE,请右键单击 WBRK61_DEFAULT_QUEUE_MANAGER 下面的 Queues,并选择 New => Local Queue

Business Events 要求输入事件和出站操作对象符合一般 XML 格式,如下所述:

  • connector 是顶级 XML 元素,它提供诸如连接器名称和提交时间戳等信息。connector 元素可以包括一个 connector-bundle 子元素。
  • connector-bundle 对应于一个操作或事件。目前,一个连接器消息只能包含一个事件或操作。但是,一个事件或操作可以包含一个或多个事件或操作对象;这些对象在模型中表示为 connector-objects
  • 一个 connector-object 映射到一个事件或操作对象。它包括对象的名称和字段的数组。
  • fieldsconnector-object 中的一系列键值对。

您开发的 Message Broker 流需要使用如下事件消息示例所示的消息格式与 Business Events 交互:

String_CustomerID							StockA							2008-04-10T14:01:55Z							9.9							9.9

图 3 演示了该输入流,其中使用了 MQInput 节点来从 MQ 队列读取 MQ 消息。输入消息被传递到 TradeFilter 节点以检查输入消息是否属于某些类型,这些类型指示是否应该将输入消息作为事件传递给 Business Events。如果是,则将消息传递给 CreateEvent 节点,后者将消息转换为 Business Events 能够接受的格式。然后 MQJMSTransform. 将转换后的消息从 MQ 消息转换为可由 Business Events 读取的 JMS 消息。然后 JMSOutput 节点将 JMS 消息(现在是 Business Events 事件)发送到 Business Events。为简单起见,无关的消息将发送到 UnknownMessageTypeNode 节点,后者将消息存储在某个 MQ 队列中。

输入流

完成以下步骤创建如图 3 所示的消息流:

  1. 在 Message Broker Toolkit Broker Application Development 视图中,如图 4 所示,通过选择 File => New => Message Flow Project 创建一个消息流项目,并为其指定您选择的名称。
    Application Development 视图
  2. 通过右键单击该新项目并选择 New => Message Flow 创建新的消息流。将该流命名为 input
  3. 将一个 MQInput 节点从面板中的 WebSphere MQ 抽屉拖放到开发画布上。
  4. 单击该节点并在 Properties 选项卡的 Basic 文件夹中指定 in 作为该节点的 Queue name 属性。在 Input Message Parsing 文件夹中为 Message domain 属性分配值 XMLNS

    输入消息的正文预期应该具有根元素 ,此根元素包含五个子元素:。下面是这样的输入消息的示例:

    BuyString_CustomerIDStockA9.99.9

    元素的有效值为 BuySell。我们将在稍后的阶段中创建 BuySell 消息。

  5. 将一个 Filter 节点从 Routing 抽屉拖放到画布上,并将其重命名为 TradeFilter。TradeFilter 节点将 BuySell 类型的消息路由到 True 终端,并将所有其他消息路由到 False 终端。
  6. 双击 TradeFilter 节点并按如下方式定义 Function Main()
    CREATE FUNCTION Main() RETURNS BOOLEANBEGIN	DECLARE myref REFERENCE TO Body.Trade.Type;	RETURN (myref='Buy') OR (myref='Sell');END;
  7. 将一个 Compute 节点从 Transformation 抽屉拖放到画布上,并将其重命名为 CreateEvent。CreateEvent 节点使用 ESLQ 语言将有效的输入消息转换为 Business Events 所需的事件消息格式。
  8. 双击 CreateEvent 节点并按如下方式定义其 Main ESQL 函数:
    CREATE FUNCTION Main() RETURNS BOOLEANBEGIN	CALL CopyMessageHeaders();	-- create 	SET OutputRoot.XML.connector.(XML.Attribute)name = 'Trade System';			SET OutputRoot.XML.connector.(XML.Attribute)version = '2.2';	-- create 	SET OutputRoot.XML.connector."connector-bundle".(XML.Attribute)name = CAST (InputRoot.XML.Trade.Type AS CHARACTER);			SET OutputRoot.XML.connector."connector-bundle".(XML.Attribute)type = 'event';	-- create 	SET OutputRoot.XML.connector."connector-bundle"."connector-object".(XML.Attribute)name = 'TradeObject';	-- create "CustomerID" field	SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[1] = InputRoot.XML.Trade.CustomerID;			SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[1].(XML.Attribute)name = 'CustomerID';			SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[1].(XML.Attribute)type = 'String';	-- create "StockID" field	SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[2] = InputRoot.XML.Trade.StockID;			SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[2].(XML.Attribute)name = 'StockID';			SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[2].(XML.Attribute)type = 'String';	-- create "Date" field	SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[3] = CAST (CURRENT_DATE AS CHARACTER FORMAT 'IU');	SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[3].(XML.Attribute)name = 'Date';			SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[3].(XML.Attribute)type = 'DateTime';			-- create "Quantity" field	SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[4] = InputRoot.XML.Trade.Quantity;			SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[4].(XML.Attribute)name = 'Quantity';			SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[4].(XML.Attribute)type = 'Real';	-- create "Price" field			SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[5] = InputRoot.XML.Trade.Price;			SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[5].(XML.Attribute)name = 'Price';			SET OutputRoot.XML.connector."connector-bundle"."connector-object".field[5].(XML.Attribute)type = 'Real';			RETURN TRUE;END;
  9. 将一个 MQJMSTransform. 节点从 JMS 抽屉拖放到画布上。
  10. 将一个 MQOutput 节点从 WebSphere MQ 抽屉拖放到画布上,并将其重命名为 UnknownMessageTypeQ。单击该节点并指定 wmb_unknown_message_type_queue 作为该节点的 Queue name 属性。
  11. 将一个 JMSOutput 节点从 JMS 抽屉拖放到画布上。为该节点的关联属性分配以下值:
    • 在 Basic 文件夹中,Destination queue = wbe_input_queue
    • 在 JMS Connection 文件夹中,JMS provider name = WebSphere MQ
    • 在 JMS Connection 文件夹中,Initial context factory = com.sun.jndi.fscontext.RefFSContextFactory
    • 在 JMS Connection 文件夹中,Location JNDI bindings 为: file:/C:\JNDI-Directory.
    • 在 JMS Connection 文件夹中,Connection factory name = qcf1
  12. 按如 所示连接您刚才创建的所有节点。

图 5 显示了 Message Broker 输出流。JMSInput 节点读取 Business Events 生成的操作。JMSMQTransform. 节点将消息转换为 MQ 消息。然后 RouteReply 节点将转换后的消息路由到其中一个适当的 MQOutput 节点。

输出流

完成以下步骤创建如图 5 所示的输出消息流:

  1. 创建一个新的消息流并将其命名为 output
  2. 将一个 JMSInput 节点从 JMS 抽屉拖放到画布上。为该节点的对应属性分配以下值:
    • 在 Basic 文件夹中,Destination queue = wbe_output_queue
    • 在 JMS Connection 文件夹中,JMS provider name = Websphere MQ
    • 在 JMS Connection 文件夹中,Initial context factory = com.sun.jndi.fscontext.RefFSContextFactory
    • 在 JMS Connection 文件夹中,Location JNDI bindings = file:/C:\JNDI-Directory
    • 在 JMS Connection 文件夹中,Connection factory name = qcf1
  3. 将一个 Route 节点从 Routing 抽屉拖放到画布上,并将其重命名为 RouteReply。右键单击 RouteReply 并选择 Add Output Terminal 两次,以便为该节点创建两个额外的输出终端。将两个新的终端分别命名为 BuyAckSellAfterBuy,并将现有的 Match 终端重命名为 SpeculativeCustomer
  4. 单击 RouteReply 节点并配置 Filter table 属性,如图 6 所示。
    RouteReply 配置
    此配置导致将 Buy AckSellAfterBuySpeculativeCustomer 操作分别路由到 BuyAckSellAfterBuySpeculativeCustomer 输出终端,并将所有其他消息路由到 Default 终端。
  5. 配置四个 MQOutput 节点并将其分别命名为 UnknownsNodeBuyAckNodeSellAfterBuyNodeSpeculativeCustomerNode
  6. 将这四个节点的 Queue name 属性分别设置为 wmb_unknowns_queuewmb_buy_ack_queuewmb_sell_after_buy_queuewmb_speculative_customer_queue
  7. 按如 所示将您刚才创建的所有节点连接在一起。Default、BuyAck、SellAfterBuy 和 SpeculativeCustomer 输出终端分别连接到 UnknownsNode、BuyAckNode、SellAfterBuyNode 和 SpeculativeCustomerNode 节点。

尽管我们是在使用中创建的 Business Events 应用程序,但是由于使用 Business Events 消息队列连接连接器来连接到 Message Broker,您需要修改该应用程序的事件和操作的连接器参数。为此,请完成以下操作步骤:

  1. 通过选择“开始”=>“所有程序”=>“IBM WebSphere Business Events v6.1”=>“Design Data”启动 Design Data 工具。
  2. 选择 File => Open Project,然后指定项目文件名称。
  3. 展开 Touchpoints 窗格,如图 7 所示:
    Touchpoints 窗格
  4. 对于 Trade System 接触点中显示的每个 Buy 和 Sell 事件,请完成以下操作步骤:
    1. 右键单击事件并选择 Event Properties
    2. 在 Connection 选项卡上,选择 Message Queue Event Connection,然后单击 Configure。注意:您可能需要首先签出事件,然后才能对其进行编辑。
    3. 选择 JMSQueue 作为 Queue Type,并指定 wbe_input_queue 作为 Queue Name
    4. 单击 Provider 并执行以下操作步骤,如图 8 所示:
      1. 选择 com.sun.jndi.fscontext.RefFSContextFactory 作为 Context Factory Class
      2. 选择 file:///C:/JNDI-Directory 作为 URL
      3. 选择 qcf1 作为 Factory Name
      4. 单击 OK
        配置 JMS 队列事件参数
    5. 单击 OK 保存连接器配置。

完成以下步骤为 Trade System 接触点中的 BuyAckSell After BuySpeculative Customer 操作配置连接:

  1. 右键单击操作并选择 Action Properties
  2. 在 Connection 选项卡上,选择 Message Queue Event Connection,然后单击 Configure。您可能需要首先签出操作,然后才能对其进行编辑。
  3. 选择 JMSQueue 作为 Queue Type,并指定 wbe_output_queue 作为 Queue Name
  4. 单击 Provider 并执行以下操作步骤,如图 9 所示:
    1. 选择 com.sun.jndi.fscontext.RefFSContextFactory 作为 Context Factory Class
    2. 选择 file:///C:/JNDI-Directory 作为 URL
    3. 选择 qcf1 作为 Factory Name
    4. 单击 OK
      配置 JMS 队列操作参数
  5. 单击 OK 保存连接器配置。

现在您需要测试已创建的 Message Broker 流。要测试流,请执行以下操作步骤:

  1. 按照中的说明部署和启动 Business Events 项目。
  2. 启动 WebSphere MQ。
  3. 如果您还没有这样做,可以使用以下命令创建 Message Broker 代理:
    mqsicreatebroker  -i  -a -q  -n
  4. 如果您还没有这样做,可以使用以下命令创建 Message Broker 配置管理器:
    mqsicreateconfigmgr  -i  -a  -q
  5. 使用以下命令启动代理和配置管理器:SimpleJSFmqsistart
  6. 在 Message Broker Toolkit 中,通过右键单击您在前面创建的新消息流项目,并选择 New => Message Broker Archive,从而创建一个新的 Message Broker BAR 文件。为该 BAR 文件指定所选择的名称。
  7. Prepare 窗格中,选择两个新的输入和输出消息流,然后单击 Build broker archive,如图 10 所示。
    准备存档
  8. 切换到 Message Broker Toolkit Broker Administration 视图,并通过选择 File => New => Domain Connection 创建到您的配置管理器的域连接。
  9. 填入您的配置管理器的 Queue Manager NamePort 参数。
  10. 连接配置管理器以后,单击 Next,然后单击 Finish
  11. 通过将您刚才创建的 BAR 文件拖放到代理的缺省执行组,从而对其进行部署,如图 11 所示。
    部署 BAR 文件
  12. 使用文本编辑器创建一个表示 Buy 消息的文本文件并将其保存在 c:\temp 中。例如:
    BuyString_CustomerIDStockA9.99.9
  13. 创建另一个表示 Sell 消息的文本文件并将其保存在 c:\temp 中。例如:
    SellString_CustomerIDStockA9.99.9
  14. 使用 向您正在使用的队列管理器(例如 WBRK61_DEFAULT_QUEUE_MANAGER)上的 wmb_input_queue 发送 Buy 或 Sell 消息。为实现此目的,您可以通过单击 Read File 按钮读取您刚才在 c:\temp 目录中创建的 Buy 消息。您可以通过单击 rfhutil 实用工具上的 Data 选项卡查看 Buy 消息的内容,如图 12 所示。
    查看 Buy 消息的内容
    然后单击 Write Q 将该消息发送到 wmb_input_queue,如图 13 所示。
    您应该看到 Business Events 流生成的操作消息出现在其中一个队列中(wmb_buy_ack_queuewmb_sell_after_buy_queuewmb_speculative_customer_queue),具体取决于操作的类型。
  15. 然后您可以使用 rfhutil 实用工具读取队列。例如,单击 Read Q 以读取 wmb_buy_ack_queue 中的消息,如图 14 所示:
  16. 选择 Data 选项卡查看消息的内容,如图 15 所示:
  17. 如果您紧跟在 Buy 消息之后立即发送 Sell 消息,您应该在 wmb_sell_after_buy_queue 中看到一条消息,表明发生了一个 Sell After Buy 事件。
  18. 如果您将在 Sell 消息之后发送 Buy 消息的周期重复三次,您应该看到一条消息存储在 wmb_speculative_customer_queue 队列中。

为简单起见,本文描述的解决方案使用 Business Events 内置的消息队列连接连接器来连接到 Message Broker 和 Business Events。这意味着将 Message Broker JMS 消息放在作为 Message Broker 和 Business Events 中介的 JMS 队列中,并从中读取消息。就性能而言,这不是最佳的解决方案,因为中间的消息队列连接连接器引入了额外的消息开销。这是因为事件消息由消息队列连接连接器转发到 Business Events 引擎,而不是直接发送到该引擎。

一种简单的解决方案是配置 Message Broker 输入流中的 JMSOutput 节点将事件直接发送到 Message Broker jms/eventTopicjms/durableEventTopic 主题,并配置 Message Broker 输出流中的 JMSInput 节点直接从 jms/actionTopicjms/durableActionTopic 接收 JMS 消息。但是,这样的配置当前不支持 Business Events ResultEvent,因此将把所有操作发送到 JMSInput 节点。您还可以使用其他连接支持(例如 HTTP/SOAP/File)连接到其他 Business Events 内置连接器。

在本文中,您了解了如何快速配置 Message Broker 和 Business Events 以实现互操作性。我们使用了示例 Broker 消息来描述如何为 Business Events 构造事件消息并由 Broker 消息流将其发送到 Business Events,以及如何从 Business Events 接收操作消息并由 Broker 消息流对其进行处理。

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/14789789/viewspace-610858/,如需转载,请注明出处,否则将追究法律责任。

转载于:http://blog.itpub.net/14789789/viewspace-610858/

你可能感兴趣的文章