# -*- coding: utf-8 -*-
# ELEKTRONN - Neural Network Toolkit
#
# Copyright (c) 2014 - now
# Max-Planck-Institute for Medical Research, Heidelberg, Germany
# Authors: Marius Killinger, Gregor Urban
import numpy as np
import time
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams
import pooling
from netutils import initWeights
class PerceptronLayer(object):
"""
Typical hidden layer of an MLP: units are fully connected.
The weight matrix W has shape (n_in, n_out); the bias vector b has shape (n_out,).
:type input: theano.tensor.dmatrix
:param input: a symbolic tensor of shape (n_examples, n_in)
:type n_in: int
:param n_in: dimensionality of input
:type n_out: int
:param n_out: number of hidden units
:type batch_size: int
:param batch_size: batch_size
:type enable_dropout: bool
:param enable_dropout: whether to enable dropout in this layer. The default rate is 0.5, but it can be changed with
self.activation_noise.set_value(np.float32(p)) or via cnn.setDropoutRates
:type activation_func: string
:param activation_func: {'relu','sigmoid','tanh','abs', 'maxout <i>'} where <i> is an integer pooling factor, e.g. 'maxout 2'
:type input_noise: theano.shared float32
:param input_noise: std of centered Gaussian input noise. 0 or None --> no noise
:type input_layer: layer object
:param input_layer: just for keeping track of unusual input layers
:type W: np.ndarray or T.TensorVariable
:param W: weight matrix. If an array is given, its values are used to initialise a shared variable for this layer.
If a TensorVariable is given, that variable is used directly (weight sharing with the
layer it comes from)
:type b: np.ndarray or T.TensorVariable
:param b: bias vector. If an array is given, its values are used to initialise a shared variable for this layer.
If a TensorVariable is given, that variable is used directly (weight sharing with the
layer it comes from)
"""
def __init__(self,
input,
n_in,
n_out,
batch_size,
enable_dropout,
activation_func='tanh',
input_noise=None,
input_layer=None,
W=None,
b=None):
self.input_layer = input_layer # only for autoencoder
self.activation_func = activation_func
self.output_shape = (batch_size, n_out)
self.n_in = n_in
self.lin_output = None
self.output = None
self.last_grads = [] # only for autoencoder
print "PerceptronLayer( #Inputs =", n_in, "#Outputs =", n_out, ")"
if input_noise is not None:
self.input_noise = theano.shared(np.float32(input_noise), name='Input Noise')
print "Input_noise active, p=" + str(self.input_noise.get_value())
rng = np.random.RandomState(int(time.time()))
theano_rng = RandomStreams(rng.randint(2**30))
# apply multiplicative noise to input
#self.input = theano_rng.binomial(size=input.shape, n=1, p=1-self.input_noise,
# dtype='float32') * input
# apply additive noise to input
self.input = input + theano_rng.normal(size=input.shape,
avg=0,
std=self.input_noise,
dtype='float32')
else: # no input noise
self.input = input
if W is None:
W_values = np.asarray(initWeights((n_in, n_out), scale='glorot', mode='uni'), dtype='float32')
self.W = theano.shared(value=W_values, name='W_perceptron' + str(n_in) + '.' + str(n_out), borrow=True)
else:
print "Directly using fixed/shared W (", W, "), no Training on it in this layer!"
if isinstance(W, np.ndarray):
self.W = theano.shared(value=W.astype(np.float32),
name='W_perceptron' + str(n_in) + '.' + str(n_out),
borrow=True)
else:
assert isinstance(W, T.TensorVariable), "W must be either np.ndarray or theano var"
self.W = W
if b is None:
#b_values = np.asarray(np.random.uniform(-1e-8,1e-8,(n_out,)), dtype='float32')
if activation_func == 'relu' or activation_func == 'ReLU':
b_values = np.asarray(initWeights((n_out, ), scale=1.0, mode='const'), dtype='float32')
elif activation_func == 'sigmoid':
b_values = np.asarray(initWeights((n_out, ), scale=0.5, mode='const'), dtype='float32')
else: # activation_func=='tanh':
b_values = np.asarray(initWeights((n_out, ), scale=1e-6, mode='fix-uni'), dtype='float32')
self.b = theano.shared(value=b_values, name='b_perceptron' + str(n_in) + '.' + str(n_out), borrow=True)
else:
print "Directly using fixed given b (", b, "), no Training on it in this layer!"
if isinstance(b, np.ndarray):
self.b = theano.shared(value=b.astype(np.float32),
name='b_perceptron' + str(n_in) + '.' + str(n_out),
borrow=True)
else:
assert isinstance(b, T.TensorVariable), "b must be either np.ndarray or theano var"
self.b = b
lin_output = T.dot(self.input, self.W)
if enable_dropout:
print "Dropout ON"
self.activation_noise = theano.shared(np.float32(0.5), name='Dropout Rate')
rng = T.shared_randomstreams.RandomStreams(int(time.time()))
p = 1 - self.activation_noise
self.dropout_gate = 1.0 / p * rng.binomial((n_out, ), 1, p, dtype='float32')
lin_output = lin_output * self.dropout_gate.dimshuffle(('x', 0))
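# A minimal numpy sketch of the inverted-dropout gate built above (illustrative comment
# only; the rate p = 0.5 matches the default value of activation_noise):
#
#   import numpy as np
#   p = 0.5
#   gate = np.random.binomial(1, p, size=(100,)) / p     # entries are 0 or 1/p
#   # gate.mean() is ~1, so the expected value of lin_output * gate stays unchanged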
lin_output = lin_output + self.b
# Apply non-linearities and, if applicable, change bias initialisations
if activation_func == 'tanh': # range = [-1,1]
self.output = T.tanh(lin_output) # shape: (batch_size, num_outputs)
elif activation_func == 'relu' or activation_func == 'ReLU': # rectified linear unit ,range = [0,inf]
self.activation_func = 'relu'
self.output = lin_output * (lin_output > 0) #T.maximum(lin_output,T.zeros_like(lin_output))
elif activation_func == 'abs': # abs unit ,range = [0,inf]
self.output = T.abs_(lin_output)
elif activation_func == 'sigmoid': # range = [0,1]
#print "WARNING: consider using tanh(.) or relu(.) instead! Sigmoid is BAD! (relu > tanh >> sigmoid)"
self.output = T.nnet.sigmoid(lin_output) #1/(1 + T.exp(-lin_output))
elif activation_func == 'linear':
self.output = (lin_output)
elif activation_func.startswith("maxout"):
r = int(activation_func.split(" ")[1])
assert r >= 2
n_out = n_out / r
self.output_shape = (batch_size, n_out) # maxout reduces the number of outputs by the factor r
self.output = pooling.maxout(lin_output, factor=r)
else:
raise NotImplementedError("Options are: activation_func=('relu'|'sigmoid'|'tanh'|'abs'|'linear'|'maxout <i>')")
self.lin_output = lin_output
self.params = ([self.b] if b is None else []) + ([self.W] if W is None else [])
self.class_probabilities = T.nnet.softmax(lin_output) # shape: (batch_size, num_outputs)
#self.class_probabilities = T.exp(lin_output) / T.sum(T.exp(lin_output), axis=1, keepdims=True) # For Hessian
self.class_prediction = T.argmax(self.class_probabilities, axis=1) # shape: (batch_size,)
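# Numerical sketch of the softmax head defined above (illustrative comment only):
#
#   import numpy as np
#   lin = np.array([[1.0, 2.0, 0.5]])
#   e = np.exp(lin - lin.max())
#   probs = e / e.sum()            # same result as T.nnet.softmax, row-wise
#   probs.argmax()                 # -> 1, i.e. the predicted class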
#############################################################################################
def randomizeWeights(self, scale='glorot', mode='uni'):
n_in = self.n_in
n_out = self.output_shape[1]
if self.activation_func == 'relu':
b_values = np.asarray(initWeights((n_out, ), scale=1.0, mode='const'), dtype='float32')
elif self.activation_func == 'sigmoid':
b_values = np.asarray(initWeights((n_out, ), scale=0.5, mode='const'), dtype='float32')
else: #self.activation_func=='tanh':
b_values = np.asarray(initWeights((n_out, ), scale=1e-6, mode='fix-uni'), dtype='float32')
W_values = np.asarray(initWeights((n_in, n_out), scale, mode), dtype='float32')
self.W.set_value(W_values)
self.b.set_value(b_values)
def NLL(self, y, class_weights=None, example_weights=None, label_prop_thresh=None):
"""
Returns the symbolic mean and instance-wise negative log-likelihood of the prediction
of this model under a given target distribution.
y: theano.tensor.TensorType
corresponds to a vector that gives for each example the correct label. Labels < 0 are ignored (e.g. can
be used for label propagation)
class_weights: theano.tensor.TensorType
weight vector of float32 of length ``n_lab``. Values: ``1.0`` (default), ``w < 1.0`` (less important),
``w > 1.0`` (more important class)
label_prop_thresh: float (0.5,1)
This threshold allows unsupervised label propagation (only for examples with negative/ignore labels).
If the predictive probability of the most likely class exceeds the threshold, this class is assumed to
be the correct label and the training is pushed in this direction.
Should only be used with pre-trained networks, and values <= 0.5 are disabled.
"""
# NOTE: This whole function has an ugly problem with NaN. NaNs arise for pred values close to 0 or 1
# (i.e. for NNs that make very confident and usually also correct predictions) because initially the log of
# the whole pred tensor is taken. Later we want to use only some indices of the tensor (mask), but
# that is not so easily done. There are two ways:
# 1. advanced indexing: T.log(pred)[mask.nonzero()] --> fails if mask is all zero, cannot be fixed
# 2. multiplying with a 0-1-mask: T.log(pred) * mask --> but NaN * 0 = NaN, and we require 0!
# For the second option, in principle, the NaNs could be replaced by 0 using T.switch, but then the gradient
# fails because the replaced value is disconnected from the parameters and gives NaN (mathematically
# the gradient should correctly be 0 then; there is a Theano ticket open to request a fix).
# So finally the best practice is to add a stabilisation to the log: T.log(pred) --> T.log(pred+eps)
# This looks ugly, but does the job, and the introduced error is completely negligible.
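# Minimal numeric illustration of the eps-stabilisation (comment only, numpy assumed):
#
#   pred = np.array([1.0, 0.0], dtype=np.float32)   # very confident predictions
#   mask = np.array([1.0, 0.0], dtype=np.float32)   # select only the first entry
#   np.log(pred) * mask          # -> [0., nan]  because log(0) = -inf and -inf * 0 = nan
#   np.log(pred + 1e-6) * mask   # -> [~1e-6, -0.], finite everywhere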
eps = 1e-6
pred = self.class_probabilities # predictive (bs, cl)
y = y.dimshuffle(0, 'x') # the labels (bs, 1)
cls = T.arange(self.class_probabilities.shape[1]).dimshuffle('x', 0) # available classes
label_selection = T.eq(cls, y) # selects correct labels
if class_weights is None:
class_weights = T.ones_like(pred)
else:
class_weights = class_weights.dimshuffle('x', 0)
# Up vote block
nll_inst_up = -T.log(pred + eps) * label_selection * class_weights
N_up = T.sum(label_selection) # number of labelled examples
if label_prop_thresh is not None: # Label propagation block
above_thresh = pred > label_prop_thresh # this is 1 for the class with the highest prob
prop_mask = above_thresh * (1 - label_selection.sum(axis=1, keepdims=True)) # don't propagate where training labels are available
nll_inst_up_prop = -T.log(pred + eps) * prop_mask * class_weights
N_up_prop = prop_mask.sum()
nll_inst_up += nll_inst_up_prop
N_up += N_up_prop
nll_inst = nll_inst_up
N_up = T.switch(T.eq(N_up, 0), 1, N_up) # guard against N being 0; in that case the sum is 0 anyway!
nll = nll_inst.sum() / N_up
return nll, nll_inst
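# Hedged usage sketch (comment only; `layer` and `y` are assumed to be provided by the
# surrounding network/trainer, they are not defined in this module):
#
#   y = T.ivector('y')                    # one integer label per example; negative = ignore
#   nll, nll_inst = layer.NLL(y)
#   # nll is a scalar training objective, nll_inst has shape (batch_size, n_classes)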
def NLL_weak(self,
y,
class_weights=None,
example_weights=None,
label_prop_thresh=None):
"""
Returns the symbolic mean and instance-wise negative log-likelihood of the prediction
of this model under a given target distribution.
y: theano.tensor.TensorType
corresponds to a vector that gives for each example the correct label. Labels < 0 are ignored (e.g. can
be used for label propagation)
class_weights: theano.tensor.TensorType
weight vector of float32 of length ``n_lab``. Values: ``1.0`` (default), ``w < 1.0`` (less important),
``w > 1.0`` (more important class)
label_prop_thresh: float (0.5,1)
This threshold allows unsupervised label propagation (only for examples with negative/ignore labels).
If the predictive probability of the most likely class exceeds the threshold, this class is assumed to
be the correct label and the training is pushed in this direction.
Should only be used with pre-trained networks, and values <= 0.5 are disabled.
"""
# NOTE: This whole function has an ugly problem with NaN. NaNs arise for pred values close to 0 or 1
# (i.e. for NNs that make very confident and usually also correct predictions) because initially the log of
# the whole pred tensor is taken. Later we want to use only some indices of the tensor (mask), but
# that is not so easily done. There are two ways:
# 1. advanced indexing: T.log(pred)[mask.nonzero()] --> fails if mask is all zero, cannot be fixed
# 2. multiplying with a 0-1-mask: T.log(pred) * mask --> but NaN * 0 = NaN, and we require 0!
# For the second option, in principle, the NaNs could be replaced by 0 using T.switch, but then the gradient
# fails because the replaced value is disconnected from the parameters and gives NaN (mathematically
# the gradient should correctly be 0 then; there is a Theano ticket open to request a fix).
# So finally the best practice is to add a stabilisation to the log: T.log(pred) --> T.log(pred+eps)
# This looks ugly, but does the job, and the introduced error is completely negligible.
eps = 1e-6
pred = self.class_probabilities # predictive (bs, cl)
y = y.dimshuffle(0, 'x') # the labels (bs, 1)
cls = T.arange(self.class_probabilities.shape[1]).dimshuffle('x', 0) # available classes
hard_labels = T.eq(cls, y) # selects correct labels
if class_weights is None:
class_weights = T.ones_like(pred)
else:
class_weights = class_weights.dimshuffle('x', 0)
soft_labels = 0.5 * hard_labels + 0.5 * pred
# Up vote block
nll_inst_up = -(soft_labels * T.log(pred + eps)) * class_weights
N_up = T.sum(hard_labels) # number of labelled examples
nll_inst = nll_inst_up
N_up = T.switch(T.eq(N_up, 0), 1, N_up) # guard against N being 0; in that case the sum is 0 anyway!
nll = nll_inst.sum() / N_up
return nll, nll_inst
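# Illustrative numpy sketch of the soft targets used above (comment only, 2 classes assumed):
#
#   hard = np.array([[1.0, 0.0]])          # one-hot label
#   pred = np.array([[0.7, 0.3]])          # current prediction
#   soft = 0.5 * hard + 0.5 * pred         # -> [[0.85, 0.15]]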
def nll_mutiple_binary(self, y, class_weights=None):
"""
Returns the mean and instance-wise negative log-likelihood of the prediction
of this model under a given target distribution.
:type y: theano.tensor.TensorType
:param y: corresponds to a vector that gives for each example the
correct label
Note: we use the mean instead of the sum so that
the learning rate is less dependent on the batch size
"""
# y (bs, n_lab)
eps = 1e-6
act = self.lin_output # (bs, n_lab)
prob_0 = T.exp(act) / (T.exp(act) + 1)
prob_1 = 1.0 - prob_0
self.class_probabilities = T.stack(prob_0, prob_1).dimshuffle(1, 2, 0) # (bs, n_lab, 2)
self.class_prediction = T.argmax(self.class_probabilities, axis=2)
if class_weights is None:
class_weights = T.ones(2)
else:
class_weights = class_weights
nll_inst = (-T.log(prob_0 + eps) * (1 - y) * class_weights[0] - T.log(prob_1 + eps) * y * class_weights[1])
nll = T.mean(nll_inst)
return nll, nll_inst
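# Hedged usage sketch for the multi-label case (comment only; `layer` and the float32
# 0/1 target matrix `y` are assumptions):
#
#   y = T.fmatrix('y')                     # (batch_size, n_lab), one column per binary label
#   nll, nll_inst = layer.nll_mutiple_binary(y)
#   # layer.class_prediction then has shape (batch_size, n_lab)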
def squared_distance(self, Target, Mask=None, return_instancewise=True):
"""
Target is the TARGET image (vectorized) -> shape(Target) = (batchsize, n_target)
output: scalar float32
Mask: vectorized, 1 == hole, 0 == no hole (i.e. the loss is NOT computed on non-holes)
"""
if Mask is None:
batch = T.mean((self.output - Target)**2)
inst = (self.output - Target)**2
else:
print "squared_distance::Masked"
#batch = T.mean(((self.output - Target)*T.concatenate( (Mask,Mask,Mask),axis=1 ) )**2 ) #assuming RGB input
batch = T.mean(((self.output - Target) * Mask)**2)
inst = ((self.output - Target) * Mask)**2
if return_instancewise:
return batch, inst
else:
return batch
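# Hedged usage sketch (comment only; `target` and `mask` are assumed float32 matrices
# of the same shape as layer.output, with mask == 1 where the loss should count):
#
#   target = T.fmatrix('target')
#   mask = T.fmatrix('mask')
#   mse, mse_inst = layer.squared_distance(target, Mask=mask)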
def errors(self, y):
"""
Returns the mean classification error (fraction of misclassified examples)
:type y: theano.tensor.TensorType
:param y: corresponds to a vector that gives for each example the
correct label
"""
# check if y has same dimension of class_prediction
if y.ndim != self.class_prediction.ndim:
raise TypeError('y should have the same shape as self.class_prediction',
('y', y.type, 'class_prediction', self.class_prediction.type))
# check if y is of the correct datatype
if y.dtype.startswith('int'):
# the T.neq operator returns a vector of 0s and 1s, where 1
# represents a mistake in prediction
return T.mean(T.neq(self.class_prediction, y))
else:
raise NotImplementedError()
def errors_no_tn(self, y):
"""
Returns tp / (tp + fp + fn): an accuracy-like score for binary 0/1 labels that ignores true negatives.
"""
pred = self.class_prediction
tp = T.sum((y * pred))
#tn = T.sum((1-y)*(1-pred))
fp = T.sum((1 - y) * pred)
fn = T.sum(y * (1 - pred))
acc = tp.astype('float32') / (tp + fp + fn)
return acc
def _make_window(self):
print "window is on 32x32, fixed sigma, assuming RGB."
denom = 29.8
x0 = 16
sig = 19
fun = lambda z, x, y: (32 / denom * np.exp(-(abs(x - x0))**3 / (2 * sig**3))) * (32 / denom * np.exp(-(abs(y - x0))**3 / (2 * sig**3))) #, {x, 0, 32}, {y, 0, 32}
return np.fromfunction(fun, (3, 32, 32))
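# Sketch of the returned window (comment only): np.fromfunction evaluates fun on the
# index grid, and since z is unused all 3 (RGB) channels carry the same 32x32 window:
#
#   w = layer._make_window()       # shape (3, 32, 32)
#   w_flat = w.reshape(1, -1)      # (1, 3072), as used by the cross_entropy methods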
def cross_entropy_array(self, Target, Mask=None, GaussianWindow=False):
"""
Target is the TARGET image (vectorized) -> shape(Target) = (batchsize, imgsize**2)
the output is of length <batchsize>; use cross_entropy() to get a scalar output.
"""
if GaussianWindow:
window = self._make_window().reshape(1, -1)
if Mask is None:
#XX = window#T.TensorConstant(T.TensorType('float32',[True,False])(),data=window)
return -T.mean(
(window if GaussianWindow else 1.) *
(T.log(self.class_probabilities) * Target +
T.log(1.0 - self.class_probabilities) * (1.0 - Target)),
axis=1)
else:
return -T.mean(
(T.log(self.class_probabilities) * Target +
T.log(1.0 - self.class_probabilities) *
(1.0 - Target)) * T.concatenate(
(Mask, Mask, Mask),
axis=1),
axis=1) #assuming RGB input
def cross_entropy(self, Target, Mask=None, GaussianWindow=False):
"""
Target is the TARGET image (vectorized) -> shape(Target) = (batchsize, imgsize**2); output: scalar float32
"""
if GaussianWindow:
window = self._make_window().reshape(1, -1)
if Mask is None:
#XX = window#T.TensorConstant(T.TensorType('float32',[True,False])(),data=window)
return -T.mean(
(window if GaussianWindow else 1.) *
(T.log(self.class_probabilities) * Target +
T.log(1.0 - self.class_probabilities) * (1.0 - Target))
) # #.reshape(new_shape)[index[0]:index[2],index[1]:index[3]]
else:
print "cross_entropy::Masked, no window"
return -T.mean(
(T.log(self.class_probabilities) * Target +
T.log(1.0 - self.class_probabilities) *
(1.0 - Target)) * T.concatenate(
(Mask, Mask, Mask),
axis=1)
) # #.reshape(new_shape)[index[0]:index[2],index[1]:index[3]]#assuming RGB input
class RecurrentLayer(object):
"""
:type input: theano.tensor.ftensor3
:param input: symbolic input carrying [batch, time, feat]
:type n_in: int
:param n_in: dimensionality of input
:type n_hid: int
:param n_hid: number of hidden units
:type activation_func: string
:param activation_func: {'relu','sigmoid','tanh','abs'}
"""
def __init__(self, input, n_in, n_hid, batch_size, activation_func='tanh'):
assert input.ndim == 3
input = input.dimshuffle(1, 0, 2) # exchange batch and time
self.n_in = n_in
self.n_hid = n_hid
self.activation_func = activation_func
self.output_shape = (batch_size, n_hid)
self.hid_lin = None
self.output = None
print "RecurrentLayer( #Inputs =", n_in, "#Hidden = ", n_hid, ")"
W_in_values = np.asarray(initWeights((n_in, n_hid), scale='glorot', mode='uni'), dtype='float32')
self.W_in = theano.shared(W_in_values, name='W_in', borrow=True)
W_hid_values = np.asarray(initWeights((n_hid, n_hid), mode='rnn'), dtype='float32')
self.W_hid = theano.shared(W_hid_values, name='W_hid', borrow=True)
b_hid_values = np.asarray(initWeights((n_hid, ), scale=1e-6, mode='fix-uni'), dtype='float32')
self.b_hid = theano.shared(b_hid_values, name='b_hid', borrow=True)
hid_0_values = np.zeros(n_hid, dtype='float32')
self.hid_0 = theano.shared(hid_0_values, name='hid_0', borrow=True)
W_in, W_hid, b_hid, hid_0 = self.W_in, self.W_hid, self.b_hid, self.hid_0
self.params = [W_in, W_hid, b_hid, hid_0]
# Select non-linearities
if activation_func == 'tanh': # range = [-1,1]
act = T.tanh # shape: (batch_size, num_outputs)
elif activation_func == 'relu': # rectified linear unit ,range = [0,inf]
act = lambda x: x * (x > 0) #T.maximum(lin_output,T.zeros_like(lin_output))
elif activation_func == 'abs': # abs unit ,range = [0,inf]
act = T.abs_
elif activation_func == 'sigmoid': # range = [0,1]
print "WARNING: sig() used!"
#print "WARNING: consider using tanh(.) or relu(.) instead! Sigmoid is really BAD! (relu > tanh >> sigmoid)"
act = T.nnet.sigmoid #1/(1 + T.exp(-lin_output))
elif activation_func == 'linear':
print "Warning: linear activation in recurrent layer with fanout-%i! Is this the output layer?" % n_hid
act = lambda x: x
else:
raise NotImplementedError("options are: activation_func=('relu'|'sigmoid'|'tanh'|'abs')")
def recurrence(x_t, hid_prev):
hid_lin_t = T.dot(x_t, W_in) + T.dot(hid_prev, W_hid) + b_hid
hid_t = act(hid_lin_t)
return [hid_t, hid_lin_t]
outputs_info = [dict(initial=T.alloc(hid_0, input.shape[1], n_hid), taps=[-1]), dict()]
# shapes are [time, batch, feat]
([hid, hid_lin], updates) = theano.scan(fn=recurrence,
sequences=input,
outputs_info=outputs_info,
name='Recurrence')
hid_lin = hid_lin.dimshuffle(1, 0, 2) # exchange batch and time again --> [batch, time, hid/feat]
hid = act(hid_lin) # I think this is needed for structural damping (calculating grad wrt hid_lin)
self.output = hid[:, -1] # [batch, hid/feat]
self.hid = hid
self.hid_lin = hid_lin
def randomizeWeights(self, scale_w=1.0):
n_in, n_hid = self.n_in, self.n_hid
W_in_values = np.asarray(initWeights((n_in, n_hid), scale='glorot', mode='uni'), dtype='float32')
self.W_in.set_value(W_in_values)
W_hid_values = np.asarray(initWeights((n_hid, n_hid), mode='rnn'), dtype='float32')
self.W_hid.set_value(W_hid_values)
b_hid_values = np.asarray(initWeights((n_hid, ), scale=1e-6, mode='fix-uni'), dtype='float32')
self.b_hid.set_value(b_hid_values)
hid_0_values = np.zeros(n_hid, dtype='float32')
self.hid_0.set_value(hid_0_values)