svn commit: r1812471 - in /tomcat/trunk: java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java webapps/docs/changelog.xml

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r1812471 - in /tomcat/trunk: java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java webapps/docs/changelog.xml

kfujino
Author: kfujino
Date: Wed Oct 18 08:25:03 2017
New Revision: 1812471

URL: http://svn.apache.org/viewvc?rev=1812471&view=rev
Log:
Ensure that remaining SelectionKeys that were not handled by throwing a ChannelException during SelectionKey processing are handled.

Modified:
    tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
    tomcat/trunk/webapps/docs/changelog.xml

Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java?rev=1812471&r1=1812470&r2=1812471&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java (original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/nio/ParallelNioSender.java Wed Oct 18 08:25:03 2017
@@ -20,8 +20,10 @@ import java.io.IOException;
 import java.net.UnknownHostException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -73,22 +75,25 @@ public class ParallelNioSender extends A
                     msg.getOptions()) == Channel.SEND_OPTIONS_USE_ACK;
             while ( (remaining>0) && (delta<getTimeout()) ) {
                 try {
-                    remaining -= doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck,msg);
+                    SendResult result = doLoop(selectTimeout, getMaxRetryAttempts(),waitForAck,msg);
+                    remaining -= result.getCompleted();
+                    if (result.getFailed() != null) {
+                        remaining -= result.getFailed().getFaultyMembers().length;
+                        if (cx == null) cx = result.getFailed();
+                        else cx.addFaultyMember(result.getFailed().getFaultyMembers());
+                    }
                 } catch (Exception x ) {
                     if (log.isTraceEnabled()) log.trace("Error sending message", x);
-                    int faulty = (cx == null)?0:cx.getFaultyMembers().length;
-                    if ( cx == null ) {
+                    if (cx == null) {
                         if ( x instanceof ChannelException ) cx = (ChannelException)x;
                         else cx = new ChannelException(sm.getString("parallelNioSender.send.failed"), x);
-                    } else {
-                        if (x instanceof ChannelException) {
-                            cx.addFaultyMember(((ChannelException) x).getFaultyMembers());
-                        }
                     }
-                    //count down the remaining on an error
-                    if (faulty < cx.getFaultyMembers().length) {
-                        remaining -= (cx.getFaultyMembers().length - faulty);
+                    for (int i=0; i<senders.length; i++ ) {
+                        if (!senders[i].isComplete()) {
+                            cx.addFaultyMember(senders[i].getDestination(),x);
+                        }
                     }
+                    throw cx;
                 }
                 delta = System.currentTimeMillis() - start;
             }
@@ -118,13 +123,18 @@ public class ParallelNioSender extends A
 
     }
 
-    private int doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg)
-            throws IOException, ChannelException {
-        int completed = 0;
-        int selectedKeys = selector.select(selectTimeOut);
+    private SendResult doLoop(long selectTimeOut, int maxAttempts, boolean waitForAck, ChannelMessage msg)
+            throws ChannelException {
+        SendResult result = new SendResult();
+        int selectedKeys;
+        try {
+            selectedKeys = selector.select(selectTimeOut);
+        } catch (IOException ioe) {
+            throw new ChannelException(sm.getString("parallelNioSender.send.failed"), ioe);
+        }
 
         if (selectedKeys == 0) {
-            return 0;
+            return result;
         }
 
         Iterator<SelectionKey> it = selector.selectedKeys().iterator();
@@ -136,8 +146,8 @@ public class ParallelNioSender extends A
             NioSender sender = (NioSender) sk.attachment();
             try {
                 if (sender.process(sk,waitForAck)) {
-                    completed++;
                     sender.setComplete(true);
+                    result.complete(sender);
                     if ( Logs.MESSAGES.isTraceEnabled() ) {
                         Logs.MESSAGES.trace("ParallelNioSender - Sent msg:" +
                                 new UniqueId(msg.getUniqueId()) + " at " +
@@ -170,17 +180,18 @@ public class ParallelNioSender extends A
                     log.warn(sm.getString("parallelNioSender.sender.disconnected.notRetry", sender.getDestination().getName()));
                     ChannelException cx = new ChannelException(sm.getString("parallelNioSender.sender.disconnected.sendFailed"), x);
                     cx.addFaultyMember(sender.getDestination(),x);
-                    throw cx;
+                    result.failed(cx);
+                    break;
                 }
 
                 byte[] data = sender.getMessage();
-                if ( retry ) {
+                if (retry) {
                     try {
                         sender.disconnect();
                         sender.connect();
                         sender.setAttempt(attempt);
                         sender.setMessage(data);
-                    }catch ( Exception ignore){
+                    } catch (Exception ignore){
                         state.setFailing();
                     }
                 } else {
@@ -189,12 +200,31 @@ public class ParallelNioSender extends A
                                     Integer.toString(sender.getAttempt()),
                                     Integer.toString(maxAttempts)), x);
                     cx.addFaultyMember(sender.getDestination(),x);
-                    throw cx;
+                    result.failed(cx);
                 }//end if
             }
         }
-        return completed;
+        return result;
+
+    }
+
+    private static class SendResult {
+        private List<NioSender> completeSenders = new ArrayList<>();
+        private ChannelException exception = null;
+        private void complete(NioSender sender) {
+            if (!completeSenders.contains(sender)) completeSenders.add(sender);
+        }
+        private int getCompleted() {
+            return completeSenders.size();
+        }
+        private void failed(ChannelException cx){
+            if (exception == null) exception = cx;
+            exception.addFaultyMember(cx.getFaultyMembers());
+        }
 
+        private ChannelException getFailed() {
+            return exception;
+        }
     }
 
     private void connect(NioSender[] senders) throws ChannelException {

Modified: tomcat/trunk/webapps/docs/changelog.xml
URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/changelog.xml?rev=1812471&r1=1812470&r2=1812471&view=diff
==============================================================================
--- tomcat/trunk/webapps/docs/changelog.xml (original)
+++ tomcat/trunk/webapps/docs/changelog.xml Wed Oct 18 08:25:03 2017
@@ -118,6 +118,11 @@
         unintended <code>ChannelException</code> caused by comparing the number
         of failed members and the number of remaining Senders. (kfujino)
       </fix>
+      <fix>
+        Ensure that remaining SelectionKeys that were not handled by throwing a
+        <code>ChannelException</code> during SelectionKey processing are
+        handled. (kfujino)
+      </fix>
     </changelog>
   </subsection>
   <subsection name="Other">



---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]