00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #ifndef _PASSENGER_LOGGING_H_
00026 #define _PASSENGER_LOGGING_H_
00027
00028 #include <boost/shared_ptr.hpp>
00029 #include <oxt/system_calls.hpp>
00030 #include <oxt/backtrace.hpp>
00031
00032 #include <sys/types.h>
00033 #include <sys/time.h>
00034 #include <unistd.h>
00035 #include <fcntl.h>
00036 #include <string>
00037 #include <map>
00038 #include <ostream>
00039 #include <sstream>
00040 #include <cstdio>
00041 #include <ctime>
00042
00043 #include "RandomGenerator.h"
00044 #include "FileDescriptor.h"
00045 #include "MessageClient.h"
00046 #include "StaticString.h"
00047 #include "Exceptions.h"
00048 #include "Utils.h"
00049 #include "Utils/MD5.h"
00050 #include "Utils/SystemTime.h"
00051
00052
00053 namespace Passenger {
00054
00055 using namespace std;
00056 using namespace boost;
00057 using namespace oxt;
00058
00059
00060
00061
/*
 * Global logging state. The variables are only declared here; their
 * definitions live in another translation unit.
 */

/** Verbosity threshold that P_TRACE compares its `level` argument against. */
extern unsigned int _logLevel;
/** Destination of P_LOG, P_WARN and P_ERROR. A null pointer disables output. */
extern std::ostream *_logStream;
/** Destination of P_TRACE and P_DEBUG. A null pointer disables output. */
extern std::ostream *_debugStream;

/** Accessor for the log level (presumably returns _logLevel; defined elsewhere). */
unsigned int getLogLevel();

/** Mutator for the log level (presumably assigns _logLevel; defined elsewhere). */
void setLogLevel(unsigned int value);

/**
 * Selects the file that debug output goes to.
 *
 * @param logFile Path of the debug log file. Defaults to NULL; the meaning of
 *                NULL is decided by the out-of-line definition.
 */
void setDebugFile(const char *logFile = NULL);
00069
00070
00071
00072
00073
00074
00075
/**
 * Writes one log entry to the given output stream.
 *
 * The entry consists of a header line containing the process ID, the source
 * location of the macro invocation and the local wall clock time with
 * millisecond precision, followed by the message on its own line. The whole
 * entry is formatted into a temporary buffer first and then handed to the
 * stream in a single insertion, after which the stream is flushed.
 *
 * @param expr   The message. May be a chain of `<<` insertions, which is why
 *               it is deliberately not parenthesized in the expansion.
 * @param stream A pointer convertible to `std::ostream *`. It is evaluated
 *               exactly once; nothing is written when it is null.
 *
 * Implementation notes:
 * - The timestamp is taken from a single gettimeofday() call so that the
 *   seconds and milliseconds parts always belong to the same instant.
 * - localtime_r() is used instead of localtime() because the latter returns
 *   a pointer to shared static storage.
 * - The milliseconds are zero-padded to three digits so that e.g. 5 ms is
 *   rendered as ".005" and not ".5".
 * - All locals carry a `p_log_` prefix so they are unlikely to shadow
 *   identifiers referenced by `expr`.
 */
#define P_LOG_TO(expr, stream) \
	do { \
		std::ostream *p_log_out = (stream); \
		if (p_log_out != 0) { \
			struct timeval p_log_tv; \
			struct tm p_log_tm; \
			time_t p_log_time; \
			char p_log_datetime[60]; \
			char p_log_msec[8]; \
			std::stringstream p_log_sstream; \
			\
			gettimeofday(&p_log_tv, NULL); \
			p_log_time = p_log_tv.tv_sec; \
			if (localtime_r(&p_log_time, &p_log_tm) == NULL \
			 || strftime(p_log_datetime, sizeof(p_log_datetime), \
					"%F %H:%M:%S", &p_log_tm) == 0) { \
				/* Never print an indeterminate buffer. */ \
				p_log_datetime[0] = '\0'; \
			} \
			snprintf(p_log_msec, sizeof(p_log_msec), "%03lu", \
				(unsigned long) (p_log_tv.tv_usec / 1000)); \
			p_log_sstream << \
				"[ pid=" << ((unsigned long) getpid()) << \
				" file=" << __FILE__ << ":" << (unsigned long) __LINE__ << \
				" time=" << p_log_datetime << "." << p_log_msec << " ]:" << \
				"\n " << expr << std::endl; \
			*p_log_out << p_log_sstream.str(); \
			p_log_out->flush(); \
		} \
	} while (false)
00098
00099
00100
00101
/**
 * Writes the given message expression to the general log stream
 * (Passenger::_logStream) through P_LOG_TO.
 */
#define P_LOG(expr) P_LOG_TO(expr, Passenger::_logStream)

/**
 * Writes a warning. Currently warnings are not marked up in any way: the
 * expansion is exactly the same as that of P_LOG.
 */
#define P_WARN(expr) P_LOG_TO(expr, Passenger::_logStream)

/**
 * Writes an error message. Currently errors are not marked up in any way:
 * the expansion is exactly the same as that of P_LOG.
 */
#define P_ERROR(expr) P_LOG_TO(expr, Passenger::_logStream)
00115
00116
00117
00118
00119
/**
 * Writes a debugging message at trace level 1. See P_TRACE.
 */
#define P_DEBUG(expr) P_TRACE(1, expr)

#ifdef PASSENGER_DEBUG
	/**
	 * Writes the message expression `expr` to Passenger::_debugStream, but
	 * only if the current log level (Passenger::_logLevel) is at least
	 * `level`. `level` is parenthesized so that compound expressions such
	 * as `verbose ? 1 : 2` compare as a whole.
	 */
	#define P_TRACE(level, expr) \
		do { \
			if (Passenger::_logLevel >= (level)) { \
				P_LOG_TO(expr, Passenger::_debugStream); \
			} \
		} while (false)

	/**
	 * Checks `expr`; if it evaluates to false then an error message is
	 * logged through P_ERROR and the *enclosing function* returns
	 * `result_if_failed`. `result_if_failed` is intentionally pasted
	 * verbatim after `return` so that it may be left empty inside
	 * functions returning void.
	 */
	#define P_ASSERT(expr, result_if_failed, message) \
		do { \
			if (!(expr)) { \
				P_ERROR("Assertion failed: " << message); \
				return result_if_failed; \
			} \
		} while (false)
#else
	/* Without PASSENGER_DEBUG both macros expand to an empty statement:
	 * none of their arguments are evaluated.
	 */
	#define P_TRACE(level, expr) do { } while (false)

	#define P_ASSERT(expr, result_if_failed, message) do { } while (false)
#endif
00142
00143
00144
00145
00146 class AnalyticsLog {
00147 private:
00148 static const int INT64_STR_BUFSIZE = 22;
00149
00150 FileDescriptor handle;
00151 string groupName;
00152 string txnId;
00153 bool largeMessages;
00154
00155 class FileLock {
00156 private:
00157 int handle;
00158 public:
00159 FileLock(const int &handle) {
00160 this->handle = handle;
00161 int ret;
00162 do {
00163 ret = flock(handle, LOCK_EX);
00164 } while (ret == -1 && errno == EINTR);
00165 if (ret == -1) {
00166 int e = errno;
00167 throw SystemException("Cannot lock analytics log file", e);
00168 }
00169 }
00170
00171 ~FileLock() {
00172 int ret;
00173 do {
00174 ret = flock(handle, LOCK_UN);
00175 } while (ret == -1 && errno == EINTR);
00176 if (ret == -1) {
00177 int e = errno;
00178 throw SystemException("Cannot unlock analytics log file", e);
00179 };
00180 }
00181 };
00182
00183
00184
00185
00186
00187
00188 void atomicWrite(const char *data, unsigned int size) {
00189 int ret;
00190
00191 ret = syscalls::write(handle, data, size);
00192 if (ret == -1) {
00193 int e = errno;
00194 throw SystemException("Cannot write to the transaction log", e);
00195 } else if ((unsigned int) ret != size) {
00196 throw IOException("Cannot atomically write to the transaction log");
00197 }
00198 }
00199
00200
00201
00202
00203 char *insertTxnIdAndTimestamp(char *buffer) {
00204 int size;
00205
00206
00207 memcpy(buffer, txnId.c_str(), txnId.size());
00208 buffer += txnId.size();
00209
00210
00211 *buffer = ' ';
00212 buffer++;
00213
00214
00215 size = snprintf(buffer, INT64_STR_BUFSIZE, "%llu", SystemTime::getUsec());
00216 if (size >= INT64_STR_BUFSIZE) {
00217
00218 throw IOException("Cannot format a new transaction log message timestamp.");
00219 }
00220 buffer += size;
00221
00222
00223 *buffer = ' ';
00224
00225 return buffer + 1;
00226 }
00227
00228 public:
00229 AnalyticsLog() { }
00230
00231 AnalyticsLog(const FileDescriptor &handle, const string &groupName, const string &txnId,
00232 bool largeMessages)
00233 {
00234 this->handle = handle;
00235 this->groupName = groupName;
00236 this->txnId = txnId;
00237 this->largeMessages = largeMessages;
00238 message("ATTACH");
00239 }
00240
00241 ~AnalyticsLog() {
00242 if (handle != -1) {
00243 message("DETACH");
00244 }
00245 }
00246
00247 void message(const StaticString &text) {
00248 if (handle != -1 && largeMessages) {
00249
00250 char header[txnId.size() + 1 + INT64_STR_BUFSIZE + 1];
00251 char *end = insertTxnIdAndTimestamp(header);
00252 char sizeHeader[7];
00253
00254 snprintf(sizeHeader, sizeof(sizeHeader) - 1,
00255 "%4x ", (int) (end - header) + (int) text.size() + 1);
00256 sizeHeader[sizeof(sizeHeader) - 1] = '\0';
00257
00258 MessageChannel channel(handle);
00259 FileLock lock(handle);
00260 channel.writeRaw(sizeHeader, strlen(sizeHeader));
00261 channel.writeRaw(header, end - header);
00262 channel.writeRaw(text);
00263 channel.writeRaw("\n");
00264 } else if (handle != -1 && !largeMessages) {
00265
00266 char data[txnId.size() + 1 + INT64_STR_BUFSIZE + 1 + text.size() + 1];
00267 char *end;
00268
00269
00270 end = insertTxnIdAndTimestamp(data);
00271
00272
00273 memcpy(end, text.c_str(), text.size());
00274 end += text.size();
00275
00276
00277 *end = '\n';
00278 end++;
00279
00280 atomicWrite(data, end - data);
00281 }
00282 }
00283
00284 void abort(const StaticString &text) {
00285 if (handle != -1 && largeMessages) {
00286 char header[txnId.size() + 1 + INT64_STR_BUFSIZE + 1 + sizeof("ABORT: ") - 1];
00287 char *end;
00288 char sizeHeader[7];
00289
00290
00291 end = insertTxnIdAndTimestamp(header);
00292
00293 memcpy(end, "ABORT: ", sizeof("ABORT: ") - 1);
00294 end += sizeof("ABORT: ") - 1;
00295
00296 snprintf(sizeHeader, sizeof(sizeHeader) - 1,
00297 "%4x ", (int) (end - header) + (int) text.size() + 1);
00298 sizeHeader[sizeof(sizeHeader) - 1] = '\0';
00299
00300 MessageChannel channel(handle);
00301 FileLock lock(handle);
00302 channel.writeRaw(sizeHeader, strlen(sizeHeader));
00303 channel.writeRaw(header, end - header);
00304 channel.writeRaw(text);
00305 channel.writeRaw("\n");
00306 } else if (handle != -1 && !largeMessages) {
00307
00308 char data[txnId.size() + 1 + INT64_STR_BUFSIZE + 1 +
00309 (sizeof("ABORT: ") - 1) + text.size() + 1];
00310 char *end;
00311
00312
00313 end = insertTxnIdAndTimestamp(data);
00314
00315
00316 memcpy(end, "ABORT: ", sizeof("ABORT: ") - 1);
00317 end += sizeof("ABORT: ") - 1;
00318
00319
00320 *end = '\n';
00321 end++;
00322
00323 atomicWrite(data, end - data);
00324 }
00325 }
00326
00327 bool isNull() const {
00328 return handle == -1;
00329 }
00330
00331 string getGroupName() const {
00332 return groupName;
00333 }
00334
00335 string getTxnId() const {
00336 return txnId;
00337 }
00338 };
00339
00340 typedef shared_ptr<AnalyticsLog> AnalyticsLogPtr;
00341
00342 class AnalyticsScopeLog {
00343 private:
00344 AnalyticsLog *log;
00345 enum {
00346 NAME,
00347 GRANULAR
00348 } type;
00349 union {
00350 const char *name;
00351 struct {
00352 const char *endMessage;
00353 const char *abortMessage;
00354 } granular;
00355 } data;
00356 bool ok;
00357 public:
00358 AnalyticsScopeLog(const AnalyticsLogPtr &log, const char *name) {
00359 this->log = log.get();
00360 type = NAME;
00361 data.name = name;
00362 ok = false;
00363 log->message(string("BEGIN: ") + name);
00364 }
00365
00366 AnalyticsScopeLog(const AnalyticsLogPtr &log, const char *beginMessage,
00367 const char *endMessage, const char *abortMessage = NULL
00368 ) {
00369 this->log = log.get();
00370 type = GRANULAR;
00371 data.granular.endMessage = endMessage;
00372 data.granular.abortMessage = abortMessage;
00373 ok = abortMessage == NULL;
00374 log->message(beginMessage);
00375 }
00376
00377 ~AnalyticsScopeLog() {
00378 if (type == NAME) {
00379 if (ok) {
00380 log->message(string("END: ") + data.name);
00381 } else {
00382 log->message(string("FAIL: ") + data.name);
00383 }
00384 } else {
00385 if (ok) {
00386 log->message(data.granular.endMessage);
00387 } else {
00388 log->message(data.granular.abortMessage);
00389 }
00390 }
00391 }
00392
00393 void success() {
00394 ok = true;
00395 }
00396 };
00397
00398 class AnalyticsLogger {
00399 private:
00400 struct CachedFileHandle {
00401 FileDescriptor fd;
00402 time_t lastUsed;
00403 };
00404
00405 typedef map<string, CachedFileHandle> Cache;
00406
00407 string socketFilename;
00408 string username;
00409 string password;
00410 RandomGenerator randomGenerator;
00411
00412 boost::mutex lock;
00413 Cache fileHandleCache;
00414
00415 FileDescriptor openLogFile(const StaticString &groupName, unsigned long long timestamp,
00416 const StaticString &category = "web")
00417 {
00418 string logFile = determineLogFilename("", groupName, category, timestamp);
00419 Cache::iterator it;
00420 lock_guard<boost::mutex> l(lock);
00421
00422 it = fileHandleCache.find(logFile);
00423 if (it == fileHandleCache.end()) {
00424
00425
00426
00427
00428
00429 while (fileHandleCache.size() >= 10) {
00430 Cache::iterator oldest_it = fileHandleCache.begin();
00431 it = oldest_it;
00432 it++;
00433
00434 for (; it != fileHandleCache.end(); it++) {
00435 if (it->second.lastUsed < oldest_it->second.lastUsed) {
00436 oldest_it = it;
00437 }
00438 }
00439
00440 fileHandleCache.erase(oldest_it);
00441 }
00442
00443 MessageClient client;
00444 CachedFileHandle fileHandle;
00445 vector<string> args;
00446
00447 client.connect(socketFilename, username, password);
00448 client.write("open log file",
00449 groupName.c_str(),
00450 toString(timestamp).c_str(),
00451 category.c_str(),
00452 NULL);
00453 if (!client.read(args)) {
00454
00455
00456 throw IOException("The logging agent unexpectedly closed the connection.");
00457 }
00458 if (args[0] == "error") {
00459 throw IOException("The logging agent could not open the log file: " + args[1]);
00460 }
00461 fileHandle.fd = client.readFileDescriptor();
00462 fileHandle.lastUsed = SystemTime::get();
00463 it = fileHandleCache.insert(make_pair(logFile, fileHandle)).first;
00464 } else {
00465 it->second.lastUsed = SystemTime::get();
00466 }
00467 return it->second.fd;
00468 }
00469
00470 unsigned long long extractTimestamp(const string &txnId) const {
00471 const char *timestampBegin = strchr(txnId.c_str(), '-');
00472 if (timestampBegin != NULL) {
00473 return atoll(timestampBegin + 1);
00474 } else {
00475 return 0;
00476 }
00477 }
00478
00479 public:
00480 AnalyticsLogger() { }
00481
00482 AnalyticsLogger(const string &socketFilename, const string &username, const string &password) {
00483 this->socketFilename = socketFilename;
00484 this->username = username;
00485 this->password = password;
00486 }
00487
00488 static string determineGroupDir(const string &dir, const StaticString &groupName) {
00489 string result = dir;
00490 appendVersionAndGroupId(result, groupName);
00491 return result;
00492 }
00493
00494 static void appendVersionAndGroupId(string &output, const StaticString &groupName) {
00495 md5_state_t state;
00496 md5_byte_t digest[16];
00497 char groupId[32];
00498
00499 if (output.capacity() < 3 + 32) {
00500 output.reserve(3 + 32);
00501 }
00502 output.append("/1/", 3);
00503 md5_init(&state);
00504 md5_append(&state, (const md5_byte_t *) groupName.data(), groupName.size());
00505 md5_finish(&state, digest);
00506 toHex(StaticString((const char *) digest, 16), groupId);
00507 output.append(groupId, 32);
00508 }
00509
00510 static string determineLogFilename(const string &dir, const StaticString &groupName,
00511 const StaticString &category, unsigned long long timestamp)
00512 {
00513 struct tm tm;
00514 time_t time_value;
00515 char time_str[14];
00516
00517 time_value = timestamp / 1000000;
00518 localtime_r(&time_value, &tm);
00519 strftime(time_str, sizeof(time_str), "%Y/%m/%d/%H", &tm);
00520
00521 string filename;
00522 filename.reserve(dir.size()
00523 + (3 + 32)
00524 + 1
00525 + category.size()
00526 + 1
00527 + sizeof(time_str)
00528 + sizeof("log.txt")
00529 );
00530 filename.append(dir);
00531 appendVersionAndGroupId(filename, groupName);
00532 filename.append(1, '/');
00533 filename.append(category.c_str(), category.size());
00534 filename.append(1, '/');
00535 filename.append(time_str);
00536 filename.append("/log.txt");
00537 return filename;
00538 }
00539
00540 AnalyticsLogPtr newTransaction(const string &groupName, const StaticString &category = "web",
00541 bool largeMessages = false)
00542 {
00543 if (socketFilename.empty()) {
00544 return ptr(new AnalyticsLog());
00545 } else {
00546 unsigned long long timestamp = SystemTime::getUsec();
00547 string txnId = randomGenerator.generateHexString(4);
00548 txnId.append("-");
00549 txnId.append(toString(timestamp));
00550 return ptr(new AnalyticsLog(openLogFile(groupName, timestamp, category),
00551 groupName, txnId, largeMessages));
00552 }
00553 }
00554
00555 AnalyticsLogPtr continueTransaction(const string &groupName, const string &txnId,
00556 const StaticString &category = "web",
00557 bool largeMessages = false)
00558 {
00559 if (socketFilename.empty() || groupName.empty() || txnId.empty()) {
00560 return ptr(new AnalyticsLog());
00561 } else {
00562 unsigned long long timestamp;
00563
00564 timestamp = extractTimestamp(txnId);
00565 if (timestamp == 0) {
00566 TRACE_POINT();
00567 throw ArgumentException("Invalid transaction ID '" + txnId + "'");
00568 }
00569 return ptr(new AnalyticsLog(openLogFile(groupName, timestamp, category),
00570 groupName, txnId, largeMessages));
00571 }
00572 }
00573
00574 bool isNull() const {
00575 return socketFilename.empty();
00576 }
00577 };
00578
00579 typedef shared_ptr<AnalyticsLogger> AnalyticsLoggerPtr;
00580
00581 }
00582
00583 #endif
00584