天天看點

Node.js 異步流控制

作者:提拉米蘇TiLaMiSu

在其核心,JavaScript被設計為在“主”線程上是非阻塞的,這是呈現視圖的位置。你可以想象這在浏覽器中的重要性。例如,當主線程被阻塞時,會導緻最終使用者害怕的臭名昭著的“當機”,并且無法排程其他事件,最終,導緻資料丢失。

這就産生了一些隻有函數式程式設計才能解決的獨特限制。然而,在更複雜的過程中,回調可能會變得很難處理。這通常會導緻“回調地獄”,其中帶有回調的多個嵌套函數使代碼在讀取、調試、組織等方面更具挑戰性。

例如:

async1(function (input, result1) {
	async2(function (result2) {
		async3(function (result3) {
			async4(function (result4) {
				async5(function (output) {
						// do something with output
				});
			});
		});
	});
});           

當然,在現實生活中,很可能會有額外的代碼行來處理result1、result2等,是以,這個問題的長度和複雜性通常會導緻代碼看起來比上面的例子混亂得多。

這就是函數的用武之地。更複雜的操作由許多功能組成:

  1. 調用方式 input
  2. 中間件
  3. 終止器

“調用方式 input”是對列中的第一個函數。此功能将接受操作的原始輸入(如果有)。操作是一系列可執行的功能,原始輸入主要是:

  1. 全局環境中的變量
  2. 帶參數或不帶參數的直接調用
  3. 通過檔案系統或網絡請求獲得的值

網絡請求可以是由外部網絡、同一網絡上的另一應用程式或同一網絡或外部網絡上的應用程式本身發起的傳入請求。

中間件函數将傳回另一個函數,終止器函數将調用回調。以下說明了網絡或檔案系統請求的流程。這裡的延遲是0,因為所有這些值都在記憶體中可用。

function final(someInput, callback) {
	callback(`${someInput} and terminated by executing callback `);
}
function middleware(someInput, callback) {
	return final(`${someInput} touched by middleware `, callback);
}
function initiate() {
const someInput = 'hello this is a function ';
  middleware(someInput, function (result) {
  	console.log(result);
  	// requires callback to `return` result
  });
}
initiate();           

狀态管理

函數可能與狀态相關,也可能不與狀态相關。當函數的輸入或其他變量依賴于外部函數時,就會産生狀态依賴性。

通過這種方式,有兩種主要的狀态管理政策:

  1. 将變量直接傳遞給函數
  2. 從緩存、會話、檔案、資料庫、網絡或其他外部源擷取變量值。

注意,我沒有提到全局變量。用全局變量管理狀态通常是一種草率的反模式,這使得很難或不可能保證狀态。在可能的情況下,應避免使用複雜程式中的全局變量。

控制流

如果一個對象在記憶體中可用,則可以進行疊代,并且不會對控制流進行更改:

function getSong() {
  let _song = '';
  let i = 100;
  for (i; i > 0; i -= 1) {
 	 _song += `${i} beers on the wall, you take one down and pass it around, ${
  		i - 1
  	} bottles of beer on the wall\n`;
  	if (i === 1) {
 			 _song += "Hey let's get some more beer";
  	}
  }
  return _song;
}
function singSong(_song) {
	if (!_song) throw new Error("song is '' empty, FEED ME A SONG!");
	console.log(_song);
}
const song = getSong();
// this will work
singSong(song);           

但是,如果資料在記憶體中不存在,則疊代将停止:

function getSong() {
  let _song = '';
  let i = 100;
  for (i; i > 0; i -= 1) {
  /* eslint-disable no-loop-func */
    setTimeout(function () {
    _song += `${i} beers on the wall, you take one down and pass it around, ${
    i - 1
    } bottles of beer on the wall\n`;
    if (i === 1) {
    _song += "Hey let's get some more beer";
    }
    }, 0);
  /* eslint-enable no-loop-func */
  }
  return _song;
}
function singSong(_song) {
if (!_song) throw new Error("song is '' empty, FEED ME A SONG!");
console.log(_song);
}
const song = getSong('beer');
// this will not work
singSong(song);
// Uncaught Error: song is '' empty, FEED ME A SONG!           

為什麼會發生這種情況?setTimeout訓示CPU将指令存儲在總線上的其他位置,并訓示将資料安排為稍後處理。在函數在0毫秒标記處再次命中之前,經過了數千個CPU周期,CPU從總線中擷取指令并執行它們。唯一的問題是song(“”)在數千個循環之前被傳回。

在處理檔案系統和網絡請求時也會出現同樣的情況。主線程不能在不确定的時間段内被阻塞——是以,我們使用回調來以可控的方式及時排程代碼的執行。

我們可以使用以下3種模式執行幾乎所有的操作:

1、串聯:函數将以嚴格的順序執行,這一順序與循環最相似。

// operations defined elsewhere and ready to execute
const operations = [
  { func: function1, args: args1 },
  { func: function2, args: args2 },
  { func: function3, args: args3 },
];
function executeFunctionWithArgs(operation, callback) {
// executes function
const { args, func } = operation;
	func(args, callback);
}
function serialProcedure(operation) {
  if (!operation) process.exit(0); // finished
    executeFunctionWithArgs(operation, function (result) {
    // continue AFTER callback
    serialProcedure(operations.shift());
  });
}
serialProcedure(operations.shift());           

2、完全并行-用于同時運作異步任務

let count = 0;
let success = 0;
const failed = [];
const recipients = [
  { name: 'Bart', email: 'bart@tld' },
  { name: 'Marge', email: 'marge@tld' },
  { name: 'Homer', email: 'homer@tld' },
  { name: 'Lisa', email: 'lisa@tld' },
  { name: 'Maggie', email: 'maggie@tld' },
];

function dispatch(recipient, callback) {
  // `sendEmail` is a hypothetical SMTP client
  sendMail(
    {
      subject: 'Dinner tonight',
      message: 'We have lots of cabbage on the plate. You coming?',
      smtp: recipient.email,
    },
    callback
  );
}

function final(result) {
  console.log(`Result: ${result.count} attempts \
      & ${result.success} succeeded emails`);
  if (result.failed.length)
    console.log(`Failed to send to: \
        \n${result.failed.join('\n')}\n`);
}

recipients.forEach(function (recipient) {
  dispatch(recipient, function (err) {
    if (!err) {
      success += 1;
    } else {
      failed.push(recipient.name);
    }
    count += 1;

    if (count === recipients.length) {
      final({
        count,
        success,
        failed,
      });
    }
  });
});           

3、有限并行:一種異步、并行、并發受限的循環,例如成功地向10E7使用者清單中的1000000個收件人發送電子郵件。

let successCount = 0;
function final() {
  console.log(`dispatched ${successCount} emails`);
  console.log('finished');
}
function dispatch(recipient, callback) {
// `sendEmail` is a hypothetical SMTP client
sendMail(
  {
  subject: 'Dinner tonight',
  message: 'We have lots of cabbage on the plate. You coming?',
  smtp: recipient.email,
  },
  callback
);
}
function sendOneMillionEmailsOnly() {
getListOfTenMillionGreatEmails(function (err, bigList) {
  if (err) throw err;
  function serial(recipient) {
    if (!recipient || successCount >= 1000000) return final();
    dispatch(recipient, function (_err) {
      if (!_err) successCount += 1;
        serial(bigList.pop());
      });
  }
  serial(bigList.pop());
  });
}
sendOneMillionEmailsOnly();