source: ThirdParty/CodePatterns/src/Observer/Relay.cpp@ c742bb1

Candidate_v1.7.1 stable
Last change on this file since c742bb1 was c8cb0d, checked in by Frederik Heber <frederik.heber@…>, 2 days ago

Streamlines channel creation in Observables.

  • CodePatterns is now version 1.3.4.
  • we no longer need to add the channels manually in the cstor of a class that derives from Observable. Instead, we just need to pass the maximum number of channels (as they are typically enumerated anyway) and they are generated and added.
  • added mutex protection when inserting.
  • adjusted class Relay to forward similar convenience cstors.
  • adjusted all call sites in molecuilder.
  • Property mode set to 100644
File size: 10.9 KB
RevLine 
[084729c]1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2010 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * Relay.cpp
10 *
11 * Created on: Dec 1, 2011
12 * Author: heber
13 */
14
15// include config.h
16#ifdef HAVE_CONFIG_H
17#include <config.h>
18#endif
19
[9eb71b3]20//#include "CodePatterns/MemDebug.hpp"
[084729c]21
22#include "CodePatterns/Observer/Relay.hpp"
23
24#include "CodePatterns/Assert.hpp"
25#include "CodePatterns/Observer/Channels.hpp"
26#include "CodePatterns/Observer/Notification.hpp"
27
28#include <boost/thread/locks.hpp>
29#include <boost/thread/recursive_mutex.hpp>
30
31/** Constructor for class Relay.
32 */
33Relay::Relay(std::string name) :
34 Observable(name),
35 Updater(NULL)
36{
37#ifdef LOG_OBSERVER
38 observerLog().addName(this,name);
39 observerLog().addMessage() << "++ Creating Relay " << observerLog().getName(this);
40#endif
41}
42
[c8cb0d]43/** Constructor for class Relay.
44 */
45Relay::Relay(std::string name, const channels_t &_channels) :
46 Observable(name, _channels),
47 Updater(NULL)
48{
49#ifdef LOG_OBSERVER
50 observerLog().addName(this,name);
51 observerLog().addMessage() << "++ Creating Relay " << observerLog().getName(this);
52#endif
53}
54
55/** Constructor for class Relay.
56 */
57Relay::Relay(std::string name, const unsigned int _maximum_notification_types) :
58 Observable(name, _maximum_notification_types),
59 Updater(NULL)
60{
61#ifdef LOG_OBSERVER
62 observerLog().addName(this,name);
63 observerLog().addMessage() << "++ Creating Relay " << observerLog().getName(this);
64#endif
65}
66
[084729c]67/** Destructor for class Relay.
68 * When an observable is deleted, we let all our observers know. \sa Relay::subjectKilled().
69 */
70Relay::~Relay()
71{
72#ifdef LOG_OBSERVER
73 observerLog().addMessage() << "-- Destroying Relay " << observerLog().getName(this);
74#endif
75 // killing subjects is done by Observables' dstor
76}
77
78
79
80/** Sign on an Observer to this Observable.
81 * Puts \a *target into Observable::(GlobalObservableInfo::getInstance().getcallTable()) list.
82 * \param *target Observer
83 * \param priority number in [-20,20]
84 */
85void Relay::signOn(Observer *target, GlobalObservableInfo::PriorityLevel priority) const
86{
87#ifdef LOG_OBSERVER
88 observerLog().addMessage() << "@@ Signing on " << observerLog().getName(target)
89 << " to "
90 << observerLog().getName(const_cast<Observable *>(static_cast<const Observable * const>(this)));
91#endif
92 bool res = false;
93 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
94 GlobalObservableInfo::callees_t &callees = (GlobalObservableInfo::getInstance().getcallTable())[const_cast<Observable *>(static_cast<const Observable * const>(this))];
95
96 GlobalObservableInfo::callees_t::iterator iter;
97 for(iter=callees.begin();iter!=callees.end();++iter){
98 res |= ((*iter).second == target);
99 }
100 if(!res)
101 callees.insert(std::pair<int,Observer*>(priority.level,target));
102}
103
104/** Sign off an Observer from this Observable.
105 * Removes \a *target from Observable::(GlobalObservableInfo::getInstance().getcallTable()) list.
106 * \param *target Observer
107 */
108void Relay::signOff(Observer *target) const
109{
110 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
111 GlobalObservableInfo::calltable_t &callTable = GlobalObservableInfo::getInstance().getcallTable();
112 ASSERT(callTable.count(const_cast<Observable *>(static_cast<const Observable * const>(this))),
113 "Relay::signOff() - called for an Observable without Observers.");
114#ifdef LOG_OBSERVER
115 observerLog().addMessage() << "** Signing off " << observerLog().getName(target)
116 << " from "
117 << observerLog().getName(const_cast<Observable *>(static_cast<const Observable * const>(this)));
118#endif
119 GlobalObservableInfo::callees_t &callees = callTable[const_cast<Observable *>(static_cast<const Observable * const>(this))];
120
121 GlobalObservableInfo::callees_t::iterator iter;
122 GlobalObservableInfo::callees_t::iterator deliter;
123 for(iter=callees.begin();iter!=callees.end();) {
124 if((*iter).second == target) {
125 callees.erase(iter++);
126 }
127 else {
128 ++iter;
129 }
130 }
131 if(callees.empty()){
132 callTable.erase(const_cast<Observable *>(static_cast<const Observable * const>(this)));
133 }
134}
135
136void Relay::signOn(Observer *target, size_t channelno, GlobalObservableInfo::PriorityLevel priority) const
137{
138 Notification_ptr notification = getChannel(channelno);
139 notification->addObserver(target, priority.level);
140}
141
142void Relay::signOff(Observer *target, size_t channelno) const
143{
144 Notification_ptr notification = getChannel(channelno);
145 notification->removeObserver(target);
146}
147
148/** Notify all Observers of changes.
149 * Puts \a *this into Relay::(GlobalObservableInfo::getInstance().getbusyObservables()), calls Observer::update() for all in callee_t
150 * and removes from busy list.
151 */
152void Relay::notifyAll() {
153 ASSERT(Updater != NULL,
154 "Relay::notifyAll() called while Updater is NULL.");
155 // we are busy notifying others right now
156 // add ourselves to the list of busy subjects to enable circle detection
157 {
158 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
159 (GlobalObservableInfo::getInstance().getbusyObservables()).insert(this);
160 }
161 // see if anyone has signed up for observation
162 // and call all observers
163 try {
164 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
165 GlobalObservableInfo::calltable_t& callTable = GlobalObservableInfo::getInstance().getcallTable();
166 const bool callTable_contains = callTable.count(this);
167 if(callTable_contains) {
168 // elements are stored sorted by keys in the multimap
169 // so iterating over it gives us a the callees sorted by
170 // the priorities
171 // copy such that signOff() within receiving update() does not affect iterating
172 // this is because within the same thread and with the updateKilled() signOff() may be
173 // called and when executed it modifies targets
174 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
175 GlobalObservableInfo::callees_t callees = callTable[this];
176 GlobalObservableInfo::callees_t::iterator iter;
177 for(iter=callees.begin();iter!=callees.end();++iter){
178#ifdef LOG_OBSERVER
179 observerLog().addMessage() << "-> " << observerLog().getName(this)
180 << " is relaying update from " << observerLog().getName(Updater)
181 << " to " << observerLog().getName((*iter).second)
182 << " (priority=" << (*iter).first << ")";
183#endif
184 (*iter).second->update(Updater);
185 }
186 }
187 }
188 ASSERT_NOCATCH("Exception thrown from Observer Update");
189
190 // send out all (GlobalObservableInfo::getInstance().getnotifications()) that need to be done
191 {
192 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
193 GlobalObservableInfo::notificationSet currentNotifications =
194 (GlobalObservableInfo::getInstance().getnotifications())[Updater];
195 for(GlobalObservableInfo::notificationSet::iterator it = currentNotifications.begin();
196 it != currentNotifications.end();++it){
197 (*it)->notifyAll(Updater);
198 }
199 }
200
201 {
202 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
203 (GlobalObservableInfo::getInstance().getnotifications()).erase(Updater);
204
205 // done with notification, we can leave the set of busy subjects
206 (GlobalObservableInfo::getInstance().getbusyObservables()).erase(this);
207 }
208}
209
210
211/** Handles passing on updates from sub-Relays.
212 * Mimicks basically the Observer::update() function.
213 *
214 * \param *publisher The \a *this we observe.
215 */
216void Relay::update(Observable *publisher) {
217 // circle detection
218 bool circle_present = false;
219 {
220 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
221 std::set<Observable*>& busyObservables = GlobalObservableInfo::getInstance().getbusyObservables();
222 circle_present = busyObservables.find(this)!=busyObservables.end();
223 }
224 if(circle_present) {
225 // somehow a circle was introduced... we were busy notifying our
226 // observers, but still we are called by one of our sub-Relays
227 // we cannot be sure observation will still work at this point
228 ASSERT(0,"Circle detected in observation-graph.\n"
229 "Observation-graph always needs to be a DAG to work correctly!\n"
230 "Please check your observation code and fix this!\n");
231 return;
232 }
233 else {
234 // see if we are in the process of changing ourselves
235 // if we are changing ourselves at the same time our sub-observables change
236 // we do not need to publish all the changes at each time we are called
237 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
238 std::map<Observable*, int>& depth = GlobalObservableInfo::getInstance().getdepth();
239 const bool depth_contains = depth.find(this)==depth.end();
240 if(depth_contains) {
241#ifdef LOG_OBSERVER
242 observerLog().addMessage() << "-* Update from " << observerLog().getName(publisher)
243 << " relayed by " << observerLog().getName(this);
244#endif
245 Updater = publisher;
246 notifyAll();
247 Updater = NULL;
248 }
249 else{
250#ifdef LOG_OBSERVER
251 observerLog().addMessage() << "-| Update from " << observerLog().getName(publisher)
252 << " not relayed by " << observerLog().getName(this);
253#endif
254 }
255 }
256}
257
258/** Method for receiving specialized (GlobalObservableInfo::getInstance().getnotifications()).
259 *
260 * \param *publisher The \a *this we observe.
261 * \param notification type of notification
262 */
263void Relay::recieveNotification(Observable *publisher, Notification_ptr notification)
264{
265 Updater = publisher;
266 bool contains_channels = false;
267 {
268 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
269 contains_channels = NotificationChannels.find(this) != NotificationChannels.end();
270 }
271 if (contains_channels) {
272 const size_t channelno = notification->getChannelNo();
273 Notification *mynotification = NULL;
274 {
275 boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex());
276 ChannelMap::const_iterator iter = NotificationChannels.find(this);
277 const Channels *myChannels = iter->second;
278 mynotification = myChannels->getChannel(channelno);
279 }
280 ASSERT(mynotification != NULL,
281 "Relay::recieveNotification() - this relay does not have a notification no "+toString(channelno)+".");
282 mynotification->notifyAll(Updater);
283 Updater = NULL;
284 } else {
285 // note that this relay does not seem to have any channels
286 }
287}
288
289/** Handles sub-observables that just got killed
290 * when an sub-observerable dies we usually don't need to do anything
291 * \param *publisher Sub-Relay.
292 */
293void Relay::subjectKilled(Observable *publisher)
294{
295}
296
Note: See TracBrowser for help on using the repository browser.