Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
v6
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Analytics
Analytics
Repository
Value Stream
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Commits
Open sidebar
pub
v6
Commits
a60e513a
Commit
a60e513a
authored
Apr 19, 2017
by
Christian Mohrbacher
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
updated to release 6.9
parent
366ed243
Changes
29
Hide whitespace changes
Inline
Side-by-side
Showing
29 changed files
with
246 additions
and
199 deletions
+246
-199
fhgfs_auto_package/make-packages-common.sh
fhgfs_auto_package/make-packages-common.sh
+26
-15
fhgfs_client_module/source/common/net/message/nodes/HeartbeatRequestMsgEx.c
...e/source/common/net/message/nodes/HeartbeatRequestMsgEx.c
+2
-1
fhgfs_client_module/source/common/nodes/MirrorBuddyGroupMapper.c
...lient_module/source/common/nodes/MirrorBuddyGroupMapper.c
+8
-21
fhgfs_client_module/source/common/nodes/NodeConnPool.c
fhgfs_client_module/source/common/nodes/NodeConnPool.c
+3
-0
fhgfs_client_module/source/filesystem/FhgfsOpsIoctl.c
fhgfs_client_module/source/filesystem/FhgfsOpsIoctl.c
+1
-0
fhgfs_client_module/source/net/filesystem/FhgfsOpsCommKit.c
fhgfs_client_module/source/net/filesystem/FhgfsOpsCommKit.c
+4
-3
fhgfs_common/source/common/app/log/Logger.cpp
fhgfs_common/source/common/app/log/Logger.cpp
+1
-0
fhgfs_common/source/common/app/log/Logger.h
fhgfs_common/source/common/app/log/Logger.h
+1
-0
fhgfs_common/source/common/components/StatsCollector.cpp
fhgfs_common/source/common/components/StatsCollector.cpp
+4
-6
fhgfs_common/source/common/components/worker/Work.cpp
fhgfs_common/source/common/components/worker/Work.cpp
+0
-9
fhgfs_common/source/common/components/worker/Work.h
fhgfs_common/source/common/components/worker/Work.h
+11
-0
fhgfs_common/source/common/components/worker/Worker.cpp
fhgfs_common/source/common/components/worker/Worker.cpp
+32
-17
fhgfs_common/source/common/components/worker/queue/MultiWorkQueue.cpp
.../source/common/components/worker/queue/MultiWorkQueue.cpp
+50
-30
fhgfs_common/source/common/components/worker/queue/MultiWorkQueue.h
...on/source/common/components/worker/queue/MultiWorkQueue.h
+24
-43
fhgfs_common/source/common/components/worker/queue/PersonalWorkQueue.h
...source/common/components/worker/queue/PersonalWorkQueue.h
+1
-1
fhgfs_common/source/common/components/worker/queue/WorkQueue.cpp
...ommon/source/common/components/worker/queue/WorkQueue.cpp
+0
-10
fhgfs_common/source/common/components/worker/queue/WorkQueue.h
..._common/source/common/components/worker/queue/WorkQueue.h
+12
-15
fhgfs_common/source/common/toolkit/Time.h
fhgfs_common/source/common/toolkit/Time.h
+5
-0
fhgfs_common/source/common/toolkit/TimeFine.h
fhgfs_common/source/common/toolkit/TimeFine.h
+5
-0
fhgfs_meta/source/app/App.cpp
fhgfs_meta/source/app/App.cpp
+2
-1
fhgfs_meta/source/components/InternodeSyncer.cpp
fhgfs_meta/source/components/InternodeSyncer.cpp
+21
-12
fhgfs_meta/source/components/InternodeSyncer.h
fhgfs_meta/source/components/InternodeSyncer.h
+6
-5
fhgfs_mgmtd/source/components/InternodeSyncer.cpp
fhgfs_mgmtd/source/components/InternodeSyncer.cpp
+13
-5
fhgfs_mgmtd/source/components/InternodeSyncer.h
fhgfs_mgmtd/source/components/InternodeSyncer.h
+1
-0
fhgfs_mgmtd/source/net/message/nodes/ChangeTargetConsistencyStatesMsgEx.cpp
.../net/message/nodes/ChangeTargetConsistencyStatesMsgEx.cpp
+2
-0
fhgfs_mgmtd/source/net/message/nodes/HeartbeatMsgEx.cpp
fhgfs_mgmtd/source/net/message/nodes/HeartbeatMsgEx.cpp
+6
-1
fhgfs_mgmtd/source/net/message/nodes/SetTargetConsistencyStatesMsgEx.cpp
...rce/net/message/nodes/SetTargetConsistencyStatesMsgEx.cpp
+2
-0
fhgfs_mgmtd/source/nodes/MgmtdTargetStateStore.cpp
fhgfs_mgmtd/source/nodes/MgmtdTargetStateStore.cpp
+3
-2
fhgfs_online_cfg/source/modes/ModeGetQuotaInfo.cpp
fhgfs_online_cfg/source/modes/ModeGetQuotaInfo.cpp
+0
-2
No files found.
fhgfs_auto_package/make-packages-common.sh
View file @
a60e513a
...
...
@@ -18,6 +18,7 @@ opentk_package="beegfs_opentk_lib" # deletes the libopentk.so, so needs to be la
export
CONCURRENCY
=
${
MAKE_CONCURRENCY
:-
4
}
PACKAGEDIR
=
"/tmp/beegfs_packages-
${
DATE
}
/"
DO_CLEAN
=
true
# print usage information
print_usage
()
...
...
@@ -39,7 +40,8 @@ print_usage()
echo
" -D Disable beegfs-admon package build."
echo
" -C Build client packages only."
echo
" -x Build with BEEGFS_DEBUG."
echo
" -l F log to specific file"
echo
" -l F log to specific file"
echo
" -K keep previously built files (no clean)"
echo
echo
"EXAMPLE:"
echo
"
$
`
basename
$0
`
-j 4 -p /tmp/my_beegfs_packages"
...
...
@@ -67,7 +69,7 @@ CLEAN_ONLY=0
CLIENT_ONLY
=
0
LOGFILE
=
while
getopts
"hcdm:v:DCxj:p:l:"
opt
;
do
while
getopts
"hcdm:v:DCxj:p:l:
K
"
opt
;
do
case
$opt
in
h
)
print_usage
...
...
@@ -99,16 +101,19 @@ while getopts "hcdm:v:DCxj:p:l:" opt; do
export
CONCURRENCY
=
"
$OPTARG
"
export
MAKE_CONCURRENCY
=
"
$OPTARG
"
;;
l
)
if
[[
"
$OPTARG
"
==
/
*
]]
;
then
LOGFILE
=
"
$OPTARG
"
else
LOGFILE
=
"
$PWD
/
$OPTARG
"
fi
;;
l
)
if
[[
"
$OPTARG
"
==
/
*
]]
;
then
LOGFILE
=
"
$OPTARG
"
else
LOGFILE
=
"
$PWD
/
$OPTARG
"
fi
;;
p
)
PACKAGEDIR
=
"
$OPTARG
"
;;
K
)
DO_CLEAN
=
false
;;
:
)
echo
"Option -
$OPTARG
requires an argument."
>
&2
print_usage
...
...
@@ -166,17 +171,23 @@ make_dep_lib()
echo
${
lib
}
pwd
run_cmd
"make -C
${
lib
}
/
${
EXTRA_DIR
}
/build clean >
${
LOGFILE
:-
/dev/null
}
"
if
${
DO_CLEAN
}
then
run_cmd
"make -C
${
lib
}
/
${
EXTRA_DIR
}
/build clean >
${
LOGFILE
:-
/dev/null
}
"
fi
run_cmd
"make -C
${
lib
}
/
${
EXTRA_DIR
}
/build -j
$make_concurrency
$targets
>
${
LOGFILE
:-
/dev/null
}
"
}
# clean packages up here first, do not do it below, as we need
# common and opentk
for
package
in
$packages
$thirdparty
$opentk
$common
;
do
echo
$package
clean
make
-C
${
package
}
/
${
EXTRA_DIR
}
/build clean
--silent
done
if
${
DO_CLEAN
}
then
for
package
in
$packages
$thirdparty
$opentk
$common
;
do
echo
$package
clean
make
-C
${
package
}
/
${
EXTRA_DIR
}
/build clean
--silent
done
fi
if
[
${
CLEAN_ONLY
}
-eq
1
]
;
then
exit
0
fi
...
...
fhgfs_client_module/source/common/net/message/nodes/HeartbeatRequestMsgEx.c
View file @
a60e513a
...
...
@@ -22,6 +22,7 @@ bool __HeartbeatRequestMsgEx_processIncoming(NetMessage* this, struct App* app,
Config
*
cfg
=
App_getConfig
(
app
);
Node
*
localNode
=
App_getLocalNode
(
app
);
const
char
*
localNodeID
=
Node_getID
(
localNode
);
NumNodeID
localNodeNumID
=
Node_getNumID
(
localNode
);
NicAddressList
*
nicList
=
Node_getNicList
(
localNode
);
const
BitStore
*
nodeFeatureFlags
=
Node_getNodeFeatures
(
localNode
);
...
...
@@ -31,7 +32,7 @@ bool __HeartbeatRequestMsgEx_processIncoming(NetMessage* this, struct App* app,
ssize_t
sendRes
;
HeartbeatMsgEx_initFromNodeData
(
&
hbMsg
,
localNodeID
,
(
NumNodeID
){
0
}
,
NODETYPE_Client
,
nicList
,
HeartbeatMsgEx_initFromNodeData
(
&
hbMsg
,
localNodeID
,
localNodeNumID
,
NODETYPE_Client
,
nicList
,
nodeFeatureFlags
);
HeartbeatMsgEx_setPorts
(
&
hbMsg
,
Config_getConnClientPortUDP
(
cfg
),
0
);
HeartbeatMsgEx_setFhgfsVersion
(
&
hbMsg
,
BEEGFS_VERSION_CODE
);
...
...
fhgfs_client_module/source/common/nodes/MirrorBuddyGroupMapper.c
View file @
a60e513a
...
...
@@ -212,31 +212,18 @@ void __MirrorBuddyGroupMapper_syncGroupsFromListsUnlocked(MirrorBuddyGroupMapper
}
// remove all unmarked (aka deleted) groups from the map
buddyGroupMapIter
=
MirrorBuddyGroupMap_begin
(
&
this
->
mirrorBuddyGroups
);
while
(
!
MirrorBuddyGroupMapIter_end
(
&
buddyGroupMapIter
))
{
int32_t
lastMarkedKey
=
-
1
;
buddyGroupMapIter
=
MirrorBuddyGroupMap_begin
(
&
this
->
mirrorBuddyGroups
);
for
(
/* iter init'ed above */
;
!
MirrorBuddyGroupMapIter_end
(
&
buddyGroupMapIter
);
MirrorBuddyGroupMapIter_next
(
&
buddyGroupMapIter
))
{
MirrorBuddyGroup
*
group
=
MirrorBuddyGroupMapIter_value
(
&
buddyGroupMapIter
);
if
(
group
->
marked
)
{
lastMarkedKey
=
MirrorBuddyGroupMapIter_key
(
&
buddyGroupMapIter
);
continue
;
}
MirrorBuddyGroup
*
group
=
MirrorBuddyGroupMapIter_value
(
&
buddyGroupMapIter
);
uint16_t
key
=
MirrorBuddyGroupMapIter_key
(
&
buddyGroupMapIter
);
// rewind the iterator a bit, since we are about to invalidate it
MirrorBuddyGroupMapIter_next
(
&
buddyGroupMapIter
);
MirrorBuddyGroupMap_erase
(
&
this
->
mirrorBuddyGroups
,
MirrorBuddyGroupMapIter_key
(
&
buddyGroupMapIter
));
if
(
!
group
->
marked
)
{
MirrorBuddyGroupMap_erase
(
&
this
->
mirrorBuddyGroups
,
key
);
MirrorBuddyGroup_put
(
group
);
if
(
lastMarkedKey
!=
-
1
)
buddyGroupMapIter
=
MirrorBuddyGroupMap_find
(
&
this
->
mirrorBuddyGroups
,
lastMarkedKey
);
else
buddyGroupMapIter
=
MirrorBuddyGroupMap_begin
(
&
this
->
mirrorBuddyGroups
);
}
}
}
...
...
fhgfs_client_module/source/common/nodes/NodeConnPool.c
View file @
a60e513a
...
...
@@ -134,6 +134,9 @@ Socket* NodeConnPool_acquireStreamSocketEx(NodeConnPool* this, bool allowWaiting
bool
isPrimaryInterface
=
true
;
// used to set expiration for non-primary interfaces;
// "primary" means: first interface in the list that is supported by client and server
if
(
unlikely
(
Thread_isSignalPending
()
)
)
return
NULL
;
// no need to try if a signal is pending
Mutex_lock
(
&
this
->
mutex
);
// L O C K
if
(
!
this
->
availableConns
&&
(
this
->
establishedConns
==
this
->
maxConns
)
)
...
...
fhgfs_client_module/source/filesystem/FhgfsOpsIoctl.c
View file @
a60e513a
...
...
@@ -621,6 +621,7 @@ long FhgfsOpsIoctl_mkfileWithStripeHints(struct file *file, void __user *argp)
if
(
get_user
(
userFilename
,
&
mkfileArg
->
filename
))
return
-
EFAULT
;
if
(
chunksize
!=
0
)
{
// check if chunksize is valid
if
(
unlikely
(
(
chunksize
<
STRIPEPATTERN_MIN_CHUNKSIZE
)
||
!
MathTk_isPowerOfTwo
(
chunksize
)
)
)
...
...
fhgfs_client_module/source/net/filesystem/FhgfsOpsCommKit.c
View file @
a60e513a
...
...
@@ -575,9 +575,10 @@ static void __commkit_cleanup_generic(CommKitContext* context, struct CommKitTar
(
info
->
nodeResult
==
-
FhgfsOpsErr_AGAIN
&&
(
context
->
ops
->
retryFlags
&
CK_RETRY_LOOP_EAGAIN
)
)
)
)
{
// comm error occurred => check whether we can do a retry
if
(
App_getConnRetriesEnabled
(
context
->
app
)
&&
(
(
!
context
->
maxNumRetries
)
||
(
context
->
currentRetryNum
<
context
->
maxNumRetries
)
)
)
if
(
Thread_isSignalPending
())
info
->
nodeResult
=
-
FhgfsOpsErr_INTERRUPTED
;
else
if
(
App_getConnRetriesEnabled
(
context
->
app
)
&&
(
!
context
->
maxNumRetries
||
context
->
currentRetryNum
<
context
->
maxNumRetries
))
{
// we have retries left
context
->
numRetryWaiters
++
;
info
->
state
=
CommKitState_RETRYWAIT
;
...
...
fhgfs_common/source/common/app/log/Logger.cpp
View file @
a60e513a
...
...
@@ -13,6 +13,7 @@ const Logger::LogTopicElem Logger::LogTopics[] =
{
{
"general"
,
LogTopic_GENERAL
},
{
"state-sync"
,
LogTopic_STATESYNC
},
{
"workqueues"
,
LogTopic_WORKQUEUES
},
{
"unknown"
,
LogTopic_INVALID
}
};
...
...
fhgfs_common/source/common/app/log/Logger.h
View file @
a60e513a
...
...
@@ -24,6 +24,7 @@ enum LogTopic
{
LogTopic_GENERAL
=
0
,
// default log topic
LogTopic_STATESYNC
=
1
,
// everything related to offline detection
LogTopic_WORKQUEUES
=
2
,
// En/dequeueing of work items
LogTopic_INVALID
};
...
...
fhgfs_common/source/common/components/StatsCollector.cpp
View file @
a60e513a
#include <common/toolkit/TimeAbs.h>
#include "StatsCollector.h"
#include <mutex>
StatsCollector
::
StatsCollector
(
MultiWorkQueue
*
workQ
,
unsigned
collectIntervalMS
,
unsigned
historyLength
)
throw
(
ComponentInitException
)
:
PThread
(
"Stats"
),
...
...
@@ -45,7 +47,7 @@ void StatsCollector::collectStats()
{
HighResolutionStats
currentStats
;
SafeMutexLock
mutexLock
(
&
mutex
);
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
// Note: Newer stats in the internal list are pushed at the front side
...
...
@@ -61,8 +63,6 @@ void StatsCollector::collectStats()
// push new stats to front
statsList
.
push_front
(
currentStats
);
mutexLock
.
unlock
();
}
/**
...
...
@@ -70,7 +70,7 @@ void StatsCollector::collectStats()
*/
void
StatsCollector
::
getStatsSince
(
uint64_t
lastStatsMS
,
HighResStatsList
&
outStatsList
)
{
SafeMutexLock
mutexLock
(
&
mutex
);
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
// Note: Newer stats in the internal list are pushed at the front side, but
// newer stats on the outStatsList are pushed to the back side.
...
...
@@ -83,6 +83,4 @@ void StatsCollector::getStatsSince(uint64_t lastStatsMS, HighResStatsList& outSt
outStatsList
.
push_back
(
*
iter
);
}
mutexLock
.
unlock
();
}
fhgfs_common/source/common/components/worker/Work.cpp
deleted
100644 → 0
View file @
366ed243
#include "Work.h"
fhgfs_common/source/common/components/worker/Work.h
View file @
a60e513a
...
...
@@ -2,6 +2,7 @@
#define WORK_H_
#include <common/toolkit/HighResolutionStats.h>
#include <common/toolkit/TimeFine.h>
#include <common/Common.h>
class
Work
;
...
...
@@ -34,6 +35,16 @@ class Work
{
return
&
stats
;
}
#ifdef BEEGFS_DEBUG_PROFILING
TimeFine
*
getAgeTime
()
{
return
&
age
;
}
private:
TimeFine
age
;
#endif
};
#endif
/*WORK_H_*/
fhgfs_common/source/common/components/worker/Worker.cpp
View file @
a60e513a
...
...
@@ -57,15 +57,11 @@ void Worker::workLoop(QueueWorkType workType)
while
(
!
getSelfTerminate
()
||
!
maySelfTerminateNow
()
)
{
//log.log(Log_DEBUG, "Waiting for work...");
Work
*
work
=
waitForWorkByType
(
stats
,
personalWorkQueue
,
workType
);
//log.log(Log_DEBUG, "Got work");
#ifdef BEEGFS_DEBUG_PROFILING
TimeFine
workStartTime
;
#endif
#ifdef BEEGFS_DEBUG_PROFILING
TimeFine
workStartTime
;
#endif
HighResolutionStatsTk
::
resetStats
(
&
stats
);
// prepare stats
...
...
@@ -76,18 +72,37 @@ void Worker::workLoop(QueueWorkType workType)
stats
.
incVals
.
workRequests
=
1
;
HighResolutionStatsTk
::
addHighResIncStats
(
*
work
->
getHighResolutionStats
(),
stats
);
#ifdef BEEGFS_DEBUG_PROFILING
TimeFine
workEndTime
;
int
workElapsedMS
=
workEndTime
.
elapsedSinceMS
(
&
workStartTime
);
int
workElapsedMicro
=
workEndTime
.
elapsedSinceMicro
(
&
workStartTime
);
#ifdef BEEGFS_DEBUG_PROFILING
TimeFine
workEndTime
;
const
auto
workElapsedMS
=
workEndTime
.
elapsedSinceMS
(
&
workStartTime
);
const
auto
workLatencyMS
=
workEndTime
.
elapsedSinceMS
(
work
->
getAgeTime
()
);
if
(
workEndTime
.
elapsedSinceMS
(
&
workStartTime
)
>=
10
)
LOG_DEBUG_CONTEXT
(
log
,
Log_DEBUG
,
"Work processed. Elapsed ms: "
+
StringTk
::
intToStr
(
workElapsedMS
)
);
if
(
workElapsedMS
>=
10
)
{
if
(
workLatencyMS
>=
10
)
LOG_TOP
(
WORKQUEUES
,
DEBUG
,
"Work processed."
,
as
(
"Elapsed ms"
,
workElapsedMS
),
as
(
"Total latency (ms)"
,
workLatencyMS
));
else
LOG_TOP
(
WORKQUEUES
,
DEBUG
,
"Work processed."
,
as
(
"Elapsed ms"
,
workElapsedMS
),
as
(
"Total latency (us)"
,
workEndTime
.
elapsedSinceMicro
(
work
->
getAgeTime
())));
}
else
{
if
(
workLatencyMS
>=
10
)
{
LOG_TOP
(
WORKQUEUES
,
DEBUG
,
"Work processed."
,
as
(
"Elapsed us"
,
workEndTime
.
elapsedSinceMicro
(
&
workStartTime
)),
as
(
"Total latency (ms)"
,
workEndTime
.
elapsedSinceMS
(
work
->
getAgeTime
())));
}
else
LOG_DEBUG_CONTEXT
(
log
,
Log_DEBUG
,
"Work processed. Elapsed us: "
+
StringTk
::
intToStr
(
workElapsedMicro
)
);
#endif
{
LOG_TOP
(
WORKQUEUES
,
DEBUG
,
"Work processed."
,
as
(
"Elapsed us"
,
workEndTime
.
elapsedSinceMicro
(
&
workStartTime
)),
as
(
"Total latency (us)"
,
workEndTime
.
elapsedSinceMicro
(
work
->
getAgeTime
())));
}
}
#endif
// cleanup
delete
(
work
);
...
...
fhgfs_common/source/common/components/worker/queue/MultiWorkQueue.cpp
View file @
a60e513a
#include "MultiWorkQueue.h"
#include "PersonalWorkQueue.h"
MultiWorkQueue
::
MultiWorkQueue
()
{
numPendingWorks
=
0
;
...
...
@@ -29,9 +28,7 @@ MultiWorkQueue::~MultiWorkQueue()
Work
*
MultiWorkQueue
::
waitForDirectWork
(
HighResolutionStats
&
newStats
,
PersonalWorkQueue
*
personalWorkQueue
)
{
Work
*
work
;
SafeMutexLock
mutexLock
(
&
mutex
);
// L O C K
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
HighResolutionStatsTk
::
addHighResIncStats
(
newStats
,
stats
);
stats
.
rawVals
.
busyWorkers
--
;
...
...
@@ -44,18 +41,33 @@ Work* MultiWorkQueue::waitForDirectWork(HighResolutionStats& newStats,
// personal is always first
if
(
unlikely
(
!
personalWorkQueue
->
getIsWorkListEmpty
()
)
)
{
// we got something in our personal queue
work
=
personalWorkQueue
->
getAndPopFirstWork
();
Work
*
work
=
personalWorkQueue
->
getAndPopFirstWork
();
#ifdef BEEGFS_DEBUG_PROFILING
const
auto
workAgeMS
=
work
->
getAgeTime
()
->
elapsedMS
();
if
(
workAgeMS
>
10
)
LOG_TOP
(
WORKQUEUES
,
DEBUG
,
"Fetching personal work item."
,
work
,
as
(
"age (ms)"
,
workAgeMS
));
else
LOG_TOP
(
WORKQUEUES
,
DEBUG
,
"Fetching personal work item."
,
work
,
as
(
"age (us)"
,
work
->
getAgeTime
()
->
elapsedMicro
()));
#endif
return
work
;
}
else
{
work
=
directWorkList
->
getAndPopNextWork
();
Work
*
work
=
directWorkList
->
getAndPopNextWork
();
numPendingWorks
--
;
#ifdef BEEGFS_DEBUG_PROFILING
const
auto
workAgeMS
=
work
->
getAgeTime
()
->
elapsedMS
();
if
(
workAgeMS
>
10
)
LOG_TOP
(
WORKQUEUES
,
DEBUG
,
"Fetching direct work item."
,
work
,
as
(
"age (ms)"
,
workAgeMS
));
else
LOG_TOP
(
WORKQUEUES
,
DEBUG
,
"Fetching direct work item."
,
work
,
as
(
"age (us)"
,
work
->
getAgeTime
()
->
elapsedMicro
()));
#endif
return
work
;
}
mutexLock
.
unlock
();
// U N L O C K
return
work
;
}
/**
...
...
@@ -67,9 +79,7 @@ Work* MultiWorkQueue::waitForDirectWork(HighResolutionStats& newStats,
Work
*
MultiWorkQueue
::
waitForAnyWork
(
HighResolutionStats
&
newStats
,
PersonalWorkQueue
*
personalWorkQueue
)
{
Work
*
work
;
SafeMutexLock
mutexLock
(
&
mutex
);
// L O C K
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
HighResolutionStatsTk
::
addHighResIncStats
(
newStats
,
stats
);
stats
.
rawVals
.
busyWorkers
--
;
...
...
@@ -97,7 +107,17 @@ Work* MultiWorkQueue::waitForAnyWork(HighResolutionStats& newStats,
// personal is always first
if
(
unlikely
(
!
personalWorkQueue
->
getIsWorkListEmpty
()
)
)
{
// we got something in our personal queue
work
=
personalWorkQueue
->
getAndPopFirstWork
();
Work
*
work
=
personalWorkQueue
->
getAndPopFirstWork
();
#ifdef BEEGFS_DEBUG_PROFILING
const
auto
workAgeMS
=
work
->
getAgeTime
()
->
elapsedMS
();
if
(
workAgeMS
>
10
)
LOG_TOP
(
WORKQUEUES
,
DEBUG
,
"Fetching personal work item."
,
work
,
as
(
"age (ms)"
,
workAgeMS
));
else
LOG_TOP
(
WORKQUEUES
,
DEBUG
,
"Fetching personal work item."
,
work
,
as
(
"age (us)"
,
work
->
getAgeTime
()
->
elapsedMicro
()));
#endif
return
work
;
}
else
{
...
...
@@ -114,11 +134,21 @@ Work* MultiWorkQueue::waitForAnyWork(HighResolutionStats& newStats,
if
(
!
currentWorkList
->
getIsEmpty
()
)
{
// this queue contains work for us
work
=
currentWorkList
->
getAndPopNextWork
();
Work
*
work
=
currentWorkList
->
getAndPopNextWork
();
numPendingWorks
--
;
goto
unlock_and_exit
;
#ifdef BEEGFS_DEBUG_PROFILING
const
std
::
string
direct
=
(
i
==
QueueWorkType_DIRECT
)
?
"direct"
:
"indirect"
;
const
auto
workAgeMS
=
work
->
getAgeTime
()
->
elapsedMS
();
if
(
workAgeMS
>
10
)
LOG_TOP
(
WORKQUEUES
,
DEBUG
,
"Fetching direct work item."
,
work
,
as
(
"age (ms)"
,
workAgeMS
));
else
LOG_TOP
(
WORKQUEUES
,
DEBUG
,
"Fetching direct work item."
,
work
,
as
(
"age (us)"
,
work
->
getAgeTime
()
->
elapsedMicro
()));
#endif
return
work
;
}
}
...
...
@@ -128,12 +158,6 @@ Work* MultiWorkQueue::waitForAnyWork(HighResolutionStats& newStats,
"All queues are empty. "
"numPendingWorks: "
+
StringTk
::
uint64ToStr
(
numPendingWorks
)
);
}
unlock_and_exit:
mutexLock
.
unlock
();
// U N L O C K
return
work
;
}
/**
...
...
@@ -144,13 +168,11 @@ unlock_and_exit:
*/
void
MultiWorkQueue
::
incNumWorkers
()
{
SafeMutexLock
mutexLock
(
&
mutex
);
std
::
lock_guard
<
Mutex
>
mutesLock
(
mutex
);
/* note: we increase number of busy workers here, because this value will be decreased
by 1 when the worker calls waitFor...Work(). */
stats
.
rawVals
.
busyWorkers
++
;
mutexLock
.
unlock
();
}
/**
...
...
@@ -184,7 +206,7 @@ void MultiWorkQueue::setIndirectWorkList(AbstractWorkContainer* newWorkList)
void
MultiWorkQueue
::
getStatsAsStr
(
std
::
string
&
outIndirectQueueStats
,
std
::
string
&
outDirectQueueStats
,
std
::
string
&
outBusyStats
)
{
SafeMutexLock
mutexLock
(
&
mutex
);
// L O C K
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
// get queue stats
indirectWorkList
->
getStatsAsStr
(
outIndirectQueueStats
);
...
...
@@ -202,6 +224,4 @@ void MultiWorkQueue::getStatsAsStr(std::string& outIndirectQueueStats,
"(reset every second)"
<<
std
::
endl
;
outBusyStats
=
busyStream
.
str
();
mutexLock
.
unlock
();
// U N L O C K
}
fhgfs_common/source/common/components/worker/queue/MultiWorkQueue.h
View file @
a60e513a
...
...
@@ -4,7 +4,6 @@
#include <common/app/log/LogContext.h>
#include <common/components/worker/Work.h>
#include <common/threading/Mutex.h>
#include <common/threading/SafeMutexLock.h>
#include <common/threading/Condition.h>
#include <common/toolkit/NamedException.h>
#include <common/toolkit/HighResolutionStats.h>
...
...
@@ -13,6 +12,8 @@
#include "ListWorkContainer.h"
#include "PersonalWorkQueue.h"
#include <mutex>
#define MULTIWORKQUEUE_DEFAULT_USERID (~0) // (usually similar to NETMESSAGE_DEFAULT_USERID)
...
...
@@ -84,7 +85,11 @@ class MultiWorkQueue
public:
void
addDirectWork
(
Work
*
work
,
unsigned
userID
=
MULTIWORKQUEUE_DEFAULT_USERID
)
{
SafeMutexLock
mutexLock
(
&
mutex
);
#ifdef BEEGFS_DEBUG_PROFILING
LOG_TOP
(
WORKQUEUES
,
DEBUG
,
"Adding direct work item."
,
work
);
#endif
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
directWorkList
->
addWork
(
work
,
userID
);
...
...
@@ -92,21 +97,21 @@ class MultiWorkQueue
newWorkCond
.
signal
();
newDirectWorkCond
.
signal
();
mutexLock
.
unlock
();
}
void
addIndirectWork
(
Work
*
work
,
unsigned
userID
=
MULTIWORKQUEUE_DEFAULT_USERID
)
{
SafeMutexLock
mutexLock
(
&
mutex
);
#ifdef BEEGFS_DEBUG_PROFILING
LOG_TOP
(
WORKQUEUES
,
DEBUG
,
"Adding indirect work item."
,
work
);
#endif
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
indirectWorkList
->
addWork
(
work
,
userID
);
numPendingWorks
++
;
newWorkCond
.
signal
();
mutexLock
.
unlock
();
}
void
addPersonalWork
(
Work
*
work
,
PersonalWorkQueue
*
personalQ
)
...
...
@@ -114,7 +119,7 @@ class MultiWorkQueue
/* note: this is in the here (instead of the PersonalWorkQueue) because the MultiWorkQueue
mutex also syncs the personal queue. */
SafeMutexLock
mutexLock
(
&
mutex
);
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
personalQ
->
addWork
(
work
);
...
...
@@ -123,52 +128,30 @@ class MultiWorkQueue
// we assume this method is rarely used, so we just wake up all wokers (inefficient)
newDirectWorkCond
.
broadcast
();
newWorkCond
.
broadcast
();
mutexLock
.
unlock
();
}
size_t
getDirectWorkListSize
()
{
SafeMutexLock
mutexLock
(
&
mutex
);
size_t
retVal
=
directWorkList
->
getSize
();
mutexLock
.
unlock
();
return
retVal
;
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
return
directWorkList
->
getSize
();
}
size_t
getIndirectWorkListSize
()
{
SafeMutexLock
mutexLock
(
&
mutex
);
size_t
retVal
=
indirectWorkList
->
getSize
();
mutexLock
.
unlock
();
return
retVal
;
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
return
indirectWorkList
->
getSize
();
}
bool
getIsPersonalQueueEmpty
(
PersonalWorkQueue
*
personalQ
)
{
SafeMutexLock
mutexLock
(
&
mutex
);
bool
retVal
=
personalQ
->
getIsWorkListEmpty
();
mutexLock
.
unlock
();
return
retVal
;
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
return
personalQ
->
getIsWorkListEmpty
();
}
size_t
getNumPendingWorks
()
{
SafeMutexLock
mutexLock
(
&
mutex
);
size_t
retVal
=
numPendingWorks
;
mutexLock
.
unlock
();
return
retVal
;
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
return
numPendingWorks
;
}
/**
...
...
@@ -176,7 +159,7 @@ class MultiWorkQueue
*/
void
getAndResetStats
(
HighResolutionStats
*
outStats
)
{
SafeMutexLock
mutexLock
(
&
mutex
);
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
*
outStats
=
stats
;
outStats
->
rawVals
.
queuedRequests
=
numPendingWorks
;
...
...
@@ -184,8 +167,6 @@ class MultiWorkQueue
/* note: we only reset incremental stats vals, because otherwise we would lose info
like number of busyWorkers */
HighResolutionStatsTk
::
resetIncStats
(
&
stats
);
mutexLock
.
unlock
();
}
...
...
fhgfs_common/source/common/components/worker/queue/PersonalWorkQueue.h
View file @
a60e513a
...
...
@@ -35,7 +35,7 @@ class PersonalWorkQueue
for
(
WorkListIter
iter
=
workList
.
begin
();
iter
!=
workList
.
end
();
iter
++
)
delete
(
*
iter
);
}
private:
WorkList
workList
;
...
...
fhgfs_common/source/common/components/worker/queue/WorkQueue.cpp
deleted
100644 → 0
View file @
366ed243
#include "WorkQueue.h"
fhgfs_common/source/common/components/worker/queue/WorkQueue.h
View file @
a60e513a
...
...
@@ -3,10 +3,11 @@
#include <common/components/worker/Work.h>
#include <common/threading/Mutex.h>
#include <common/threading/SafeMutexLock.h>
#include <common/threading/Condition.h>
#include <common/Common.h>
#include <mutex>
class
WorkQueue
{
public:
...
...
@@ -16,36 +17,32 @@ class WorkQueue
for
(
WorkListIter
iter
=
workList
.
begin
();
iter
!=
workList
.
end
();
iter
++
)
delete
(
*
iter
);
}
Work
*
waitForNewWork
()
{
SafeMutexLock
mutexLock
(
&
mutex
);
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
while
(
workList
.
empty
()
)
newWorkCond
.
wait
(
&
mutex
);
Work
*
work
=
*
workList
.
begin
();
workList
.
pop_front
();
mutexLock
.
unlock
();
return
work
;
}
void
addWork
(
Work
*
work
)
{
SafeMutexLock
mutexLock
(
&
mutex
);
std
::
lock_guard
<
Mutex
>
mutexLock
(
mutex
);
newWorkCond
.
signal
();
workList
.
push_back
(
work
);