[关闭]
@Pigmon 2017-11-29T04:11:00.000000Z 字数 10702 阅读 85

BP 前馈神经网络简易实现

教案


neuron.py

  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on Sun Apr 09 17:25:53 2017
  4. @author: Yuan Sheng
  5. """
  6. #import random
  7. import numpy as np
  8. import random as rnd
  9. def sigmoid(x):
  10. return 1. / (1. + np.e ** (-x))
  11. class Neuron:
  12. "神经元基类"
  13. # layer: 所在层,第一个输入层为0
  14. # index: 在当前层的位置,最上面为0
  15. # gid: 全局位置,从 0 开始
  16. # 整个网络输入在左,输出在右
  17. # 从输入神经元算起,按左上到右下依次排列
  18. # 2
  19. # ->0 5 7->
  20. # 3
  21. # ->1 6 8->
  22. # 4
  23. layer, index, gid = 0, 0, 0
  24. # output: 输出值
  25. # error: 误差项
  26. output, error = 0.0, 0.0
  27. def __init__ (self, _layer, _index, _global_index):
  28. self.layer, self.index, self.gid = _layer, _index, _global_index
  29. self.bias = rnd.uniform(-1.0, 1.0)
  30. def sigmoid_derive(self):
  31. return self.output * (1.0 - self.output)
  32. def count_out(self, _in_w, _in_vec):
  33. "计算输出值"
  34. if len(_in_w) != len(_in_vec):
  35. return 0.0
  36. self.output = sigmoid(self.bias + np.dot(_in_w, _in_vec))
  37. return self.output
  38. def count_error(self, _out_w, _next_layer_error):
  39. "计算误差项"
  40. if len(_out_w) != len(_next_layer_error):
  41. return 0.0
  42. self.error = self.sigmoid_derive() * np.dot(_out_w, _next_layer_error)
  43. return self.error
  44. def update_bais(self, _eta):
  45. "调整 bias 值"
  46. self.bias -= _eta * self.error
  47. def debug(self):
  48. print ('%d: <%d, %d> b=%f' % (self.gid, self.layer, self.index, self.bias))
  49. class OutputNeuron(Neuron):
  50. "输出层神经元"
  51. def __init__ (self, _layer, _index, _global_index):
  52. Neuron.__init__(self, _layer, _index, _global_index)
  53. def count_error(self, _label):
  54. self.error = self.sigmoid_derive() * (self.output - _label)
  55. #self.error = self.sigmoid_derive() * (_label - self.output)
  56. return self.error
  57. if __name__ == '__main__':
  58. nr = Neuron(0, 2, 1)
  59. nr.count_out(np.array([0.1, 0.2, 0.3]), np.array([3.0, 2.0, 1.0]))
  60. #nr.Count_Error(np.array([1.0, 2.0]), np.array([2.0, 1.0]))
  61. #nr.Update_Bais(1.0)
  62. print (nr.output)
  63. #onr = OutputNeuron(3, 0)
  64. #label = 10.0
  65. #onr.count_error(label)
  66. #print onr.error

bpnn.py

  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on Sun Apr 09 20:50:49 2017
  4. @author: Yuan Sheng
  5. """
  6. import numpy as np
  7. import random as rnd
  8. from neuron import Neuron, OutputNeuron
  9. class BPNN:
  10. "BP前馈神经网络"
  11. # @input_vec_len: 输入层维数
  12. # @output_vec_len: 输出层维数
  13. # @hidden_layer_count: 隐藏层数
  14. # @eta: 学习率,步长
  15. # @threshold: 全局均方差的阈值
  16. input_vec_len, output_vec_len, hidden_layer_count, eta, threshold = 2, 1, 0, 0.2, 0.1
  17. # @hidden_layers_len: 每个隐藏层维数
  18. # @hidden_neurons:隐藏神经元二维数组
  19. # @output_neurons:输出神经元一维数组
  20. # @weights:权重三维数组
  21. hidden_layers_len, hidden_neurons, output_neurons, weights = [], [], [], []
  22. # 全局位置到(layer, index)的映射字典; 当前的输出值
  23. gid_dict = {}
  24. # -----------------输出变量-----------------
  25. _correct_rate, _output_vec = 0.0, []
  26. # -------------------------------------------------------------------------
  27. def __init__ (self, _in_len, _out_len, _hiddens_len, _learning_rate):
  28. self.input_vec_len, self.output_vec_len, self.hidden_layer_count, self.hidden_layers_len, self.eta = \
  29. _in_len, _out_len, len(_hiddens_len), _hiddens_len, _learning_rate
  30. # 生成隐藏层 Nerons
  31. cnter = _in_len
  32. for i in range(0, len(_hiddens_len)):
  33. cnt, layer_list = _hiddens_len[i], []
  34. for j in range(0, cnt):
  35. layer_list.append(Neuron(i, j, cnter))
  36. self.gid_dict[cnter] = (i, j)
  37. cnter += 1
  38. self.hidden_neurons.append(layer_list)
  39. # 生成输出层 Neurons
  40. for i in range(0, _out_len):
  41. self.output_neurons.append(OutputNeuron(self.hidden_layer_count, i, cnter))
  42. self.gid_dict[cnter] = (self.hidden_layer_count, i)
  43. cnter += 1
  44. # 生成权重,例如:
  45. # 形如下面这样的输入层2,隐藏层为(3, 2),输出层为2:
  46. #
  47. # 2
  48. # ->0 5 7->
  49. # 3
  50. # ->1 6 8->
  51. # 4
  52. #
  53. # 权重数组如下所示:
  54. # [
  55. # [ [w_0_2, w_1_2], [w_0_3, w_1_3], [w_0_4, w_1_4] ],
  56. # [ [w_2_5, w_3_5, w_4_5], [w_2_6, w_3_6, w_4_6] ],
  57. # [ [w_5_7, w_6_7], [w_5_8, w_6_8] ]
  58. # ]
  59. #
  60. # 可以在循环最内部,用pair.append(self.debug_weight_index(cnt_list, i, j, k))调试
  61. #
  62. cnt_list = np.append(np.array([_in_len]), np.append(_hiddens_len, _out_len))
  63. #print cnt_list
  64. for i in range(0, len(cnt_list)-1):
  65. layer1_cnt, layer2_cnt, layer_weight = cnt_list[i], cnt_list[i + 1], []
  66. for j in range(0, layer2_cnt):
  67. pair = []
  68. for k in range(0, layer1_cnt):
  69. pair.append(rnd.uniform(-1.0, 1.0))
  70. #pair.append(self.debug_weight_index(cnt_list, i, j, k))
  71. layer_weight.append(pair)
  72. self.weights.append(layer_weight)
  73. # -------------------------------------------------------------------------
  74. def input_weights_of(self, layer, index):
  75. """
  76. 得到在第layer层的第index个神经元的输入权重(左侧)列表。
  77. layer 0 代表第一个隐藏层,index 0代表最上面的神经元
  78. """
  79. return self.weights[layer][index]
  80. # -------------------------------------------------------------------------
  81. def input_weights_of_gid(self, _gid):
  82. "通过gid得到输入权重列表"
  83. return self.input_weights_of(self.gid_dict[_gid][0], self.gid_dict[_gid][1])
  84. # -------------------------------------------------------------------------
  85. def output_weights_of(self, layer, index):
  86. """
  87. 得到在第layer层的第index个神经元的输出权重(右侧)列表。
  88. layer 0 代表第一个隐藏层,index 0代表最上面的神经元
  89. """
  90. # 如果是输出层的神经元,返回空数组
  91. if layer == self.hidden_layer_count:
  92. return []
  93. next_layer_weights, ret = self.weights[layer + 1], []
  94. for group in next_layer_weights:
  95. ret.append(group[index])
  96. return ret
  97. # -------------------------------------------------------------------------
  98. def output_weights_of_gid(self, _gid):
  99. "通过gid得到输出权重列表"
  100. return self.output_weights_of(self.gid_dict[_gid][0], self.gid_dict[_gid][1])
  101. # -------------------------------------------------------------------------
  102. def pre_layer_outs(self, layer, input_vec):
  103. """
  104. 得到在第layer层的第index个神经元的<前一层>神经元的值的列表。
  105. layer 0 代表第一个隐藏层,index 0代表最上面的神经元
  106. """
  107. if layer == 0:
  108. return input_vec
  109. else:
  110. pre_layer_neurons, ret = self.hidden_neurons[layer - 1], []
  111. for neuron in pre_layer_neurons:
  112. ret.append(neuron.output)
  113. return ret
  114. # -------------------------------------------------------------------------
  115. def next_layer_errors(self, layer):
  116. "得到第layer+1层的神经元的误差项列表"
  117. # 如果是输出层的神经元,返回空数组
  118. if layer >= self.hidden_layer_count:
  119. return []
  120. next_layer_neurons, ret = [], []
  121. if layer == self.hidden_layer_count - 1:
  122. next_layer_neurons = self.output_neurons
  123. else:
  124. next_layer_neurons = self.hidden_neurons[layer + 1]
  125. for neuron in next_layer_neurons:
  126. ret.append(neuron.error)
  127. return ret
  128. # -------------------------------------------------------------------------
  129. def forward(self, input_vec):
  130. "前向传播计算值的过程"
  131. assert len(input_vec) == self.input_vec_len
  132. self._output_vec = []
  133. for i in range(0, len(self.hidden_neurons)):
  134. hidden_layer = self.hidden_neurons[i]
  135. for j in range(0, len(hidden_layer)):
  136. neuron, input_values, input_weights = \
  137. hidden_layer[j], self.pre_layer_outs(i, input_vec), self.input_weights_of(i, j)
  138. neuron.count_out(input_weights, input_values)
  139. for j in range(0, len(self.output_neurons)):
  140. neuron, input_values, input_weights = self.output_neurons[j], \
  141. self.pre_layer_outs(self.hidden_layer_count, input_vec), \
  142. self.input_weights_of(self.hidden_layer_count, j)
  143. neuron.count_out(input_weights, input_values)
  144. self._output_vec.append(neuron.output)
  145. return self._output_vec
  146. # -------------------------------------------------------------------------
  147. def backward(self, _result_vec, _label_vec):
  148. "后向传播误差项"
  149. assert len(_result_vec) == len(_label_vec)
  150. # 计算输出层误差项
  151. for j in range(0, len(self.output_neurons)):
  152. self.output_neurons[j].count_error(_label_vec[j])
  153. # 从后向前计算隐藏层误差项
  154. arr, length = self.hidden_neurons[::-1], len(self.hidden_neurons)
  155. for i in range(0, length):
  156. hidden_layer, layer_idx = arr[i], length - i - 1
  157. for j in range(0, len(hidden_layer)):
  158. neuron, output_weights, next_layer_errors = \
  159. hidden_layer[j], self.output_weights_of(layer_idx, j), self.next_layer_errors(layer_idx)
  160. neuron.count_error(output_weights, next_layer_errors)
  161. # -------------------------------------------------------------------------
  162. def update_param(self, _inpput_vec):
  163. "调整误差和 bias 的值,可以放到 backward 函数里,单独写出来是为了看起来清晰"
  164. # 调整输出层的输入权重和 bias
  165. for j in range(0, len(self.output_neurons)):
  166. neuron, input_values, input_weights = self.output_neurons[j], \
  167. self.pre_layer_outs(self.hidden_layer_count, []), self.input_weights_of(self.hidden_layer_count, j)
  168. neuron.update_bais(self.eta)
  169. for i in range(0, len(input_weights)):
  170. input_weights[i] -= self.eta * neuron.error * input_values[i]
  171. # 更新 self.weights
  172. self.weights[self.hidden_layer_count][j] = input_weights
  173. # 调整隐藏层的输入权重和 bias
  174. for i in range(0, len(self.hidden_neurons)):
  175. hidden_layer = self.hidden_neurons[i]
  176. for j in range(0, len(hidden_layer)):
  177. neuron, input_values, input_weights =\
  178. hidden_layer[j], self.pre_layer_outs(i, _inpput_vec), self.input_weights_of(i, j)
  179. neuron.update_bais(self.eta)
  180. for k in range(0, len(input_weights)):
  181. input_weights[k] -= self.eta * neuron.error * input_values[k]
  182. # 更新 self.weights
  183. self.weights[i][j] = input_weights
  184. # -------------------------------------------------------------------------
  185. @staticmethod
  186. def correct_rate(_vec1, _vec2):
  187. "正确率"
  188. assert len(_vec1) == len(_vec2)
  189. counter = 0
  190. for i in range(0, len(_vec1)):
  191. if abs(_vec1[i] - _vec2[i]) < 0.5:
  192. counter += 1
  193. return float(counter) / float(len(_vec1))
  194. # -------------------------------------------------------------------------
  195. def total_error_var(self, _train_set):
  196. total = 0
  197. for smpl in _train_set:
  198. total += (self.predict_labeled([smpl])[0] - smpl[1]) ** 2
  199. return total / len(_train_set)
  200. # -------------------------------------------------------------------------
  201. def predict_labeled(self, _test_set):
  202. "_test_set 每个元素,传入之前需要格式化成 [[x1, x2, ...], label]"
  203. assert len(_test_set) > 0 and len(_test_set[0]) == 2
  204. ret_vec, label_vec = [], []
  205. for sample in _test_set:
  206. ret_vec.append(self.forward(sample[0])[0])
  207. label_vec.append(sample[1])
  208. self._correct_rate = BPNN.correct_rate(ret_vec, label_vec)
  209. return ret_vec
  210. # -------------------------------------------------------------------------
  211. @staticmethod
  212. def sigsign(x):
  213. "目前只测试2分类问题,输入标签只有0和1"
  214. if x >= 0.5:
  215. return 1
  216. else:
  217. return 0
  218. # -------------------------------------------------------------------------
  219. def predict(self, _attr_set):
  220. "只包含特征的测试集,[[x1, x2, ...], [x1, x2,...], ...]"
  221. ret_vec = []
  222. for point in _attr_set:
  223. ret_vec.append(self.forward(point))
  224. #ret_vec.append(BPNN.sigsign(self.forward(point)[0]))
  225. return ret_vec
  226. # -------------------------------------------------------------------------
  227. def fit(self, _train_set, max_epochs = 20000):
  228. "_train_set 每个元素,传入之前需要格式化成 [[x1, x2, ...], label]"
  229. assert len(_train_set) > 0 and len(_train_set[0]) == 2
  230. counter, go_through = 0, False
  231. while (counter <= max_epochs):
  232. # 退出循环的条件(temp):所有训练样本方差小于阈值
  233. if self.total_error_var(_train_set) <= self.threshold:
  234. go_through = True
  235. break
  236. # 1. 随机选取一个测试样本
  237. sample = rnd.choice(_train_set)
  238. input_vec, label = sample[0], sample[1]
  239. # 2. 前向计算值
  240. y_hat = self.forward(input_vec)
  241. # 3. 反向传播误差
  242. self.backward([y_hat], [label])
  243. # 4. 更新参数
  244. self.update_param(input_vec)
  245. counter += 1
  246. print (counter)
  247. return go_through
  248. # -------------------------------------------------------------------------
  249. # Debug Functions
  250. # -------------------------------------------------------------------------
  251. def debug(self):
  252. "输出 debug 信息"
  253. print ('Hidden Layers Length: ' + str(nn.hidden_layers_len))
  254. print ('Hidden Neurons: ')
  255. for layer_neurons in self.hidden_neurons:
  256. for neuron in layer_neurons:
  257. print (neuron.debug())
  258. for neuron in self.output_neurons:
  259. print (neuron.debug())
  260. print (self.weights)
  261. # -------------------------------------------------------------------------
  262. def debug_weight_index(self, cnt_list, layer, next_layer_index, current_layer_index):
  263. """
  264. 测试用,生成权重下标。如连接3和8号神经元的权重返回 'w_3_8'
  265. layer 是从输入层开始为0, 依次+1
  266. 调用方式:
  267. 在构造函数生成weights的循环最内部 pair.append(self.debug_weight_index(cnt_list, i, j, k))
  268. """
  269. # 当前层起始位置
  270. current_layer_start_index = 0
  271. for i in range(0, layer):
  272. current_layer_start_index += cnt_list[i]
  273. # 下一层的起始位置
  274. next_layer_start_index = current_layer_start_index + cnt_list[layer]
  275. idx1, idx2 = current_layer_start_index + current_layer_index, \
  276. next_layer_start_index + next_layer_index
  277. #return 'w_%d_%d' % (idx1, idx2)
  278. return float(idx1) + idx2 / 10.0 # temp test
  279. if __name__ == '__main__':
  280. X = [ [[1, 1], 0], [[3, 3], 1], [[3, 4], 1] ]
  281. T = [[0, 0], [0, 1], [4, 3], [5, 5]] # 期望输出:[0, 0, 1, 1]
  282. nn = BPNN(2, 1, [3], 4)
  283. #X = [ [[0, 0], 1], [[1, 1], 1], [[0, 1], 0], [[1, 0], 0] ]
  284. #T = [[0, 0], [1, 1], [0, 1], [1, 0]]
  285. #nn = BPNN(2, 1, [2], 0.7)
  286. # -----test-----
  287. print (nn.fit(X))
  288. #print (nn._correct_rate)
  289. print (nn.predict(T))
  290. #nn.debug()
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注