svn commit: r1812474 - in /tomcat/tc7.0.x/trunk: java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java webapps/docs/changelog.xml

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r1812474 - in /tomcat/tc7.0.x/trunk: java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java webapps/docs/changelog.xml

kfujino
Author: kfujino
Date: Wed Oct 18 08:30:46 2017
New Revision: 1812474

URL: http://svn.apache.org/viewvc?rev=1812474&view=rev
Log:
- Ensure that the remaining Senders can still send channel messages by avoiding an unintended ChannelException that was caused by comparing the number of failed members with the number of remaining Senders.
- Ensure that the remaining SelectionKeys, which were previously left unhandled when a ChannelException was thrown during SelectionKey processing, are still handled.

Modified:
    tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
    tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml

Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=1812474&r1=1812473&r2=1812474&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java (original)
+++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java Wed Oct 18 08:30:46 2017
@@ -21,8 +21,10 @@ import java.io.IOException;
 import java.net.UnknownHostException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -86,21 +88,26 @@ public class ParallelNioSender extends A
             boolean waitForAck = (Channel.SEND_OPTIONS_USE_ACK & msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK;
             while ( (remaining>0) && (delta<getTimeout()) ) {
                 try {
-                    remaining -= doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck,msg);
+                    SendResult result = doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck,msg);
+                    remaining -= result.getCompleted();
+                    if (result.getFailed() != null) {
+                        remaining -= result.getFailed().getFaultyMembers().length;
+                        if (cx == null) cx = result.getFailed();
+                        else cx.addFaultyMember(result.getFailed().getFaultyMembers());
+                    }
                 } catch (Exception x ) {
                     if (log.isTraceEnabled()) log.trace("Error sending message", x);
-                    int faulty = (cx == null)?0:cx.getFaultyMembers().length;
-                    if ( cx == null ) {
+                    if (cx == null) {
                         if ( x instanceof ChannelException ) cx = (ChannelException)x;
                         else cx = new ChannelException("Parallel NIO send failed.", x);
-                    } else {
-                        if (x instanceof ChannelException) cx.addFaultyMember( ( (ChannelException) x).getFaultyMembers());
                     }
-                    //count down the remaining on an error
-                    if (faulty<cx.getFaultyMembers().length) remaining -= (cx.getFaultyMembers().length-faulty);
+                    for (int i=0; i<senders.length; i++ ) {
+                        if (!senders[i].isComplete()) {
+                            cx.addFaultyMember(senders[i].getDestination(),x);
+                        }
+                    }
+                    throw cx;
                 }
-                //bail out if all remaining senders are failing
-                if ( cx != null && cx.getFaultyMembers().length == remaining ) throw cx;
                 delta = System.currentTimeMillis() - start;
             }
             if ( remaining > 0 ) {
@@ -123,12 +130,17 @@ public class ParallelNioSender extends A
 
     }
 
-    private int doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg) throws IOException, ChannelException {
-        int completed = 0;
-        int selectedKeys = selector.select(selectTimeOut);
+    private SendResult doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg) throws ChannelException {
+        SendResult result = new SendResult();
+        int selectedKeys;
+        try {
+            selectedKeys = selector.select(selectTimeOut);
+        } catch (IOException ioe) {
+            throw new ChannelException("Parallel NIO send failed.", ioe);
+        }
 
         if (selectedKeys == 0) {
-            return 0;
+            return result;
         }
 
         Iterator<SelectionKey> it = selector.selectedKeys().iterator();
@@ -140,8 +152,8 @@ public class ParallelNioSender extends A
             NioSender sender = (NioSender) sk.attachment();
             try {
                 if (sender.process(sk,waitForAck)) {
-                    completed++;
                     sender.setComplete(true);
+                    result.complete(sender);
                     if ( Logs.MESSAGES.isTraceEnabled() ) {
                         Logs.MESSAGES.trace("ParallelNioSender - Sent msg:" + new UniqueId(msg.getUniqueId()) + " at " +new java.sql.Timestamp(System.currentTimeMillis())+ " to "+sender.getDestination().getName());
                     }
@@ -170,28 +182,47 @@ public class ParallelNioSender extends A
                     log.warn("Not retrying send for:" + sender.getDestination().getName() + "; Sender is disconnected.");
                     ChannelException cx = new ChannelException("Send failed, and sender is disconnected. Not retrying.",x);
                     cx.addFaultyMember(sender.getDestination(),x);
-                    throw cx;
+                    result.failed(cx);
+                    break;
                 }
 
                 byte[] data = sender.getMessage();
-                if ( retry ) {
+                if (retry) {
                     try {
                         sender.disconnect();
                         sender.connect();
                         sender.setAttempt(attempt);
                         sender.setMessage(data);
-                    }catch ( Exception ignore){
+                    } catch ( Exception ignore){
                         state.setFailing();
                     }
                 } else {
                     ChannelException cx = new ChannelException("Send failed, attempt:"+sender.getAttempt()+" max:"+maxAttempts,x);
                     cx.addFaultyMember(sender.getDestination(),x);
-                    throw cx;
+                    result.failed(cx);
                 }//end if
             }
         }
-        return completed;
+        return result;
+    }
+
+    private static class SendResult {
+        private List<NioSender> completeSenders = new ArrayList<NioSender>();
+        private ChannelException exception = null;
+        private void complete(NioSender sender) {
+            if (!completeSenders.contains(sender)) completeSenders.add(sender);
+        }
+        private int getCompleted() {
+            return completeSenders.size();
+        }
+        private void failed(ChannelException cx){
+            if (exception == null) exception = cx;
+            exception.addFaultyMember(cx.getFaultyMembers());
+        }
 
+        private ChannelException getFailed() {
+            return exception;
+        }
     }
 
     private void connect(NioSender[] senders) throws ChannelException {

Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1812474&r1=1812473&r2=1812474&view=diff
==============================================================================
--- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Wed Oct 18 08:30:46 2017
@@ -116,6 +116,16 @@
         than the actual setting value of <code>maxRetryAttempts</code>.
         (kfujino)
       </fix>
+      <fix>
+        Ensure that the remaining Sender can send channel messages by avoiding
+        unintended <code>ChannelException</code> caused by comparing the number
+        of failed members and the number of remaining Senders. (kfujino)
+      </fix>
+      <fix>
+        Ensure that remaining SelectionKeys that were not handled by throwing a
+        <code>ChannelException</code> during SelectionKey processing are
+        handled. (kfujino)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Other">



---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]