Personal tools
You are here: Home Development EM-algorithm learning.py
Document Actions

learning.py

Click here to get the file

Size 26.9 kB - File type text/python-source

File contents

###############################################################################
## OpenBayes
## OpenBayes for Python is a free and open source Bayesian Network library
## Copyright (C) 2006  Gaitanis Kosta, Francois de Brouchoven
##
## This library is free software; you can redistribute it and/or
## modify it under the terms of the GNU Lesser General Public
## License as published by the Free Software Foundation; either
## version 2.1 of the License, or (at your option) any later version.
##
## This library is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
## Lesser General Public License for more details.
##
## You should have received a copy of the GNU Lesser General Public
## License along with this library (LICENSE.TXT); if not, write to the 
## Free Software Foundation, 
## Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
###############################################################################
import graph
import bayesnet
import distributions
from inference import ConnexeInferenceJTree, JoinTree
import random
import unittest
import copy
import numarray as na
import math

import logging
# show INFO messages
#logging.basicConfig(level= logging.INFO)
#uncomment the following to remove all messages
logging.basicConfig(level = logging.NOTSET)

class EMLearningEngine:
	""" EM learning algorithm
	Learns the parameters of a known bayesian structure from incomplete data.
	"""	  
	BNet = None # The underlying bayesian network
	engine = None
	
	def __init__(self, BNet):
		self.BNet = BNet
		self.engine = JoinTree(BNet)
		#self.engine = MCMCEngine(BNet)
	
	def EMLearning(self, cases, max_iter, initializeFirst=False):
		""" cases = [{'c':0,'s':1,'r':'?','w':1},{...},...]
		Put '?' when the data is unknown.
		Will estimate  the '?' by inference.
		"""
		
		# if wanted the network is initialized (previously learned CPT's are
		# resetted, the previous learned data is lost)
		if initializeFirst:
			for v in self.BNet.v.values():
				if v.distribution.isAdjustable:
					v.distribution.initializeAugmentedEq() # sets all a_ij and b_ij to equivalent sample size
					v.distribution.initializeCounts() # sets all counts to zero
					v.distribution.setAugmentedCounts()
					v.distribution.normalize(dim=v.name) # set the initial Pr's to a_ij/(a_ij+b_ij)
			
		iter = 0
		old = None
		new = self.BNet
		precision = 0.05
		while self.hasntConverged(old, new, precision) and iter < max_iter:
			iter += 1
			print 'EM iteration: ', iter
			old = copy.deepcopy(new)
			self.LearnEMParamsWM(cases)
			# reinitialize the JunctionTree to take effect of new parameters learned
			self.engine.Initialization()
			# self.engine.GlobalPropagation()
			new = copy.deepcopy(self.BNet)
	
	def LearnEMParamsWM(self, cases):
		# Initialise the counts of each vertex
		for v in self.BNet.v.values():
			if v.distribution.isAdjustable:
				v.distribution.initializeCounts() # sets all counts to zero
		
		# First part of the algorithm : Estimation of the unknown 
		# data. (E-part)
		for case in cases:
			#assert(set(case.keys()) == set(self.BNet.v.keys())), "Not all values of 'case' are set"
			
			known = dict() # will contain al the known data of the case
			for key in case.iterkeys():
				if case[key] != '?':
					known[key] = case[key]
			
			for v in self.BNet.v.values():
				if v.distribution.isAdjustable:
					names = [parent.name for parent in v.family[1:]]
					nvals = [parent.nvalues for parent in v.family[1:]]
					names.append(v.name)
					nvals.append(v.nvalues)
					self.calcExpectation(v,known,names,nvals)
		
		# Second part of the algorithm : Estimation of the parameters. 
		# (M-part)
		for v in self.BNet.v.values():
			if v.distribution.isAdjustable:
				v.distribution.setAugmentedAndCounts()
				v.distribution.normalize(dim=v.name)
	
	def calcExpectation(self,v,known,names,nvals,cumPr=1):
		"""	calculate and set expectations of node v
			based on chain rule: Pr(names | known) = Pr(names[0]|names[1] .. names[end], known).Pr(names[1]|names[2]..names[end], known)...
			
			in:
			v           : node to calc and set
			known       : the known nodes of the net
			names       : the names of the nodes necessary to calcute P(X_v = val_v, pa_v = val_ij| known, prior_CPT)
			nvals       : the nvalues ...
			cumPr       : the cumulative chance of the chain rule
			
			out:
			Counts of node v are adjusted based on the known nodes
		"""
		if cumPr == 0:
			# chance will be remain zero so no use in calculating any further
			return
		
		newnames = list(names)
		name = newnames.pop()
		newnvals = list(nvals)
		nval = newnvals.pop()
		
		if known.has_key(name):
			# Value of P(name=X | pa_v(i+1) ... , known) is 1 because of the known information
			if len(newnames) == 0:
				v.distribution.addToCounts(known,cumPr)
			else:
				self.calcExpectation(v,known,newnames,newnvals,cumPr)
		else:
			self.engine.Initialization() #Only Initializiatino is enough to reset because SetObs calls GlobalPropagation
			self.engine.SetObs(known)
			marg = self.engine.Marginalise(name)
			
			for value in range(nval):
				newknown = dict(known)
				newknown[name] = value
				thisPr = marg[value]
				if not(str(thisPr) == 'nan' or thisPr == 0):
					if len(newnames) == 0:
						v.distribution.addToCounts(newknown,cumPr*thisPr)
					else:
						self.calcExpectation(v,newknown,newnames,newnvals,cumPr*thisPr)
	
	def LearnEMParams(self, cases):
		""" 
		First part of the algorithm : Estimation of the unknown 
		data. (E-part)
		""" 
		# Initialise the counts of each vertex
		for v in self.BNet.v.values():
				v.distribution.initializeCounts()
		for case in cases:
			known={} # will contain all the known data of case
			unknown=[] # will contain all the unknown keys of case
			for key in case.iterkeys():
				if case[key] != '?': # It's the only part of code you have to change if you want to have another 'unknown sign' instead of '?'
					known[key] = case[key]
				else:
					unknown.append(key)
			if len(case) == len(known): # Then all the data is known -> proceed as LearnMLParams (inference.py)
				for v in self.BNet.v.values():
					if v.distribution.isAdjustable: 
						v.distribution.incrCounts(case)
			else:
				states_list = self.Combinations(unknown) # Give a dictionary list of all the possible states of the unknown parameters
				likelihood_list = self.DetermineLikelihood(known, states_list) # Give a list with the likelihood to have the states in states_list				  
				for j, index_unknown in enumerate(states_list):
					index = copy.copy(known)
					index.update(index_unknown)
					for v in self.BNet.v.values():
						if v.distribution.isAdjustable:
							v.distribution.addToCounts(index, likelihood_list[j]) 
		""" 
		Second part of the algorithm : Estimation of the parameters. 
		(M-part)
		"""
		for v in self.BNet.v.values():
			if v.distribution.isAdjustable:
				v.distribution.setCounts()
				v.distribution.normalize(dim=v.name)
	
	def DetermineLikelihood(self, known, states_list):
		""" 
		Give a list with the likelihood to have the states in states_list
		I think this function could be optimized
		"""		   
		likelihood = []
		for states in states_list:
			# states = {'c':0,'r':1} for example (c and r were unknown in the beginning)
			like = 1
			temp_dic = {}
			copy_states = copy.copy(states)
			for key in states.iterkeys():
				""" 
				It has to be done for all keys because we have to set every 
				observation but key to compute the likelihood to have key in
				his state. The multiplication of all the likelihoods gives the
				likelihood to have states.				   
				"""
				temp_dic[key] = (copy_states[key])
				del copy_states[key]
				if len(copy_states) != 0:
					# first update copy_states because calling SetObs(known) and SetObs(copy_states)
					# isn't the same as calling copy_states.update(known) and SetObs(copy_states)
					# BUG?
					copy_states.update(known) 
					self.engine.SetObs(copy_states)
				else:
					self.engine.SetObs(known) # Has to be done at each iteration because of the self.engine.Initialization() below 
					
				#like = like*self.BNet.v[key].distribution.Convert_to_CPT()[temp_dic[key]]
				like = like*self.engine.ExtractCPT(key)[temp_dic[key]]
				if str(like) == 'nan':
					like = 0
					
				copy_states.update(temp_dic)			   
				del temp_dic[key]
				self.engine.Initialization()
			likelihood.append(like)
		return likelihood
	
	def DetermineLikelihoodIncorrect(self, known, states_list):
		""" 
		Give a list with the likelihood to have the states in states_list.
		6 to 10 time faster than the DetermineLikelihood function above, but 
		has to be tested more...
		"""		   
		likelihood = []
		for states in states_list:
			# states = {'c':0,'r':1} for example (c and r were unknown in the beginning)
			like = 1
			parents_state = copy.copy(known)
			parents_state.update(copy.copy(states))
			for key in states.iterkeys():
				cpt = self.BNet.v[key].distribution[parents_state]
				like = like * cpt				
			likelihood.append(like)
		return likelihood
	
	def Combinations(self, vertices):
		''' vertices is a list of BVertex instances
		
		in the case of the water-sprinkler BN :
		>>> Combinations([c,r,w])
		[{'c':0,'r':0,'w':0},{'c':0,'r':0,'w':1},...,{'c'=1,'r':1,'w':1}]
		'''
		if len(vertices) > 1:
			list_comb = self.Combinations(vertices[:-1])
			new_list_comb = []
			for el in list_comb:
				for i in range(self.BNet.v[vertices[-1]].nvalues):
					temp = copy.copy(el)
					temp[self.BNet.v[vertices[-1]].name]=i
					new_list_comb.append(temp)
			return new_list_comb
		else:
			return [{self.BNet.v[vertices[0]].name:el} for el in range(self.BNet.v[vertices[0]].nvalues)]
	
	def hasntConverged(self, old, new, precision):
		'''
		Return true if the difference of distribution of at least one vertex 
		of the old and new BNet is bigger than precision
		'''
		if not old :
			return True	  
		else:
			return not	na.alltrue([na.allclose(v.distribution, new.v[v.name].distribution, atol=precision) for v in old.v.values()])
	

class GreedyStructLearningEngine(EMLearningEngine):
	""" Greedy Structural learning algorithm
	Learns the structure of a bayesian network from known parameters and an 
	initial structure.
	"""	  
	BNet = None # The underlying bayesian network
	engine = None
	
	def __init__(self, BNet):
		self.BNet = BNet.copy()
		#self.engine = ConnexeInferenceJTree(self.BNet)
		self.converged = False

	def StructLearning(self, cases):
		N = len(cases)
		iter = 0
		while (not self.converged):
			self.LearnStruct(cases, N)
			iter +=1
			print 'Structure iteration:', iter

	def GlobalBICScore(self, N, cases):
		'''Computes the BIC score of self.BNet'''
		score = 0
		for v in self.BNet.all_v:
			cpt_matrix = v.distribution.Convert_to_CPT()
			dim = self.BNet.Dimension(v)
			score = score + self.ScoreBIC(N, dim, self.BNet, v, cases) 
		return score

	def LearnStruct(self, cases, N):
		"""Greedy search for optimal structure (all the data in cases are known).
		It will go through every node of the BNet. At each node, it will delete 
		every outgoing edge, or add every possible edge, or reverse every 
		possible edge. It will compute the BIC score each time and keep the BNet
		with the highest score.
		"""
		G_initial = self.BNet.copy()
		engine_init = GreedyStructLearningEngine(G_initial)
		G_best = self.BNet.copy()	  
		prec_var_score = 0
		
		for v in self.BNet.all_v:
			G = copy.deepcopy(engine_init.BNet)
			edges = copy.deepcopy(G.v[v.name].out_e)
			
			# delete the outgoing edges
			while edges:
				edge = edges.pop(0)
				node = edge._v[1] #node is the child node, the only node for which the cpt table changes
				dim_init = G_initial.Dimension(node)
				score_init = engine_init.ScoreBIC(N, dim_init, G_initial, G_initial.v[node.name], cases)
				self.ChangeStruct('del', edge) #delete the current edge
				self.SetNewDistribution(engine_init.BNet, node, cases)
				dim = self.BNet.Dimension(node)
				score = self.ScoreBIC(N, dim, self.BNet, self.BNet.v[node.name], cases)
				var_score = score - score_init
				if var_score > prec_var_score:
					print 'deleted:', v.name, node.name, var_score
					prec_var_score = var_score
					G_best = self.BNet.copy()
				self.BNet = G_initial.copy()
			
			# Add all possible edges
			G = copy.deepcopy(engine_init.BNet)
			nodes = []
			for node in G.all_v:
				if (not (node.name in [vv.name for vv in self.BNet.v[v.name].out_v])) and (not (node.name == v.name)):
					nodes.append(node)
			while nodes:
				node = nodes.pop(0)
				if G.e.keys():
					edge = graph.DirEdge(max(G.e.keys())+1, self.BNet.v[v.name], self.BNet.v[node.name])
				else:
					edge = graph.DirEdge(0, self.BNet.v[v.name], self.BNet.v[node.name])
				self.ChangeStruct('add', edge)
				if self.BNet.HasNoCycles(self.BNet.v[node.name]):
					dim_init = engine_init.BNet.Dimension(node)
					score_init = engine_init.ScoreBIC(N, dim_init, G_initial, G_initial.v[node.name], cases)
					self.SetNewDistribution(engine_init.BNet, node, cases)
					dim = self.BNet.Dimension(node)
					score = self.ScoreBIC(N, dim, self.BNet, self.BNet.v[node.name], cases)
					var_score = score - score_init
					if var_score > prec_var_score:
						print 'added: ', v.name, node.name, var_score
						prec_var_score = var_score
						G_best = self.BNet.copy()
				self.BNet = G_initial.copy()
		
			# Invert all possible edges
			G = copy.deepcopy(G_initial)
			edges = copy.deepcopy(G.v[v.name].out_e)
			while edges:
				edge = edges.pop(0)
				node = self.BNet.v[edge._v[1].name] #node is the child node
				dim_init1 = G_initial.Dimension(node)
				score_init1 = engine_init.ScoreBIC(N, dim_init1, G_initial, G_initial.v[node.name], cases)
				self.ChangeStruct('del', edge)
				self.SetNewDistribution(engine_init.BNet, node, cases)
				dim1 = self.BNet.Dimension(node)
				score1 = self.ScoreBIC(N, dim1, self.BNet, self.BNet.v[node.name], cases)
				G_invert = self.BNet.copy() 
				engine_invert = GreedyStructLearningEngine(G_invert)  
				inverted_edge = graph.DirEdge(max(G.e.keys())+1, self.BNet.v[node.name],self.BNet.v[v.name])
				self.ChangeStruct('add', inverted_edge)
				if self.BNet.HasNoCycles(self.BNet.v[node.name]):
					dim_init = G_initial.Dimension(v)
					score_init = engine_init.ScoreBIC(N, dim_init, G_initial, G_initial.v[v.name], cases)
					self.SetNewDistribution(engine_invert.BNet, v, cases)
					dim = self.BNet.Dimension(v)
					score = self.ScoreBIC(N, dim, self.BNet, self.BNet.v[v.name], cases)
					var_score = score1 - score_init1 + score - score_init
					if var_score > prec_var_score + 5: #+ 5 is to avoid recalculation due to round errors
						print 'inverted:', v.name, node.name, var_score
						prec_var_score = var_score
						G_best = self.BNet.copy()
				self.BNet = G_initial.copy()
		
		#self.BNet is the optimal graph structure
		if prec_var_score == 0:
			self.converged = True
		self.BNet = G_best.copy()
		#self.engine = ConnexeInferenceJTree(self.BNet)
	
	def SetNewDistribution(self, G_initial, node, cases):
		'''Set the new distribution of the node node. The other distributions
		are the same as G_initial (only node has a new parent, so the other 
		distributions don't change). Works also with incomplete data'''
		self.BNet.InitDistributions()
		for v in G_initial.all_v:
			if v.name != node.name:
				cpt = G_initial.v[v.name].distribution.Convert_to_CPT()
				self.BNet.v[v.name].distribution.setParameters(cpt)
			else:
				if self.BNet.v[node.name].distribution.isAdjustable:
					self.BNet.v[node.name].distribution.initializeCounts()
					for case in cases :
						known={} # will contain all the known data of case
						unknown=[] # will contain all the unknown keys of case
						for key in case.iterkeys():
							if case[key] != '?': # It's the only part of code you have to change if you want to have another 'unknown sign' instead of '?'
								known[key] = case[key]
							else:
								unknown.append(key)
						if len(case) == len(known): # Then all the data is known -> proceed as LearnMLParams (inference.py)
							self.BNet.v[node.name].distribution.incrCounts(case)
						else:
							states_list = self.Combinations(unknown) # Give a dictionary list of all the possible states of the unknown parameters
							likelihood_list = self.DetermineLikelihood(known, states_list) # Give a list with the likelihood to have the states in states_list				  
							for j, index_unknown in enumerate(states_list):
								index = copy.copy(known)
								index.update(index_unknown)
								self.BNet.v[node.name].distribution.addToCounts(index, likelihood_list[j])
					self.BNet.v[node.name].distribution.setCounts()
					self.BNet.v[node.name].distribution.normalize(dim=node.name)
	
	def ScoreBIC (self, N, dim, G, node, data):
		''' This function computes the BIC score of one node.
		N is the size of the data from which we learn the structure
		dim is the dimension of the node, = (nbr of state - 1)*nbr of state of the parents
		data is the list of cases
		return the BIC score
		Works also with incomplete data!
		'''
		score = self.ForBIC(G, data, node)
		score = score - 0.5*dim*math.log(N)
		return score

	def ForBIC(self, G, cases, node):
		''' Computes for each case the probability to have node and his parents
		in the case state, take the log of that probability and add them.'''
		score = 0
		for case in cases :
			cpt = 0 
			known={} # will contain all the known data of case
			unknown=[] # will contain all the unknown data of case
			for key in case.iterkeys():
				if case[key] != '?': # It's the only part of code you have to change if you want to have another 'unknown sign' instead of '?'
					known[key] = case[key]
				else:
					unknown.append(key)
			if len(case) == len(known): # Then all the data is known
				cpt = node.distribution[case]
			else:
				states_list = self.Combinations(unknown) # Give a dictionary list of all the possible states of the unknown parameters
				likelihood_list = self.DetermineLikelihood(known, states_list) # Give a list with the likelihood to have the states in states_list				  
				for j, index_unknown in enumerate(states_list):
					index = copy.copy(known)
					index.update(index_unknown)
					cpt = cpt + likelihood_list[j]*node.distribution[index]
			if cpt == 0: # To avoid log(0)
				cpt = math.exp(-700)
			score = score + math.log(cpt)
		return score

	def ChangeStruct(self, change, edge):
		"""Changes the edge (add, remove or reverse)"""
		if change == 'del':
			self.BNet.del_e(edge)
		elif change == 'add':
			self.BNet.add_e(edge)
		elif change == 'inv':
			self.BNet.inv_e(edge)
		else:
			assert(False), "The asked change of structure is not possible. Only 'del' for delete, 'add' for add, and 'inv' for invert"

