第七章:TUXEDO的可靠消息队列/Q
10.1 /Q介绍
/Q是Tuxedo系统的一个重要组成部分,它提供了一种可靠队列机制,允许消息按某种排队规则存储到磁盘上或内存中,然后再转发给其它进程。这种存储转发机制可以保证在两个通信实体之间传递的消息不丢失、不重传,从而保证交易的完整性。TUXEDO /Q提供管理工具和编程接口用于对/Q进行管理和操作。
名词解释: /Q
指TUXEDO的可靠消息队列,是TUXEDO产品的一个组成部分,功能类似于IBM的 MQSeries,BEA MessageQ等其他消息中间件。
MESSAGE 消息,存储在QUEUE中的数据称为消息(MESSAGE),一个QUEUE中可以有多条消息.对QUEUE的操作是以消息为单位的,即以次只能发送或取出一条消息。 QUEUE
队列,用于保存发送来的消息,以便随后的处理,一个QUEUE中可以同时存在多条消息。 QUEUE SPACE
队列空间,是QUEUE的一个集合,以便进行统一管理,一个QUEUE SPACE中可包含多个QUEUE。 TMQUEUE
TUXEDO带的一个SERVER,接收用tpenqueue()发送来的消息.并把他们保存到相应的 QUEUE中,接受并处理tpdequeue()的请求。
TMQFORWARD
TUXEDO带的一个SERVER,从一个QUEUE中取数据,然后调用一个与该QUEUE同名的 SERVICE进行处理,并把该SERVICE返回的结果发送到另一个QUEUE中。 TMS_QM
是与/Q资源管理器对应的TMS(TRANSACTION MANAGER SERVER),它使对
消息的发送与取出操作可以以事务的方式进行.并使/Q资源管理器可参与全局事务。
/Q的组成 MESSAGE QUEUE
QUEUE SPACE /Q系统 MESSAGE QUEUE QUEUE SPACE
/Q的使用方式
图中左侧为TUXEDO客户端,右侧为TUXEDO事务管理器/T,中间为TUXEDO可靠消息队列系统/Q。右侧事务管理器/T提供了两个服务:SERVICE1和SERVICE2。在消息队列系统/Q中,一个QUEUE SPACE对应一个GROUP, TMS_QM是/Q的事务管理进程,在该GROUP中要进行定义,该QUEUE SPACE中定义了有四个消息队列,SERVICE1和SERVICE2分别是对应于同名的两个服务(SERVICE)的队列(这是一种习惯的命名规则,客户机若请求服务器中的SERVICE1服务,就把请求消息放入SERVICE1队列中)。SERVICE1的处理结果放到CLIENT_RPLY1中,如果SERVICE1,SERVICE2在处理过程中发生错误, 把错误信息保存到队列FAILUREQ中。
/Q有两种使用方式,我们称之为基本模式和转发模式 1. 基本模式:
只用到TMQUEUE,不使用TMQFORWARD,不调用SERVICE对QUEUE中的消息进行处理。流程说明如下(不需要进行4-7的操作,也不需要定义与QUEUE同名的SERVICE): 1. 客户端调用tpenqueue()把数据发送到SERVICE1
2. TMQUEUE接收tpenqueue()发送来的数据,并把他们保存到SERVICE1中 3. 如果以上操作成功,那么tpenqueue()返回成功.
8. 客户端调用tpdequeue(),请求从SERVICE1中取数据
10. TMQUEUE收到该请求,它把相应的消息从SERVICE1中取出,并发送给客户端
2. 转发模式:
用到TMQUEUE和TMQFORWARD,要定义与QUEUE同名的SERVICE并调用SERVICE对QUEUE中的消息进行处理。流程说明如下:
1. 客户端调用tpenqueue()把消息发送到SERVICE1
2. TMQUEUE接收tpenqueue()发送来的消息,并把他们保存到SERVICE1中 3. 如果以上操作成功,那么tpenqueue()返回成功.
4. TMQFORWARD在某个设定的时刻,从SERVICE1中取消息
5. TMQFORWARD用事务模式调用(采用TPCALL())一个名为 \的SERVICE,将 消息转发给SERVICE1
6. 名为 \的SERVICE 把处理的结果用tprerurn()返回给TPQFORWARD. 7. TPQFORWARD把收到的处理结果发送到REPLYQ
8. 客户端调用tpdequeue(),请求从REPLYQ中取回响应消息
9. TMQUEUE收到该请求,它把相应的消息从queue1中取出,并发送给客户端 10.如果以上操作成功,那么tpdequeue()返回成功
注意: 当一条消息QUEUE中被取出后,在该QUEUE中它将被删除,其他的进程就看不到该消息了.如果有多个进程同时要取该消息,只有最早的那个进程能取到该消息.
/Q的使用场合 1. /Q的常见用法是用于实现数据的可靠传送,把数据从一台机器可靠的传送到另一台机器.如:在电信计费业务中,可以用/Q把采集到的计费数据发送到计费中心进行处理;在银行中,不同的银行间可用/Q传送结算数据. 数据传送可以是在TUXEDO客户端与TUXEDO服务端之间,或TUXEDO服务端与服务端之间.
2. 可以用/Q实现工作流,如图,前面的处理流程把处理结果保存到QUEUE中,后面的处理流程从相应的QUEUE中取要处理的消息,也把处理结果保存到QUEUE中,如此下去.直到完成.
流程 开始 处理结果 QUEUE1 处理结果 流程2 处理结果 QUEUE2
3. 批处理:可以把很多消息发送到一个QUEUE中,并设置在某个时间这些消息才生效,把这些消息取出,进行成批处理.
10.2 /Q的管理
/Q的管理工作包括:QMCONFIG环境变量的设置,用QMADMIN或图形化管理工具进行QUEUE SPACE,QUEUE的创建及管理,因为/Q也是一种资源管理器,所以要象数据库那样在UBBCONFIG的GROUP种进行配置。在UBBCONFIG中还要配置TMQUEUE,TMQFORWARD这两个SERVER。下面分别进行说明:
QMCONFIG环境变量的设置
QMCONFIG环境变量指定存储QUEUE SPACE的设备(文件)名,以便QMADMIN对它进行管理.它可在命令行中设置,也可在执行QMADMIN时在命令行中指定。 在UNIX下
1. QMCONFIG=/usr/tuxedo/qsample/QUE; export QMCONFIG 2. qmadmin /usr/tuxedo/qsample/QUE
在NT下:
1. SET QMCONFIG=d:\\qsample\\QUE
2. qmadmin d:\\qsample\\QUE
QMCONFIG中指定的设备(文件)要先用crdl在TUXEDO文件系统中创建, 如:
D:\\>qmadmin
>crdl d:\\qsample\\QUE 0 5000 >q
该设备(文件)中可以有多个QUEUE SPACE, QUEUE中的消息等数据就保存在该设备(文件)中.
QMADMIN的使用方法
TUXEDO提供一个命令行管理工具QMADMIN,用于对/Q进行管理,QMADMIN类似TMADMIN,QMADMIN常用的命令介绍如下:
qspacecreate: 创建一个新的QUEUE SPACE
qcreate: 在某个QUEUE SPACE 上创建QUEUE
qinfo: 查看某个QUEUE中的信息
qlist: 显示一个QUEUE SPACE所包含的QUEUE,及每个QUEUE中的当前消息个数 qopen: 打开一个QUEUE SPACE qclose: 关闭一个QUEUE SPACE
说明:
在QMADMIN中输入help可以列出所有的命令,用help命令名可以得到该命令的帮助 如:
> help qinfo
qinfo [queue_name]
-------------------
List information about the specified queue or for all queues. This command lists the number of messages on the specified
queue or all queues if no argument is given, and the amount of free space in the queue space. In verbose mode, this
command also lists the queue creation parameters for each queue.
>
QUEUE SPACE的创建
在QMADMIN中执行qspacecreate,按提示进行操作,说明如下: > qspacecreate
Queue space name: myqueuespace IPC Key for queue space: 230458 Size of queue space in disk pages: 200
Number of queues in queue space: 3
Number of concurrent transactions in queue space: 3 Number of concurrent processes in queue space: 3 Number of messages in queue space: 12 Error queue name: errq
Initialize extents (y, n [default=n]): Blocking factor [default=16]: 16
参数说明:
IPC Key for queue space:
该QUEUE SPACE的IPC KEY,范围为32,768到 262,143.记住不要和系统的其他 IPC资源的ID号冲突.
Size of queue space in disk pages:
该QUEUE SPACE的大小,以页为单位(一页512字节) Number of queues in queue space:
该QUEUE SPACE中最多可以有多少个QUEUE在里面
Number of concurrent transactions in queue space.
在该QUEUE SPACE中最多可以有多少个事务同时存在,计算方法: 1. 该GROUP中的每个TMS_QM会用到一个事务
2. 该GROUP中的TMQEUE,TMQFORWARD也会各自用到一个事务
3. QMADMIN会用到一个事务
4. 客户端在调用TPENQUE(),tpdequeue()之前也开始的事务也要计算上 要估计最多可能有多少个这样的客户端同时使用该QUEUE SPACE
Number of concurrent processes in queue space
最多可以有多少个进程同时存取该QUEUE SPACE,要包括TMQUEUE,TMFORWARD这两个进程.
Number of Messages in queue space
该QUEUE SPACE中最多可以有多少个MESSAGE在里面
Error queue name:
当一个消息重试了指定的次数,还没有被取出时,TUXEDO把该消息发送到该QUEUE中,如果你输入了ERROR QUEUE NAME,你必须用QCREATE创建该QUEUE,如果没有创建ERROR QUEUE,那么本来应该发送到ERROR QUEUE中的消息将被丢弃.
Initialize extents (y, n [default=n]): 是否初始化该QUEUE SPACE
注意:
因为在创建QUEUE SPACE过程中会用到IPC资源,所以如果在创建QUEUE SPACE 时失败,在重新创建之前,最好把这些IPC资源释放掉,可在QMAMIN中用ipcrm命令释放某个QUEUE SPACE所占用的IPC资源。
QUEUE 的创建 > qcreate
Queue name: service1
Queue order (priority, time, fifo, lifo): fifo
Out-of-ordering enqueuing (top, msgid, [default=none]): none Retries [default=0]: 2
Retry delay in seconds [default=0]: 30
High limit for queue capacity warning (b for bytes used, B for blocks used, % for percent used, m for messages [default=100%]): 80% Reset (low) limit for queue capacity warning [default=0%]: 0% Queue capacity command:
No default queue capacity command Queue 'service1' created
参数说明:
Queue name: 要创建的QUEUE的名称
Queue order (priority, time, fifo, lifo): fifo 发送到该QUEUE的消息的存放方式,
priority:按优先级(在tpenqueue()的TPQCTL参数中指定) time:按时间(在tpenqueue()的TPQCTL参数中指定) fifo:先进先出 lifo:先进后出
Out-of-ordering enqueuing (top, msgid, [default=none]): none
指定一个消息可以放在该QUEUE的最前面,或在某个MSGID值为在tpenqueue()的TPQCTL参数中指定的值之后.
Retries [default=0]: 2
默任情况下,当从一个QUEUE中取出某个MESSAGE的事务回滚时,该MESSAGE会被重新放回到该QUEUE中,当该消息又处于该QUEUE的最前面时,TMQUEUE将再次试图取出该消息,你可以在这里指定重试的次数.默认值为0,也就时不进行重试.当达到重试的次数时,如
果该QUEUE SPACE设置了ERROR QUEUE,那么该消息将被移到该ERROR QUEUE中,如果该QUEUE SPACE没有设置ERROR QUEUE,那么该消息将被丢弃. Retry delay in seconds [default=0]: 30 指定重试的时间间隔
High limit for queue capacity warning (b for bytes used, B for blocks used, % for percent used, m for messages [default=100%]): 80%
Reset (low) limit for queue capacity warning [default=0%]: 10%
Queue capacity command: /usr/app/bin/mailme myqueuespace service1
以上3个设置,当该QUEUE中的消息对空间的使用或消息数达到设定的值时, TUXEDO系统系统自动执行一个命令,以达到对该QUEUE进行自动管理的目的. 在以上的设置中,当该QUEUE中,当该QUEUE的80%空间被使用时,将执行 /usr/app/bin/mailme, myqueuespace service1是mailme的参数.
当第一次到达80%之后, /usr/app/bin/mailme被执行,只有当降为10%之后,又 到达80%时, /usr/app/bin/mailme才再次被执行.
Reply Queue 和Failure Queue
当采用转发方式时,TMQFORWARD把它用TPCALL()调用的与该QUEUE同名的SERVICE的处理结果放到REPLY QUEUE中,该REPLY QUEUE的名字在tpeuqueue()中指定,如果该SERVICE处理失败,TPRETURN(TPFAIL?.),那么TMQFORWARD将把错误信息写到FAILUER QUEUE中,如果没有创建Reply QUEUE 或 Failure Queue,TMQFORWARD将把该SERVICE的返回丢弃,调用tpdequeue()的客户端将收不到任何信息,如果创建了REPLY QUEUE,那么即使该SERVICE没有返回信息,TMQFORWARD也会往该REPLY QUEUE中写入一条长度为0的消息.客户端可以收到该消息.
Error Queue
当一个消息重试了指定的次数,还没有被取出时,TUXEDO把该消息发送到该QUEUE中,如果你输入了ERROR QUEUE NAME,你必须用QCREATE创建该QUEUE,如果没有创建ERROR QUEUE,那么本来应该发送到ERROR QUEUE中的消息将被丢弃. 在创建ERROR QUEUE时,以下几个参数不能设置.
Queue order (priority, time, fifo, lifo):
Out-of-ordering enqueuing (top, msgid, [default=none]): Retries [default=0]: Retry delay in seconds
UBBCONFIG中要做的配置
GROUP中的配置
在GROUP中的配置与数据库通过XA协议与TUXEDO连接的配置差不多, 因为QUEUE SPACE 是资源管理器,而一个组只能有一个资源管理器。所以QUEUE SPACE 与 QUEUE SERVER GROUP 之间是一对一的关系, 在GROUP中的配置有: 在GROUP中必须有以下配置:
1. TMS(TRANSACTION MANAGEMENT SERVER): TMS_QM
2. OPENINFO,
它的设置格式如下: OPENINFO=\ TUXEDO/QM:为/Q所对应的资源管理器的名称,在$TUXDIR/UDATAOBJ/RM指定
device_name指定存储该QUEUE SPACE的设备(文件)名,queue_space_name为该QUEUE SPACE的名称 在UNIX下为: *GROUPS QUE1
LMID = SITE1 GRPNO=2
TMSNAME = TMS_QM TMSCOUNT = 2
OPENINFO = “TUXEDO/QM:/home/QUE:QSPACE”
在NT下为: *GROUPS QUE1
LMID = SITE1 GRPNO=2
TMSNAME = TMS_QM TMSCOUNT = 2
OPENINFO = “TUXEDO/QM:d:\\qsample\\QUE;QSPACE”
在SERVER中要做的配置:
在SERVER这一节中要配置TMQUEUE(必须),TMQFORWARD(可选)这两个SERVER, TMQUEUE的设置格式如下:
TMQUEUE CLOPT=“-s QSPACENAME:TMQUEUE -- [-t trantime]”
-s:指定该SERVER要发布的SERVICE的名称,采用别名方式:QUEUE SPACE的名字加上TMQUEUE -t:当在该SERVER中的SERVICE调用了tpbegin()时,用于指定这些事务的超时时间.默认值 为30秒. 例子:
*SERVERS
TMQUEUE SRVGRP = QUE1 SRVID = 1 CLOPT = “-s QSPACE:TMQUEUE -- -t 60”
TMQFORWARD的的设置格式如下:
TMQFORWARD CLOPT=“-- -q qname[,qname...] [-t trantime] [-i idletime] [-e] [-d] [-n] [-f delay]”
-q qname[,qname...] 用,隔开的QUEUE的名称,TMQFORWARD将从这些QUEUE中取数据,并用TPCALL调用与该QUEUE同名的SERVICE进行处理.
-t:当在该SERVER中的SERVICE调用了tpbegin()时,用于指定这些事务的超时时间.默认值
为60秒.
-i: 指定当该QUEUE中的消息都已被取出后,隔多长时间,TMQFORWARD再次读该QUEUE看是
否有新的消息到来.
-e: 如果再这些QUEUE中都没有消息,那么TMQFORWARD将退出.
-d: 把导致TMQFORWARD所调用的SERVICE失败(TPERRNO=TPESUCFAIL)的消息从REPLYQ 中
删除.使它不用再被重试. -n: 当TMQFORWARD调用TPCALL时,TPCALL()的FLAG为:TPNOTRAN -f delay: TMQFORWARD采用TPFORWARD方式而不是TPCALL方式调用与QUEUE同名的SERVICE 例子: *SERVERS
TMQFORWARD SRVGRP=QUE1 SRVID = 5 CLOPT=”-- -i 2 -q STRING”
10.3 /Q的编程
/Q的编程接口很简单,包括一个结构体TPQCTL,及函数tpenque(),tpdequeue(),下面分别说明.
TPQCTL结构体
TPQCTL结构体用于tpenque(),tpdequeue()中指定或传递与/Q有关的参数,它在atmi.h中的定义如下:
struct tpqctl_t { }
long flags;
/* control parameters to queue primitives */ /* absolute/relative time for dequeuing */ /* enqueue priority */
/* indicates which of the values are set */
long deq_time; long priority;
long diagnostic; /* indicates reason for failure */
char msgid[TMMSGIDLEN]; /* id of message before which to queue */ char corrid[TMCORRIDLEN];/* correlation id used to identify message */ char replyqueue[TMQNAMELEN+1]; CLIENTID cltid; long urcode; long appkey;
/* queue name for reply message */
char failurequeue[TMQNAMELEN+1];/* queue name for failure message */
/* client identifier for originating client */ /* application user-return code */
/* application authentication client key */
typedef struct tpqctl_t TPQCTL;
/* application user-return code */
long appkey;
/* application authentication client key */
说明:
long flags: 指定该结构中的那些域将被设置 long deq_time: 指定什么时候该消息将可用 long priority: 指定该消息的优先级
long diagnostic:保存错误信息
char msgid[TMMSGIDLEN]:系统产生的用于唯一标识该消息的ID号.
char corrid[TMCORRIDLEN]:可以为一个MESSAGE指定一个CORRID,这个值是不变的,所以跟
该MESSAGE有关的REPLY MESSAGE或FAILURE MESSAGE能够被识别. char replyqueue[TMQNAMELEN+1]:REPLY QUEUE的名称
char failurequeue[TMQNAMELEN+1]:FAILURE QUEUE的名称 CLIENTID cltid: 调用TPENQUUE()或tpdequeue()的客户端的ID
long urcode: tpreturn()中指定的urcode
long appkey: 如果TUXEDO应用系统采用安全认证方式,TUXEDO给通过认证的客户端返回
的一个KEY,用于标识该客户端。
tpenqueue()
int tpenqueue (char *qspace, char *qname, TPQCTL *ctl,char *data, long len, long
flags)
qspace: QUEUE SPACE的名称 qname: QUEUE的名称
ctl: 指向TMQCTL的指针
data: 要发送到名为QNAME的QUEUE的数据所对应的缓冲区 len: DATA缓冲区的长度
flags: 与TPCALL的设置相同,要注意的是TPNOTRAN
如果设置了TPNOTRAN,并不是说该调用不处于事务模式,只是表明
如果调用tpenqueue()的程序已处于事务模式中,那么tpenqueue()将单独启一个
事务.
TPQCTL中flag的值可以为:
TPNOFLAGS: 不对TPQCTL中的各字段进行设置,让它们采用默认值.
TPQTOP: 把该消息放在该QUEUE的顶部,使该消息最先被取出,在创建该
QUEUE时,当指定out-of-order enqueuing时,TOP选项必须被选上才行.
TPQBEFOREMSGID:把该消息放在MSGID为TPQCTL中的MSGID域指定的MSGID 所对应的消息之前.在创建该QUEUE时,当指定out-of-order enqueuing时,MSGID
选项必须被选上才行. TPQTIME_ABS:只有到了TPQCTL中的DEQ_TIME域指定的时间之后,该消息才能被取出.
TPQTIME_REL:只有从当前开始过了TPQCTL中的TPQTIME_REL域指定的时间之后,该消息才能
被取出.
TPQPRORITY:设置该消息的优先级,在TPQCTL中的TPQPRORITY域指定,范围为0-100
TPQCORRID: 如果该标志被设置,那么在进行tpdequeue()时,可以取出corrid=ctl-->corrid
的MESSAGE
TPQREPLYQ: 把该消息的处理结果放在TPQCTL中的TPQREPLYQ域指定的QUEUE中. TPQFAILUREQ:把该消息的处理结果放在TPQCTL中的TPQFAILUREQ域指定的QUEUE中.
如果tpenqueue()调用成功,在TPQCTL中的TPQMSGID域中返回该消息所对应的MSGID
tpenqueue()与事务:
如果调用 tpenqueue()的程序当前处于事务模式,并且tpenqueue()中的FLAGS参数没有设置TPNOTRAN,那么tpenqueue()处于当前事务中,如果该事务成功提交,那么tpenqueue()中发送的消息一定在QUEUE中, 如果该事务成功提交,那么tpenqueue()中发送的消息一定不会在QUEUE中
如果调用 tpenqueue()的程序当前不处于事务模式,或者tpenqueue()中的FLAGS参数设置了TPNOTRAN,那么tpenqueue()不处于当前的事务中,如果tpenqueue()返回成功,那么tpenqueue()中发送的消息一定在QUEUE中, 如果,那么tpenqueue()因为网络故障或事务超时而返回失败,那么调用tpenqueue()的程序将不能中发送的消息一定不会在QUEUE中
tpdequeue()
int tpdequeue (char *qspace, char *qname, TPQCTL *ctl,char **data, long *len, long flags)
从名为QNAME的QUEUE中取出一条消息.
qspace: QUEUE SPACE的名称 qname: QUEUE的名称
ctl: 指向TMQCTL的指针
data: 把该消息中的数据保存到该缓冲区中. len: 该消息中数据的长度
flags: 与TPCALL的设置相同,要注意的是TPNOTRAN
如果设置了TPNOTRAN,并不是说该调用不处于事务模式,只是表明 如果调用tpenqueue()的程序已处于事务模式中,那么tpenqueue() 将单独启一个事务.
TPQCTL中的flags的设置:
TPNOFLAGS: 不对TPQCTL中的各字段进行设置,让它们采用默认值. TPQGETBYMSGID: 从队列qname中取msgid=TPQCTL->msgid的消息
TPQGETBYCORRID: 从队列qname中取corrid =TPQCTL-> corrid的消息 TPQWAIT:
如果qname中没有消息,调用tpdequeue ()的进程要阻塞在那里,直到有 qname中有可以被取出的消息为止.
根据取出的消息的属性,TUXEDO自动为下面的字段赋值: TPQPRIORITY: 取出的消息的优先级 TPMSGID: 取出的消息的msgid TPQCORRID: TPQREPLYQ: TPQFAILUREQ:
取出的消息的corrid
取出的消息所对应的reply queue的名字 取出的消息所对应的failure queue的名字
10.4 一个简单的例子
qsample(在$TUXDIR/APPS/QSAMPLE目录下)是TUXEDO用于演示/Q的例子,客户机将一个包含字符串的消息放入STRING队列中,TMQFORWARD将该消息转发到STRING服务,要求它将消息体内的字符串转换成大写。处理完毕后,TMQFORWARD把响应消息放入响应队列RPLYQ,客户机再从RPLYQ中取出响应消息,进行打印输出。
qsample的服务端程序
服务端程序server.c的内容如下 #include
STRING(msg) TPSVCINFO *msg; {
char *str; int i;
/* set pointer to data portion of message */ str = (char *)msg->data;
/* convert string to upper case */ for (i = 0; i < msg->len; i++) { str[i] = toupper(str[i]);
}
/*通过tpreturn函数将转换结果传回给TMQFORWARD服务器,并表明转换成功*/
tpreturn(TPSUCCESS, 0, msg->data, 0L, 0); }
设置环境变量
除了设置QMCONFIG外,还需要根据实际情况设置:TUXDIR(Tuxedo的安装目录)、APPDIR和TUXCONFIG环境变量。
创建消息队列
qsample用到了一个队列空间QSPACE和三个队列STRING、RPLYQ与errque。在NT平台下创建它们的最好办法是使用nmake命令,即“c:\\qsample\\nmake/f qsapme.nt QUE”,当然也可以手工创建。
创建事务日志
TMS_QM在队列事务管理时,需要用到日志设备,创建办法也是用nmake命令: c:\\qsample\\nmake/f qsapme.nt TLOG
它执行了qsample.nt中预定义的一组Tuxedo命令。
qsample的客户程序
客户程序client.c的流程是:加入Tuxedo系统→消息入队→消息出队→退出系统。simpapp直接通过tpcall()函数调用TOUPPER服务,而qsample则将消息放入队列STRING,由TMQFORWARD服务器通过tpcall()调用STRING服务。 #include
int main(int argc char **argv) {
char *reqstr; long len; TPQCTL qctl;
/* string to be sent */ /* Q control structures */ /* length of return string */
/* join the application */
if (tpinit(NULL) == -1) { (void) fprintf(stderr,\ tpstrerror(tperrno)); exit(1); }
/* get request buffer */
if ((reqstr = tpalloc(\ (void) fprintf(stderr,\ tpstrerror(tperrno)); exit(1); }
(void) strcpy(reqstr,\(void) printf(\
/* set flag in control structure indicating a reply queue */ qctl.flags = TPQREPLYQ;
/* provide name of reply queue in control structure */ (void) strcpy(qctl.replyqueue, \
if (tpenqueue(\
0, 0) == -1) {
(void) fprintf(stderr,\ tpstrerror(tperrno)); if (tperrno == TPEDIAGNOSTIC) { (void) fprintf(stderr,\ qctl.diagnostic); }
tpfree((char *)reqstr);
}
}
(void) tpterm(); exit(1);
/* set flag indicating we will wait for a reply */
qctl.flags = TPQWAIT;
if (tpdequeue(\
&len, TPNOTIME) == -1) {
(void) fprintf(stderr,\ }
tpstrerror(tperrno));
(void) fprintf(stderr,\
qctl.diagnostic);
if (tperrno == TPEDIAGNOSTIC) {
tpfree(reqstr); (void) tpterm();
exit(1); }
(void) printf(\/* clean up and exit */ tpfree(reqstr); (void) tpterm(); return(0);
修改配置文件
配置文件ubb.sample定义了三个SERVER:server、TMQUEUE、TMQFORWARD和两个组:GROUP1和QUE1。SERVER提供STRING服务,TMQUEUE和TMQFORWARD属于QUE1组。QUE1组中的OPENINFO参数定义了队列资源管理器TUXEDO/QM、队列设备和队列空间。最后一行CLOPT参数中定义了将消息转发给STRING服务。
*RESOURCES IPCKEY 61234 DOMAINID MASTER MODEL
*MACHINES
DEMO LMID = SITE1 TUXDIR =\
APPDIR = \
TUXCONFIG = \ TLOGDEVICE =\
qsample SITE1 SHM
TLOGNAME=TLOG
TLOGSIZE=100 *GROUPS GROUP1
LMID = SITE1 GRPNO = 1 TMSNAME=TMS TMSCOUNT=2
QUE1
*SERVERS DEFAULT:
CLOPT=\
LMID = SITE1 GRPNO = 2 TMSNAME = TMS_QM TMSCOUNT = 2
OPENINFO = \
TMQUEUE SRVGRP = QUE1 SRVID = 1
GRACE = 0 RESTART = Y CONV = N MAXGEN=10 CLOPT = \
TMQFORWARD SRVGRP=QUE1 SRVID= 5 GRACE=0 RESTART=Y CONV=N MAXGEN=10 CLOPT=\
server
*SERVICES
编译运行
确保make文件qsample.nt中include和lib变量已经包含了Tuxedo系统的头文件和库文件,然后使用如下命令进行编译:
c:\\qsample\\nmake/f qsample.nt all
成功后,用tmloadcf命令加载配置文件,用tmboot命令启动应用程序,最后,运行客户程序client.exe,就可以看到STRING服务的处理结果。
SRVGRP=GROUP1 SRVID=1111
10.5 /Q与事务
TUXEDO的/Q也是一种资源管理器, 可以和其他资源管理器如数据库协作,完成全局事务。下面通过例子来说明在/Q中使用事务一些应注意的地方.
例子的UBBCONFIG文件的内容为:
*RESOURCES IPCKEY DOMAINID MASTER MODEL
52617 qsample SITE1 SHM
*MACHINES MYSERVER LMID = SITE1 TUXDIR =\
APPDIR = \
TUXCONFIG = \TLOGDEVICE =\TLOGNAME=TLOG TLOGSIZE=100 MAXWSCLIENTS=5
*GROUPS
GROUP1 LMID = SITE1 GRPNO = 1 TMSNAME=TMS TMSCOUNT=2 QUE1 \
LMID = SITE1 GRPNO = 2
TMSNAME = TMS_QM TMSCOUNT = 2
OPENINFO = \
GROUP3
LMID=SITE1 GRPNO=3
TMSNAME=\
OPENINFO=\
GROUP4 LMID=SITE1
GRPNO=4
*SERVERS DEFAULT:
CLOPT=\
TMQUEUE SRVGRP = QUE1 SRVID = 1
TMQFORWARD SRVGRP=QUE1 SRVID= 5 GRACE=0 RESTART=Y CONV=N MAXGEN=10
CLOPT=\
GRACE = 0 RESTART = Y CONV = N MAXGEN=10 CLOPT = \
server SRVGRP=GROUP3 SRVID=1111
test SRVGRP=GROUP3 SRVID=111
*SERVICES
1.如果时采用转发模式,那么与QUEUE同名的SERVCIE所在的GROUP,必须是配置了TMS才能正常工作。
假设在TUXEDO带的例子qsample中,有上面UBBCONFIG中列的4个GROUP,那么:
(1)当STRING在GROUP1中时要这样编译:
buildserver -o server -f server.c -s STRING 或
buildserver -o server -f server.c -s STRING -r NONE
说明:TMS是TUXEDO系统自带的一个TMS SERVER,它对应RM文件中的NONE:tmnull_switch: 。
实际上你当编译SERVER时,如果你不带-r 参数,TUXEDO会自动加上-r NONE的。 这时候在程序中你也可以调用tpbegin(),tpcommit()等与事务操作有关的ATMI
(2)当STRING在QUE1中时要这样编译:
buildserver -o server -f server.c -r TUXEDO/QM -s STRING
如果不加 -r TUXEDO/QM,启动时会出现如下错误: 193628.XCJ!server.1144: GP_CAT:214: ERROR: Null version of xa_open() found non-null info string
193628.XCJ!server.1144: LIBTUX_CAT:466: ERROR: tpopen TPERMERR xa_open returned XAER_INVAL
193628.XCJ!server.1144: LIBTUX_CAT:1353: ERROR: tpopen failed - TPERMERR - resource manager error
193628.XCJ!server.1144: LIBTUX_CAT:250: ERROR: tpsvrinit() failed
(3)当STRING在GROUUP3中时要这样编译:
buildserver -o server -f server.c -r Oracle_XA -s STRING
(4)在STRING在GROUP4中时要这样编译: buildserver -o server -f server.c -s STRING
server能启动,但tpenqueue()调用能成功,tpdequeue()调用不成功。因为GROUP4中没有TMS。
2. 在与QUEUE同名的那个SERVICE 中的事务处理
在与QUEUE同名的那个SERVICE中不能再开始一个事务,因为QMFORWORD会自动启动一个TRANSACTION,如果在
该SERVICE 中又启动一个事务,会导致事务的嵌套,而TUXEDO不支持事务的嵌套。下面以qsample为例说明,
假设UBBCONFIG中也包括上面列的4个GROUP,那么如果STRING SERVICE在GROUP3中,并且把STRING STRING改写成 如下各种方式.
(1)
#include
EXEC SQL INCLUDE sqlca;
EXEC ORACLE OPTION (RELEASE_CURSOR = YES);
STRING(msg)
TPSVCINFO *msg; {
if(tpbegin(30,0) == -1) { userlog(\char *str; int i;
}
tpreturn( TPFAIL, 0, NULL, 0, 0 );
}
EXEC SQL INSERT INTO ss VALUES(\if(tpcommit(0) == -1) { }
str = (char *)msg->data;
for (i = 0; i < msg->len; i++) {
str[i] = toupper(str[i]);
}
tpreturn(TPSUCCESS, 0, msg->data, 0L, 0);
userlog(\tpreturn( TPFAIL, 0, NULL, 0, 0 );
会出现错误:203414.XCJ!server.1392: gtrid x0 x3affcb5e x2bf: tpbegin() fail=TPEPROTO - protocol error
写表操作失败,并且tpdequeue也失败,TMQFORWORD把tpenqueue()发送的消息写到ERRQUE中.
(2)
#include
EXEC SQL INCLUDE sqlca;
EXEC ORACLE OPTION (RELEASE_CURSOR = YES);
STRING(TPSVCINFO *msg) {
EXEC SQL INSERT INTO ss VALUES('1113'); if(sqlca.sqlcode!=0) { }
userlog(\tpreturn(TPFAIL, 0, NULL, 0L, 0); char *str; int i;
百度搜索“70edu”或“70教育网”即可找到本站免费阅读全部范文。收藏本站方便下次阅读,70教育网,提供经典综合文库第10章:TUXEDO的可靠消息队列在线全文阅读。
相关推荐: