diff --git a/scikits/learn/ann/mlp.py b/scikits/learn/ann/mlp.py
index 6391c9f0a5a3b80080b8113bac55bfa1f6307c7b..c87861fc3976439583386614d0fdabd4b4f5535f 100644
--- a/scikits/learn/ann/mlp.py
+++ b/scikits/learn/ann/mlp.py
@@ -1,149 +1,146 @@
 # mlp.py
 # by: Fred Mailhot
-# last mod: 2006-08-18
+# last mod: 2006-08-19
 
-from scipy import *        # I'll want to change this for numpy eventually
+import numpy as N
 from scipy.optimize import leastsq
-import copy
 
 class mlp:
-    """Class to define, train and test a multilayer perceptron."""
+    """Class to define, train and test a multilayer perceptron.
+    """
 
     _type = 'mlp'
     _outfxns = ('linear','logistic','softmax')
-    _algs = ('simplex','powell','bfgs','cg','ncg','leastsq')
 
-    def __init__(self,nin,nhid,nout,fxn,alg='leastsq',w=None):
+    def __init__(self,ni,nh,no,f='linear',w=None):
         """ Set up instance of mlp. Initial weights are drawn from a
         zero-mean Gaussian w/ variance is scaled by fan-in.
-        (see Bishop 1995 for justification)
-        Inputs:
-            nin/nhid/nout - integer number of input/hidden/output units,
-                            respectively
-            fxn - string description of output unit activation
-                    fxn (hidden units use tanh); can be 'linear',
-                    'logistic' or 'softmax'
+        Input:
+            ni  - <int> # of inputs
+            nh  - <int> # of hidden & context units
+            no  - <int> # of outputs
+            f   - <str> output activation fxn
+            w   - <array dtype=Float> weight vector
         """
-        if fxn not in self._outfxns:
+        if f not in self._outfxns:
             print "Undefined activation fxn. Using linear"
             self.outfxn = 'linear'
         else:
-            self.outfxn = fxn
-        self.nin = nin
-        self.nhid = nhid
-        self.nout = nout
-        self.alg = alg
+            self.outfxn = f
+        self.ni = ni
+        self.nh = nh
+        self.no = no
+        #self.alg = alg
         if w:
-            self.nwts = size(w)
-            self.w_packed = w
-            self.w1 = zeros((nin,nhid),dtype=Float)
-            self.b1 = zeros((1,nhid),dtype=Float)
-            self.w2 = zeros((nhid,nout),dtype=Float)
-            self.b2 = zeros((1,nout),dtype=Float)
-            self.unpackwts()
+            self.nw = N.size(w)
+            self.wp = w
+            self.w1 = N.zeros((ni,nh),dtype=Float)
+            self.b1 = N.zeros((1,nh),dtype=Float)
+            self.w2 = N.zeros((nh,no),dtype=Float)
+            self.b2 = N.zeros((1,no),dtype=Float)
+            self.unpack()
         else:
-            self.nwts = (nin+1)*nhid + (nhid+1)*nout
-            self.w1 = randn(nin,nhid)/sqrt(nin+1)
-            self.b1 = randn(1,nhid)/sqrt(nin+1)
-            self.w2 = randn(nhid,nout)/sqrt(nhid+1)
-            self.b2 = randn(1,nout)/sqrt(nhid+1)
-            self.packwts()
+            self.nw = (ni+1)*nh + (nh+1)*no
+            self.w1 = N.random.randn(ni,nh)/N.sqrt(ni+1)
+            self.b1 = N.random.randn(1,nh)/N.sqrt(ni+1)
+            self.w2 = N.random.randn(nh,no)/N.sqrt(nh+1)
+            self.b2 = N.random.randn(1,no)/N.sqrt(nh+1)
+            self.pack()
 
-    def unpackwts(self):
+    def unpack(self):
         """ Decompose 1-d vector of weights w into appropriate weight
         matrices (w1,b1,w2,b2) and reinsert them into net
         """
