天天看點

如何合理地估算線程池大小?

如何合理地估算線程池大小?

這個問題雖然看起來很小,卻并不那麼容易回答。大家如果有更好的方法歡迎賜教,先來一個天真的估算方法:假設要求一個系統的tps(transaction per second或者task per second)至少為20,然後假設每個transaction由一個線程完成,繼續假設平均每個線程處理一個transaction的時間為4s。那麼問題轉化為:

如何設計線程池大小,使得可以在1s内處理完20個transaction?

計算過程很簡單,每個線程的處理能力為0.25tps,那麼要達到20tps,顯然需要20/0.25=80個線程。

很顯然這個估算方法很天真,因為它沒有考慮到cpu數目。一般伺服器的cpu核數為16或者32,如果有80個線程,那麼肯定會帶來太多不必要的線程上下文切換開銷。

再來第二種簡單的但不知是否可行的方法(n為cpu總核數):

如果是cpu密集型應用,則線程池大小設定為n+1

如果是io密集型應用,則線程池大小設定為2n+1

如果一台伺服器上隻部署這一個應用并且隻有這一個線程池,那麼這種估算或許合理,具體還需自行測試驗證。

接下來在這個文檔:伺服器性能io優化 中發現一個估算公式:

<code>1</code>

<code>最佳線程數目 = ((線程等待時間+線程cpu時間)/線程cpu時間 )* cpu數目</code>

比如平均每個線程cpu運作時間為0.5s,而線程等待時間(非cpu運作時間,比如io)為1.5s,cpu核心數為8,那麼根據上面這個公式估算得到:((0.5+1.5)/0.5)*8=32。這個公式進一步轉化為:

<code>最佳線程數目 = (線程等待時間與線程cpu時間之比 + 1)* cpu數目</code>

可以得出一個結論:

線程等待時間所占比例越高,需要越多線程。線程cpu時間所占比例越高,需要越少線程。

上一種估算方法也和這個結論相合。

一個系統最快的部分是cpu,是以決定一個系統吞吐量上限的是cpu。增強cpu處理能力,可以提高系統吞吐量上限。但根據短闆效應,真實的系統吞吐量并不能單純根據cpu來計算。那要提高系統吞吐量,就需要從“系統短闆”(比如網絡延遲、io)着手:

盡量提高短闆操作的并行化比率,比如多線程下載下傳技術

增強短闆能力,比如用nio替代io

第一條可以聯系到amdahl定律,這條定律定義了串行系統并行化後的加速比計算公式:

<code>加速比=優化前系統耗時 / 優化後系統耗時</code>

加速比越大,表明系統并行化的優化效果越好。addahl定律還給出了系統并行度、cpu數目和加速比的關系,加速比為speedup,系統串行化比率(指串行執行代碼所占比率)為f,cpu數目為n:

<code>speedup &lt;=</code><code>1</code> <code>/ (f + (</code><code>1</code><code>-f)/n)</code>

當n足夠大時,串行化比率f越小,加速比speedup越大。

寫到這裡,我突然冒出一個問題。

是否使用線程池就一定比使用單線程高效呢?

答案是否定的,比如redis就是單線程的,但它卻非常高效,基本操作都能達到十萬量級/s。從線程這個角度來看,部分原因在于:

多線程帶來線程上下文切換開銷,單線程就沒有這種開銷

當然“redis很快”更本質的原因在于:redis基本都是記憶體操作,這種情況下單線程可以很高效地利用cpu。而多線程适用場景一般是:存在相當比例的io和網絡操作。

是以即使有上面的簡單估算方法,也許看似合理,但實際上也未必合理,都需要結合系統真實情況(比如是io密集型或者是cpu密集型或者是純記憶體操作)和硬體環境(cpu、記憶體、硬碟讀寫速度、網絡狀況等)來不斷嘗試達到一個符合實際的合理估算值。

最後來一個“dark magic”估算方法(因為我暫時還沒有搞懂它的原理),使用下面的類:

<code>001</code>

<code>package</code> <code>pool_size_calculate;</code>

<code>002</code>

<code>003</code>

<code>import</code> <code>java.math.bigdecimal;</code>

<code>004</code>

<code>import</code> <code>java.math.roundingmode;</code>

<code>005</code>

<code>import</code> <code>java.util.timer;</code>

<code>006</code>

<code>import</code> <code>java.util.timertask;</code>

<code>007</code>

<code>import</code> <code>java.util.concurrent.blockingqueue;</code>

<code>008</code>

<code>009</code>

<code>/**</code>

<code>010</code>

<code> </code><code>* a class that calculates the optimal thread pool boundaries. it takes the</code>

<code>011</code>

<code> </code><code>* desired target utilization and the desired work queue memory consumption as</code>

<code>012</code>

<code> </code><code>* input and retuns thread count and work queue capacity.</code>

<code>013</code>

<code> </code><code>*</code>

<code>014</code>

<code> </code><code>* @author niklas schlimm</code>

<code>015</code>

<code>016</code>

<code> </code><code>*/</code>

<code>017</code>

<code>public</code> <code>abstract</code> <code>class</code> <code>poolsizecalculator {</code>

<code>018</code>

<code>019</code>

<code>    </code><code>/**</code>

<code>020</code>

<code>     </code><code>* the sample queue size to calculate the size of a single {@link runnable}</code>

<code>021</code>

<code>     </code><code>* element.</code>

<code>022</code>

<code>     </code><code>*/</code>

<code>023</code>

<code>    </code><code>private</code> <code>final</code> <code>int</code> <code>sample_queue_size =</code><code>1000</code><code>;</code>

<code>024</code>

<code>025</code>

<code>026</code>

<code>     </code><code>* accuracy of test run. it must finish within 20ms of the testtime</code>

<code>027</code>

<code>     </code><code>* otherwise we retry the test. this could be configurable.</code>

<code>028</code>

<code>029</code>

<code>    </code><code>private</code> <code>final</code> <code>int</code> <code>epsylon =</code><code>20</code><code>;</code>

<code>030</code>

<code>031</code>

<code>032</code>

<code>     </code><code>* control variable for the cpu time investigation.</code>

<code>033</code>

<code>034</code>

<code>    </code><code>private</code> <code>volatile</code> <code>boolean</code> <code>expired;</code>

<code>035</code>

<code>036</code>

<code>037</code>

<code>     </code><code>* time (millis) of the test run in the cpu time calculation.</code>

<code>038</code>

<code>039</code>

<code>    </code><code>private</code> <code>final</code> <code>long</code> <code>testtime =</code><code>3000</code><code>;</code>

<code>040</code>

<code>041</code>

<code>042</code>

<code>     </code><code>* calculates the boundaries of a thread pool for a given {@link runnable}.</code>

<code>043</code>

<code>     </code><code>*</code>

<code>044</code>

<code>     </code><code>* @param targetutilization</code>

<code>045</code>

<code>046</code>

<code>                </code><code>throw</code> <code>new</code> <code>illegalstateexception(</code><code>"test not accurate"</code><code>);</code>

<code>047</code>

<code>            </code><code>}</code>

<code>048</code>

<code>            </code><code>expired =</code><code>false</code><code>;</code>

<code>049</code>

<code>            </code><code>start = system.currenttimemillis();</code>

<code>050</code>

<code>            </code><code>timer timer =</code><code>new</code> <code>timer();</code>

<code>051</code>

<code>            </code><code>timer.schedule(</code><code>new</code> <code>timertask() {</code>

<code>052</code>

<code>                </code><code>public</code> <code>void</code> <code>run() {</code>

<code>053</code>

<code>                    </code><code>expired =</code><code>true</code><code>;</code>

<code>054</code>

<code>                </code><code>}</code>

<code>055</code>

<code>            </code><code>}, testtime);</code>

<code>056</code>

<code>            </code><code>while</code> <code>(!expired) {</code>

<code>057</code>

<code>                </code><code>task.run();</code>

<code>058</code>

<code>059</code>

<code>            </code><code>start = system.currenttimemillis() - start;</code>

<code>060</code>

<code>            </code><code>timer.cancel();</code>

<code>061</code>

<code>        </code><code>}</code><code>while</code> <code>(math.abs(start - testtime) &gt; epsylon);</code>

<code>062</code>

<code>        </code><code>collectgarbage(</code><code>3</code><code>);</code>

<code>063</code>

<code>    </code><code>}</code>

<code>064</code>

<code>065</code>

<code>    </code><code>private</code> <code>void</code> <code>collectgarbage(</code><code>int</code> <code>times) {</code>

<code>066</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt; times; i++) {</code>

<code>067</code>

<code>            </code><code>system.gc();</code>

<code>068</code>

<code>            </code><code>try</code> <code>{</code>

<code>069</code>

<code>                </code><code>thread.sleep(</code><code>10</code><code>);</code>

<code>070</code>

<code>            </code><code>}</code><code>catch</code> <code>(interruptedexception e) {</code>

<code>071</code>

<code>                </code><code>thread.currentthread().interrupt();</code>

<code>072</code>

<code>                </code><code>break</code><code>;</code>

<code>073</code>

<code>074</code>

<code>        </code><code>}</code>

<code>075</code>

<code>076</code>

<code>077</code>

<code>078</code>

<code>     </code><code>* calculates the memory usage of a single element in a work queue. based on</code>

<code>079</code>

<code>     </code><code>* heinz kabbutz' ideas</code>

<code>080</code>

<code>081</code>

<code>082</code>

<code>     </code><code>* @return memory usage of a single {@link runnable} element in the thread</code>

<code>083</code>

<code>     </code><code>*         pools work queue</code>

<code>084</code>

<code>085</code>

<code>    </code><code>public</code> <code>long</code> <code>calculatememoryusage() {</code>

<code>086</code>

<code>        </code><code>blockingqueue queue = createworkqueue();</code>

<code>087</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i =</code><code>0</code><code>; i &lt; sample_queue_size; i++) {</code>

<code>088</code>

<code>            </code><code>queue.add(creattask());</code>

<code>089</code>

<code>090</code>

<code>        </code><code>long</code> <code>mem0 = runtime.getruntime().totalmemory()</code>

<code>091</code>

<code>                </code><code>- runtime.getruntime().freememory();</code>

<code>092</code>

<code>        </code><code>long</code> <code>mem1 = runtime.getruntime().totalmemory()</code>

<code>093</code>

<code>094</code>

<code>        </code><code>queue =</code><code>null</code><code>;</code>

<code>095</code>

<code>        </code><code>collectgarbage(</code><code>15</code><code>);</code>

<code>096</code>

<code>        </code><code>mem0 = runtime.getruntime().totalmemory()</code>

<code>097</code>

<code>098</code>

<code>        </code><code>queue = createworkqueue();</code>

<code>099</code>

<code>100</code>

<code>101</code>

<code>102</code>

<code>103</code>

<code>        </code><code>mem1 = runtime.getruntime().totalmemory()</code>

<code>104</code>

<code>105</code>

<code>        </code><code>return</code> <code>(mem1 - mem0) / sample_queue_size;</code>

<code>106</code>

<code>107</code>

<code>108</code>

<code>109</code>

<code>     </code><code>* create your runnable task here.</code>

<code>110</code>

<code>111</code>

<code>     </code><code>* @return an instance of your runnable task under investigation</code>

<code>112</code>

<code>113</code>

<code>    </code><code>protected</code> <code>abstract</code> <code>runnable creattask();</code>

<code>114</code>

<code>115</code>

<code>116</code>

<code>     </code><code>* return an instance of the queue used in the thread pool.</code>

<code>117</code>

<code>118</code>

<code>     </code><code>* @return queue instance</code>

<code>119</code>

<code>120</code>

<code>    </code><code>protected</code> <code>abstract</code> <code>blockingqueue createworkqueue();</code>

<code>121</code>

<code>122</code>

<code>123</code>

<code>     </code><code>* calculate current cpu time. various frameworks may be used here,</code>

<code>124</code>

<code>     </code><code>* depending on the operating system in use. (e.g.</code>

<code>125</code>

<code>126</code>

<code>     </code><code>* measurement, the more accurate the results for thread count boundaries.</code>

<code>127</code>

<code>128</code>

<code>     </code><code>* @return current cpu time of current thread</code>

<code>129</code>

<code>130</code>

<code>    </code><code>protected</code> <code>abstract</code> <code>long</code> <code>getcurrentthreadcputime();</code>

<code>131</code>

<code>132</code>

<code>}</code>

然後自己繼承這個抽象類并實作它的三個抽象方法,比如下面是我寫的一個示例(任務是請求網絡資料),其中我指定期望cpu使用率為1.0(即100%),任務隊列總大小不超過100,000位元組:

<code>01</code>

<code>02</code>

<code>03</code>

<code>import</code> <code>java.io.bufferedreader;</code>

<code>04</code>

<code>import</code> <code>java.io.ioexception;</code>

<code>05</code>

<code>import</code> <code>java.io.inputstreamreader;</code>

<code>06</code>

<code>import</code> <code>java.lang.management.managementfactory;</code>

<code>07</code>

<code>08</code>

<code>import</code> <code>java.net.httpurlconnection;</code>

<code>09</code>

<code>import</code> <code>java.net.url;</code>

<code>10</code>

<code>11</code>

<code>import</code> <code>java.util.concurrent.linkedblockingqueue;</code>

<code>12</code>

<code>13</code>

<code>public</code> <code>class</code> <code>simplepoolsizecaculatorimpl</code><code>extends</code> <code>poolsizecalculator {</code>

<code>14</code>

<code>15</code>

<code>    </code><code>@override</code>

<code>16</code>

<code>    </code><code>protected</code> <code>runnable creattask() {</code>

<code>17</code>

<code>        </code><code>return</code> <code>new</code> <code>asynciotask();</code>

<code>18</code>

<code>19</code>

<code>20</code>

<code>21</code>

<code>    </code><code>protected</code> <code>blockingqueue createworkqueue() {</code>

<code>22</code>

<code>        </code><code>return</code> <code>new</code> <code>linkedblockingqueue(</code><code>1000</code><code>);</code>

<code>23</code>

<code>24</code>

<code>25</code>

<code>26</code>

<code>    </code><code>protected</code> <code>long</code> <code>getcurrentthreadcputime() {</code>

<code>27</code>

<code>        </code><code>return</code> <code>managementfactory.getthreadmxbean().getcurrentthreadcputime();</code>

<code>28</code>

<code>29</code>

<code>30</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(string[] args) {</code>

<code>31</code>

<code>        </code><code>poolsizecalculator poolsizecalculator =</code><code>new</code> <code>simplepoolsizecaculatorimpl();</code>

<code>32</code>

<code>        </code><code>poolsizecalculator.calculateboundaries(</code><code>new</code> <code>bigdecimal(</code><code>1.0</code><code>),</code><code>new</code> <code>bigdecimal(</code><code>100000</code><code>));</code>

<code>33</code>

<code>34</code>

<code>35</code>

<code>36</code>

<code>37</code>

<code>38</code>

<code> </code><code>* 自定義的異步io任務</code>

<code>39</code>

<code> </code><code>* @author will</code>

<code>40</code>

<code>41</code>

<code>42</code>

<code>class</code> <code>asynciotask</code><code>implements</code> <code>runnable {</code>

<code>43</code>

<code>44</code>

<code>45</code>

<code>    </code><code>public</code> <code>void</code> <code>run() {</code>

<code>46</code>

<code>        </code><code>httpurlconnection connection =</code><code>null</code><code>;</code>

<code>47</code>

<code>        </code><code>bufferedreader reader =</code><code>null</code><code>;</code>

<code>48</code>

<code>        </code><code>try</code> <code>{</code>

<code>49</code>

<code>50</code>

<code>            </code><code>url geturl =</code><code>new</code> <code>url(geturl);</code>

<code>51</code>

<code>52</code>

<code>            </code><code>connection = (httpurlconnection) geturl.openconnection();</code>

<code>53</code>

<code>            </code><code>connection.connect();</code>

<code>54</code>

<code>            </code><code>reader =</code><code>new</code> <code>bufferedreader(</code><code>new</code> <code>inputstreamreader(</code>

<code>55</code>

<code>                    </code><code>connection.getinputstream()));</code>

<code>56</code>

<code>57</code>

<code>            </code><code>string line;</code>

<code>58</code>

<code>            </code><code>while</code> <code>((line = reader.readline()) !=</code><code>null</code><code>) {</code>

<code>59</code>

<code>                </code><code>// empty loop</code>

<code>60</code>

<code>61</code>

<code>62</code>

<code>63</code>

<code>        </code><code>catch</code> <code>(ioexception e) {</code>

<code>64</code>

<code>65</code>

<code>        </code><code>}</code><code>finally</code> <code>{</code>

<code>66</code>

<code>            </code><code>if</code><code>(reader !=</code><code>null</code><code>) {</code>

<code>67</code>

<code>                </code><code>try</code> <code>{</code>

<code>68</code>

<code>                    </code><code>reader.close();</code>

<code>69</code>

<code>70</code>

<code>                </code><code>catch</code><code>(exception e) {</code>

<code>71</code>

<code>72</code>

<code>73</code>

<code>74</code>

<code>            </code><code>connection.disconnect();</code>

<code>75</code>

<code>76</code>

<code>77</code>

<code>78</code>

<code>79</code>

得到的輸出如下:

<code>target queue memory usage (bytes): 100000</code>

<code>createtask() produced pool_size_calculate.asynciotask which took 40 bytes in a queue</code>

<code>formula: 100000 / 40</code>

<code>* recommended queue capacity (bytes): 2500</code>

<code>number of cpu: 4</code>

<code>target utilization: 1</code>

<code>elapsed time (nanos): 3000000000</code>

<code>compute time (nanos): 47181000</code>

<code>wait time (nanos): 2952819000</code>

<code>formula: 4 * 1 * (1 + 2952819000 / 47181000)</code>

<code>* optimal thread count: 256</code>

推薦的任務隊列大小為2500,線程數為256,有點出乎意料之外。我可以如下構造一個線程池:

<code>threadpoolexecutor pool =</code>

<code>2</code>

<code> </code><code>new</code> <code>threadpoolexecutor(</code><code>256</code><code>,</code><code>256</code><code>, 0l, timeunit.milliseconds,</code><code>new</code> <code>linkedblockingqueue(</code><code>2500</code><code>));</code>