Oracle Advanced Queueing with .Net
I can't help you with the best practices, but I can help you with a UDT Queue. Before you deal with the queue, you need to generate custom types from the database into your C# project. Assuming you have Visual Studio and ODP.NET installed, you simply need to connect to the database through the Server Explorer, locate your UDTs, right click and choose "Generate Custom Class..." These classes map directly to your UDTs and are used to store the Dequeued information.
Here is an example of the code you would use to enqueue a message:
private void main(string[] args)
{
string _connstring = "Data Source=host/DB;User
Id=USER;Password=PASSWORD1;";
OracleConnection _connObj = new OracleConnection(_connstring);
// Create a new queue object
OracleAQQueue _queueObj = new OracleAQQueue("UDT_NAME", _connObj);
_connObj.Open();
OracleTransaction _txn = _connObj.BeginTransaction();
// Set the payload type to your UDT
_queueObj.MessageType = OracleAQMessageType.Udt;
_queueObj.UdtTypeName = "UDT_NAME";
// Create a new message object
OracleAQMessage _msg = new OracleAQMessage();
// Create an instance of JobClass and pass it in as the payload for the
// message
UDT_CUSTOM_CLASS _custClass = new UDT_CUSTOM_CLASS();
// Load up all of the properties of custClass
custClass.CustString = "Custom String";
custClass.CustInt = 5;
_msg.Payload = custClass;
// Enqueue the message
_queueObj.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
_queueObj.Enqueue(_msg);
_txn.Commit();
_queueObj.Dispose();
_connObj.Close();
_connObj.Dispose();
_connObj = null;
}
It's a similar process to dequeue:
private void main(string[] args)
{
string _connstring = "Data Source=host/DB;User
Id=USER;Password=PASSWORD1;";
OracleConnection _connObj = new OracleConnection(_connstring);
// Create a new queue object
OracleAQQueue _queueObj = new OracleAQQueue("UDT_NAME", _connObj);
// Set the payload type to your UDT
_queueObj.MessageType = OracleAQMessageType.Udt;
_queueObj.UdtTypeName = "UDT_NAME";
_connObj.Open();
OracleTransaction _txn = _connObj.BeginTransaction();
// Dequeue the message.
_queueObj.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
_queueObj.DequeueOptions.Wait = 10;
OracleAQMessage _deqMsg = _queueObj.Dequeue();
UDT_CUSTOM_CLASS data = (UDT_CUSTOM_CLASS)_deqMsg.Payload;
// At this point, you have the data and can do whatever you need to do with it
_txn.Commit();
_queueObj.Dispose();
_connObj.Close();
_connObj.Dispose();
_connObj = null;
}
That's a "simple" example. I pulled most of that out of Pro ODP.NET for Oracle Database 11g by Ed Zehoo. It's an excellent book and I strongly recommend it to help you gain a better understanding of the ins and outs of all things OPD.NET. You can buy the eBook here: http://apress.com/book/view/9781430228202. If you enter the coupon code MACWORLDOC, you can get the eBook for $21.00. That offer is only good for the eBook which comes in a password protected PDF format. I hope this helps!
I don't know the exact answer to this problem but here is what we did:
- First every .net application that need to listen on the ESB (ESB is build on AQ) has to use his own local Oracle DB and dequeue messages from there. The messages are propagated to the local queues. This solves the potential scalability problem linked to keeping a DB connection open to recieve messages.
- Second we built our own AQ library that basicly encapsulate stored procedures. - this is not needed any more as Oracle has finaly released an the ODAC 11.1.0.7.20 (with an ODP.NET that supports AQ). We use Oracle types as sort of DTO to define the message contracts.
I had a requirement where I had to enqueue/dequeue UDT messages to a queue. This post was really helpful. It has almost everything but creation of a "Oracle Custom Type" is missing. I thought its worth adding that code here so that the solution is complete.
To EnQueue/DeQueue in Oracle:
User with role "AQ_ADMINISTRATOR_ROLE" has to be created. In the example below, the "AQUSER" is created with that role.
PL Sql to EnQueue:
DECLARE
queue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_id RAW(16);
my_message AQUSER.USER_DEFINED_TYPE;
BEGIN
my_message := AQUSER.USER_DEFINED_TYPE('XXX','YYY','ZZZ');
DBMS_AQ.ENQUEUE(
queue_name => 'AQUSER.QUEUE_NAME',
enqueue_options => queue_options,
message_properties => message_properties,
payload => my_message,
msgid => message_id);
COMMIT;
END;
/
PL SQL to DeQueue
DECLARE
queue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_id RAW(2000);
my_message AQUSER.USER_DEFINED_TYPE;
BEGIN
DBMS_AQ.DEQUEUE(
queue_name => 'AQUSER.QUEUE_NAME',
dequeue_options => queue_options,
message_properties => message_properties,
payload => my_message,
msgid => message_id );
COMMIT;
END;
/
-------------------------------------------------------------------------------------------
To create a Oracle Custom Type, you can use the following code:
public class CustomMessageType : IOracleCustomType, INullable
{
[OracleObjectMappingAttribute("XXXXX")]
public string XXXXX { get; set; }
[OracleObjectMappingAttribute("YYYYY")]
public string YYYYY { get; set; }
[OracleObjectMappingAttribute("ZZZZZ")]
public string ZZZZZ { get; set; }
public void FromCustomObject(Oracle.DataAccess.Client.OracleConnection con, IntPtr pUdt)
{
if (!string.IsNullOrEmpty(XXXXX))
{
OracleUdt.SetValue(con, pUdt, "XXXXX", XXXXX);
}
if (!string.IsNullOrEmpty(YYYYY))
{
OracleUdt.SetValue(con, pUdt, "YYYYY", YYYYY);
}
if (!string.IsNullOrEmpty(ZZZZZ))
{
OracleUdt.SetValue(con, pUdt, "ZZZZZ", ZZZZZ);
}
}
public void ToCustomObject(Oracle.DataAccess.Client.OracleConnection con, IntPtr pUdt)
{
XXXXX = (string)OracleUdt.GetValue(con, pUdt, "XXXXX");
YYYYY = (string)OracleUdt.GetValue(con, pUdt, "YYYYY");
ZZZZZ = (string)OracleUdt.GetValue(con, pUdt, "ZZZZZ");
}
public bool IsNull { get; set; }
}
[OracleCustomTypeMappingAttribute("SCHEMA.CUSTOM_TYPE")]
public class QueueMessageTypeFactory : IOracleCustomTypeFactory
{
public IOracleCustomType CreateObject()
{
return new CustomMessageType();
}
}