-        self.w1 = reshape(array(self.w_packed)[:self.nin*self.nhid],(self.nin,self.nhid))
-        self.b1 = reshape(array(self.w_packed)[(self.nin*self.nhid):(self.nin*self.nhid)+self.nhid],(1,self.nhid))
-        self.w2 = reshape(array(self.w_packed)[(self.nin*self.nhid)+self.nhid:\
-                    (self.nin*self.nhid)+self.nhid+(self.nhid*self.nout)],(self.nhid,self.nout))
-        self.b2 = reshape(array(self.w_packed)[(self.nin*self.nhid)+self.nhid+(self.nhid*self.nout):],(1,self.nout))
+        self.w1 = N.array(self.wp)[:self.ni*self.nh].reshape(self.ni,self.nh)
+        self.b1 = N.array(self.wp)[(self.ni*self.nh):(self.ni*self.nh)+self.nh].reshape(1,self.nh)
+        self.w2 = N.array(self.wp)[(self.ni*self.nh)+self.nh:(self.ni*self.nh)+self.nh+(self.nh*self.no)].reshape(self.nh,self.no)
+        self.b2 = N.array(self.wp)[(self.ni*self.nh)+self.nh+(self.nh*self.no):].reshape(1,self.no)
 
-    def packwts(self):
+    def pack(self):
         """ Compile weight matrices w1,b1,w2,b2 from net into a
         single vector, suitable for optimization routines.
         """
-        self.w_packed = hstack([self.w1.reshape(size(self.w1)),
-                                self.b1.reshape(size(self.b1)),
-                                self.w2.reshape(size(self.w2)),
-                                self.b2.reshape(size(self.b2))])
+        self.wp = N.hstack([self.w1.reshape(N.size(self.w1)),
+                            self.b1.reshape(N.size(self.b1)),
+                            self.w2.reshape(N.size(self.w2)),
+                            self.b2.reshape(N.size(self.b2))])
 
-    def fwd(self,inputs,wts=None,hid=False):
-        """ Propagate values forward through the net.
-        Inputs:
-            inputs  - self.nin*1 vector of inputs
-            hid     - boolean specifying whether or not to return hidden
-                        unit activations, False by default
+    def fwd_all(self,x,w=None):
+        """ Propagate values forward through the net.
+        Input:
+            x   - array (size>1) of input patterns
+            w   - optional 1-d vector of weights
+        Returns:
+            y   - array of outputs for all input patterns
         """
-        if wts is not None:
-            self.w_packed = wts
-            self.unpackwts()
-
-        z = tanh(dot(inputs,self.w1) + dot(ones((len(inputs),1)),self.b1))
-        o = dot(z,self.w2) + dot(ones((len(z),1)),self.b2)
-
+        if w is not None:
+            self.wp = w
+            self.unpack()
+        # compute vector of hidden unit values
+        z = N.tanh(N.dot(x,self.w1) + N.dot(N.ones((len(x),1)),self.b1))
+        # compute vector of net outputs
+        o = N.dot(z,self.w2) + N.dot(N.ones((len(z),1)),self.b2)
+        # compute final output activations
         if self.outfxn == 'linear':
             y = o
         elif self.outfxn == 'logistic':     # TODO: check for overflow here...
-            y = 1/(1+exp(-o))
+            y = 1/(1+N.exp(-o))
         elif self.outfxn == 'softmax':      # TODO: and here...
-            tmp = exp(o)
-            y = tmp/(sum(temp,1)*ones((1,self.nout)))
+            tmp = N.exp(o)
+            y = tmp/(N.sum(temp,1)*N.ones((1,self.no)))
 
-        if hid:
-            return array(y),array(z)
-        else:
-            return array(y)
+        return N.array(y)
 
     def errfxn(self,w,x,t):
         """ Return vector of squared-errors for the leastsq optimizer
         """
-        y = self.fwd(x,w)
-        return sum(array(y-t)**2,axis=1)
+        y = self.fwd_all(x,w)
+        return N.sum(N.array(y-t)**2,axis=1)
 
     def train(self,x,t):
-        """ Train a multilayer perceptron using scipy's leastsq optimizer
+        """ Train network using scipy's leastsq optimizer
         Input:
-            x - matrix of input data
-            t - matrix of target outputs
+            x - array of input data
+            t - array of targets
+
+            N.B. x and t comprise the *entire* collection of training data
+        Returns: post-optimization weight vector
         """
-        # something's going wrong w/ the full_output option
-        # return leastsq(self.errfxn,self.w_packed,args=(x,t),full_output=True)
-        return leastsq(self.errfxn,self.w_packed,args=(x,t))
+        return leastsq(self.errfxn,self.wp,args=(x,t))
+
+    def test_all(self,x,t):
+        """ Test network on an array (size>1) of patterns
+        Input:
+            x - array of input data
+            t - array of targets
+        Returns:
+            sum-squared-error over all data
+        """
+        return N.sum(self.errfxn(self.wp,x,t))
 
 def main():
-    """ Approx test of module, using the oilTrn/oilTst data files that are
+    """ Build/train/test MLP
     """
     from scipy.io import read_array, write_array
-    # build the net
-    print "\nCreating 12-5-2 MLP with linear outputs"
-    net = mlp(12,5,2,'linear')
-    w_init = copy.copy(net.w_packed)
-    # prep the train/test data
+    print "\nCreating 2-2-1 MLP with logistic outputs"
+    net = mlp(2,2,1,'logistic')
     print "\nLoading training and test sets...",
-    trn_input = read_array('data/oilTrn.dat',lines=(3,-1),columns=(0,(1,12)))
-    trn_targs = read_array('data/oilTrn.dat',lines=(3,-1),columns=(12,-1))
-    tst_input = read_array('data/oilTst.dat',lines=(3,-1),columns=(0,(1,12)))
-    tst_targs = read_array('data/oilTst.dat',lines=(3,-1),columns=(12,-1))
+    trn_input = read_array('data/xor-trn.dat',lines=(3,-1),columns=(0,(1,2)))
+    trn_targs = read_array('data/xor-trn.dat',lines=(3,-1),columns=(2,-1))
+    trn_targs = trn_targs.reshape(N.size(trn_targs),1)
+    tst_input = read_array('data/xor-tst.dat',lines=(3,-1),columns=(0,(1,2)))
+    tst_targs = read_array('data/xor-tst.dat',lines=(3,-1),columns=(2,-1))
+    tst_targs = tst_targs.reshape(N.size(tst_targs),1)
    print "done."
-    # initial squared-error
-    print "\nInitial SSE on training set: ",\
-            sum(net.errfxn(net.w_packed,trn_input,trn_targs))
-    print "\nInitial SSE on testing set: ",\
-            sum(net.errfxn(net.w_packed,tst_input,tst_targs))
-    # train the net
-    net.w_packed = net.train(trn_input,trn_targs)[0]
-    # final squared-error
-    print "\nFinal SSE on training set: ",\
-            sum(net.errfxn(net.w_packed,trn_input,trn_targs))
-    print "\nFinal SSE on testing set: ",\
-            sum(net.errfxn(net.w_packed,tst_input,tst_targs))
-    # view extended output?
-    # REMOVING THIS OPTION FOR NOW
-    #if raw_input("Do you want to see the full training output? (y/n").lower() == 'y':
-    #    print retval[1]
+    print "\nInitial SSE:\n"
+    print "\ttraining set: ",net.test_all(trn_input,trn_targs)
+    print "\ttesting set: ",net.test_all(tst_input,tst_targs),"\n"
+    net.wp = net.train(trn_input,trn_targs)[0]
+    print "\nFinal SSE:\n"
+    print "\ttraining set: ",net.test_all(trn_input,trn_targs)
+    print "\ttesting set: ",net.test_all(tst_input,tst_targs),"\n"
 
 if __name__ == '__main__':
     main()
diff --git a/scikits/learn/ann/srn.py b/scikits/learn/ann/srn.py
index e51810bd91601b4a36a7ef3b52efabae5412a09f..745f973d828aaac690104b4adb33f8de4a6f038c 100644
--- a/scikits/learn/ann/srn.py
+++ b/scikits/learn/ann/srn.py
@@ -1,54 +1,44 @@
 # srn.py
 # by: Fred Mailhot
-# last mod: 2006-06-22
+# last mod: 2006-08-18
 
-from scipy import *
-import copy
+import numpy as N
+from scipy.optimize import leastsq
 
 class srn:
     """Class to define, train and test a simple recurrent network
-    a.k.a. 'Elman net' (cf. Elman 1991's Machine Learnig paper,inter alia)
     """
 
     _type = 'srn'
     _outfxns = ('linear','logistic','softmax')
-    _alg = ('srn')
 
-    def __init__(self,ni,nh,no,f,h=-1,w=None):
+    def __init__(self,ni,nh,no,f='linear',w=None):
         """ Set up instance of srn. Initial weights are drawn from a
         zero-mean Gaussian w/ variance is scaled by fan-in.
-        (see Bishop 1995 for justification)
-        Inputs:
-            ni - integer number of input units
-            nh - integer number of hiden & context units
-            no - integer number of output units,
-            f - string description of output unit activation fxn;
-                one of {'linear','logistic','softmax'}
-                (n.b. hidden/context units use tanh)
-            w - initialized 1-d weight vector
+        Input:
+            ni  - <int> # of inputs
+            nh  - <int> # of hidden & context units
+            no  - <int> # of outputs
+            f   - <str> output activation fxn
+            w   - <array dtype=Float> weight vector
         """
         if f not in self._outfxns:
             print "Undefined activation fxn. Using linear"
             self.outfxn = 'linear'
         else:
             self.outfxn = f
-        # set up layers of units
         self.ni = ni
         self.nh = nh
         self.nc = nh
         self.no = no
-        self.z = zeros((h,nh),dtype=Float)      # hidden activations for 1 epoch
-        self.c = zeros((h,nh),dtype=Float)      # context activations for 1 epoch
-        self.o = zeros((h,no),dtype=Float)      # output activiation for 1 epoch
-        self.p = zeros((nh,nw,nw),dtype=Float)
         if w:
-            self.nw = size(w)
+            self.nw = N.size(w)
             self.wp = w
-            self.w1 = zeros((ni,nh),dtype=Float)    # input-hidden wts
-            self.b1 = zeros((1,nh),dtype=Float)     # input biases
-            self.wc = zeros((nh,nh),dtype=Float)    # context wts
-            self.w2 = zeros((nh,no),dtype=Float)    # hidden-output wts
-            self.b2 = zeros((1,no),dtype=Float)     # hidden biases
+            self.w1 = N.zeros((ni,nh),dtype=Float)    # input-hidden wts
+            self.b1 = N.zeros((1,nh),dtype=Float)     # input biases
+            self.wc = N.zeros((nh,nh),dtype=Float)    # context wts
+            self.w2 = N.zeros((nh,no),dtype=Float)    # hidden-output wts
+            self.b2 = N.zeros((1,no),dtype=Float)     # hidden biases
             self.unpack()
         else:
             # N.B. I just understood something about the way reshape() works
@@ -57,129 +47,113 @@ class srn:
             # propagation.
             # I'll implement this next week.
             self.nw = (ni+1)*nh + (nh*nh) + (nh+1)*no
-            self.w1 = randn(ni,nh)/sqrt(ni+1)
-            self.b1 = randn(1,nh)/sqrt(ni+1)
-            self.wc = randn(nh,nh)/sqrt(nh+1)
-            self.w2 = randn(nh,no)/sqrt(nh+1)
-            self.b2 = randn(1,no)/sqrt(nh+1)
+            self.w1 = N.random.randn(ni,nh)/N.sqrt(ni+1)
+            self.b1 = N.random.randn(1,nh)/N.sqrt(ni+1)
+            self.wc = N.random.randn(nh,nh)/N.sqrt(nh+1)
+            self.w2 = N.random.randn(nh,no)/N.sqrt(nh+1)
+            self.b2 = N.random.randn(1,no)/N.sqrt(nh+1)
             self.pack()
-            if size(self.wp) != self.nw:
-                raise ValueError, "Unexpected number of weights"
 
     def unpack(self):
         """ Decompose 1-d vector of weights w into appropriate weight
        matrices (w1,b1,w2,b2) and reinsert them into net
        """
-        self.w1 = reshape(array(self.wp)[:self.ni*self.nh],(self.ni,self.nh))
-        self.b1 = reshape(array(self.wp)[(self.ni*self.nh):(self.ni*self.nh)+self.nh],(1,self.nh))
-        self.wc = reshape(array(self.wp)[(self.ni*self.nh)+self.nh:\
-                (self.ni*self.nh)+self.nh+(self.nh*self.nh)],(self.nh,self.nh))
-        self.w2 = reshape(array(self.wp)[(self.ni*self.nh)+self.nh+(self.nh*self.nh):\
-                (self.ni*self.nh)+self.nh+(self.nh*self.nh)+(self.nh*self.no)],(self.nh,self.no))
-        self.b2 = reshape(array(self.wp)[(self.ni*self.nh)+self.nh+(self.nh*self.no):],(1,self.no))
+        self.w1 = N.array(self.wp)[:self.ni*self.nh].reshape(self.ni,self.nh)
+        self.b1 = N.array(self.wp)[(self.ni*self.nh):(self.ni*self.nh)+self.nh].reshape(1,self.nh)
+        self.wc = N.array(self.wp)[(self.ni*self.nh)+self.nh:(self.ni*self.nh)+self.nh+(self.nh*self.nh)].reshape(self.nh,self.nh)
+        self.w2 = N.array(self.wp)[(self.ni*self.nh)+self.nh+(self.nh*self.nh):(self.ni*self.nh)+self.nh+(self.nh*self.nh)+(self.nh*self.no)].reshape(self.nh,self.no)
+        self.b2 = N.array(self.wp)[(self.ni*self.nh)+self.nh+(self.nh*self.nh)+(self.nh*self.no):].reshape(1,self.no)
 
     def pack(self):
         """ Compile weight matrices w1,b1,wc,w2,b2 from net into a
        single vector, suitable for optimization routines.
        """
-        self.wp = hstack([self.w1.reshape(size(self.w1)),
-                            self.b1.reshape(size(self.b1)),
-                            self.wc.reshape(size(self.wc)),
-                            self.w2.reshape(size(self.w2)),
-                            self.b2.reshape(size(self.b2))])
+        self.wp = N.hstack([self.w1.reshape(N.size(self.w1)),
+                            self.b1.reshape(N.size(self.b1)),
+                            self.wc.reshape(N.size(self.wc)),
+                            self.w2.reshape(N.size(self.w2)),
+                            self.b2.reshape(N.size(self.b2))])
 
-    def fwd(self,x,w=None,hid=False):
+    def fwd_all(self,x,w=None):
         """ Propagate values forward through the net.
-        This involves the following steps:
-        (i) feeds the current input and context values to the hidden layer,
-        (ii) hidden layer net input is transformed and then sent to the outputs
-        (iii) output values are copied to the context layer
-        Inputs:
+        Input:
             x   - matrix of all input patterns
             w   - 1-d vector of weights
-            hid - boolean specifying whether or not to return hidden
-                    unit activations, False by default
-        Outputs:
+        Returns:
             y   - matrix of all outputs
-            z   - matrix of all hidden activations (if hid=True)
         """
-        if wts is not None:
+        if w is not None:
             self.wp = w
             self.unpack()
-        # compute net input to hiddens and then squash it
-        self.z = tanh(dot(x,self.w1) + dot(self.c,self.wc) + dot(ones((len(x),1)),self.b1))
-        # send hidden vals to output and copy to context
-        o = dot(self.z,self.w2) + dot(ones((len(self.z),1)),self.b2)
-        self.c = copy.copy(self.z)
-        # compute output activations
+        # compute vector of context values for current weight matrix
+        c = N.tanh(N.dot(x,self.w1) + N.dot(N.ones((len(x),1)),self.b1))
+        c = N.vstack([c[1:],c[0]])
+        # compute vector of hidden unit values
+        z = N.tanh(N.dot(x,self.w1) + N.dot(c,self.wc) + N.dot(N.ones((len(x),1)),self.b1))
+        # compute vector of net outputs
+        o = N.dot(z,self.w2) + N.dot(N.ones((len(z),1)),self.b2)
+        # compute final output activations
         if self.outfxn == 'linear':
             y = o
         elif self.outfxn == 'logistic':     # TODO: check for overflow here...
-            y = 1/(1+exp(-o))
+            y = 1/(1+N.exp(-o))
         elif self.outfxn == 'softmax':      # TODO: and here...
-            tmp = exp(o)
-            y = tmp/(sum(temp,1)*ones((1,self.no)))
+            tmp = N.exp(o)
+            y = tmp/(N.sum(temp,1)*N.ones((1,self.no)))
 
-        if hid:
-            return array(y),array(z)
-        else:
-            return array(y)
-
-    def train(self,x,t,N):
-        """ Train net by standard backpropagation
-        Inputs:
-            x - all input patterns
-            t - all target patterns
-            N - number of times to go over patterns
-        Outputs:
-            w - new weight vector
+        return N.array(y)
+
+    def errfxn(self,w,x,t):
+        """ Return vector of squared-errors for the leastsq optimizer
         """
-        for i in range(N):
-
+        y = self.fwd_all(x,w)
+        return N.sum(N.array(y-t)**2,axis=1)
 
-    def errfxn(self,w,x,t):
-        """ Error functions for each of the output-unit activation functions.
-        Inputs:
-            w - current weight vector
-            x - current pattern input(s) (len(x) == self.h)
-            t - current pattern target(s)
+    def train(self,x,t):
+        """ Train a multilayer perceptron using scipy's leastsq optimizer
+        Input:
+            x - matrix of input data
+            t - matrix of target outputs
+        Returns:
+            post-optimization weight vector
         """
-        y,z = self.fwd(w,x,True)
-        if self.outfxn == 'linear':
-            # calculate & return SSE
-            err = 0.5*sum(sum(array(y-t)**2,axis=1))
-        elif self.outfxn == 'logistic':
-            # calculate & return x-entropy
-            err = -1.0*sum(sum(t*log2(y)+(1-t)*log2(1-y),axis=1))
-        elif self.outfxn == 'softmax':
-            # calculate & return entropy
-            err = -1.0*sum(sum(t*log2(y),axis=1))
-        else:
-            # this shouldn't happen, return SSE as safe default
-            err = 0.5*sum(sum(array(y-t)**2,axis=1))
-
-        # returning a tuple of info for now...not sure why
-        return err,y,z
+        return leastsq(self.errfxn,self.wp,args=(x,t))
 
+    def test_all(self,x,t):
+        """ Test network on an array (size>1) of patterns
+        Input:
+            x - array of input data
+            t - array of targets
+        Returns:
+            sum-squared-error over all data
+        """
+        return N.sum(self.errfxn(self.wp,x,t))
+
+
 def main():
     """ Set up a 1-2-1 SRN to solve the temporal-XOR problem from Elman 1990.
""" from scipy.io import read_array, write_array - print "Creating 1-2-1 SRN for 'temporal-XOR' (net.h = 2)" - net = srn(1,2,1,'logistic',2) - print net + print "\nCreating 1-2-1 SRN for 'temporal-XOR'" + net = srn(1,2,1,'logistic') print "\nLoading training and test sets...", - trn_input = read_array('data/t-xor1.dat') - trn_targs = hstack([trn_input[1:],trn_input[0]]) - tst_input = read_array('data/t-xor2.dat') - tst_targs = hstack([tst_input[1:],tst_input[0]]) + trn_input = read_array('data/txor-trn.dat') + trn_targs = N.hstack([trn_input[1:],trn_input[0]]) + trn_input = trn_input.reshape(N.size(trn_input),1) + trn_targs = trn_targs.reshape(N.size(trn_targs),1) + tst_input = read_array('data/txor-tst.dat') + tst_targs = N.hstack([tst_input[1:],tst_input[0]]) + tst_input = tst_input.reshape(N.size(tst_input),1) + tst_targs = tst_targs.reshape(N.size(tst_targs),1) print "done." - N = input("Number of iterations over training set: ") + print "\nInitial SSE:\n" + print "\ttraining set: ",net.test_all(trn_input,trn_targs) + print "\ttesting set: ",net.test_all(tst_input,tst_targs),"\n" + net.wp = net.train(trn_input,trn_targs)[0] + print "\nFinal SSE:\n" + print "\ttraining set: ",net.test_all(trn_input,trn_targs) + print "\ttesting set: ",net.test_all(tst_input,tst_targs),"\n" - print "\nInitial error: ",net.errfxn(net.wp,tst_input,tst_targs) - net.train(trn_input,trn_targs,N) - print "\nFinal error: ",net.errfxn(net.wp,tst_input,tst_targs) - if __name__ == '__main__': main()