class SEMLearningEngine(GreedyStructLearningEngine, EMLearningEngine):	
	""" Structural EM learning algorithm
	Learns the structure and the parameters of a bayesian network from 
	unknown parameters and an initial structure.
	""" 
	def __init__(self, BNet):
		self.BNet = BNet
		self.engine = ConnexeInferenceJTree(self.BNet)
		self.converged = False
	
	def SEMLearning(self, cases, max_iter = 30):
		"""Structural EM for optimal structure and parameters if some of the 
		data is unknown (put '?' for unknown data).
		"""
		N = len(cases)
		iter = 0
		while (not self.converged) and iter < max_iter:
			#First we estimate the distributions of the initial structure
			self.BNet.InitDistributions()
			self.EMLearning(cases, 10)
			#Then we find a better structure in the neighborhood of self.BNet
			self.LearnStruct(cases, N)
			iter +=1
			print 'Structure Expectation-Maximisation iteration:', iter
  

class SEMLearningTestCase(unittest.TestCase):
	def setUp(self):
		# create a discrete network
		G = bayesnet.BNet('Water Sprinkler Bayesian Network')
		c,s,r,w = [G.add_v(bayesnet.BVertex(nm,True,2)) for nm in 'c s r w'.split()]
		for ep in [(c,r), (c,s), (r,w), (s,w)]:
			G.add_e(graph.DirEdge(len(G.e), *ep))
		G.InitDistributions()
		c.setDistributionParameters([0.5, 0.5])
		s.setDistributionParameters([0.5, 0.9, 0.5, 0.1])
		r.setDistributionParameters([0.8, 0.2, 0.2, 0.8])
		w.distribution[:,0,0]=[0.99, 0.01]
		w.distribution[:,0,1]=[0.1, 0.9]
		w.distribution[:,1,0]=[0.1, 0.9]
		w.distribution[:,1,1]=[0.0, 1.0]
		self.c = c
		self.s = s
		self.r = r
		self.w = w
		self.BNet = G 

	def testSEM(self):
		N = 2000
		# sample the network N times, delete some data
		cases = self.BNet.Sample(N)	   # cases = [{'c':0,'s':1,'r':0,'w':1},{...},...]		 
		for i in range(500):
			case = cases[3*i]
			rand = random.sample(['c','s','r','w'],1)[0]
			case[rand] = '?' 
		for i in range(50):
			case = cases[3*i]
			rand = random.sample(['c','s','r','w'],1)[0]
			case[rand] = '?' 
		G = bayesnet.BNet('Water Sprinkler Bayesian Network2')
		c,s,r,w = [G.add_v(bayesnet.BVertex(nm,True,2)) for nm in 'c s r w'.split()]
		G.InitDistributions()
		# Test SEMLearning
		struct_engine = SEMLearningEngine(G)
		struct_engine.SEMLearning(cases)
		print 'learned structure: ', struct_engine.BNet
		print 'total bic score: ', struct_engine.GlobalBICScore(N, cases)

class GreedyStructLearningTestCase(unittest.TestCase):
	#TEST Waterspringler
	def setUp(self):
		# create a discrete network
		G = bayesnet.BNet('Water Sprinkler Bayesian Network')
		c,s,r,w = [G.add_v(bayesnet.BVertex(nm,True,2)) for nm in 'c s r w'.split()]
		for ep in [(c,r), (c,s), (r,w), (s,w)]:
			G.add_e(graph.DirEdge(len(G.e), *ep))
		G.InitDistributions()
		c.setDistributionParameters([0.5, 0.5])
		s.setDistributionParameters([0.5, 0.9, 0.5, 0.1])
		r.setDistributionParameters([0.8, 0.2, 0.2, 0.8])
		w.distribution[:,0,0]=[0.99, 0.01]
		w.distribution[:,0,1]=[0.1, 0.9]
		w.distribution[:,1,0]=[0.1, 0.9]
		w.distribution[:,1,1]=[0.0, 1.0]
		self.c = c
		self.s = s
		self.r = r
		self.w = w
		self.BNet = G  
	
	def testStruct(self):
		N = 2000
		# sample the network N times, delete some data
		cases = self.BNet.Sample(N)	   # cases = [{'c':0,'s':1,'r':0,'w':1},{...},...]		 
		for i in range(500):
			case = cases[3*i]
			rand = random.sample(['c','s','r','w'],1)[0]
			case[rand] = '?' 
		for i in range(50):
			case = cases[3*i]
			rand = random.sample(['c','s','r','w'],1)[0]
			case[rand] = '?' 
		G = bayesnet.BNet('Water Sprinkler Bayesian Network2')
		c,s,r,w = [G.add_v(bayesnet.BVertex(nm,True,2)) for nm in 'c s r w'.split()]
		G.InitDistributions()
		c.setDistributionParameters([0.5, 0.5])
		s.setDistributionParameters([0.7, 0.3])
		r.setDistributionParameters([0.5, 0.5])
		w.setDistributionParameters([0.35, 0.65])
		# Test StructLearning
		struct_engine = GreedyStructLearningEngine(G)
		struct_engine.StructLearning(cases)
		print 'learned structure: ', struct_engine.BNet
		print 'total bic score: ', struct_engine.GlobalBICScore(N, cases)


##	  # TEST ASIA
##	  def setUp(self):
##		  # create the network
##		  G = bayesnet.BNet( 'Asia Bayesian Network' )
##		  visit, smoking, tuberculosis, bronchitis, lung, ou, Xray, dyspnoea = [G.add_v( bayesnet.BVertex( nm, True, 2 ) ) for nm in 'visit smoking tuberculosis bronchitis lung ou Xray dyspnoea'.split()]
##		  for ep in [(visit,tuberculosis), (tuberculosis, ou), (smoking,lung), (lung, ou), (ou, Xray), (smoking, bronchitis), (bronchitis, dyspnoea), (ou, dyspnoea)]:
##			  G.add_e( graph.DirEdge( len( G.e ), *ep ) )
##		  G.InitDistributions()
##		  visit.setDistributionParameters([0.99, 0.01])
##		  tuberculosis.distribution[:,0]=[0.99, 0.01]
##		  tuberculosis.distribution[:,1]=[0.95, 0.05]
##		  smoking.setDistributionParameters([0.5, 0.5])
##		  lung.distribution[:,0]=[0.99, 0.01]
##		  lung.distribution[:,1]=[0.9, 0.1]
##		  ou.distribution[:,0,0]=[1, 0]
##		  ou.distribution[:,0,1]=[0, 1]
##		  ou.distribution[:,1,0]=[0, 1]
##		  ou.distribution[:,1,1]=[0, 1]
##		  Xray.distribution[:,0]=[0.95, 0.05]
##		  Xray.distribution[:,1]=[0.02, 0.98]
##		  bronchitis.distribution[:,0]=[0.7, 0.3]
##		  bronchitis.distribution[:,1]=[0.4, 0.6]
##		  dyspnoea.distribution[{'bronchitis':0,'ou':0}]=[0.9, 0.1]
##		  dyspnoea.distribution[{'bronchitis':1,'ou':0}]=[0.2, 0.8]
##		  dyspnoea.distribution[{'bronchitis':0,'ou':1}]=[0.3, 0.7]
##		  dyspnoea.distribution[{'bronchitis':1,'ou':1}]=[0.1, 0.9]
##		  self.visit = visit
##		  self.tuberculosis = tuberculosis
##		  self.smoking = smoking
##		  self.lung = lung
##		  self.ou = ou
##		  self.Xray = Xray
##		  self.bronchitis = bronchitis
##		  self.dyspnoea = dyspnoea
##		  self.BNet = G
##	  
##	  def testStruct(self):
##		  N = 10000
##		  # sample the network N times
##		  cases = self.BNet.Sample(N)	 # cases = [{'c':0,'s':1,'r':0,'w':1},{...},...]
##		  # create two new bayesian network with the same parameters as self.BNet
##		  G1 = bayesnet.BNet( 'Asia Bayesian Network2' )
##		  visit, smoking, tuberculosis, bronchitis, lung, ou, Xray, dyspnoea = [G1.add_v( bayesnet.BVertex( nm, True, 2 ) ) for nm in 'visit smoking tuberculosis bronchitis lung ou Xray dyspnoea'.split()]
##		  G1.InitDistributions()
##		  visit.setDistributionParameters([0.99, 0.01])
##		  tuberculosis.setDistributionParameters([0.99, 0.01])
##		  smoking.setDistributionParameters([0.5, 0.5])
##		  lung.setDistributionParameters([0.95, 0.05])
##		  ou.setDistributionParameters([0.94, 0.06])
##		  Xray.setDistributionParameters([0.89, 0.11])
##		  bronchitis.setDistributionParameters([0.55, 0.45])
##		  dyspnoea.setDistributionParameters([0.56, 0.44])
##		  # Test StructLearning
##		  struct_engine1 = GreedyStructLearningEngine(G1)
##		  struct_engine1.StructLearning(cases)
##		  print struct_engine1.BNet
	

class EMLearningTestCase(unittest.TestCase):
	def setUp(self):
		# create a discrete network
		G = bayesnet.BNet('Water Sprinkler Bayesian Network')
		c,s,r,w = [G.add_v(bayesnet.BVertex(nm,True,2)) for nm in 'c s r w'.split()]
		for ep in [(c,r), (c,s), (r,w), (s,w)]:
			G.add_e(graph.DirEdge(len(G.e), *ep))
		G.InitDistributions()
		c.setDistributionParameters([0.5, 0.5])
		s.setDistributionParameters([0.5, 0.9, 0.5, 0.1])
		r.setDistributionParameters([0.8, 0.2, 0.2, 0.8])
		w.distribution[:,0,0]=[0.99, 0.01]
		w.distribution[:,0,1]=[0.1, 0.9]
		w.distribution[:,1,0]=[0.1, 0.9]
		w.distribution[:,1,1]=[0.0, 1.0]
		
		self.c = c
		self.s = s
		self.r = r
		self.w = w
		self.BNet = G
	
	def testEM(self):
		# sample the network 2000 times
		cases = self.BNet.Sample(2000)
		# delete some observations
		for i in range(500):
			case = cases[3*i]
			rand = random.sample(['c','s','r','w'],1)[0]
			case[rand] = '?' 
		for i in range(50):
			case = cases[3*i]
			rand = random.sample(['c','s','r','w'],1)[0]
			case[rand] = '?'

		# create a new BNet with same nodes as self.BNet but all parameters
		# set to 1s
		G = copy.deepcopy(self.BNet)
		
		G.InitDistributions()
		
		engine = EMLearningEngine(G)
		engine.EMLearning(cases, 10)
		
		tol = 0.08
		assert(na.alltrue([na.allclose(v.distribution.cpt, self.BNet.v[v.name].distribution.cpt, atol=tol) \
			   for v in engine.BNet.all_v])), \
				" Learning does not converge to true values "
		print 'ok!!!!!!!!!!!!'

if __name__ == '__main__':
	suite = unittest.makeSuite(SEMLearningTestCase, 'test')
	runner = unittest.TextTestRunner()
	runner.run(suite) 

##	  suite = unittest.makeSuite(GreedyStructLearningTestCase, 'test')
##	  runner = unittest.TextTestRunner()
##	  runner.run(suite)	   
	
##	  suite = unittest.makeSuite(EMLearningTestCase, 'test')
##	  runner = unittest.TextTestRunner()
##	  runner.run(suite)
by Wannes Meert last modified 2007-01-12 16:12

Powered by Plone, the Open Source Content Management System

This site conforms to the following standards: