序
本文主要研究一下openmessaging-java
maven
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-api</artifactId>
<version>0.3.1-alpha</version>
</dependency>
复制
maven最新的是0.3.1-alpha,这里直接用源码的0.3.2-alpha-SNAPSHOT
producer
Producer
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/Producer.java
/**
* A {@code Producer} is a simple object used to send messages on behalf
* of a {@code MessagingAccessPoint}. An instance of {@code Producer} is
* created by calling the {@link MessagingAccessPoint#createProducer()} method.
* <p>
* It provides various {@code send} methods to send a message to a specified destination,
* which is a {@code Queue} in OMS.
* <p>
* {@link Producer#send(Message)} means send a message to the destination synchronously,
* the calling thread will block until the send request complete.
* <p>
* {@link Producer#sendAsync(Message)} means send a message to the destination asynchronously,
* the calling thread won't block and will return immediately. Since the send call is asynchronous
* it returns a {@link Future} for the send result.
*
* @version OMS 1.0.0
* @since OMS 1.0.0
*/
public interface Producer extends MessageFactory, ServiceLifecycle {
/**
* Returns the attributes of this {@code Producer} instance.
* Changes to the return {@code KeyValue} are not reflected in physical {@code Producer}.
* <p>
* There are some standard attributes defined by OMS for {@code Producer}:
* <ul>
* <li> {@link OMSBuiltinKeys#PRODUCER_ID}, the unique producer id for a producer instance.
* <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code Producer}.
* </ul>
*
* @return the attributes
*/
KeyValue attributes();
/**
* Sends a message to the specified destination synchronously, the destination should be preset to
* {@link Message#sysHeaders()}, other header fields as well.
*
* @param message a message will be sent
* @return the successful {@code SendResult}
* @throws OMSMessageFormatException if an invalid message is specified.
* @throws OMSTimeOutException if the given timeout elapses before the send operation completes
* @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error.
*/
SendResult send(Message message);
/**
* Sends a message to the specified destination synchronously, using the specified attributes, the destination
* should be preset to {@link Message#sysHeaders()}, other header fields as well.
*
* @param message a message will be sent
* @param attributes the specified attributes
* @return the successful {@code SendResult}
* @throws OMSMessageFormatException if an invalid message is specified.
* @throws OMSTimeOutException if the given timeout elapses before the send operation completes
* @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error.
*/
SendResult send(Message message, KeyValue attributes);
/**
* Sends a transactional message to the specified destination synchronously, using the specified attributes,
* the destination should be preset to {@link Message#sysHeaders()}, other header fields as well.
* <p>
* A transactional message will be exposed to consumer if and only if the local transaction
* branch has been committed, or be discarded if local transaction has been rolled back.
*
* @param message a transactional message will be sent
* @param branchExecutor local transaction executor associated with the message
* @param attributes the specified attributes
* @return the successful {@code SendResult}
* @throws OMSMessageFormatException if an invalid message is specified.
* @throws OMSTimeOutException if the given timeout elapses before the send operation completes
* @throws OMSRuntimeException if the {@code Producer} fails to send the message due to some internal error.
*/
SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes);
/**
* Sends a message to the specified destination asynchronously, the destination should be preset to
* {@link Message#sysHeaders()}, other header fields as well.
* <p>
* The returned {@code Promise} will have the result once the operation completes, and the registered
* {@code FutureListener} will be notified, either because the operation was successful or because of an error.
*
* @param message a message will be sent
* @return the {@code Promise} of an asynchronous message send operation.
* @see Future
* @see FutureListener
*/
Future<SendResult> sendAsync(Message message);
/**
* Sends a message to the specified destination asynchronously, using the specified attributes, the destination
* should be preset to {@link Message#sysHeaders()}, other header fields as well.
* <p>
* The returned {@code Promise} will have the result once the operation completes, and the registered
* {@code FutureListener} will be notified, either because the operation was successful or because of an error.
*
* @param message a message will be sent
* @param attributes the specified attributes
* @return the {@code Promise} of an asynchronous message send operation.
* @see Future
* @see FutureListener
*/
Future<SendResult> sendAsync(Message message, KeyValue attributes);
/**
* Sends a message to the specified destination in one way, the destination should be preset to
* {@link Message.BuiltinKeys}, other header fields as well.
* <p>
* There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't
* care about the send result and also have no context to get the result.
*
* @param message a message will be sent
*/
void sendOneway(Message message);
/**
* Sends a message to the specified destination in one way, using the specified attributes, the destination
* should be preset to {@link Message.BuiltinKeys}, other header fields as well.
* <p>
* There is no {@code Promise} related or {@code RuntimeException} thrown. The calling thread doesn't
* care about the send result and also have no context to get the result.
*
* @param message a message will be sent
* @param properties the specified userHeaders
*/
void sendOneway(Message message, KeyValue properties);
/**
* Creates a {@code BatchMessageSender} to send message in batch manner.
*
* @return a {@code BatchMessageSender} instance
*/
BatchMessageSender createBatchMessageSender();
/**
* Adds a {@code ProducerInterceptor} to intercept send operations of producer.
*
* @param interceptor a producer interceptor
*/
void addInterceptor(ProducerInterceptor interceptor);
/**
* Removes a {@code ProducerInterceptor}
*
* @param interceptor a producer interceptor will be removed
*/
void removeInterceptor(ProducerInterceptor interceptor);
}
复制
- 这里提供了诸多发送消息的方法,也提供创建BatchMessageSender的方法
- 消息这里封装为Message,设置的属性封装为KeyValue,发送结果封装为SendResult
Message
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/Message.java
/**
* The {@code Message} interface is the root interface of all OMS messages, and the most commonly used OMS message is
* {@link BytesMessage}.
* <p>
* Most message-oriented middleware (MOM) products treat messages as lightweight entities that consist of
* header and body and is used by separate applications to exchange a piece of information,
* like <a href="http://rocketmq.apache.org/">Apache RocketMQ</a>.
* <p>
* The header contains fields used by the messaging system that describes the message's meta information,
* like QoS level, origin, destination, and so on, while the body contains the application data being transmitted.
* <p>
* As for the header, OMS defines two kinds types: System Header and User Header,
* with respect to flexibility in vendor implementation and user usage.
* <ul>
* <li>
* System Header, OMS defines some standard attributes that represent the characteristics of the message.
* </li>
* <li>
* User Header, some OMS vendors may require enhanced extra attributes of the message
* or some users may want to clarify some customized attributes to draw the body.
* OMS provides the improved scalability for these scenarios.
* </li>
* </ul>
* The body contains the application data being transmitted,
* which is generally ignored by the messaging system and simply transmitted to its destination.
* <p>
* In BytesMessage, the body is just a byte array, may be compressed and uncompressed
* in the transmitting process by the messaging system.
* The application is responsible for explaining the concrete content and format of the message body,
* OMS is never aware of that.
*
* The body part is placed in the implementation classes of {@code Message}.
*
* @version OMS 1.0.0
* @since OMS 1.0.0
*/
public interface Message {
/**
* Returns all the system header fields of the {@code Message} object as a {@code KeyValue}.
*
* @return the system headers of a {@code Message}
* @see BuiltinKeys
*/
KeyValue sysHeaders();
/**
* Returns all the customized user header fields of the {@code Message} object as a {@code KeyValue}.
*
* @return the user headers of a {@code Message}
*/
KeyValue userHeaders();
/**
* Puts a {@code String}-{@code int} {@code KeyValue} entry to the system headers of a {@code Message}.
*
* @param key the key to be placed into the system headers
* @param value the value corresponding to <tt>key</tt>
*/
Message putSysHeaders(String key, int value);
/**
* Puts a {@code String}-{@code long} {@code KeyValue} entry to the system headers of a {@code Message}.
*
* @param key the key to be placed into the system headers
* @param value the value corresponding to <tt>key</tt>
*/
Message putSysHeaders(String key, long value);
/**
* Puts a {@code String}-{@code double} {@code KeyValue} entry to the system headers of a {@code Message}.
*
* @param key the key to be placed into the system headers
* @param value the value corresponding to <tt>key</tt>
*/
Message putSysHeaders(String key, double value);
/**
* Puts a {@code String}-{@code String} {@code KeyValue} entry to the system headers of a {@code Message}.
*
* @param key the key to be placed into the system headers
* @param value the value corresponding to <tt>key</tt>
*/
Message putSysHeaders(String key, String value);
/**
* Puts a {@code String}-{@code int} {@code KeyValue} entry to the user headers of a {@code Message}.
*
* @param key the key to be placed into the user headers
* @param value the value corresponding to <tt>key</tt>
*/
Message putUserHeaders(String key, int value);
/**
* Puts a {@code String}-{@code long} {@code KeyValue} entry to the user headers of a {@code Message}.
*
* @param key the key to be placed into the user headers
* @param value the value corresponding to <tt>key</tt>
*/
Message putUserHeaders(String key, long value);
/**
* Puts a {@code String}-{@code double} {@code KeyValue} entry to the user headers of a {@code Message}.
*
* @param key the key to be placed into the user headers
* @param value the value corresponding to <tt>key</tt>
*/
Message putUserHeaders(String key, double value);
/**
* Puts a {@code String}-{@code String} {@code KeyValue} entry to the user headers of a {@code Message}.
*
* @param key the key to be placed into the user headers
* @param value the value corresponding to <tt>key</tt>
*/
Message putUserHeaders(String key, String value);
/**
* Get message body
*
* @param type Message body type
* @param <T> Generic type
* @return message body
* @throws OMSMessageFormatException if the message body cannot be assigned to the specified type
*/
<T> T getBody(Class<T> type) throws OMSMessageFormatException;
interface BuiltinKeys {
/**
* The {@code MESSAGE_ID} header field contains a value that uniquely identifies
* each message sent by a {@code Producer}.
* <p>
* When a message is sent, MESSAGE_ID is assigned by the producer.
*/
String MESSAGE_ID = "MESSAGE_ID";
/**
* The {@code DESTINATION} header field contains the destination to which the message is being sent.
* <p>
* When a message is sent this value is set to the right {@code Queue}, then the message will be sent to
* the specified destination.
* <p>
* When a message is received, its destination is equivalent to the {@code Queue} where the message resides in.
*/
String DESTINATION = "DESTINATION";
//......
}
}
复制
- Message接口主要定义了设置header以及获取body的方法
- 另外还内置了BuiltinKeys的header,比如MESSAGE_ID,DESTINATION等
KeyValue
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/KeyValue.java
/**
* The {@code KeyValue} class represents a persistent set of attributes,
* which supports method chaining.
* <p>
* A {@code KeyValue} object only allows {@code String} keys and can contain four primitive type
* as values: {@code int}, {@code long}, {@code double}, {@code String}.
* <p>
* The {@code KeyValue} is a replacement of {@code Properties}, with simpler
* interfaces and reasonable entry limits.
* <p>
* A {@code KeyValue} object may be used in concurrent scenarios, so the implementation
* of {@code KeyValue} should consider concurrent related issues.
*
* @version OMS 1.0.0
* @since OMS 1.0.0
*/
public interface KeyValue {
/**
* Inserts or replaces {@code int} value for the specified key.
*
* @param key the key to be placed into this {@code KeyValue} object
* @param value the value corresponding to <tt>key</tt>
*/
KeyValue put(String key, int value);
/**
* Inserts or replaces {@code long} value for the specified key.
*
* @param key the key to be placed into this {@code KeyValue} object
* @param value the value corresponding to <tt>key</tt>
*/
KeyValue put(String key, long value);
/**
* Inserts or replaces {@code double} value for the specified key.
*
* @param key the key to be placed into this {@code KeyValue} object
* @param value the value corresponding to <tt>key</tt>
*/
KeyValue put(String key, double value);
/**
* Inserts or replaces {@code String} value for the specified key.
*
* @param key the key to be placed into this {@code KeyValue} object
* @param value the value corresponding to <tt>key</tt>
*/
KeyValue put(String key, String value);
/**
* Searches for the {@code int} property with the specified key in this {@code KeyValue} object.
* If the key is not found in this property list, zero is returned.
*
* @param key the property key
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, int)
*/
int getInt(String key);
/**
* Searches for the {@code int} property with the specified key in this {@code KeyValue} object.
* If the key is not found in this property list, the default value argument is returned.
*
* @param key the property key
* @param defaultValue a default value
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, int)
*/
int getInt(String key, int defaultValue);
/**
* Searches for the {@code long} property with the specified key in this {@code KeyValue} object.
* If the key is not found in this property list, zero is returned.
*
* @param key the property key
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, long)
*/
long getLong(String key);
/**
* Searches for the {@code long} property with the specified key in this {@code KeyValue} object.
* If the key is not found in this property list, the default value argument is returned.
*
* @param key the property key
* @param defaultValue a default value
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, long)
*/
long getLong(String key, long defaultValue);
/**
* Searches for the {@code double} property with the specified key in this {@code KeyValue} object.
* If the key is not found in this property list, zero is returned.
*
* @param key the property key
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, double)
*/
double getDouble(String key);
/**
* Searches for the {@code double} property with the specified key in this {@code KeyValue} object.
* If the key is not found in this property list, the default value argument is returned.
*
* @param key the property key
* @param defaultValue a default value
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, double)
*/
double getDouble(String key, double defaultValue);
/**
* Searches for the {@code String} property with the specified key in this {@code KeyValue} object.
* If the key is not found in this property list, {@code null} is returned.
*
* @param key the property key
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, String)
*/
String getString(String key);
/**
* Searches for the {@code String} property with the specified key in this {@code KeyValue} object.
* If the key is not found in this property list, the default value argument is returned.
*
* @param key the property key
* @param defaultValue a default value
* @return the value in this {@code KeyValue} object with the specified key value
* @see #put(String, String)
*/
String getString(String key, String defaultValue);
/**
* Returns a {@link Set} view of the keys contained in this {@code KeyValue} object.
* <p>
* The set is backed by the {@code KeyValue}, so changes to the set are
* reflected in the @code KeyValue}, and vice-versa.
*
* @return the key set view of this {@code KeyValue} object.
*/
Set<String> keySet();
/**
* Tests if the specified {@code String} is a key in this {@code KeyValue}.
*
* @param key possible key
* @return <code>true</code> if and only if the specified key is in this {@code KeyValue}, <code>false</code>
* otherwise.
*/
boolean containsKey(String key);
}
复制
- 主要封装int、long、double、Sting这几种类型,其中key只能为String类型
SendResult
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/SendResult.java
/**
* The result of sending a OMS message to server
* with the message id and some attributes.
*
* @version OMS 1.0.0
* @since OMS 1.0.0
*/
public interface SendResult {
/**
* The unique message id related to the {@code SendResult} instance.
*
* @return the message id
*/
String messageId();
}
复制
- 目前仅仅定义一个返回messageId的方法
BatchMessageSender
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/producer/BatchMessageSender.java
/**
* A message sender created through {@link Producer#createBatchMessageSender()}, to send
* messages in batch manner, and commit or roll back at the appropriate time.
*
* @version OMS 1.0.0
* @since OMS 1.0.0
*/
public interface BatchMessageSender {
/**
* Submits a message to this sender
*
* @param message a message to be sent
* @return this batch sender
*/
BatchMessageSender send(Message message);
/**
* Commits all the uncommitted messages in this sender.
*
* @throws OMSRuntimeException if the sender fails to commit the messages due to some internal error.
*/
void commit();
/**
* Discards all the uncommitted messages in this sender.
*/
void rollback();
/**
* Close this sender.
*/
void close();
}
复制
- 定义了send方法用于批量添加消息,这里的send语义不是太好,用pending好一些
- 然后定义了commit用于提交批量消息、rollback用于回滚、close用于关闭这个sender
consumer
PullConsumer
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/PullConsumer.java
/**
* A {@code PullConsumer} pulls messages from the specified queue,
* and supports submit the consume result by acknowledgement.
*
* @version OMS 1.0.0
* @see MessagingAccessPoint#createPullConsumer()
* @since OMS 1.0.0
*/
public interface PullConsumer extends ServiceLifecycle {
/**
* Returns the attributes of this {@code PullConsumer} instance.
* Changes to the return {@code KeyValue} are not reflected in physical {@code PullConsumer}.
* <p>
* There are some standard attributes defined by OMS for {@code PullConsumer}:
* <ul>
* <li> {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance.
* <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code PullConsumer}.
* </ul>
*
* @return the attributes
*/
KeyValue attributes();
/**
* Attaches the {@code PullConsumer} to a specified queue.
*
* @param queueName a specified queue
* @return this {@code PullConsumer} instance
*/
PullConsumer attachQueue(String queueName);
/**
* Attaches the {@code PullConsumer} to a specified queue with some specified attributes..
*
* @param queueName a specified queue
* @param attributes some specified attributes
* @return this {@code PullConsumer} instance
*/
PullConsumer attachQueue(String queueName, KeyValue attributes);
/**
* Detaches the {@code PullConsumer} from a specified queue.
* <p>
* After the success call, this consumer won't receive new message
* from the specified queue any more.
*
* @param queueName a specified queue
* @return this {@code PullConsumer} instance
*/
PullConsumer detachQueue(String queueName);
/**
* Receives the next message from the attached queues of this consumer.
* <p>
* This call blocks indefinitely until a message is arrives, the timeout expires,
* or until this {@code PullConsumer} is shut down.
*
* @return the next message received from the attached queues, or null if the consumer is
* concurrently shut down or the timeout expires
* @throws OMSRuntimeException if the consumer fails to pull the next message due to some internal error.
*/
Message receive();
/**
* Receives the next message from the attached queues of this consumer, using the specified attributes.
* <p>
* This call blocks indefinitely until a message is arrives, the timeout expires,
* or until this {@code PullConsumer} is shut down.
*
* @param attributes the specified attributes
* @return the next message received from the attached queues, or null if the consumer is
* concurrently shut down or the timeout expires
* @throws OMSRuntimeException if the consumer fails to pull the next message due to some internal error.
*/
Message receive(KeyValue attributes);
/**
* Acknowledges the specified and consumed message with the unique message receipt handle.
* <p>
* Messages that have been received but not acknowledged may be redelivered.
*
* @param receiptHandle the receipt handle associated with the consumed message
* @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error.
*/
void ack(String receiptHandle);
/**
* Acknowledges the specified and consumed message with the specified attributes.
* <p>
* Messages that have been received but not acknowledged may be redelivered.
*
* @param receiptHandle the receipt handle associated with the consumed message
* @param attributes the specified attributes
* @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error.
*/
void ack(String receiptHandle, KeyValue attributes);
}
复制
- 定义了attachQueue以及attachQueue方法,用于绑定及解绑队列
- receive方法用于接收消息,其实这里用receive语义不是太好,用pull可能好点
- ack方法用于ack消息
PushConsumer
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/PushConsumer.java
/**
* A {@code PushConsumer} receives messages from multiple queues, these messages are pushed from
* MOM server to {@code PushConsumer} client.
*
* @version OMS 1.0.0
* @see MessagingAccessPoint#createPushConsumer()
* @since OMS 1.0.0
*/
public interface PushConsumer extends ServiceLifecycle {
/**
* Returns the attributes of this {@code PushConsumer} instance.
* Changes to the return {@code KeyValue} are not reflected in physical {@code PushConsumer}.
* <p>
* There are some standard attributes defined by OMS for {@code PushConsumer}:
* <ul>
* <li> {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance.
* <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code PushConsumer}.
* </ul>
*
* @return the attributes
*/
KeyValue attributes();
/**
* Resumes the {@code PushConsumer} after a suspend.
* <p>
* This method resumes the {@code PushConsumer} instance after it was suspended.
* The instance will not receive new messages between the suspend and resume calls.
*
* @throws OMSRuntimeException if the instance has not been suspended.
* @see PushConsumer#suspend()
*/
void resume();
/**
* Suspends the {@code PushConsumer} for later resumption.
* <p>
* This method suspends the consumer until it is resumed.
* The consumer will not receive new messages between the suspend and resume calls.
* <p>
* This method behaves exactly as if it simply performs the call {@code suspend(0)}.
*
* @throws OMSRuntimeException if the instance is not currently running.
* @see PushConsumer#resume()
*/
void suspend();
/**
* Suspends the {@code PushConsumer} for later resumption.
* <p>
* This method suspends the consumer until it is resumed or a
* specified amount of time has elapsed.
* The consumer will not receive new messages during the suspended state.
* <p>
* This method is similar to the {@link #suspend()} method, but it allows finer control
* over the amount of time to suspend, and the consumer will be suspended until it is resumed
* if the timeout is zero.
*
* @param timeout the maximum time to suspend in milliseconds.
* @throws OMSRuntimeException if the instance is not currently running.
*/
void suspend(long timeout);
/**
* This method is used to find out whether the {@code PushConsumer} is suspended.
*
* @return true if this {@code PushConsumer} is suspended, false otherwise
*/
boolean isSuspended();
/**
* Attaches the {@code PushConsumer} to a specified queue, with a {@code MessageListener}.
* <p>
* {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new
* delivered message is coming.
*
* @param queueName a specified queue
* @param listener a specified listener to receive new message
* @return this {@code PushConsumer} instance
*/
PushConsumer attachQueue(String queueName, MessageListener listener);
/**
* Attaches the {@code PushConsumer} to a specified queue, with a {@code MessageListener} and some
* specified attributes.
* <p>
* {@link MessageListener#onReceived(Message, MessageListener.Context)} will be called when new
* delivered message is coming.
*
* @param queueName a specified queue
* @param listener a specified listener to receive new message
* @param attributes some specified attributes
* @return this {@code PushConsumer} instance
*/
PushConsumer attachQueue(String queueName, MessageListener listener, KeyValue attributes);
/**
* Detaches the {@code PushConsumer} from a specified queue.
* <p>
* After the success call, this consumer won't receive new message
* from the specified queue any more.
*
* @param queueName a specified queue
* @return this {@code PushConsumer} instance
*/
PushConsumer detachQueue(String queueName);
/**
* Adds a {@code ConsumerInterceptor} instance to this consumer.
*
* @param interceptor an interceptor instance
*/
void addInterceptor(ConsumerInterceptor interceptor);
/**
* Removes an interceptor from this consumer.
*
* @param interceptor an interceptor to be removed
*/
void removeInterceptor(ConsumerInterceptor interceptor);
}
复制
- 定义了attachQueue以及detachQueue方法用于绑定及解绑队列
- attachQueue方法有个MessageListener用于消息push的时候的回调处理
MessageListener
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/MessageListener.java
/**
* A message listener must implement this {@code MessageListener} interface and register
* itself to a consumer instance to asynchronously receive messages.
*
* @version OMS 1.0.0
* @since OMS 1.0.0
*/
public interface MessageListener {
/**
* Callback method to receive incoming messages.
* <p>
* A message listener should handle different types of {@code Message}.
*
* @param message the received message object
* @param context the context delivered to the consume thread
*/
void onReceived(Message message, Context context);
interface Context {
/**
* Returns the attributes of this {@code MessageContext} instance.
*
* @return the attributes
*/
KeyValue attributes();
/**
* Acknowledges the specified and consumed message, which is related to this {@code MessageContext}.
* <p>
* Messages that have been received but not acknowledged may be redelivered.
*
* @throws OMSRuntimeException if the consumer fails to acknowledge the messages due to some internal error.
*/
void ack();
}
}
复制
- MessageListener定义了onReceived的回调方法,同时传递Context上下文对象
StreamingConsumer
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/StreamingConsumer.java
/**
* A {@code StreamingConsumer} provides low level APIs to open multiple streams
* from a specified queue and then retrieve messages from them through @{code StreamingIterator}.
*
* A {@code Queue} is consists of multiple streams, the {@code Stream} is an abstract concept and
* can be associated with partition in most messaging systems.
*
* @version OMS 1.0.0
* @since OMS 1.0.0
*/
public interface StreamingConsumer extends ServiceLifecycle {
/**
* Returns the attributes of this {@code StreamingConsumer} instance.
* Changes to the return {@code KeyValue} are not reflected in physical {@code StreamingConsumer}.
* <p>
* There are some standard attributes defined by OMS for {@code StreamingConsumer}:
* <ul>
* <li> {@link OMSBuiltinKeys#CONSUMER_ID}, the unique consumer id for a consumer instance.
* <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code
* StreamingConsumer}.
* </ul>
*
* @return the attributes
*/
KeyValue attributes();
/**
* Creates a {@code StreamingIterator} from the end position of the specified stream.
*
* @param streamName the specified stream
* @return a message iterator at the begin position.
*/
StreamingIterator seekToEnd(String streamName);
/**
* Creates a {@code StreamingIterator} from the begin position of the specified stream.
*
* @param streamName the specified stream
* @return a message iterator at the begin position.
*/
StreamingIterator seekToBeginning(String streamName);
/**
* Creates a {@code StreamingIterator} from the fixed position of the specified stream.
* <p>
* Creates a {@code StreamingIterator} from the begin position if the given position
* is earlier than the first message's store position in this stream.
* <p>
* Creates a {@code StreamingIterator} from the end position, if the given position
* is later than the last message's store position in this stream.
* <p>
* The position is a {@code String} value, may represented by timestamp, offset, cursor,
* even a casual key.
*
* @param streamName the specified stream
* @param position the specified position
* @return a message iterator at the specified position
*/
StreamingIterator seek(String streamName, String position);
}
复制
- 流式处理主要是定义seek方法,返回的是StreamingIterator
StreamingIterator
openmessaging-java/openmessaging-api/src/main/java/io/openmessaging/consumer/StreamingIterator.java
/**
* A {@code StreamingIterator} is provided by {@code Stream} and is used to
* retrieve messages a specified stream like a read-only iterator.
*
* @version OMS 1.0.0
* @since OMS 1.0.0
*/
public interface StreamingIterator {
/**
* Returns the attributes of this {@code StreamingIterator} instance.
* <p>
* There are some standard attributes defined by OMS for {@code Stream}:
* <ul>
* <li> {@link OMSBuiltinKeys#OPERATION_TIMEOUT}, the default timeout period for operations of {@code
* Stream}.
* </ul>
*
* @return the attributes
*/
KeyValue attributes();
/**
* Returns {@code true} if this iterator has more messages when
* traversing the iterator in the forward direction.
*
* @return {@code true} if the iterator has more messages
*/
boolean hasNext();
/**
* Returns the next message in the iteration and advances the offset position.
* <p>
* This method may be called repeatedly to iterate through the iteration,
* or intermixed with calls to {@link #previous} to go back and forth.
*
* @return the next message in the list
* @throws OMSRuntimeException if the iteration has no more message, or
* the the consumer fails to receive the next message
*/
Message next();
/**
* Returns {@code true} if this partition iterator has more messages when
* traversing the iterator in the reverse direction.
*
* @return {@code true} if the partition iterator has more messages when
* traversing the iterator in the reverse direction
*/
boolean hasPrevious();
/**
* Returns the previous message in the iteration and moves the offset
* position backwards.
* <p>
* This method may be called repeatedly to iterate through the iteration backwards,
* or intermixed with calls to {@link #next} to go back and forth.
*
* @return the previous message in the list
* @throws OMSRuntimeException if the iteration has no previous message, or
* the the consumer fails to receive the previous message
*/
Message previous();
/**
* Returns the position of the message that would be returned by a
* subsequent call to {@link #next}.
*
* @return the position of the next message
* @throws OMSRuntimeException if the iteration has no next message
*/
String nextPosition();
/**
* Returns the position of the message that would be returned by a
* subsequent call to {@link #previous}.
*
* @return the position of the previous message
* @throws OMSRuntimeException if the iteration has no previous message
*/
String previousPosition();
}
复制
- 是一个read-only的iterator,类似java的iterator,提供了hasNext、next等方法
小结
openmessaging-java的定义java实现OpenMessaging的api规范,其中producer提供了单个发送也提供了批量发送的方法,而consumer则提供了pull、push以及stream三类消费方式。
doc
- openmessaging.cloud
- OpenMessaging Runtime Interface for Java